X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/f0dcb6b97d6d40f67698d1f71ac26970f1776f82..3efa02fb3a9f0a0566a7f3c99a1efb94e30ea4a6:/config.cc diff --git a/config.cc b/config.cc index 7df1bbc..a177362 100644 --- a/config.cc +++ b/config.cc @@ -1,5 +1,6 @@ #include "config.h" -#include "handle.h" + +using std::vector; // The config module maintains views. As a node joins or leaves a // view, the next view will be the same as previous view, except with @@ -33,6 +34,8 @@ // all views, the other nodes can bring this re-joined node up to // date. +config_view_change::~config_view_change() {} + config::config(const string & _first, const string & _me, config_view_change *_vc) : my_view_id(0), first(_first), me(_me), vc(_vc), paxos(this, me == _first, me, me) @@ -57,7 +60,7 @@ void config::get_view(unsigned instance, vector & m) { void config::get_view(unsigned instance, vector & m, lock & cfg_mutex_lock) { VERIFY(cfg_mutex_lock); string value = paxos.value(instance); - LOG("get_view(" << instance << "): returns " << value); + LOG << "get_view(" << instance << "): returns " << value; m = explode(value); } @@ -66,7 +69,7 @@ void config::reconstruct(lock & cfg_mutex_lock) { my_view_id = paxos.instance(); if (my_view_id > 0) { get_view(my_view_id, mems, cfg_mutex_lock); - LOG("view " << my_view_id << " " << mems); + LOG << "view " << my_view_id << " " << mems; } } @@ -75,13 +78,13 @@ void config::paxos_commit(unsigned instance, const string & value) { lock cfg_mutex_lock(cfg_mutex); vector newmem = explode(value); - LOG("instance " << instance << ": " << newmem); + LOG << "instance " << instance << ": " << newmem; for (auto mem : mems) { - LOG("is " << mem << " still a member?"); + LOG << "is " << mem << " still a member?"; if (!isamember(mem, newmem) && me != mem) { - LOG("delete " << mem); - handle(mem).invalidate(); + LOG << "delete " << mem; + rpcc::unbind_cached(mem); } } @@ -103,28 +106,28 @@ bool config::ismember(const string & m, unsigned vid) { bool config::add(const string & new_m, unsigned vid) { lock cfg_mutex_lock(cfg_mutex); - LOG("adding " << new_m << " to " << vid); + LOG << "adding " << new_m << " to " << vid; if (vid != my_view_id) { - LOG("that's not my view id, " << my_view_id << "!"); + LOG << "that's not my view id, " << my_view_id << "!"; return false; } - LOG("calling down to paxos layer"); + LOG << "calling down to paxos layer"; vector m(mems), cmems(mems); m.push_back(new_m); - LOG("old mems " << cmems << " " << implode(cmems)); - LOG("new mems " << m << " " << implode(m)); + LOG << "old mems " << cmems << " " << implode(cmems); + LOG << "new mems " << m << " " << implode(m); unsigned nextvid = my_view_id + 1; cfg_mutex_lock.unlock(); bool r = paxos.run(nextvid, cmems, implode(m)); cfg_mutex_lock.lock(); - LOG("paxos proposer returned " << (r ? "success" : "failure")); + LOG << "paxos proposer returned " << (r ? "success" : "failure"); return r; } // caller should hold cfg_mutex bool config::remove(const string & m, lock & cfg_mutex_lock) { VERIFY(cfg_mutex_lock); - LOG("my_view_id " << my_view_id << " remove? " << m); + LOG << "my_view_id " << my_view_id << " remove? " << m; vector n; for (auto mem : mems) { if (mem != m) @@ -135,7 +138,7 @@ bool config::remove(const string & m, lock & cfg_mutex_lock) { cfg_mutex_lock.unlock(); bool r = paxos.run(nextvid, cmems, implode(n)); cfg_mutex_lock.lock(); - LOG("proposer returned " << (r ? "success" : "failure")); + LOG << "proposer returned " << (r ? "success" : "failure"); return r; } @@ -144,21 +147,21 @@ void config::heartbeater() { while (1) { auto next_timeout = steady_clock::now() + milliseconds(300); - LOG("go to sleep"); + LOG << "go to sleep"; config_cond.wait_until(cfg_mutex_lock, next_timeout); unsigned vid = my_view_id; vector cmems; get_view(vid, cmems, cfg_mutex_lock); - LOG("current membership " << cmems); + LOG << "current membership " << cmems; if (!isamember(me, cmems)) { - LOG("not member yet; skip hearbeat"); + LOG << "not member yet; skip hearbeat"; continue; } // who has the smallest ID? - string m = min(me, *min_element(cmems.begin(), cmems.end())); + string m = std::min(me, *std::min_element(cmems.begin(), cmems.end())); if (m == me) { // ping the other nodes @@ -180,7 +183,7 @@ void config::heartbeater() { paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) { lock cfg_mutex_lock(cfg_mutex); r = (int) my_view_id; - LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id); + LOG<< "heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id; if (vid == my_view_id) return paxos_protocol::OK; else if (paxos.isrunning()) { @@ -193,12 +196,11 @@ paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) { config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) { VERIFY(cfg_mutex_lock); unsigned vid = my_view_id; - LOG("heartbeat to " << m << " (" << vid << ")"); - handle h(m); + LOG << "heartbeat to " << m << " (" << vid << ")"; cfg_mutex_lock.unlock(); int r = 0, ret = rpc_protocol::bind_failure; - if (rpcc *cl = h.safebind()) + if (auto cl = rpcc::bind_cached(m)) ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid); cfg_mutex_lock.lock(); @@ -208,12 +210,12 @@ config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) break; case rpc_protocol::atmostonce_failure: case rpc_protocol::oldsrv_failure: - h.invalidate(); + rpcc::unbind_cached(m); break; default: - LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r); + LOG << "problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r; res = (ret < 0) ? FAILURE : VIEWERR; } - LOG("done " << res); + LOG << "done " << res; return res; }