Actually, you know, build.
[invirt/third/libt4.git] / paxos.cc
index c3a445f..b0ec640 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -1,9 +1,9 @@
 #include "paxos.h"
 #include "handle.h"
-// #include <signal.h>
 #include <stdio.h>
 #include "tprintf.h"
 #include "lang/verify.h"
+#include "lock.h"
 
 // This module implements the proposer and acceptor of the Paxos
 // distributed algorithm as described by Lamport's "Paxos Made
@@ -52,14 +52,14 @@ bool
 proposer::isrunning()
 {
   bool r;
-  ScopedLock ml(pxs_mutex);
+  lock ml(pxs_mutex);
   r = !stable;
   return r;
 }
 
 // check if the servers in l2 contains a majority of servers in l1
 bool
-proposer::majority(const std::vector<std::string> &l1, 
+proposer::majority(const std::vector<std::string> &l1,
                const std::vector<std::string> &l2)
 {
   unsigned n = 0;
@@ -71,9 +71,9 @@ proposer::majority(const std::vector<std::string> &l1,
   return n >= (l1.size() >> 1) + 1;
 }
 
-proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, 
+proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
                   std::string _me)
-  : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false), 
+  : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
     stable (true)
 {
   my_n.n = 0;
@@ -94,7 +94,7 @@ proposer::run(int instance, std::vector<std::string> cur_nodes, std::string newv
   std::string v;
   bool r = false;
 
-  ScopedLock ml(pxs_mutex);
+  lock ml(pxs_mutex);
   tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
         print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
   if (!stable) {  // already running proposer?
@@ -145,7 +145,7 @@ proposer::run(int instance, std::vector<std::string> cur_nodes, std::string newv
 // otherwise fill in accepts with set of nodes that accepted,
 // set v to the v_a with the highest n_a, and return true.
 bool
-proposer::prepare(unsigned instance, std::vector<std::string> &accepts, 
+proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
          std::vector<std::string> nodes,
          std::string &v)
 {
@@ -157,7 +157,7 @@ proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
         handle h(*i);
         if (!(r = h.safebind()))
             continue;
-        int status = r->call(paxos_protocol::preparereq, me, arg, res, rpcc::to(1000));
+        int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
         if (status == paxos_protocol::OK) {
             if (res.oldinstance) {
                 tprintf("commiting old instance!\n");
@@ -190,7 +190,7 @@ proposer::accept(unsigned instance, std::vector<std::string> &accepts,
         if (!(r = h.safebind()))
             continue;
         bool accept = false;
-        int status = r->call(paxos_protocol::acceptreq, me, arg, accept, rpcc::to(1000));
+        int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg);
         if (status == paxos_protocol::OK) {
             if (accept)
                 accepts.push_back(*i);
@@ -199,7 +199,7 @@ proposer::accept(unsigned instance, std::vector<std::string> &accepts,
 }
 
 void
-proposer::decide(unsigned instance, std::vector<std::string> accepts, 
+proposer::decide(unsigned instance, std::vector<std::string> accepts,
              std::string v)
 {
     struct paxos_protocol::decidearg arg = { instance, v };
@@ -209,11 +209,11 @@ proposer::decide(unsigned instance, std::vector<std::string> accepts,
         if (!(r = h.safebind()))
             continue;
         int res = 0;
-        r->call(paxos_protocol::decidereq, me, arg, res, rpcc::to(1000));
+        r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, arg);
     }
 }
 
-acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me, 
+acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me,
             std::string _value)
   : cfg(_cfg), me (_me), instance_h(0)
 {
@@ -232,16 +232,15 @@ acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me,
   }
 
   pxs = new rpcs(atoi(_me.c_str()));
-  pxs->reg(paxos_protocol::preparereq, this, &acceptor::preparereq);
-  pxs->reg(paxos_protocol::acceptreq, this, &acceptor::acceptreq);
-  pxs->reg(paxos_protocol::decidereq, this, &acceptor::decidereq);
+  pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
+  pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
+  pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
 }
 
 paxos_protocol::status
-acceptor::preparereq(std::string src, paxos_protocol::preparearg a,
-    paxos_protocol::prepareres &r)
+acceptor::preparereq(paxos_protocol::prepareres &r, std::string src, paxos_protocol::preparearg a)
 {
-    ScopedLock ml(pxs_mutex);
+    lock ml(pxs_mutex);
     r.oldinstance = false;
     r.accept = false;
     r.n_a = n_a;
@@ -260,9 +259,9 @@ acceptor::preparereq(std::string src, paxos_protocol::preparearg a,
 }
 
 paxos_protocol::status
-acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r)
+acceptor::acceptreq(bool &r, std::string src, paxos_protocol::acceptarg a)
 {
-    ScopedLock ml(pxs_mutex);
+    lock ml(pxs_mutex);
     r = false;
     if (a.n >= n_h) {
         n_a = a.n;
@@ -274,52 +273,53 @@ acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r)
 }
 
 // the src argument is only for debug purpose
-paxos_protocol::status
-acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r)
+    paxos_protocol::status
+acceptor::decidereq(int &r, std::string src, paxos_protocol::decidearg a)
 {
-  ScopedLock ml(pxs_mutex);
-  tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", 
-        a.instance, instance_h, v_a.c_str());
-  if (a.instance == instance_h + 1) {
-    VERIFY(v_a == a.v);
-    commit_wo(a.instance, v_a);
-  } else if (a.instance <= instance_h) {
-    // we are ahead ignore.
-  } else {
-    // we are behind
-    VERIFY(0);
-  }
-  return paxos_protocol::OK;
+    lock ml(pxs_mutex);
+    tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
+            a.instance, instance_h, v_a.c_str());
+    if (a.instance == instance_h + 1) {
+        VERIFY(v_a == a.v);
+        commit_wo(a.instance, v_a);
+    } else if (a.instance <= instance_h) {
+        // we are ahead ignore.
+    } else {
+        // we are behind
+        VERIFY(0);
+    }
+    return paxos_protocol::OK;
 }
 
 void
 acceptor::commit_wo(unsigned instance, std::string value)
 {
-  //assume pxs_mutex is held
-  tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
-  if (instance > instance_h) {
-    tprintf("commit: highestaccepteinstance = %d\n", instance);
-    values[instance] = value;
-    l->loginstance(instance, value);
-    instance_h = instance;
-    n_h.n = 0;
-    n_h.m = me;
-    n_a.n = 0;
-    n_a.m = me;
-    v_a.clear();
-    if (cfg) {
-      pxs_mutex.release();
-      cfg->paxos_commit(instance, value);
-      pxs_mutex.acquire();
+    //assume pxs_mutex is held
+    adopt_lock ml(pxs_mutex);
+    tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
+    if (instance > instance_h) {
+        tprintf("commit: highestaccepteinstance = %d\n", instance);
+        values[instance] = value;
+        l->loginstance(instance, value);
+        instance_h = instance;
+        n_h.n = 0;
+        n_h.m = me;
+        n_a.n = 0;
+        n_a.m = me;
+        v_a.clear();
+        if (cfg) {
+            ml.unlock();
+            cfg->paxos_commit(instance, value);
+            ml.lock();
+        }
     }
-  }
 }
 
 void
 acceptor::commit(unsigned instance, std::string value)
 {
-  ScopedLock ml(pxs_mutex);
-  commit_wo(instance, value);
+    lock ml(pxs_mutex);
+    commit_wo(instance, value);
 }
 
 std::string