So many changes. Broken.
[invirt/third/libt4.git] / rsm.cc
diff --git a/rsm.cc b/rsm.cc
index c651e86..7104a09 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
 // The rule is that a module releases its internal locks before it
 // upcalls, but can keep its locks when calling down.
 
-#include "rsm.h"
-#include "rsm_client.h"
+#include "include/rsm.h"
+#include "include/rsm_client.h"
 #include <unistd.h>
 
 using std::vector;
+using namespace std::chrono;
 
 rsm_state_transfer::~rsm_state_transfer() {}
 
@@ -130,7 +131,7 @@ void rsm::recovery() {
                 commit_change(cfg->view_id(), ml);
             } else {
                 ml.unlock();
-                std::this_thread::sleep_for(milliseconds(3000)); // XXX make another node in cfg primary?
+                std::this_thread::sleep_for(3000ms); // XXX make another node in cfg primary?
                 ml.lock();
             }
         }
@@ -174,7 +175,7 @@ bool rsm::sync_with_backups(lock & rsm_mutex_lock) {
     rsm_mutex_lock.lock();
     // Start accepting synchronization request (statetransferreq) now!
     insync = true;
-    cfg->get_view(vid_insync, backups);
+    backups = cfg->get_view(vid_insync);
     backups.erase(std::find(backups.begin(), backups.end(), cfg->myaddr()));
     LOG << "backups " << backups;
     sync_cond.wait(rsm_mutex_lock);
@@ -208,7 +209,7 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
         rsm_mutex_lock.unlock();
         cl = rpcc::bind_cached(m);
         if (cl) {
-            ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100),
+            ret = cl->call_timeout(rsm_protocol::transferreq, 100ms,
                     r, cfg->myaddr(), last_myvs, vid_insync);
         }
         rsm_mutex_lock.lock();
@@ -218,7 +219,7 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
         return false;
     }
     if (stf && last_myvs != r.last) {
-        stf->unmarshal_state(r.state);
+        stf->unmarshall_state(r.state);
     }
     last_myvs = r.last;
     LOG << "transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
@@ -247,7 +248,7 @@ bool rsm::join(const string & m, lock & rsm_mutex_lock) {
     rsm_mutex_lock.unlock();
     auto cl = rpcc::bind_cached(m);
     if (cl)
-        ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log, cfg->myaddr(), last_myvs);
+        ret = cl->call_timeout(rsm_protocol::joinreq, 12000ms, log, cfg->myaddr(), last_myvs);
     rsm_mutex_lock.lock();
 
     if (cl == 0 || ret != rsm_protocol::OK) {
@@ -290,8 +291,8 @@ void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r
     handler *h = procs[procno];
     VERIFY(h);
     marshall rep;
-    auto ret = (rsm_protocol::status)(*h)(unmarshall(req, false), rep);
-    r = marshall(ret, rep.content()).content();
+    auto ret = (rsm_protocol::status)(*h)(unmarshall(req), rep);
+    r = marshall(ret, rep);
 }
 
 static void logHexString(locked_ostream && log, const string & s) {
@@ -322,23 +323,23 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id
         if (primary != myaddr)
             return rsm_client_protocol::NOTPRIMARY;
         LOG << "Assigning a viewstamp";
-        cfg->get_view(vid_commit, m);
+        m = cfg->get_view(vid_commit);
         // assign the RPC the next viewstamp number
         vs = myvs;
         myvs++;
     }
 
-    // send an invoke RPC to all slaves in the current view with a timeout of 1 second
+    // send an invoke RPC to all slaves in the current view with a timeout
     LOG << "Invoking slaves";
-    for (unsigned i  = 0; i < m.size(); i++) {
-        if (m[i] != myaddr) {
+    for (auto & mm : m) {
+        if (mm != myaddr) {
             // if invoke on slave fails, return rsm_client_protocol::BUSY
-            LOG << "Sending invoke to " << m[i];
-            auto cl = rpcc::bind_cached(m[i]);
+            LOG << "Sending invoke to " << mm;
+            auto cl = rpcc::bind_cached(mm);
             if (!cl)
                 return rsm_client_protocol::BUSY;
             int ignored_rval;
-            auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req);
+            auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, 100ms, ignored_rval, procno, vs, req);
             LOG << "Invoke returned " << ret;
             if (ret != rsm_protocol::OK)
                 return rsm_client_protocol::BUSY;
@@ -364,7 +365,6 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id
 rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
     LOG << "invoke procno 0x" << std::hex << proc;
     lock ml(invoke_mutex);
-    vector<string> m;
     string myaddr;
     {
         lock ml2(rsm_mutex);
@@ -377,7 +377,7 @@ rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp
         myaddr = cfg->myaddr();
         if (primary == myaddr)
             return rsm_protocol::ERR;
-        cfg->get_view(vid_commit, m);
+        vector<string> m = cfg->get_view(vid_commit);
         if (std::find(m.begin(), m.end(), myaddr) == m.end())
             return rsm_protocol::ERR;
         // check sequence number
@@ -404,7 +404,7 @@ rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const strin
     if (!insync || vid != vid_insync)
         return rsm_protocol::BUSY;
     if (stf && last != last_myvs)
-        r.state = stf->marshal_state();
+        r.state = stf->marshall_state();
     r.last = last_myvs;
     return rsm_protocol::OK;
 }
@@ -465,9 +465,8 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last
 // primary failure
 //
 rsm_client_protocol::status rsm::client_members(vector<string> & r, int) {
-    vector<string> m;
     lock ml(rsm_mutex);
-    cfg->get_view(vid_commit, m);
+    vector<string> m = cfg->get_view(vid_commit);
     m.push_back(primary);
     r = m;
     LOG << "return " << m << " m " << primary;
@@ -478,9 +477,7 @@ rsm_client_protocol::status rsm::client_members(vector<string> & r, int) {
 // otherwise, the lowest number node of the previous view.
 // caller should hold rsm_mutex
 void rsm::set_primary(unsigned vid) {
-    vector<string> c, p;
-    cfg->get_view(vid, c);
-    cfg->get_view(vid - 1, p);
+    vector<string> c = cfg->get_view(vid), p = cfg->get_view(vid - 1);
     VERIFY (c.size() > 0);
 
     if (isamember(primary,c)) {
@@ -489,9 +486,9 @@ void rsm::set_primary(unsigned vid) {
     }
 
     VERIFY(p.size() > 0);
-    for (unsigned i = 0; i < p.size(); i++) {
-        if (isamember(p[i], c)) {
-            primary = p[i];
+    for (auto & pp : p) {
+        if (isamember(pp, c)) {
+            primary = pp;
             LOG << "primary is " << primary;
             return;
         }
@@ -509,12 +506,10 @@ bool rsm::amiprimary() {
 
 void rsm::net_repair(bool heal, lock & rsm_mutex_lock) {
     VERIFY(rsm_mutex_lock);
-    vector<string> m;
-    cfg->get_view(vid_commit, m);
-    for (unsigned i  = 0; i < m.size(); i++) {
-        if (m[i] != cfg->myaddr()) {
-            LOG << "member " << m[i] << " " << heal;
-            if (auto cl = rpcc::bind_cached(m[i]))
+    for (auto & mm : cfg->get_view(vid_commit)) {
+        if (mm != cfg->myaddr()) {
+            LOG << "member " << mm << " " << heal;
+            if (auto cl = rpcc::bind_cached(mm))
                 cl->set_reachable(heal);
         }
     }