-#include <thread>
-#include <sstream>
-#include <iostream>
-#include <stdio.h>
#include "config.h"
-#include "paxos.h"
#include "handle.h"
-#include "tprintf.h"
-#include "lang/verify.h"
// The config module maintains views. As a node joins or leaves a
// view, the next view will be the same as previous view, except with
// that node added or removed.
// [...]
// Because the Paxos layer can return the value agreed on for
// all views, the other nodes can bring this re-joined node up to
// date.
-config::config(
- const std::string &_first,
- const std::string &_me,
- config_view_change *_vc)
- : my_view_id(0), first(_first), me(_me), vc(_vc)
+// Sets up the view manager for node `_me`. `_first` is compared against
+// `me` to produce the boolean handed to the paxos object; `_vc` is the
+// listener told about committed views (paxos_commit tolerates it being null).
+// NOTE(review): `paxos` is initialised from the *member* `me`, which is only
+// valid if `me` is declared before `paxos` in the class -- confirm in config.h.
+config::config(const string &_first, const string &_me, config_view_change *_vc)
+ : my_view_id(0), first(_first), me(_me), vc(_vc),
+ paxos(this, me == _first, me, me)
{
- paxos_acceptor = new acceptor(this, me == _first, me, me);
- paxos_proposer = new proposer(this, paxos_acceptor, me);
-
- // XXX hack; maybe should have its own port number
- paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, this, &config::heartbeat);
-
- {
- lock ml(cfg_mutex);
- reconstruct();
- std::thread(&config::heartbeater, this).detach();
- }
+ // Route incoming heartbeat RPCs to config::heartbeat.
+ get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
+ // cfg_mutex is held until the constructor returns. heartbeater() starts by
+ // locking cfg_mutex, so the detached thread cannot get past that point
+ // before construction has finished.
+ lock cfg_mutex_lock(cfg_mutex);
+ reconstruct(cfg_mutex_lock);
+ // NOTE(review): the thread is detached and keeps using `this`; nothing in
+ // this file stops it, so the object must outlive it -- confirm intended.
+ thread(&config::heartbeater, this).detach();
}
-void
-config::restore(const std::string &s)
-{
- lock ml(cfg_mutex);
- paxos_acceptor->restore(s);
- reconstruct();
+// Passes the saved state `s` to the paxos layer, then re-derives
+// my_view_id and mems from it. Both steps run under cfg_mutex.
+void config::restore(const string &s) {
+    lock held(cfg_mutex);
+    paxos.restore(s);
+    reconstruct(held);
}
-void
-config::get_view(unsigned instance, std::vector<std::string> &m)
-{
- lock ml(cfg_mutex);
- get_view_wo(instance, m);
+// Locking entry point: takes cfg_mutex and fills `m` with the members of
+// view `instance` by delegating to the overload that expects the lock.
+void config::get_view(unsigned instance, vector<string> &m) {
+    lock held(cfg_mutex);
+    get_view(instance, m, held);
}
-// caller should hold cfg_mutex
-void
-config::get_view_wo(unsigned instance, std::vector<std::string> &m)
-{
- std::string value = paxos_acceptor->value(instance);
- tprintf("get_view(%d): returns %s\n", instance, value.c_str());
- members(value, m);
+// Overload for callers that already own cfg_mutex; the unnamed lock
+// parameter is never used in the body and only serves as evidence of that.
+// Looks up the value paxos holds for `instance` and stores its member list
+// in `m`, replacing whatever `m` contained.
+void config::get_view(unsigned instance, vector<string> &m, lock &) {
+    const string agreed = paxos.value(instance);
+    LOG("get_view(" << instance << "): returns " << agreed);
+    m = members(agreed);
}
-void
-config::members(const std::string &value, std::vector<std::string> &view) const
-{
- std::istringstream ist(value);
- std::string m;
- view.clear();
- while (ist >> m) {
- view.push_back(m);
- }
+// Turns a serialized view string into its list of member names.
+// NOTE(review): the removed implementation split on whitespace with
+// operator>>; explode() is defined elsewhere -- confirm it tokenizes the
+// same way.
+vector<string> config::members(const string &value) const {
+    vector<string> view = explode(value);
+    return view;
}
-std::string
-config::value(const std::vector<std::string> &m) const
-{
- std::ostringstream ost;
- for (unsigned i = 0; i < m.size(); i++) {
- ost << m[i];
- ost << " ";
- }
- return ost.str();
+// Serializes a member list into the single string proposed through paxos.
+// NOTE(review): the removed implementation wrote a space after *every*
+// member (so the string ended in a space); implode() is defined elsewhere --
+// confirm its output is still what members()/peers expect.
+string config::value(const vector<string> &members) const {
+    string joined = implode(members);
+    return joined;
}
-// caller should hold cfg_mutex
-void
-config::reconstruct()
-{
- if (paxos_acceptor->instance() > 0) {
- std::string m;
- my_view_id = paxos_acceptor->instance();
- get_view_wo(my_view_id, mems);
- tprintf("config::reconstruct: %d %s\n",
- my_view_id, print_members(mems).c_str());
+// Re-derives the cached view from the paxos layer. The caller must pass the
+// lock it holds on cfg_mutex; VERIFY checks that the lock is actually owned.
+// my_view_id is always refreshed; mems is reloaded only when the instance
+// number is positive.
+void config::reconstruct(lock &cfg_mutex_lock) {
+    VERIFY(cfg_mutex_lock);
+    my_view_id = paxos.instance();
+    if (my_view_id <= 0)
+        return;  // no positive instance number, so there is no view to load
+    get_view(my_view_id, mems, cfg_mutex_lock);
+    LOG("view " << my_view_id << " " << mems);
- }
}
// Called by Paxos's acceptor.
-void
-config::paxos_commit(unsigned instance, const std::string &value)
-{
- std::string m;
- std::vector<std::string> newmem;
- lock ml(cfg_mutex);
+// Installs `value` as the membership of view `instance`. Every current
+// member that is missing from the new membership (other than this node) has
+// its handle invalidated. mems and my_view_id are then replaced, and the
+// view-change listener, if there is one, is told about the new instance.
+void config::paxos_commit(unsigned instance, const string &value) {
+    lock cfg_mutex_lock(cfg_mutex);
- members(value, newmem);
- tprintf("config::paxos_commit: %d: %s\n", instance,
- print_members(newmem).c_str());
+    vector<string> newmem = members(value);
+    LOG("instance " << instance << ": " << newmem);
- for (unsigned i = 0; i < mems.size(); i++) {
- tprintf("config::paxos_commit: is %s still a member?\n",
- mems[i].c_str());
- if (!isamember(mems[i], newmem) && me != mems[i]) {
- tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
- mgr.delete_handle(mems[i]);
+    // Bind by const reference: mems is not modified inside the loop and
+    // cfg_mutex stays held, so copying each string per iteration is waste.
+    for (const auto &mem : mems) {
+        LOG("is " << mem << " still a member?");
+        if (!isamember(mem, newmem) && me != mem) {
+            LOG("delete " << mem);
+            invalidate_handle(mem);
 }
 }
 mems = newmem;
 my_view_id = instance;
 if (vc) {
- ml.unlock();
+        // cfg_mutex is released for the duration of the callback and
+        // re-acquired afterwards (presumably so the listener may call back
+        // into config -- confirm against commit_change implementations).
+        cfg_mutex_lock.unlock();
 vc->commit_change(instance);
- ml.lock();
+        cfg_mutex_lock.lock();
 }
}
-bool
-config::ismember(const std::string &m, unsigned vid)
-{
- lock ml(cfg_mutex);
- std::vector<std::string> v;
- get_view_wo(vid, v);
+// Reports whether node `m` appears in the membership of view `vid`.
+// The view is looked up while holding cfg_mutex.
+bool config::ismember(const string &m, unsigned vid) {
+    vector<string> v;
+    lock held(cfg_mutex);
+    get_view(vid, v, held);
 return isamember(m, v);
}
-bool
-config::add(const std::string &new_m, unsigned vid)
-{
- std::vector<std::string> m;
- std::vector<std::string> curm;
- lock ml(cfg_mutex);
- if (vid != my_view_id)
+// Proposes view my_view_id + 1 consisting of the current members plus
+// `new_m`. Refuses (returns false) when `vid` is not the view this node is
+// currently in. cfg_mutex is dropped while paxos runs and re-taken before
+// returning. Returns whatever paxos.run() reports.
+bool config::add(const string &new_m, unsigned vid) {
+    lock held(cfg_mutex);
+    LOG("adding " << new_m << " to " << vid);
+    if (vid != my_view_id) {
+        LOG("that's not my view id, " << my_view_id << "!");
 return false;
- tprintf("config::add %s\n", new_m.c_str());
- m = mems;
- m.push_back(new_m);
- curm = mems;
- std::string v = value(m);
- int nextvid = my_view_id + 1;
- bool r;
- {
- ml.unlock();
- r = paxos_proposer->run(nextvid, curm, v);
- ml.lock();
 }
- tprintf("config::add: proposer returned %s\n",
- r ? "success" : "failure");
+    LOG("calling down to paxos layer");
+    // Snapshot the membership under the lock; paxos runs on the copies.
+    vector<string> current(mems);
+    vector<string> proposed(current);
+    proposed.push_back(new_m);
+    LOG("old mems " << current << " " << value(current));
+    LOG("new mems " << proposed << " " << value(proposed));
+    unsigned nextvid = my_view_id + 1;
+    held.unlock();
+    bool r = paxos.run(nextvid, current, value(proposed));
+    held.lock();
+    LOG("paxos proposer returned " << (r ? "success" : "failure"));
 return r;
}
// caller should hold cfg_mutex
-bool
-config::remove(const std::string &m)
-{
- adopt_lock ml(cfg_mutex);
- tprintf("config::remove: my_view_id %d remove? %s\n",
- my_view_id, m.c_str());
- std::vector<std::string> n;
- for (unsigned i = 0; i < mems.size(); i++) {
- if (mems[i] != m)
- n.push_back(mems[i]);
- }
- std::string v = value(n);
- std::vector<std::string> cmems = mems;
- int nextvid = my_view_id + 1;
- bool r;
- {
- ml.unlock();
- r = paxos_proposer->run(nextvid, cmems, v);
- ml.lock();
+// Proposes view my_view_id + 1 consisting of the current members minus `m`.
+// `cfg_mutex_lock` is the caller's lock on cfg_mutex; it is released while
+// paxos runs and re-acquired before returning. Returns the result of
+// paxos.run().
+bool config::remove(const string &m, lock &cfg_mutex_lock) {
+    LOG("my_view_id " << my_view_id << " remove? " << m);
+    vector<string> n;
+    // Iterate by const reference: each kept member is copied exactly once,
+    // into `n`, instead of once into the loop variable and again into `n`.
+    for (const auto &mem : mems) {
+        if (mem != m)
+            n.push_back(mem);
 }
- tprintf("config::remove: proposer returned %s\n",
- r ? "success" : "failure");
+    // Snapshot taken under the lock; mems may change once it is released.
+    vector<string> cmems = mems;
+    unsigned nextvid = my_view_id + 1;
+    cfg_mutex_lock.unlock();
+    bool r = paxos.run(nextvid, cmems, value(n));
+    cfg_mutex_lock.lock();
+    LOG("proposer returned " << (r ? "success" : "failure"));
 return r;
}
-void
-config::heartbeater()
-{
- std::string m;
- heartbeat_t h;
- bool stable;
- unsigned vid;
- std::vector<std::string> cmems;
- lock ml(cfg_mutex);
+// Body of the background heartbeat thread; loops forever. Each round waits
+// on config_cond for up to 300ms, then, if this node is in the current
+// view, either pings every other member (when this node has the smallest
+// ID) or pings the smallest-ID member. A member whose heartbeat does not
+// return OK is proposed for removal, provided the view has not changed
+// since the round started.
+//
+// [[noreturn]] is intentionally not written here: placed after the
+// parameter list it would apply to the function type, where the attribute
+// is not permitted. It belongs on the first declaration (in the header).
+void config::heartbeater() {
+    lock cfg_mutex_lock(cfg_mutex);
 while (1) {
- auto next_timeout = std::chrono::steady_clock::now() + std::chrono::seconds(3);
- tprintf("heartbeater: go to sleep\n");
- config_cond.wait_until(ml, next_timeout);
+        auto next_timeout = steady_clock::now() + milliseconds(300);
+        LOG("go to sleep");
+        config_cond.wait_until(cfg_mutex_lock, next_timeout);
- stable = true;
- vid = my_view_id;
- get_view_wo(vid, cmems);
- tprintf("heartbeater: current membership %s\n",
- print_members(cmems).c_str());
+        // Remember the view this round started in: doheartbeat() drops
+        // cfg_mutex, so my_view_id may have moved on by the time it returns.
+        unsigned vid = my_view_id;
+        vector<string> cmems;
+        get_view(vid, cmems, cfg_mutex_lock);
+        LOG("current membership " << cmems);
 if (!isamember(me, cmems)) {
- tprintf("heartbeater: not member yet; skip hearbeat\n");
+            LOG("not member yet; skip hearbeat");
 continue;
 }
 // who has the smallest ID?
- m = me;
- for (unsigned i = 0; i < cmems.size(); i++) {
- if (m > cmems[i])
- m = cmems[i];
- }
+        // The isamember() check above has already let this round continue,
+        // so min_element is assumed not to return end() here
+        // (NOTE(review): relies on isamember() meaning containment).
+        string m = min(me, *min_element(cmems.begin(), cmems.end()));
 if (m == me) {
 // ping the other nodes
- for (unsigned i = 0; i < cmems.size(); i++) {
- if (cmems[i] != me) {
- if ((h = doheartbeat(cmems[i])) != OK) {
- stable = false;
- m = cmems[i];
- break;
- }
- }
+            // cmems is a local snapshot that nothing else touches, so a
+            // const reference stays valid even while the lock is released.
+            for (const string &mem : cmems) {
+                if (mem == me || doheartbeat(mem, cfg_mutex_lock) == OK)
+                    continue;
+                if (vid == my_view_id)
+                    remove(mem, cfg_mutex_lock);
+                // stop after the first unresponsive member
+                break;
 }
 } else {
 // ping the node with the smallest ID
- if ((h = doheartbeat(m)) != OK)
- stable = false;
- }
-
- if (!stable && vid == my_view_id) {
- remove(m);
+            if (doheartbeat(m, cfg_mutex_lock) != OK && vid == my_view_id)
+                remove(m, cfg_mutex_lock);
 }
 }
}
-paxos_protocol::status
-config::heartbeat(std::string m, unsigned vid, int &r)
-{
- lock ml(cfg_mutex);
- int ret = paxos_protocol::ERR;
+// RPC handler for a heartbeat sent by node `m`, which believes it is in view
+// `vid`. Always reports this node's view id through `r`. Answers OK when the
+// views match, or when they differ while paxos is running (in which case the
+// two view ids are VERIFY'd to be exactly one apart); otherwise answers ERR.
+paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) {
+    lock held(cfg_mutex);
 r = (int) my_view_id;
- tprintf("heartbeat from %s(%d) my_view_id %d\n",
- m.c_str(), vid, my_view_id);
- if (vid == my_view_id) {
- ret = paxos_protocol::OK;
- } else if (paxos_proposer->isrunning()) {
+    LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id);
+    if (vid != my_view_id) {
+        // A mismatch is only acceptable while a view change is in flight.
+        if (!paxos.isrunning())
+            return paxos_protocol::ERR;
 VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
- ret = paxos_protocol::OK;
- } else {
- ret = paxos_protocol::ERR;
 }
- return ret;
+    return paxos_protocol::OK;
}
-config::heartbeat_t
-config::doheartbeat(const std::string &m)
-{
- adopt_lock ml(cfg_mutex);
- int ret = rpc_const::timeout_failure;
- int r;
+// Sends one heartbeat RPC (100ms timeout) to node `m`, carrying this node's
+// name and current view id. `cfg_mutex_lock` is the caller's lock on
+// cfg_mutex; it is released around the bind and the call and re-acquired
+// afterwards. Result:
+//   OK       - the peer answered OK, or the call failed with
+//              atmostonce_failure / oldsrv_failure (the handle to `m` is
+//              invalidated in that case);
+//   FAILURE  - any other negative return code (including bind_failure when
+//              no client could be bound, assuming that code is negative);
+//   VIEWERR  - any other non-negative return code.
+config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
 unsigned vid = my_view_id;
- heartbeat_t res = OK;
-
- tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
+    LOG("heartbeat to " << m << " (" << vid << ")");
 handle h(m);
- {
- ml.unlock();
- rpcc *cl = h.safebind();
- if (cl) {
- ret = cl->call(paxos_protocol::heartbeat, me, vid, r,
- rpcc::to(1000));
- }
- ml.lock();
- }
- if (ret != paxos_protocol::OK) {
- if (ret == rpc_const::atmostonce_failure ||
- ret == rpc_const::oldsrv_failure) {
- mgr.delete_handle(m);
- } else {
- tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n",
- m.c_str(), ret, vid, r);
- if (ret < 0) res = FAILURE;
- else res = VIEWERR;
- }
+
+    // Do not hold cfg_mutex across the network round trip.
+    cfg_mutex_lock.unlock();
+    int r = 0;
+    int ret = rpc_const::bind_failure;
+    rpcc *cl = h.safebind();
+    if (cl)
+        ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
+    cfg_mutex_lock.lock();
+
+    heartbeat_t res = OK;
+    const bool stale_binding = ret == rpc_const::atmostonce_failure ||
+                               ret == rpc_const::oldsrv_failure;
+    if (stale_binding) {
+        // Drop the cached handle; the result deliberately stays OK.
+        invalidate_handle(m);
+    } else if (ret != paxos_protocol::OK) {
+        LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
+        res = (ret < 0) ? FAILURE : VIEWERR;
 }
- tprintf("doheartbeat done %d\n", res);
+    LOG("done " << res);
 return res;
}
-