1 // the caching lock server implementation
3 #include "lock_server_cache_rsm.h"
8 #include "lang/verify.h"
11 #include "rpc/marshall.h"
13 lock_state::lock_state():
18 template <class A, class B>
19 ostringstream & operator<<(ostringstream &o, const pair<A,B> &d) {
20 o << "<" << d.first << "," << d.second << ">";
25 marshall & operator<<(marshall &m, const list<A> &d) {
26 m << vector<A>(d.begin(), d.end());
31 unmarshall & operator>>(unmarshall &u, list<A> &d) {
34 d.assign(v.begin(), v.end());
39 template <class A, class B>
40 marshall & operator<<(marshall &m, const pair<A,B> &d) {
46 template <class A, class B>
47 unmarshall & operator>>(unmarshall &u, pair<A,B> &d) {
53 marshall & operator<<(marshall &m, const lock_state &d) {
60 unmarshall & operator>>(unmarshall &u, lock_state &d) {
68 lock_state & lock_server_cache_rsm::get_lock_state(lock_protocol::lockid_t lid) {
69 ScopedLock sl(lock_table_lock);
70 // by the semantics of map, this will create
71 // the lock if it doesn't already exist
72 return lock_table[lid];
75 static void *revokethread(void *x) {
76 lock_server_cache_rsm *sc = (lock_server_cache_rsm *) x;
81 static void *retrythread(void *x) {
82 lock_server_cache_rsm *sc = (lock_server_cache_rsm *) x;
87 lock_server_cache_rsm::lock_server_cache_rsm(class rsm *_rsm) : rsm (_rsm) {
89 VERIFY(pthread_create(&th, NULL, &revokethread, (void *)this) == 0);
90 VERIFY(pthread_create(&th, NULL, &retrythread, (void *)this) == 0);
91 rsm->set_state_transfer(this);
94 void lock_server_cache_rsm::revoker() {
96 lock_protocol::lockid_t lid;
97 revoke_fifo.deq(&lid);
98 LOG("Revoking " << lid);
99 if (rsm && !rsm->amiprimary())
102 lock_state &st = get_lock_state(lid);
106 held_by = st.held_by;
112 //while (t-- && !proxy)
113 proxy = handle(held_by.first).safebind();
116 rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, lid, held_by.second, r);
117 LOG("Revoke returned " << ret);
122 void lock_server_cache_rsm::retryer() {
124 lock_protocol::lockid_t lid;
125 retry_fifo.deq(&lid);
126 if (rsm && !rsm->amiprimary())
129 LOG("Sending retry for " << lid);
130 lock_state &st = get_lock_state(lid);
134 if (st.wanted_by.empty())
136 front = st.wanted_by.front();
139 rlock_protocol::status ret = -1;
144 //while (t-- && !proxy)
145 proxy = handle(front.first).safebind();
148 ret = proxy->call(rlock_protocol::retry, lid, front.second, r);
149 LOG("Retry returned " << ret);
154 int lock_server_cache_rsm::acquire(lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid, int &) {
155 LOG_FUNC_ENTER_SERVER;
156 holder h = holder(id, xid);
157 lock_state &st = get_lock_state(lid);
160 // deal with duplicated requests
161 if (st.old_requests.count(id)) {
162 lock_protocol::xid_t old_xid = st.old_requests[id];
164 return lock_protocol::RPCERR;
165 else if (old_xid == xid) {
166 if (st.held && st.held_by == h) {
167 LOG("Client " << id << " sent duplicate acquire xid=" << xid);
168 return lock_protocol::OK;
173 // grant the lock if it's available and I'm next in line
174 if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
175 if (!st.wanted_by.empty())
176 st.wanted_by.pop_front();
177 st.old_requests[id] = xid;
181 LOG("Lock " << lid << " held by " << h.first);
182 if (st.wanted_by.size())
183 revoke_fifo.enq(lid);
184 return lock_protocol::OK;
189 for (list<holder>::iterator i = st.wanted_by.begin(); i != st.wanted_by.end(); i++) {
190 if (i->first == id) {
191 // make sure client is obeying serialization
192 if (i->second != xid) {
193 LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << i->second);
194 return lock_protocol::RPCERR;
201 st.wanted_by.push_back(h);
203 LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " "));
205 // send revoke if we're first in line
206 if (st.wanted_by.front() == h)
207 revoke_fifo.enq(lid);
209 return lock_protocol::RETRY;
212 int lock_server_cache_rsm::release(lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid, int &r) {
213 LOG_FUNC_ENTER_SERVER;
214 lock_state &st = get_lock_state(lid);
216 if (st.held && st.held_by == holder(id, xid)) {
218 LOG("Lock " << lid << " not held");
220 if (st.wanted_by.size())
222 return lock_protocol::OK;
225 string lock_server_cache_rsm::marshal_state() {
226 ScopedLock sl(lock_table_lock);
233 void lock_server_cache_rsm::unmarshal_state(string state) {
234 ScopedLock sl(lock_table_lock);
235 unmarshall rep(state);
240 lock_protocol::status lock_server_cache_rsm::stat(lock_protocol::lockid_t lid, int &r) {
241 printf("stat request\n");
243 return lock_protocol::OK;