-#include "paxos.h"
-#include "handle.h"
+#include "include/paxos.h"
+
+using namespace std::placeholders;
+using namespace std::chrono;
+
+paxos_change::~paxos_change() {}
bool isamember(const node_t & m, const nodes_t & nodes) {
- return find(nodes.begin(), nodes.end(), m) != nodes.end();
+ return std::find(nodes.begin(), nodes.end(), m) != nodes.end();
}
// check if l2 contains a majority of the elements of l1
-bool majority(const nodes_t &l1, const nodes_t &l2) {
- auto overlap = (size_t)count_if(l1.begin(), l1.end(), bind(isamember, _1, l2));
+bool majority(const nodes_t & l1, const nodes_t & l2) {
+ auto overlap = (size_t)std::count_if(
+ l1.begin(), l1.end(), std::bind(isamember, _1, l2));
return overlap >= (l1.size() >> 1) + 1;
}
bool _first, const node_t & _me, const value_t & _value)
: delegate(_delegate), me (_me)
{
- // at this point, the log has already been replayed
- if (instance_h == 0 && _first) {
- values[1] = _value;
- l.loginstance(1, _value);
- instance_h = 1;
- }
+ l.handler<log_instance>([this] (auto entry) {
+ instance_h = entry.number;
+ values[entry.number] = entry.value;
+ accepted = promise = {0, me};
+ accepted_value.clear();
+ });
+ l.handler<log_proposal>([this] (auto entry) {
+ promise = entry.promise;
+ });
+ l.handler<log_accept>([this] (auto entry) {
+ accepted = entry.number;
+ accepted_value = entry.value;
+ });
+
+ if (instance_h == 0 && _first)
+ l.append(log_instance{1, _value});
+
+ l.replay();
pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
{
lock ml(proposer_mutex);
- LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable);
+ LOG << "initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable;
if (!stable) { // already running proposer?
- LOG("paxos proposer already running");
+ LOG << "paxos proposer already running";
return false;
}
stable = false;
bool r = false;
- proposal.n = max(promise.n, proposal.n) + 1;
+ proposal.n = std::max(promise.n, proposal.n) + 1;
nodes_t accepts;
- value_t v = newv;
+ value_t v;
if (prepare(instance, accepts, cur_nodes, v)) {
if (majority(cur_nodes, accepts)) {
- LOG("received a majority of prepare responses");
+ LOG << "received a majority of prepare responses";
+
+ if (!v.size())
+ v = newv;
breakpoint1();
accept(instance, accepts, nodes, v);
if (majority(cur_nodes, accepts)) {
- LOG("received a majority of accept responses");
+ LOG << "received a majority of accept responses";
breakpoint2();
decide(instance, accepts, v);
r = true;
} else {
- LOG("no majority of accept responses");
+ LOG << "no majority of accept responses";
}
} else {
- LOG("no majority of prepare responses");
+ LOG << "no majority of prepare responses";
}
} else {
- LOG("prepare is rejected " << stable);
+ LOG << "prepare is rejected " << stable;
}
stable = true;
return r;
bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
const nodes_t & nodes, value_t & v) {
- LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")");
+ LOG << "sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")";
prepareres res;
prop_t highest_n_a{0, ""};
for (auto i : nodes) {
- handle h(i);
- rpcc *r = h.safebind();
- if (!r)
+ auto cl = rpcc::bind_cached(i);
+ if (!cl)
continue;
- auto status = (paxos_protocol::status)r->call_timeout(
- paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, proposal);
+ int status = cl->call_timeout(paxos_protocol::preparereq, 100ms,
+ res, me, instance, proposal);
if (status == paxos_protocol::OK) {
- if (res.oldinstance) {
- LOG("commiting old instance!");
- commit(instance, res.v_a);
+ LOG << "preparereq response type=" << res.type << " n_a=(" << res.n_a.n
+ << ", " << res.n_a.m << ") " << "v_a=\"" << res.v_a << "\"";
+ if (res.type == prepareres::oldinstance) {
+ LOG << "commiting old instance!";
+ lock ml(acceptor_mutex);
+ commit(instance, res.v_a, ml);
return false;
- }
- LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " <<
- "v_a=\"" << res.v_a << "\"");
- if (res.accept) {
+ } else if (res.type == prepareres::accept) {
accepts.push_back(i);
if (res.n_a >= highest_n_a) {
- LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")");
+ LOG << "found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")";
v = res.v_a;
highest_n_a = res.n_a;
}
void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
const nodes_t & nodes, const value_t & v) {
+ bool accept = false;
for (auto i : nodes) {
- handle h(i);
- rpcc *r = h.safebind();
- if (!r)
- continue;
- bool accept = false;
- int status = r->call_timeout(
- paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, proposal, v);
- if (status == paxos_protocol::OK && accept)
- accepts.push_back(i);
+ if (auto cl = rpcc::bind_cached(i)) {
+ int status = cl->call_timeout(paxos_protocol::acceptreq, 100ms,
+ accept, me, instance, proposal, v);
+ if (status == paxos_protocol::OK && accept)
+ accepts.push_back(i);
+ }
}
}
void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
- for (auto i : accepts) {
- handle h(i);
- rpcc *r = h.safebind();
- if (!r)
- continue;
- int res = 0;
- r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, instance, v);
- }
+ int res = 0;
+ for (auto i : accepts)
+ if (auto cl = rpcc::bind_cached(i))
+ cl->call_timeout(paxos_protocol::decidereq, 100ms, res, me, instance, v);
}
paxos_protocol::status
proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
- LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")");
+ LOG << "instance " << instance << " proposal (" << n.n << ", " << n.m << ")";
lock ml(acceptor_mutex);
- r.oldinstance = false;
- r.accept = false;
- r.n_a = accepted;
- r.v_a = accepted_value;
if (instance <= instance_h) {
- LOG("old instance " << instance << " has value " << values[instance]);
- r.oldinstance = true;
- r.v_a = values[instance];
+ LOG << "old instance " << instance << " has value " << values[instance];
+ r = prepareres{prepareres::oldinstance, accepted, values[instance]};
} else if (n > promise) {
- LOG("looks good to me");
+ LOG << "looks good to me";
promise = n;
- l.logprop(promise);
- r.accept = true;
+ l.append(log_proposal{n});
+ r = prepareres{prepareres::accept, accepted, accepted_value};
} else {
- LOG("I totally rejected this request. Ha.");
+ LOG << "I totally rejected this request. Ha.";
+ r = prepareres{prepareres::reject, accepted, accepted_value};
}
- LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " <<
- "v_a=\"" << r.v_a << "\"");
+ LOG << "preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept
+ << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " << "v_a=\"" << r.v_a << "\"";
return paxos_protocol::OK;
}
proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
lock ml(acceptor_mutex);
r = false;
- if (instance == instance_h + 1) {
- if (n >= promise) {
- accepted = n;
- accepted_value = v;
- l.logaccept(accepted, accepted_value);
- r = true;
- }
- return paxos_protocol::OK;
- } else {
+ if (instance != instance_h + 1)
return paxos_protocol::ERR;
+ if (n >= promise) {
+ accepted = n;
+ accepted_value = v;
+ l.append(log_accept{n, v});
+ r = true;
}
+ return paxos_protocol::OK;
}
paxos_protocol::status
proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
lock ml(acceptor_mutex);
- LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value);
+ LOG << "decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value;
if (instance == instance_h + 1) {
VERIFY(accepted_value == v);
- commit(instance, accepted_value, ml);
+ commit(instance, v, ml);
} else if (instance <= instance_h) {
// we are ahead; ignore.
} else {
return paxos_protocol::OK;
}
-void proposer_acceptor::commit(unsigned instance, const value_t & value) {
- lock ml(acceptor_mutex);
- commit(instance, value, ml);
-}
-
-void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
- LOG("acceptor::commit: instance=" << instance << " has v=" << value);
+void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & acceptor_mutex_lock) {
+ VERIFY(&value != &accepted_value); // eited by aliasing?
+ VERIFY(acceptor_mutex_lock);
+ LOG << "instance=" << instance << " has v=" << value;
if (instance > instance_h) {
- LOG("commit: highestacceptedinstance = " << instance);
- values[instance] = value;
- l.loginstance(instance, value);
+ LOG << "highestacceptedinstance = " << instance;
+ l.append(log_instance{instance, value});
instance_h = instance;
+ values[instance] = value;
accepted = promise = {0, me};
accepted_value.clear();
if (delegate) {
- pxs_mutex_lock.unlock();
+ acceptor_mutex_lock.unlock();
delegate->paxos_commit(instance, value);
- pxs_mutex_lock.lock();
+ acceptor_mutex_lock.lock();
}
}
}
// For testing purposes
void proposer_acceptor::breakpoint1() {
if (break1) {
- LOG("Dying at breakpoint 1!");
+ LOG << "Dying at breakpoint 1!";
exit(1);
}
}
void proposer_acceptor::breakpoint2() {
if (break2) {
- LOG("Dying at breakpoint 2!");
+ LOG << "Dying at breakpoint 2!";
exit(1);
}
}
void proposer_acceptor::breakpoint(int b) {
if (b == 3) {
- LOG("Proposer: breakpoint 1");
+ LOG << "breakpoint 1";
break1 = true;
} else if (b == 4) {
- LOG("Proposer: breakpoint 2");
+ LOG << "breakpoint 2";
break2 = true;
}
}