1 // the caching lock server implementation
3 #include "lock_server.h"
8 lock_state::lock_state():
13 lock_state::lock_state(const lock_state &other) {
17 lock_state& lock_state::operator=(const lock_state& o) {
20 wanted_by = o.wanted_by;
21 old_requests = o.old_requests;
25 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
26 lock sl(lock_table_lock);
27 // this will create the lock if it doesn't already exist
28 return lock_table[lid];
31 lock_server::lock_server(rsm *r) : rsm_ (r) {
32 thread(&lock_server::revoker, this).detach();
33 thread(&lock_server::retryer, this).detach();
34 rsm_->set_state_transfer(this);
37 void lock_server::revoker() [[noreturn]] {
39 lock_protocol::lockid_t lid;
40 revoke_fifo.deq(&lid);
41 LOG("Revoking " << lid);
42 if (rsm_ && !rsm_->amiprimary())
45 lock_state &st = get_lock_state(lid);
55 //while (t-- && !proxy)
56 proxy = handle(held_by.first).safebind();
59 auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
60 LOG("Revoke returned " << ret);
65 void lock_server::retryer() [[noreturn]] {
67 lock_protocol::lockid_t lid;
69 if (rsm_ && !rsm_->amiprimary())
72 LOG("Sending retry for " << lid);
73 lock_state &st = get_lock_state(lid);
77 if (st.wanted_by.empty())
79 front = st.wanted_by.front();
85 //while (t-- && !proxy)
86 proxy = handle(front.first).safebind();
89 auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
90 LOG("Retry returned " << ret);
95 int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
96 LOG("lid=" << lid << " client=" << id << "," << xid);
97 holder_t h = holder_t(id, xid);
98 lock_state &st = get_lock_state(lid);
101 // deal with duplicated requests
102 if (st.old_requests.count(id)) {
103 lock_protocol::xid_t old_xid = st.old_requests[id];
105 return lock_protocol::RPCERR;
106 else if (old_xid == xid) {
107 if (st.held && st.held_by == h) {
108 LOG("Client " << id << " sent duplicate acquire xid=" << xid);
109 return lock_protocol::OK;
114 // grant the lock if it's available and I'm next in line
115 if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
116 if (!st.wanted_by.empty())
117 st.wanted_by.pop_front();
118 st.old_requests[id] = xid;
122 LOG("Lock " << lid << " held by " << h.first);
123 if (st.wanted_by.size())
124 revoke_fifo.enq(lid);
125 return lock_protocol::OK;
130 for (auto p : st.wanted_by) {
132 // make sure client is obeying serialization
133 if (p.second != xid) {
134 LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second);
135 return lock_protocol::RPCERR;
142 st.wanted_by.push_back(h);
144 LOG("wanted_by=" << st.wanted_by);
146 // send revoke if we're first in line
147 if (st.wanted_by.front() == h)
148 revoke_fifo.enq(lid);
150 return lock_protocol::RETRY;
153 int lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
154 LOG("lid=" << lid << " client=" << id << "," << xid);
155 lock_state &st = get_lock_state(lid);
157 if (st.held && st.held_by == holder_t(id, xid)) {
159 LOG("Lock " << lid << " not held");
161 if (st.wanted_by.size())
163 return lock_protocol::OK;
166 string lock_server::marshal_state() {
167 lock sl(lock_table_lock);
169 rep << nacquire << lock_table;
170 return rep.content();
173 void lock_server::unmarshal_state(const string & state) {
174 lock sl(lock_table_lock);
175 unmarshall rep(state, false);
176 rep >> nacquire >> lock_table;
179 lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
180 LOG("stat request for " << lid);
183 return lock_protocol::OK;