So many changes. Broken.
[invirt/third/libt4.git] / paxos.cc
index c7f2d1d..108dfad 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -1,6 +1,7 @@
-#include "paxos.h"
+#include "include/paxos.h"
 
 using namespace std::placeholders;
+using namespace std::chrono;
 
 paxos_change::~paxos_change() {}
 
@@ -28,12 +29,24 @@ proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
         bool _first, const node_t & _me, const value_t & _value)
     : delegate(_delegate), me (_me)
 {
-    // at this point, the log has already been replayed
-    if (instance_h == 0 && _first) {
-        values[1] = _value;
-        l.loginstance(1, _value);
-        instance_h = 1;
-    }
+    l.handler<log_instance>([this] (auto entry) {
+        instance_h = entry.number;
+        values[entry.number] = entry.value;
+        accepted = promise = {0, me};
+        accepted_value.clear();
+    });
+    l.handler<log_proposal>([this] (auto entry) {
+        promise = entry.promise;
+    });
+    l.handler<log_accept>([this] (auto entry) {
+        accepted = entry.number;
+        accepted_value = entry.value;
+    });
+
+    if (instance_h == 0 && _first)
+        l.append(log_instance{1, _value});
+
+    l.replay();
 
     pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
     pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
@@ -96,7 +109,7 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
         auto cl = rpcc::bind_cached(i);
         if (!cl)
             continue;
-        int status = cl->call_timeout(paxos_protocol::preparereq, milliseconds(100),
+        int status = cl->call_timeout(paxos_protocol::preparereq, 100ms,
                 res, me, instance, proposal);
         if (status == paxos_protocol::OK) {
             LOG << "preparereq response type=" << res.type << " n_a=(" << res.n_a.n
@@ -124,7 +137,7 @@ void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
     bool accept = false;
     for (auto i : nodes) {
         if (auto cl = rpcc::bind_cached(i)) {
-            int status = cl->call_timeout(paxos_protocol::acceptreq, milliseconds(100),
+            int status = cl->call_timeout(paxos_protocol::acceptreq, 100ms,
                     accept, me, instance, proposal, v);
             if (status == paxos_protocol::OK && accept)
                 accepts.push_back(i);
@@ -136,7 +149,7 @@ void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const
     int res = 0;
     for (auto i : accepts)
         if (auto cl = rpcc::bind_cached(i))
-            cl->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
+            cl->call_timeout(paxos_protocol::decidereq, 100ms, res, me, instance, v);
 }
 
 paxos_protocol::status
@@ -149,7 +162,7 @@ proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance,
     } else if (n > promise) {
         LOG << "looks good to me";
         promise = n;
-        l.logprop(promise);
+        l.append(log_proposal{n});
         r = prepareres{prepareres::accept, accepted, accepted_value};
     } else {
         LOG << "I totally rejected this request.  Ha.";
@@ -169,7 +182,7 @@ proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t
     if (n >= promise) {
         accepted = n;
         accepted_value = v;
-        l.logaccept(accepted, accepted_value);
+        l.append(log_accept{n, v});
         r = true;
     }
     return paxos_protocol::OK;
@@ -197,9 +210,9 @@ void proposer_acceptor::commit(unsigned instance, const value_t & value, lock &
     LOG << "instance=" << instance << " has v=" << value;
     if (instance > instance_h) {
         LOG << "highestacceptedinstance = " << instance;
-        values[instance] = value;
-        l.loginstance(instance, value);
+        l.append(log_instance{instance, value});
         instance_h = instance;
+        values[instance] = value;
         accepted = promise = {0, me};
         accepted_value.clear();
         if (delegate) {