prepareres res;
prop_t highest_n_a{0, ""};
for (auto i : nodes) {
- auto r = rpcc::bind_cached(i);
- if (!r)
+ auto cl = rpcc::bind_cached(i);
+ if (!cl)
continue;
- auto status = (paxos_protocol::status)r->call_timeout(
- paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
+ int status = cl->call_timeout(paxos_protocol::preparereq, milliseconds(100),
+ res, me, instance, proposal);
if (status == paxos_protocol::OK) {
- if (res.oldinstance) {
+ LOG << "preparereq response type=" << res.type << " n_a=(" << res.n_a.n
+ << ", " << res.n_a.m << ") " << "v_a=\"" << res.v_a << "\"";
+ if (res.type == prepareres::oldinstance) {
LOG << "commiting old instance!";
- commit(instance, res.v_a);
+ lock ml(acceptor_mutex);
+ commit(instance, res.v_a, ml);
return false;
- }
- LOG << "preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") "
- << "v_a=\"" << res.v_a << "\"";
- if (res.accept) {
+ } else if (res.type == prepareres::accept) {
accepts.push_back(i);
if (res.n_a >= highest_n_a) {
LOG << "found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")";
+// Sends acceptreq for (instance, proposal, v) to each node in nodes and
+// appends to accepts every node whose call returns OK with a true reply.
void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
const nodes_t & nodes, const value_t & v) {
+ // Reply slot handed to every acceptreq call below.
+ bool granted = false;
for (auto i : nodes) {
- auto r = rpcc::bind_cached(i);
- if (!r)
- continue;
- bool accept = false;
- int status = r->call_timeout(
- paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v);
- if (status == paxos_protocol::OK && accept)
- accepts.push_back(i);
+ auto peer = rpcc::bind_cached(i);
+ if (!peer)
+ continue; // bind_cached yielded no client for this node
+ int rc = peer->call_timeout(paxos_protocol::acceptreq, milliseconds(100),
+ granted, me, instance, proposal, v);
+ // Record the node only when the call succeeded and the reply was true.
+ if (rc == paxos_protocol::OK && granted)
+ accepts.push_back(i);
}
}
+// Sends decidereq for (instance, v) to every node in accepts. The result of
+// each call is not inspected.
void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
- for (auto i : accepts) {
- auto r = rpcc::bind_cached(i);
- if (!r)
- continue;
- int res = 0;
- r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
- }
+ int reply = 0; // reply slot for decidereq; never read afterwards
+ for (auto i : accepts) {
+ auto peer = rpcc::bind_cached(i);
+ if (!peer)
+ continue; // bind_cached yielded no client for this node
+ peer->call_timeout(paxos_protocol::decidereq, milliseconds(100), reply, me, instance, v);
+ }
}
+// RPC handler for a prepare request carrying proposal number n for instance.
+// Fills r with exactly one of three reply types:
+//   oldinstance - instance <= instance_h; v_a is values[instance]
+//   accept      - n > promise; promise is raised to n and logged via l.logprop
+//   reject      - anything else
+// In every case n_a is the current accepted proposal number. Always returns
+// paxos_protocol::OK; the outcome is carried in r.type.
paxos_protocol::status
proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
LOG << "instance " << instance << " proposal (" << n.n << ", " << n.m << ")";
lock ml(acceptor_mutex);
- r.oldinstance = false;
- r.accept = false;
- r.n_a = accepted;
- r.v_a = accepted_value;
if (instance <= instance_h) {
LOG << "old instance " << instance << " has value " << values[instance];
- r.oldinstance = true;
- r.v_a = values[instance];
+ r = prepareres{prepareres::oldinstance, accepted, values[instance]};
} else if (n > promise) {
LOG << "looks good to me";
promise = n;
l.logprop(promise);
- r.accept = true;
+ r = prepareres{prepareres::accept, accepted, accepted_value};
} else {
LOG << "I totally rejected this request. Ha.";
+ r = prepareres{prepareres::reject, accepted, accepted_value};
}
- LOG << "preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") "
- << "v_a=\"" << r.v_a << "\"";
+ // Log the reply's type field; the reply no longer has per-outcome bool members.
+ LOG << "preparereq is responding with type=" << r.type
+ << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " << "v_a=\"" << r.v_a << "\"";
return paxos_protocol::OK;
}
proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
lock ml(acceptor_mutex);
r = false;
- if (instance == instance_h + 1) {
- if (n >= promise) {
- accepted = n;
- accepted_value = v;
- l.logaccept(accepted, accepted_value);
- r = true;
- }
- return paxos_protocol::OK;
- } else {
+ // Requests for any instance other than instance_h + 1 are answered with ERR.
+ const bool is_next_instance = (instance == instance_h + 1);
+ if (!is_next_instance)
return paxos_protocol::ERR;
+ // Accept (and log) the value only when n is at least the current promise;
+ // otherwise r stays false and the call still returns OK.
+ const bool meets_promise = (n >= promise);
+ if (meets_promise) {
+ accepted = n;
+ accepted_value = v;
+ l.logaccept(accepted, accepted_value);
+ r = true;
}
+ return paxos_protocol::OK;
}
paxos_protocol::status
LOG << "decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value;
if (instance == instance_h + 1) {
VERIFY(accepted_value == v);
- commit(instance, accepted_value, ml);
+ commit(instance, v, ml);
} else if (instance <= instance_h) {
// we are ahead; ignore.
} else {
return paxos_protocol::OK;
}
-void proposer_acceptor::commit(unsigned instance, const value_t & value) {
- lock ml(acceptor_mutex);
- commit(instance, value, ml);
-}
-
-void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
+void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & acceptor_mutex_lock) { // records value for instance if it is newer than instance_h, then notifies delegate
+ VERIFY(&value != &accepted_value); // value must not alias accepted_value: that member is cleared below, before value is passed to the delegate
+ VERIFY(acceptor_mutex_lock); // presumably checks that the caller already holds the lock -- confirm against the lock type
LOG << "instance=" << instance << " has v=" << value;
if (instance > instance_h) {
LOG << "highestacceptedinstance = " << instance;
l.loginstance(instance, value);
instance_h = instance;
accepted = promise = {0, me};
- string v = value; // gaaahhh aliasing of value and accepted_value
- accepted_value.clear(); // this wipes out "value", too
+ accepted_value.clear(); // leaves value intact only because of the alias check at the top
if (delegate) {
- pxs_mutex_lock.unlock();
- delegate->paxos_commit(instance, v);
- pxs_mutex_lock.lock();
+ acceptor_mutex_lock.unlock(); // the delegate is called with this lock released
+ delegate->paxos_commit(instance, value);
+ acceptor_mutex_lock.lock(); // re-acquire so the caller's lock object is locked again on return
}
}
}