X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/5a5c578e2e358a121cdb9234a6cb11c4ecfbf323..b2609562b3d4fc548afcc0a3dfe4ff5fd4ae3d36:/paxos.cc diff --git a/paxos.cc b/paxos.cc index 095d56a..8b00ad8 100644 --- a/paxos.cc +++ b/paxos.cc @@ -1,12 +1,6 @@ #include "paxos.h" #include "handle.h" -string print_members(const nodes_t &nodes) { - ostringstream ost; - copy(nodes.begin(), nodes.end(), ostream_iterator(ost, ", ")); - return ost.str(); -} - bool isamember(const node_t & m, const nodes_t & nodes) { return find(nodes.begin(), nodes.end(), m) != nodes.end(); } @@ -26,7 +20,7 @@ bool majority(const nodes_t &l1, const nodes_t &l2) { // paxos_commit to inform higher layers of the agreed value for this // instance. -proposer_acceptor::proposer_acceptor(class paxos_change *_delegate, +proposer_acceptor::proposer_acceptor(paxos_change *_delegate, bool _first, const node_t & _me, const value_t & _value) : delegate(_delegate), me (_me) { @@ -45,20 +39,23 @@ proposer_acceptor::proposer_acceptor(class paxos_change *_delegate, bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv) { lock ml(proposer_mutex); - LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable); + LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable); if (!stable) { // already running proposer? - LOG("proposer::run: already running"); + LOG("paxos proposer already running"); return false; } stable = false; bool r = false; - my_n.n = std::max(n_h.n, my_n.n) + 1; + proposal.n = max(promise.n, proposal.n) + 1; nodes_t accepts; - value_t v = newv; + value_t v; if (prepare(instance, accepts, cur_nodes, v)) { if (majority(cur_nodes, accepts)) { - LOG("paxos::run: received a majority of prepare responses"); + LOG("received a majority of prepare responses"); + + if (!v.size()) + v = newv; breakpoint1(); @@ -67,20 +64,20 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const accept(instance, accepts, nodes, v); if (majority(cur_nodes, accepts)) { - LOG("paxos::run: received a majority of accept responses"); + LOG("received a majority of accept responses"); breakpoint2(); decide(instance, accepts, v); r = true; } else { - LOG("paxos::run: no majority of accept responses"); + LOG("no majority of accept responses"); } } else { - LOG("paxos::run: no majority of prepare responses"); + LOG("no majority of prepare responses"); } } else { - LOG("paxos::run: prepare is rejected " << stable); + LOG("prepare is rejected " << stable); } stable = true; return r; @@ -88,6 +85,7 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts, const nodes_t & nodes, value_t & v) { + LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")"); prepareres res; prop_t highest_n_a{0, ""}; for (auto i : nodes) { @@ -96,17 +94,19 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts, if (!r) continue; auto status = (paxos_protocol::status)r->call_timeout( - paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, my_n); + paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal); if (status == paxos_protocol::OK) { if (res.oldinstance) { LOG("commiting old instance!"); commit(instance, res.v_a); return false; } + LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " << + "v_a=\"" << res.v_a << "\""); if (res.accept) { accepts.push_back(i); if (res.n_a >= highest_n_a) { - LOG("found a newer accepted proposal"); + LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")"); v = res.v_a; highest_n_a = res.n_a; } @@ -125,7 +125,7 @@ void proposer_acceptor::accept(unsigned instance, nodes_t & accepts, continue; bool accept = false; int status = r->call_timeout( - paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, my_n, v); + paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v); if (status == paxos_protocol::OK && accept) accepts.push_back(i); } @@ -138,27 +138,32 @@ void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const if (!r) continue; int res = 0; - r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, instance, v); + r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v); } } paxos_protocol::status proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) { + LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")"); lock ml(acceptor_mutex); r.oldinstance = false; r.accept = false; - r.n_a = n_a; - r.v_a = v_a; + r.n_a = accepted; + r.v_a = accepted_value; if (instance <= instance_h) { + LOG("old instance " << instance << " has value " << values[instance]); r.oldinstance = true; r.v_a = values[instance]; - } else if (n > n_h) { - n_h = n; - l.logprop(n_h); + } else if (n > promise) { + LOG("looks good to me"); + promise = n; + l.logprop(promise); r.accept = true; } else { LOG("I totally rejected this request. Ha."); } + LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " << + "v_a=\"" << r.v_a << "\""); return paxos_protocol::OK; } @@ -167,10 +172,10 @@ proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t lock ml(acceptor_mutex); r = false; if (instance == instance_h + 1) { - if (n >= n_h) { - n_a = n; - v_a = v; - l.logaccept(n_a, v_a); + if (n >= promise) { + accepted = n; + accepted_value = v; + l.logaccept(accepted, accepted_value); r = true; } return paxos_protocol::OK; @@ -182,10 +187,10 @@ proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t paxos_protocol::status proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) { lock ml(acceptor_mutex); - LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << v_a); + LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value); if (instance == instance_h + 1) { - VERIFY(v_a == v); - commit(instance, v_a, ml); + VERIFY(accepted_value == v); + commit(instance, accepted_value, ml); } else if (instance <= instance_h) { // we are ahead; ignore. } else { @@ -201,17 +206,18 @@ void proposer_acceptor::commit(unsigned instance, const value_t & value) { } void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) { - LOG("acceptor::commit: instance=" << instance << " has v=" << value); + LOG("instance=" << instance << " has v=" << value); if (instance > instance_h) { - LOG("commit: highestacceptedinstance = " << instance); + LOG("highestacceptedinstance = " << instance); values[instance] = value; l.loginstance(instance, value); instance_h = instance; - n_a = n_h = {0, me}; - v_a.clear(); + accepted = promise = {0, me}; + string v = value; // gaaahhh aliasing of value and accepted_value + accepted_value.clear(); // this wipes out "value", too if (delegate) { pxs_mutex_lock.unlock(); - delegate->paxos_commit(instance, value); + delegate->paxos_commit(instance, v); pxs_mutex_lock.lock(); } }