X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/ebd5aef6dc92accb509b1cc67eaf72159f35cdfa..eb3d5c6416c0f0d1cad35e52af3231de7866fea8:/rsm.cc diff --git a/rsm.cc b/rsm.cc index de37b5d..672243c 100644 --- a/rsm.cc +++ b/rsm.cc @@ -125,7 +125,7 @@ void rsm::recovery() { // XXX iannucci 2013/09/15 -- I don't understand whether accessing // cfg->view_id in this manner involves a race. I suspect not. if (join(primary, ml)) { - LOG("joined"); + LOG << "joined"; commit_change(cfg->view_id(), ml); } else { ml.unlock(); @@ -134,13 +134,13 @@ void rsm::recovery() { } } vid_insync = vid_commit; - LOG("sync vid_insync " << vid_insync); + LOG << "sync vid_insync " << vid_insync; if (primary == cfg->myaddr()) { r = sync_with_backups(ml); } else { r = sync_with_primary(ml); } - LOG("sync done"); + LOG << "sync done"; // If there was a commited viewchange during the synchronization, restart // the recovery @@ -152,7 +152,7 @@ void rsm::recovery() { myvs.seqno = 1; inviewchange = false; } - LOG("go to sleep " << insync << " " << inviewchange); + LOG << "go to sleep " << insync << " " << inviewchange; recovery_cond.wait(ml); } } @@ -175,7 +175,7 @@ bool rsm::sync_with_backups(lock & rsm_mutex_lock) { insync = true; cfg->get_view(vid_insync, backups); backups.erase(find(backups.begin(), backups.end(), cfg->myaddr())); - LOG("backups " << backups); + LOG << "backups " << backups; sync_cond.wait(rsm_mutex_lock); insync = false; return true; @@ -202,7 +202,7 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock) rsm_protocol::transferres r; handle h(m); int ret = 0; - LOG("contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")"); + LOG << "contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")"; rpcc *cl; { rsm_mutex_lock.unlock(); @@ -214,14 +214,14 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock) rsm_mutex_lock.lock(); } if (cl == 0 || ret != rsm_protocol::OK) { - LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret); + LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret; return false; } if (stf && last_myvs != r.last) { stf->unmarshal_state(r.state); } last_myvs = r.last; - LOG("transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")"); + LOG << "transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")"; return true; } @@ -245,7 +245,7 @@ bool rsm::join(const string & m, lock & rsm_mutex_lock) { int ret = 0; string log; - LOG("contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")"); + LOG << "contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")"; rpcc *cl; { rsm_mutex_lock.unlock(); @@ -258,10 +258,10 @@ bool rsm::join(const string & m, lock & rsm_mutex_lock) { } if (cl == 0 || ret != rsm_protocol::OK) { - LOG("couldn't reach " << m << " " << hex << cl << " " << dec << ret); + LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret; return false; } - LOG("succeeded " << log); + LOG << "succeeded " << log; cfg->restore(log); return true; } @@ -280,8 +280,8 @@ void rsm::commit_change(unsigned vid) { void rsm::commit_change(unsigned vid, lock &) { if (vid <= vid_commit) return; - LOG("new view (" << vid << ") last vs (" << last_myvs.vid << "," << - last_myvs.seqno << ") " << primary << " insync " << insync); + LOG << "new view (" << vid << ") last vs (" << last_myvs.vid << "," + << last_myvs.seqno << ") " << primary << " insync " << insync; vid_commit = vid; inviewchange = true; set_primary(vid); @@ -293,7 +293,7 @@ void rsm::commit_change(unsigned vid, lock &) { void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r) { - LOG("execute"); + LOG << "execute"; handler *h = procs[procno]; VERIFY(h); marshall rep; @@ -308,21 +308,21 @@ void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r // machine. // rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req) { - LOG("invoke procno 0x" << hex << procno); + LOG << "invoke procno 0x" << hex << procno; lock ml(invoke_mutex); vector m; string myaddr; viewstamp vs; { lock ml2(rsm_mutex); - LOG("Checking for inviewchange"); + LOG << "Checking for inviewchange"; if (inviewchange) return rsm_client_protocol::BUSY; - LOG("Checking for primacy"); + LOG << "Checking for primacy"; myaddr = cfg->myaddr(); if (primary != myaddr) return rsm_client_protocol::NOTPRIMARY; - LOG("Assigning a viewstamp"); + LOG << "Assigning a viewstamp"; cfg->get_view(vid_commit, m); // assign the RPC the next viewstamp number vs = myvs; @@ -330,18 +330,18 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id } // send an invoke RPC to all slaves in the current view with a timeout of 1 second - LOG("Invoking slaves"); + LOG << "Invoking slaves"; for (unsigned i = 0; i < m.size(); i++) { if (m[i] != myaddr) { // if invoke on slave fails, return rsm_client_protocol::BUSY handle h(m[i]); - LOG("Sending invoke to " << m[i]); + LOG << "Sending invoke to " << m[i]; rpcc *cl = h.safebind(); if (!cl) return rsm_client_protocol::BUSY; int ignored_rval; auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req); - LOG("Invoke returned " << ret); + LOG << "Invoke returned " << ret; if (ret != rsm_protocol::OK) return rsm_client_protocol::BUSY; breakpoint(1); @@ -349,15 +349,17 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id partition1(rsm_mutex_lock); } } - LOG(setfill('0') << setw(2) << hex; + { + auto && log = LOG << setfill('0') << setw(2) << hex; for (size_t i=0; i m; string myaddr; { lock ml2(rsm_mutex); // check if !inviewchange - LOG("Checking for view change"); + LOG << "Checking for view change"; if (inviewchange) return rsm_protocol::ERR; // check if slave - LOG("Checking for slave status"); + LOG << "Checking for slave status"; myaddr = cfg->myaddr(); if (primary == myaddr) return rsm_protocol::ERR; @@ -389,7 +391,7 @@ rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp if (find(m.begin(), m.end(), myaddr) == m.end()) return rsm_protocol::ERR; // check sequence number - LOG("Checking sequence number"); + LOG << "Checking sequence number"; if (vs != myvs) return rsm_protocol::ERR; myvs++; @@ -407,8 +409,8 @@ rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const string & src, viewstamp last, unsigned vid) { lock ml(rsm_mutex); - LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" << - last_myvs.vid << "," << last_myvs.seqno << ")"); + LOG << "transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" + << last_myvs.vid << "," << last_myvs.seqno << ")"; if (!insync || vid != vid_insync) return rsm_protocol::BUSY; if (stf && last != last_myvs) @@ -438,18 +440,18 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last auto ret = rsm_protocol::OK; lock ml(rsm_mutex); - LOG("join request from " << m << "; last=(" << last.vid << "," << last.seqno << "), mylast=(" << - last_myvs.vid << "," << last_myvs.seqno << ")"); + LOG << "join request from " << m << "; last=(" << last.vid << "," << last.seqno << "), mylast=(" + << last_myvs.vid << "," << last_myvs.seqno << ")"; if (cfg->ismember(m, vid_commit)) { - LOG(m << " is still a member -- nothing to do"); + LOG << m << " is still a member -- nothing to do"; log = cfg->dump(); } else if (cfg->myaddr() != primary) { - LOG("but I, " << cfg->myaddr() << ", am not the primary, " << primary << "!"); + LOG << "but I, " << cfg->myaddr() << ", am not the primary, " << primary << "!"; ret = rsm_protocol::BUSY; } else { // We cache vid_commit to avoid adding m to a view which already contains // m due to race condition - LOG("calling down to config layer"); + LOG << "calling down to config layer"; unsigned vid_cache = vid_commit; bool succ; { @@ -459,9 +461,9 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last } if (cfg->ismember(m, cfg->view_id())) { log = cfg->dump(); - LOG("ret " << ret << " log " << log); + LOG << "ret " << ret << " log " << log; } else { - LOG("failed; proposer couldn't add " << succ); + LOG << "failed; proposer couldn't add " << succ; ret = rsm_protocol::BUSY; } } @@ -478,7 +480,7 @@ rsm_client_protocol::status rsm::client_members(vector & r, int) { cfg->get_view(vid_commit, m); m.push_back(primary); r = m; - LOG("return " << m << " m " << primary); + LOG << "return " << m << " m " << primary; return rsm_client_protocol::OK; } @@ -492,7 +494,7 @@ void rsm::set_primary(unsigned vid) { VERIFY (c.size() > 0); if (isamember(primary,c)) { - LOG("primary stays " << primary); + LOG << "primary stays " << primary; return; } @@ -500,7 +502,7 @@ void rsm::set_primary(unsigned vid) { for (unsigned i = 0; i < p.size(); i++) { if (isamember(p[i], c)) { primary = p[i]; - LOG("primary is " << primary); + LOG << "primary is " << primary; return; } } @@ -522,7 +524,7 @@ void rsm::net_repair(bool heal, lock & rsm_mutex_lock) { for (unsigned i = 0; i < m.size(); i++) { if (m[i] != cfg->myaddr()) { handle h(m[i]); - LOG("member " << m[i] << " " << heal); + LOG << "member " << m[i] << " " << heal; if (h.safebind()) h.safebind()->set_reachable(heal); } } @@ -531,8 +533,8 @@ void rsm::net_repair(bool heal, lock & rsm_mutex_lock) { rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status & r, int heal) { lock ml(rsm_mutex); - LOG("heal " << heal << " (dopartition " << - dopartition << ", partitioned " << partitioned << ")"); + LOG << "heal " << heal << " (dopartition " + << dopartition << ", partitioned " << partitioned << ")"; if (heal) net_repair(heal, ml); else @@ -545,7 +547,7 @@ rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status & r, void rsm::breakpoint(int b) { if (breakpoints[b-1]) { - LOG("Dying at breakpoint " << b << " in rsm!"); + LOG << "Dying at breakpoint " << b << " in rsm!"; exit(1); } } @@ -561,7 +563,7 @@ void rsm::partition1(lock & rsm_mutex_lock) { rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status & r, int b) { r = rsm_test_protocol::OK; lock ml(rsm_mutex); - LOG("breakpoint " << b); + LOG << "breakpoint " << b; if (b == 1) breakpoints[1-1] = true; else if (b == 2) breakpoints[2-1] = true; else if (b == 3 || b == 4) cfg->breakpoint(b);