X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/a4175b2e216a20b86cc872dea8a08005c60617a5..3abd3952c1f4441f0dd6eae9883b2d01ed9cd56b:/config.cc diff --git a/config.cc b/config.cc index 5127cb2..35654d8 100644 --- a/config.cc +++ b/config.cc @@ -1,12 +1,5 @@ -#include -#include -#include -#include #include "config.h" -#include "paxos.h" #include "handle.h" -#include "tprintf.h" -#include "lang/verify.h" // The config module maintains views. As a node joins or leaves a // view, the next view will be the same as previous view, except with @@ -40,281 +33,192 @@ // all views, the other nodes can bring this re-joined node up to // date. -config::config( - const std::string &_first, - const std::string &_me, - config_view_change *_vc) - : my_view_id(0), first(_first), me(_me), vc(_vc) +config::config(const string &_first, const string &_me, config_view_change *_vc) + : my_view_id(0), first(_first), me(_me), vc(_vc), + paxos(this, me == _first, me, me) { - paxos_acceptor = new acceptor(this, me == _first, me, me); - paxos_proposer = new proposer(this, paxos_acceptor, me); - - // XXX hack; maybe should have its own port number - paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, this, &config::heartbeat); - - { - lock ml(cfg_mutex); - reconstruct(); - std::thread(&config::heartbeater, this).detach(); - } + get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this); + lock cfg_mutex_lock(cfg_mutex); + reconstruct(cfg_mutex_lock); + thread(&config::heartbeater, this).detach(); } -void -config::restore(const std::string &s) -{ - lock ml(cfg_mutex); - paxos_acceptor->restore(s); - reconstruct(); +void config::restore(const string &s) { + lock cfg_mutex_lock(cfg_mutex); + paxos.restore(s); + reconstruct(cfg_mutex_lock); } -void -config::get_view(unsigned instance, std::vector &m) -{ - lock ml(cfg_mutex); - get_view_wo(instance, m); +void config::get_view(unsigned instance, vector &m) { + lock cfg_mutex_lock(cfg_mutex); + get_view(instance, m, cfg_mutex_lock); } -// caller should hold cfg_mutex -void -config::get_view_wo(unsigned instance, std::vector &m) -{ - std::string value = paxos_acceptor->value(instance); - tprintf("get_view(%d): returns %s\n", instance, value.c_str()); - members(value, m); +void config::get_view(unsigned instance, vector &m, lock &) { + string value = paxos.value(instance); + LOG("get_view(" << instance << "): returns " << value); + m = members(value); } -void -config::members(const std::string &value, std::vector &view) const -{ - std::istringstream ist(value); - std::string m; - view.clear(); - while (ist >> m) { - view.push_back(m); - } +vector config::members(const string &value) const { + return explode(value); } -std::string -config::value(const std::vector &m) const -{ - std::ostringstream ost; - for (unsigned i = 0; i < m.size(); i++) { - ost << m[i]; - ost << " "; - } - return ost.str(); +string config::value(const vector &members) const { + return implode(members); } -// caller should hold cfg_mutex -void -config::reconstruct() -{ - if (paxos_acceptor->instance() > 0) { - std::string m; - my_view_id = paxos_acceptor->instance(); - get_view_wo(my_view_id, mems); - tprintf("config::reconstruct: %d %s\n", - my_view_id, print_members(mems).c_str()); +void config::reconstruct(lock &cfg_mutex_lock) { + VERIFY(cfg_mutex_lock); + my_view_id = paxos.instance(); + if (my_view_id > 0) { + get_view(my_view_id, mems, cfg_mutex_lock); + LOG("view " << my_view_id << " " << mems); } } // Called by Paxos's acceptor. -void -config::paxos_commit(unsigned instance, const std::string &value) -{ - std::string m; - std::vector newmem; - lock ml(cfg_mutex); +void config::paxos_commit(unsigned instance, const string &value) { + lock cfg_mutex_lock(cfg_mutex); - members(value, newmem); - tprintf("config::paxos_commit: %d: %s\n", instance, - print_members(newmem).c_str()); + vector newmem = members(value); + LOG("instance " << instance << ": " << newmem); - for (unsigned i = 0; i < mems.size(); i++) { - tprintf("config::paxos_commit: is %s still a member?\n", - mems[i].c_str()); - if (!isamember(mems[i], newmem) && me != mems[i]) { - tprintf("config::paxos_commit: delete %s\n", mems[i].c_str()); - mgr.delete_handle(mems[i]); + for (auto mem : mems) { + LOG("is " << mem << " still a member?"); + if (!isamember(mem, newmem) && me != mem) { + LOG("delete " << mem); + invalidate_handle(mem); } } mems = newmem; my_view_id = instance; if (vc) { - ml.unlock(); + cfg_mutex_lock.unlock(); vc->commit_change(instance); - ml.lock(); + cfg_mutex_lock.lock(); } } -bool -config::ismember(const std::string &m, unsigned vid) -{ - lock ml(cfg_mutex); - std::vector v; - get_view_wo(vid, v); +bool config::ismember(const string &m, unsigned vid) { + lock cfg_mutex_lock(cfg_mutex); + vector v; + get_view(vid, v, cfg_mutex_lock); return isamember(m, v); } -bool -config::add(const std::string &new_m, unsigned vid) -{ - std::vector m; - std::vector curm; - lock ml(cfg_mutex); - if (vid != my_view_id) +bool config::add(const string &new_m, unsigned vid) { + lock cfg_mutex_lock(cfg_mutex); + LOG("adding " << new_m << " to " << vid); + if (vid != my_view_id) { + LOG("that's not my view id, " << my_view_id << "!"); return false; - tprintf("config::add %s\n", new_m.c_str()); - m = mems; - m.push_back(new_m); - curm = mems; - std::string v = value(m); - int nextvid = my_view_id + 1; - bool r; - { - ml.unlock(); - r = paxos_proposer->run(nextvid, curm, v); - ml.lock(); } - tprintf("config::add: proposer returned %s\n", - r ? "success" : "failure"); + LOG("calling down to paxos layer"); + vector m(mems), cmems(mems); + m.push_back(new_m); + LOG("old mems " << cmems << " " << value(cmems)); + LOG("new mems " << m << " " << value(m)); + unsigned nextvid = my_view_id + 1; + cfg_mutex_lock.unlock(); + bool r = paxos.run(nextvid, cmems, value(m)); + cfg_mutex_lock.lock(); + LOG("paxos proposer returned " << (r ? "success" : "failure")); return r; } // caller should hold cfg_mutex -bool -config::remove(const std::string &m) -{ - adopt_lock ml(cfg_mutex); - tprintf("config::remove: my_view_id %d remove? %s\n", - my_view_id, m.c_str()); - std::vector n; - for (unsigned i = 0; i < mems.size(); i++) { - if (mems[i] != m) - n.push_back(mems[i]); - } - std::string v = value(n); - std::vector cmems = mems; - int nextvid = my_view_id + 1; - bool r; - { - ml.unlock(); - r = paxos_proposer->run(nextvid, cmems, v); - ml.lock(); +bool config::remove(const string &m, lock &cfg_mutex_lock) { + LOG("my_view_id " << my_view_id << " remove? " << m); + vector n; + for (auto mem : mems) { + if (mem != m) + n.push_back(mem); } - tprintf("config::remove: proposer returned %s\n", - r ? "success" : "failure"); + vector cmems = mems; + unsigned nextvid = my_view_id + 1; + cfg_mutex_lock.unlock(); + bool r = paxos.run(nextvid, cmems, value(n)); + cfg_mutex_lock.lock(); + LOG("proposer returned " << (r ? "success" : "failure")); return r; } -void -config::heartbeater() -{ - std::string m; - heartbeat_t h; - bool stable; - unsigned vid; - std::vector cmems; - lock ml(cfg_mutex); +void config::heartbeater() [[noreturn]] { + lock cfg_mutex_lock(cfg_mutex); while (1) { - auto next_timeout = std::chrono::steady_clock::now() + std::chrono::seconds(3); - tprintf("heartbeater: go to sleep\n"); - config_cond.wait_until(ml, next_timeout); + auto next_timeout = steady_clock::now() + milliseconds(300); + LOG("go to sleep"); + config_cond.wait_until(cfg_mutex_lock, next_timeout); - stable = true; - vid = my_view_id; - get_view_wo(vid, cmems); - tprintf("heartbeater: current membership %s\n", - print_members(cmems).c_str()); + unsigned vid = my_view_id; + vector cmems; + get_view(vid, cmems, cfg_mutex_lock); + LOG("current membership " << cmems); if (!isamember(me, cmems)) { - tprintf("heartbeater: not member yet; skip hearbeat\n"); + LOG("not member yet; skip hearbeat"); continue; } // who has the smallest ID? - m = me; - for (unsigned i = 0; i < cmems.size(); i++) { - if (m > cmems[i]) - m = cmems[i]; - } + string m = min(me, *min_element(cmems.begin(), cmems.end())); if (m == me) { // ping the other nodes - for (unsigned i = 0; i < cmems.size(); i++) { - if (cmems[i] != me) { - if ((h = doheartbeat(cmems[i])) != OK) { - stable = false; - m = cmems[i]; - break; - } - } + for (string mem : cmems) { + if (mem == me || doheartbeat(mem, cfg_mutex_lock) == OK) + continue; + if (vid == my_view_id) + remove(mem, cfg_mutex_lock); + break; } } else { // ping the node with the smallest ID - if ((h = doheartbeat(m)) != OK) - stable = false; - } - - if (!stable && vid == my_view_id) { - remove(m); + if (doheartbeat(m, cfg_mutex_lock) != OK && vid == my_view_id) + remove(m, cfg_mutex_lock); } } } -paxos_protocol::status -config::heartbeat(std::string m, unsigned vid, int &r) -{ - lock ml(cfg_mutex); - int ret = paxos_protocol::ERR; +paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) { + lock cfg_mutex_lock(cfg_mutex); r = (int) my_view_id; - tprintf("heartbeat from %s(%d) my_view_id %d\n", - m.c_str(), vid, my_view_id); - if (vid == my_view_id) { - ret = paxos_protocol::OK; - } else if (paxos_proposer->isrunning()) { + LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id); + if (vid == my_view_id) + return paxos_protocol::OK; + else if (paxos.isrunning()) { VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id); - ret = paxos_protocol::OK; - } else { - ret = paxos_protocol::ERR; + return paxos_protocol::OK; } - return ret; + return paxos_protocol::ERR; } -config::heartbeat_t -config::doheartbeat(const std::string &m) -{ - adopt_lock ml(cfg_mutex); - int ret = rpc_const::timeout_failure; - int r; +config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) { unsigned vid = my_view_id; - heartbeat_t res = OK; - - tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid); + LOG("heartbeat to " << m << " (" << vid << ")"); handle h(m); - { - ml.unlock(); - rpcc *cl = h.safebind(); - if (cl) { - ret = cl->call(paxos_protocol::heartbeat, me, vid, r, - rpcc::to(1000)); - } - ml.lock(); - } - if (ret != paxos_protocol::OK) { - if (ret == rpc_const::atmostonce_failure || - ret == rpc_const::oldsrv_failure) { - mgr.delete_handle(m); - } else { - tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n", - m.c_str(), ret, vid, r); - if (ret < 0) res = FAILURE; - else res = VIEWERR; - } + + cfg_mutex_lock.unlock(); + int r = 0, ret = rpc_const::bind_failure; + if (rpcc *cl = h.safebind()) + ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid); + cfg_mutex_lock.lock(); + + heartbeat_t res = OK; + switch (ret) { + case paxos_protocol::OK: + break; + case rpc_const::atmostonce_failure: + case rpc_const::oldsrv_failure: + invalidate_handle(m); + break; + default: + LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r); + res = (ret < 0) ? FAILURE : VIEWERR; } - tprintf("doheartbeat done %d\n", res); + LOG("done " << res); return res; } -