X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/603bac8fcb3697f283e6537d81b4a92e457ebbad..f0dcb6b97d6d40f67698d1f71ac26970f1776f82:/config.cc diff --git a/config.cc b/config.cc index 5373007..7df1bbc 100644 --- a/config.cc +++ b/config.cc @@ -33,7 +33,7 @@ // all views, the other nodes can bring this re-joined node up to // date. -config::config(const string &_first, const string &_me, config_view_change *_vc) +config::config(const string & _first, const string & _me, config_view_change *_vc) : my_view_id(0), first(_first), me(_me), vc(_vc), paxos(this, me == _first, me, me) { @@ -43,32 +43,25 @@ config::config(const string &_first, const string &_me, config_view_change *_vc) thread(&config::heartbeater, this).detach(); } -void config::restore(const string &s) { +void config::restore(const string & s) { lock cfg_mutex_lock(cfg_mutex); paxos.restore(s); reconstruct(cfg_mutex_lock); } -void config::get_view(unsigned instance, vector &m) { +void config::get_view(unsigned instance, vector & m) { lock cfg_mutex_lock(cfg_mutex); get_view(instance, m, cfg_mutex_lock); } -void config::get_view(unsigned instance, vector &m, lock &) { +void config::get_view(unsigned instance, vector & m, lock & cfg_mutex_lock) { + VERIFY(cfg_mutex_lock); string value = paxos.value(instance); LOG("get_view(" << instance << "): returns " << value); - m = members(value); -} - -vector config::members(const string &value) const { - return explode(value); + m = explode(value); } -string config::value(const vector &members) const { - return implode(members); -} - -void config::reconstruct(lock &cfg_mutex_lock) { +void config::reconstruct(lock & cfg_mutex_lock) { VERIFY(cfg_mutex_lock); my_view_id = paxos.instance(); if (my_view_id > 0) { @@ -78,17 +71,17 @@ void config::reconstruct(lock &cfg_mutex_lock) { } // Called by Paxos's acceptor. -void config::paxos_commit(unsigned instance, const string &value) { +void config::paxos_commit(unsigned instance, const string & value) { lock cfg_mutex_lock(cfg_mutex); - vector newmem = members(value); + vector newmem = explode(value); LOG("instance " << instance << ": " << newmem); for (auto mem : mems) { LOG("is " << mem << " still a member?"); if (!isamember(mem, newmem) && me != mem) { LOG("delete " << mem); - invalidate_handle(mem); + handle(mem).invalidate(); } } @@ -101,14 +94,14 @@ void config::paxos_commit(unsigned instance, const string &value) { } } -bool config::ismember(const string &m, unsigned vid) { +bool config::ismember(const string & m, unsigned vid) { lock cfg_mutex_lock(cfg_mutex); vector v; get_view(vid, v, cfg_mutex_lock); return isamember(m, v); } -bool config::add(const string &new_m, unsigned vid) { +bool config::add(const string & new_m, unsigned vid) { lock cfg_mutex_lock(cfg_mutex); LOG("adding " << new_m << " to " << vid); if (vid != my_view_id) { @@ -118,18 +111,19 @@ bool config::add(const string &new_m, unsigned vid) { LOG("calling down to paxos layer"); vector m(mems), cmems(mems); m.push_back(new_m); - LOG("old mems " << cmems << " " << value(cmems)); - LOG("new mems " << m << " " << value(m)); + LOG("old mems " << cmems << " " << implode(cmems)); + LOG("new mems " << m << " " << implode(m)); unsigned nextvid = my_view_id + 1; cfg_mutex_lock.unlock(); - bool r = paxos.run(nextvid, cmems, value(m)); + bool r = paxos.run(nextvid, cmems, implode(m)); cfg_mutex_lock.lock(); LOG("paxos proposer returned " << (r ? "success" : "failure")); return r; } // caller should hold cfg_mutex -bool config::remove(const string &m, lock &cfg_mutex_lock) { +bool config::remove(const string & m, lock & cfg_mutex_lock) { + VERIFY(cfg_mutex_lock); LOG("my_view_id " << my_view_id << " remove? " << m); vector n; for (auto mem : mems) { @@ -139,13 +133,13 @@ bool config::remove(const string &m, lock &cfg_mutex_lock) { vector cmems = mems; unsigned nextvid = my_view_id + 1; cfg_mutex_lock.unlock(); - bool r = paxos.run(nextvid, cmems, value(n)); + bool r = paxos.run(nextvid, cmems, implode(n)); cfg_mutex_lock.lock(); LOG("proposer returned " << (r ? "success" : "failure")); return r; } -void config::heartbeater() [[noreturn]] { +void config::heartbeater() { lock cfg_mutex_lock(cfg_mutex); while (1) { @@ -183,7 +177,7 @@ void config::heartbeater() [[noreturn]] { } } -paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) { +paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) { lock cfg_mutex_lock(cfg_mutex); r = (int) my_view_id; LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id); @@ -196,24 +190,25 @@ paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) { return paxos_protocol::ERR; } -config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) { +config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) { + VERIFY(cfg_mutex_lock); unsigned vid = my_view_id; LOG("heartbeat to " << m << " (" << vid << ")"); handle h(m); cfg_mutex_lock.unlock(); - int r = 0, ret = rpc_const::bind_failure; + int r = 0, ret = rpc_protocol::bind_failure; if (rpcc *cl = h.safebind()) - ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(100), r, me, vid); + ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid); cfg_mutex_lock.lock(); heartbeat_t res = OK; switch (ret) { case paxos_protocol::OK: break; - case rpc_const::atmostonce_failure: - case rpc_const::oldsrv_failure: - invalidate_handle(m); + case rpc_protocol::atmostonce_failure: + case rpc_protocol::oldsrv_failure: + h.invalidate(); break; default: LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);