#include "paxos.h"
#include "handle.h"
-string print_members(const nodes_t &nodes) {
- ostringstream ost;
- copy(nodes.begin(), nodes.end(), ostream_iterator<string>(ost, ", "));
- return ost.str();
-}
-
bool isamember(const node_t & m, const nodes_t & nodes) {
return find(nodes.begin(), nodes.end(), m) != nodes.end();
}
// check if l2 contains a majority of the elements of l1
-bool majority(const nodes_t &l1, const nodes_t &l2) {
- auto overlap = (size_t)count_if(l1.begin(), l1.end(), bind(isamember, _1, l2));
+bool majority(const nodes_t & l1, const nodes_t & l2) {
+ auto overlap = (size_t)count_if(l1.begin(), l1.end(), std::bind(isamember, _1, l2));
return overlap >= (l1.size() >> 1) + 1;
}
// paxos_commit to inform higher layers of the agreed value for this
// instance.
-proposer_acceptor::proposer_acceptor(class paxos_change *_delegate,
+proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
bool _first, const node_t & _me, const value_t & _value)
: delegate(_delegate), me (_me)
{
bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
{
lock ml(proposer_mutex);
- LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable);
+ LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable);
if (!stable) { // already running proposer?
- LOG("proposer::run: already running");
+ LOG("paxos proposer already running");
return false;
}
stable = false;
bool r = false;
- my_n.n = std::max(n_h.n, my_n.n) + 1;
+ proposal.n = max(promise.n, proposal.n) + 1;
nodes_t accepts;
- value_t v = newv;
+ value_t v;
if (prepare(instance, accepts, cur_nodes, v)) {
if (majority(cur_nodes, accepts)) {
- LOG("paxos::run: received a majority of prepare responses");
+ LOG("received a majority of prepare responses");
+
+ if (!v.size())
+ v = newv;
breakpoint1();
accept(instance, accepts, nodes, v);
if (majority(cur_nodes, accepts)) {
- LOG("paxos::run: received a majority of accept responses");
+ LOG("received a majority of accept responses");
breakpoint2();
decide(instance, accepts, v);
r = true;
} else {
- LOG("paxos::run: no majority of accept responses");
+ LOG("no majority of accept responses");
}
} else {
- LOG("paxos::run: no majority of prepare responses");
+ LOG("no majority of prepare responses");
}
} else {
- LOG("paxos::run: prepare is rejected " << stable);
+ LOG("prepare is rejected " << stable);
}
stable = true;
return r;
bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
const nodes_t & nodes, value_t & v) {
+ LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")");
prepareres res;
prop_t highest_n_a{0, ""};
for (auto i : nodes) {
if (!r)
continue;
auto status = (paxos_protocol::status)r->call_timeout(
- paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, my_n);
+ paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
if (status == paxos_protocol::OK) {
if (res.oldinstance) {
LOG("commiting old instance!");
commit(instance, res.v_a);
return false;
}
+ LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " <<
+ "v_a=\"" << res.v_a << "\"");
if (res.accept) {
accepts.push_back(i);
if (res.n_a >= highest_n_a) {
- LOG("found a newer accepted proposal");
+ LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")");
v = res.v_a;
highest_n_a = res.n_a;
}
continue;
bool accept = false;
int status = r->call_timeout(
- paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, my_n, v);
+ paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v);
if (status == paxos_protocol::OK && accept)
accepts.push_back(i);
}
if (!r)
continue;
int res = 0;
- r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, instance, v);
+ r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
}
}
paxos_protocol::status
proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
+ LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")");
lock ml(acceptor_mutex);
r.oldinstance = false;
r.accept = false;
- r.n_a = n_a;
- r.v_a = v_a;
+ r.n_a = accepted;
+ r.v_a = accepted_value;
if (instance <= instance_h) {
+ LOG("old instance " << instance << " has value " << values[instance]);
r.oldinstance = true;
r.v_a = values[instance];
- } else if (n > n_h) {
- n_h = n;
- l.logprop(n_h);
+ } else if (n > promise) {
+ LOG("looks good to me");
+ promise = n;
+ l.logprop(promise);
r.accept = true;
} else {
LOG("I totally rejected this request. Ha.");
}
+ LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " <<
+ "v_a=\"" << r.v_a << "\"");
return paxos_protocol::OK;
}
lock ml(acceptor_mutex);
r = false;
if (instance == instance_h + 1) {
- if (n >= n_h) {
- n_a = n;
- v_a = v;
- l.logaccept(n_a, v_a);
+ if (n >= promise) {
+ accepted = n;
+ accepted_value = v;
+ l.logaccept(accepted, accepted_value);
r = true;
}
return paxos_protocol::OK;
paxos_protocol::status
proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
lock ml(acceptor_mutex);
- LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << v_a);
+ LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value);
if (instance == instance_h + 1) {
- VERIFY(v_a == v);
- commit(instance, v_a, ml);
+ VERIFY(accepted_value == v);
+ commit(instance, accepted_value, ml);
} else if (instance <= instance_h) {
// we are ahead; ignore.
} else {
}
void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
- LOG("acceptor::commit: instance=" << instance << " has v=" << value);
+ LOG("instance=" << instance << " has v=" << value);
if (instance > instance_h) {
- LOG("commit: highestacceptedinstance = " << instance);
+ LOG("highestacceptedinstance = " << instance);
values[instance] = value;
l.loginstance(instance, value);
instance_h = instance;
- n_a = n_h = {0, me};
- v_a.clear();
+ accepted = promise = {0, me};
+ string v = value; // gaaahhh aliasing of value and accepted_value
+ accepted_value.clear(); // this wipes out "value", too
if (delegate) {
pxs_mutex_lock.unlock();
- delegate->paxos_commit(instance, value);
+ delegate->paxos_commit(instance, v);
pxs_mutex_lock.lock();
}
}
void proposer_acceptor::breakpoint(int b) {
if (b == 3) {
- LOG("Proposer: breakpoint 1");
+ LOG("breakpoint 1");
break1 = true;
} else if (b == 4) {
- LOG("Proposer: breakpoint 2");
+ LOG("breakpoint 2");
break2 = true;
}
}