X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/26ade07ab0e62b98b452fbbd18edba0450035e35..c06ef44e7af1571710fd31dd0ab068dd77b1eb2d:/rsm.cc?ds=sidebyside diff --git a/rsm.cc b/rsm.cc index 5812b33..711f0ad 100644 --- a/rsm.cc +++ b/rsm.cc @@ -79,10 +79,11 @@ // upcalls, but can keep its locks when calling down. #include "rsm.h" -#include "handle.h" #include "rsm_client.h" #include +using std::vector; + rsm_state_transfer::~rsm_state_transfer() {} rsm::rsm(const string & _first, const string & _me) : primary(_first) @@ -103,7 +104,7 @@ rsm::rsm(const string & _first, const string & _me) : primary(_first) rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this); // tester must be on different port, otherwise it may partition itself - testsvr.reset(new rpcs((in_port_t)stoi(_me) + 1)); + testsvr.reset(new rpcs((in_port_t)std::stoi(_me) + 1)); testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this); testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this); } @@ -129,7 +130,7 @@ void rsm::recovery() { commit_change(cfg->view_id(), ml); } else { ml.unlock(); - this_thread::sleep_for(seconds(3)); // XXX make another node in cfg primary? + std::this_thread::sleep_for(milliseconds(3000)); // XXX make another node in cfg primary? ml.lock(); } } @@ -200,13 +201,12 @@ bool rsm::sync_with_primary(lock & rsm_mutex_lock) { bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock) { rsm_protocol::transferres r; - handle h(m); int ret = 0; LOG << "contact " << m << " w. my last_myvs(" << last_myvs.vid << "," << last_myvs.seqno << ")"; - rpcc *cl; + shared_ptr cl; { rsm_mutex_lock.unlock(); - cl = h.safebind(); + cl = rpcc::bind_cached(m); if (cl) { ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100), r, cfg->myaddr(), last_myvs, vid_insync); @@ -227,10 +227,8 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock) bool rsm::statetransferdone(const string & m, lock & rsm_mutex_lock) { rsm_mutex_lock.unlock(); - handle h(m); - rpcc *cl = h.safebind(); bool done = false; - if (cl) { + if (auto cl = rpcc::bind_cached(m)) { int r; auto ret = (rsm_protocol::status)cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync); done = (ret == rsm_protocol::OK); @@ -241,21 +239,16 @@ bool rsm::statetransferdone(const string & m, lock & rsm_mutex_lock) { bool rsm::join(const string & m, lock & rsm_mutex_lock) { - handle h(m); int ret = 0; string log; LOG << "contacting " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")"; - rpcc *cl; - { - rsm_mutex_lock.unlock(); - cl = h.safebind(); - if (cl != 0) { - ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log, - cfg->myaddr(), last_myvs); - } - rsm_mutex_lock.lock(); - } + + rsm_mutex_lock.unlock(); + auto cl = rpcc::bind_cached(m); + if (cl) + ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log, cfg->myaddr(), last_myvs); + rsm_mutex_lock.lock(); if (cl == 0 || ret != rsm_protocol::OK) { LOG << "couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret; @@ -340,9 +333,8 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id for (unsigned i = 0; i < m.size(); i++) { if (m[i] != myaddr) { // if invoke on slave fails, return rsm_client_protocol::BUSY - handle h(m[i]); LOG << "Sending invoke to " << m[i]; - rpcc *cl = h.safebind(); + auto cl = rpcc::bind_cached(m[i]); if (!cl) return rsm_client_protocol::BUSY; int ignored_rval; @@ -521,9 +513,9 @@ void rsm::net_repair(bool heal, lock & rsm_mutex_lock) { cfg->get_view(vid_commit, m); for (unsigned i = 0; i < m.size(); i++) { if (m[i] != cfg->myaddr()) { - handle h(m[i]); LOG << "member " << m[i] << " " << heal; - if (h.safebind()) h.safebind()->set_reachable(heal); + if (auto cl = rpcc::bind_cached(m[i])) + cl->set_reachable(heal); } } rsmrpc->set_reachable(heal);