X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/f2170465073de34adf89161d4287182b518352c4..2546a41ad36fdc9ef6471cb35a1d56930ae1b527:/paxos.cc diff --git a/paxos.cc b/paxos.cc index 83bf4f1..46d9c1c 100644 --- a/paxos.cc +++ b/paxos.cc @@ -1,10 +1,11 @@ #include "paxos.h" #include "handle.h" -#include -#include "tprintf.h" +#include "threaded_log.h" #include "lang/verify.h" #include "lock.h" +using std::stoi; + // This module implements the proposer and acceptor of the Paxos // distributed algorithm as described by Lamport's "Paxos Made // Simple". To kick off an instance of Paxos, the caller supplies a @@ -22,9 +23,9 @@ bool operator>= (const prop_t &a, const prop_t &b) { return (a.n > b.n || (a.n == b.n && a.m >= b.m)); } -std::string -print_members(const std::vector &nodes) { - std::string s; +string +print_members(const vector &nodes) { + string s; s.clear(); for (unsigned i = 0; i < nodes.size(); i++) { s += nodes[i]; @@ -35,7 +36,7 @@ print_members(const std::vector &nodes) { } -bool isamember(const std::string & m, const std::vector & nodes) { +bool isamember(const string & m, const vector & nodes) { for (auto n : nodes) { if (n == m) return 1; @@ -51,8 +52,7 @@ bool proposer::isrunning() { } // check if the servers in l2 contains a majority of servers in l1 -bool proposer::majority(const std::vector &l1, - const std::vector &l2) { +bool proposer::majority(const vector &l1, const vector &l2) { unsigned n = 0; for (unsigned i = 0; i < l1.size(); i++) { @@ -62,8 +62,7 @@ bool proposer::majority(const std::vector &l1, return n >= (l1.size() >> 1) + 1; } -proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, - const std::string &_me) +proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, const string &_me) : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false), stable (true) { @@ -76,19 +75,17 @@ void proposer::setn() my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1; } -bool proposer::run(unsigned instance, const std::vector & cur_nodes, - const std::string & newv) +bool proposer::run(unsigned instance, const vector & cur_nodes, const string & newv) { - std::vector accepts; - std::vector nodes; - std::string v; + vector accepts; + vector nodes; + string v; bool r = false; lock ml(pxs_mutex); - tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n", - print_members(cur_nodes).c_str(), instance, newv.c_str(), stable); + LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable); if (!stable) { // already running proposer? - tprintf("proposer::run: already running\n"); + LOG("proposer::run: already running"); return false; } stable = false; @@ -98,7 +95,7 @@ bool proposer::run(unsigned instance, const std::vector & cur_nodes if (prepare(instance, accepts, cur_nodes, v)) { if (majority(cur_nodes, accepts)) { - tprintf("paxos::manager: received a majority of prepare responses\n"); + LOG("paxos::manager: received a majority of prepare responses"); if (v.size() == 0) v = newv; @@ -110,20 +107,20 @@ bool proposer::run(unsigned instance, const std::vector & cur_nodes accept(instance, accepts, nodes, v); if (majority(cur_nodes, accepts)) { - tprintf("paxos::manager: received a majority of accept responses\n"); + LOG("paxos::manager: received a majority of accept responses"); breakpoint2(); decide(instance, accepts, v); r = true; } else { - tprintf("paxos::manager: no majority of accept responses\n"); + LOG("paxos::manager: no majority of accept responses"); } } else { - tprintf("paxos::manager: no majority of prepare responses\n"); + LOG("paxos::manager: no majority of prepare responses"); } } else { - tprintf("paxos::manager: prepare is rejected %d\n", stable); + LOG("paxos::manager: prepare is rejected " << stable); } stable = true; return r; @@ -135,9 +132,9 @@ bool proposer::run(unsigned instance, const std::vector & cur_nodes // otherwise fill in accepts with set of nodes that accepted, // set v to the v_a with the highest n_a, and return true. bool -proposer::prepare(unsigned instance, std::vector & accepts, - const std::vector & nodes, - std::string & v) +proposer::prepare(unsigned instance, vector & accepts, + const vector & nodes, + string & v) { struct paxos_protocol::preparearg arg = { instance, my_n }; struct paxos_protocol::prepareres res; @@ -150,14 +147,14 @@ proposer::prepare(unsigned instance, std::vector & accepts, int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg); if (status == paxos_protocol::OK) { if (res.oldinstance) { - tprintf("commiting old instance!\n"); + LOG("commiting old instance!"); acc->commit(instance, res.v_a); return false; } if (res.accept) { accepts.push_back(i); if (res.n_a >= n_a) { - tprintf("found a newer accepted proposal\n"); + LOG("found a newer accepted proposal"); v = res.v_a; n_a = res.n_a; } @@ -170,8 +167,8 @@ proposer::prepare(unsigned instance, std::vector & accepts, // run() calls this to send out accept RPCs to accepts. // fill in accepts with list of nodes that accepted. void -proposer::accept(unsigned instance, std::vector & accepts, - const std::vector & nodes, const std::string & v) +proposer::accept(unsigned instance, vector & accepts, + const vector & nodes, const string & v) { struct paxos_protocol::acceptarg arg = { instance, my_n, v }; rpcc *r; @@ -187,8 +184,8 @@ proposer::accept(unsigned instance, std::vector & accepts, } void -proposer::decide(unsigned instance, const std::vector & accepts, - const std::string & v) +proposer::decide(unsigned instance, const vector & accepts, + const string & v) { struct paxos_protocol::decidearg arg = { instance, v }; rpcc *r; @@ -201,8 +198,8 @@ proposer::decide(unsigned instance, const std::vector & accepts, } } -acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _me, - const std::string & _value) +acceptor::acceptor(class paxos_change *_cfg, bool _first, const string & _me, + const string & _value) : cfg(_cfg), me (_me), instance_h(0) { n_h.n = 0; @@ -219,14 +216,14 @@ acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _m instance_h = 1; } - pxs = new rpcs((uint32_t)std::stoi(_me)); + pxs = new rpcs((uint32_t)stoi(_me)); pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this); pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this); pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this); } paxos_protocol::status -acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &, +acceptor::preparereq(paxos_protocol::prepareres & r, const string &, paxos_protocol::preparearg a) { lock ml(pxs_mutex); @@ -242,13 +239,13 @@ acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &, l->logprop(n_h); r.accept = true; } else { - tprintf("I totally rejected this request. Ha.\n"); + LOG("I totally rejected this request. Ha."); } return paxos_protocol::OK; } paxos_protocol::status -acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a) +acceptor::acceptreq(bool & r, const string &, paxos_protocol::acceptarg a) { lock ml(pxs_mutex); r = false; @@ -263,11 +260,10 @@ acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a) // the src argument is only for debugging paxos_protocol::status -acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a) +acceptor::decidereq(int &, const string &, paxos_protocol::decidearg a) { lock ml(pxs_mutex); - tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", - a.instance, instance_h, v_a.c_str()); + LOG("decidereq for accepted instance " << a.instance << " (my instance " << instance_h << ") v=" << v_a); if (a.instance == instance_h + 1) { VERIFY(v_a == a.v); commit(a.instance, v_a, ml); @@ -281,11 +277,11 @@ acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a) } void -acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_lock) +acceptor::commit(unsigned instance, const string & value, lock & pxs_mutex_lock) { - tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str()); + LOG("acceptor::commit: instance=" << instance << " has v=" << value); if (instance > instance_h) { - tprintf("commit: highestaccepteinstance = %d\n", instance); + LOG("commit: highestaccepteinstance = " << instance); values[instance] = value; l->loginstance(instance, value); instance_h = instance; @@ -303,20 +299,20 @@ acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_ } void -acceptor::commit(unsigned instance, const std::string & value) +acceptor::commit(unsigned instance, const string & value) { lock ml(pxs_mutex); commit(instance, value, ml); } -std::string +string acceptor::dump() { return l->dump(); } void -acceptor::restore(const std::string & s) +acceptor::restore(const string & s) { l->restore(s); l->logread(); @@ -331,7 +327,7 @@ void proposer::breakpoint1() { if (break1) { - tprintf("Dying at breakpoint 1!\n"); + LOG("Dying at breakpoint 1!"); exit(1); } } @@ -341,7 +337,7 @@ void proposer::breakpoint2() { if (break2) { - tprintf("Dying at breakpoint 2!\n"); + LOG("Dying at breakpoint 2!"); exit(1); } } @@ -350,10 +346,10 @@ void proposer::breakpoint(int b) { if (b == 3) { - tprintf("Proposer: breakpoint 1\n"); + LOG("Proposer: breakpoint 1"); break1 = true; } else if (b == 4) { - tprintf("Proposer: breakpoint 2\n"); + LOG("Proposer: breakpoint 2"); break2 = true; } }