Cleanups
[invirt/third/libt4.git] / paxos.cc
index b0ec640..83bf4f1 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
 // paxos_commit to inform higher layers of the agreed value for this
 // instance.
 
 // paxos_commit to inform higher layers of the agreed value for this
 // instance.
 
-
-bool
-operator> (const prop_t &a, const prop_t &b)
-{
-  return (a.n > b.n || (a.n == b.n && a.m > b.m));
+bool operator> (const prop_t &a, const prop_t &b) {
+    return (a.n > b.n || (a.n == b.n && a.m > b.m));
 }
 
 }
 
-bool
-operator>= (const prop_t &a, const prop_t &b)
-{
-  return (a.n > b.n || (a.n == b.n && a.m >= b.m));
+bool operator>= (const prop_t &a, const prop_t &b) {
+    return (a.n > b.n || (a.n == b.n && a.m >= b.m));
 }
 
 std::string
 }
 
 std::string
-print_members(const std::vector<std::string> &nodes)
-{
-  std::string s;
-  s.clear();
-  for (unsigned i = 0; i < nodes.size(); i++) {
-    s += nodes[i];
-    if (i < (nodes.size()-1))
-      s += ",";
-  }
-  return s;
+print_members(const std::vector<std::string> &nodes) {
+    std::string s;
+    s.clear();
+    for (unsigned i = 0; i < nodes.size(); i++) {
+        s += nodes[i];
+        if (i < (nodes.size()-1))
+            s += ",";
+    }
+    return s;
 }
 
 }
 
-bool isamember(std::string m, const std::vector<std::string> &nodes)
-{
-  for (unsigned i = 0; i < nodes.size(); i++) {
-    if (nodes[i] == m) return 1;
-  }
-  return 0;
+
+bool isamember(const std::string & m, const std::vector<std::string> & nodes) {
+    for (auto n : nodes) {
+        if (n == m)
+            return 1;
+    }
+    return 0;
 }
 
 }
 
-bool
-proposer::isrunning()
-{
-  bool r;
-  lock ml(pxs_mutex);
-  r = !stable;
-  return r;
+bool proposer::isrunning() {
+    bool r;
+    lock ml(pxs_mutex);
+    r = !stable;
+    return r;
 }
 
 // check if the servers in l2 contains a majority of servers in l1
 }
 
 // check if the servers in l2 contains a majority of servers in l1
-bool
-proposer::majority(const std::vector<std::string> &l1,
-               const std::vector<std::string> &l2)
-{
-  unsigned n = 0;
+bool proposer::majority(const std::vector<std::string> &l1,
+               const std::vector<std::string> &l2) {
+    unsigned n = 0;
 
 
-  for (unsigned i = 0; i < l1.size(); i++) {
-    if (isamember(l1[i], l2))
-      n++;
-  }
-  return n >= (l1.size() >> 1) + 1;
+    for (unsigned i = 0; i < l1.size(); i++) {
+        if (isamember(l1[i], l2))
+            n++;
+    }
+    return n >= (l1.size() >> 1) + 1;
 }
 
 proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
 }
 
 proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
-                  std::string _me)
+        const std::string &_me)
   : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
     stable (true)
 {
   : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
     stable (true)
 {
-  my_n.n = 0;
-  my_n.m = me;
+    my_n.n = 0;
+    my_n.m = me;
 }
 
 }
 
-void
-proposer::setn()
+void proposer::setn()
 {
 {
-  my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
+    my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
 }
 
 }
 
-bool
-proposer::run(int instance, std::vector<std::string> cur_nodes, std::string newv)
+bool proposer::run(unsigned instance, const std::vector<std::string> & cur_nodes,
+        const std::string & newv)
 {
 {
-  std::vector<std::string> accepts;
-  std::vector<std::string> nodes;
-  std::string v;
-  bool r = false;
-
-  lock ml(pxs_mutex);
-  tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
-        print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
-  if (!stable) {  // already running proposer?
-    tprintf("proposer::run: already running\n");
-    return false;
-  }
-  stable = false;
-  setn();
-  accepts.clear();
-  v.clear();
-  if (prepare(instance, accepts, cur_nodes, v)) {
-
-    if (majority(cur_nodes, accepts)) {
-      tprintf("paxos::manager: received a majority of prepare responses\n");
-
-      if (v.size() == 0)
-       v = newv;
-
-      breakpoint1();
-
-      nodes = accepts;
-      accepts.clear();
-      accept(instance, accepts, nodes, v);
-
-      if (majority(cur_nodes, accepts)) {
-       tprintf("paxos::manager: received a majority of accept responses\n");
-
-       breakpoint2();
-
-       decide(instance, accepts, v);
-       r = true;
-      } else {
-       tprintf("paxos::manager: no majority of accept responses\n");
-      }
+    std::vector<std::string> accepts;
+    std::vector<std::string> nodes;
+    std::string v;
+    bool r = false;
+
+    lock ml(pxs_mutex);
+    tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
+            print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
+    if (!stable) {  // already running proposer?
+        tprintf("proposer::run: already running\n");
+        return false;
+    }
+    stable = false;
+    setn();
+    accepts.clear();
+    v.clear();
+    if (prepare(instance, accepts, cur_nodes, v)) {
+
+        if (majority(cur_nodes, accepts)) {
+            tprintf("paxos::manager: received a majority of prepare responses\n");
+
+            if (v.size() == 0)
+                v = newv;
+
+            breakpoint1();
+
+            nodes = accepts;
+            accepts.clear();
+            accept(instance, accepts, nodes, v);
+
+            if (majority(cur_nodes, accepts)) {
+                tprintf("paxos::manager: received a majority of accept responses\n");
+
+                breakpoint2();
+
+                decide(instance, accepts, v);
+                r = true;
+            } else {
+                tprintf("paxos::manager: no majority of accept responses\n");
+            }
+        } else {
+            tprintf("paxos::manager: no majority of prepare responses\n");
+        }
     } else {
     } else {
-      tprintf("paxos::manager: no majority of prepare responses\n");
+        tprintf("paxos::manager: prepare is rejected %d\n", stable);
     }
     }
-  } else {
-    tprintf("paxos::manager: prepare is rejected %d\n", stable);
-  }
-  stable = true;
-  return r;
+    stable = true;
+    return r;
 }
 
 // proposer::run() calls prepare to send prepare RPCs to nodes
 }
 
 // proposer::run() calls prepare to send prepare RPCs to nodes
@@ -145,16 +135,16 @@ proposer::run(int instance, std::vector<std::string> cur_nodes, std::string newv
 // otherwise fill in accepts with set of nodes that accepted,
 // set v to the v_a with the highest n_a, and return true.
 bool
 // otherwise fill in accepts with set of nodes that accepted,
 // set v to the v_a with the highest n_a, and return true.
 bool
-proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
-         std::vector<std::string> nodes,
-         std::string &v)
+proposer::prepare(unsigned instance, std::vector<std::string> & accepts,
+        const std::vector<std::string> & nodes,
+        std::string & v)
 {
     struct paxos_protocol::preparearg arg = { instance, my_n };
     struct paxos_protocol::prepareres res;
     prop_t n_a = { 0, "" };
     rpcc *r;
 {
     struct paxos_protocol::preparearg arg = { instance, my_n };
     struct paxos_protocol::prepareres res;
     prop_t n_a = { 0, "" };
     rpcc *r;
-    for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
-        handle h(*i);
+    for (auto i : nodes) {
+        handle h(i);
         if (!(r = h.safebind()))
             continue;
         int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
         if (!(r = h.safebind()))
             continue;
         int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
@@ -165,7 +155,7 @@ proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
                 return false;
             }
             if (res.accept) {
                 return false;
             }
             if (res.accept) {
-                accepts.push_back(*i);
+                accepts.push_back(i);
                 if (res.n_a >= n_a) {
                     tprintf("found a newer accepted proposal\n");
                     v = res.v_a;
                 if (res.n_a >= n_a) {
                     tprintf("found a newer accepted proposal\n");
                     v = res.v_a;
@@ -180,32 +170,30 @@ proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
 // run() calls this to send out accept RPCs to accepts.
 // fill in accepts with list of nodes that accepted.
 void
 // run() calls this to send out accept RPCs to accepts.
 // fill in accepts with list of nodes that accepted.
 void
-proposer::accept(unsigned instance, std::vector<std::string> &accepts,
-        std::vector<std::string> nodes, std::string v)
+proposer::accept(unsigned instance, std::vector<std::string> & accepts,
+        const std::vector<std::string> & nodes, const std::string & v)
 {
     struct paxos_protocol::acceptarg arg = { instance, my_n, v };
     rpcc *r;
 {
     struct paxos_protocol::acceptarg arg = { instance, my_n, v };
     rpcc *r;
-    for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
-        handle h(*i);
+    for (auto i : nodes) {
+        handle h(i);
         if (!(r = h.safebind()))
             continue;
         bool accept = false;
         int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg);
         if (!(r = h.safebind()))
             continue;
         bool accept = false;
         int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg);
-        if (status == paxos_protocol::OK) {
-            if (accept)
-                accepts.push_back(*i);
-        }
+        if (status == paxos_protocol::OK && accept)
+            accepts.push_back(i);
     }
 }
 
 void
     }
 }
 
 void
-proposer::decide(unsigned instance, std::vector<std::string> accepts,
-             std::string v)
+proposer::decide(unsigned instance, const std::vector<std::string> & accepts,
+        const std::string & v)
 {
     struct paxos_protocol::decidearg arg = { instance, v };
     rpcc *r;
 {
     struct paxos_protocol::decidearg arg = { instance, v };
     rpcc *r;
-    for (std::vector<std::string>::iterator i=accepts.begin(); i!=accepts.end(); i++) {
-        handle h(*i);
+    for (auto i : accepts) {
+        handle h(i);
         if (!(r = h.safebind()))
             continue;
         int res = 0;
         if (!(r = h.safebind()))
             continue;
         int res = 0;
@@ -213,32 +201,33 @@ proposer::decide(unsigned instance, std::vector<std::string> accepts,
     }
 }
 
     }
 }
 
-acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me,
-            std::string _value)
+acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _me,
+        const std::string & _value)
   : cfg(_cfg), me (_me), instance_h(0)
 {
   : cfg(_cfg), me (_me), instance_h(0)
 {
-  n_h.n = 0;
-  n_h.m = me;
-  n_a.n = 0;
-  n_a.m = me;
-  v_a.clear();
-
-  l = new log (this, me);
-
-  if (instance_h == 0 && _first) {
-    values[1] = _value;
-    l->loginstance(1, _value);
-    instance_h = 1;
-  }
-
-  pxs = new rpcs(atoi(_me.c_str()));
-  pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
-  pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
-  pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
+    n_h.n = 0;
+    n_h.m = me;
+    n_a.n = 0;
+    n_a.m = me;
+    v_a.clear();
+
+    l = new log (this, me);
+
+    if (instance_h == 0 && _first) {
+        values[1] = _value;
+        l->loginstance(1, _value);
+        instance_h = 1;
+    }
+
+    pxs = new rpcs((uint32_t)std::stoi(_me));
+    pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
+    pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
+    pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
 }
 
 paxos_protocol::status
 }
 
 paxos_protocol::status
-acceptor::preparereq(paxos_protocol::prepareres &r, std::string src, paxos_protocol::preparearg a)
+acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &,
+        paxos_protocol::preparearg a)
 {
     lock ml(pxs_mutex);
     r.oldinstance = false;
 {
     lock ml(pxs_mutex);
     r.oldinstance = false;
@@ -259,7 +248,7 @@ acceptor::preparereq(paxos_protocol::prepareres &r, std::string src, paxos_proto
 }
 
 paxos_protocol::status
 }
 
 paxos_protocol::status
-acceptor::acceptreq(bool &r, std::string src, paxos_protocol::acceptarg a)
+acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a)
 {
     lock ml(pxs_mutex);
     r = false;
 {
     lock ml(pxs_mutex);
     r = false;
@@ -272,16 +261,16 @@ acceptor::acceptreq(bool &r, std::string src, paxos_protocol::acceptarg a)
     return paxos_protocol::OK;
 }
 
     return paxos_protocol::OK;
 }
 
-// the src argument is only for debug purpose
-    paxos_protocol::status
-acceptor::decidereq(int &r, std::string src, paxos_protocol::decidearg a)
+// the src argument is only for debugging
+paxos_protocol::status
+acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a)
 {
     lock ml(pxs_mutex);
     tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
             a.instance, instance_h, v_a.c_str());
     if (a.instance == instance_h + 1) {
         VERIFY(v_a == a.v);
 {
     lock ml(pxs_mutex);
     tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
             a.instance, instance_h, v_a.c_str());
     if (a.instance == instance_h + 1) {
         VERIFY(v_a == a.v);
-        commit_wo(a.instance, v_a);
+        commit(a.instance, v_a, ml);
     } else if (a.instance <= instance_h) {
         // we are ahead ignore.
     } else {
     } else if (a.instance <= instance_h) {
         // we are ahead ignore.
     } else {
@@ -292,10 +281,8 @@ acceptor::decidereq(int &r, std::string src, paxos_protocol::decidearg a)
 }
 
 void
 }
 
 void
-acceptor::commit_wo(unsigned instance, std::string value)
+acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_lock)
 {
 {
-    //assume pxs_mutex is held
-    adopt_lock ml(pxs_mutex);
     tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
     if (instance > instance_h) {
         tprintf("commit: highestaccepteinstance = %d\n", instance);
     tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
     if (instance > instance_h) {
         tprintf("commit: highestaccepteinstance = %d\n", instance);
@@ -308,31 +295,31 @@ acceptor::commit_wo(unsigned instance, std::string value)
         n_a.m = me;
         v_a.clear();
         if (cfg) {
         n_a.m = me;
         v_a.clear();
         if (cfg) {
-            ml.unlock();
+            pxs_mutex_lock.unlock();
             cfg->paxos_commit(instance, value);
             cfg->paxos_commit(instance, value);
-            ml.lock();
+            pxs_mutex_lock.lock();
         }
     }
 }
 
 void
         }
     }
 }
 
 void
-acceptor::commit(unsigned instance, std::string value)
+acceptor::commit(unsigned instance, const std::string & value)
 {
     lock ml(pxs_mutex);
 {
     lock ml(pxs_mutex);
-    commit_wo(instance, value);
+    commit(instance, value, ml);
 }
 
 std::string
 acceptor::dump()
 {
 }
 
 std::string
 acceptor::dump()
 {
-  return l->dump();
+    return l->dump();
 }
 
 void
 }
 
 void
-acceptor::restore(std::string s)
+acceptor::restore(const std::string & s)
 {
 {
-  l->restore(s);
-  l->logread();
+    l->restore(s);
+    l->logread();
 }
 
 
 }
 
 
@@ -343,30 +330,30 @@ acceptor::restore(std::string s)
 void
 proposer::breakpoint1()
 {
 void
 proposer::breakpoint1()
 {
-  if (break1) {
-    tprintf("Dying at breakpoint 1!\n");
-    exit(1);
-  }
+    if (break1) {
+        tprintf("Dying at breakpoint 1!\n");
+        exit(1);
+    }
 }
 
 // Call this from your code between phases accept and decide of proposer
 void
 proposer::breakpoint2()
 {
 }
 
 // Call this from your code between phases accept and decide of proposer
 void
 proposer::breakpoint2()
 {
-  if (break2) {
-    tprintf("Dying at breakpoint 2!\n");
-    exit(1);
-  }
+    if (break2) {
+        tprintf("Dying at breakpoint 2!\n");
+        exit(1);
+    }
 }
 
 void
 proposer::breakpoint(int b)
 {
 }
 
 void
 proposer::breakpoint(int b)
 {
-  if (b == 3) {
-    tprintf("Proposer: breakpoint 1\n");
-    break1 = true;
-  } else if (b == 4) {
-    tprintf("Proposer: breakpoint 2\n");
-    break2 = true;
-  }
+    if (b == 3) {
+        tprintf("Proposer: breakpoint 1\n");
+        break1 = true;
+    } else if (b == 4) {
+        tprintf("Proposer: breakpoint 2\n");
+        break2 = true;
+    }
 }
 }