X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/5fd8cc8409d0efadc07dfe8d6774ad9ff477663d..dfe8486473094c0769fd1922329c3f0dfd8f43c0:/paxos.cc diff --git a/paxos.cc b/paxos.cc index c3a445f..89f1714 100644 --- a/paxos.cc +++ b/paxos.cc @@ -1,9 +1,9 @@ #include "paxos.h" #include "handle.h" -// #include #include #include "tprintf.h" #include "lang/verify.h" +#include "lock.h" // This module implements the proposer and acceptor of the Paxos // distributed algorithm as described by Lamport's "Paxos Made @@ -52,7 +52,7 @@ bool proposer::isrunning() { bool r; - ScopedLock ml(pxs_mutex); + lock ml(pxs_mutex); r = !stable; return r; } @@ -94,7 +94,7 @@ proposer::run(int instance, std::vector cur_nodes, std::string newv std::string v; bool r = false; - ScopedLock ml(pxs_mutex); + lock ml(pxs_mutex); tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n", print_members(cur_nodes).c_str(), instance, newv.c_str(), stable); if (!stable) { // already running proposer? @@ -241,7 +241,7 @@ paxos_protocol::status acceptor::preparereq(std::string src, paxos_protocol::preparearg a, paxos_protocol::prepareres &r) { - ScopedLock ml(pxs_mutex); + lock ml(pxs_mutex); r.oldinstance = false; r.accept = false; r.n_a = n_a; @@ -262,7 +262,7 @@ acceptor::preparereq(std::string src, paxos_protocol::preparearg a, paxos_protocol::status acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r) { - ScopedLock ml(pxs_mutex); + lock ml(pxs_mutex); r = false; if (a.n >= n_h) { n_a = a.n; @@ -274,52 +274,53 @@ acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r) } // the src argument is only for debug purpose -paxos_protocol::status + paxos_protocol::status acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r) { - ScopedLock ml(pxs_mutex); - tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", - a.instance, instance_h, v_a.c_str()); - if (a.instance == instance_h + 1) { - VERIFY(v_a == a.v); - commit_wo(a.instance, v_a); - } else if (a.instance <= instance_h) { - // we are ahead ignore. - } else { - // we are behind - VERIFY(0); - } - return paxos_protocol::OK; + lock ml(pxs_mutex); + tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", + a.instance, instance_h, v_a.c_str()); + if (a.instance == instance_h + 1) { + VERIFY(v_a == a.v); + commit_wo(a.instance, v_a); + } else if (a.instance <= instance_h) { + // we are ahead ignore. + } else { + // we are behind + VERIFY(0); + } + return paxos_protocol::OK; } void acceptor::commit_wo(unsigned instance, std::string value) { - //assume pxs_mutex is held - tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str()); - if (instance > instance_h) { - tprintf("commit: highestaccepteinstance = %d\n", instance); - values[instance] = value; - l->loginstance(instance, value); - instance_h = instance; - n_h.n = 0; - n_h.m = me; - n_a.n = 0; - n_a.m = me; - v_a.clear(); - if (cfg) { - pxs_mutex.release(); - cfg->paxos_commit(instance, value); - pxs_mutex.acquire(); + //assume pxs_mutex is held + adopt_lock ml(pxs_mutex); + tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str()); + if (instance > instance_h) { + tprintf("commit: highestaccepteinstance = %d\n", instance); + values[instance] = value; + l->loginstance(instance, value); + instance_h = instance; + n_h.n = 0; + n_h.m = me; + n_a.n = 0; + n_a.m = me; + v_a.clear(); + if (cfg) { + ml.unlock(); + cfg->paxos_commit(instance, value); + ml.lock(); + } } - } } void acceptor::commit(unsigned instance, std::string value) { - ScopedLock ml(pxs_mutex); - commit_wo(instance, value); + lock ml(pxs_mutex); + commit_wo(instance, value); } std::string