X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/eeab3e6cade87c1fe0a5f3d93522e12ccb9ec2ab..f2170465073de34adf89161d4287182b518352c4:/paxos.cc?ds=sidebyside diff --git a/paxos.cc b/paxos.cc index b0ec640..83bf4f1 100644 --- a/paxos.cc +++ b/paxos.cc @@ -14,129 +14,119 @@ // paxos_commit to inform higher layers of the agreed value for this // instance. - -bool -operator> (const prop_t &a, const prop_t &b) -{ - return (a.n > b.n || (a.n == b.n && a.m > b.m)); +bool operator> (const prop_t &a, const prop_t &b) { + return (a.n > b.n || (a.n == b.n && a.m > b.m)); } -bool -operator>= (const prop_t &a, const prop_t &b) -{ - return (a.n > b.n || (a.n == b.n && a.m >= b.m)); +bool operator>= (const prop_t &a, const prop_t &b) { + return (a.n > b.n || (a.n == b.n && a.m >= b.m)); } std::string -print_members(const std::vector &nodes) -{ - std::string s; - s.clear(); - for (unsigned i = 0; i < nodes.size(); i++) { - s += nodes[i]; - if (i < (nodes.size()-1)) - s += ","; - } - return s; +print_members(const std::vector &nodes) { + std::string s; + s.clear(); + for (unsigned i = 0; i < nodes.size(); i++) { + s += nodes[i]; + if (i < (nodes.size()-1)) + s += ","; + } + return s; } -bool isamember(std::string m, const std::vector &nodes) -{ - for (unsigned i = 0; i < nodes.size(); i++) { - if (nodes[i] == m) return 1; - } - return 0; + +bool isamember(const std::string & m, const std::vector & nodes) { + for (auto n : nodes) { + if (n == m) + return 1; + } + return 0; } -bool -proposer::isrunning() -{ - bool r; - lock ml(pxs_mutex); - r = !stable; - return r; +bool proposer::isrunning() { + bool r; + lock ml(pxs_mutex); + r = !stable; + return r; } // check if the servers in l2 contains a majority of servers in l1 -bool -proposer::majority(const std::vector &l1, - const std::vector &l2) -{ - unsigned n = 0; +bool proposer::majority(const std::vector &l1, + const std::vector &l2) { + unsigned n = 0; - for (unsigned i = 0; i < l1.size(); i++) { - if (isamember(l1[i], l2)) - n++; - } - return n >= (l1.size() >> 1) + 1; + for (unsigned i = 0; i < l1.size(); i++) { + if (isamember(l1[i], l2)) + n++; + } + return n >= (l1.size() >> 1) + 1; } proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, - std::string _me) + const std::string &_me) : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false), stable (true) { - my_n.n = 0; - my_n.m = me; + my_n.n = 0; + my_n.m = me; } -void -proposer::setn() +void proposer::setn() { - my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1; + my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1; } -bool -proposer::run(int instance, std::vector cur_nodes, std::string newv) +bool proposer::run(unsigned instance, const std::vector & cur_nodes, + const std::string & newv) { - std::vector accepts; - std::vector nodes; - std::string v; - bool r = false; - - lock ml(pxs_mutex); - tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n", - print_members(cur_nodes).c_str(), instance, newv.c_str(), stable); - if (!stable) { // already running proposer? - tprintf("proposer::run: already running\n"); - return false; - } - stable = false; - setn(); - accepts.clear(); - v.clear(); - if (prepare(instance, accepts, cur_nodes, v)) { - - if (majority(cur_nodes, accepts)) { - tprintf("paxos::manager: received a majority of prepare responses\n"); - - if (v.size() == 0) - v = newv; - - breakpoint1(); - - nodes = accepts; - accepts.clear(); - accept(instance, accepts, nodes, v); - - if (majority(cur_nodes, accepts)) { - tprintf("paxos::manager: received a majority of accept responses\n"); - - breakpoint2(); - - decide(instance, accepts, v); - r = true; - } else { - tprintf("paxos::manager: no majority of accept responses\n"); - } + std::vector accepts; + std::vector nodes; + std::string v; + bool r = false; + + lock ml(pxs_mutex); + tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n", + print_members(cur_nodes).c_str(), instance, newv.c_str(), stable); + if (!stable) { // already running proposer? + tprintf("proposer::run: already running\n"); + return false; + } + stable = false; + setn(); + accepts.clear(); + v.clear(); + if (prepare(instance, accepts, cur_nodes, v)) { + + if (majority(cur_nodes, accepts)) { + tprintf("paxos::manager: received a majority of prepare responses\n"); + + if (v.size() == 0) + v = newv; + + breakpoint1(); + + nodes = accepts; + accepts.clear(); + accept(instance, accepts, nodes, v); + + if (majority(cur_nodes, accepts)) { + tprintf("paxos::manager: received a majority of accept responses\n"); + + breakpoint2(); + + decide(instance, accepts, v); + r = true; + } else { + tprintf("paxos::manager: no majority of accept responses\n"); + } + } else { + tprintf("paxos::manager: no majority of prepare responses\n"); + } } else { - tprintf("paxos::manager: no majority of prepare responses\n"); + tprintf("paxos::manager: prepare is rejected %d\n", stable); } - } else { - tprintf("paxos::manager: prepare is rejected %d\n", stable); - } - stable = true; - return r; + stable = true; + return r; } // proposer::run() calls prepare to send prepare RPCs to nodes @@ -145,16 +135,16 @@ proposer::run(int instance, std::vector cur_nodes, std::string newv // otherwise fill in accepts with set of nodes that accepted, // set v to the v_a with the highest n_a, and return true. bool -proposer::prepare(unsigned instance, std::vector &accepts, - std::vector nodes, - std::string &v) +proposer::prepare(unsigned instance, std::vector & accepts, + const std::vector & nodes, + std::string & v) { struct paxos_protocol::preparearg arg = { instance, my_n }; struct paxos_protocol::prepareres res; prop_t n_a = { 0, "" }; rpcc *r; - for (std::vector::iterator i=nodes.begin(); i!=nodes.end(); i++) { - handle h(*i); + for (auto i : nodes) { + handle h(i); if (!(r = h.safebind())) continue; int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg); @@ -165,7 +155,7 @@ proposer::prepare(unsigned instance, std::vector &accepts, return false; } if (res.accept) { - accepts.push_back(*i); + accepts.push_back(i); if (res.n_a >= n_a) { tprintf("found a newer accepted proposal\n"); v = res.v_a; @@ -180,32 +170,30 @@ proposer::prepare(unsigned instance, std::vector &accepts, // run() calls this to send out accept RPCs to accepts. // fill in accepts with list of nodes that accepted. void -proposer::accept(unsigned instance, std::vector &accepts, - std::vector nodes, std::string v) +proposer::accept(unsigned instance, std::vector & accepts, + const std::vector & nodes, const std::string & v) { struct paxos_protocol::acceptarg arg = { instance, my_n, v }; rpcc *r; - for (std::vector::iterator i=nodes.begin(); i!=nodes.end(); i++) { - handle h(*i); + for (auto i : nodes) { + handle h(i); if (!(r = h.safebind())) continue; bool accept = false; int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg); - if (status == paxos_protocol::OK) { - if (accept) - accepts.push_back(*i); - } + if (status == paxos_protocol::OK && accept) + accepts.push_back(i); } } void -proposer::decide(unsigned instance, std::vector accepts, - std::string v) +proposer::decide(unsigned instance, const std::vector & accepts, + const std::string & v) { struct paxos_protocol::decidearg arg = { instance, v }; rpcc *r; - for (std::vector::iterator i=accepts.begin(); i!=accepts.end(); i++) { - handle h(*i); + for (auto i : accepts) { + handle h(i); if (!(r = h.safebind())) continue; int res = 0; @@ -213,32 +201,33 @@ proposer::decide(unsigned instance, std::vector accepts, } } -acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me, - std::string _value) +acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _me, + const std::string & _value) : cfg(_cfg), me (_me), instance_h(0) { - n_h.n = 0; - n_h.m = me; - n_a.n = 0; - n_a.m = me; - v_a.clear(); - - l = new log (this, me); - - if (instance_h == 0 && _first) { - values[1] = _value; - l->loginstance(1, _value); - instance_h = 1; - } - - pxs = new rpcs(atoi(_me.c_str())); - pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this); - pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this); - pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this); + n_h.n = 0; + n_h.m = me; + n_a.n = 0; + n_a.m = me; + v_a.clear(); + + l = new log (this, me); + + if (instance_h == 0 && _first) { + values[1] = _value; + l->loginstance(1, _value); + instance_h = 1; + } + + pxs = new rpcs((uint32_t)std::stoi(_me)); + pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this); + pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this); + pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this); } paxos_protocol::status -acceptor::preparereq(paxos_protocol::prepareres &r, std::string src, paxos_protocol::preparearg a) +acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &, + paxos_protocol::preparearg a) { lock ml(pxs_mutex); r.oldinstance = false; @@ -259,7 +248,7 @@ acceptor::preparereq(paxos_protocol::prepareres &r, std::string src, paxos_proto } paxos_protocol::status -acceptor::acceptreq(bool &r, std::string src, paxos_protocol::acceptarg a) +acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a) { lock ml(pxs_mutex); r = false; @@ -272,16 +261,16 @@ acceptor::acceptreq(bool &r, std::string src, paxos_protocol::acceptarg a) return paxos_protocol::OK; } -// the src argument is only for debug purpose - paxos_protocol::status -acceptor::decidereq(int &r, std::string src, paxos_protocol::decidearg a) +// the src argument is only for debugging +paxos_protocol::status +acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a) { lock ml(pxs_mutex); tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", a.instance, instance_h, v_a.c_str()); if (a.instance == instance_h + 1) { VERIFY(v_a == a.v); - commit_wo(a.instance, v_a); + commit(a.instance, v_a, ml); } else if (a.instance <= instance_h) { // we are ahead ignore. } else { @@ -292,10 +281,8 @@ acceptor::decidereq(int &r, std::string src, paxos_protocol::decidearg a) } void -acceptor::commit_wo(unsigned instance, std::string value) +acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_lock) { - //assume pxs_mutex is held - adopt_lock ml(pxs_mutex); tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str()); if (instance > instance_h) { tprintf("commit: highestaccepteinstance = %d\n", instance); @@ -308,31 +295,31 @@ acceptor::commit_wo(unsigned instance, std::string value) n_a.m = me; v_a.clear(); if (cfg) { - ml.unlock(); + pxs_mutex_lock.unlock(); cfg->paxos_commit(instance, value); - ml.lock(); + pxs_mutex_lock.lock(); } } } void -acceptor::commit(unsigned instance, std::string value) +acceptor::commit(unsigned instance, const std::string & value) { lock ml(pxs_mutex); - commit_wo(instance, value); + commit(instance, value, ml); } std::string acceptor::dump() { - return l->dump(); + return l->dump(); } void -acceptor::restore(std::string s) +acceptor::restore(const std::string & s) { - l->restore(s); - l->logread(); + l->restore(s); + l->logread(); } @@ -343,30 +330,30 @@ acceptor::restore(std::string s) void proposer::breakpoint1() { - if (break1) { - tprintf("Dying at breakpoint 1!\n"); - exit(1); - } + if (break1) { + tprintf("Dying at breakpoint 1!\n"); + exit(1); + } } // Call this from your code between phases accept and decide of proposer void proposer::breakpoint2() { - if (break2) { - tprintf("Dying at breakpoint 2!\n"); - exit(1); - } + if (break2) { + tprintf("Dying at breakpoint 2!\n"); + exit(1); + } } void proposer::breakpoint(int b) { - if (b == 3) { - tprintf("Proposer: breakpoint 1\n"); - break1 = true; - } else if (b == 4) { - tprintf("Proposer: breakpoint 2\n"); - break2 = true; - } + if (b == 3) { + tprintf("Proposer: breakpoint 1\n"); + break1 = true; + } else if (b == 4) { + tprintf("Proposer: breakpoint 2\n"); + break2 = true; + } }