So many changes. Broken.
[invirt/third/libt4.git] / lock_server.cc
index 40098d7..1241409 100644 (file)
@@ -1,76 +1,59 @@
 // the caching lock server implementation
 
-#include "lock_server.h"
+#include "include/lock_server.h"
 #include <unistd.h>
 #include <arpa/inet.h>
-#include "handle.h"
 
-lock_state::lock_state():
-    held(false)
-{
+lock_server::lock_state::lock_state(const lock_state & other) {
+    held = other.held;
+    held_by = other.held_by;
+    wanted_by = other.wanted_by;
+    old_requests = other.old_requests;
 }
 
-lock_state::lock_state(const lock_state &other) {
-    *this = other;
-}
-
-lock_state& lock_state::operator=(const lock_state& o) {
-    held = o.held;
-    held_by = o.held_by;
-    wanted_by = o.wanted_by;
-    old_requests = o.old_requests;
-    return *this;
-}
-
-lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
+lock_server::lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
     lock sl(lock_table_lock);
     // this will create the lock if it doesn't already exist
     return lock_table[lid];
 }
 
-lock_server::lock_server(rsm *r) : rsm_ (r) {
+lock_server::lock_server(rsm & r) : rsm_ (&r) {
     thread(&lock_server::revoker, this).detach();
     thread(&lock_server::retryer, this).detach();
-    rsm_->set_state_transfer(this);
+    r.set_state_transfer(this);
+
+    r.reg(lock_protocol::acquire, &lock_server::acquire, this);
+    r.reg(lock_protocol::release, &lock_server::release, this);
 }
 
-void lock_server::revoker() [[noreturn]] {
+void lock_server::revoker () {
     while (1) {
-        lock_protocol::lockid_t lid;
-        revoke_fifo.deq(&lid);
-        LOG("Revoking " << lid);
+        lock_protocol::lockid_t lid = revoke_fifo.deq();
+        LOG << "Revoking " << lid;
         if (rsm_ && !rsm_->amiprimary())
             continue;
 
-        lock_state &st = get_lock_state(lid);
-        holder_t held_by;
-        {
-            lock sl(st.m);
-            held_by = st.held_by;
-        }
+        lock_state & st = get_lock_state(lid);
+        lock sl(st.m);
+        holder_t held_by = st.held_by;
+        sl.unlock();
 
-        rpcc *proxy = NULL;
-        // try a few times?
-        //int t=5;
-        //while (t-- && !proxy)
-        proxy = handle(held_by.first).safebind();
-        if (proxy) {
+        if (auto cl = rpcc::bind_cached(held_by.first)) {
             int r;
-            auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
-            LOG("Revoke returned " << ret);
+            auto ret = (rlock_protocol::status)cl->call(rlock_protocol::revoke, r, lid, held_by.second);
+            LOG << "Revoke returned " << ret;
         }
     }
 }
 
-void lock_server::retryer() [[noreturn]] {
+void lock_server::retryer() {
     while (1) {
-        lock_protocol::lockid_t lid;
-        retry_fifo.deq(&lid);
+        lock_protocol::lockid_t lid = retry_fifo.deq();
         if (rsm_ && !rsm_->amiprimary())
             continue;
 
-        LOG("Sending retry for " << lid);
-        lock_state &st = get_lock_state(lid);
+        LOG << "Sending retry for " << lid;
+        lock_state & st = get_lock_state(lid);
         holder_t front;
         {
             lock sl(st.m);
@@ -79,23 +62,18 @@ void lock_server::retryer() [[noreturn]] {
             front = st.wanted_by.front();
         }
 
-        rpcc *proxy = NULL;
-        // try a few times?
-        //int t=5;
-        //while (t-- && !proxy)
-        proxy = handle(front.first).safebind();
-        if (proxy) {
+        if (auto cl = rpcc::bind_cached(front.first)) {
             int r;
-            auto ret = (rlock_protocol::status)proxy->call(rlock_protocol::retry, r, lid, front.second);
-            LOG("Retry returned " << ret);
+            auto ret = (rlock_protocol::status)cl->call(rlock_protocol::retry, r, lid, front.second);
+            LOG << "Retry returned " << ret;
         }
     }
 }
 
-int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
-    LOG("lid=" << lid << " client=" << id << "," << xid);
+lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
+    LOG << "lid=" << lid << " client=" << id << "," << xid;
     holder_t h = holder_t(id, xid);
-    lock_state &st = get_lock_state(lid);
+    lock_state & st = get_lock_state(lid);
     lock sl(st.m);
 
     // deal with duplicated requests
@@ -105,7 +83,7 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t &
             return lock_protocol::RPCERR;
         else if (old_xid == xid) {
             if (st.held && st.held_by == h) {
-                LOG("Client " << id << " sent duplicate acquire xid=" << xid);
+                LOG << "Client " << id << " sent duplicate acquire xid=" << xid;
                 return lock_protocol::OK;
             }
         }
@@ -119,7 +97,7 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t &
 
         st.held = true;
         st.held_by = h;
-        LOG("Lock " << lid << " held by " << h.first);
+        LOG << "Lock " << lid << " held by " << h.first;
         if (st.wanted_by.size())
             revoke_fifo.enq(lid);
         return lock_protocol::OK;
@@ -131,7 +109,7 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t &
         if (p.first == id) {
             // make sure client is obeying serialization
             if (p.second != xid) {
-                LOG("Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second);
+                LOG << "Client " << id << " sent acquire xid=" << xid << " with in-progress xid=" << p.second;
                 return lock_protocol::RPCERR;
             }
             found = true;
@@ -141,7 +119,7 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t &
     if (!found)
         st.wanted_by.push_back(h);
 
-    LOG("wanted_by=" << st.wanted_by);
+    LOG << "wanted_by=" << st.wanted_by;
 
     // send revoke if we're first in line
     if (st.wanted_by.front() == h)
@@ -150,36 +128,25 @@ int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t &
     return lock_protocol::RETRY;
 }
 
-int lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
-    LOG("lid=" << lid << " client=" << id << "," << xid);
-    lock_state &st = get_lock_state(lid);
+lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
+    LOG << "lid=" << lid << " client=" << id << "," << xid;
+    lock_state & st = get_lock_state(lid);
     lock sl(st.m);
     if (st.held && st.held_by == holder_t(id, xid)) {
         st.held = false;
-        LOG("Lock " << lid << " not held");
+        LOG << "Lock " << lid << " not held";
     }
     if (st.wanted_by.size())
         retry_fifo.enq(lid);
     return lock_protocol::OK;
 }
 
-string lock_server::marshal_state() {
+string lock_server::marshall_state() {
     lock sl(lock_table_lock);
-    marshall rep;
-    rep << nacquire << lock_table;
-    return rep.content();
+    return marshall(nacquire, lock_table);
 }
 
-void lock_server::unmarshal_state(const string & state) {
+void lock_server::unmarshall_state(const string & state) {
     lock sl(lock_table_lock);
-    unmarshall rep(state, false);
-    rep >> nacquire >> lock_table;
-}
-
-lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
-    LOG("stat request for " << lid);
-    VERIFY(0);
-    r = nacquire;
-    return lock_protocol::OK;
+    unmarshall(state, nacquire, lock_table);
 }
-