1 // the caching lock server implementation
3 #include "lock_server.h"
7 #include "lang/verify.h"
9 #include "threaded_log.h"
10 #include "rpc/marshall.h"
13 using std::ostringstream;
14 using std::istringstream;
// Default constructor for lock_state. Only the head of the definition is
// visible in this excerpt: the member-initialiser list introduced by the ':'
// and the constructor body (original lines 18-20) are not shown.
17 lock_state::lock_state():
// Copy constructor for lock_state. Its body (original lines 23-24) is not
// visible in this excerpt.
// NOTE(review): presumably forwards to operator= below - confirm.
22 lock_state::lock_state(const lock_state &other) {
// Copy assignment for lock_state.
// The visible statements copy the waiter queue (wanted_by) and the
// client-id -> xid record (old_requests) from `o`. The statements on original
// lines 27-28 and the return statement are not visible in this excerpt
// (presumably they copy held / held_by - confirm).
26 lock_state& lock_state::operator=(const lock_state& o) {
29 wanted_by = o.wanted_by;
30 old_requests = o.old_requests;
// Serialises a lock_state into the marshall stream `m`, writing held, held_by
// and wanted_by in that order, and returns the stream reference produced by
// the chained insertions.
// NOTE(review): old_requests is not written, so the duplicate-request
// bookkeeping used by acquire() is not part of the marshalled form - confirm
// this is intended.
34 marshall & operator<<(marshall &m, const lock_state &d) {
35 return m << d.held << d.held_by << d.wanted_by;
// Deserialises a lock_state from the unmarshall stream `u`, reading held,
// held_by and wanted_by in the same order operator<< writes them, and returns
// the stream reference produced by the chained extractions.
// NOTE(review): old_requests is not read here, so `d.old_requests` keeps
// whatever value it had before the call - confirm this is intended.
38 unmarshall & operator>>(unmarshall &u, lock_state &d) {
39 return u >> d.held >> d.held_by >> d.wanted_by;
// Returns a reference to the lock_state stored in lock_table for `lid`.
// lock_table_lock is held (via the scoped guard `sl`) only for the duration of
// the lookup itself.
// NOTE(review): the returned reference is used by callers after `sl` has been
// released; any further synchronisation on the individual lock_state is not
// visible in this excerpt - confirm how callers protect it.
42 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
43 lock sl(lock_table_lock);
44 // by the semantics of map, this will create
45 // the lock if it doesn't already exist
46 return lock_table[lid];
// Constructs the lock server around the replicated-state-machine handle
// `_rsm`, which is stored in the member `rsm`.
// Starts two detached background threads that run revoker() and retryer() on
// this object, then registers this object with the rsm via
// set_state_transfer().
// NOTE(review): the detached threads keep using `this` and nothing visible
// here stops or joins them, so the object presumably must outlive the process'
// use of those threads - confirm.
// NOTE(review): `rsm` is dereferenced unconditionally below, while revoker()
// and retryer() guard their uses with `rsm &&`; if a null rsm is a supported
// configuration this line would dereference a null pointer - confirm.
49 lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
50 std::thread(&lock_server::revoker, this).detach();
51 std::thread(&lock_server::retryer, this).detach();
52 rsm->set_state_transfer(this);
// Background worker started from the constructor: takes lock ids off
// revoke_fifo and sends a revoke RPC to the client recorded as the holder.
// Several lines of the original (56, 61-62, 64-72, 75-76, 79-81) are not
// visible in this excerpt, including the loop head and the declarations of
// `held_by`, `proxy` and `r`.
// NOTE(review): [[noreturn]] is written after the parameter list, where an
// attribute appertains to the function type rather than the function;
// compilers typically warn and ignore it in this position - confirm and
// consider moving it in front of the return type.
55 void lock_server::revoker() [[noreturn]] {
57 lock_protocol::lockid_t lid;
// Take the next lock id off the revoke queue.
58 revoke_fifo.deq(&lid);
59 LOG("Revoking " << lid);
// Condition is true when an rsm is present and this replica is not the
// primary; the statement it controls is not visible (presumably skips the
// RPC - confirm).
60 if (rsm && !rsm->amiprimary())
63 lock_state &st = get_lock_state(lid);
73 //while (t-- && !proxy)
// Bind an RPC client to the first element of `held_by`.
74 proxy = handle(held_by.first).safebind();
// Send the revoke RPC, passing the lock id and the second element of
// `held_by`; the result is converted to rlock_protocol::status and logged.
77 auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
78 LOG("Revoke returned " << ret);
// Background worker started from the constructor: sends a retry RPC to the
// client at the front of a lock's wanted_by queue.
// Several lines of the original (84, 86, 88-89, 92-94, 96, 98-102, 105-106,
// 109-111) are not visible in this excerpt, including the loop head, the
// statement that obtains `lid`, and the declarations of `front`, `proxy` and
// `r`.
// NOTE(review): [[noreturn]] is written after the parameter list, where an
// attribute appertains to the function type rather than the function;
// compilers typically warn and ignore it in this position - confirm and
// consider moving it in front of the return type.
83 void lock_server::retryer() [[noreturn]] {
85 lock_protocol::lockid_t lid;
// Condition is true when an rsm is present and this replica is not the
// primary; the statement it controls is not visible (presumably skips the
// RPC - confirm).
87 if (rsm && !rsm->amiprimary())
90 LOG("Sending retry for " << lid);
91 lock_state &st = get_lock_state(lid);
// Checks for an empty waiter queue; the controlled statement is not visible.
95 if (st.wanted_by.empty())
// The retry goes to whoever is first in the waiter queue.
97 front = st.wanted_by.front();
103 //while (t-- && !proxy)
// Bind an RPC client to the first element of `front`.
104 proxy = handle(front.first).safebind();
// Send the retry RPC, passing the lock id and the second element of `front`;
// the result is converted to rlock_protocol::status and logged.
107 auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
108 LOG("Retry returned " << ret);
// RPC handler: client `id` asks for lock `lid` with request number `xid`.
// Visible return values:
//   lock_protocol::OK     - the lock was granted, or this is a duplicate of
//                           the request that already holds it;
//   lock_protocol::RETRY  - the request was queued in wanted_by;
//   lock_protocol::RPCERR - the xid conflicts with an earlier or in-progress
//                           request from the same client.
// The first (int &) parameter is unnamed and unused in the visible code.
// Parts of the original (lines 117-118, 122, 128-131, 137-139, 144-147,
// 154-159, 161, 163, 167) are not visible in this excerpt.
113 int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
114 LOG_FUNC_ENTER_SERVER;
// (client id, xid) pair identifying this request.
115 holder h = holder(id, xid);
116 lock_state &st = get_lock_state(lid);
119 // deal with duplicated requests
// old_requests maps a client id to the xid recorded when it was last granted
// this lock (see the assignment in the grant path below).
120 if (st.old_requests.count(id)) {
121 lock_protocol::xid_t old_xid = st.old_requests[id];
// The condition guarding this return (original line 122) is not visible.
123 return lock_protocol::RPCERR;
124 else if (old_xid == xid) {
// Same xid as the recorded one and the lock is held by this exact request:
// report success again instead of re-granting.
125 if (st.held && st.held_by == h) {
126 LOG("Client " << id << " sent duplicate acquire xid=" << xid);
127 return lock_protocol::OK;
132 // grant the lock if it's available and I'm next in line
133 if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
// If the queue is non-empty here its front is `h`, so remove it.
134 if (!st.wanted_by.empty())
135 st.wanted_by.pop_front();
// Remember the xid of the granted request for duplicate detection.
136 st.old_requests[id] = xid;
// Original lines 137-139 are not visible (presumably they mark the lock as
// held by `h` - confirm).
140 LOG("Lock " << lid << " held by " << h.first);
// Other clients are still waiting: queue a revoke for this lock.
141 if (st.wanted_by.size())
142 revoke_fifo.enq(lid);
143 return lock_protocol::OK;
// Look for an existing queued request from the same client.
148 for (list<holder>::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) {
149 if (i->first == id) {
150 // make sure client is obeying serialization
151 if (i->second != xid) {
152 LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second);
153 return lock_protocol::RPCERR;
// Append this request to the waiter queue. The condition under which this
// runs (original lines 154-159) is not visible.
160 st.wanted_by.push_back(h);
162 LOG("wanted_by=" << make_iterator_pair(st.wanted_by.begin(), st.wanted_by.end()));
164 // send revoke if we're first in line
165 if (st.wanted_by.front() == h)
166 revoke_fifo.enq(lid);
168 return lock_protocol::RETRY;
// RPC handler: client `id` gives back lock `lid` that it acquired with `xid`.
// Acts only when the lock is currently held by exactly (id, xid); the visible
// code returns lock_protocol::OK.
// The first (int &) parameter is unnamed and unused in the visible code.
// Original lines 174, 176, 178 and 180 are not visible in this excerpt.
171 int lock_server::release(int &, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
172 LOG_FUNC_ENTER_SERVER;
173 lock_state &st = get_lock_state(lid);
// Only the recorded holder, identified by both client id and xid, matches.
175 if (st.held && st.held_by == holder(id, xid)) {
// The statement between the check above and this log (original line 176) is
// not visible (presumably it clears the held flag - confirm).
177 LOG("Lock " << lid << " not held");
// True when clients are waiting for this lock; the controlled statement is
// not visible (presumably queues a retry - confirm).
179 if (st.wanted_by.size())
181 return lock_protocol::OK;
// Produces a string representation of the server state; registered for use by
// the rsm through set_state_transfer(this) in the constructor.
// Holds lock_table_lock for the whole call via the scoped guard `sl`. The
// statements that build the result (original lines 186-190) are not visible
// in this excerpt.
184 string lock_server::marshal_state() {
185 lock sl(lock_table_lock);
// Loads server state from the string `state`.
// Holds lock_table_lock for the whole call via the scoped guard `sl`, and
// wraps `state` in an unmarshall reader `rep`. The statements that read from
// `rep` (original lines 195-197) are not visible in this excerpt.
192 void lock_server::unmarshal_state(string state) {
193 lock sl(lock_table_lock);
194 unmarshall rep(state);
// RPC handler for a status query on lock `lid`: logs the request and returns
// lock_protocol::OK. Any value written to the out-parameter `r` (original
// lines 201-202) is not visible in this excerpt.
199 lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
200 LOG("stat request for " << lid);
203 return lock_protocol::OK;