-#include <thread>
-#include <sstream>
#include "config.h"
-#include "paxos.h"
#include "handle.h"
-#include "threaded_log.h"
-#include "lang/verify.h"
// The config module maintains views. As a node joins or leaves a
// view, the next view will be the same as previous view, except with
// all views, the other nodes can bring this re-joined node up to
// date.
-config::config(const string &_first, const string &_me, config_view_change *_vc)
+config::config(const string & _first, const string & _me, config_view_change *_vc)
: my_view_id(0), first(_first), me(_me), vc(_vc),
- paxos_acceptor(this, me == _first, me, me),
- paxos_proposer(this, &paxos_acceptor, me)
+ paxos(this, me == _first, me, me)
{
get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
lock cfg_mutex_lock(cfg_mutex);
thread(&config::heartbeater, this).detach();
}
-void config::restore(const string &s) {
+void config::restore(const string & s) {
lock cfg_mutex_lock(cfg_mutex);
- paxos_acceptor.restore(s);
+ paxos.restore(s);
reconstruct(cfg_mutex_lock);
}
-void config::get_view(unsigned instance, vector<string> &m) {
+void config::get_view(unsigned instance, vector<string> & m) {
lock cfg_mutex_lock(cfg_mutex);
get_view(instance, m, cfg_mutex_lock);
}
-void config::get_view(unsigned instance, vector<string> &m, lock &) {
- string value = paxos_acceptor.value(instance);
+void config::get_view(unsigned instance, vector<string> & m, lock & cfg_mutex_lock) {
+ VERIFY(cfg_mutex_lock);
+ string value = paxos.value(instance);
LOG("get_view(" << instance << "): returns " << value);
- m = members(value);
-}
-
-vector<string> config::members(const string &value) const {
- istringstream ist(value);
- using it = istream_iterator<string>;
- return {it(ist), it()};
-}
-
-string config::value(const vector<string> &m) const {
- ostringstream ost;
- copy(m.begin(), m.end(), ostream_iterator<string>(ost, " "));
- return ost.str();
+ m = explode(value);
}
-void config::reconstruct(lock &cfg_mutex_lock) {
+void config::reconstruct(lock & cfg_mutex_lock) {
VERIFY(cfg_mutex_lock);
- if (paxos_acceptor.instance() > 0) {
- my_view_id = paxos_acceptor.instance();
+ my_view_id = paxos.instance();
+ if (my_view_id > 0) {
get_view(my_view_id, mems, cfg_mutex_lock);
- LOG("config::reconstruct: " << my_view_id << " " << print_members(mems));
+ LOG("view " << my_view_id << " " << mems);
}
}
// Called by Paxos's acceptor.
-void config::paxos_commit(unsigned instance, const string &value) {
+void config::paxos_commit(unsigned instance, const string & value) {
lock cfg_mutex_lock(cfg_mutex);
- vector<string> newmem = members(value);
- LOG("config::paxos_commit: " << instance << ": " << print_members(newmem));
+ vector<string> newmem = explode(value);
+ LOG("instance " << instance << ": " << newmem);
for (auto mem : mems) {
- LOG("config::paxos_commit: is " << mem << " still a member?");
+ LOG("is " << mem << " still a member?");
if (!isamember(mem, newmem) && me != mem) {
- LOG("config::paxos_commit: delete " << mem);
- invalidate_handle(mem);
+ LOG("delete " << mem);
+ handle(mem).invalidate();
}
}
}
}
-bool config::ismember(const string &m, unsigned vid) {
+bool config::ismember(const string & m, unsigned vid) {
lock cfg_mutex_lock(cfg_mutex);
vector<string> v;
get_view(vid, v, cfg_mutex_lock);
return isamember(m, v);
}
-bool config::add(const string &new_m, unsigned vid) {
+bool config::add(const string & new_m, unsigned vid) {
lock cfg_mutex_lock(cfg_mutex);
- if (vid != my_view_id)
+ LOG("adding " << new_m << " to " << vid);
+ if (vid != my_view_id) {
+ LOG("that's not my view id, " << my_view_id << "!");
return false;
- LOG("config::add " << new_m);
- vector<string> m = mems;
+ }
+ LOG("calling down to paxos layer");
+ vector<string> m(mems), cmems(mems);
m.push_back(new_m);
- vector<string> cmems = mems;
+ LOG("old mems " << cmems << " " << implode(cmems));
+ LOG("new mems " << m << " " << implode(m));
unsigned nextvid = my_view_id + 1;
cfg_mutex_lock.unlock();
- bool r = paxos_proposer.run(nextvid, cmems, value(m));
+ bool r = paxos.run(nextvid, cmems, implode(m));
cfg_mutex_lock.lock();
- LOG("config::add: proposer returned " << (r ? "success" : "failure"));
+ LOG("paxos proposer returned " << (r ? "success" : "failure"));
return r;
}
// caller should hold cfg_mutex
-bool config::remove(const string &m, lock &cfg_mutex_lock) {
- LOG("config::remove: my_view_id " << my_view_id << " remove? " << m);
+bool config::remove(const string & m, lock & cfg_mutex_lock) {
+ VERIFY(cfg_mutex_lock);
+ LOG("my_view_id " << my_view_id << " remove? " << m);
vector<string> n;
for (auto mem : mems) {
if (mem != m)
vector<string> cmems = mems;
unsigned nextvid = my_view_id + 1;
cfg_mutex_lock.unlock();
- bool r = paxos_proposer.run(nextvid, cmems, value(n));
+ bool r = paxos.run(nextvid, cmems, implode(n));
cfg_mutex_lock.lock();
- LOG("config::remove: proposer returned " << (r ? "success" : "failure"));
+ LOG("proposer returned " << (r ? "success" : "failure"));
return r;
}
-void config::heartbeater() [[noreturn]] {
+void config::heartbeater() {
lock cfg_mutex_lock(cfg_mutex);
while (1) {
- auto next_timeout = steady_clock::now() + seconds(3);
- LOG("heartbeater: go to sleep");
+ auto next_timeout = steady_clock::now() + milliseconds(300);
+ LOG("go to sleep");
config_cond.wait_until(cfg_mutex_lock, next_timeout);
unsigned vid = my_view_id;
vector<string> cmems;
get_view(vid, cmems, cfg_mutex_lock);
- LOG("heartbeater: current membership " << print_members(cmems));
+ LOG("current membership " << cmems);
if (!isamember(me, cmems)) {
- LOG("heartbeater: not member yet; skip hearbeat");
+ LOG("not member yet; skip hearbeat");
continue;
}
}
}
-paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) {
+paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) {
lock cfg_mutex_lock(cfg_mutex);
r = (int) my_view_id;
LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id);
if (vid == my_view_id)
return paxos_protocol::OK;
- else if (paxos_proposer.isrunning()) {
+ else if (paxos.isrunning()) {
VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
return paxos_protocol::OK;
}
return paxos_protocol::ERR;
}
-config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
+config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) {
+ VERIFY(cfg_mutex_lock);
unsigned vid = my_view_id;
- LOG("doheartbeater to " << m << " (" << vid << ")");
+ LOG("heartbeat to " << m << " (" << vid << ")");
handle h(m);
cfg_mutex_lock.unlock();
- int r = 0, ret = rpc_const::bind_failure;
+ int r = 0, ret = rpc_protocol::bind_failure;
if (rpcc *cl = h.safebind())
- ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid);
+ ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
cfg_mutex_lock.lock();
heartbeat_t res = OK;
switch (ret) {
case paxos_protocol::OK:
break;
- case rpc_const::atmostonce_failure:
- case rpc_const::oldsrv_failure:
- invalidate_handle(m);
+ case rpc_protocol::atmostonce_failure:
+ case rpc_protocol::oldsrv_failure:
+ h.invalidate();
break;
default:
- LOG("doheartbeat: problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
+ LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
res = (ret < 0) ? FAILURE : VIEWERR;
}
- LOG("doheartbeat done " << res);
+ LOG("done " << res);
return res;
}