More renaming
[invirt/third/libt4.git] / lock_server.cc
index 0f82080..f5a1fc4 100644 (file)
-#include "rpc/rpc.h"
-#include <arpa/inet.h>
-#include <stdlib.h>
+// the caching lock server implementation
+
+#include "lock_server.h"
+#include <sstream>
 #include <stdio.h>
 #include <unistd.h>
-#include "lock_server_cache_rsm.h"
-#include "paxos.h"
-#include "rsm.h"
-
-// Main loop of lock_server
+#include <arpa/inet.h>
+#include "lang/verify.h"
+#include "handle.h"
+#include "tprintf.h"
+#include "rpc/marshall.h"
+#include "lock.h"
 
-char tprintf_thread_prefix = 's';
+using std::ostringstream;
+using std::istringstream;
+using std::vector;
 
-int
-main(int argc, char *argv[])
+// Default constructor: a new lock record starts out not held. Only `held` is
+// set explicitly; every other member is left to its own default construction.
+lock_state::lock_state():
+    held(false)
 {
-    setvbuf(stdout, NULL, _IONBF, 0);
-    setvbuf(stderr, NULL, _IONBF, 0);
+}
+
+// Copy constructor: delegates to the copy-assignment operator so the set of
+// copied fields is defined in exactly one place.
+lock_state::lock_state(const lock_state &other) {
+    operator=(other);
+}
+
+// Copy-assignment: copies the held flag, the current holder, the wait queue
+// and the per-client request history from `o`. No other member is assigned
+// here (in particular the member `m`, which the server code locks, is left
+// alone).
+lock_state& lock_state::operator=(const lock_state& o) {
+    // The four assignments are independent of one another.
+    old_requests = o.old_requests;
+    wanted_by = o.wanted_by;
+    held_by = o.held_by;
+    held = o.held;
+    return *this;
+}
 
-    srandom(getpid());
+// Writes a pair to an ostringstream in the form "<first,second>" and returns
+// the same stream so further output can be chained.
+template <class A, class B>
+ostringstream & operator<<(ostringstream &o, const pair<A,B> &d) {
+    o << "<" << d.first;
+    o << "," << d.second;
+    o << ">";
+    return o;
+}
+
+// Serializes a lock record for state transfer. old_requests is written along
+// with the other fields so that a replica restored from this state can still
+// recognise duplicate acquires; this matches the set of fields operator=
+// copies. The field order must stay identical to operator>> below.
+marshall & operator<<(marshall &m, const lock_state &d) {
+    return m << d.held << d.held_by << d.wanted_by << d.old_requests;
+}
 
-    if(argc != 3){
-        fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
-        exit(1);
+// Restores a lock record written by operator<< above, reading the fields in
+// exactly the order they were written.
+unmarshall & operator>>(unmarshall &u, lock_state &d) {
+    return u >> d.held >> d.held_by >> d.wanted_by >> d.old_requests;
+}
+
+// Returns the state record for lock `lid`. lock_table_lock is held only for
+// the duration of the table lookup; the reference is handed back after that
+// guard has been released.
+lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
+    lock table_guard(lock_table_lock);
+    // operator[] inserts a default-constructed record when `lid` has no
+    // entry yet, so this lookup never fails.
+    lock_state &entry = lock_table[lid];
+    return entry;
+}
+
+// Starts the background revoker and retryer threads and, when a replicated
+// state machine is supplied, registers this server as its state-transfer
+// handler.
+//
+// _rsm may be null: revoker() and retryer() already treat a null rsm as
+// "no replication", so the registration is skipped instead of dereferencing
+// a null pointer.
+//
+// NOTE(review): both threads are detached and run member functions on `this`;
+// nothing visible here stops them before the object is destroyed.
+lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
+    std::thread(&lock_server::revoker, this).detach();
+    std::thread(&lock_server::retryer, this).detach();
+    if (rsm)
+        rsm->set_state_transfer(this);
+}
+
+// Background loop: takes lock ids off revoke_fifo and sends a revoke RPC to
+// the holder recorded for each lock. Never returns.
+void lock_server::revoker() {
+    for (;;) {
+        lock_protocol::lockid_t lid;
+        revoke_fifo.deq(&lid);
+        LOG("Revoking " << lid);
+
+        // With an rsm attached, only the primary sends revokes.
+        const bool not_primary = rsm && !rsm->amiprimary();
+        if (not_primary)
+            continue;
+
+        // Copy the holder out under the per-lock mutex; the RPC below is
+        // issued without holding it.
+        lock_state &st = get_lock_state(lid);
+        holder owner;
+        {
+            lock guard(st.m);
+            owner = st.held_by;
+        }
+
+        // try a few times?
+        rpcc *proxy = handle(owner.first).safebind();
+        if (!proxy)
+            continue;
+
+        int r;
+        rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, owner.second);
+        LOG("Revoke returned " << ret);
     }
+}
 
-    rsm rsm(argv[1], argv[2]);
-    lock_server_cache_rsm ls(&rsm);
-    rsm.set_state_transfer(&ls);
+// Background loop: takes lock ids off retry_fifo and sends a retry RPC to the
+// client at the front of that lock's wait queue. Never returns.
+void lock_server::retryer() {
+    while (1) {
+        lock_protocol::lockid_t lid;
+        retry_fifo.deq(&lid);
+        // With an rsm attached, only the primary sends retries.
+        if (rsm && !rsm->amiprimary())
+            continue;
 
-    rsm.reg(lock_protocol::acquire, &lock_server_cache_rsm::acquire, &ls);
-    rsm.reg(lock_protocol::release, &lock_server_cache_rsm::release, &ls);
-    rsm.reg(lock_protocol::stat, &lock_server_cache_rsm::stat, &ls);
+        LOG("Sending retry for " << lid);
+        lock_state &st = get_lock_state(lid);
+        holder front;
+        {
+            // Copy the first waiter under the per-lock mutex; if nobody is
+            // waiting there is nothing to send.
+            lock sl(st.m);
+            if (st.wanted_by.empty())
+                continue;
+            front = st.wanted_by.front();
+        }
 
-    while(1)
-        sleep(1000);
+        rlock_protocol::status ret = -1;
+
+        rpcc *proxy = NULL;
+        // try a few times?
+        //int t=5;
+        //while (t-- && !proxy)
+        proxy = handle(front.first).safebind();
+        // If safebind() yields no proxy, no RPC is sent for this entry; the
+        // wait queue itself is not modified by this loop.
+        if (proxy) {
+            int r;
+            ret = proxy->call(rlock_protocol::retry, r, lid, front.second);
+            LOG("Retry returned " << ret);
+        }
+    }
 }
+
+// Handles an acquire request for lock `lid` from client `id` with request
+// number `xid`.
+//
+// Returns lock_protocol::OK when the lock is granted to this request (or the
+// request repeats the grant currently in force), lock_protocol::RETRY when
+// the request is queued, and lock_protocol::RPCERR when `xid` is older than
+// the last one granted to the client or differs from the client's queued
+// request. `r` is never written.
+int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
+    LOG_FUNC_ENTER_SERVER;
+    holder h = holder(id, xid);
+    lock_state &st = get_lock_state(lid);
+    lock sl(st.m);
+
+    // Compare against the last xid that was granted to this client, if any.
+    auto granted = st.old_requests.find(id);
+    if (granted != st.old_requests.end()) {
+        const lock_protocol::xid_t old_xid = granted->second;
+        if (old_xid > xid)
+            return lock_protocol::RPCERR;
+        if (old_xid == xid && st.held && st.held_by == h) {
+            LOG("Client " << id << " sent duplicate acquire xid=" << xid);
+            return lock_protocol::OK;
+        }
+    }
+
+    // Grant immediately when the lock is free and this request is either
+    // the only one or the one at the head of the queue.
+    if (!st.held && (st.wanted_by.empty() || st.wanted_by.front() == h)) {
+        if (!st.wanted_by.empty())
+            st.wanted_by.pop_front();
+        st.old_requests[id] = xid;
+
+        st.held = true;
+        st.held_by = h;
+        LOG("Lock " << lid << " held by " << h.first);
+        // Others are still waiting, so ask the new holder to give it back.
+        if (!st.wanted_by.empty())
+            revoke_fifo.enq(lid);
+        return lock_protocol::OK;
+    }
+
+    // Otherwise queue the request, unless this client is already queued.
+    bool already_queued = false;
+    for (const holder &waiter : st.wanted_by) {
+        if (waiter.first != id)
+            continue;
+        // A client may only have one request in flight per lock.
+        if (waiter.second != xid) {
+            LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << waiter.second);
+            return lock_protocol::RPCERR;
+        }
+        already_queued = true;
+        break;
+    }
+    if (!already_queued)
+        st.wanted_by.push_back(h);
+
+    LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " "));
+
+    // The head of the queue triggers the revoke of the current holder.
+    if (st.wanted_by.front() == h)
+        revoke_fifo.enq(lid);
+
+    return lock_protocol::RETRY;
+}
+
+// Handles a release of lock `lid` by client `id` for request `xid`.
+//
+// The lock is marked free only when it is currently held by exactly that
+// (id, xid) pair. Whether or not that was the case, a retry is scheduled if
+// any client is waiting. Always returns lock_protocol::OK; `r` is not written.
+int lock_server::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
+    LOG_FUNC_ENTER_SERVER;
+    lock_state &st = get_lock_state(lid);
+    lock sl(st.m);
+
+    const holder releaser(id, xid);
+    if (st.held && st.held_by == releaser) {
+        st.held = false;
+        LOG("Lock " << lid << " not held");
+    }
+
+    if (!st.wanted_by.empty())
+        retry_fifo.enq(lid);
+
+    return lock_protocol::OK;
+}
+
+// Serializes the server state (nacquire followed by the whole lock table)
+// into a string while holding lock_table_lock. unmarshal_state() reads the
+// fields back in the same order.
+string lock_server::marshal_state() {
+    lock table_guard(lock_table_lock);
+    marshall rep;
+    rep << nacquire << lock_table;
+    return rep.str();
+}
+
+// Replaces the server state with the contents of `state`, which is expected
+// to have been produced by marshal_state(): nacquire first, then the lock
+// table. lock_table_lock is held for the whole operation.
+void lock_server::unmarshal_state(string state) {
+    lock table_guard(lock_table_lock);
+    unmarshall rep(state);
+    rep >> nacquire >> lock_table;
+}
+
+// Handler for the stat RPC. It prints a trace line and then hits VERIFY(0),
+// so the statements after it only run if VERIFY does not stop execution.
+// NOTE(review): presumably VERIFY aborts on a false condition, which would
+// make this handler a deliberate "not supported" trap -- confirm against
+// lang/verify.h. `lid` is not used.
+lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
+    printf("stat request\n");
+    VERIFY(0);
+    r = nacquire;
+    return lock_protocol::OK;
+}
+