X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/5fd8cc8409d0efadc07dfe8d6774ad9ff477663d..3abd3952c1f4441f0dd6eae9883b2d01ed9cd56b:/config.cc diff --git a/config.cc b/config.cc index 0f9ab4c..35654d8 100644 --- a/config.cc +++ b/config.cc @@ -1,11 +1,5 @@ -#include -#include -#include #include "config.h" -#include "paxos.h" #include "handle.h" -#include "tprintf.h" -#include "lang/verify.h" // The config module maintains views. As a node joins or leaves a // view, the next view will be the same as previous view, except with @@ -39,294 +33,192 @@ // all views, the other nodes can bring this re-joined node up to // date. -static void * -heartbeatthread(void *x) +config::config(const string &_first, const string &_me, config_view_change *_vc) + : my_view_id(0), first(_first), me(_me), vc(_vc), + paxos(this, me == _first, me, me) { - config *r = (config *) x; - r->heartbeater(); - return 0; + get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this); + lock cfg_mutex_lock(cfg_mutex); + reconstruct(cfg_mutex_lock); + thread(&config::heartbeater, this).detach(); } -config::config(std::string _first, std::string _me, config_view_change *_vc) - : myvid (0), first (_first), me (_me), vc (_vc) -{ - VERIFY (pthread_mutex_init(&cfg_mutex, NULL) == 0); - VERIFY(pthread_cond_init(&config_cond, NULL) == 0); - - std::ostringstream ost; - ost << me; - - acc = new acceptor(this, me == _first, me, ost.str()); - pro = new proposer(this, acc, me); - - // XXX hack; maybe should have its own port number - pxsrpc = acc->get_rpcs(); - pxsrpc->reg(paxos_protocol::heartbeat, this, &config::heartbeat); - - { - ScopedLock ml(&cfg_mutex); - - reconstruct(); - - pthread_t th; - VERIFY (pthread_create(&th, NULL, &heartbeatthread, (void *) this) == 0); - } +void config::restore(const string &s) { + lock cfg_mutex_lock(cfg_mutex); + paxos.restore(s); + reconstruct(cfg_mutex_lock); } -void -config::restore(std::string s) -{ - ScopedLock ml(&cfg_mutex); - acc->restore(s); - reconstruct(); +void config::get_view(unsigned instance, vector &m) { + lock cfg_mutex_lock(cfg_mutex); + get_view(instance, m, cfg_mutex_lock); } -std::vector -config::get_view(unsigned instance) -{ - ScopedLock ml(&cfg_mutex); - return get_view_wo(instance); +void config::get_view(unsigned instance, vector &m, lock &) { + string value = paxos.value(instance); + LOG("get_view(" << instance << "): returns " << value); + m = members(value); } -// caller should hold cfg_mutex -std::vector -config::get_view_wo(unsigned instance) -{ - std::string value = acc->value(instance); - tprintf("get_view(%d): returns %s\n", instance, value.c_str()); - return members(value); +vector config::members(const string &value) const { + return explode(value); } -std::vector -config::members(std::string value) -{ - std::istringstream ist(value); - std::string m; - std::vector view; - while (ist >> m) { - view.push_back(m); - } - return view; +string config::value(const vector &members) const { + return implode(members); } -std::string -config::value(std::vector m) -{ - std::ostringstream ost; - for (unsigned i = 0; i < m.size(); i++) { - ost << m[i]; - ost << " "; - } - return ost.str(); -} - -// caller should hold cfg_mutex -void -config::reconstruct() -{ - if (acc->instance() > 0) { - std::string m; - myvid = acc->instance(); - mems = get_view_wo(myvid); - tprintf("config::reconstruct: %d %s\n", myvid, print_members(mems).c_str()); - } +void config::reconstruct(lock &cfg_mutex_lock) { + VERIFY(cfg_mutex_lock); + my_view_id = paxos.instance(); + if (my_view_id > 0) { + get_view(my_view_id, mems, cfg_mutex_lock); + LOG("view " << my_view_id << " " << mems); + } } // Called by Paxos's acceptor. -void -config::paxos_commit(unsigned instance, std::string value) -{ - std::string m; - std::vector newmem; - ScopedLock ml(&cfg_mutex); - - newmem = members(value); - tprintf("config::paxos_commit: %d: %s\n", instance, - print_members(newmem).c_str()); - - for (unsigned i = 0; i < mems.size(); i++) { - tprintf("config::paxos_commit: is %s still a member?\n", mems[i].c_str()); - if (!isamember(mems[i], newmem) && me != mems[i]) { - tprintf("config::paxos_commit: delete %s\n", mems[i].c_str()); - mgr.delete_handle(mems[i]); +void config::paxos_commit(unsigned instance, const string &value) { + lock cfg_mutex_lock(cfg_mutex); + + vector newmem = members(value); + LOG("instance " << instance << ": " << newmem); + + for (auto mem : mems) { + LOG("is " << mem << " still a member?"); + if (!isamember(mem, newmem) && me != mem) { + LOG("delete " << mem); + invalidate_handle(mem); + } } - } - mems = newmem; - myvid = instance; - if (vc) { - unsigned vid = myvid; - VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); - vc->commit_change(vid); - VERIFY(pthread_mutex_lock(&cfg_mutex)==0); - } + mems = newmem; + my_view_id = instance; + if (vc) { + cfg_mutex_lock.unlock(); + vc->commit_change(instance); + cfg_mutex_lock.lock(); + } } -bool -config::ismember(std::string m, unsigned vid) -{ - bool r; - ScopedLock ml(&cfg_mutex); - std::vector v = get_view_wo(vid); - r = isamember(m, v); - return r; +bool config::ismember(const string &m, unsigned vid) { + lock cfg_mutex_lock(cfg_mutex); + vector v; + get_view(vid, v, cfg_mutex_lock); + return isamember(m, v); } -bool -config::add(std::string new_m, unsigned vid) -{ - std::vector m; - std::vector curm; - ScopedLock ml(&cfg_mutex); - if (vid != myvid) - return false; - tprintf("config::add %s\n", new_m.c_str()); - m = mems; - m.push_back(new_m); - curm = mems; - std::string v = value(m); - int nextvid = myvid + 1; - VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); - bool r = pro->run(nextvid, curm, v); - VERIFY(pthread_mutex_lock(&cfg_mutex)==0); - if (r) { - tprintf("config::add: proposer returned success\n"); - } else { - tprintf("config::add: proposer returned failure\n"); - } - return r; +bool config::add(const string &new_m, unsigned vid) { + lock cfg_mutex_lock(cfg_mutex); + LOG("adding " << new_m << " to " << vid); + if (vid != my_view_id) { + LOG("that's not my view id, " << my_view_id << "!"); + return false; + } + LOG("calling down to paxos layer"); + vector m(mems), cmems(mems); + m.push_back(new_m); + LOG("old mems " << cmems << " " << value(cmems)); + LOG("new mems " << m << " " << value(m)); + unsigned nextvid = my_view_id + 1; + cfg_mutex_lock.unlock(); + bool r = paxos.run(nextvid, cmems, value(m)); + cfg_mutex_lock.lock(); + LOG("paxos proposer returned " << (r ? "success" : "failure")); + return r; } // caller should hold cfg_mutex -bool -config::remove_wo(std::string m) -{ - tprintf("config::remove: myvid %d remove? %s\n", myvid, m.c_str()); - std::vector n; - for (unsigned i = 0; i < mems.size(); i++) { - if (mems[i] != m) n.push_back(mems[i]); - } - std::string v = value(n); - std::vector cmems = mems; - int nextvid = myvid + 1; - VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); - bool r = pro->run(nextvid, cmems, v); - VERIFY(pthread_mutex_lock(&cfg_mutex)==0); - if (r) { - tprintf("config::remove: proposer returned success\n"); - } else { - tprintf("config::remove: proposer returned failure\n"); - } - return r; -} - -void -config::heartbeater() -{ - struct timeval now; - struct timespec next_timeout; - std::string m; - heartbeat_t h; - bool stable; - unsigned vid; - std::vector cmems; - ScopedLock ml(&cfg_mutex); - - while (1) { - - gettimeofday(&now, NULL); - next_timeout.tv_sec = now.tv_sec + 3; - next_timeout.tv_nsec = 0; - tprintf("heartbeater: go to sleep\n"); - pthread_cond_timedwait(&config_cond, &cfg_mutex, &next_timeout); - - stable = true; - vid = myvid; - cmems = get_view_wo(vid); - tprintf("heartbeater: current membership %s\n", print_members(cmems).c_str()); - - if (!isamember(me, cmems)) { - tprintf("heartbeater: not member yet; skip hearbeat\n"); - continue; - } - - //find the node with the smallest id - m = me; - for (unsigned i = 0; i < cmems.size(); i++) { - if (m > cmems[i]) - m = cmems[i]; - } - - if (m == me) { - //if i am the one with smallest id, ping the rest of the nodes - for (unsigned i = 0; i < cmems.size(); i++) { - if (cmems[i] != me) { - if ((h = doheartbeat(cmems[i])) != OK) { - stable = false; - m = cmems[i]; - break; - } - } - } - } else { - //the rest of the nodes ping the one with smallest id - if ((h = doheartbeat(m)) != OK) - stable = false; +bool config::remove(const string &m, lock &cfg_mutex_lock) { + LOG("my_view_id " << my_view_id << " remove? " << m); + vector n; + for (auto mem : mems) { + if (mem != m) + n.push_back(mem); } + vector cmems = mems; + unsigned nextvid = my_view_id + 1; + cfg_mutex_lock.unlock(); + bool r = paxos.run(nextvid, cmems, value(n)); + cfg_mutex_lock.lock(); + LOG("proposer returned " << (r ? "success" : "failure")); + return r; +} - if (!stable && vid == myvid) { - remove_wo(m); +void config::heartbeater() [[noreturn]] { + lock cfg_mutex_lock(cfg_mutex); + + while (1) { + auto next_timeout = steady_clock::now() + milliseconds(300); + LOG("go to sleep"); + config_cond.wait_until(cfg_mutex_lock, next_timeout); + + unsigned vid = my_view_id; + vector cmems; + get_view(vid, cmems, cfg_mutex_lock); + LOG("current membership " << cmems); + + if (!isamember(me, cmems)) { + LOG("not member yet; skip hearbeat"); + continue; + } + + // who has the smallest ID? + string m = min(me, *min_element(cmems.begin(), cmems.end())); + + if (m == me) { + // ping the other nodes + for (string mem : cmems) { + if (mem == me || doheartbeat(mem, cfg_mutex_lock) == OK) + continue; + if (vid == my_view_id) + remove(mem, cfg_mutex_lock); + break; + } + } else { + // ping the node with the smallest ID + if (doheartbeat(m, cfg_mutex_lock) != OK && vid == my_view_id) + remove(m, cfg_mutex_lock); + } } - } } -paxos_protocol::status -config::heartbeat(std::string m, unsigned vid, int &r) -{ - ScopedLock ml(&cfg_mutex); - int ret = paxos_protocol::ERR; - r = (int) myvid; - tprintf("heartbeat from %s(%d) myvid %d\n", m.c_str(), vid, myvid); - if (vid == myvid) { - ret = paxos_protocol::OK; - } else if (pro->isrunning()) { - VERIFY (vid == myvid + 1 || vid + 1 == myvid); - ret = paxos_protocol::OK; - } else { - ret = paxos_protocol::ERR; - } - return ret; +paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) { + lock cfg_mutex_lock(cfg_mutex); + r = (int) my_view_id; + LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id); + if (vid == my_view_id) + return paxos_protocol::OK; + else if (paxos.isrunning()) { + VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id); + return paxos_protocol::OK; + } + return paxos_protocol::ERR; } -config::heartbeat_t -config::doheartbeat(std::string m) -{ - int ret = rpc_const::timeout_failure; - int r; - unsigned vid = myvid; - heartbeat_t res = OK; - - tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid); - handle h(m); - VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); - rpcc *cl = h.safebind(); - if (cl) { - ret = cl->call(paxos_protocol::heartbeat, me, vid, r, - rpcc::to(1000)); - } - VERIFY(pthread_mutex_lock(&cfg_mutex)==0); - if (ret != paxos_protocol::OK) { - if (ret == rpc_const::atmostonce_failure || - ret == rpc_const::oldsrv_failure) { - mgr.delete_handle(m); - } else { - tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n", - m.c_str(), ret, vid, r); - if (ret < 0) res = FAILURE; - else res = VIEWERR; +config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) { + unsigned vid = my_view_id; + LOG("heartbeat to " << m << " (" << vid << ")"); + handle h(m); + + cfg_mutex_lock.unlock(); + int r = 0, ret = rpc_const::bind_failure; + if (rpcc *cl = h.safebind()) + ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid); + cfg_mutex_lock.lock(); + + heartbeat_t res = OK; + switch (ret) { + case paxos_protocol::OK: + break; + case rpc_const::atmostonce_failure: + case rpc_const::oldsrv_failure: + invalidate_handle(m); + break; + default: + LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r); + res = (ret < 0) ? FAILURE : VIEWERR; } - } - tprintf("doheartbeat done %d\n", res); - return res; + LOG("done " << res); + return res; } -