X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/61809b48ade4c21b1b01931d520aa2abc7507032..4b9798f44ae94deabf87dd534337b55259272950:/rsm.cc diff --git a/rsm.cc b/rsm.cc index 6841e3d..66c8d97 100644 --- a/rsm.cc +++ b/rsm.cc @@ -89,19 +89,12 @@ #include "tprintf.h" #include "lang/verify.h" #include "rsm_client.h" - -static void *recoverythread(void *x) { - rsm *r = (rsm *) x; - r->recovery(); - return 0; -} +#include "lock.h" rsm::rsm(std::string _first, std::string _me) : stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0), partitioned (false), dopartition(false), break1(false), break2(false) { - pthread_t th; - last_myvs.vid = 0; last_myvs.seqno = 0; myvs = last_myvs; @@ -115,42 +108,45 @@ rsm::rsm(std::string _first, std::string _me) : commit_change(1); } rsmrpc = cfg->get_rpcs(); - rsmrpc->reg(rsm_client_protocol::invoke, this, &rsm::client_invoke); - rsmrpc->reg(rsm_client_protocol::members, this, &rsm::client_members); - rsmrpc->reg(rsm_protocol::invoke, this, &rsm::invoke); - rsmrpc->reg(rsm_protocol::transferreq, this, &rsm::transferreq); - rsmrpc->reg(rsm_protocol::transferdonereq, this, &rsm::transferdonereq); - rsmrpc->reg(rsm_protocol::joinreq, this, &rsm::joinreq); + rsmrpc->reg(rsm_client_protocol::invoke, &rsm::client_invoke, this); + rsmrpc->reg(rsm_client_protocol::members, &rsm::client_members, this); + rsmrpc->reg(rsm_protocol::invoke, &rsm::invoke, this); + rsmrpc->reg(rsm_protocol::transferreq, &rsm::transferreq, this); + rsmrpc->reg(rsm_protocol::transferdonereq, &rsm::transferdonereq, this); + rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this); // tester must be on different port, otherwise it may partition itself testsvr = new rpcs(atoi(_me.c_str()) + 1); - testsvr->reg(rsm_test_protocol::net_repair, this, &rsm::test_net_repairreq); - testsvr->reg(rsm_test_protocol::breakpoint, this, &rsm::breakpointreq); + testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this); + testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this); { - ScopedLock ml(rsm_mutex); - VERIFY(pthread_create(&th, NULL, &recoverythread, (void *) this) == 0); + lock ml(rsm_mutex); + std::thread(&rsm::recovery, this).detach(); } } void rsm::reg1(int proc, handler *h) { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); procs[proc] = h; } // The recovery thread runs this function void rsm::recovery() { bool r = true; - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); while (1) { while (!cfg->ismember(cfg->myaddr(), vid_commit)) { + // XXX iannucci 2013/09/15 -- I don't understand whether accessing + // cfg->view_id in this manner involves a race. I suspect not. if (join(primary)) { tprintf("recovery: joined\n"); - commit_change_wo(cfg->vid()); + commit_change_wo(cfg->view_id()); } else { - ScopedUnlock su(rsm_mutex); - sleep (30); // XXX make another node in cfg primary? + ml.unlock(); + std::this_thread::sleep_for(std::chrono::seconds(30)); // XXX make another node in cfg primary? + ml.lock(); } } vid_insync = vid_commit; @@ -173,7 +169,7 @@ void rsm::recovery() { inviewchange = false; } tprintf("recovery: go to sleep %d %d\n", insync, inviewchange); - recovery_cond.wait(rsm_mutex); + recovery_cond.wait(ml); } } @@ -190,24 +186,26 @@ std::ostream & operator<<(std::ostream &o, const std::vector &d) { } bool rsm::sync_with_backups() { + adopt_lock ml(rsm_mutex); + ml.unlock(); { - ScopedUnlock su(rsm_mutex); // Make sure that the state of lock_server_cache_rsm is stable during // synchronization; otherwise, the primary's state may be more recent // than replicas after the synchronization. - ScopedLock ml(invoke_mutex); + lock ml(invoke_mutex); // By acquiring and releasing the invoke_mutex once, we make sure that // the state of lock_server_cache_rsm will not be changed until all // replicas are synchronized. The reason is that client_invoke arrives // after this point of time will see inviewchange == true, and returns // BUSY. } + ml.lock(); // Start accepting synchronization request (statetransferreq) now! insync = true; - backups = std::vector(cfg->get_view(vid_insync)); + cfg->get_view(vid_insync, backups); backups.erase(find(backups.begin(), backups.end(), cfg->myaddr())); LOG("rsm::sync_with_backups " << backups); - sync_cond.wait(rsm_mutex); + sync_cond.wait(ml); insync = false; return true; } @@ -232,17 +230,19 @@ bool rsm::statetransfer(std::string m) { rsm_protocol::transferres r; handle h(m); - int ret; + int ret = 0; tprintf("rsm::statetransfer: contact %s w. my last_myvs(%d,%d)\n", m.c_str(), last_myvs.vid, last_myvs.seqno); rpcc *cl; { - ScopedUnlock su(rsm_mutex); + adopt_lock ml(rsm_mutex); + ml.unlock(); cl = h.safebind(); if (cl) { - ret = cl->call(rsm_protocol::transferreq, cfg->myaddr(), - last_myvs, vid_insync, r, rpcc::to(1000)); + ret = cl->call_timeout(rsm_protocol::transferreq, rpcc::to(1000), + r, cfg->myaddr(), last_myvs, vid_insync); } + ml.lock(); } if (cl == 0 || ret != rsm_protocol::OK) { tprintf("rsm::statetransfer: couldn't reach %s %lx %d\n", m.c_str(), @@ -259,34 +259,38 @@ bool rsm::statetransfer(std::string m) } bool rsm::statetransferdone(std::string m) { - ScopedUnlock su(rsm_mutex); + adopt_lock ml(rsm_mutex); + ml.unlock(); handle h(m); rpcc *cl = h.safebind(); - if (!cl) - return false; - int r; - rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, cfg->myaddr(), vid_insync, r); - if (ret != rsm_protocol::OK) - return false; - return true; + bool done = false; + if (cl) { + int r; + rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync); + done = (ret == rsm_protocol::OK); + } + ml.lock(); + return done; } bool rsm::join(std::string m) { handle h(m); - int ret; + int ret = 0; rsm_protocol::joinres r; tprintf("rsm::join: %s mylast (%d,%d)\n", m.c_str(), last_myvs.vid, last_myvs.seqno); rpcc *cl; { - ScopedUnlock su(rsm_mutex); + adopt_lock ml(rsm_mutex); + ml.unlock(); cl = h.safebind(); if (cl != 0) { - ret = cl->call(rsm_protocol::joinreq, cfg->myaddr(), last_myvs, - r, rpcc::to(120000)); + ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(120000), r, + cfg->myaddr(), last_myvs); } + ml.lock(); } if (cl == 0 || ret != rsm_protocol::OK) { @@ -304,7 +308,7 @@ bool rsm::join(std::string m) { * completed a view change */ void rsm::commit_change(unsigned vid) { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); commit_change_wo(vid); if (cfg->ismember(cfg->myaddr(), vid_commit)) breakpoint2(); @@ -318,8 +322,8 @@ void rsm::commit_change_wo(unsigned vid) { vid_commit = vid; inviewchange = true; set_primary(vid); - recovery_cond.signal(); - sync_cond.signal(); + recovery_cond.notify_one(); + sync_cond.notify_one(); if (cfg->ismember(cfg->myaddr(), vid_commit)) breakpoint2(); } @@ -332,7 +336,7 @@ void rsm::execute(int procno, std::string req, std::string &r) { unmarshall args(req); marshall rep; std::string reps; - rsm_protocol::status ret = h->fn(args, rep); + rsm_protocol::status ret = (*h)(args, rep); marshall rep1; rep1 << ret; rep1 << rep.str(); @@ -345,14 +349,14 @@ void rsm::execute(int procno, std::string req, std::string &r) { // number, and invokes it on all members of the replicated state // machine. // -rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std::string &r) { +rsm_client_protocol::status rsm::client_invoke(std::string &r, int procno, std::string req) { LOG("rsm::client_invoke: procno 0x" << std::hex << procno); - ScopedLock ml(invoke_mutex); + lock ml(invoke_mutex); std::vector m; std::string myaddr; viewstamp vs; { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); LOG("Checking for inviewchange"); if (inviewchange) return rsm_client_protocol::BUSY; @@ -361,7 +365,7 @@ rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std: if (primary != myaddr) return rsm_client_protocol::NOTPRIMARY; LOG("Assigning a viewstamp"); - m = cfg->get_view(vid_commit); + cfg->get_view(vid_commit, m); // assign the RPC the next viewstamp number vs = myvs; myvs++; @@ -379,7 +383,7 @@ rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std: return rsm_client_protocol::BUSY; rsm_protocol::status ret; int r; - ret = cl->call(rsm_protocol::invoke, procno, vs, req, r, rpcc::to(1000)); + ret = cl->call_timeout(rsm_protocol::invoke, rpcc::to(1000), r, procno, vs, req); LOG("Invoke returned " << ret); if (ret != rsm_protocol::OK) return rsm_client_protocol::BUSY; @@ -399,13 +403,13 @@ rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std: // the replica must execute requests in order (with no gaps) // according to requests' seqno -rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &dummy) { +rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req) { LOG("rsm::invoke: procno 0x" << std::hex << proc); - ScopedLock ml(invoke_mutex); + lock ml(invoke_mutex); std::vector m; std::string myaddr; { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); // check if !inviewchange LOG("Checking for view change"); if (inviewchange) @@ -415,7 +419,7 @@ rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &d myaddr = cfg->myaddr(); if (primary == myaddr) return rsm_protocol::ERR; - m = cfg->get_view(vid_commit); + cfg->get_view(vid_commit, m); if (find(m.begin(), m.end(), myaddr) == m.end()) return rsm_protocol::ERR; // check sequence number @@ -434,9 +438,9 @@ rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &d /** * RPC handler: Send back the local node's state to the caller */ -rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned vid, - rsm_protocol::transferres &r) { - ScopedLock ml(rsm_mutex); +rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, std::string src, + viewstamp last, unsigned vid) { + lock ml(rsm_mutex); int ret = rsm_protocol::OK; tprintf("transferreq from %s (%d,%d) vs (%d,%d)\n", src.c_str(), last.vid, last.seqno, last_myvs.vid, last_myvs.seqno); @@ -453,23 +457,23 @@ rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned * RPC handler: Inform the local node (the primary) that node m has synchronized * for view vid */ -rsm_protocol::status rsm::transferdonereq(std::string m, unsigned vid, int &) { - ScopedLock ml(rsm_mutex); +rsm_protocol::status rsm::transferdonereq(int &, std::string m, unsigned vid) { + lock ml(rsm_mutex); if (!insync || vid != vid_insync) return rsm_protocol::BUSY; backups.erase(find(backups.begin(), backups.end(), m)); if (backups.empty()) - sync_cond.signal(); + sync_cond.notify_one(); return rsm_protocol::OK; } // a node that wants to join an RSM as a server sends a // joinreq to the RSM's current primary; this is the // handler for that RPC. -rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::joinres &r) { +rsm_protocol::status rsm::joinreq(rsm_protocol::joinres &r, std::string m, viewstamp last) { int ret = rsm_protocol::OK; - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); tprintf("joinreq: src %s last (%d,%d) mylast (%d,%d)\n", m.c_str(), last.vid, last.seqno, last_myvs.vid, last_myvs.seqno); if (cfg->ismember(m, vid_commit)) { @@ -484,10 +488,11 @@ rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::j unsigned vid_cache = vid_commit; bool succ; { - ScopedUnlock su(rsm_mutex); + ml.unlock(); succ = cfg->add(m, vid_cache); + ml.lock(); } - if (cfg->ismember(m, cfg->vid())) { + if (cfg->ismember(m, cfg->view_id())) { r.log = cfg->dump(); tprintf("joinreq: ret %d log %s\n:", ret, r.log.c_str()); } else { @@ -503,10 +508,10 @@ rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::j * so the client can switch to a different primary * when it existing primary fails */ -rsm_client_protocol::status rsm::client_members(int i, std::vector &r) { +rsm_client_protocol::status rsm::client_members(std::vector &r, int i) { std::vector m; - ScopedLock ml(rsm_mutex); - m = cfg->get_view(vid_commit); + lock ml(rsm_mutex); + cfg->get_view(vid_commit, m); m.push_back(primary); r = m; tprintf("rsm::client_members return %s m %s\n", print_members(m).c_str(), @@ -518,8 +523,9 @@ rsm_client_protocol::status rsm::client_members(int i, std::vector // otherwise, the lowest number node of the previous view. // caller should hold rsm_mutex void rsm::set_primary(unsigned vid) { - std::vector c = cfg->get_view(vid); - std::vector p = cfg->get_view(vid - 1); + std::vector c, p; + cfg->get_view(vid, c); + cfg->get_view(vid - 1, p); VERIFY (c.size() > 0); if (isamember(primary,c)) { @@ -539,7 +545,7 @@ void rsm::set_primary(unsigned vid) { } bool rsm::amiprimary() { - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); return primary == cfg->myaddr() && !inviewchange; } @@ -551,7 +557,7 @@ bool rsm::amiprimary() { // assumes caller holds rsm_mutex void rsm::net_repair_wo(bool heal) { std::vector m; - m = cfg->get_view(vid_commit); + cfg->get_view(vid_commit, m); for (unsigned i = 0; i < m.size(); i++) { if (m[i] != cfg->myaddr()) { handle h(m[i]); @@ -562,8 +568,8 @@ void rsm::net_repair_wo(bool heal) { rsmrpc->set_reachable(heal); } -rsm_test_protocol::status rsm::test_net_repairreq(int heal, int &r) { - ScopedLock ml(rsm_mutex); +rsm_test_protocol::status rsm::test_net_repairreq(int &r, int heal) { + lock ml(rsm_mutex); tprintf("rsm::test_net_repairreq: %d (dopartition %d, partitioned %d)\n", heal, dopartition, partitioned); if (heal) { @@ -601,9 +607,9 @@ void rsm::partition1() { } } -rsm_test_protocol::status rsm::breakpointreq(int b, int &r) { +rsm_test_protocol::status rsm::breakpointreq(int &r, int b) { r = rsm_test_protocol::OK; - ScopedLock ml(rsm_mutex); + lock ml(rsm_mutex); tprintf("rsm::breakpointreq: %d\n", b); if (b == 1) break1 = true; else if (b == 2) break2 = true;