X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/5fd8cc8409d0efadc07dfe8d6774ad9ff477663d..0989f6feac9c8e83847165c4abee5273463eaa63:/config.cc?ds=sidebyside diff --git a/config.cc b/config.cc index 0f9ab4c..04c869e 100644 --- a/config.cc +++ b/config.cc @@ -1,3 +1,4 @@ +#include #include #include #include @@ -39,294 +40,280 @@ // all views, the other nodes can bring this re-joined node up to // date. -static void * -heartbeatthread(void *x) +config::config( + const std::string &_first, + const std::string &_me, + config_view_change *_vc) + : my_view_id(0), first(_first), me(_me), vc(_vc) { - config *r = (config *) x; - r->heartbeater(); - return 0; -} - -config::config(std::string _first, std::string _me, config_view_change *_vc) - : myvid (0), first (_first), me (_me), vc (_vc) -{ - VERIFY (pthread_mutex_init(&cfg_mutex, NULL) == 0); - VERIFY(pthread_cond_init(&config_cond, NULL) == 0); - - std::ostringstream ost; - ost << me; - - acc = new acceptor(this, me == _first, me, ost.str()); - pro = new proposer(this, acc, me); + paxos_acceptor = new acceptor(this, me == _first, me, me); + paxos_proposer = new proposer(this, paxos_acceptor, me); - // XXX hack; maybe should have its own port number - pxsrpc = acc->get_rpcs(); - pxsrpc->reg(paxos_protocol::heartbeat, this, &config::heartbeat); + // XXX hack; maybe should have its own port number + paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this); - { - ScopedLock ml(&cfg_mutex); - - reconstruct(); - - pthread_t th; - VERIFY (pthread_create(&th, NULL, &heartbeatthread, (void *) this) == 0); - } + { + lock ml(cfg_mutex); + reconstruct(); + std::thread(&config::heartbeater, this).detach(); + } } void -config::restore(std::string s) +config::restore(const std::string &s) { - ScopedLock ml(&cfg_mutex); - acc->restore(s); - reconstruct(); + lock ml(cfg_mutex); + paxos_acceptor->restore(s); + reconstruct(); } -std::vector -config::get_view(unsigned instance) +void +config::get_view(unsigned instance, std::vector &m) { - ScopedLock ml(&cfg_mutex); - return get_view_wo(instance); + lock ml(cfg_mutex); + get_view_wo(instance, m); } // caller should hold cfg_mutex -std::vector -config::get_view_wo(unsigned instance) +void +config::get_view_wo(unsigned instance, std::vector &m) { - std::string value = acc->value(instance); - tprintf("get_view(%d): returns %s\n", instance, value.c_str()); - return members(value); + std::string value = paxos_acceptor->value(instance); + tprintf("get_view(%d): returns %s\n", instance, value.c_str()); + members(value, m); } -std::vector -config::members(std::string value) +void +config::members(const std::string &value, std::vector &view) const { - std::istringstream ist(value); - std::string m; - std::vector view; - while (ist >> m) { - view.push_back(m); - } - return view; + std::istringstream ist(value); + std::string m; + view.clear(); + while (ist >> m) { + view.push_back(m); + } } std::string -config::value(std::vector m) +config::value(const std::vector &m) const { - std::ostringstream ost; - for (unsigned i = 0; i < m.size(); i++) { - ost << m[i]; - ost << " "; - } - return ost.str(); + std::ostringstream ost; + for (unsigned i = 0; i < m.size(); i++) { + ost << m[i]; + ost << " "; + } + return ost.str(); } // caller should hold cfg_mutex void config::reconstruct() { - if (acc->instance() > 0) { - std::string m; - myvid = acc->instance(); - mems = get_view_wo(myvid); - tprintf("config::reconstruct: %d %s\n", myvid, print_members(mems).c_str()); - } + if (paxos_acceptor->instance() > 0) { + std::string m; + my_view_id = paxos_acceptor->instance(); + get_view_wo(my_view_id, mems); + tprintf("config::reconstruct: %d %s\n", + my_view_id, print_members(mems).c_str()); + } } // Called by Paxos's acceptor. void -config::paxos_commit(unsigned instance, std::string value) +config::paxos_commit(unsigned instance, const std::string &value) { - std::string m; - std::vector newmem; - ScopedLock ml(&cfg_mutex); - - newmem = members(value); - tprintf("config::paxos_commit: %d: %s\n", instance, - print_members(newmem).c_str()); - - for (unsigned i = 0; i < mems.size(); i++) { - tprintf("config::paxos_commit: is %s still a member?\n", mems[i].c_str()); - if (!isamember(mems[i], newmem) && me != mems[i]) { - tprintf("config::paxos_commit: delete %s\n", mems[i].c_str()); - mgr.delete_handle(mems[i]); + std::string m; + std::vector newmem; + lock ml(cfg_mutex); + + members(value, newmem); + tprintf("config::paxos_commit: %d: %s\n", instance, + print_members(newmem).c_str()); + + for (unsigned i = 0; i < mems.size(); i++) { + tprintf("config::paxos_commit: is %s still a member?\n", + mems[i].c_str()); + if (!isamember(mems[i], newmem) && me != mems[i]) { + tprintf("config::paxos_commit: delete %s\n", mems[i].c_str()); + mgr.delete_handle(mems[i]); + } } - } - mems = newmem; - myvid = instance; - if (vc) { - unsigned vid = myvid; - VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); - vc->commit_change(vid); - VERIFY(pthread_mutex_lock(&cfg_mutex)==0); - } + mems = newmem; + my_view_id = instance; + if (vc) { + ml.unlock(); + vc->commit_change(instance); + ml.lock(); + } } bool -config::ismember(std::string m, unsigned vid) +config::ismember(const std::string &m, unsigned vid) { - bool r; - ScopedLock ml(&cfg_mutex); - std::vector v = get_view_wo(vid); - r = isamember(m, v); - return r; + lock ml(cfg_mutex); + std::vector v; + get_view_wo(vid, v); + return isamember(m, v); } bool -config::add(std::string new_m, unsigned vid) +config::add(const std::string &new_m, unsigned vid) { - std::vector m; - std::vector curm; - ScopedLock ml(&cfg_mutex); - if (vid != myvid) - return false; - tprintf("config::add %s\n", new_m.c_str()); - m = mems; - m.push_back(new_m); - curm = mems; - std::string v = value(m); - int nextvid = myvid + 1; - VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); - bool r = pro->run(nextvid, curm, v); - VERIFY(pthread_mutex_lock(&cfg_mutex)==0); - if (r) { - tprintf("config::add: proposer returned success\n"); - } else { - tprintf("config::add: proposer returned failure\n"); - } - return r; + std::vector m; + std::vector curm; + lock ml(cfg_mutex); + if (vid != my_view_id) + return false; + tprintf("config::add %s\n", new_m.c_str()); + m = mems; + m.push_back(new_m); + curm = mems; + std::string v = value(m); + int nextvid = my_view_id + 1; + bool r; + { + ml.unlock(); + r = paxos_proposer->run(nextvid, curm, v); + ml.lock(); + } + tprintf("config::add: proposer returned %s\n", + r ? "success" : "failure"); + return r; } // caller should hold cfg_mutex bool -config::remove_wo(std::string m) +config::remove(const std::string &m) { - tprintf("config::remove: myvid %d remove? %s\n", myvid, m.c_str()); - std::vector n; - for (unsigned i = 0; i < mems.size(); i++) { - if (mems[i] != m) n.push_back(mems[i]); - } - std::string v = value(n); - std::vector cmems = mems; - int nextvid = myvid + 1; - VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); - bool r = pro->run(nextvid, cmems, v); - VERIFY(pthread_mutex_lock(&cfg_mutex)==0); - if (r) { - tprintf("config::remove: proposer returned success\n"); - } else { - tprintf("config::remove: proposer returned failure\n"); - } - return r; + adopt_lock ml(cfg_mutex); + tprintf("config::remove: my_view_id %d remove? %s\n", + my_view_id, m.c_str()); + std::vector n; + for (unsigned i = 0; i < mems.size(); i++) { + if (mems[i] != m) + n.push_back(mems[i]); + } + std::string v = value(n); + std::vector cmems = mems; + int nextvid = my_view_id + 1; + bool r; + { + ml.unlock(); + r = paxos_proposer->run(nextvid, cmems, v); + ml.lock(); + } + tprintf("config::remove: proposer returned %s\n", + r ? "success" : "failure"); + return r; } void config::heartbeater() { - struct timeval now; - struct timespec next_timeout; - std::string m; - heartbeat_t h; - bool stable; - unsigned vid; - std::vector cmems; - ScopedLock ml(&cfg_mutex); - - while (1) { - - gettimeofday(&now, NULL); - next_timeout.tv_sec = now.tv_sec + 3; - next_timeout.tv_nsec = 0; - tprintf("heartbeater: go to sleep\n"); - pthread_cond_timedwait(&config_cond, &cfg_mutex, &next_timeout); - - stable = true; - vid = myvid; - cmems = get_view_wo(vid); - tprintf("heartbeater: current membership %s\n", print_members(cmems).c_str()); - - if (!isamember(me, cmems)) { - tprintf("heartbeater: not member yet; skip hearbeat\n"); - continue; - } - - //find the node with the smallest id - m = me; - for (unsigned i = 0; i < cmems.size(); i++) { - if (m > cmems[i]) - m = cmems[i]; - } - - if (m == me) { - //if i am the one with smallest id, ping the rest of the nodes - for (unsigned i = 0; i < cmems.size(); i++) { - if (cmems[i] != me) { - if ((h = doheartbeat(cmems[i])) != OK) { - stable = false; - m = cmems[i]; - break; - } - } - } - } else { - //the rest of the nodes ping the one with smallest id - if ((h = doheartbeat(m)) != OK) - stable = false; - } - - if (!stable && vid == myvid) { - remove_wo(m); + std::string m; + heartbeat_t h; + bool stable; + unsigned vid; + std::vector cmems; + lock ml(cfg_mutex); + + while (1) { + auto next_timeout = std::chrono::steady_clock::now() + std::chrono::seconds(3); + tprintf("heartbeater: go to sleep\n"); + config_cond.wait_until(ml, next_timeout); + + stable = true; + vid = my_view_id; + get_view_wo(vid, cmems); + tprintf("heartbeater: current membership %s\n", + print_members(cmems).c_str()); + + if (!isamember(me, cmems)) { + tprintf("heartbeater: not member yet; skip hearbeat\n"); + continue; + } + + // who has the smallest ID? + m = me; + for (unsigned i = 0; i < cmems.size(); i++) { + if (m > cmems[i]) + m = cmems[i]; + } + + if (m == me) { + // ping the other nodes + for (unsigned i = 0; i < cmems.size(); i++) { + if (cmems[i] != me) { + if ((h = doheartbeat(cmems[i])) != OK) { + stable = false; + m = cmems[i]; + break; + } + } + } + } else { + // ping the node with the smallest ID + if ((h = doheartbeat(m)) != OK) + stable = false; + } + + if (!stable && vid == my_view_id) { + remove(m); + } } - } } paxos_protocol::status -config::heartbeat(std::string m, unsigned vid, int &r) +config::heartbeat(int &r, std::string m, unsigned vid) { - ScopedLock ml(&cfg_mutex); - int ret = paxos_protocol::ERR; - r = (int) myvid; - tprintf("heartbeat from %s(%d) myvid %d\n", m.c_str(), vid, myvid); - if (vid == myvid) { - ret = paxos_protocol::OK; - } else if (pro->isrunning()) { - VERIFY (vid == myvid + 1 || vid + 1 == myvid); - ret = paxos_protocol::OK; - } else { - ret = paxos_protocol::ERR; - } - return ret; + lock ml(cfg_mutex); + int ret = paxos_protocol::ERR; + r = (int) my_view_id; + tprintf("heartbeat from %s(%d) my_view_id %d\n", + m.c_str(), vid, my_view_id); + if (vid == my_view_id) { + ret = paxos_protocol::OK; + } else if (paxos_proposer->isrunning()) { + VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id); + ret = paxos_protocol::OK; + } else { + ret = paxos_protocol::ERR; + } + return ret; } config::heartbeat_t -config::doheartbeat(std::string m) +config::doheartbeat(const std::string &m) { - int ret = rpc_const::timeout_failure; - int r; - unsigned vid = myvid; - heartbeat_t res = OK; - - tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid); - handle h(m); - VERIFY(pthread_mutex_unlock(&cfg_mutex)==0); - rpcc *cl = h.safebind(); - if (cl) { - ret = cl->call(paxos_protocol::heartbeat, me, vid, r, - rpcc::to(1000)); - } - VERIFY(pthread_mutex_lock(&cfg_mutex)==0); - if (ret != paxos_protocol::OK) { - if (ret == rpc_const::atmostonce_failure || - ret == rpc_const::oldsrv_failure) { - mgr.delete_handle(m); - } else { - tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n", - m.c_str(), ret, vid, r); - if (ret < 0) res = FAILURE; - else res = VIEWERR; + adopt_lock ml(cfg_mutex); + int ret = rpc_const::timeout_failure; + int r; + unsigned vid = my_view_id; + heartbeat_t res = OK; + + tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid); + handle h(m); + { + ml.unlock(); + rpcc *cl = h.safebind(); + if (cl) { + ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid); + } + ml.lock(); + } + if (ret != paxos_protocol::OK) { + if (ret == rpc_const::atmostonce_failure || + ret == rpc_const::oldsrv_failure) { + mgr.delete_handle(m); + } else { + tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n", + m.c_str(), ret, vid, r); + if (ret < 0) res = FAILURE; + else res = VIEWERR; + } } - } - tprintf("doheartbeat done %d\n", res); - return res; + tprintf("doheartbeat done %d\n", res); + return res; }