X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/f2170465073de34adf89161d4287182b518352c4..refs/heads/iannucci:/paxos.cc?ds=inline diff --git a/paxos.cc b/paxos.cc index 83bf4f1..108dfad 100644 --- a/paxos.cc +++ b/paxos.cc @@ -1,9 +1,20 @@ -#include "paxos.h" -#include "handle.h" -#include -#include "tprintf.h" -#include "lang/verify.h" -#include "lock.h" +#include "include/paxos.h" + +using namespace std::placeholders; +using namespace std::chrono; + +paxos_change::~paxos_change() {} + +bool isamember(const node_t & m, const nodes_t & nodes) { + return std::find(nodes.begin(), nodes.end(), m) != nodes.end(); +} + +// check if l2 contains a majority of the elements of l1 +bool majority(const nodes_t & l1, const nodes_t & l2) { + auto overlap = (size_t)std::count_if( + l1.begin(), l1.end(), std::bind(isamember, _1, l2)); + return overlap >= (l1.size() >> 1) + 1; +} // This module implements the proposer and acceptor of the Paxos // distributed algorithm as described by Lamport's "Paxos Made @@ -14,152 +25,106 @@ // paxos_commit to inform higher layers of the agreed value for this // instance. -bool operator> (const prop_t &a, const prop_t &b) { - return (a.n > b.n || (a.n == b.n && a.m > b.m)); -} - -bool operator>= (const prop_t &a, const prop_t &b) { - return (a.n > b.n || (a.n == b.n && a.m >= b.m)); -} - -std::string -print_members(const std::vector &nodes) { - std::string s; - s.clear(); - for (unsigned i = 0; i < nodes.size(); i++) { - s += nodes[i]; - if (i < (nodes.size()-1)) - s += ","; - } - return s; -} - - -bool isamember(const std::string & m, const std::vector & nodes) { - for (auto n : nodes) { - if (n == m) - return 1; - } - return 0; -} - -bool proposer::isrunning() { - bool r; - lock ml(pxs_mutex); - r = !stable; - return r; -} - -// check if the servers in l2 contains a majority of servers in l1 -bool proposer::majority(const std::vector &l1, - const std::vector &l2) { - unsigned n = 0; +proposer_acceptor::proposer_acceptor(paxos_change *_delegate, + bool _first, const node_t & _me, const value_t & _value) + : delegate(_delegate), me (_me) +{ + l.handler([this] (auto entry) { + instance_h = entry.number; + values[entry.number] = entry.value; + accepted = promise = {0, me}; + accepted_value.clear(); + }); + l.handler([this] (auto entry) { + promise = entry.promise; + }); + l.handler([this] (auto entry) { + accepted = entry.number; + accepted_value = entry.value; + }); - for (unsigned i = 0; i < l1.size(); i++) { - if (isamember(l1[i], l2)) - n++; - } - return n >= (l1.size() >> 1) + 1; -} + if (instance_h == 0 && _first) + l.append(log_instance{1, _value}); -proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, - const std::string &_me) - : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false), - stable (true) -{ - my_n.n = 0; - my_n.m = me; -} + l.replay(); -void proposer::setn() -{ - my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1; + pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this); + pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this); + pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this); } -bool proposer::run(unsigned instance, const std::vector & cur_nodes, - const std::string & newv) +bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv) { - std::vector accepts; - std::vector nodes; - std::string v; - bool r = false; - - lock ml(pxs_mutex); - tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n", - print_members(cur_nodes).c_str(), instance, newv.c_str(), stable); + lock ml(proposer_mutex); + LOG << "initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable; if (!stable) { // already running proposer? - tprintf("proposer::run: already running\n"); + LOG << "paxos proposer already running"; return false; } stable = false; - setn(); - accepts.clear(); - v.clear(); + bool r = false; + proposal.n = std::max(promise.n, proposal.n) + 1; + nodes_t accepts; + value_t v; if (prepare(instance, accepts, cur_nodes, v)) { if (majority(cur_nodes, accepts)) { - tprintf("paxos::manager: received a majority of prepare responses\n"); + LOG << "received a majority of prepare responses"; - if (v.size() == 0) + if (!v.size()) v = newv; breakpoint1(); - nodes = accepts; - accepts.clear(); + nodes_t nodes; + nodes.swap(accepts); accept(instance, accepts, nodes, v); if (majority(cur_nodes, accepts)) { - tprintf("paxos::manager: received a majority of accept responses\n"); + LOG << "received a majority of accept responses"; breakpoint2(); decide(instance, accepts, v); r = true; } else { - tprintf("paxos::manager: no majority of accept responses\n"); + LOG << "no majority of accept responses"; } } else { - tprintf("paxos::manager: no majority of prepare responses\n"); + LOG << "no majority of prepare responses"; } } else { - tprintf("paxos::manager: prepare is rejected %d\n", stable); + LOG << "prepare is rejected " << stable; } stable = true; return r; } -// proposer::run() calls prepare to send prepare RPCs to nodes -// and collect responses. if one of those nodes -// replies with an oldinstance, return false. -// otherwise fill in accepts with set of nodes that accepted, -// set v to the v_a with the highest n_a, and return true. -bool -proposer::prepare(unsigned instance, std::vector & accepts, - const std::vector & nodes, - std::string & v) -{ - struct paxos_protocol::preparearg arg = { instance, my_n }; - struct paxos_protocol::prepareres res; - prop_t n_a = { 0, "" }; - rpcc *r; +bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts, + const nodes_t & nodes, value_t & v) { + LOG << "sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")"; + prepareres res; + prop_t highest_n_a{0, ""}; for (auto i : nodes) { - handle h(i); - if (!(r = h.safebind())) + auto cl = rpcc::bind_cached(i); + if (!cl) continue; - int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg); + int status = cl->call_timeout(paxos_protocol::preparereq, 100ms, + res, me, instance, proposal); if (status == paxos_protocol::OK) { - if (res.oldinstance) { - tprintf("commiting old instance!\n"); - acc->commit(instance, res.v_a); + LOG << "preparereq response type=" << res.type << " n_a=(" << res.n_a.n + << ", " << res.n_a.m << ") " << "v_a=\"" << res.v_a << "\""; + if (res.type == prepareres::oldinstance) { + LOG << "commiting old instance!"; + lock ml(acceptor_mutex); + commit(instance, res.v_a, ml); return false; - } - if (res.accept) { + } else if (res.type == prepareres::accept) { accepts.push_back(i); - if (res.n_a >= n_a) { - tprintf("found a newer accepted proposal\n"); + if (res.n_a >= highest_n_a) { + LOG << "found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")"; v = res.v_a; - n_a = res.n_a; + highest_n_a = res.n_a; } } } @@ -167,193 +132,118 @@ proposer::prepare(unsigned instance, std::vector & accepts, return true; } -// run() calls this to send out accept RPCs to accepts. -// fill in accepts with list of nodes that accepted. -void -proposer::accept(unsigned instance, std::vector & accepts, - const std::vector & nodes, const std::string & v) -{ - struct paxos_protocol::acceptarg arg = { instance, my_n, v }; - rpcc *r; +void proposer_acceptor::accept(unsigned instance, nodes_t & accepts, + const nodes_t & nodes, const value_t & v) { + bool accept = false; for (auto i : nodes) { - handle h(i); - if (!(r = h.safebind())) - continue; - bool accept = false; - int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg); - if (status == paxos_protocol::OK && accept) - accepts.push_back(i); + if (auto cl = rpcc::bind_cached(i)) { + int status = cl->call_timeout(paxos_protocol::acceptreq, 100ms, + accept, me, instance, proposal, v); + if (status == paxos_protocol::OK && accept) + accepts.push_back(i); + } } } -void -proposer::decide(unsigned instance, const std::vector & accepts, - const std::string & v) -{ - struct paxos_protocol::decidearg arg = { instance, v }; - rpcc *r; - for (auto i : accepts) { - handle h(i); - if (!(r = h.safebind())) - continue; - int res = 0; - r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, arg); - } -} - -acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _me, - const std::string & _value) - : cfg(_cfg), me (_me), instance_h(0) -{ - n_h.n = 0; - n_h.m = me; - n_a.n = 0; - n_a.m = me; - v_a.clear(); - - l = new log (this, me); - - if (instance_h == 0 && _first) { - values[1] = _value; - l->loginstance(1, _value); - instance_h = 1; - } - - pxs = new rpcs((uint32_t)std::stoi(_me)); - pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this); - pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this); - pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this); +void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) { + int res = 0; + for (auto i : accepts) + if (auto cl = rpcc::bind_cached(i)) + cl->call_timeout(paxos_protocol::decidereq, 100ms, res, me, instance, v); } paxos_protocol::status -acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &, - paxos_protocol::preparearg a) -{ - lock ml(pxs_mutex); - r.oldinstance = false; - r.accept = false; - r.n_a = n_a; - r.v_a = v_a; - if (a.instance <= instance_h) { - r.oldinstance = true; - r.v_a = values[a.instance]; - } else if (a.n > n_h) { - n_h = a.n; - l->logprop(n_h); - r.accept = true; +proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) { + LOG << "instance " << instance << " proposal (" << n.n << ", " << n.m << ")"; + lock ml(acceptor_mutex); + if (instance <= instance_h) { + LOG << "old instance " << instance << " has value " << values[instance]; + r = prepareres{prepareres::oldinstance, accepted, values[instance]}; + } else if (n > promise) { + LOG << "looks good to me"; + promise = n; + l.append(log_proposal{n}); + r = prepareres{prepareres::accept, accepted, accepted_value}; } else { - tprintf("I totally rejected this request. Ha.\n"); + LOG << "I totally rejected this request. Ha."; + r = prepareres{prepareres::reject, accepted, accepted_value}; } + LOG << "preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept + << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " << "v_a=\"" << r.v_a << "\""; return paxos_protocol::OK; } paxos_protocol::status -acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a) -{ - lock ml(pxs_mutex); +proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) { + lock ml(acceptor_mutex); r = false; - if (a.n >= n_h) { - n_a = a.n; - v_a = a.v; - l->logaccept(n_a, v_a); + if (instance != instance_h + 1) + return paxos_protocol::ERR; + if (n >= promise) { + accepted = n; + accepted_value = v; + l.append(log_accept{n, v}); r = true; } return paxos_protocol::OK; } -// the src argument is only for debugging paxos_protocol::status -acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a) -{ - lock ml(pxs_mutex); - tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", - a.instance, instance_h, v_a.c_str()); - if (a.instance == instance_h + 1) { - VERIFY(v_a == a.v); - commit(a.instance, v_a, ml); - } else if (a.instance <= instance_h) { - // we are ahead ignore. +proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) { + lock ml(acceptor_mutex); + LOG << "decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value; + if (instance == instance_h + 1) { + VERIFY(accepted_value == v); + commit(instance, v, ml); + } else if (instance <= instance_h) { + // we are ahead; ignore. } else { - // we are behind + // we are behind. VERIFY(0); } return paxos_protocol::OK; } -void -acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_lock) -{ - tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str()); +void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & acceptor_mutex_lock) { + VERIFY(&value != &accepted_value); // eited by aliasing? + VERIFY(acceptor_mutex_lock); + LOG << "instance=" << instance << " has v=" << value; if (instance > instance_h) { - tprintf("commit: highestaccepteinstance = %d\n", instance); - values[instance] = value; - l->loginstance(instance, value); + LOG << "highestacceptedinstance = " << instance; + l.append(log_instance{instance, value}); instance_h = instance; - n_h.n = 0; - n_h.m = me; - n_a.n = 0; - n_a.m = me; - v_a.clear(); - if (cfg) { - pxs_mutex_lock.unlock(); - cfg->paxos_commit(instance, value); - pxs_mutex_lock.lock(); + values[instance] = value; + accepted = promise = {0, me}; + accepted_value.clear(); + if (delegate) { + acceptor_mutex_lock.unlock(); + delegate->paxos_commit(instance, value); + acceptor_mutex_lock.lock(); } } } -void -acceptor::commit(unsigned instance, const std::string & value) -{ - lock ml(pxs_mutex); - commit(instance, value, ml); -} - -std::string -acceptor::dump() -{ - return l->dump(); -} - -void -acceptor::restore(const std::string & s) -{ - l->restore(s); - l->logread(); -} - - - // For testing purposes - -// Call this from your code between phases prepare and accept of proposer -void -proposer::breakpoint1() -{ +void proposer_acceptor::breakpoint1() { if (break1) { - tprintf("Dying at breakpoint 1!\n"); + LOG << "Dying at breakpoint 1!"; exit(1); } } -// Call this from your code between phases accept and decide of proposer -void -proposer::breakpoint2() -{ +void proposer_acceptor::breakpoint2() { if (break2) { - tprintf("Dying at breakpoint 2!\n"); + LOG << "Dying at breakpoint 2!"; exit(1); } } -void -proposer::breakpoint(int b) -{ +void proposer_acceptor::breakpoint(int b) { if (b == 3) { - tprintf("Proposer: breakpoint 1\n"); + LOG << "breakpoint 1"; break1 = true; } else if (b == 4) { - tprintf("Proposer: breakpoint 2\n"); + LOG << "breakpoint 2"; break2 = true; } }