my_view_id = paxos.instance();
if (my_view_id > 0) {
get_view(my_view_id, mems, cfg_mutex_lock);
- LOG("config::reconstruct: " << my_view_id << " " << mems);
+ LOG("view " << my_view_id << " " << mems);
}
}
lock cfg_mutex_lock(cfg_mutex);
vector<string> newmem = members(value);
- LOG("config::paxos_commit: " << instance << ": " << newmem);
+ LOG("instance " << instance << ": " << newmem);
for (auto mem : mems) {
- LOG("config::paxos_commit: is " << mem << " still a member?");
+ LOG("is " << mem << " still a member?");
if (!isamember(mem, newmem) && me != mem) {
- LOG("config::paxos_commit: delete " << mem);
+ LOG("delete " << mem);
invalidate_handle(mem);
}
}
LOG("that's not my view id, " << my_view_id << "!");
return false;
}
- vector<string> m = mems;
+ LOG("calling down to paxos layer");
+ vector<string> m(mems), cmems(mems);
m.push_back(new_m);
- vector<string> cmems = mems;
+ LOG("old mems " << cmems << " " << value(cmems));
+ LOG("new mems " << m << " " << value(m));
unsigned nextvid = my_view_id + 1;
- LOG("calling down to paxos layer");
cfg_mutex_lock.unlock();
bool r = paxos.run(nextvid, cmems, value(m));
cfg_mutex_lock.lock();
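// Caller must hold cfg_mutex (passed in as cfg_mutex_lock); the lock is
// released while paxos.run() executes and re-acquired before returning.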
bool config::remove(const string &m, lock &cfg_mutex_lock) {
- LOG("config::remove: my_view_id " << my_view_id << " remove? " << m);
+ LOG("my_view_id " << my_view_id << " remove? " << m);
vector<string> n;
for (auto mem : mems) {
if (mem != m)
cfg_mutex_lock.unlock();
bool r = paxos.run(nextvid, cmems, value(n));
cfg_mutex_lock.lock();
- LOG("config::remove: proposer returned " << (r ? "success" : "failure"));
+ LOG("proposer returned " << (r ? "success" : "failure"));
return r;
}
while (1) {
auto next_timeout = steady_clock::now() + seconds(3);
- LOG("heartbeater: go to sleep");
+ LOG("go to sleep");
config_cond.wait_until(cfg_mutex_lock, next_timeout);
unsigned vid = my_view_id;
vector<string> cmems;
get_view(vid, cmems, cfg_mutex_lock);
- LOG("heartbeater: current membership " << cmems);
+ LOG("current membership " << cmems);
if (!isamember(me, cmems)) {
- LOG("heartbeater: not member yet; skip hearbeat");
+ LOG("not member yet; skip hearbeat");
continue;
}
config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
unsigned vid = my_view_id;
- LOG("doheartbeater to " << m << " (" << vid << ")");
+ LOG("heartbeat to " << m << " (" << vid << ")");
handle h(m);
cfg_mutex_lock.unlock();
invalidate_handle(m);
break;
default:
- LOG("doheartbeat: problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
+ LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
res = (ret < 0) ? FAILURE : VIEWERR;
}
- LOG("doheartbeat done " << res);
+ LOG("done " << res);
return res;
}
bool r = false;
proposal.n = max(promise.n, proposal.n) + 1;
nodes_t accepts;
- value_t v = newv;
+ value_t v;
if (prepare(instance, accepts, cur_nodes, v)) {
if (majority(cur_nodes, accepts)) {
LOG("received a majority of prepare responses");
+ if (!v.size())
+ v = newv;
+
breakpoint1();
nodes_t nodes;
}
void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
- LOG("acceptor::commit: instance=" << instance << " has v=" << value);
+ LOG("instance=" << instance << " has v=" << value);
if (instance > instance_h) {
- LOG("commit: highestacceptedinstance = " << instance);
+ LOG("highestacceptedinstance = " << instance);
values[instance] = value;
l.loginstance(instance, value);
instance_h = instance;
accepted = promise = {0, me};
- accepted_value.clear();
+ string v = value; // gaaahhh aliasing of value and accepted_value
+ accepted_value.clear(); // this wipes out "value", too
if (delegate) {
pxs_mutex_lock.unlock();
- delegate->paxos_commit(instance, value);
+ delegate->paxos_commit(instance, v);
pxs_mutex_lock.lock();
}
}
using std::setprecision;
using std::ostream;
using std::istream;
-using std::ostream_iterator;
-using std::istream_iterator;
using std::ios;
#include <limits>
// Joins the elements of `v` into a single string, writing `delim` between
// consecutive elements (default delimiter: one space). Each element is
// formatted with operator<< on an ostringstream. An empty container yields
// an empty string. The overload is enabled only when
// is_const_iterable<C>::value is true.
template <class C>
inline typename enable_if<is_const_iterable<C>::value, string>::type
implode(const C & v, string delim=" ") {
- if (v.begin() == v.end())
+ auto i=v.cbegin(), end=v.cend();
+ if (i == end)
return string();
ostringstream oss;
- auto last = prev(v.end());
- copy(v.begin(), last, ostream_iterator<typename C::value_type>(oss, delim.c_str()));
- oss << *last;
// Write the first element bare, then prefix each later element with the
// delimiter, so no trailing delimiter is ever emitted.
+ oss << *i++;
+ while (i != end)
+ oss << delim << *i++;
return oss.str();
}
size_t start = 0, end = 0;
while ((end = s.find(delim, start)) != string::npos) {
out.push_back(s.substr(start, end - start));
- start = end + 1;
+ start = end + delim.size();
}
out.push_back(s.substr(start));
return out;