+#include <thread>
#include <sstream>
#include <iostream>
#include <stdio.h>
// all views, the other nodes can bring this re-joined node up to
// date.
-static void *
-heartbeatthread(void *x)
+config::config( // Creates the Paxos acceptor and proposer, registers the heartbeat RPC handler, reloads any existing view, and starts the heartbeater thread.
+ const std::string &_first, // stored in first; (me == _first) is passed as the acceptor's second argument
+ const std::string &_me, // stored in me; the name handed to both the acceptor and the proposer
+ config_view_change *_vc) // stored in vc; may be null (paxos_commit() checks before calling through it)
+ : my_view_id(0), first(_first), me(_me), vc(_vc)
{
- config *r = (config *) x;
- r->heartbeater();
- return 0;
-}
-
-config::config(std::string _first, std::string _me, config_view_change *_vc)
- : myvid (0), first (_first), me (_me), vc (_vc)
-{
- VERIFY (pthread_mutex_init(&cfg_mutex, NULL) == 0);
- VERIFY(pthread_cond_init(&config_cond, NULL) == 0);
-
- std::ostringstream ost;
- ost << me;
-
- acc = new acceptor(this, me == _first, me, ost.str());
- pro = new proposer(this, acc, me);
+ paxos_acceptor = new acceptor(this, me == _first, me, me); // last argument: the removed code streamed me through an ostringstream; me is now passed directly
+ paxos_proposer = new proposer(this, paxos_acceptor, me);
- // XXX hack; maybe should have its own port number
- pxsrpc = acc->get_rpcs();
- pxsrpc->reg(paxos_protocol::heartbeat, this, &config::heartbeat);
+ // XXX hack; maybe should have its own port number (the heartbeat handler is registered on whatever the acceptor's get_rpcs() returns)
+ paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
- {
- ScopedLock ml(&cfg_mutex);
-
- reconstruct();
-
- pthread_t th;
- VERIFY (pthread_create(&th, NULL, &heartbeatthread, (void *) this) == 0);
- }
+ {
+ lock ml(cfg_mutex); // reconstruct() requires cfg_mutex to be held by its caller
+ reconstruct();
+ std::thread(&config::heartbeater, this).detach(); // detached: the thread is never joined and keeps using this object
+ }
}
void
-config::restore(std::string s)
+config::restore(const std::string &s) // Passes s to the acceptor's restore() and then refreshes my_view_id/mems via reconstruct().
{
- ScopedLock ml(&cfg_mutex);
- acc->restore(s);
- reconstruct();
+ lock ml(cfg_mutex); // reconstruct() requires cfg_mutex to be held by its caller
+ paxos_acceptor->restore(s);
+ reconstruct();
}
-std::vector<std::string>
-config::get_view(unsigned instance)
+void
+config::get_view(unsigned instance, std::vector<std::string> &m) // Locking wrapper around get_view_wo(); the view is written into m instead of being returned.
{
- ScopedLock ml(&cfg_mutex);
- return get_view_wo(instance);
+ lock ml(cfg_mutex); // get_view_wo() requires cfg_mutex to be held by its caller
+ get_view_wo(instance, m); // previous contents of m are discarded (members() clears it)
}
// caller should hold cfg_mutex
-std::vector<std::string>
-config::get_view_wo(unsigned instance)
+void
+config::get_view_wo(unsigned instance, std::vector<std::string> &m) // Fetches the acceptor's value for instance and splits it into member names stored in m.
{
- std::string value = acc->value(instance);
- tprintf("get_view(%d): returns %s\n", instance, value.c_str());
- return members(value);
+ std::string value = paxos_acceptor->value(instance);
+ tprintf("get_view(%d): returns %s\n", instance, value.c_str());
+ members(value, m); // m is cleared first, so it ends up holding only this view
}
-std::vector<std::string>
-config::members(std::string value)
+void
+config::members(const std::string &value, std::vector<std::string> &view) const // Splits value on whitespace and stores the tokens in view; parsing counterpart of value().
{
- std::istringstream ist(value);
- std::string m;
- std::vector<std::string> view;
- while (ist >> m) {
- view.push_back(m);
- }
- return view;
+ std::istringstream ist(value);
+ std::string m;
+ view.clear(); // callers pass long-lived vectors (e.g. mems, cmems), so stale entries must be dropped
+ while (ist >> m) { // operator>> skips whitespace, so leading/trailing/repeated spaces yield no empty tokens
+ view.push_back(m);
+ }
}
std::string
-config::value(std::vector<std::string> m)
+config::value(const std::vector<std::string> &m) const // Serializes m as a string in which every element is followed by one space.
{
- std::ostringstream ost;
- for (unsigned i = 0; i < m.size(); i++) {
- ost << m[i];
- ost << " ";
- }
- return ost.str();
+ std::ostringstream ost;
+ for (unsigned i = 0; i < m.size(); i++) {
+ ost << m[i];
+ ost << " "; // also emitted after the last element; members() ignores the trailing space when parsing
+ }
+ return ost.str(); // empty string for an empty vector
}
// caller should hold cfg_mutex
void
config::reconstruct()
{
- if (acc->instance() > 0) {
- std::string m;
- myvid = acc->instance();
- mems = get_view_wo(myvid);
- tprintf("config::reconstruct: %d %s\n", myvid, print_members(mems).c_str());
- }
+ if (paxos_acceptor->instance() > 0) { // Reloads my_view_id and mems from the acceptor; nothing is changed while its instance() is 0.
+ my_view_id = paxos_acceptor->instance();
+ get_view_wo(my_view_id, mems); // overwrites mems with the members recorded for that instance
+ tprintf("config::reconstruct: %d %s\n",
+ my_view_id, print_members(mems).c_str());
+ }
}
// Called by Paxos's acceptor.
void
-config::paxos_commit(unsigned instance, std::string value)
+config::paxos_commit(unsigned instance, const std::string &value) // Installs the view encoded in value as view number instance, drops handles of departed nodes, then notifies vc if set.
{
- std::string m;
- std::vector<std::string> newmem;
- ScopedLock ml(&cfg_mutex);
-
- newmem = members(value);
- tprintf("config::paxos_commit: %d: %s\n", instance,
- print_members(newmem).c_str());
-
- for (unsigned i = 0; i < mems.size(); i++) {
- tprintf("config::paxos_commit: is %s still a member?\n", mems[i].c_str());
- if (!isamember(mems[i], newmem) && me != mems[i]) {
- tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
- mgr.delete_handle(mems[i]);
+ std::vector<std::string> newmem;
+ lock ml(cfg_mutex); // protects mems and my_view_id while they are replaced
+
+ members(value, newmem);
+ tprintf("config::paxos_commit: %d: %s\n", instance,
+ print_members(newmem).c_str());
+
+ for (unsigned i = 0; i < mems.size(); i++) {
+ tprintf("config::paxos_commit: is %s still a member?\n",
+ mems[i].c_str());
+ if (!isamember(mems[i], newmem) && me != mems[i]) { // only nodes that left the view, and never ourselves
+ tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
+ mgr.delete_handle(mems[i]);
+ }
}
- }
- mems = newmem;
- myvid = instance;
- if (vc) {
- unsigned vid = myvid;
- VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
- vc->commit_change(vid);
- VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
- }
+ mems = newmem;
+ my_view_id = instance;
+ if (vc) {
+ ml.unlock(); // the view-change callback runs without cfg_mutex held
+ vc->commit_change(instance); // instance equals the my_view_id value assigned just above
+ ml.lock(); // re-acquire before ml goes out of scope, mirroring the removed unlock/lock pair
+ }
}
bool
-config::ismember(std::string m, unsigned vid)
+config::ismember(const std::string &m, unsigned vid) // Returns isamember()'s verdict on m against the view recorded for instance vid.
{
- bool r;
- ScopedLock ml(&cfg_mutex);
- std::vector<std::string> v = get_view_wo(vid);
- r = isamember(m, v);
- return r;
+ lock ml(cfg_mutex); // get_view_wo() requires cfg_mutex to be held by its caller
+ std::vector<std::string> v;
+ get_view_wo(vid, v);
+ return isamember(m, v);
}
bool
-config::add(std::string new_m, unsigned vid)
+config::add(const std::string &new_m, unsigned vid) // Proposes view my_view_id+1 = current members plus new_m; returns the proposer's result, or false when vid is not the current view.
{
- std::vector<std::string> m;
- std::vector<std::string> curm;
- ScopedLock ml(&cfg_mutex);
- if (vid != myvid)
- return false;
- tprintf("config::add %s\n", new_m.c_str());
- m = mems;
- m.push_back(new_m);
- curm = mems;
- std::string v = value(m);
- int nextvid = myvid + 1;
- VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
- bool r = pro->run(nextvid, curm, v);
- VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
- if (r) {
- tprintf("config::add: proposer returned success\n");
- } else {
- tprintf("config::add: proposer returned failure\n");
- }
- return r;
+ std::vector<std::string> m;
+ std::vector<std::string> curm;
+ lock ml(cfg_mutex);
+ if (vid != my_view_id) // the caller's view number is not the current one; refuse without proposing
+ return false;
+ tprintf("config::add %s\n", new_m.c_str());
+ m = mems;
+ m.push_back(new_m); // m: proposed membership
+ curm = mems; // curm: copy of the current membership, taken under the lock and used after unlocking
+ std::string v = value(m);
+ int nextvid = my_view_id + 1;
+ bool r;
+ {
+ ml.unlock(); // the proposer runs without cfg_mutex held — presumably because paxos_commit() takes it; TODO confirm
+ r = paxos_proposer->run(nextvid, curm, v);
+ ml.lock();
+ }
+ tprintf("config::add: proposer returned %s\n",
+ r ? "success" : "failure");
+ return r;
}
// caller should hold cfg_mutex
bool
-config::remove_wo(std::string m)
+config::remove(const std::string &m) // Proposes view my_view_id+1 = current members without m; returns the proposer's result.
{
- tprintf("config::remove: myvid %d remove? %s\n", myvid, m.c_str());
- std::vector<std::string> n;
- for (unsigned i = 0; i < mems.size(); i++) {
- if (mems[i] != m) n.push_back(mems[i]);
- }
- std::string v = value(n);
- std::vector<std::string> cmems = mems;
- int nextvid = myvid + 1;
- VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
- bool r = pro->run(nextvid, cmems, v);
- VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
- if (r) {
- tprintf("config::remove: proposer returned success\n");
- } else {
- tprintf("config::remove: proposer returned failure\n");
- }
- return r;
+ adopt_lock ml(cfg_mutex); // NOTE(review): wraps the mutex the caller already holds; assumes adopt_lock leaves it locked on destruction — confirm against its definition
+ tprintf("config::remove: my_view_id %d remove? %s\n",
+ my_view_id, m.c_str());
+ std::vector<std::string> n;
+ for (unsigned i = 0; i < mems.size(); i++) {
+ if (mems[i] != m) // keep every member except m (all occurrences of m are dropped)
+ n.push_back(mems[i]);
+ }
+ std::string v = value(n);
+ std::vector<std::string> cmems = mems; // copy of the current membership, used after unlocking
+ int nextvid = my_view_id + 1;
+ bool r;
+ {
+ ml.unlock(); // the proposer runs without cfg_mutex held
+ r = paxos_proposer->run(nextvid, cmems, v);
+ ml.lock(); // the caller expects cfg_mutex to be held again on return
+ }
+ tprintf("config::remove: proposer returned %s\n",
+ r ? "success" : "failure");
+ return r;
}
void
config::heartbeater()
{
- struct timeval now;
- struct timespec next_timeout;
- std::string m;
- heartbeat_t h;
- bool stable;
- unsigned vid;
- std::vector<std::string> cmems;
- ScopedLock ml(&cfg_mutex);
-
- while (1) {
-
- gettimeofday(&now, NULL);
- next_timeout.tv_sec = now.tv_sec + 3;
- next_timeout.tv_nsec = 0;
- tprintf("heartbeater: go to sleep\n");
- pthread_cond_timedwait(&config_cond, &cfg_mutex, &next_timeout);
-
- stable = true;
- vid = myvid;
- cmems = get_view_wo(vid);
- tprintf("heartbeater: current membership %s\n", print_members(cmems).c_str());
-
- if (!isamember(me, cmems)) {
- tprintf("heartbeater: not member yet; skip hearbeat\n");
- continue;
- }
-
- //find the node with the smallest id
- m = me;
- for (unsigned i = 0; i < cmems.size(); i++) {
- if (m > cmems[i])
- m = cmems[i];
- }
-
- if (m == me) {
- //if i am the one with smallest id, ping the rest of the nodes
- for (unsigned i = 0; i < cmems.size(); i++) {
- if (cmems[i] != me) {
- if ((h = doheartbeat(cmems[i])) != OK) {
- stable = false;
- m = cmems[i];
- break;
- }
- }
- }
- } else {
- //the rest of the nodes ping the one with smallest id
- if ((h = doheartbeat(m)) != OK)
- stable = false;
- }
-
- if (!stable && vid == myvid) {
- remove_wo(m);
+ std::string m; // node to ping; after a failed ping, the node proposed for removal
+ heartbeat_t h;
+ bool stable;
+ unsigned vid;
+ std::vector<std::string> cmems;
+ lock ml(cfg_mutex); // held throughout, except while waiting on config_cond and inside doheartbeat()/remove(), which unlock internally
+
+ while (1) { // heartbeat loop; never exits (the constructor runs this on a detached thread)
+ auto next_timeout = std::chrono::steady_clock::now() + std::chrono::seconds(3);
+ tprintf("heartbeater: go to sleep\n");
+ config_cond.wait_until(ml, next_timeout); // result ignored: a timeout and a wake-up are handled identically
+
+ stable = true;
+ vid = my_view_id; // remember which view is being checked; re-compared before remove()
+ get_view_wo(vid, cmems);
+ tprintf("heartbeater: current membership %s\n",
+ print_members(cmems).c_str());
+
+ if (!isamember(me, cmems)) {
+ tprintf("heartbeater: not member yet; skip hearbeat\n");
+ continue;
+ }
+
+ // who has the smallest ID? (std::string comparison over the member names)
+ m = me;
+ for (unsigned i = 0; i < cmems.size(); i++) {
+ if (m > cmems[i])
+ m = cmems[i];
+ }
+
+ if (m == me) {
+ // we have the smallest ID: ping the other nodes, stopping at the first failure
+ for (unsigned i = 0; i < cmems.size(); i++) {
+ if (cmems[i] != me) {
+ if ((h = doheartbeat(cmems[i])) != OK) {
+ stable = false;
+ m = cmems[i]; // the node that failed becomes the removal candidate
+ break;
+ }
+ }
+ }
+ } else {
+ // ping the node with the smallest ID; if that fails, m (that node) is the removal candidate
+ if ((h = doheartbeat(m)) != OK)
+ stable = false;
+ }
+
+ if (!stable && vid == my_view_id) { // skip removal if the view changed while cfg_mutex was released during the pings
+ remove(m);
+ }
}
- }
}
paxos_protocol::status
-config::heartbeat(std::string m, unsigned vid, int &r)
+config::heartbeat(int &r, std::string m, unsigned vid) // Heartbeat RPC handler (registered in the constructor): m/vid are the sender's name and view id; r receives our view id.
{
- ScopedLock ml(&cfg_mutex);
- int ret = paxos_protocol::ERR;
- r = (int) myvid;
- tprintf("heartbeat from %s(%d) myvid %d\n", m.c_str(), vid, myvid);
- if (vid == myvid) {
- ret = paxos_protocol::OK;
- } else if (pro->isrunning()) {
- VERIFY (vid == myvid + 1 || vid + 1 == myvid);
- ret = paxos_protocol::OK;
- } else {
- ret = paxos_protocol::ERR;
- }
- return ret;
+ lock ml(cfg_mutex);
+ int ret = paxos_protocol::ERR;
+ r = (int) my_view_id; // always reported back, whatever the status
+ tprintf("heartbeat from %s(%d) my_view_id %d\n",
+ m.c_str(), vid, my_view_id);
+ if (vid == my_view_id) { // sender is in the same view
+ ret = paxos_protocol::OK;
+ } else if (paxos_proposer->isrunning()) { // views differ, but our proposer is running, so the mismatch is accepted
+ VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id); // the two view ids must differ by exactly one (VERIFY presumably aborts otherwise — confirm)
+ ret = paxos_protocol::OK;
+ } else { // views differ and no proposal is running here
+ ret = paxos_protocol::ERR;
+ }
+ return ret;
}
config::heartbeat_t
-config::doheartbeat(std::string m)
+config::doheartbeat(const std::string &m) // Sends one heartbeat RPC to m and classifies the outcome as OK, FAILURE or VIEWERR; cfg_mutex is released around the RPC.
{
- int ret = rpc_const::timeout_failure;
- int r;
- unsigned vid = myvid;
- heartbeat_t res = OK;
-
- tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
- handle h(m);
- VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
- rpcc *cl = h.safebind();
- if (cl) {
- ret = cl->call(paxos_protocol::heartbeat, me, vid, r,
- rpcc::to(1000));
- }
- VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
- if (ret != paxos_protocol::OK) {
- if (ret == rpc_const::atmostonce_failure ||
- ret == rpc_const::oldsrv_failure) {
- mgr.delete_handle(m);
- } else {
- tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n",
- m.c_str(), ret, vid, r);
- if (ret < 0) res = FAILURE;
- else res = VIEWERR;
+ adopt_lock ml(cfg_mutex); // NOTE(review): wraps the mutex the caller (heartbeater) already holds; assumes adopt_lock leaves it locked on destruction — confirm
+ int ret = rpc_const::timeout_failure; // outcome reported when safebind() yields no client
+ int r = 0; // initialized because it is logged below even when no reply was ever written into it
+ unsigned vid = my_view_id; // read while cfg_mutex is still held
+ heartbeat_t res = OK;
+
+ tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
+ handle h(m);
+ {
+ ml.unlock(); // binding and the RPC run without cfg_mutex held
+ rpcc *cl = h.safebind();
+ if (cl) {
+ ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid);
+ }
+ ml.lock(); // the caller expects cfg_mutex to be held again on return
+ }
+ if (ret != paxos_protocol::OK) {
+ if (ret == rpc_const::atmostonce_failure ||
+ ret == rpc_const::oldsrv_failure) { // these two failures only drop the cached handle; res stays OK
+ mgr.delete_handle(m);
+ } else {
+ tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n",
+ m.c_str(), ret, vid, r);
+ if (ret < 0) res = FAILURE; // negative status codes map to FAILURE
+ else res = VIEWERR; // any other non-OK status maps to VIEWERR
+ }
}
- }
- tprintf("doheartbeat done %d\n", res);
- return res;
+ tprintf("doheartbeat done %d\n", res);
+ return res;
}