X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/5fd8cc8409d0efadc07dfe8d6774ad9ff477663d..0989f6feac9c8e83847165c4abee5273463eaa63:/paxos.cc?ds=inline diff --git a/paxos.cc b/paxos.cc index c3a445f..b0ec640 100644 --- a/paxos.cc +++ b/paxos.cc @@ -1,9 +1,9 @@ #include "paxos.h" #include "handle.h" -// #include #include #include "tprintf.h" #include "lang/verify.h" +#include "lock.h" // This module implements the proposer and acceptor of the Paxos // distributed algorithm as described by Lamport's "Paxos Made @@ -52,14 +52,14 @@ bool proposer::isrunning() { bool r; - ScopedLock ml(pxs_mutex); + lock ml(pxs_mutex); r = !stable; return r; } // check if the servers in l2 contains a majority of servers in l1 bool -proposer::majority(const std::vector &l1, +proposer::majority(const std::vector &l1, const std::vector &l2) { unsigned n = 0; @@ -71,9 +71,9 @@ proposer::majority(const std::vector &l1, return n >= (l1.size() >> 1) + 1; } -proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, +proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, std::string _me) - : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false), + : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false), stable (true) { my_n.n = 0; @@ -94,7 +94,7 @@ proposer::run(int instance, std::vector cur_nodes, std::string newv std::string v; bool r = false; - ScopedLock ml(pxs_mutex); + lock ml(pxs_mutex); tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n", print_members(cur_nodes).c_str(), instance, newv.c_str(), stable); if (!stable) { // already running proposer? @@ -145,7 +145,7 @@ proposer::run(int instance, std::vector cur_nodes, std::string newv // otherwise fill in accepts with set of nodes that accepted, // set v to the v_a with the highest n_a, and return true. bool -proposer::prepare(unsigned instance, std::vector &accepts, +proposer::prepare(unsigned instance, std::vector &accepts, std::vector nodes, std::string &v) { @@ -157,7 +157,7 @@ proposer::prepare(unsigned instance, std::vector &accepts, handle h(*i); if (!(r = h.safebind())) continue; - int status = r->call(paxos_protocol::preparereq, me, arg, res, rpcc::to(1000)); + int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg); if (status == paxos_protocol::OK) { if (res.oldinstance) { tprintf("commiting old instance!\n"); @@ -190,7 +190,7 @@ proposer::accept(unsigned instance, std::vector &accepts, if (!(r = h.safebind())) continue; bool accept = false; - int status = r->call(paxos_protocol::acceptreq, me, arg, accept, rpcc::to(1000)); + int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg); if (status == paxos_protocol::OK) { if (accept) accepts.push_back(*i); @@ -199,7 +199,7 @@ proposer::accept(unsigned instance, std::vector &accepts, } void -proposer::decide(unsigned instance, std::vector accepts, +proposer::decide(unsigned instance, std::vector accepts, std::string v) { struct paxos_protocol::decidearg arg = { instance, v }; @@ -209,11 +209,11 @@ proposer::decide(unsigned instance, std::vector accepts, if (!(r = h.safebind())) continue; int res = 0; - r->call(paxos_protocol::decidereq, me, arg, res, rpcc::to(1000)); + r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, arg); } } -acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me, +acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me, std::string _value) : cfg(_cfg), me (_me), instance_h(0) { @@ -232,16 +232,15 @@ acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me, } pxs = new rpcs(atoi(_me.c_str())); - pxs->reg(paxos_protocol::preparereq, this, &acceptor::preparereq); - pxs->reg(paxos_protocol::acceptreq, this, &acceptor::acceptreq); - pxs->reg(paxos_protocol::decidereq, this, &acceptor::decidereq); + pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this); + pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this); + pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this); } paxos_protocol::status -acceptor::preparereq(std::string src, paxos_protocol::preparearg a, - paxos_protocol::prepareres &r) +acceptor::preparereq(paxos_protocol::prepareres &r, std::string src, paxos_protocol::preparearg a) { - ScopedLock ml(pxs_mutex); + lock ml(pxs_mutex); r.oldinstance = false; r.accept = false; r.n_a = n_a; @@ -260,9 +259,9 @@ acceptor::preparereq(std::string src, paxos_protocol::preparearg a, } paxos_protocol::status -acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r) +acceptor::acceptreq(bool &r, std::string src, paxos_protocol::acceptarg a) { - ScopedLock ml(pxs_mutex); + lock ml(pxs_mutex); r = false; if (a.n >= n_h) { n_a = a.n; @@ -274,52 +273,53 @@ acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r) } // the src argument is only for debug purpose -paxos_protocol::status -acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r) + paxos_protocol::status +acceptor::decidereq(int &r, std::string src, paxos_protocol::decidearg a) { - ScopedLock ml(pxs_mutex); - tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", - a.instance, instance_h, v_a.c_str()); - if (a.instance == instance_h + 1) { - VERIFY(v_a == a.v); - commit_wo(a.instance, v_a); - } else if (a.instance <= instance_h) { - // we are ahead ignore. - } else { - // we are behind - VERIFY(0); - } - return paxos_protocol::OK; + lock ml(pxs_mutex); + tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", + a.instance, instance_h, v_a.c_str()); + if (a.instance == instance_h + 1) { + VERIFY(v_a == a.v); + commit_wo(a.instance, v_a); + } else if (a.instance <= instance_h) { + // we are ahead ignore. + } else { + // we are behind + VERIFY(0); + } + return paxos_protocol::OK; } void acceptor::commit_wo(unsigned instance, std::string value) { - //assume pxs_mutex is held - tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str()); - if (instance > instance_h) { - tprintf("commit: highestaccepteinstance = %d\n", instance); - values[instance] = value; - l->loginstance(instance, value); - instance_h = instance; - n_h.n = 0; - n_h.m = me; - n_a.n = 0; - n_a.m = me; - v_a.clear(); - if (cfg) { - pxs_mutex.release(); - cfg->paxos_commit(instance, value); - pxs_mutex.acquire(); + //assume pxs_mutex is held + adopt_lock ml(pxs_mutex); + tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str()); + if (instance > instance_h) { + tprintf("commit: highestaccepteinstance = %d\n", instance); + values[instance] = value; + l->loginstance(instance, value); + instance_h = instance; + n_h.n = 0; + n_h.m = me; + n_a.n = 0; + n_a.m = me; + v_a.clear(); + if (cfg) { + ml.unlock(); + cfg->paxos_commit(instance, value); + ml.lock(); + } } - } } void acceptor::commit(unsigned instance, std::string value) { - ScopedLock ml(pxs_mutex); - commit_wo(instance, value); + lock ml(pxs_mutex); + commit_wo(instance, value); } std::string