#include "config.h"
-#include "handle.h"
+
+using std::vector;
// The config module maintains views. As a node joins or leaves a
// view, the next view will be the same as previous view, except with
// all views, the other nodes can bring this re-joined node up to
// date.
+config_view_change::~config_view_change() {}
+
config::config(const string & _first, const string & _me, config_view_change *_vc)
: my_view_id(0), first(_first), me(_me), vc(_vc),
paxos(this, me == _first, me, me)
void config::get_view(unsigned instance, vector<string> & m, lock & cfg_mutex_lock) {
VERIFY(cfg_mutex_lock);
string value = paxos.value(instance);
- LOG("get_view(" << instance << "): returns " << value);
+ LOG << "get_view(" << instance << "): returns " << value;
m = explode(value);
}
my_view_id = paxos.instance();
if (my_view_id > 0) {
get_view(my_view_id, mems, cfg_mutex_lock);
- LOG("view " << my_view_id << " " << mems);
+ LOG << "view " << my_view_id << " " << mems;
}
}
lock cfg_mutex_lock(cfg_mutex);
vector<string> newmem = explode(value);
- LOG("instance " << instance << ": " << newmem);
+ LOG << "instance " << instance << ": " << newmem;
for (auto mem : mems) {
- LOG("is " << mem << " still a member?");
+ LOG << "is " << mem << " still a member?";
if (!isamember(mem, newmem) && me != mem) {
- LOG("delete " << mem);
- handle(mem).invalidate();
+ LOG << "delete " << mem;
+ rpcc::unbind_cached(mem);
}
}
bool config::add(const string & new_m, unsigned vid) {
lock cfg_mutex_lock(cfg_mutex);
- LOG("adding " << new_m << " to " << vid);
+ LOG << "adding " << new_m << " to " << vid;
if (vid != my_view_id) {
- LOG("that's not my view id, " << my_view_id << "!");
+ LOG << "that's not my view id, " << my_view_id << "!";
return false;
}
- LOG("calling down to paxos layer");
+ LOG << "calling down to paxos layer";
vector<string> m(mems), cmems(mems);
m.push_back(new_m);
- LOG("old mems " << cmems << " " << implode(cmems));
- LOG("new mems " << m << " " << implode(m));
+ LOG << "old mems " << cmems << " " << implode(cmems);
+ LOG << "new mems " << m << " " << implode(m);
unsigned nextvid = my_view_id + 1;
cfg_mutex_lock.unlock();
bool r = paxos.run(nextvid, cmems, implode(m));
cfg_mutex_lock.lock();
- LOG("paxos proposer returned " << (r ? "success" : "failure"));
+ LOG << "paxos proposer returned " << (r ? "success" : "failure");
return r;
}
// caller should hold cfg_mutex
bool config::remove(const string & m, lock & cfg_mutex_lock) {
VERIFY(cfg_mutex_lock);
- LOG("my_view_id " << my_view_id << " remove? " << m);
+ LOG << "my_view_id " << my_view_id << " remove? " << m;
vector<string> n;
for (auto mem : mems) {
if (mem != m)
cfg_mutex_lock.unlock();
bool r = paxos.run(nextvid, cmems, implode(n));
cfg_mutex_lock.lock();
- LOG("proposer returned " << (r ? "success" : "failure"));
+ LOG << "proposer returned " << (r ? "success" : "failure");
return r;
}
while (1) {
auto next_timeout = steady_clock::now() + milliseconds(300);
- LOG("go to sleep");
+ LOG << "go to sleep";
config_cond.wait_until(cfg_mutex_lock, next_timeout);
unsigned vid = my_view_id;
vector<string> cmems;
get_view(vid, cmems, cfg_mutex_lock);
- LOG("current membership " << cmems);
+ LOG << "current membership " << cmems;
if (!isamember(me, cmems)) {
- LOG("not member yet; skip hearbeat");
+ LOG << "not member yet; skip hearbeat";
continue;
}
// who has the smallest ID?
- string m = min(me, *min_element(cmems.begin(), cmems.end()));
+ string m = std::min(me, *std::min_element(cmems.begin(), cmems.end()));
if (m == me) {
// ping the other nodes
paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) {
lock cfg_mutex_lock(cfg_mutex);
r = (int) my_view_id;
- LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id);
+ LOG<< "heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id;
if (vid == my_view_id)
return paxos_protocol::OK;
else if (paxos.isrunning()) {
config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) {
VERIFY(cfg_mutex_lock);
unsigned vid = my_view_id;
- LOG("heartbeat to " << m << " (" << vid << ")");
- handle h(m);
+ LOG << "heartbeat to " << m << " (" << vid << ")";
cfg_mutex_lock.unlock();
int r = 0, ret = rpc_protocol::bind_failure;
- if (rpcc *cl = h.safebind())
+ if (auto cl = rpcc::bind_cached(m))
ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
cfg_mutex_lock.lock();
break;
case rpc_protocol::atmostonce_failure:
case rpc_protocol::oldsrv_failure:
- h.invalidate();
+ rpcc::unbind_cached(m);
break;
default:
- LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
+ LOG << "problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r;
res = (ret < 0) ? FAILURE : VIEWERR;
}
- LOG("done " << res);
+ LOG << "done " << res;
return res;
}