X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/5fd8cc8409d0efadc07dfe8d6774ad9ff477663d..a4175b2e216a20b86cc872dea8a08005c60617a5:/rsm.cc?ds=sidebyside diff --git a/rsm.cc b/rsm.cc index 7664aa1..478e0e0 100644 --- a/rsm.cc +++ b/rsm.cc @@ -80,24 +80,22 @@ #include #include +#include +#include +#include #include "handle.h" #include "rsm.h" #include "tprintf.h" #include "lang/verify.h" #include "rsm_client.h" - -static void *recoverythread(void *x) { - rsm *r = (rsm *) x; - r->recovery(); - return 0; -} +#include "lock.h" rsm::rsm(std::string _first, std::string _me) : stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0), partitioned (false), dopartition(false), break1(false), break2(false) { - pthread_t th; + std::thread th; last_myvs.vid = 0; last_myvs.seqno = 0; @@ -125,29 +123,32 @@ rsm::rsm(std::string _first, std::string _me) : testsvr->reg(rsm_test_protocol::breakpoint, this, &rsm::breakpointreq); { - ScopedLock ml(rsm_mutex); - VERIFY(pthread_create(&th, NULL, &recoverythread, (void *) this) == 0); + lock ml(rsm_mutex); + th = std::thread(&rsm::recovery, this); } } void rsm::reg1(int proc, handler *h) { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); procs[proc] = h; } // The recovery thread runs this function void rsm::recovery() { bool r = true; - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); while (1) { while (!cfg->ismember(cfg->myaddr(), vid_commit)) { + // XXX iannucci 2013/09/15 -- I don't understand whether accessing + // cfg->view_id in this manner involves a race. I suspect not. if (join(primary)) { tprintf("recovery: joined\n"); - commit_change_wo(cfg->vid()); + commit_change_wo(cfg->view_id()); } else { - ScopedUnlock su(rsm_mutex); - sleep (30); // XXX make another node in cfg primary? + ml.unlock(); + std::this_thread::sleep_for(std::chrono::seconds(30)); // XXX make another node in cfg primary? + ml.lock(); } } vid_insync = vid_commit; @@ -170,7 +171,7 @@ void rsm::recovery() { inviewchange = false; } tprintf("recovery: go to sleep %d %d\n", insync, inviewchange); - recovery_cond.wait(rsm_mutex); + recovery_cond.wait(ml); } } @@ -187,24 +188,26 @@ std::ostream & operator<<(std::ostream &o, const std::vector &d) { } bool rsm::sync_with_backups() { + adopt_lock ml(rsm_mutex); + ml.unlock(); { - ScopedUnlock su(rsm_mutex); // Make sure that the state of lock_server_cache_rsm is stable during // synchronization; otherwise, the primary's state may be more recent // than replicas after the synchronization. - ScopedLock ml(invoke_mutex); + lock ml(invoke_mutex); // By acquiring and releasing the invoke_mutex once, we make sure that // the state of lock_server_cache_rsm will not be changed until all // replicas are synchronized. The reason is that client_invoke arrives // after this point of time will see inviewchange == true, and returns // BUSY. } + ml.lock(); // Start accepting synchronization request (statetransferreq) now! insync = true; - backups = std::vector(cfg->get_view(vid_insync)); + cfg->get_view(vid_insync, backups); backups.erase(find(backups.begin(), backups.end(), cfg->myaddr())); LOG("rsm::sync_with_backups " << backups); - sync_cond.wait(rsm_mutex); + sync_cond.wait(ml); insync = false; return true; } @@ -234,12 +237,14 @@ bool rsm::statetransfer(std::string m) m.c_str(), last_myvs.vid, last_myvs.seqno); rpcc *cl; { - ScopedUnlock su(rsm_mutex); + adopt_lock ml(rsm_mutex); + ml.unlock(); cl = h.safebind(); if (cl) { ret = cl->call(rsm_protocol::transferreq, cfg->myaddr(), last_myvs, vid_insync, r, rpcc::to(1000)); } + ml.lock(); } if (cl == 0 || ret != rsm_protocol::OK) { tprintf("rsm::statetransfer: couldn't reach %s %lx %d\n", m.c_str(), @@ -256,16 +261,18 @@ bool rsm::statetransfer(std::string m) } bool rsm::statetransferdone(std::string m) { - ScopedUnlock su(rsm_mutex); + adopt_lock ml(rsm_mutex); + ml.unlock(); handle h(m); rpcc *cl = h.safebind(); - if (!cl) - return false; - int r; - rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, cfg->myaddr(), vid_insync, r); - if (ret != rsm_protocol::OK) - return false; - return true; + bool done = false; + if (cl) { + int r; + rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, cfg->myaddr(), vid_insync, r); + done = (ret == rsm_protocol::OK); + } + ml.lock(); + return done; } @@ -278,12 +285,14 @@ bool rsm::join(std::string m) { last_myvs.seqno); rpcc *cl; { - ScopedUnlock su(rsm_mutex); + adopt_lock ml(rsm_mutex); + ml.unlock(); cl = h.safebind(); if (cl != 0) { ret = cl->call(rsm_protocol::joinreq, cfg->myaddr(), last_myvs, r, rpcc::to(120000)); } + ml.lock(); } if (cl == 0 || ret != rsm_protocol::OK) { @@ -301,7 +310,7 @@ bool rsm::join(std::string m) { * completed a view change */ void rsm::commit_change(unsigned vid) { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); commit_change_wo(vid); if (cfg->ismember(cfg->myaddr(), vid_commit)) breakpoint2(); @@ -315,8 +324,8 @@ void rsm::commit_change_wo(unsigned vid) { vid_commit = vid; inviewchange = true; set_primary(vid); - recovery_cond.signal(); - sync_cond.signal(); + recovery_cond.notify_one(); + sync_cond.notify_one(); if (cfg->ismember(cfg->myaddr(), vid_commit)) breakpoint2(); } @@ -344,12 +353,12 @@ void rsm::execute(int procno, std::string req, std::string &r) { // rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std::string &r) { LOG("rsm::client_invoke: procno 0x" << std::hex << procno); - ScopedLock ml(invoke_mutex); + lock ml(invoke_mutex); std::vector m; std::string myaddr; viewstamp vs; { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); LOG("Checking for inviewchange"); if (inviewchange) return rsm_client_protocol::BUSY; @@ -358,7 +367,7 @@ rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std: if (primary != myaddr) return rsm_client_protocol::NOTPRIMARY; LOG("Assigning a viewstamp"); - m = cfg->get_view(vid_commit); + cfg->get_view(vid_commit, m); // assign the RPC the next viewstamp number vs = myvs; myvs++; @@ -398,11 +407,11 @@ rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std: rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &dummy) { LOG("rsm::invoke: procno 0x" << std::hex << proc); - ScopedLock ml(invoke_mutex); + lock ml(invoke_mutex); std::vector m; std::string myaddr; { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); // check if !inviewchange LOG("Checking for view change"); if (inviewchange) @@ -412,7 +421,7 @@ rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &d myaddr = cfg->myaddr(); if (primary == myaddr) return rsm_protocol::ERR; - m = cfg->get_view(vid_commit); + cfg->get_view(vid_commit, m); if (find(m.begin(), m.end(), myaddr) == m.end()) return rsm_protocol::ERR; // check sequence number @@ -433,7 +442,7 @@ rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &d */ rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned vid, rsm_protocol::transferres &r) { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); int ret = rsm_protocol::OK; tprintf("transferreq from %s (%d,%d) vs (%d,%d)\n", src.c_str(), last.vid, last.seqno, last_myvs.vid, last_myvs.seqno); @@ -451,12 +460,12 @@ rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned * for view vid */ rsm_protocol::status rsm::transferdonereq(std::string m, unsigned vid, int &) { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); if (!insync || vid != vid_insync) return rsm_protocol::BUSY; backups.erase(find(backups.begin(), backups.end(), m)); if (backups.empty()) - sync_cond.signal(); + sync_cond.notify_one(); return rsm_protocol::OK; } @@ -466,7 +475,7 @@ rsm_protocol::status rsm::transferdonereq(std::string m, unsigned vid, int &) { rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::joinres &r) { int ret = rsm_protocol::OK; - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); tprintf("joinreq: src %s last (%d,%d) mylast (%d,%d)\n", m.c_str(), last.vid, last.seqno, last_myvs.vid, last_myvs.seqno); if (cfg->ismember(m, vid_commit)) { @@ -481,10 +490,11 @@ rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::j unsigned vid_cache = vid_commit; bool succ; { - ScopedUnlock su(rsm_mutex); + ml.unlock(); succ = cfg->add(m, vid_cache); + ml.lock(); } - if (cfg->ismember(m, cfg->vid())) { + if (cfg->ismember(m, cfg->view_id())) { r.log = cfg->dump(); tprintf("joinreq: ret %d log %s\n:", ret, r.log.c_str()); } else { @@ -502,8 +512,8 @@ rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::j */ rsm_client_protocol::status rsm::client_members(int i, std::vector &r) { std::vector m; - ScopedLock ml(rsm_mutex); - m = cfg->get_view(vid_commit); + lock ml(rsm_mutex); + cfg->get_view(vid_commit, m); m.push_back(primary); r = m; tprintf("rsm::client_members return %s m %s\n", print_members(m).c_str(), @@ -515,8 +525,9 @@ rsm_client_protocol::status rsm::client_members(int i, std::vector // otherwise, the lowest number node of the previous view. // caller should hold rsm_mutex void rsm::set_primary(unsigned vid) { - std::vector c = cfg->get_view(vid); - std::vector p = cfg->get_view(vid - 1); + std::vector c, p; + cfg->get_view(vid, c); + cfg->get_view(vid - 1, p); VERIFY (c.size() > 0); if (isamember(primary,c)) { @@ -536,7 +547,7 @@ void rsm::set_primary(unsigned vid) { } bool rsm::amiprimary() { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); return primary == cfg->myaddr() && !inviewchange; } @@ -548,7 +559,7 @@ bool rsm::amiprimary() { // assumes caller holds rsm_mutex void rsm::net_repair_wo(bool heal) { std::vector m; - m = cfg->get_view(vid_commit); + cfg->get_view(vid_commit, m); for (unsigned i = 0; i < m.size(); i++) { if (m[i] != cfg->myaddr()) { handle h(m[i]); @@ -560,7 +571,7 @@ void rsm::net_repair_wo(bool heal) { } rsm_test_protocol::status rsm::test_net_repairreq(int heal, int &r) { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); tprintf("rsm::test_net_repairreq: %d (dopartition %d, partitioned %d)\n", heal, dopartition, partitioned); if (heal) { @@ -600,7 +611,7 @@ void rsm::partition1() { rsm_test_protocol::status rsm::breakpointreq(int b, int &r) { r = rsm_test_protocol::OK; - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); tprintf("rsm::breakpointreq: %d\n", b); if (b == 1) break1 = true; else if (b == 2) break2 = true;