1 // the caching lock server implementation
3 #include "include/lock_server.h"
7 lock_server::lock_state::lock_state(const lock_state & other) {
9 held_by = other.held_by;
10 wanted_by = other.wanted_by;
11 old_requests = other.old_requests;
14 lock_server::lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
15 lock sl(lock_table_lock);
16 // this will create the lock if it doesn't already exist
17 return lock_table[lid];
20 lock_server::lock_server(rsm & r) : rsm_ (&r) {
21 thread(&lock_server::revoker, this).detach();
22 thread(&lock_server::retryer, this).detach();
23 r.set_state_transfer(this);
25 r.reg(lock_protocol::acquire, &lock_server::acquire, this);
26 r.reg(lock_protocol::release, &lock_server::release, this);
29 void lock_server::revoker () {
31 lock_protocol::lockid_t lid = revoke_fifo.deq();
32 LOG << "Revoking " << lid;
33 if (rsm_ && !rsm_->amiprimary())
36 lock_state & st = get_lock_state(lid);
38 holder_t held_by = st.held_by;
41 if (auto cl = rpcc::bind_cached(held_by.first)) {
43 auto ret = (rlock_protocol::status)cl->call(rlock_protocol::revoke, r, lid, held_by.second);
44 LOG << "Revoke returned " << ret;
49 void lock_server::retryer() {
51 lock_protocol::lockid_t lid = retry_fifo.deq();
52 if (rsm_ && !rsm_->amiprimary())
55 LOG << "Sending retry for " << lid;
56 lock_state & st = get_lock_state(lid);
60 if (st.wanted_by.empty())
62 front = st.wanted_by.front();
65 if (auto cl = rpcc::bind_cached(front.first)) {
67 auto ret = (rlock_protocol::status)cl->call(rlock_protocol::retry, r, lid, front.second);
68 LOG << "Retry returned " << ret;
73 lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
74 LOG << "lid=" << lid << " client=" << id << "," << xid;
75 holder_t h = holder_t(id, xid);
76 lock_state & st = get_lock_state(lid);
79 // deal with duplicated requests
80 if (st.old_requests.count(id)) {
81 lock_protocol::xid_t old_xid = st.old_requests[id];
83 return lock_protocol::RPCERR;
84 else if (old_xid == xid) {
85 if (st.held && st.held_by == h) {
86 LOG << "Client " << id << " sent duplicate acquire xid=" << xid;
87 return lock_protocol::OK;
92 // grant the lock if it's available and I'm next in line
93 if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
94 if (!st.wanted_by.empty())
95 st.wanted_by.pop_front();
96 st.old_requests[id] = xid;
100 LOG << "Lock " << lid << " held by " << h.first;
101 if (st.wanted_by.size())
102 revoke_fifo.enq(lid);
103 return lock_protocol::OK;
108 for (auto p : st.wanted_by) {
110 // make sure client is obeying serialization
111 if (p.second != xid) {
112 LOG << "Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second;
113 return lock_protocol::RPCERR;
120 st.wanted_by.push_back(h);
122 LOG << "wanted_by=" << st.wanted_by;
124 // send revoke if we're first in line
125 if (st.wanted_by.front() == h)
126 revoke_fifo.enq(lid);
128 return lock_protocol::RETRY;
131 lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
132 LOG << "lid=" << lid << " client=" << id << "," << xid;
133 lock_state & st = get_lock_state(lid);
135 if (st.held && st.held_by == holder_t(id, xid)) {
137 LOG << "Lock " << lid << " not held";
139 if (st.wanted_by.size())
141 return lock_protocol::OK;
144 string lock_server::marshall_state() {
145 lock sl(lock_table_lock);
146 return marshall(nacquire, lock_table);
149 void lock_server::unmarshall_state(const string & state) {
150 lock sl(lock_table_lock);
151 unmarshall(state, nacquire, lock_table);