From: Peter Iannucci Date: Tue, 1 Oct 2013 07:55:24 +0000 (-0400) Subject: Fixed two major bugs in paxos.cc. X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/e478ac59e66e89cbc174e781ac715c8644539947?hp=c9be386a30dc7145ac757fae66c255b21da26d5d Fixed two major bugs in paxos.cc. --- diff --git a/config.cc b/config.cc index 038a100..727277e 100644 --- a/config.cc +++ b/config.cc @@ -73,7 +73,7 @@ void config::reconstruct(lock &cfg_mutex_lock) { my_view_id = paxos.instance(); if (my_view_id > 0) { get_view(my_view_id, mems, cfg_mutex_lock); - LOG("config::reconstruct: " << my_view_id << " " << mems); + LOG("view " << my_view_id << " " << mems); } } @@ -82,12 +82,12 @@ void config::paxos_commit(unsigned instance, const string &value) { lock cfg_mutex_lock(cfg_mutex); vector newmem = members(value); - LOG("config::paxos_commit: " << instance << ": " << newmem); + LOG("instance " << instance << ": " << newmem); for (auto mem : mems) { - LOG("config::paxos_commit: is " << mem << " still a member?"); + LOG("is " << mem << " still a member?"); if (!isamember(mem, newmem) && me != mem) { - LOG("config::paxos_commit: delete " << mem); + LOG("delete " << mem); invalidate_handle(mem); } } @@ -115,11 +115,12 @@ bool config::add(const string &new_m, unsigned vid) { LOG("that's not my view id, " << my_view_id << "!"); return false; } - vector m = mems; + LOG("calling down to paxos layer"); + vector m(mems), cmems(mems); m.push_back(new_m); - vector cmems = mems; + LOG("old mems " << cmems << " " << value(cmems)); + LOG("new mems " << m << " " << value(m)); unsigned nextvid = my_view_id + 1; - LOG("calling down to paxos layer"); cfg_mutex_lock.unlock(); bool r = paxos.run(nextvid, cmems, value(m)); cfg_mutex_lock.lock(); @@ -129,7 +130,7 @@ bool config::add(const string &new_m, unsigned vid) { // caller should hold cfg_mutex bool config::remove(const string &m, lock &cfg_mutex_lock) { - LOG("config::remove: my_view_id " << my_view_id << " remove? " << m); + LOG("my_view_id " << my_view_id << " remove? " << m); vector n; for (auto mem : mems) { if (mem != m) @@ -140,7 +141,7 @@ bool config::remove(const string &m, lock &cfg_mutex_lock) { cfg_mutex_lock.unlock(); bool r = paxos.run(nextvid, cmems, value(n)); cfg_mutex_lock.lock(); - LOG("config::remove: proposer returned " << (r ? "success" : "failure")); + LOG("proposer returned " << (r ? "success" : "failure")); return r; } @@ -149,16 +150,16 @@ void config::heartbeater() [[noreturn]] { while (1) { auto next_timeout = steady_clock::now() + seconds(3); - LOG("heartbeater: go to sleep"); + LOG("go to sleep"); config_cond.wait_until(cfg_mutex_lock, next_timeout); unsigned vid = my_view_id; vector cmems; get_view(vid, cmems, cfg_mutex_lock); - LOG("heartbeater: current membership " << cmems); + LOG("current membership " << cmems); if (!isamember(me, cmems)) { - LOG("heartbeater: not member yet; skip hearbeat"); + LOG("not member yet; skip hearbeat"); continue; } @@ -197,7 +198,7 @@ paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) { config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) { unsigned vid = my_view_id; - LOG("doheartbeater to " << m << " (" << vid << ")"); + LOG("heartbeat to " << m << " (" << vid << ")"); handle h(m); cfg_mutex_lock.unlock(); @@ -215,9 +216,9 @@ config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) { invalidate_handle(m); break; default: - LOG("doheartbeat: problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r); + LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r); res = (ret < 0) ? FAILURE : VIEWERR; } - LOG("doheartbeat done " << res); + LOG("done " << res); return res; } diff --git a/paxos.cc b/paxos.cc index 3166c92..85507d5 100644 --- a/paxos.cc +++ b/paxos.cc @@ -48,12 +48,15 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const bool r = false; proposal.n = max(promise.n, proposal.n) + 1; nodes_t accepts; - value_t v = newv; + value_t v; if (prepare(instance, accepts, cur_nodes, v)) { if (majority(cur_nodes, accepts)) { LOG("received a majority of prepare responses"); + if (!v.size()) + v = newv; + breakpoint1(); nodes_t nodes; @@ -203,17 +206,18 @@ void proposer_acceptor::commit(unsigned instance, const value_t & value) { } void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) { - LOG("acceptor::commit: instance=" << instance << " has v=" << value); + LOG("instance=" << instance << " has v=" << value); if (instance > instance_h) { - LOG("commit: highestacceptedinstance = " << instance); + LOG("highestacceptedinstance = " << instance); values[instance] = value; l.loginstance(instance, value); instance_h = instance; accepted = promise = {0, me}; - accepted_value.clear(); + string v = value; // gaaahhh aliasing of value and accepted_value + accepted_value.clear(); // this wipes out "value", too if (delegate) { pxs_mutex_lock.unlock(); - delegate->paxos_commit(instance, value); + delegate->paxos_commit(instance, v); pxs_mutex_lock.lock(); } } diff --git a/threaded_log.h b/threaded_log.h index 5d3942c..6630a86 100644 --- a/threaded_log.h +++ b/threaded_log.h @@ -38,7 +38,7 @@ operator<<(ostream &o, const A &a) { int _self_ = instance_name_map[this]; \ if (_self_==0) \ _self_ = instance_name_map[this] = ++next_instance_num; \ - cerr << "#" << setw(2) << " " << _self_; \ + cerr << "#" << left << setw(2) << _self_ << " "; \ } #define LOG_NONMEMBER(_x_) { \ diff --git a/types.h b/types.h index cdb629a..0897649 100644 --- a/types.h +++ b/types.h @@ -48,8 +48,6 @@ using std::setfill; using std::setprecision; using std::ostream; using std::istream; -using std::ostream_iterator; -using std::istream_iterator; using std::ios; #include @@ -133,12 +131,13 @@ template constexpr inline E to_enum(enum_type_t value) noexcept { template inline typename enable_if::value, string>::type implode(const C & v, string delim=" ") { - if (v.begin() == v.end()) + auto i=v.cbegin(), end=v.cend(); + if (i == end) return string(); ostringstream oss; - auto last = prev(v.end()); - copy(v.begin(), last, ostream_iterator(oss, delim.c_str())); - oss << *last; + oss << *i++; + while (i != end) + oss << delim << *i++; return oss.str(); } @@ -147,7 +146,7 @@ inline vector explode(const string &s, string delim=" ") { size_t start = 0, end = 0; while ((end = s.find(delim, start)) != string::npos) { out.push_back(s.substr(start, end - start)); - start = end + 1; + start = end + delim.size(); } out.push_back(s.substr(start)); return out;