1 // the caching lock server implementation
4 #include "lock_server.h"
// Default constructor for lock_state. The member-initializer list and body
// are not visible in this excerpt.
9 lock_state::lock_state():
// Copy constructor for lock_state. The body is not visible in this excerpt.
14 lock_state::lock_state(const lock_state &other) {
// Copy assignment for lock_state. The statements visible here copy the
// waiter queue (wanted_by) and the per-client request record (old_requests)
// from `o`; the remaining lines of the body are not visible in this excerpt.
18 lock_state& lock_state::operator=(const lock_state& o) {
21 wanted_by = o.wanted_by;
22 old_requests = o.old_requests;
// Serializes a lock_state into `m`, writing held, held_by and wanted_by in
// that order, and returns `m` so calls can be chained.
// NOTE(review): old_requests is not written, so it is not part of the
// marshalled form - confirm this omission is intentional.
26 marshall & operator<<(marshall &m, const lock_state &d) {
27 return m << d.held << d.held_by << d.wanted_by;
// Deserializes a lock_state from `u`, reading held, held_by and wanted_by in
// the same order operator<< writes them, and returns `u` for chaining.
// old_requests is not read here and is left as it was in `d`.
30 unmarshall & operator>>(unmarshall &u, lock_state &d) {
31 return u >> d.held >> d.held_by >> d.wanted_by;
// Returns a reference to the lock_state entry for `lid` in lock_table,
// looked up while `sl` (constructed from lock_table_lock) is alive.
// NOTE(review): `lock` is presumably a scoped guard; if so, the returned
// reference is used by callers after lock_table_lock has been released.
// That relies on lock_table keeping element references stable across later
// insertions (true for std::map) - confirm the container type.
34 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
35 lock sl(lock_table_lock);
36 // by the semantics of map, this will create
37 // the lock if it doesn't already exist
38 return lock_table[lid];
// Constructs the lock server: stores `_rsm` in the rsm member, starts two
// detached background threads running revoker() and retryer() on this
// object, then registers this object with rsm via set_state_transfer().
// NOTE(review): the threads are detached and hold `this`, so the server
// object must outlive them. They are also started before
// set_state_transfer() is called.
// NOTE(review): rsm is dereferenced unconditionally here, while revoker()
// and retryer() test `rsm &&` before using it - a null `_rsm` would fault on
// the set_state_transfer() call. Confirm whether null is a supported input.
41 lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
42 std::thread(&lock_server::revoker, this).detach();
43 std::thread(&lock_server::retryer, this).detach();
44 rsm->set_state_transfer(this);
// Background worker started from the constructor. For a lock id taken from
// revoke_fifo it sends a `revoke` RPC to the client recorded as the holder.
// Only part of the body is visible in this excerpt.
// NOTE(review): [[noreturn]] is written after the parameter list, a position
// where an attribute applies to the function type rather than the function.
// The standard form puts [[noreturn]] at the start of the declaration -
// confirm the compilers in use accept this placement.
47 void lock_server::revoker() [[noreturn]] {
49 lock_protocol::lockid_t lid;
// Take the next lock id to revoke from the queue.
50 revoke_fifo.deq(&lid);
51 LOG("Revoking " << lid);
// When an rsm is attached and this replica is not the primary, the hidden
// statement that follows presumably skips the request - confirm.
52 if (rsm && !rsm->amiprimary())
55 lock_state &st = get_lock_state(lid);
65 //while (t-- && !proxy)
// held_by.first is passed to handle() to bind an RPC client; held_by.second
// is forwarded as the last argument of the revoke call.
66 proxy = handle(held_by.first).safebind();
69 auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
70 LOG("Revoke returned " << ret);
// Background worker started from the constructor. For a lock id it sends a
// `retry` RPC to the client at the front of that lock's wanted_by queue.
// Only part of the body is visible in this excerpt.
// NOTE(review): [[noreturn]] is written after the parameter list, a position
// where an attribute applies to the function type rather than the function.
// The standard form puts [[noreturn]] at the start of the declaration -
// confirm the compilers in use accept this placement.
75 void lock_server::retryer() [[noreturn]] {
77 lock_protocol::lockid_t lid;
// When an rsm is attached and this replica is not the primary, the hidden
// statement that follows presumably skips the request - confirm.
79 if (rsm && !rsm->amiprimary())
82 LOG("Sending retry for " << lid);
83 lock_state &st = get_lock_state(lid);
// Empty waiter queue is checked before front() is read; the action taken in
// that case is not visible here.
87 if (st.wanted_by.empty())
89 front = st.wanted_by.front();
95 //while (t-- && !proxy)
// front.first is passed to handle() to bind an RPC client; front.second is
// forwarded as the last argument of the retry call.
96 proxy = handle(front.first).safebind();
99 auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
100 LOG("Retry returned " << ret);
// RPC handler: client `id` asks for lock `lid` using request number `xid`.
// The first (unnamed) int& parameter is not used in the visible code.
// Return values visible in this excerpt:
//   lock_protocol::OK     - the lock is granted now, or this is a duplicate
//                           of the request that already holds it
//   lock_protocol::RETRY  - the request was appended to the waiter queue
//   lock_protocol::RPCERR - the xid is inconsistent with recorded requests
// Some conditions and statements of the body are not visible in this excerpt.
105 int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
106 LOG("lid=" << lid << " client=" << id << "," << xid);
// The (client, xid) pair identifies this request in held_by / wanted_by.
107 holder_t h = holder_t(id, xid);
108 lock_state &st = get_lock_state(lid);
111 // deal with duplicated requests
112 if (st.old_requests.count(id)) {
113 lock_protocol::xid_t old_xid = st.old_requests[id];
// The condition guarding this error return is not visible here.
115 return lock_protocol::RPCERR;
116 else if (old_xid == xid) {
// Same xid as the recorded request and the lock is held by this exact
// request: report success again without changing state.
117 if (st.held && st.held_by == h) {
118 LOG("Client " << id << " sent duplicate acquire xid=" << xid);
119 return lock_protocol::OK;
124 // grant the lock if it's available and I'm next in line
125 if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
// In this branch a non-empty queue means either the queue was empty to
// begin with (nothing to pop) or its front is this request, so the pop
// removes this request's own entry.
126 if (!st.wanted_by.empty())
127 st.wanted_by.pop_front();
// Record the xid so a resend can be recognized by the duplicate check.
128 st.old_requests[id] = xid;
132 LOG("Lock " << lid << " held by " << h.first);
// Other clients are still waiting: queue a revoke for this lock id.
133 if (st.wanted_by.size())
134 revoke_fifo.enq(lid);
135 return lock_protocol::OK;
// NOTE(review): `auto p` copies each queue element per iteration;
// `const auto &p` would avoid the copy.
140 for (auto p : st.wanted_by) {
142 // make sure client is obeying serialization
// Any enclosing condition on `p` (hidden line) is not visible here.
143 if (p.second != xid) {
144 LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second);
145 return lock_protocol::RPCERR;
152 st.wanted_by.push_back(h);
154 LOG("wanted_by=" << st.wanted_by);
156 // send revoke if we're first in line
157 if (st.wanted_by.front() == h)
158 revoke_fifo.enq(lid);
160 return lock_protocol::RETRY;
// RPC handler: client `id` releases lock `lid` for request number `xid`.
// The first (unnamed) int& parameter is not used in the visible code.
// The only return visible in this excerpt is lock_protocol::OK. The
// statements that change the lock state are not visible here.
163 int lock_server::release(int &, lock_protocol::lockid_t lid, callback_t id, lock_protocol::xid_t xid) {
164 LOG("lid=" << lid << " client=" << id << "," << xid);
165 lock_state &st = get_lock_state(lid);
// Only the request that currently holds the lock (same client and xid)
// matches this condition.
167 if (st.held && st.held_by == holder_t(id, xid)) {
169 LOG("Lock " << lid << " not held");
// Waiters exist; the action taken for them is on a hidden line.
171 if (st.wanted_by.size())
173 return lock_protocol::OK;
// Produces a string form of the server state. The visible code constructs
// `sl` from lock_table_lock before doing so; the serialization statements
// are not visible in this excerpt.
176 string lock_server::marshal_state() {
177 lock sl(lock_table_lock);
// Loads server state from `state`. The visible code constructs `sl` from
// lock_table_lock and wraps `state` in an unmarshall reader `rep`; the
// statements that read from `rep` are not visible in this excerpt.
184 void lock_server::unmarshal_state(string state) {
185 lock sl(lock_table_lock);
186 unmarshall rep(state);
// RPC handler for a stat request on lock `lid`. The visible code logs the
// request and returns lock_protocol::OK; whatever is written to `r` is on
// lines not visible in this excerpt.
191 lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
192 LOG("stat request for " << lid);
195 return lock_protocol::OK;