X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/3abd3952c1f4441f0dd6eae9883b2d01ed9cd56b..4e881433f37417ccbda89c09ffdf936855d462d4:/rsm.cc diff --git a/rsm.cc b/rsm.cc index 54713cb..956f45d 100644 --- a/rsm.cc +++ b/rsm.cc @@ -78,19 +78,16 @@ // The rule is that a module releases its internal locks before it // upcalls, but can keep its locks when calling down. -#include -#include - -#include "types.h" -#include "handle.h" #include "rsm.h" +#include "handle.h" #include "rsm_client.h" +#include rsm::rsm(const string & _first, const string & _me) : stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0), partitioned (false), dopartition(false), break1(false), break2(false) { - cfg = new config(_first, _me, this); + cfg = unique_ptr(new config(_first, _me, this)); if (_first == _me) { // Commit the first view here. We can not have acceptor::acceptor @@ -106,23 +103,25 @@ rsm::rsm(const string & _first, const string & _me) : rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this); // tester must be on different port, otherwise it may partition itself - testsvr = new rpcs((in_port_t)stoi(_me) + 1); + testsvr = unique_ptr(new rpcs((in_port_t)stoi(_me) + 1)); testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this); testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this); +} - { - lock ml(rsm_mutex); - thread(&rsm::recovery, this).detach(); - } +void rsm::start() { + lock ml(rsm_mutex); + rsmrpc->start(); + testsvr->start(); + thread(&rsm::recovery, this).detach(); } -void rsm::reg1(int proc, handler *h) { +void rsm::reg1(rpc_protocol::proc_id_t proc, handler *h) { lock ml(rsm_mutex); procs[proc] = h; } // The recovery thread runs this function -void rsm::recovery() [[noreturn]] { +void rsm::recovery() { bool r = true; lock ml(rsm_mutex); @@ -131,7 +130,7 @@ void rsm::recovery() [[noreturn]] { // XXX iannucci 2013/09/15 -- I don't understand whether accessing // cfg->view_id in this manner involves a race. I suspect not. if (join(primary, ml)) { - LOG("recovery: joined"); + LOG("joined"); commit_change(cfg->view_id(), ml); } else { ml.unlock(); @@ -140,13 +139,13 @@ void rsm::recovery() [[noreturn]] { } } vid_insync = vid_commit; - LOG("recovery: sync vid_insync " << vid_insync); + LOG("sync vid_insync " << vid_insync); if (primary == cfg->myaddr()) { r = sync_with_backups(ml); } else { r = sync_with_primary(ml); } - LOG("recovery: sync done"); + LOG("sync done"); // If there was a commited viewchange during the synchronization, restart // the recovery @@ -158,7 +157,7 @@ void rsm::recovery() [[noreturn]] { myvs.seqno = 1; inviewchange = false; } - LOG("recovery: go to sleep " << insync << " " << inviewchange); + LOG("go to sleep " << insync << " " << inviewchange); recovery_cond.wait(ml); } } @@ -286,7 +285,7 @@ void rsm::commit_change(unsigned vid) { void rsm::commit_change(unsigned vid, lock &) { if (vid <= vid_commit) return; - LOG("commit_change: new view (" << vid << ") last vs (" << last_myvs.vid << "," << + LOG("new view (" << vid << ") last vs (" << last_myvs.vid << "," << last_myvs.seqno << ") " << primary << " insync " << insync); vid_commit = vid; inviewchange = true; @@ -298,7 +297,7 @@ void rsm::commit_change(unsigned vid, lock &) { } -void rsm::execute(int procno, const string & req, string & r) { +void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r) { LOG("execute"); handler *h = procs[procno]; VERIFY(h); @@ -314,7 +313,7 @@ void rsm::execute(int procno, const string & req, string & r) { // number, and invokes it on all members of the replicated state // machine. // -rsm_client_protocol::status rsm::client_invoke(string & r, int procno, const string & req) { +rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req) { LOG("invoke procno 0x" << hex << procno); lock ml(invoke_mutex); vector m; @@ -357,6 +356,9 @@ rsm_client_protocol::status rsm::client_invoke(string & r, int procno, const str } } execute(procno, req, r); + for (size_t i=0; i m; @@ -468,9 +470,8 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last } // -// RPC handler: Send back all the nodes this local knows about to client -// so the client can switch to a different primary -// when it existing primary fails +// RPC handler: Responds with the list of known nodes for fall-back on a +// primary failure // rsm_client_protocol::status rsm::client_members(vector &r, int) { vector m; @@ -492,7 +493,7 @@ void rsm::set_primary(unsigned vid) { VERIFY (c.size() > 0); if (isamember(primary,c)) { - LOG("set_primary: primary stays " << primary); + LOG("primary stays " << primary); return; } @@ -500,7 +501,7 @@ void rsm::set_primary(unsigned vid) { for (unsigned i = 0; i < p.size(); i++) { if (isamember(p[i], c)) { primary = p[i]; - LOG("set_primary: primary is " << primary); + LOG("primary is " << primary); return; } } @@ -513,12 +514,9 @@ bool rsm::amiprimary() { } -// Testing server +// Test RPCs -- simulate partitions and failures -// Simulate partitions - -// assumes caller holds rsm_mutex -void rsm::net_repair(bool heal, lock &) { +void rsm::net_repair(bool heal, lock &/*rsm_mutex_lock*/) { vector m; cfg->get_view(vid_commit, m); for (unsigned i = 0; i < m.size(); i++) { @@ -535,15 +533,12 @@ rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r, lock ml(rsm_mutex); LOG("heal " << heal << " (dopartition " << dopartition << ", partitioned " << partitioned << ")"); - if (heal) { + if (heal) net_repair(heal, ml); - partitioned = false; - } else { + else dopartition = true; - partitioned = false; - } - r = rsm_test_protocol::OK; - return r; + partitioned = false; + return r = rsm_test_protocol::OK; } // simulate failure at breakpoint 1 and 2