1 // the caching lock server implementation
4 #include "lock_server.h"
9 lock_state::lock_state():
14 lock_state::lock_state(const lock_state &other) {
18 lock_state& lock_state::operator=(const lock_state& o) {
21 wanted_by = o.wanted_by;
22 old_requests = o.old_requests;
26 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
27 lock sl(lock_table_lock);
28 // this will create the lock if it doesn't already exist
29 return lock_table[lid];
32 lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
33 thread(&lock_server::revoker, this).detach();
34 thread(&lock_server::retryer, this).detach();
35 rsm->set_state_transfer(this);
38 void lock_server::revoker() [[noreturn]] {
40 lock_protocol::lockid_t lid;
41 revoke_fifo.deq(&lid);
42 LOG("Revoking " << lid);
43 if (rsm && !rsm->amiprimary())
46 lock_state &st = get_lock_state(lid);
56 //while (t-- && !proxy)
57 proxy = handle(held_by.first).safebind();
60 auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
61 LOG("Revoke returned " << ret);
66 void lock_server::retryer() [[noreturn]] {
68 lock_protocol::lockid_t lid;
70 if (rsm && !rsm->amiprimary())
73 LOG("Sending retry for " << lid);
74 lock_state &st = get_lock_state(lid);
78 if (st.wanted_by.empty())
80 front = st.wanted_by.front();
86 //while (t-- && !proxy)
87 proxy = handle(front.first).safebind();
90 auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
91 LOG("Retry returned " << ret);
96 int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
97 LOG("lid=" << lid << " client=" << id << "," << xid);
98 holder_t h = holder_t(id, xid);
99 lock_state &st = get_lock_state(lid);
102 // deal with duplicated requests
103 if (st.old_requests.count(id)) {
104 lock_protocol::xid_t old_xid = st.old_requests[id];
106 return lock_protocol::RPCERR;
107 else if (old_xid == xid) {
108 if (st.held && st.held_by == h) {
109 LOG("Client " << id << " sent duplicate acquire xid=" << xid);
110 return lock_protocol::OK;
115 // grant the lock if it's available and I'm next in line
116 if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
117 if (!st.wanted_by.empty())
118 st.wanted_by.pop_front();
119 st.old_requests[id] = xid;
123 LOG("Lock " << lid << " held by " << h.first);
124 if (st.wanted_by.size())
125 revoke_fifo.enq(lid);
126 return lock_protocol::OK;
131 for (auto p : st.wanted_by) {
133 // make sure client is obeying serialization
134 if (p.second != xid) {
135 LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second);
136 return lock_protocol::RPCERR;
143 st.wanted_by.push_back(h);
145 LOG("wanted_by=" << st.wanted_by);
147 // send revoke if we're first in line
148 if (st.wanted_by.front() == h)
149 revoke_fifo.enq(lid);
151 return lock_protocol::RETRY;
154 int lock_server::release(int &, lock_protocol::lockid_t lid, callback_t id, lock_protocol::xid_t xid) {
155 LOG("lid=" << lid << " client=" << id << "," << xid);
156 lock_state &st = get_lock_state(lid);
158 if (st.held && st.held_by == holder_t(id, xid)) {
160 LOG("Lock " << lid << " not held");
162 if (st.wanted_by.size())
164 return lock_protocol::OK;
167 string lock_server::marshal_state() {
168 lock sl(lock_table_lock);
170 rep << nacquire << lock_table;
171 return rep.content();
174 void lock_server::unmarshal_state(string state) {
175 lock sl(lock_table_lock);
176 unmarshall rep(state, false);
177 rep >> nacquire >> lock_table;
180 lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
181 LOG("stat request for " << lid);
184 return lock_protocol::OK;