X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/5d99dbf06a14904944f5593c63705934bdfdcfb7..e0c49ff6ba44cf5b47ab91d58b67763f5a1c7a58:/paxos.cc?ds=sidebyside diff --git a/paxos.cc b/paxos.cc index b39fa5b..c7f2d1d 100644 --- a/paxos.cc +++ b/paxos.cc @@ -1,13 +1,17 @@ #include "paxos.h" -#include "handle.h" + +using namespace std::placeholders; + +paxos_change::~paxos_change() {} bool isamember(const node_t & m, const nodes_t & nodes) { - return find(nodes.begin(), nodes.end(), m) != nodes.end(); + return std::find(nodes.begin(), nodes.end(), m) != nodes.end(); } // check if l2 contains a majority of the elements of l1 -bool majority(const nodes_t &l1, const nodes_t &l2) { - auto overlap = (size_t)count_if(l1.begin(), l1.end(), bind(isamember, _1, l2)); +bool majority(const nodes_t & l1, const nodes_t & l2) { + auto overlap = (size_t)std::count_if( + l1.begin(), l1.end(), std::bind(isamember, _1, l2)); return overlap >= (l1.size() >> 1) + 1; } @@ -20,7 +24,7 @@ bool majority(const nodes_t &l1, const nodes_t &l2) { // paxos_commit to inform higher layers of the agreed value for this // instance. -proposer_acceptor::proposer_acceptor(class paxos_change *_delegate, +proposer_acceptor::proposer_acceptor(paxos_change *_delegate, bool _first, const node_t & _me, const value_t & _value) : delegate(_delegate), me (_me) { @@ -39,20 +43,23 @@ proposer_acceptor::proposer_acceptor(class paxos_change *_delegate, bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv) { lock ml(proposer_mutex); - LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable); + LOG << "initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable; if (!stable) { // already running proposer? - LOG("paxos proposer already running"); + LOG << "paxos proposer already running"; return false; } stable = false; bool r = false; - proposal.n = max(promise.n, proposal.n) + 1; + proposal.n = std::max(promise.n, proposal.n) + 1; nodes_t accepts; - value_t v = newv; + value_t v; if (prepare(instance, accepts, cur_nodes, v)) { if (majority(cur_nodes, accepts)) { - LOG("received a majority of prepare responses"); + LOG << "received a majority of prepare responses"; + + if (!v.size()) + v = newv; breakpoint1(); @@ -61,20 +68,20 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const accept(instance, accepts, nodes, v); if (majority(cur_nodes, accepts)) { - LOG("received a majority of accept responses"); + LOG << "received a majority of accept responses"; breakpoint2(); decide(instance, accepts, v); r = true; } else { - LOG("no majority of accept responses"); + LOG << "no majority of accept responses"; } } else { - LOG("no majority of prepare responses"); + LOG << "no majority of prepare responses"; } } else { - LOG("prepare is rejected " << stable); + LOG << "prepare is rejected " << stable; } stable = true; return r; @@ -82,28 +89,27 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts, const nodes_t & nodes, value_t & v) { - LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")"); + LOG << "sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")"; prepareres res; prop_t highest_n_a{0, ""}; for (auto i : nodes) { - handle h(i); - rpcc *r = h.safebind(); - if (!r) + auto cl = rpcc::bind_cached(i); + if (!cl) continue; - auto status = (paxos_protocol::status)r->call_timeout( - paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, proposal); + int status = cl->call_timeout(paxos_protocol::preparereq, milliseconds(100), + res, me, instance, proposal); if (status == paxos_protocol::OK) { - if (res.oldinstance) { - LOG("commiting old instance!"); - commit(instance, res.v_a); + LOG << "preparereq response type=" << res.type << " n_a=(" << res.n_a.n + << ", " << res.n_a.m << ") " << "v_a=\"" << res.v_a << "\""; + if (res.type == prepareres::oldinstance) { + LOG << "commiting old instance!"; + lock ml(acceptor_mutex); + commit(instance, res.v_a, ml); return false; - } - LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " << - "v_a=\"" << res.v_a << "\""); - if (res.accept) { + } else if (res.type == prepareres::accept) { accepts.push_back(i); if (res.n_a >= highest_n_a) { - LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")"); + LOG << "found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")"; v = res.v_a; highest_n_a = res.n_a; } @@ -115,52 +121,42 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts, void proposer_acceptor::accept(unsigned instance, nodes_t & accepts, const nodes_t & nodes, const value_t & v) { + bool accept = false; for (auto i : nodes) { - handle h(i); - rpcc *r = h.safebind(); - if (!r) - continue; - bool accept = false; - int status = r->call_timeout( - paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, proposal, v); - if (status == paxos_protocol::OK && accept) - accepts.push_back(i); + if (auto cl = rpcc::bind_cached(i)) { + int status = cl->call_timeout(paxos_protocol::acceptreq, milliseconds(100), + accept, me, instance, proposal, v); + if (status == paxos_protocol::OK && accept) + accepts.push_back(i); + } } } void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) { - for (auto i : accepts) { - handle h(i); - rpcc *r = h.safebind(); - if (!r) - continue; - int res = 0; - r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, instance, v); - } + int res = 0; + for (auto i : accepts) + if (auto cl = rpcc::bind_cached(i)) + cl->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v); } paxos_protocol::status proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) { - LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")"); + LOG << "instance " << instance << " proposal (" << n.n << ", " << n.m << ")"; lock ml(acceptor_mutex); - r.oldinstance = false; - r.accept = false; - r.n_a = accepted; - r.v_a = accepted_value; if (instance <= instance_h) { - LOG("old instance " << instance << " has value " << values[instance]); - r.oldinstance = true; - r.v_a = values[instance]; + LOG << "old instance " << instance << " has value " << values[instance]; + r = prepareres{prepareres::oldinstance, accepted, values[instance]}; } else if (n > promise) { - LOG("looks good to me"); + LOG << "looks good to me"; promise = n; l.logprop(promise); - r.accept = true; + r = prepareres{prepareres::accept, accepted, accepted_value}; } else { - LOG("I totally rejected this request. Ha."); + LOG << "I totally rejected this request. Ha."; + r = prepareres{prepareres::reject, accepted, accepted_value}; } - LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " << - "v_a=\"" << r.v_a << "\""); + LOG << "preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept + << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " << "v_a=\"" << r.v_a << "\""; return paxos_protocol::OK; } @@ -168,26 +164,24 @@ paxos_protocol::status proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) { lock ml(acceptor_mutex); r = false; - if (instance == instance_h + 1) { - if (n >= promise) { - accepted = n; - accepted_value = v; - l.logaccept(accepted, accepted_value); - r = true; - } - return paxos_protocol::OK; - } else { + if (instance != instance_h + 1) return paxos_protocol::ERR; + if (n >= promise) { + accepted = n; + accepted_value = v; + l.logaccept(accepted, accepted_value); + r = true; } + return paxos_protocol::OK; } paxos_protocol::status proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) { lock ml(acceptor_mutex); - LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value); + LOG << "decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value; if (instance == instance_h + 1) { VERIFY(accepted_value == v); - commit(instance, accepted_value, ml); + commit(instance, v, ml); } else if (instance <= instance_h) { // we are ahead; ignore. } else { @@ -197,24 +191,21 @@ proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const val return paxos_protocol::OK; } -void proposer_acceptor::commit(unsigned instance, const value_t & value) { - lock ml(acceptor_mutex); - commit(instance, value, ml); -} - -void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) { - LOG("acceptor::commit: instance=" << instance << " has v=" << value); +void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & acceptor_mutex_lock) { + VERIFY(&value != &accepted_value); // eited by aliasing? + VERIFY(acceptor_mutex_lock); + LOG << "instance=" << instance << " has v=" << value; if (instance > instance_h) { - LOG("commit: highestacceptedinstance = " << instance); + LOG << "highestacceptedinstance = " << instance; values[instance] = value; l.loginstance(instance, value); instance_h = instance; accepted = promise = {0, me}; accepted_value.clear(); if (delegate) { - pxs_mutex_lock.unlock(); + acceptor_mutex_lock.unlock(); delegate->paxos_commit(instance, value); - pxs_mutex_lock.lock(); + acceptor_mutex_lock.lock(); } } } @@ -222,24 +213,24 @@ void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & // For testing purposes void proposer_acceptor::breakpoint1() { if (break1) { - LOG("Dying at breakpoint 1!"); + LOG << "Dying at breakpoint 1!"; exit(1); } } void proposer_acceptor::breakpoint2() { if (break2) { - LOG("Dying at breakpoint 2!"); + LOG << "Dying at breakpoint 2!"; exit(1); } } void proposer_acceptor::breakpoint(int b) { if (b == 3) { - LOG("Proposer: breakpoint 1"); + LOG << "breakpoint 1"; break1 = true; } else if (b == 4) { - LOG("Proposer: breakpoint 2"); + LOG << "breakpoint 2"; break2 = true; } }