X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/4e881433f37417ccbda89c09ffdf936855d462d4..f0dcb6b97d6d40f67698d1f71ac26970f1776f82:/rsm.cc diff --git a/rsm.cc b/rsm.cc index 956f45d..c766145 100644 --- a/rsm.cc +++ b/rsm.cc @@ -83,9 +83,7 @@ #include "rsm_client.h" #include -rsm::rsm(const string & _first, const string & _me) : - stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0), - partitioned (false), dopartition(false), break1(false), break2(false) +rsm::rsm(const string & _first, const string & _me) : primary(_first) { cfg = unique_ptr(new config(_first, _me, this)); @@ -103,7 +101,7 @@ rsm::rsm(const string & _first, const string & _me) : rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this); // tester must be on different port, otherwise it may partition itself - testsvr = unique_ptr(new rpcs((in_port_t)stoi(_me) + 1)); + testsvr.reset(new rpcs((in_port_t)stoi(_me) + 1)); testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this); testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this); } @@ -115,11 +113,6 @@ void rsm::start() { thread(&rsm::recovery, this).detach(); } -void rsm::reg1(rpc_protocol::proc_id_t proc, handler *h) { - lock ml(rsm_mutex); - procs[proc] = h; -} - // The recovery thread runs this function void rsm::recovery() { bool r = true; @@ -279,7 +272,7 @@ void rsm::commit_change(unsigned vid) { lock ml(rsm_mutex); commit_change(vid, ml); if (cfg->ismember(cfg->myaddr(), vid_commit)) - breakpoint2(); + breakpoint(2); } void rsm::commit_change(unsigned vid, lock &) { @@ -293,7 +286,7 @@ void rsm::commit_change(unsigned vid, lock &) { recovery_cond.notify_one(); sync_cond.notify_one(); if (cfg->ismember(cfg->myaddr(), vid_commit)) - breakpoint2(); + breakpoint(2); } @@ -301,10 +294,9 @@ void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r LOG("execute"); handler *h = procs[procno]; VERIFY(h); - unmarshall args(req, false); marshall rep; - auto ret = (rsm_protocol::status)(*h)(args, rep); - r = marshall{ret, rep.content()}.content(); + auto ret = (rsm_protocol::status)(*h)(unmarshall(req, false), rep); + r = marshall(ret, rep.content()).content(); } // @@ -350,7 +342,7 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id LOG("Invoke returned " << ret); if (ret != rsm_protocol::OK) return rsm_client_protocol::BUSY; - breakpoint1(); + breakpoint(1); lock rsm_mutex_lock(rsm_mutex); partition1(rsm_mutex_lock); } @@ -398,14 +390,14 @@ rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp string r; execute(proc, req, r); last_myvs = vs; - breakpoint1(); + breakpoint(1); return rsm_protocol::OK; } // // RPC handler: Send back the local node's state to the caller // -rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, const string & src, +rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const string & src, viewstamp last, unsigned vid) { lock ml(rsm_mutex); LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" << @@ -473,7 +465,7 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last // RPC handler: Responds with the list of known nodes for fall-back on a // primary failure // -rsm_client_protocol::status rsm::client_members(vector &r, int) { +rsm_client_protocol::status rsm::client_members(vector & r, int) { vector m; lock ml(rsm_mutex); cfg->get_view(vid_commit, m); @@ -516,7 +508,8 @@ bool rsm::amiprimary() { // Test RPCs -- simulate partitions and failures -void rsm::net_repair(bool heal, lock &/*rsm_mutex_lock*/) { +void rsm::net_repair(bool heal, lock & rsm_mutex_lock) { + VERIFY(rsm_mutex_lock); vector m; cfg->get_view(vid_commit, m); for (unsigned i = 0; i < m.size(); i++) { @@ -529,7 +522,7 @@ void rsm::net_repair(bool heal, lock &/*rsm_mutex_lock*/) { rsmrpc->set_reachable(heal); } -rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r, int heal) { +rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status & r, int heal) { lock ml(rsm_mutex); LOG("heal " << heal << " (dopartition " << dopartition << ", partitioned " << partitioned << ")"); @@ -543,16 +536,9 @@ rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r, // simulate failure at breakpoint 1 and 2 -void rsm::breakpoint1() { - if (break1) { - LOG("Dying at breakpoint 1 in rsm!"); - exit(1); - } -} - -void rsm::breakpoint2() { - if (break2) { - LOG("Dying at breakpoint 2 in rsm!"); +void rsm::breakpoint(int b) { + if (breakpoints[b-1]) { + LOG("Dying at breakpoint " << b << " in rsm!"); exit(1); } } @@ -565,12 +551,12 @@ void rsm::partition1(lock & rsm_mutex_lock) { } } -rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status &r, int b) { +rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status & r, int b) { r = rsm_test_protocol::OK; lock ml(rsm_mutex); LOG("breakpoint " << b); - if (b == 1) break1 = true; - else if (b == 2) break2 = true; + if (b == 1) breakpoints[1-1] = true; + else if (b == 2) breakpoints[2-1] = true; else if (b == 3 || b == 4) cfg->breakpoint(b); else r = rsm_test_protocol::ERR; return r;