Rewrote threaded log code to be more idiomatic.
[invirt/third/libt4.git] / paxos.cc
index b39fa5b..cb32e36 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -1,13 +1,15 @@
 #include "paxos.h"
 #include "handle.h"
 
+paxos_change::~paxos_change() {}
+
 bool isamember(const node_t & m, const nodes_t & nodes) {
     return find(nodes.begin(), nodes.end(), m) != nodes.end();
 }
 
 // check if l2 contains a majority of the elements of l1
-bool majority(const nodes_t &l1, const nodes_t &l2) {
-    auto overlap = (size_t)count_if(l1.begin(), l1.end(), bind(isamember, _1, l2));
+bool majority(const nodes_t & l1, const nodes_t & l2) {
+    auto overlap = (size_t)count_if(l1.begin(), l1.end(), std::bind(isamember, _1, l2));
     return overlap >= (l1.size() >> 1) + 1;
 }
 
@@ -20,7 +22,7 @@ bool majority(const nodes_t &l1, const nodes_t &l2) {
 // paxos_commit to inform higher layers of the agreed value for this
 // instance.
 
-proposer_acceptor::proposer_acceptor(class paxos_change *_delegate,
+proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
         bool _first, const node_t & _me, const value_t & _value)
     : delegate(_delegate), me (_me)
 {
@@ -39,20 +41,23 @@ proposer_acceptor::proposer_acceptor(class paxos_change *_delegate,
 bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
 {
     lock ml(proposer_mutex);
-    LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable);
+    LOG << "initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable;
     if (!stable) {  // already running proposer?
-        LOG("paxos proposer already running");
+        LOG << "paxos proposer already running";
         return false;
     }
     stable = false;
     bool r = false;
     proposal.n = max(promise.n, proposal.n) + 1;
     nodes_t accepts;
-    value_t v = newv;
+    value_t v;
     if (prepare(instance, accepts, cur_nodes, v)) {
 
         if (majority(cur_nodes, accepts)) {
-            LOG("received a majority of prepare responses");
+            LOG << "received a majority of prepare responses";
+
+            if (!v.size())
+                v = newv;
 
             breakpoint1();
 
@@ -61,20 +66,20 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const
             accept(instance, accepts, nodes, v);
 
             if (majority(cur_nodes, accepts)) {
-                LOG("received a majority of accept responses");
+                LOG << "received a majority of accept responses";
 
                 breakpoint2();
 
                 decide(instance, accepts, v);
                 r = true;
             } else {
-                LOG("no majority of accept responses");
+                LOG << "no majority of accept responses";
             }
         } else {
-            LOG("no majority of prepare responses");
+            LOG << "no majority of prepare responses";
         }
     } else {
-        LOG("prepare is rejected " << stable);
+        LOG << "prepare is rejected " << stable;
     }
     stable = true;
     return r;
@@ -82,7 +87,7 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const
 
 bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
         const nodes_t & nodes, value_t & v) {
-    LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")");
+    LOG << "sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")";
     prepareres res;
     prop_t highest_n_a{0, ""};
     for (auto i : nodes) {
@@ -91,19 +96,19 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
         if (!r)
             continue;
         auto status = (paxos_protocol::status)r->call_timeout(
-                paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, proposal);
+                paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
         if (status == paxos_protocol::OK) {
             if (res.oldinstance) {
-                LOG("commiting old instance!");
+                LOG << "commiting old instance!";
                 commit(instance, res.v_a);
                 return false;
             }
-            LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " <<
-                    "v_a=\"" << res.v_a << "\"");
+            LOG << "preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") "
+                << "v_a=\"" << res.v_a << "\"";
             if (res.accept) {
                 accepts.push_back(i);
                 if (res.n_a >= highest_n_a) {
-                    LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")");
+                    LOG << "found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")";
                     v = res.v_a;
                     highest_n_a = res.n_a;
                 }
@@ -122,7 +127,7 @@ void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
             continue;
         bool accept = false;
         int status = r->call_timeout(
-                paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, proposal, v);
+                paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v);
         if (status == paxos_protocol::OK && accept)
             accepts.push_back(i);
     }
@@ -135,32 +140,32 @@ void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const
         if (!r)
             continue;
         int res = 0;
-        r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, instance, v);
+        r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
     }
 }
 
 paxos_protocol::status
 proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
-    LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")");
+    LOG << "instance " << instance << " proposal (" << n.n << ", " << n.m << ")";
     lock ml(acceptor_mutex);
     r.oldinstance = false;
     r.accept = false;
     r.n_a = accepted;
     r.v_a = accepted_value;
     if (instance <= instance_h) {
-        LOG("old instance " << instance << " has value " << values[instance]);
+        LOG << "old instance " << instance << " has value " << values[instance];
         r.oldinstance = true;
         r.v_a = values[instance];
     } else if (n > promise) {
-        LOG("looks good to me");
+        LOG << "looks good to me";
         promise = n;
         l.logprop(promise);
         r.accept = true;
     } else {
-        LOG("I totally rejected this request.  Ha.");
+        LOG << "I totally rejected this request.  Ha.";
     }
-    LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " <<
-        "v_a=\"" << r.v_a << "\"");
+    LOG << "preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") "
+        << "v_a=\"" << r.v_a << "\"";
     return paxos_protocol::OK;
 }
 
@@ -184,7 +189,7 @@ proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t
 paxos_protocol::status
 proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
     lock ml(acceptor_mutex);
-    LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value);
+    LOG << "decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value;
     if (instance == instance_h + 1) {
         VERIFY(accepted_value == v);
         commit(instance, accepted_value, ml);
@@ -203,17 +208,18 @@ void proposer_acceptor::commit(unsigned instance, const value_t & value) {
 }
 
 void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
-    LOG("acceptor::commit: instance=" << instance << " has v=" << value);
+    LOG << "instance=" << instance << " has v=" << value;
     if (instance > instance_h) {
-        LOG("commit: highestacceptedinstance = " << instance);
+        LOG << "highestacceptedinstance = " << instance;
         values[instance] = value;
         l.loginstance(instance, value);
         instance_h = instance;
         accepted = promise = {0, me};
-        accepted_value.clear();
+        string v = value; // gaaahhh aliasing of value and accepted_value
+        accepted_value.clear(); // this wipes out "value", too
         if (delegate) {
             pxs_mutex_lock.unlock();
-            delegate->paxos_commit(instance, value);
+            delegate->paxos_commit(instance, v);
             pxs_mutex_lock.lock();
         }
     }
@@ -222,24 +228,24 @@ void proposer_acceptor::commit(unsigned instance, const value_t & value, lock &
 // For testing purposes
 void proposer_acceptor::breakpoint1() {
     if (break1) {
-        LOG("Dying at breakpoint 1!");
+        LOG << "Dying at breakpoint 1!";
         exit(1);
     }
 }
 
 void proposer_acceptor::breakpoint2() {
     if (break2) {
-        LOG("Dying at breakpoint 2!");
+        LOG << "Dying at breakpoint 2!";
         exit(1);
     }
 }
 
 void proposer_acceptor::breakpoint(int b) {
     if (b == 3) {
-        LOG("Proposer: breakpoint 1");
+        LOG << "breakpoint 1";
         break1 = true;
     } else if (b == 4) {
-        LOG("Proposer: breakpoint 2");
+        LOG << "breakpoint 2";
         break2 = true;
     }
 }