X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/02967a43024ce81912cd1ec96a800397457f8066..ab9eee5d7f1fbe7a3fe6229d4a78136efb14371b:/rsm.cc diff --git a/rsm.cc b/rsm.cc index c651e86..7104a09 100644 --- a/rsm.cc +++ b/rsm.cc @@ -78,11 +78,12 @@ // The rule is that a module releases its internal locks before it // upcalls, but can keep its locks when calling down. -#include "rsm.h" -#include "rsm_client.h" +#include "include/rsm.h" +#include "include/rsm_client.h" #include using std::vector; +using namespace std::chrono; rsm_state_transfer::~rsm_state_transfer() {} @@ -130,7 +131,7 @@ void rsm::recovery() { commit_change(cfg->view_id(), ml); } else { ml.unlock(); - std::this_thread::sleep_for(milliseconds(3000)); // XXX make another node in cfg primary? + std::this_thread::sleep_for(3000ms); // XXX make another node in cfg primary? ml.lock(); } } @@ -174,7 +175,7 @@ bool rsm::sync_with_backups(lock & rsm_mutex_lock) { rsm_mutex_lock.lock(); // Start accepting synchronization request (statetransferreq) now! insync = true; - cfg->get_view(vid_insync, backups); + backups = cfg->get_view(vid_insync); backups.erase(std::find(backups.begin(), backups.end(), cfg->myaddr())); LOG << "backups " << backups; sync_cond.wait(rsm_mutex_lock); @@ -208,7 +209,7 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock) rsm_mutex_lock.unlock(); cl = rpcc::bind_cached(m); if (cl) { - ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100), + ret = cl->call_timeout(rsm_protocol::transferreq, 100ms, r, cfg->myaddr(), last_myvs, vid_insync); } rsm_mutex_lock.lock(); @@ -218,7 +219,7 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock) return false; } if (stf && last_myvs != r.last) { - stf->unmarshal_state(r.state); + stf->unmarshall_state(r.state); } last_myvs = r.last; LOG << "transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")"; @@ -247,7 +248,7 @@ bool rsm::join(const string & m, lock & rsm_mutex_lock) { rsm_mutex_lock.unlock(); auto cl = rpcc::bind_cached(m); if (cl) - ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log, cfg->myaddr(), last_myvs); + ret = cl->call_timeout(rsm_protocol::joinreq, 12000ms, log, cfg->myaddr(), last_myvs); rsm_mutex_lock.lock(); if (cl == 0 || ret != rsm_protocol::OK) { @@ -290,8 +291,8 @@ void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r handler *h = procs[procno]; VERIFY(h); marshall rep; - auto ret = (rsm_protocol::status)(*h)(unmarshall(req, false), rep); - r = marshall(ret, rep.content()).content(); + auto ret = (rsm_protocol::status)(*h)(unmarshall(req), rep); + r = marshall(ret, rep); } static void logHexString(locked_ostream && log, const string & s) { @@ -322,23 +323,23 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id if (primary != myaddr) return rsm_client_protocol::NOTPRIMARY; LOG << "Assigning a viewstamp"; - cfg->get_view(vid_commit, m); + m = cfg->get_view(vid_commit); // assign the RPC the next viewstamp number vs = myvs; myvs++; } - // send an invoke RPC to all slaves in the current view with a timeout of 1 second + // send an invoke RPC to all slaves in the current view with a timeout LOG << "Invoking slaves"; - for (unsigned i = 0; i < m.size(); i++) { - if (m[i] != myaddr) { + for (auto & mm : m) { + if (mm != myaddr) { // if invoke on slave fails, return rsm_client_protocol::BUSY - LOG << "Sending invoke to " << m[i]; - auto cl = rpcc::bind_cached(m[i]); + LOG << "Sending invoke to " << mm; + auto cl = rpcc::bind_cached(mm); if (!cl) return rsm_client_protocol::BUSY; int ignored_rval; - auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req); + auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, 100ms, ignored_rval, procno, vs, req); LOG << "Invoke returned " << ret; if (ret != rsm_protocol::OK) return rsm_client_protocol::BUSY; @@ -364,7 +365,6 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) { LOG << "invoke procno 0x" << std::hex << proc; lock ml(invoke_mutex); - vector m; string myaddr; { lock ml2(rsm_mutex); @@ -377,7 +377,7 @@ rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp myaddr = cfg->myaddr(); if (primary == myaddr) return rsm_protocol::ERR; - cfg->get_view(vid_commit, m); + vector m = cfg->get_view(vid_commit); if (std::find(m.begin(), m.end(), myaddr) == m.end()) return rsm_protocol::ERR; // check sequence number @@ -404,7 +404,7 @@ rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const strin if (!insync || vid != vid_insync) return rsm_protocol::BUSY; if (stf && last != last_myvs) - r.state = stf->marshal_state(); + r.state = stf->marshall_state(); r.last = last_myvs; return rsm_protocol::OK; } @@ -465,9 +465,8 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last // primary failure // rsm_client_protocol::status rsm::client_members(vector & r, int) { - vector m; lock ml(rsm_mutex); - cfg->get_view(vid_commit, m); + vector m = cfg->get_view(vid_commit); m.push_back(primary); r = m; LOG << "return " << m << " m " << primary; @@ -478,9 +477,7 @@ rsm_client_protocol::status rsm::client_members(vector & r, int) { // otherwise, the lowest number node of the previous view. // caller should hold rsm_mutex void rsm::set_primary(unsigned vid) { - vector c, p; - cfg->get_view(vid, c); - cfg->get_view(vid - 1, p); + vector c = cfg->get_view(vid), p = cfg->get_view(vid - 1); VERIFY (c.size() > 0); if (isamember(primary,c)) { @@ -489,9 +486,9 @@ void rsm::set_primary(unsigned vid) { } VERIFY(p.size() > 0); - for (unsigned i = 0; i < p.size(); i++) { - if (isamember(p[i], c)) { - primary = p[i]; + for (auto & pp : p) { + if (isamember(pp, c)) { + primary = pp; LOG << "primary is " << primary; return; } @@ -509,12 +506,10 @@ bool rsm::amiprimary() { void rsm::net_repair(bool heal, lock & rsm_mutex_lock) { VERIFY(rsm_mutex_lock); - vector m; - cfg->get_view(vid_commit, m); - for (unsigned i = 0; i < m.size(); i++) { - if (m[i] != cfg->myaddr()) { - LOG << "member " << m[i] << " " << heal; - if (auto cl = rpcc::bind_cached(m[i])) + for (auto & mm : cfg->get_view(vid_commit)) { + if (mm != cfg->myaddr()) { + LOG << "member " << mm << " " << heal; + if (auto cl = rpcc::bind_cached(mm)) cl->set_reachable(heal); } }