-// the lock server implementation
+// the caching lock server implementation
+#include "types.h"
#include "lock_server.h"
-#include <sstream>
-#include <stdio.h>
#include <unistd.h>
#include <arpa/inet.h>
+#include "handle.h"
-lock_server::lock_server():
- nacquire (0)
+lock_state::lock_state(): // default constructor for a per-lock state record
+ held(false) // a freshly created record starts out unheld; no other member is set explicitly here
{
}
-// caller must hold lock_lock
-mutex &
-lock_server::get_lock(lock_protocol::lockid_t lid) {
- lock_lock.acquire();
- // by the semantics of std::map, this will create
- // the mutex if it doesn't already exist
- mutex &l = locks[lid];
- lock_lock.release();
- return l;
+// Copy constructor: delegates to the copy-assignment operator, so exactly
+// the members that operator= assigns are taken from `other`.
+lock_state::lock_state(const lock_state &other) {
+    operator=(other);
}
-lock_protocol::status
-lock_server::stat(int clt, lock_protocol::lockid_t lid, int &r)
-{
- lock_protocol::status ret = lock_protocol::OK;
- printf("stat request from clt %d\n", clt);
- r = nacquire;
- return ret;
+// Copy assignment: takes over the held flag, the current holder, the queue
+// of waiting clients and the per-client record of request xids from `o`.
+// No other member is assigned here.
+lock_state& lock_state::operator=(const lock_state& o) {
+    this->held = o.held;
+    this->held_by = o.held_by;
+    this->wanted_by = o.wanted_by;
+    this->old_requests = o.old_requests;
+    return *this;
}
-lock_protocol::status
-lock_server::acquire(int clt, lock_protocol::lockid_t lid, int &r)
-{
- get_lock(lid).acquire();
+// Looks up the state record for lock `lid` in lock_table while a `lock`
+// guard on lock_table_lock is in scope, and returns a reference to it.
+lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
+    lock table_guard(lock_table_lock);
+    // indexing creates the entry if this lock id has not been seen before
+    lock_state &entry = lock_table[lid];
+    return entry;
+}
+
+// Constructs the lock server on top of the replicated state machine `r`.
+// Starts the two detached background threads (revoker and retryer) and,
+// when an rsm is supplied, registers this object with it through
+// set_state_transfer().
+lock_server::lock_server(rsm *r) : rsm_ (r) {
+    thread(&lock_server::revoker, this).detach();
+    thread(&lock_server::retryer, this).detach();
+    // revoker() and retryer() both tolerate a null rsm_; do the same here
+    // rather than dereferencing the pointer unconditionally.
+    if (rsm_)
+        rsm_->set_state_transfer(this);
+}
+
+// Background loop run on its own thread: takes lock ids off revoke_fifo and
+// sends an rlock_protocol::revoke RPC to the client currently holding each
+// lock.  The loop never exits.
+//
+// The `[[noreturn]]` that used to follow the parameter list was removed: in
+// that position the attribute applies to the function type, which is not
+// allowed for noreturn (rejected by clang, ignored with a warning by GCC).
+void lock_server::revoker() {
+    while (1) {
+        lock_protocol::lockid_t lid;
+        revoke_fifo.deq(&lid);
+        LOG("Revoking " << lid);
+        // skip the request when an rsm is attached and we are not its primary
+        if (rsm_ && !rsm_->amiprimary())
+            continue;
+
+        lock_state &st = get_lock_state(lid);
+        holder_t held_by;
+        {
+            lock sl(st.m);
+            // The lock may have been released between the enqueue and now;
+            // held_by would then name a stale (or never-set) holder, so
+            // there is nobody to revoke from.
+            if (!st.held)
+                continue;
+            held_by = st.held_by;
+        }
+
+        // TODO: consider retrying safebind() a few times before giving up
+        rpcc *proxy = handle(held_by.first).safebind();
+        if (proxy) {
+            int r;
+            auto ret = static_cast<rlock_protocol::status>(proxy->call(rlock_protocol::revoke, r, lid, held_by.second));
+            LOG("Revoke returned " << ret);
+        }
+    }
+}
+
+// Background loop run on its own thread: takes lock ids off retry_fifo and
+// sends an rlock_protocol::retry RPC to the client at the front of that
+// lock's wanted_by queue.  The loop never exits.
+//
+// The `[[noreturn]]` that used to follow the parameter list was removed: in
+// that position the attribute applies to the function type, which is not
+// allowed for noreturn (rejected by clang, ignored with a warning by GCC).
+void lock_server::retryer() {
+    while (1) {
+        lock_protocol::lockid_t lid;
+        retry_fifo.deq(&lid);
+        // skip the request when an rsm is attached and we are not its primary
+        if (rsm_ && !rsm_->amiprimary())
+            continue;
+
+        LOG("Sending retry for " << lid);
+        lock_state &st = get_lock_state(lid);
+        holder_t front;
+        {
+            lock sl(st.m);
+            // nobody is waiting any more: nothing to send
+            if (st.wanted_by.empty())
+                continue;
+            front = st.wanted_by.front();
+        }
+
+        // TODO: consider retrying safebind() a few times before giving up
+        rpcc *proxy = handle(front.first).safebind();
+        if (proxy) {
+            int r;
+            auto ret = static_cast<rlock_protocol::status>(proxy->call(rlock_protocol::retry, r, lid, front.second));
+            LOG("Retry returned " << ret);
+        }
+    }
+}
+
+// Handles an acquire request for lock `lid` from client `id` with request
+// number `xid`.
+//
+// Returns:
+//   lock_protocol::OK     - the lock was granted to (id, xid), or this is a
+//                           duplicate of the request that currently holds it
+//   lock_protocol::RETRY  - the client is queued in wanted_by
+//   lock_protocol::RPCERR - xid is older than the last granted xid recorded
+//                           for this client, or the client already has a
+//                           queued request with a different xid
+//
+// Side effects: may enqueue `lid` on revoke_fifo so the revoker thread asks
+// the current holder to give the lock back.
+int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
+    LOG("lid=" << lid << " client=" << id << "," << xid);
+    const holder_t h(id, xid);
+    lock_state &st = get_lock_state(lid);
+    lock sl(st.m);
+
+    // deal with duplicated requests (single lookup instead of count + [])
+    auto prev = st.old_requests.find(id);
+    if (prev != st.old_requests.end()) {
+        const lock_protocol::xid_t old_xid = prev->second;
+        if (old_xid > xid)
+            return lock_protocol::RPCERR;
+        // NOTE(review): a duplicate whose grant has already been released
+        // falls through and is treated as a fresh request - confirm intended.
+        if (old_xid == xid && st.held && st.held_by == h) {
+            LOG("Client " << id << " sent duplicate acquire xid=" << xid);
+            return lock_protocol::OK;
+        }
+    }
+
+    // grant the lock if it's available and I'm next in line
+    if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
+        if (!st.wanted_by.empty())
+            st.wanted_by.pop_front();
+        st.old_requests[id] = xid;
+
+        st.held = true;
+        st.held_by = h;
+        LOG("Lock " << lid << " held by " << h.first);
+        // someone else is already waiting: ask the new holder to hand the
+        // lock back
+        if (!st.wanted_by.empty())
+            revoke_fifo.enq(lid);
+        return lock_protocol::OK;
+    }
+
+    // get in line (iterate by const reference: no per-waiter copy)
+    bool found = false;
+    for (const auto &p : st.wanted_by) {
+        if (p.first == id) {
+            // make sure client is obeying serialization
+            if (p.second != xid) {
+                LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second);
+                return lock_protocol::RPCERR;
+            }
+            found = true;
+            break;
+        }
+    }
+    if (!found)
+        st.wanted_by.push_back(h);
+
+    LOG("wanted_by=" << st.wanted_by);
+
+    // send revoke if we're first in line
+    if (st.wanted_by.front() == h)
+        revoke_fifo.enq(lid);
+
+    return lock_protocol::RETRY;
+}
+
+// Handles a release of lock `lid` by client `id` for request `xid`.
+// The lock is marked free only when it is currently held by exactly that
+// (id, xid) pair.  Whenever clients are queued in wanted_by, `lid` is put on
+// retry_fifo for the retryer thread.  Always returns lock_protocol::OK.
+int lock_server::release(int &, lock_protocol::lockid_t lid, callback_t id, lock_protocol::xid_t xid) {
+    LOG("lid=" << lid << " client=" << id << "," << xid);
+    lock_state &st = get_lock_state(lid);
+    lock sl(st.m);
+
+    const holder_t releaser(id, xid);
+    const bool owned_by_caller = st.held && st.held_by == releaser;
+    if (owned_by_caller) {
+        st.held = false;
+        LOG("Lock " << lid << " not held");
+    }
+
+    const bool has_waiters = !st.wanted_by.empty();
+    if (has_waiters)
+        retry_fifo.enq(lid);
return lock_protocol::OK;
}
-lock_protocol::status
-lock_server::release(int clt, lock_protocol::lockid_t lid, int &r)
-{
- get_lock(lid).release();
+// Serialises the server state into a string: nacquire first, then the whole
+// lock table, written while a guard on lock_table_lock is in scope.
+// unmarshal_state() reads the fields back in the same order.
+string lock_server::marshal_state() {
+    lock table_guard(lock_table_lock);
+    marshall out;
+    out << nacquire;
+    out << lock_table;
+    return out.content();
+}
+
+// Restores nacquire and the lock table from `state`, in the order written by
+// marshal_state(), while a guard on lock_table_lock is in scope.
+// NOTE(review): references previously handed out by get_lock_state() may no
+// longer refer to live entries after the table is overwritten - confirm.
+void lock_server::unmarshal_state(string state) {
+    lock table_guard(lock_table_lock);
+    unmarshall in(state, false);
+    in >> nacquire;
+    in >> lock_table;
+}
+
+lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) { // stat RPC handler: logs the request for `lid`
+ LOG("stat request for " << lid);
+ VERIFY(0); // NOTE(review): presumably an always-failing assertion marking stat as unsupported - confirm against VERIFY's definition
+ r = nacquire; // only reached if VERIFY(0) returns; reports nacquire through r
return lock_protocol::OK;
}
+