1 // the caching lock server implementation
3 #include "lock_server.h"
7 lock_state::lock_state():
12 lock_state::lock_state(const lock_state & other) {
16 lock_state & lock_state::operator=(const lock_state & o) {
19 wanted_by = o.wanted_by;
20 old_requests = o.old_requests;
24 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
25 lock sl(lock_table_lock);
26 // this will create the lock if it doesn't already exist
27 return lock_table[lid];
30 lock_server::lock_server(rsm & r) : rsm_ (&r) {
31 thread(&lock_server::revoker, this).detach();
32 thread(&lock_server::retryer, this).detach();
33 r.set_state_transfer(this);
35 r.reg(lock_protocol::acquire, &lock_server::acquire, this);
36 r.reg(lock_protocol::release, &lock_server::release, this);
37 r.reg(lock_protocol::stat, &lock_server::stat, this);
40 void lock_server::revoker () {
42 lock_protocol::lockid_t lid = revoke_fifo.deq();
43 LOG << "Revoking " << lid;
44 if (rsm_ && !rsm_->amiprimary())
47 lock_state & st = get_lock_state(lid);
49 holder_t held_by = st.held_by;
52 if (auto cl = rpcc::bind_cached(held_by.first)) {
54 auto ret = (rlock_protocol::status)cl->call(rlock_protocol::revoke, r, lid, held_by.second);
55 LOG << "Revoke returned " << ret;
60 void lock_server::retryer() {
62 lock_protocol::lockid_t lid = retry_fifo.deq();
63 if (rsm_ && !rsm_->amiprimary())
66 LOG << "Sending retry for " << lid;
67 lock_state & st = get_lock_state(lid);
71 if (st.wanted_by.empty())
73 front = st.wanted_by.front();
76 if (auto cl = rpcc::bind_cached(front.first)) {
78 auto ret = (rlock_protocol::status)cl->call(rlock_protocol::retry, r, lid, front.second);
79 LOG << "Retry returned " << ret;
84 lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
85 LOG << "lid=" << lid << " client=" << id << "," << xid;
86 holder_t h = holder_t(id, xid);
87 lock_state & st = get_lock_state(lid);
90 // deal with duplicated requests
91 if (st.old_requests.count(id)) {
92 lock_protocol::xid_t old_xid = st.old_requests[id];
94 return lock_protocol::RPCERR;
95 else if (old_xid == xid) {
96 if (st.held && st.held_by == h) {
97 LOG << "Client " << id << " sent duplicate acquire xid=" << xid;
98 return lock_protocol::OK;
103 // grant the lock if it's available and I'm next in line
104 if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
105 if (!st.wanted_by.empty())
106 st.wanted_by.pop_front();
107 st.old_requests[id] = xid;
111 LOG << "Lock " << lid << " held by " << h.first;
112 if (st.wanted_by.size())
113 revoke_fifo.enq(lid);
114 return lock_protocol::OK;
119 for (auto p : st.wanted_by) {
121 // make sure client is obeying serialization
122 if (p.second != xid) {
123 LOG << "Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second;
124 return lock_protocol::RPCERR;
131 st.wanted_by.push_back(h);
133 LOG << "wanted_by=" << st.wanted_by;
135 // send revoke if we're first in line
136 if (st.wanted_by.front() == h)
137 revoke_fifo.enq(lid);
139 return lock_protocol::RETRY;
142 lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
143 LOG << "lid=" << lid << " client=" << id << "," << xid;
144 lock_state & st = get_lock_state(lid);
146 if (st.held && st.held_by == holder_t(id, xid)) {
148 LOG << "Lock " << lid << " not held";
150 if (st.wanted_by.size())
152 return lock_protocol::OK;
155 string lock_server::marshal_state() {
156 lock sl(lock_table_lock);
157 return marshall(nacquire, lock_table).content();
160 void lock_server::unmarshal_state(const string & state) {
161 lock sl(lock_table_lock);
162 unmarshall(state, false, nacquire, lock_table);
165 lock_protocol::status lock_server::stat(int & r, lock_protocol::lockid_t lid, const callback_t &) {
166 LOG << "stat request for " << lid;
169 return lock_protocol::OK;