X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/02967a43024ce81912cd1ec96a800397457f8066..refs/heads/iannucci:/paxos.cc?ds=inline diff --git a/paxos.cc b/paxos.cc index c7f2d1d..108dfad 100644 --- a/paxos.cc +++ b/paxos.cc @@ -1,6 +1,7 @@ -#include "paxos.h" +#include "include/paxos.h" using namespace std::placeholders; +using namespace std::chrono; paxos_change::~paxos_change() {} @@ -28,12 +29,24 @@ proposer_acceptor::proposer_acceptor(paxos_change *_delegate, bool _first, const node_t & _me, const value_t & _value) : delegate(_delegate), me (_me) { - // at this point, the log has already been replayed - if (instance_h == 0 && _first) { - values[1] = _value; - l.loginstance(1, _value); - instance_h = 1; - } + l.handler([this] (auto entry) { + instance_h = entry.number; + values[entry.number] = entry.value; + accepted = promise = {0, me}; + accepted_value.clear(); + }); + l.handler([this] (auto entry) { + promise = entry.promise; + }); + l.handler([this] (auto entry) { + accepted = entry.number; + accepted_value = entry.value; + }); + + if (instance_h == 0 && _first) + l.append(log_instance{1, _value}); + + l.replay(); pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this); pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this); @@ -96,7 +109,7 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts, auto cl = rpcc::bind_cached(i); if (!cl) continue; - int status = cl->call_timeout(paxos_protocol::preparereq, milliseconds(100), + int status = cl->call_timeout(paxos_protocol::preparereq, 100ms, res, me, instance, proposal); if (status == paxos_protocol::OK) { LOG << "preparereq response type=" << res.type << " n_a=(" << res.n_a.n @@ -124,7 +137,7 @@ void proposer_acceptor::accept(unsigned instance, nodes_t & accepts, bool accept = false; for (auto i : nodes) { if (auto cl = rpcc::bind_cached(i)) { - int status = cl->call_timeout(paxos_protocol::acceptreq, milliseconds(100), + int status = cl->call_timeout(paxos_protocol::acceptreq, 100ms, accept, me, instance, proposal, v); if (status == paxos_protocol::OK && accept) accepts.push_back(i); @@ -136,7 +149,7 @@ void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const int res = 0; for (auto i : accepts) if (auto cl = rpcc::bind_cached(i)) - cl->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v); + cl->call_timeout(paxos_protocol::decidereq, 100ms, res, me, instance, v); } paxos_protocol::status @@ -149,7 +162,7 @@ proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, } else if (n > promise) { LOG << "looks good to me"; promise = n; - l.logprop(promise); + l.append(log_proposal{n}); r = prepareres{prepareres::accept, accepted, accepted_value}; } else { LOG << "I totally rejected this request. Ha."; @@ -169,7 +182,7 @@ proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t if (n >= promise) { accepted = n; accepted_value = v; - l.logaccept(accepted, accepted_value); + l.append(log_accept{n, v}); r = true; } return paxos_protocol::OK; @@ -197,9 +210,9 @@ void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & LOG << "instance=" << instance << " has v=" << value; if (instance > instance_h) { LOG << "highestacceptedinstance = " << instance; - values[instance] = value; - l.loginstance(instance, value); + l.append(log_instance{instance, value}); instance_h = instance; + values[instance] = value; accepted = promise = {0, me}; accepted_value.clear(); if (delegate) {