Cosmetic improvements.
[invirt/third/libt4.git] / rsm.cc
diff --git a/rsm.cc b/rsm.cc
index 956f45d..c766145 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -83,9 +83,7 @@
 #include "rsm_client.h"
 #include <unistd.h>
 
 #include "rsm_client.h"
 #include <unistd.h>
 
-rsm::rsm(const string & _first, const string & _me) :
-    stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
-    partitioned (false), dopartition(false), break1(false), break2(false)
+rsm::rsm(const string & _first, const string & _me) : primary(_first)
 {
     cfg = unique_ptr<config>(new config(_first, _me, this));
 
 {
     cfg = unique_ptr<config>(new config(_first, _me, this));
 
@@ -103,7 +101,7 @@ rsm::rsm(const string & _first, const string & _me) :
     rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
 
     // tester must be on different port, otherwise it may partition itself
     rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
 
     // tester must be on different port, otherwise it may partition itself
-    testsvr = unique_ptr<rpcs>(new rpcs((in_port_t)stoi(_me) + 1));
+    testsvr.reset(new rpcs((in_port_t)stoi(_me) + 1));
     testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
     testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
 }
     testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
     testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
 }
@@ -115,11 +113,6 @@ void rsm::start() {
     thread(&rsm::recovery, this).detach();
 }
 
     thread(&rsm::recovery, this).detach();
 }
 
-void rsm::reg1(rpc_protocol::proc_id_t proc, handler *h) {
-    lock ml(rsm_mutex);
-    procs[proc] = h;
-}
-
 // The recovery thread runs this function
 void rsm::recovery() {
     bool r = true;
 // The recovery thread runs this function
 void rsm::recovery() {
     bool r = true;
@@ -279,7 +272,7 @@ void rsm::commit_change(unsigned vid) {
     lock ml(rsm_mutex);
     commit_change(vid, ml);
     if (cfg->ismember(cfg->myaddr(), vid_commit))
     lock ml(rsm_mutex);
     commit_change(vid, ml);
     if (cfg->ismember(cfg->myaddr(), vid_commit))
-        breakpoint2();
+        breakpoint(2);
 }
 
 void rsm::commit_change(unsigned vid, lock &) {
 }
 
 void rsm::commit_change(unsigned vid, lock &) {
@@ -293,7 +286,7 @@ void rsm::commit_change(unsigned vid, lock &) {
     recovery_cond.notify_one();
     sync_cond.notify_one();
     if (cfg->ismember(cfg->myaddr(), vid_commit))
     recovery_cond.notify_one();
     sync_cond.notify_one();
     if (cfg->ismember(cfg->myaddr(), vid_commit))
-        breakpoint2();
+        breakpoint(2);
 }
 
 
 }
 
 
@@ -301,10 +294,9 @@ void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r
     LOG("execute");
     handler *h = procs[procno];
     VERIFY(h);
     LOG("execute");
     handler *h = procs[procno];
     VERIFY(h);
-    unmarshall args(req, false);
     marshall rep;
     marshall rep;
-    auto ret = (rsm_protocol::status)(*h)(args, rep);
-    r = marshall{ret, rep.content()}.content();
+    auto ret = (rsm_protocol::status)(*h)(unmarshall(req, false), rep);
+    r = marshall(ret, rep.content()).content();
 }
 
 //
 }
 
 //
@@ -350,7 +342,7 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id
             LOG("Invoke returned " << ret);
             if (ret != rsm_protocol::OK)
                 return rsm_client_protocol::BUSY;
             LOG("Invoke returned " << ret);
             if (ret != rsm_protocol::OK)
                 return rsm_client_protocol::BUSY;
-            breakpoint1();
+            breakpoint(1);
             lock rsm_mutex_lock(rsm_mutex);
             partition1(rsm_mutex_lock);
         }
             lock rsm_mutex_lock(rsm_mutex);
             partition1(rsm_mutex_lock);
         }
@@ -398,14 +390,14 @@ rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp
     string r;
     execute(proc, req, r);
     last_myvs = vs;
     string r;
     execute(proc, req, r);
     last_myvs = vs;
-    breakpoint1();
+    breakpoint(1);
     return rsm_protocol::OK;
 }
 
 //
 // RPC handler: Send back the local node's state to the caller
 //
     return rsm_protocol::OK;
 }
 
 //
 // RPC handler: Send back the local node's state to the caller
 //
-rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, const string & src,
+rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const string & src,
         viewstamp last, unsigned vid) {
     lock ml(rsm_mutex);
     LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
         viewstamp last, unsigned vid) {
     lock ml(rsm_mutex);
     LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
@@ -473,7 +465,7 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last
 // RPC handler: Responds with the list of known nodes for fall-back on a
 // primary failure
 //
 // RPC handler: Responds with the list of known nodes for fall-back on a
 // primary failure
 //
-rsm_client_protocol::status rsm::client_members(vector<string> &r, int) {
+rsm_client_protocol::status rsm::client_members(vector<string> & r, int) {
     vector<string> m;
     lock ml(rsm_mutex);
     cfg->get_view(vid_commit, m);
     vector<string> m;
     lock ml(rsm_mutex);
     cfg->get_view(vid_commit, m);
@@ -516,7 +508,8 @@ bool rsm::amiprimary() {
 
 // Test RPCs -- simulate partitions and failures
 
 
 // Test RPCs -- simulate partitions and failures
 
-void rsm::net_repair(bool heal, lock &/*rsm_mutex_lock*/) {
+void rsm::net_repair(bool heal, lock & rsm_mutex_lock) {
+    VERIFY(rsm_mutex_lock);
     vector<string> m;
     cfg->get_view(vid_commit, m);
     for (unsigned i  = 0; i < m.size(); i++) {
     vector<string> m;
     cfg->get_view(vid_commit, m);
     for (unsigned i  = 0; i < m.size(); i++) {
@@ -529,7 +522,7 @@ void rsm::net_repair(bool heal, lock &/*rsm_mutex_lock*/) {
     rsmrpc->set_reachable(heal);
 }
 
     rsmrpc->set_reachable(heal);
 }
 
-rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r, int heal) {
+rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status & r, int heal) {
     lock ml(rsm_mutex);
     LOG("heal " << heal << " (dopartition " <<
             dopartition << ", partitioned " << partitioned << ")");
     lock ml(rsm_mutex);
     LOG("heal " << heal << " (dopartition " <<
             dopartition << ", partitioned " << partitioned << ")");
@@ -543,16 +536,9 @@ rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r,
 
 // simulate failure at breakpoint 1 and 2
 
 
 // simulate failure at breakpoint 1 and 2
 
-void rsm::breakpoint1() {
-    if (break1) {
-        LOG("Dying at breakpoint 1 in rsm!");
-        exit(1);
-    }
-}
-
-void rsm::breakpoint2() {
-    if (break2) {
-        LOG("Dying at breakpoint 2 in rsm!");
+void rsm::breakpoint(int b) {
+    if (breakpoints[b-1]) {
+        LOG("Dying at breakpoint " << b << " in rsm!");
         exit(1);
     }
 }
         exit(1);
     }
 }
@@ -565,12 +551,12 @@ void rsm::partition1(lock & rsm_mutex_lock) {
     }
 }
 
     }
 }
 
-rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status &r, int b) {
+rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status & r, int b) {
     r = rsm_test_protocol::OK;
     lock ml(rsm_mutex);
     LOG("breakpoint " << b);
     r = rsm_test_protocol::OK;
     lock ml(rsm_mutex);
     LOG("breakpoint " << b);
-    if (b == 1) break1 = true;
-    else if (b == 2) break2 = true;
+    if (b == 1) breakpoints[1-1] = true;
+    else if (b == 2) breakpoints[2-1] = true;
     else if (b == 3 || b == 4) cfg->breakpoint(b);
     else r = rsm_test_protocol::ERR;
     return r;
     else if (b == 3 || b == 4) cfg->breakpoint(b);
     else r = rsm_test_protocol::ERR;
     return r;