Global destructor clean-ups and python test fixes
[invirt/third/libt4.git] / rsm.cc
diff --git a/rsm.cc b/rsm.cc
index 255d281..cb986fe 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
 // The rule is that a module releases its internal locks before it
 // upcalls, but can keep its locks when calling down.
 
 // The rule is that a module releases its internal locks before it
 // upcalls, but can keep its locks when calling down.
 
-#include <sys/types.h>
-#include <unistd.h>
-
-#include "types.h"
-#include "handle.h"
 #include "rsm.h"
 #include "rsm.h"
+#include "handle.h"
 #include "rsm_client.h"
 #include "rsm_client.h"
+#include <unistd.h>
+
+using std::vector;
 
 
-rsm::rsm(const string & _first, const string & _me) :
-    stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
-    partitioned (false), dopartition(false), break1(false), break2(false)
+rsm_state_transfer::~rsm_state_transfer() {}
+
+rsm::rsm(const string & _first, const string & _me) : primary(_first)
 {
 {
-    cfg = new config(_first, _me, this);
+    cfg = unique_ptr<config>(new config(_first, _me, this));
 
     if (_first == _me) {
         // Commit the first view here. We can not have acceptor::acceptor
 
     if (_first == _me) {
         // Commit the first view here. We can not have acceptor::acceptor
@@ -106,23 +105,20 @@ rsm::rsm(const string & _first, const string & _me) :
     rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
 
     // tester must be on different port, otherwise it may partition itself
     rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
 
     // tester must be on different port, otherwise it may partition itself
-    testsvr = new rpcs((in_port_t)stoi(_me) + 1);
+    testsvr.reset(new rpcs((in_port_t)std::stoi(_me) + 1));
     testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
     testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
     testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
     testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
-
-    {
-        lock ml(rsm_mutex);
-        thread(&rsm::recovery, this).detach();
-    }
 }
 
 }
 
-void rsm::reg1(int proc, handler *h) {
+void rsm::start() {
     lock ml(rsm_mutex);
     lock ml(rsm_mutex);
-    procs[proc] = h;
+    rsmrpc->start();
+    testsvr->start();
+    thread(&rsm::recovery, this).detach();
 }
 
 // The recovery thread runs this function
 }
 
 // The recovery thread runs this function
-void rsm::recovery() [[noreturn]] {
+void rsm::recovery() {
     bool r = true;
     lock ml(rsm_mutex);
 
     bool r = true;
     lock ml(rsm_mutex);
 
@@ -131,22 +127,22 @@ void rsm::recovery() [[noreturn]] {
             // XXX iannucci 2013/09/15 -- I don't understand whether accessing
             // cfg->view_id in this manner involves a race.  I suspect not.
             if (join(primary, ml)) {
             // XXX iannucci 2013/09/15 -- I don't understand whether accessing
             // cfg->view_id in this manner involves a race.  I suspect not.
             if (join(primary, ml)) {
-                LOG("recovery: joined");
+                LOG << "joined";
                 commit_change(cfg->view_id(), ml);
             } else {
                 ml.unlock();
                 commit_change(cfg->view_id(), ml);
             } else {
                 ml.unlock();
-                this_thread::sleep_for(seconds(30)); // XXX make another node in cfg primary?
+                std::this_thread::sleep_for(milliseconds(3000)); // XXX make another node in cfg primary?
                 ml.lock();
             }
         }
         vid_insync = vid_commit;
                 ml.lock();
             }
         }
         vid_insync = vid_commit;
-        LOG("recovery: sync vid_insync " << vid_insync);
+        LOG << "sync vid_insync " << vid_insync;
         if (primary == cfg->myaddr()) {
             r = sync_with_backups(ml);
         } else {
             r = sync_with_primary(ml);
         }
         if (primary == cfg->myaddr()) {
             r = sync_with_backups(ml);
         } else {
             r = sync_with_primary(ml);
         }
-        LOG("recovery: sync done");
+        LOG << "sync done";
 
         // If there was a commited viewchange during the synchronization, restart
         // the recovery
 
         // If there was a commited viewchange during the synchronization, restart
         // the recovery
@@ -158,7 +154,7 @@ void rsm::recovery() [[noreturn]] {
             myvs.seqno = 1;
             inviewchange = false;
         }
             myvs.seqno = 1;
             inviewchange = false;
         }
-        LOG("recovery: go to sleep " << insync << " " << inviewchange);
+        LOG << "go to sleep " << insync << " " << inviewchange;
         recovery_cond.wait(ml);
     }
 }
         recovery_cond.wait(ml);
     }
 }
@@ -180,8 +176,8 @@ bool rsm::sync_with_backups(lock & rsm_mutex_lock) {
     // Start accepting synchronization request (statetransferreq) now!
     insync = true;
     cfg->get_view(vid_insync, backups);
     // Start accepting synchronization request (statetransferreq) now!
     insync = true;
     cfg->get_view(vid_insync, backups);
-    backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
-    LOG("backups " << backups);
+    backups.erase(std::find(backups.begin(), backups.end(), cfg->myaddr()));
+    LOG << "backups " << backups;
     sync_cond.wait(rsm_mutex_lock);
     insync = false;
     return true;
     sync_cond.wait(rsm_mutex_lock);
     insync = false;
     return true;
@@ -208,26 +204,26 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
     rsm_protocol::transferres r;
     handle h(m);
     int ret = 0;
     rsm_protocol::transferres r;
     handle h(m);
     int ret = 0;
-    LOG("contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
+    LOG << "contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
     rpcc *cl;
     {
         rsm_mutex_lock.unlock();
         cl = h.safebind();
         if (cl) {
     rpcc *cl;
     {
         rsm_mutex_lock.unlock();
         cl = h.safebind();
         if (cl) {
-            ret = cl->call_timeout(rsm_protocol::transferreq, rpcc::to(1000),
+            ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100),
                     r, cfg->myaddr(), last_myvs, vid_insync);
         }
         rsm_mutex_lock.lock();
     }
     if (cl == 0 || ret != rsm_protocol::OK) {
                     r, cfg->myaddr(), last_myvs, vid_insync);
         }
         rsm_mutex_lock.lock();
     }
     if (cl == 0 || ret != rsm_protocol::OK) {
-        LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret);
+        LOG << "couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret;
         return false;
     }
     if (stf && last_myvs != r.last) {
         stf->unmarshal_state(r.state);
     }
     last_myvs = r.last;
         return false;
     }
     if (stf && last_myvs != r.last) {
         stf->unmarshal_state(r.state);
     }
     last_myvs = r.last;
-    LOG("transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")");
+    LOG << "transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
     return true;
 }
 
     return true;
 }
 
@@ -251,23 +247,23 @@ bool rsm::join(const string & m, lock & rsm_mutex_lock) {
     int ret = 0;
     string log;
 
     int ret = 0;
     string log;
 
-    LOG("contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")");
+    LOG << "contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")";
     rpcc *cl;
     {
         rsm_mutex_lock.unlock();
         cl = h.safebind();
         if (cl != 0) {
     rpcc *cl;
     {
         rsm_mutex_lock.unlock();
         cl = h.safebind();
         if (cl != 0) {
-            ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(120000), log,
+            ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log,
                     cfg->myaddr(), last_myvs);
         }
         rsm_mutex_lock.lock();
     }
 
     if (cl == 0 || ret != rsm_protocol::OK) {
                     cfg->myaddr(), last_myvs);
         }
         rsm_mutex_lock.lock();
     }
 
     if (cl == 0 || ret != rsm_protocol::OK) {
-        LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret);
+        LOG << "couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret;
         return false;
     }
         return false;
     }
-    LOG("succeeded " << log);
+    LOG << "succeeded " << log;
     cfg->restore(log);
     return true;
 }
     cfg->restore(log);
     return true;
 }
@@ -280,32 +276,37 @@ void rsm::commit_change(unsigned vid) {
     lock ml(rsm_mutex);
     commit_change(vid, ml);
     if (cfg->ismember(cfg->myaddr(), vid_commit))
     lock ml(rsm_mutex);
     commit_change(vid, ml);
     if (cfg->ismember(cfg->myaddr(), vid_commit))
-        breakpoint2();
+        breakpoint(2);
 }
 
 void rsm::commit_change(unsigned vid, lock &) {
     if (vid <= vid_commit)
         return;
 }
 
 void rsm::commit_change(unsigned vid, lock &) {
     if (vid <= vid_commit)
         return;
-    LOG("commit_change: new view (" << vid << ") last vs (" << last_myvs.vid << "," <<
-            last_myvs.seqno << ") " << primary << " insync " << insync);
+    LOG << "new view (" << vid << ") last vs (" << last_myvs.vid << ","
+        << last_myvs.seqno << ") " << primary << " insync " << insync;
     vid_commit = vid;
     inviewchange = true;
     set_primary(vid);
     recovery_cond.notify_one();
     sync_cond.notify_one();
     if (cfg->ismember(cfg->myaddr(), vid_commit))
     vid_commit = vid;
     inviewchange = true;
     set_primary(vid);
     recovery_cond.notify_one();
     sync_cond.notify_one();
     if (cfg->ismember(cfg->myaddr(), vid_commit))
-        breakpoint2();
+        breakpoint(2);
 }
 
 
 }
 
 
-void rsm::execute(int procno, const string & req, string & r) {
-    LOG("execute");
+void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r) {
+    LOG << "execute";
     handler *h = procs[procno];
     VERIFY(h);
     handler *h = procs[procno];
     VERIFY(h);
-    unmarshall args(req, false);
     marshall rep;
     marshall rep;
-    auto ret = (rsm_protocol::status)(*h)(args, rep);
-    r = marshall{ret, rep.content()}.content();
+    auto ret = (rsm_protocol::status)(*h)(unmarshall(req, false), rep);
+    r = marshall(ret, rep.content()).content();
+}
+
+static void logHexString(locked_ostream && log, const string & s) {
+    log << std::setfill('0') << std::setw(2) << std::hex;
+    for (size_t i=0; i<s.size(); i++)
+        log << (unsigned int)(unsigned char)s[i];
 }
 
 //
 }
 
 //
@@ -314,22 +315,22 @@ void rsm::execute(int procno, const string & req, string & r) {
 // number, and invokes it on all members of the replicated state
 // machine.
 //
 // number, and invokes it on all members of the replicated state
 // machine.
 //
-rsm_client_protocol::status rsm::client_invoke(string & r, int procno, const string & req) {
-    LOG("invoke procno 0x" << hex << procno);
+rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req) {
+    LOG << "invoke procno 0x" << std::hex << procno;
     lock ml(invoke_mutex);
     vector<string> m;
     string myaddr;
     viewstamp vs;
     {
         lock ml2(rsm_mutex);
     lock ml(invoke_mutex);
     vector<string> m;
     string myaddr;
     viewstamp vs;
     {
         lock ml2(rsm_mutex);
-        LOG("Checking for inviewchange");
+        LOG << "Checking for inviewchange";
         if (inviewchange)
             return rsm_client_protocol::BUSY;
         if (inviewchange)
             return rsm_client_protocol::BUSY;
-        LOG("Checking for primacy");
+        LOG << "Checking for primacy";
         myaddr = cfg->myaddr();
         if (primary != myaddr)
             return rsm_client_protocol::NOTPRIMARY;
         myaddr = cfg->myaddr();
         if (primary != myaddr)
             return rsm_client_protocol::NOTPRIMARY;
-        LOG("Assigning a viewstamp");
+        LOG << "Assigning a viewstamp";
         cfg->get_view(vid_commit, m);
         // assign the RPC the next viewstamp number
         vs = myvs;
         cfg->get_view(vid_commit, m);
         // assign the RPC the next viewstamp number
         vs = myvs;
@@ -337,26 +338,28 @@ rsm_client_protocol::status rsm::client_invoke(string & r, int procno, const str
     }
 
     // send an invoke RPC to all slaves in the current view with a timeout of 1 second
     }
 
     // send an invoke RPC to all slaves in the current view with a timeout of 1 second
-    LOG("Invoking slaves");
+    LOG << "Invoking slaves";
     for (unsigned i  = 0; i < m.size(); i++) {
         if (m[i] != myaddr) {
             // if invoke on slave fails, return rsm_client_protocol::BUSY
             handle h(m[i]);
     for (unsigned i  = 0; i < m.size(); i++) {
         if (m[i] != myaddr) {
             // if invoke on slave fails, return rsm_client_protocol::BUSY
             handle h(m[i]);
-            LOG("Sending invoke to " << m[i]);
+            LOG << "Sending invoke to " << m[i];
             rpcc *cl = h.safebind();
             if (!cl)
                 return rsm_client_protocol::BUSY;
             int ignored_rval;
             rpcc *cl = h.safebind();
             if (!cl)
                 return rsm_client_protocol::BUSY;
             int ignored_rval;
-            auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, rpcc::to(1000), ignored_rval, procno, vs, req);
-            LOG("Invoke returned " << ret);
+            auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req);
+            LOG << "Invoke returned " << ret;
             if (ret != rsm_protocol::OK)
                 return rsm_client_protocol::BUSY;
             if (ret != rsm_protocol::OK)
                 return rsm_client_protocol::BUSY;
-            breakpoint1();
+            breakpoint(1);
             lock rsm_mutex_lock(rsm_mutex);
             partition1(rsm_mutex_lock);
         }
     }
             lock rsm_mutex_lock(rsm_mutex);
             partition1(rsm_mutex_lock);
         }
     }
+    logHexString(LOG, req);
     execute(procno, req, r);
     execute(procno, req, r);
+    logHexString(LOG, r);
     last_myvs = vs;
     return rsm_client_protocol::OK;
 }
     last_myvs = vs;
     return rsm_client_protocol::OK;
 }
@@ -368,27 +371,27 @@ rsm_client_protocol::status rsm::client_invoke(string & r, int procno, const str
 // the replica must execute requests in order (with no gaps)
 // according to requests' seqno
 
 // the replica must execute requests in order (with no gaps)
 // according to requests' seqno
 
-rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, const string & req) {
-    LOG("invoke procno 0x" << hex << proc);
+rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
+    LOG << "invoke procno 0x" << std::hex << proc;
     lock ml(invoke_mutex);
     vector<string> m;
     string myaddr;
     {
         lock ml2(rsm_mutex);
         // check if !inviewchange
     lock ml(invoke_mutex);
     vector<string> m;
     string myaddr;
     {
         lock ml2(rsm_mutex);
         // check if !inviewchange
-        LOG("Checking for view change");
+        LOG << "Checking for view change";
         if (inviewchange)
             return rsm_protocol::ERR;
         // check if slave
         if (inviewchange)
             return rsm_protocol::ERR;
         // check if slave
-        LOG("Checking for slave status");
+        LOG << "Checking for slave status";
         myaddr = cfg->myaddr();
         if (primary == myaddr)
             return rsm_protocol::ERR;
         cfg->get_view(vid_commit, m);
         myaddr = cfg->myaddr();
         if (primary == myaddr)
             return rsm_protocol::ERR;
         cfg->get_view(vid_commit, m);
-        if (find(m.begin(), m.end(), myaddr) == m.end())
+        if (std::find(m.begin(), m.end(), myaddr) == m.end())
             return rsm_protocol::ERR;
         // check sequence number
             return rsm_protocol::ERR;
         // check sequence number
-        LOG("Checking sequence number");
+        LOG << "Checking sequence number";
         if (vs != myvs)
             return rsm_protocol::ERR;
         myvs++;
         if (vs != myvs)
             return rsm_protocol::ERR;
         myvs++;
@@ -396,18 +399,18 @@ rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, const string & r
     string r;
     execute(proc, req, r);
     last_myvs = vs;
     string r;
     execute(proc, req, r);
     last_myvs = vs;
-    breakpoint1();
+    breakpoint(1);
     return rsm_protocol::OK;
 }
 
 //
 // RPC handler: Send back the local node's state to the caller
 //
     return rsm_protocol::OK;
 }
 
 //
 // RPC handler: Send back the local node's state to the caller
 //
-rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, const string & src,
+rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const string & src,
         viewstamp last, unsigned vid) {
     lock ml(rsm_mutex);
         viewstamp last, unsigned vid) {
     lock ml(rsm_mutex);
-    LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
-            last_myvs.vid << "," << last_myvs.seqno << ")");
+    LOG << "transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs ("
+        << last_myvs.vid << "," << last_myvs.seqno << ")";
     if (!insync || vid != vid_insync)
         return rsm_protocol::BUSY;
     if (stf && last != last_myvs)
     if (!insync || vid != vid_insync)
         return rsm_protocol::BUSY;
     if (stf && last != last_myvs)
@@ -424,7 +427,7 @@ rsm_protocol::status rsm::transferdonereq(int &, const string & m, unsigned vid)
     lock ml(rsm_mutex);
     if (!insync || vid != vid_insync)
         return rsm_protocol::BUSY;
     lock ml(rsm_mutex);
     if (!insync || vid != vid_insync)
         return rsm_protocol::BUSY;
-    backups.erase(find(backups.begin(), backups.end(), m));
+    backups.erase(std::find(backups.begin(), backups.end(), m));
     if (backups.empty())
         sync_cond.notify_one();
     return rsm_protocol::OK;
     if (backups.empty())
         sync_cond.notify_one();
     return rsm_protocol::OK;
@@ -437,18 +440,18 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last
     auto ret = rsm_protocol::OK;
 
     lock ml(rsm_mutex);
     auto ret = rsm_protocol::OK;
 
     lock ml(rsm_mutex);
-    LOG("join request from " << m << "; last=(" << last.vid << "," << last.seqno << "), mylast=(" <<
-            last_myvs.vid << "," << last_myvs.seqno << ")");
+    LOG << "join request from " << m << "; last=(" << last.vid << "," << last.seqno << "), mylast=("
+        << last_myvs.vid << "," << last_myvs.seqno << ")";
     if (cfg->ismember(m, vid_commit)) {
     if (cfg->ismember(m, vid_commit)) {
-        LOG(m << " is still a member -- nothing to do");
+        LOG << m << " is still a member -- nothing to do";
         log = cfg->dump();
     } else if (cfg->myaddr() != primary) {
         log = cfg->dump();
     } else if (cfg->myaddr() != primary) {
-        LOG("but I, " << cfg->myaddr() << ", am not the primary, " << primary << "!");
+        LOG << "but I, " << cfg->myaddr() << ", am not the primary, " << primary << "!";
         ret = rsm_protocol::BUSY;
     } else {
         // We cache vid_commit to avoid adding m to a view which already contains
         // m due to race condition
         ret = rsm_protocol::BUSY;
     } else {
         // We cache vid_commit to avoid adding m to a view which already contains
         // m due to race condition
-        LOG("calling down to config layer");
+        LOG << "calling down to config layer";
         unsigned vid_cache = vid_commit;
         bool succ;
         {
         unsigned vid_cache = vid_commit;
         bool succ;
         {
@@ -458,9 +461,9 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last
         }
         if (cfg->ismember(m, cfg->view_id())) {
             log = cfg->dump();
         }
         if (cfg->ismember(m, cfg->view_id())) {
             log = cfg->dump();
-            LOG("ret " << ret << " log " << log);
+            LOG << "ret " << ret << " log " << log;
         } else {
         } else {
-            LOG("failed; proposer couldn't add " << succ);
+            LOG << "failed; proposer couldn't add " << succ;
             ret = rsm_protocol::BUSY;
         }
     }
             ret = rsm_protocol::BUSY;
         }
     }
@@ -468,17 +471,16 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last
 }
 
 //
 }
 
 //
-// RPC handler: Send back all the nodes this local knows about to client
-// so the client can switch to a different primary
-// when it existing primary fails
+// RPC handler: Responds with the list of known nodes for fall-back on a
+// primary failure
 //
 //
-rsm_client_protocol::status rsm::client_members(vector<string> &r, int) {
+rsm_client_protocol::status rsm::client_members(vector<string> & r, int) {
     vector<string> m;
     lock ml(rsm_mutex);
     cfg->get_view(vid_commit, m);
     m.push_back(primary);
     r = m;
     vector<string> m;
     lock ml(rsm_mutex);
     cfg->get_view(vid_commit, m);
     m.push_back(primary);
     r = m;
-    LOG("return " << m << " m " << primary);
+    LOG << "return " << m << " m " << primary;
     return rsm_client_protocol::OK;
 }
 
     return rsm_client_protocol::OK;
 }
 
@@ -492,7 +494,7 @@ void rsm::set_primary(unsigned vid) {
     VERIFY (c.size() > 0);
 
     if (isamember(primary,c)) {
     VERIFY (c.size() > 0);
 
     if (isamember(primary,c)) {
-        LOG("set_primary: primary stays " << primary);
+        LOG << "primary stays " << primary;
         return;
     }
 
         return;
     }
 
@@ -500,7 +502,7 @@ void rsm::set_primary(unsigned vid) {
     for (unsigned i = 0; i < p.size(); i++) {
         if (isamember(p[i], c)) {
             primary = p[i];
     for (unsigned i = 0; i < p.size(); i++) {
         if (isamember(p[i], c)) {
             primary = p[i];
-            LOG("set_primary: primary is " << primary);
+            LOG << "primary is " << primary;
             return;
         }
     }
             return;
         }
     }
@@ -513,51 +515,39 @@ bool rsm::amiprimary() {
 }
 
 
 }
 
 
-// Testing server
-
-// Simulate partitions
+// Test RPCs -- simulate partitions and failures
 
 
-// assumes caller holds rsm_mutex
-void rsm::net_repair(bool heal, lock &) {
+void rsm::net_repair(bool heal, lock & rsm_mutex_lock) {
+    VERIFY(rsm_mutex_lock);
     vector<string> m;
     cfg->get_view(vid_commit, m);
     for (unsigned i  = 0; i < m.size(); i++) {
         if (m[i] != cfg->myaddr()) {
             handle h(m[i]);
     vector<string> m;
     cfg->get_view(vid_commit, m);
     for (unsigned i  = 0; i < m.size(); i++) {
         if (m[i] != cfg->myaddr()) {
             handle h(m[i]);
-            LOG("member " << m[i] << " " << heal);
+            LOG << "member " << m[i] << " " << heal;
             if (h.safebind()) h.safebind()->set_reachable(heal);
         }
     }
     rsmrpc->set_reachable(heal);
 }
 
             if (h.safebind()) h.safebind()->set_reachable(heal);
         }
     }
     rsmrpc->set_reachable(heal);
 }
 
-rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r, int heal) {
+rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status & r, int heal) {
     lock ml(rsm_mutex);
     lock ml(rsm_mutex);
-    LOG("heal " << heal << " (dopartition " <<
-            dopartition << ", partitioned " << partitioned << ")");
-    if (heal) {
+    LOG << "heal " << heal << " (dopartition "
+        << dopartition << ", partitioned " << partitioned << ")";
+    if (heal)
         net_repair(heal, ml);
         net_repair(heal, ml);
-        partitioned = false;
-    } else {
+    else
         dopartition = true;
         dopartition = true;
-        partitioned = false;
-    }
-    r = rsm_test_protocol::OK;
-    return r;
+    partitioned = false;
+    return r = rsm_test_protocol::OK;
 }
 
 // simulate failure at breakpoint 1 and 2
 
 }
 
 // simulate failure at breakpoint 1 and 2
 
-void rsm::breakpoint1() {
-    if (break1) {
-        LOG("Dying at breakpoint 1 in rsm!");
-        exit(1);
-    }
-}
-
-void rsm::breakpoint2() {
-    if (break2) {
-        LOG("Dying at breakpoint 2 in rsm!");
+void rsm::breakpoint(int b) {
+    if (breakpoints[b-1]) {
+        LOG << "Dying at breakpoint " << b << " in rsm!";
         exit(1);
     }
 }
         exit(1);
     }
 }
@@ -570,12 +560,12 @@ void rsm::partition1(lock & rsm_mutex_lock) {
     }
 }
 
     }
 }
 
-rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status &r, int b) {
+rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status & r, int b) {
     r = rsm_test_protocol::OK;
     lock ml(rsm_mutex);
     r = rsm_test_protocol::OK;
     lock ml(rsm_mutex);
-    LOG("breakpoint " << b);
-    if (b == 1) break1 = true;
-    else if (b == 2) break2 = true;
+    LOG << "breakpoint " << b;
+    if (b == 1) breakpoints[1-1] = true;
+    else if (b == 2) breakpoints[2-1] = true;
     else if (b == 3 || b == 4) cfg->breakpoint(b);
     else r = rsm_test_protocol::ERR;
     return r;
     else if (b == 3 || b == 4) cfg->breakpoint(b);
     else r = rsm_test_protocol::ERR;
     return r;