1 // the caching lock server implementation
3 #include "lock_server.h"
8 #include "lang/verify.h"
11 #include "rpc/marshall.h"
14 using std::ostringstream;
15 using std::istringstream;
// Default constructor for lock_state; the trailing colon opens its
// member-initializer list.
18 lock_state::lock_state():
// Copy constructor: builds this lock_state from `other`.
23 lock_state::lock_state(const lock_state &other) {
// Copy-assignment operator for lock_state.
// The statements shown here copy two members from `o`:
//   - wanted_by:    the list of holders queued for the lock
//   - old_requests: the record mapping a client id to an xid
27 lock_state& lock_state::operator=(const lock_state& o) {
30 wanted_by = o.wanted_by;
31 old_requests = o.old_requests;
// Stream-insertion helper for pairs on an ostringstream.
// Writes the pair as "<first,second>", relying on operator<< being available
// for both A and B.
35 template <class A, class B>
36 ostringstream & operator<<(ostringstream &o, const pair<A,B> &d) {
37 o << "<" << d.first << "," << d.second << ">";
// Serializes a lock_state into the marshall buffer `m` and returns `m`.
// Fields are written in the order held, held_by, wanted_by; operator>> for
// unmarshall reads them back in that same order.
// Note: old_requests is not written by this operator.
41 marshall & operator<<(marshall &m, const lock_state &d) {
42 return m << d.held << d.held_by << d.wanted_by;
// Deserializes a lock_state from the unmarshall buffer `u` and returns `u`.
// Fields are read in the order held, held_by, wanted_by, mirroring the
// marshall operator<<. old_requests is not touched by this operator.
45 unmarshall & operator>>(unmarshall &u, lock_state &d) {
46 return u >> d.held >> d.held_by >> d.wanted_by;
// Returns a reference to the lock_state entry for `lid` in lock_table.
// lock_table_lock is taken through the local guard `sl` for the lookup.
// NOTE(review): the reference is handed back to the caller; presumably `sl`
// releases lock_table_lock when this function returns -- confirm how callers
// synchronize their accesses to the returned lock_state.
49 lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
50 lock sl(lock_table_lock);
51 // by the semantics of map, this will create
52 // the lock if it doesn't already exist
53 return lock_table[lid];
// Constructs the lock server.
//   _rsm: stored in the `rsm` member.
// Starts revoker() and retryer() on two detached background threads, then
// registers this server with rsm via set_state_transfer(this).
// NOTE(review): rsm is dereferenced unconditionally below, whereas revoker()
// and retryer() guard their use with `rsm && ...` -- confirm whether a null
// rsm is a supported configuration.
56 lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
// Both workers are detached, so they are never joined by this object.
57 std::thread(&lock_server::revoker, this).detach();
58 std::thread(&lock_server::retryer, this).detach();
59 rsm->set_state_transfer(this);
// Background worker launched from the constructor on a detached thread.
// Steps visible here:
//   1. take a lock id off revoke_fifo,
//   2. log it and test `rsm && !rsm->amiprimary()`,
//   3. look up the lock's state,
//   4. bind an RPC proxy to the holder's id (held_by.first) and send
//      rlock_protocol::revoke for `lid` together with held_by.second,
//   5. log the returned status.
62 void lock_server::revoker() {
64 lock_protocol::lockid_t lid;
// deq fills `lid` with the next lock id queued for revocation.
65 revoke_fifo.deq(&lid);
66 LOG("Revoking " << lid);
67 if (rsm && !rsm->amiprimary())
70 lock_state &st = get_lock_state(lid);
80 //while (t-- && !proxy)
// A holder is built as holder(id, xid) in acquire(), so .first is the client
// id and .second is the xid.
81 proxy = handle(held_by.first).safebind();
84 rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
85 LOG("Revoke returned " << ret);
// Background worker launched from the constructor on a detached thread.
// Steps visible here:
//   1. test `rsm && !rsm->amiprimary()`,
//   2. log and look up the state of lock `lid`,
//   3. check whether any client is waiting (wanted_by) and pick the front one,
//   4. bind an RPC proxy to that waiter's id (front.first) and send
//      rlock_protocol::retry for `lid` together with front.second,
//   5. log the returned status.
90 void lock_server::retryer() {
92 lock_protocol::lockid_t lid;
94 if (rsm && !rsm->amiprimary())
97 LOG("Sending retry for " << lid);
98 lock_state &st = get_lock_state(lid);
102 if (st.wanted_by.empty())
// The head of the wait queue is the client that gets the retry.
104 front = st.wanted_by.front();
// ret starts at -1 and is overwritten by the result of the RPC below.
107 rlock_protocol::status ret = -1;
112 //while (t-- && !proxy)
113 proxy = handle(front.first).safebind();
116 ret = proxy->call(rlock_protocol::retry, r, lid, front.second);
117 LOG("Retry returned " << ret);
// RPC handler: client `id` asks for lock `lid` with request number `xid`.
//   r:   reply slot; not written by any statement shown here.
//   lid: id of the requested lock.
//   id:  client identifier; first element of the holder pair.
//   xid: client's request number; second element of the holder pair.
// Returns:
//   lock_protocol::OK     - the lock was granted, or this is a duplicate of the
//                           request that currently holds the lock;
//   lock_protocol::RETRY  - the lock was not granted on this call;
//   lock_protocol::RPCERR - the client already has a waiting entry with a
//                           different xid (also returned from one branch of
//                           the duplicate-request check).
122 int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
123 LOG_FUNC_ENTER_SERVER;
124 holder h = holder(id, xid);
125 lock_state &st = get_lock_state(lid);
128 // deal with duplicated requests
// old_requests maps a client id to the xid recorded when a lock was granted.
129 if (st.old_requests.count(id)) {
130 lock_protocol::xid_t old_xid = st.old_requests[id];
132 return lock_protocol::RPCERR;
133 else if (old_xid == xid) {
// Same xid as the recorded grant and the lock is still held by this exact
// (id, xid): answer OK again without changing state.
134 if (st.held && st.held_by == h) {
135 LOG("Client " << id << " sent duplicate acquire xid=" << xid);
136 return lock_protocol::OK;
141 // grant the lock if it's available and I'm next in line
142 if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
// Given the condition above, a non-empty queue has h at its front, so this
// pop removes the caller's own waiting entry.
143 if (!st.wanted_by.empty())
144 st.wanted_by.pop_front();
145 st.old_requests[id] = xid;
149 LOG("Lock " << lid << " held by " << h.first);
// Other clients are still waiting: queue the lock for revocation.
150 if (st.wanted_by.size())
151 revoke_fifo.enq(lid);
152 return lock_protocol::OK;
// Scan the wait queue for an existing entry from this client.
157 for (list<holder>::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) {
158 if (i->first == id) {
159 // make sure client is obeying serialization
160 if (i->second != xid) {
161 LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second);
162 return lock_protocol::RPCERR;
169 st.wanted_by.push_back(h);
171 LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " "));
173 // send revoke if we're first in line
174 if (st.wanted_by.front() == h)
175 revoke_fifo.enq(lid);
177 return lock_protocol::RETRY;
// RPC handler: client `id` gives back lock `lid` acquired with request `xid`.
//   r: reply slot; not written by any statement shown here.
// Tests whether the lock is currently held by exactly holder(id, xid), and
// separately tests whether any client is waiting on it.
// The "not held" log line is presumably the branch taken when the first test
// fails -- confirm.
// Returns lock_protocol::OK.
180 int lock_server::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
181 LOG_FUNC_ENTER_SERVER;
182 lock_state &st = get_lock_state(lid);
// Both the id and the xid must match the recorded holder.
184 if (st.held && st.held_by == holder(id, xid)) {
186 LOG("Lock " << lid << " not held");
188 if (st.wanted_by.size())
190 return lock_protocol::OK;
// Returns the server's state as a string.
// lock_table_lock is taken through the local guard `sl`.
// NOTE(review): presumably this is the state-transfer hook registered via
// rsm->set_state_transfer(this) in the constructor -- confirm.
193 string lock_server::marshal_state() {
194 lock sl(lock_table_lock);
// Consumes a serialized state string.
// lock_table_lock is taken through the local guard `sl`, and `state` is
// wrapped in an unmarshall reader named `rep`.
// NOTE(review): presumably the counterpart of marshal_state() -- confirm the
// two agree on field order.
201 void lock_server::unmarshal_state(string state) {
202 lock sl(lock_table_lock);
203 unmarshall rep(state);
// RPC handler for a stat request on lock `lid`.
// Prints "stat request" to stdout and returns lock_protocol::OK.
//   r: reply slot; no statement shown here writes to it.
208 lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
209 printf("stat request\n");
212 return lock_protocol::OK;