X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/130f2d53438eb6193accb445aca52fa8e2fe4158..16e7c282c6fcec8189425bd15ec9e8a4a0ee857d:/paxos.cc diff --git a/paxos.cc b/paxos.cc index 4434788..3166c92 100644 --- a/paxos.cc +++ b/paxos.cc @@ -1,9 +1,15 @@ #include "paxos.h" #include "handle.h" -#include -#include "tprintf.h" -#include "lang/verify.h" -#include "lock.h" + +bool isamember(const node_t & m, const nodes_t & nodes) { + return find(nodes.begin(), nodes.end(), m) != nodes.end(); +} + +// check if l2 contains a majority of the elements of l1 +bool majority(const nodes_t &l1, const nodes_t &l2) { + auto overlap = (size_t)count_if(l1.begin(), l1.end(), bind(isamember, _1, l2)); + return overlap >= (l1.size() >> 1) + 1; +} // This module implements the proposer and acceptor of the Paxos // distributed algorithm as described by Lamport's "Paxos Made @@ -14,162 +20,92 @@ // paxos_commit to inform higher layers of the agreed value for this // instance. - -bool -operator> (const prop_t &a, const prop_t &b) +proposer_acceptor::proposer_acceptor(paxos_change *_delegate, + bool _first, const node_t & _me, const value_t & _value) + : delegate(_delegate), me (_me) { - return (a.n > b.n || (a.n == b.n && a.m > b.m)); -} - -bool -operator>= (const prop_t &a, const prop_t &b) -{ - return (a.n > b.n || (a.n == b.n && a.m >= b.m)); -} - -std::string -print_members(const std::vector &nodes) -{ - std::string s; - s.clear(); - for (unsigned i = 0; i < nodes.size(); i++) { - s += nodes[i]; - if (i < (nodes.size()-1)) - s += ","; - } - return s; -} - -bool isamember(std::string m, const std::vector &nodes) -{ - for (unsigned i = 0; i < nodes.size(); i++) { - if (nodes[i] == m) return 1; - } - return 0; -} - -bool -proposer::isrunning() -{ - bool r; - lock ml(pxs_mutex); - r = !stable; - return r; -} - -// check if the servers in l2 contains a majority of servers in l1 -bool -proposer::majority(const std::vector &l1, - const std::vector &l2) -{ - unsigned n = 0; - - for (unsigned i = 0; i < l1.size(); i++) { - if (isamember(l1[i], l2)) - n++; - } - return n >= (l1.size() >> 1) + 1; -} - -proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, - std::string _me) - : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false), - stable (true) -{ - my_n.n = 0; - my_n.m = me; -} + // at this point, the log has already been replayed + if (instance_h == 0 && _first) { + values[1] = _value; + l.loginstance(1, _value); + instance_h = 1; + } -void -proposer::setn() -{ - my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1; + pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this); + pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this); + pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this); } -bool -proposer::run(int instance, std::vector cur_nodes, std::string newv) +bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv) { - std::vector accepts; - std::vector nodes; - std::string v; - bool r = false; - - lock ml(pxs_mutex); - tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n", - print_members(cur_nodes).c_str(), instance, newv.c_str(), stable); - if (!stable) { // already running proposer? - tprintf("proposer::run: already running\n"); - return false; - } - stable = false; - setn(); - accepts.clear(); - v.clear(); - if (prepare(instance, accepts, cur_nodes, v)) { - - if (majority(cur_nodes, accepts)) { - tprintf("paxos::manager: received a majority of prepare responses\n"); + lock ml(proposer_mutex); + LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable); + if (!stable) { // already running proposer? + LOG("paxos proposer already running"); + return false; + } + stable = false; + bool r = false; + proposal.n = max(promise.n, proposal.n) + 1; + nodes_t accepts; + value_t v = newv; + if (prepare(instance, accepts, cur_nodes, v)) { - if (v.size() == 0) - v = newv; + if (majority(cur_nodes, accepts)) { + LOG("received a majority of prepare responses"); - breakpoint1(); + breakpoint1(); - nodes = accepts; - accepts.clear(); - accept(instance, accepts, nodes, v); + nodes_t nodes; + nodes.swap(accepts); + accept(instance, accepts, nodes, v); - if (majority(cur_nodes, accepts)) { - tprintf("paxos::manager: received a majority of accept responses\n"); + if (majority(cur_nodes, accepts)) { + LOG("received a majority of accept responses"); - breakpoint2(); + breakpoint2(); - decide(instance, accepts, v); - r = true; - } else { - tprintf("paxos::manager: no majority of accept responses\n"); - } + decide(instance, accepts, v); + r = true; + } else { + LOG("no majority of accept responses"); + } + } else { + LOG("no majority of prepare responses"); + } } else { - tprintf("paxos::manager: no majority of prepare responses\n"); + LOG("prepare is rejected " << stable); } - } else { - tprintf("paxos::manager: prepare is rejected %d\n", stable); - } - stable = true; - return r; -} - -// proposer::run() calls prepare to send prepare RPCs to nodes -// and collect responses. if one of those nodes -// replies with an oldinstance, return false. -// otherwise fill in accepts with set of nodes that accepted, -// set v to the v_a with the highest n_a, and return true. -bool -proposer::prepare(unsigned instance, std::vector &accepts, - std::vector nodes, - std::string &v) -{ - struct paxos_protocol::preparearg arg = { instance, my_n }; - struct paxos_protocol::prepareres res; - prop_t n_a = { 0, "" }; - rpcc *r; - for (std::vector::iterator i=nodes.begin(); i!=nodes.end(); i++) { - handle h(*i); - if (!(r = h.safebind())) + stable = true; + return r; +} + +bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts, + const nodes_t & nodes, value_t & v) { + LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")"); + prepareres res; + prop_t highest_n_a{0, ""}; + for (auto i : nodes) { + handle h(i); + rpcc *r = h.safebind(); + if (!r) continue; - int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg); + auto status = (paxos_protocol::status)r->call_timeout( + paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, proposal); if (status == paxos_protocol::OK) { if (res.oldinstance) { - tprintf("commiting old instance!\n"); - acc->commit(instance, res.v_a); + LOG("commiting old instance!"); + commit(instance, res.v_a); return false; } + LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " << + "v_a=\"" << res.v_a << "\""); if (res.accept) { - accepts.push_back(*i); - if (res.n_a >= n_a) { - tprintf("found a newer accepted proposal\n"); + accepts.push_back(i); + if (res.n_a >= highest_n_a) { + LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")"); v = res.v_a; - n_a = res.n_a; + highest_n_a = res.n_a; } } } @@ -177,197 +113,133 @@ proposer::prepare(unsigned instance, std::vector &accepts, return true; } -// run() calls this to send out accept RPCs to accepts. -// fill in accepts with list of nodes that accepted. -void -proposer::accept(unsigned instance, std::vector &accepts, - std::vector nodes, std::string v) -{ - struct paxos_protocol::acceptarg arg = { instance, my_n, v }; - rpcc *r; - for (std::vector::iterator i=nodes.begin(); i!=nodes.end(); i++) { - handle h(*i); - if (!(r = h.safebind())) +void proposer_acceptor::accept(unsigned instance, nodes_t & accepts, + const nodes_t & nodes, const value_t & v) { + for (auto i : nodes) { + handle h(i); + rpcc *r = h.safebind(); + if (!r) continue; bool accept = false; - int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg); - if (status == paxos_protocol::OK) { - if (accept) - accepts.push_back(*i); - } + int status = r->call_timeout( + paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, proposal, v); + if (status == paxos_protocol::OK && accept) + accepts.push_back(i); } } -void -proposer::decide(unsigned instance, std::vector accepts, - std::string v) -{ - struct paxos_protocol::decidearg arg = { instance, v }; - rpcc *r; - for (std::vector::iterator i=accepts.begin(); i!=accepts.end(); i++) { - handle h(*i); - if (!(r = h.safebind())) +void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) { + for (auto i : accepts) { + handle h(i); + rpcc *r = h.safebind(); + if (!r) continue; int res = 0; - r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, arg); + r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, instance, v); } } -acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me, - std::string _value) - : cfg(_cfg), me (_me), instance_h(0) -{ - n_h.n = 0; - n_h.m = me; - n_a.n = 0; - n_a.m = me; - v_a.clear(); - - l = new log (this, me); - - if (instance_h == 0 && _first) { - values[1] = _value; - l->loginstance(1, _value); - instance_h = 1; - } - - pxs = new rpcs(atoi(_me.c_str())); - pxs->reg(paxos_protocol::preparereq, this, &acceptor::preparereq); - pxs->reg(paxos_protocol::acceptreq, this, &acceptor::acceptreq); - pxs->reg(paxos_protocol::decidereq, this, &acceptor::decidereq); -} - paxos_protocol::status -acceptor::preparereq(std::string src, paxos_protocol::preparearg a, - paxos_protocol::prepareres &r) -{ - lock ml(pxs_mutex); +proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) { + LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")"); + lock ml(acceptor_mutex); r.oldinstance = false; r.accept = false; - r.n_a = n_a; - r.v_a = v_a; - if (a.instance <= instance_h) { + r.n_a = accepted; + r.v_a = accepted_value; + if (instance <= instance_h) { + LOG("old instance " << instance << " has value " << values[instance]); r.oldinstance = true; - r.v_a = values[a.instance]; - } else if (a.n > n_h) { - n_h = a.n; - l->logprop(n_h); + r.v_a = values[instance]; + } else if (n > promise) { + LOG("looks good to me"); + promise = n; + l.logprop(promise); r.accept = true; } else { - tprintf("I totally rejected this request. Ha.\n"); + LOG("I totally rejected this request. Ha."); } + LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " << + "v_a=\"" << r.v_a << "\""); return paxos_protocol::OK; } paxos_protocol::status -acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r) -{ - lock ml(pxs_mutex); +proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) { + lock ml(acceptor_mutex); r = false; - if (a.n >= n_h) { - n_a = a.n; - v_a = a.v; - l->logaccept(n_a, v_a); - r = true; + if (instance == instance_h + 1) { + if (n >= promise) { + accepted = n; + accepted_value = v; + l.logaccept(accepted, accepted_value); + r = true; + } + return paxos_protocol::OK; + } else { + return paxos_protocol::ERR; } - return paxos_protocol::OK; } -// the src argument is only for debug purpose - paxos_protocol::status -acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r) -{ - lock ml(pxs_mutex); - tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", - a.instance, instance_h, v_a.c_str()); - if (a.instance == instance_h + 1) { - VERIFY(v_a == a.v); - commit_wo(a.instance, v_a); - } else if (a.instance <= instance_h) { - // we are ahead ignore. +paxos_protocol::status +proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) { + lock ml(acceptor_mutex); + LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value); + if (instance == instance_h + 1) { + VERIFY(accepted_value == v); + commit(instance, accepted_value, ml); + } else if (instance <= instance_h) { + // we are ahead; ignore. } else { - // we are behind + // we are behind. VERIFY(0); } return paxos_protocol::OK; } -void -acceptor::commit_wo(unsigned instance, std::string value) -{ - //assume pxs_mutex is held - adopt_lock ml(pxs_mutex); - tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str()); +void proposer_acceptor::commit(unsigned instance, const value_t & value) { + lock ml(acceptor_mutex); + commit(instance, value, ml); +} + +void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) { + LOG("acceptor::commit: instance=" << instance << " has v=" << value); if (instance > instance_h) { - tprintf("commit: highestaccepteinstance = %d\n", instance); + LOG("commit: highestacceptedinstance = " << instance); values[instance] = value; - l->loginstance(instance, value); + l.loginstance(instance, value); instance_h = instance; - n_h.n = 0; - n_h.m = me; - n_a.n = 0; - n_a.m = me; - v_a.clear(); - if (cfg) { - ml.unlock(); - cfg->paxos_commit(instance, value); - ml.lock(); + accepted = promise = {0, me}; + accepted_value.clear(); + if (delegate) { + pxs_mutex_lock.unlock(); + delegate->paxos_commit(instance, value); + pxs_mutex_lock.lock(); } } } -void -acceptor::commit(unsigned instance, std::string value) -{ - lock ml(pxs_mutex); - commit_wo(instance, value); -} - -std::string -acceptor::dump() -{ - return l->dump(); -} - -void -acceptor::restore(std::string s) -{ - l->restore(s); - l->logread(); -} - - - // For testing purposes - -// Call this from your code between phases prepare and accept of proposer -void -proposer::breakpoint1() -{ - if (break1) { - tprintf("Dying at breakpoint 1!\n"); - exit(1); - } +void proposer_acceptor::breakpoint1() { + if (break1) { + LOG("Dying at breakpoint 1!"); + exit(1); + } } -// Call this from your code between phases accept and decide of proposer -void -proposer::breakpoint2() -{ - if (break2) { - tprintf("Dying at breakpoint 2!\n"); - exit(1); - } +void proposer_acceptor::breakpoint2() { + if (break2) { + LOG("Dying at breakpoint 2!"); + exit(1); + } } -void -proposer::breakpoint(int b) -{ - if (b == 3) { - tprintf("Proposer: breakpoint 1\n"); - break1 = true; - } else if (b == 4) { - tprintf("Proposer: breakpoint 2\n"); - break2 = true; - } +void proposer_acceptor::breakpoint(int b) { + if (b == 3) { + LOG("Proposer: breakpoint 1"); + break1 = true; + } else if (b == 4) { + LOG("Proposer: breakpoint 2"); + break2 = true; + } }