So many changes. Broken.
[invirt/third/libt4.git] / rsm.cc
diff --git a/rsm.cc b/rsm.cc
index 5812b33..7104a09 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
 // The rule is that a module releases its internal locks before it
 // upcalls, but can keep its locks when calling down.
 
 // The rule is that a module releases its internal locks before it
 // upcalls, but can keep its locks when calling down.
 
-#include "rsm.h"
-#include "handle.h"
-#include "rsm_client.h"
+#include "include/rsm.h"
+#include "include/rsm_client.h"
 #include <unistd.h>
 
 #include <unistd.h>
 
+using std::vector;
+using namespace std::chrono;
+
 rsm_state_transfer::~rsm_state_transfer() {}
 
 rsm::rsm(const string & _first, const string & _me) : primary(_first)
 {
 rsm_state_transfer::~rsm_state_transfer() {}
 
 rsm::rsm(const string & _first, const string & _me) : primary(_first)
 {
-    cfg = unique_ptr<config>(new config(_first, _me, this));
+    cfg = std::make_unique<config>(_first, _me, this);
 
     if (_first == _me) {
         // Commit the first view here. We can not have acceptor::acceptor
 
     if (_first == _me) {
         // Commit the first view here. We can not have acceptor::acceptor
@@ -103,7 +105,7 @@ rsm::rsm(const string & _first, const string & _me) : primary(_first)
     rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
 
     // tester must be on different port, otherwise it may partition itself
     rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
 
     // tester must be on different port, otherwise it may partition itself
-    testsvr.reset(new rpcs((in_port_t)stoi(_me) + 1));
+    testsvr.reset(new rpcs((in_port_t)std::stoi(_me) + 1));
     testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
     testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
 }
     testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
     testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
 }
@@ -129,7 +131,7 @@ void rsm::recovery() {
                 commit_change(cfg->view_id(), ml);
             } else {
                 ml.unlock();
                 commit_change(cfg->view_id(), ml);
             } else {
                 ml.unlock();
-                this_thread::sleep_for(seconds(3)); // XXX make another node in cfg primary?
+                std::this_thread::sleep_for(3000ms); // XXX make another node in cfg primary?
                 ml.lock();
             }
         }
                 ml.lock();
             }
         }
@@ -173,7 +175,7 @@ bool rsm::sync_with_backups(lock & rsm_mutex_lock) {
     rsm_mutex_lock.lock();
     // Start accepting synchronization request (statetransferreq) now!
     insync = true;
     rsm_mutex_lock.lock();
     // Start accepting synchronization request (statetransferreq) now!
     insync = true;
-    cfg->get_view(vid_insync, backups);
+    backups = cfg->get_view(vid_insync);
     backups.erase(std::find(backups.begin(), backups.end(), cfg->myaddr()));
     LOG << "backups " << backups;
     sync_cond.wait(rsm_mutex_lock);
     backups.erase(std::find(backups.begin(), backups.end(), cfg->myaddr()));
     LOG << "backups " << backups;
     sync_cond.wait(rsm_mutex_lock);
@@ -200,15 +202,14 @@ bool rsm::sync_with_primary(lock & rsm_mutex_lock) {
 bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
 {
     rsm_protocol::transferres r;
 bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
 {
     rsm_protocol::transferres r;
-    handle h(m);
     int ret = 0;
     LOG << "contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
     int ret = 0;
     LOG << "contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
-    rpcc *cl;
+    shared_ptr<rpcc> cl;
     {
         rsm_mutex_lock.unlock();
     {
         rsm_mutex_lock.unlock();
-        cl = h.safebind();
+        cl = rpcc::bind_cached(m);
         if (cl) {
         if (cl) {
-            ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100),
+            ret = cl->call_timeout(rsm_protocol::transferreq, 100ms,
                     r, cfg->myaddr(), last_myvs, vid_insync);
         }
         rsm_mutex_lock.lock();
                     r, cfg->myaddr(), last_myvs, vid_insync);
         }
         rsm_mutex_lock.lock();
@@ -218,7 +219,7 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
         return false;
     }
     if (stf && last_myvs != r.last) {
         return false;
     }
     if (stf && last_myvs != r.last) {
-        stf->unmarshal_state(r.state);
+        stf->unmarshall_state(r.state);
     }
     last_myvs = r.last;
     LOG << "transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
     }
     last_myvs = r.last;
     LOG << "transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
@@ -227,10 +228,8 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
 
 bool rsm::statetransferdone(const string & m, lock & rsm_mutex_lock) {
     rsm_mutex_lock.unlock();
 
 bool rsm::statetransferdone(const string & m, lock & rsm_mutex_lock) {
     rsm_mutex_lock.unlock();
-    handle h(m);
-    rpcc *cl = h.safebind();
     bool done = false;
     bool done = false;
-    if (cl) {
+    if (auto cl = rpcc::bind_cached(m)) {
         int r;
         auto ret = (rsm_protocol::status)cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync);
         done = (ret == rsm_protocol::OK);
         int r;
         auto ret = (rsm_protocol::status)cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync);
         done = (ret == rsm_protocol::OK);
@@ -241,21 +240,16 @@ bool rsm::statetransferdone(const string & m, lock & rsm_mutex_lock) {
 
 
 bool rsm::join(const string & m, lock & rsm_mutex_lock) {
 
 
 bool rsm::join(const string & m, lock & rsm_mutex_lock) {
-    handle h(m);
     int ret = 0;
     string log;
 
     LOG << "contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")";
     int ret = 0;
     string log;
 
     LOG << "contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")";
-    rpcc *cl;
-    {
-        rsm_mutex_lock.unlock();
-        cl = h.safebind();
-        if (cl != 0) {
-            ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log,
-                    cfg->myaddr(), last_myvs);
-        }
-        rsm_mutex_lock.lock();
-    }
+
+    rsm_mutex_lock.unlock();
+    auto cl = rpcc::bind_cached(m);
+    if (cl)
+        ret = cl->call_timeout(rsm_protocol::joinreq, 12000ms, log, cfg->myaddr(), last_myvs);
+    rsm_mutex_lock.lock();
 
     if (cl == 0 || ret != rsm_protocol::OK) {
         LOG << "couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret;
 
     if (cl == 0 || ret != rsm_protocol::OK) {
         LOG << "couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret;
@@ -297,8 +291,8 @@ void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r
     handler *h = procs[procno];
     VERIFY(h);
     marshall rep;
     handler *h = procs[procno];
     VERIFY(h);
     marshall rep;
-    auto ret = (rsm_protocol::status)(*h)(unmarshall(req, false), rep);
-    r = marshall(ret, rep.content()).content();
+    auto ret = (rsm_protocol::status)(*h)(unmarshall(req), rep);
+    r = marshall(ret, rep);
 }
 
 static void logHexString(locked_ostream && log, const string & s) {
 }
 
 static void logHexString(locked_ostream && log, const string & s) {
@@ -329,24 +323,23 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id
         if (primary != myaddr)
             return rsm_client_protocol::NOTPRIMARY;
         LOG << "Assigning a viewstamp";
         if (primary != myaddr)
             return rsm_client_protocol::NOTPRIMARY;
         LOG << "Assigning a viewstamp";
-        cfg->get_view(vid_commit, m);
+        m = cfg->get_view(vid_commit);
         // assign the RPC the next viewstamp number
         vs = myvs;
         myvs++;
     }
 
         // assign the RPC the next viewstamp number
         vs = myvs;
         myvs++;
     }
 
-    // send an invoke RPC to all slaves in the current view with a timeout of 1 second
+    // send an invoke RPC to all slaves in the current view with a timeout
     LOG << "Invoking slaves";
     LOG << "Invoking slaves";
-    for (unsigned i  = 0; i < m.size(); i++) {
-        if (m[i] != myaddr) {
+    for (auto & mm : m) {
+        if (mm != myaddr) {
             // if invoke on slave fails, return rsm_client_protocol::BUSY
             // if invoke on slave fails, return rsm_client_protocol::BUSY
-            handle h(m[i]);
-            LOG << "Sending invoke to " << m[i];
-            rpcc *cl = h.safebind();
+            LOG << "Sending invoke to " << mm;
+            auto cl = rpcc::bind_cached(mm);
             if (!cl)
                 return rsm_client_protocol::BUSY;
             int ignored_rval;
             if (!cl)
                 return rsm_client_protocol::BUSY;
             int ignored_rval;
-            auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req);
+            auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, 100ms, ignored_rval, procno, vs, req);
             LOG << "Invoke returned " << ret;
             if (ret != rsm_protocol::OK)
                 return rsm_client_protocol::BUSY;
             LOG << "Invoke returned " << ret;
             if (ret != rsm_protocol::OK)
                 return rsm_client_protocol::BUSY;
@@ -372,7 +365,6 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id
 rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
     LOG << "invoke procno 0x" << std::hex << proc;
     lock ml(invoke_mutex);
 rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
     LOG << "invoke procno 0x" << std::hex << proc;
     lock ml(invoke_mutex);
-    vector<string> m;
     string myaddr;
     {
         lock ml2(rsm_mutex);
     string myaddr;
     {
         lock ml2(rsm_mutex);
@@ -385,7 +377,7 @@ rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp
         myaddr = cfg->myaddr();
         if (primary == myaddr)
             return rsm_protocol::ERR;
         myaddr = cfg->myaddr();
         if (primary == myaddr)
             return rsm_protocol::ERR;
-        cfg->get_view(vid_commit, m);
+        vector<string> m = cfg->get_view(vid_commit);
         if (std::find(m.begin(), m.end(), myaddr) == m.end())
             return rsm_protocol::ERR;
         // check sequence number
         if (std::find(m.begin(), m.end(), myaddr) == m.end())
             return rsm_protocol::ERR;
         // check sequence number
@@ -412,7 +404,7 @@ rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const strin
     if (!insync || vid != vid_insync)
         return rsm_protocol::BUSY;
     if (stf && last != last_myvs)
     if (!insync || vid != vid_insync)
         return rsm_protocol::BUSY;
     if (stf && last != last_myvs)
-        r.state = stf->marshal_state();
+        r.state = stf->marshall_state();
     r.last = last_myvs;
     return rsm_protocol::OK;
 }
     r.last = last_myvs;
     return rsm_protocol::OK;
 }
@@ -473,9 +465,8 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last
 // primary failure
 //
 rsm_client_protocol::status rsm::client_members(vector<string> & r, int) {
 // primary failure
 //
 rsm_client_protocol::status rsm::client_members(vector<string> & r, int) {
-    vector<string> m;
     lock ml(rsm_mutex);
     lock ml(rsm_mutex);
-    cfg->get_view(vid_commit, m);
+    vector<string> m = cfg->get_view(vid_commit);
     m.push_back(primary);
     r = m;
     LOG << "return " << m << " m " << primary;
     m.push_back(primary);
     r = m;
     LOG << "return " << m << " m " << primary;
@@ -486,9 +477,7 @@ rsm_client_protocol::status rsm::client_members(vector<string> & r, int) {
 // otherwise, the lowest number node of the previous view.
 // caller should hold rsm_mutex
 void rsm::set_primary(unsigned vid) {
 // otherwise, the lowest number node of the previous view.
 // caller should hold rsm_mutex
 void rsm::set_primary(unsigned vid) {
-    vector<string> c, p;
-    cfg->get_view(vid, c);
-    cfg->get_view(vid - 1, p);
+    vector<string> c = cfg->get_view(vid), p = cfg->get_view(vid - 1);
     VERIFY (c.size() > 0);
 
     if (isamember(primary,c)) {
     VERIFY (c.size() > 0);
 
     if (isamember(primary,c)) {
@@ -497,9 +486,9 @@ void rsm::set_primary(unsigned vid) {
     }
 
     VERIFY(p.size() > 0);
     }
 
     VERIFY(p.size() > 0);
-    for (unsigned i = 0; i < p.size(); i++) {
-        if (isamember(p[i], c)) {
-            primary = p[i];
+    for (auto & pp : p) {
+        if (isamember(pp, c)) {
+            primary = pp;
             LOG << "primary is " << primary;
             return;
         }
             LOG << "primary is " << primary;
             return;
         }
@@ -517,13 +506,11 @@ bool rsm::amiprimary() {
 
 void rsm::net_repair(bool heal, lock & rsm_mutex_lock) {
     VERIFY(rsm_mutex_lock);
 
 void rsm::net_repair(bool heal, lock & rsm_mutex_lock) {
     VERIFY(rsm_mutex_lock);
-    vector<string> m;
-    cfg->get_view(vid_commit, m);
-    for (unsigned i  = 0; i < m.size(); i++) {
-        if (m[i] != cfg->myaddr()) {
-            handle h(m[i]);
-            LOG << "member " << m[i] << " " << heal;
-            if (h.safebind()) h.safebind()->set_reachable(heal);
+    for (auto & mm : cfg->get_view(vid_commit)) {
+        if (mm != cfg->myaddr()) {
+            LOG << "member " << mm << " " << heal;
+            if (auto cl = rpcc::bind_cached(mm))
+                cl->set_reachable(heal);
         }
     }
     rsmrpc->set_reachable(heal);
         }
     }
     rsmrpc->set_reachable(heal);