1 // the caching lock server implementation
3 #include "lock_server.h"
7 #include "lang/verify.h"
10 #include "rpc/marshall.h"
13 using std::ostringstream;
14 using std::istringstream;
// Default constructor for lock_state.
// Only the first line of the definition is visible in this excerpt; the
// member-initializer list and body are on lines not shown here.
17 lock_state::lock_state():
// Copy constructor for lock_state.
// The body is not visible in this excerpt, so how `other` is copied cannot
// be stated from here.
22 lock_state::lock_state(const lock_state &other) {
// Copy-assignment operator for lock_state.
// The visible statements copy the waiter list (wanted_by) and the
// per-client request history (old_requests) from `o`. Other members may be
// copied on lines that are not shown in this excerpt.
26 lock_state& lock_state::operator=(const lock_state& o) {
29 wanted_by = o.wanted_by;
30 old_requests = o.old_requests;
// Serializes a lock_state into the marshall stream `m`.
// Fields are written in the order: held, held_by, wanted_by. This order must
// match the order read back by the corresponding operator>> below.
// Note that old_requests is not part of the serialized form.
// Returns `m` so that insertions can be chained.
34 marshall & operator<<(marshall &m, const lock_state &d) {
35 return m << d.held << d.held_by << d.wanted_by;
// Deserializes a lock_state from the unmarshall stream `u` into `d`.
// Fields are read in the order: held, held_by, wanted_by — the same order
// in which operator<< for lock_state writes them.
// d.old_requests is not touched by this operator.
// Returns `u` so that extractions can be chained.
38 unmarshall & operator>>(unmarshall &u, lock_state &d) {
39 return u >> d.held >> d.held_by >> d.wanted_by;
// Returns a reference to the lock_state entry for lock id `lid` in
// lock_table, creating the entry if it does not exist yet.
// A `lock` object is constructed on lock_table_lock before the lookup
// (presumably a scoped guard released when this function returns — confirm
// against the `lock` type's definition).
// NOTE(review): callers keep using the returned reference after `sl` goes out
// of scope. That relies on lock_table never relocating or erasing entries
// (holds for std::map insertions) and on per-lock synchronization done
// elsewhere — confirm.
42 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
43 lock sl(lock_table_lock);
44 // by the semantics of map, this will create
45 // the lock if it doesn't already exist
46 return lock_table[lid];
// Constructs a lock server bound to the replicated state machine `_rsm`.
// Starts two detached background threads, one running revoker() and one
// running retryer(), and then registers this object with the rsm through
// set_state_transfer(this).
// NOTE(review): rsm is dereferenced unconditionally on the last visible line,
// while revoker()/retryer() test `rsm` for null before using it — confirm
// whether a null rsm is meant to be supported.
// NOTE(review): both detached threads capture `this` and are never joined in
// the visible code; the object must outlive them.
49 lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
// Background workers are launched before the state-transfer hook is set.
50 std::thread(&lock_server::revoker, this).detach();
51 std::thread(&lock_server::retryer, this).detach();
52 rsm->set_state_transfer(this);
// Background thread body (started detached from the constructor) that takes
// lock ids from revoke_fifo and sends a revoke RPC to the current holder of
// each lock.
// Several lines of this function (loop header, branch bodies, the local
// `held_by` / `proxy` / `r` declarations) are not visible in this excerpt.
// NOTE(review): `[[noreturn]]` written after the parameter list appertains to
// the function type, which is not a conforming placement for this attribute;
// the usual position is in front of the declaration. Compilers may reject or
// ignore it here.
55 void lock_server::revoker() [[noreturn]] {
57 lock_protocol::lockid_t lid;
// Fetch the next lock id queued for revocation (presumably blocks until one
// is available — confirm against the fifo implementation).
58 revoke_fifo.deq(&lid);
59 LOG("Revoking " << lid);
// Replicated and not the primary: the branch body is not shown; presumably
// the revoke is skipped on non-primary replicas — confirm.
60 if (rsm && !rsm->amiprimary())
63 lock_state &st = get_lock_state(lid);
73 //while (t-- && !proxy)
// Bind an RPC client to the holder's address (first element of held_by).
74 proxy = handle(held_by.first).safebind();
// Ask the holder to give the lock back; held_by.second is passed along as
// the request's xid.
77 rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
78 LOG("Revoke returned " << ret);
// Background thread body (started detached from the constructor) that sends
// a retry RPC to the client at the front of a lock's wanted_by queue.
// Several lines of this function (loop header, the dequeue of `lid`, branch
// bodies, the local `front` / `proxy` / `r` declarations) are not visible in
// this excerpt.
// NOTE(review): `[[noreturn]]` written after the parameter list appertains to
// the function type, which is not a conforming placement for this attribute;
// the usual position is in front of the declaration. Compilers may reject or
// ignore it here.
83 void lock_server::retryer() [[noreturn]] {
85 lock_protocol::lockid_t lid;
// Replicated and not the primary: the branch body is not shown; presumably
// the retry is skipped on non-primary replicas — confirm.
87 if (rsm && !rsm->amiprimary())
90 LOG("Sending retry for " << lid);
91 lock_state &st = get_lock_state(lid);
// Nobody is waiting for this lock; the branch body is not shown.
95 if (st.wanted_by.empty())
// The first waiter in line is the one that gets the retry notification.
97 front = st.wanted_by.front();
// -1 is the initial value; it is overwritten by the RPC call result below.
100 rlock_protocol::status ret = -1;
105 //while (t-- && !proxy)
// Bind an RPC client to the waiter's address (first element of `front`).
106 proxy = handle(front.first).safebind();
// Tell the waiter to retry its acquire; front.second is passed along as the
// request's xid.
109 ret = proxy->call(rlock_protocol::retry, r, lid, front.second);
110 LOG("Retry returned " << ret);
// RPC handler: client `id` asks for lock `lid` with request number `xid`.
//
// Parameters:
//   (unnamed int&) - RPC reply slot; not used in the visible code.
//   lid            - id of the lock being requested.
//   id             - identifier of the requesting client.
//   xid            - the client's sequence number for this request.
//
// Returns (as far as the visible lines show):
//   lock_protocol::OK     - the lock was granted, or this is a duplicate of
//                           the request that already holds the lock.
//   lock_protocol::RETRY  - the request was queued; the client is expected
//                           to try again later.
//   lock_protocol::RPCERR - the request's xid is inconsistent with what the
//                           server has recorded for this client.
//
// Several lines of this function are not visible in this excerpt (closing
// braces, some conditions, and the statements that mark the lock as held),
// so the notes below describe only what is shown.
115 int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
116 LOG_FUNC_ENTER_SERVER;
// (client id, xid) pair identifying this particular request.
117 holder h = holder(id, xid);
118 lock_state &st = get_lock_state(lid);
121 // deal with duplicated requests
// old_requests maps a client id to the xid of its last granted acquire
// (it is written in the grant path below).
122 if (st.old_requests.count(id)) {
123 lock_protocol::xid_t old_xid = st.old_requests[id];
// The condition guarding this return is on a line not shown here.
125 return lock_protocol::RPCERR;
126 else if (old_xid == xid) {
// Same xid as the last granted request and still the holder: answer OK again
// without changing any state.
127 if (st.held && st.held_by == h) {
128 LOG("Client " << id << " sent duplicate acquire xid=" << xid);
129 return lock_protocol::OK;
134 // grant the lock if it's available and I'm next in line
135 if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
// Remove this request from the head of the queue when it was queued earlier.
136 if (!st.wanted_by.empty())
137 st.wanted_by.pop_front();
// Remember the granted xid for duplicate detection on later calls.
138 st.old_requests[id] = xid;
142 LOG("Lock " << lid << " held by " << h.first);
// Other clients are still waiting, so schedule a revoke of the lock that was
// just granted.
143 if (st.wanted_by.size())
144 revoke_fifo.enq(lid);
145 return lock_protocol::OK;
// Lock not granted: look for an existing queue entry from this client.
150 for (list<holder>::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) {
151 if (i->first == id) {
152 // make sure client is obeying serialization
153 if (i->second != xid) {
154 LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second);
155 return lock_protocol::RPCERR;
// NOTE(review): lines between the loop and this push_back are not shown;
// presumably the push only happens when the client was not already queued —
// confirm.
162 st.wanted_by.push_back(h);
164 LOG("wanted_by=" << make_iterator_pair(st.wanted_by.begin(), st.wanted_by.end()));
166 // send revoke if we're first in line
167 if (st.wanted_by.front() == h)
168 revoke_fifo.enq(lid);
170 return lock_protocol::RETRY;
// RPC handler: client `id` gives back lock `lid` that it acquired with
// request number `xid`.
//
// Parameters:
//   (unnamed int&) - RPC reply slot; not used in the visible code.
//   lid            - id of the lock being released.
//   id             - identifier of the releasing client.
//   xid            - xid of the acquire that obtained the lock.
//
// Returns lock_protocol::OK on the only return path visible here.
// Some lines of this function (the state update inside the `if`, the body of
// the wanted_by check, closing braces) are not visible in this excerpt.
173 int lock_server::release(int &, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
174 LOG_FUNC_ENTER_SERVER;
175 lock_state &st = get_lock_state(lid);
// Only act when the caller's (id, xid) matches the recorded current holder.
177 if (st.held && st.held_by == holder(id, xid)) {
179 LOG("Lock " << lid << " not held");
// Waiters exist; the action taken is on a line not shown (presumably a retry
// is scheduled for the next waiter — confirm).
181 if (st.wanted_by.size())
183 return lock_protocol::OK;
// Produces a string representation of the server's state. The constructor
// registers this object with rsm->set_state_transfer(this), so this is
// presumably the rsm state-transfer hook — confirm.
// A `lock` object is constructed on lock_table_lock before any work is done.
// The serialization itself is on lines not visible in this excerpt.
186 string lock_server::marshal_state() {
187 lock sl(lock_table_lock);
// Consumes a serialized state string (presumably one produced by
// marshal_state() — confirm).
// A `lock` object is constructed on lock_table_lock, then `state` is wrapped
// in an unmarshall reader. The lines that extract values from `rep` are not
// visible in this excerpt.
194 void lock_server::unmarshal_state(string state) {
195 lock sl(lock_table_lock);
196 unmarshall rep(state);
// RPC handler for a stat request on lock `lid`.
// Logs the request and returns lock_protocol::OK. Whatever is written to the
// reply parameter `r` happens on lines not visible in this excerpt.
201 lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
202 LOG("stat request for " << lid);
205 return lock_protocol::OK;