-#include "paxos.h"
-#include "handle.h"
-#include <stdio.h>
-#include "tprintf.h"
-#include "lang/verify.h"
-#include "lock.h"
+#include "include/paxos.h"
+
+using namespace std::placeholders;
+using namespace std::chrono;
+
+// Out-of-line, empty definition of the paxos_change destructor.
+// NOTE(review): presumably declared virtual in paxos.h so a delegate can be
+// destroyed through a paxos_change pointer — confirm against the header.
+paxos_change::~paxos_change() {}
+
+// Return true iff node m appears in nodes (linear scan, compared with ==).
+bool isamember(const node_t & m, const nodes_t & nodes) {
+ return std::any_of(nodes.begin(), nodes.end(),
+     [&m](const node_t & candidate) { return candidate == m; });
+}
+
+// Return true iff strictly more than half of the entries of l1 also appear
+// in l2 (entries of l1 are counted with multiplicity). An empty l1 never has
+// a majority, because at least one overlapping entry is required.
+bool majority(const nodes_t & l1, const nodes_t & l2) {
+ auto overlap = static_cast<size_t>(std::count_if(l1.begin(), l1.end(),
+     [&l2](const node_t & n) { return isamember(n, l2); }));
+ return overlap >= l1.size() / 2 + 1;
+}
// This module implements the proposer and acceptor of the Paxos
// distributed algorithm as described by Lamport's "Paxos Made
// paxos_commit to inform higher layers of the agreed value for this
// instance.
-bool operator> (const prop_t &a, const prop_t &b) {
- return (a.n > b.n || (a.n == b.n && a.m > b.m));
-}
-
-bool operator>= (const prop_t &a, const prop_t &b) {
- return (a.n > b.n || (a.n == b.n && a.m >= b.m));
-}
-
-std::string
-print_members(const std::vector<std::string> &nodes) {
- std::string s;
- s.clear();
- for (unsigned i = 0; i < nodes.size(); i++) {
- s += nodes[i];
- if (i < (nodes.size()-1))
- s += ",";
- }
- return s;
-}
-
-
-bool isamember(const std::string & m, const std::vector<std::string> & nodes) {
- for (auto n : nodes) {
- if (n == m)
- return 1;
- }
- return 0;
-}
-
-bool proposer::isrunning() {
- bool r;
- lock ml(pxs_mutex);
- r = !stable;
- return r;
-}
-
-// check if the servers in l2 contains a majority of servers in l1
-bool proposer::majority(const std::vector<std::string> &l1,
- const std::vector<std::string> &l2) {
- unsigned n = 0;
+// Build the combined proposer/acceptor for node _me. The log handlers are
+// registered first, then the persistent log is replayed to rebuild
+// instance_h, values, promise, accepted and accepted_value. A node started
+// with _first whose log holds no decided instance then seeds instance 1
+// with _value. Finally the paxos RPC handlers are registered on pxs.
+proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
+ bool _first, const node_t & _me, const value_t & _value)
+ : delegate(_delegate), me (_me)
+{
+ l.handler<log_instance>([this] (auto entry) {
+ instance_h = entry.number;
+ values[entry.number] = entry.value;
+ accepted = promise = {0, me};
+ accepted_value.clear();
+ });
+ l.handler<log_proposal>([this] (auto entry) {
+ promise = entry.promise;
+ });
+ l.handler<log_accept>([this] (auto entry) {
+ accepted = entry.number;
+ accepted_value = entry.value;
+ });
- for (unsigned i = 0; i < l1.size(); i++) {
- if (isamember(l1[i], l2))
- n++;
- }
- return n >= (l1.size() >> 1) + 1;
-}
+ // Restore state from the log BEFORE deciding whether this is a fresh node.
+ // NOTE(review): assumes replay() feeds each logged entry to the handlers
+ // registered above.
+ l.replay();
-proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
- const std::string &_me)
- : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
- stable (true)
-{
- my_n.n = 0;
- my_n.m = me;
-}
+ // Seed instance 1 only when the replayed log holds no decided instance.
+ // Testing instance_h before replay() would see its pre-replay value and
+ // re-append instance 1 on every restart of the first node. The in-memory
+ // state is set directly, mirroring the log_instance handler, so this is
+ // correct whether or not append() also dispatches to that handler.
+ if (instance_h == 0 && _first) {
+     l.append(log_instance{1, _value});
+     instance_h = 1;
+     values[1] = _value;
+     accepted = promise = {0, me};
+     accepted_value.clear();
+ }
-void proposer::setn()
-{
- my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
+ pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
+ pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
+ pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
}
-bool proposer::run(unsigned instance, const std::vector<std::string> & cur_nodes,
- const std::string & newv)
+// Run one round of Paxos for `instance` over cur_nodes, proposing newv
+// unless a prepare reply carries a previously accepted value. Returns true
+// only when both the prepare and the accept phase reached a majority of
+// cur_nodes and decide was sent. Returns false when a proposer is already
+// running, when prepare aborted because the instance is already decided,
+// or when either majority was missed.
+bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
{
- std::vector<std::string> accepts;
- std::vector<std::string> nodes;
- std::string v;
- bool r = false;
-
- lock ml(pxs_mutex);
- tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
- print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
+ lock ml(proposer_mutex);
+ LOG << "initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable;
if (!stable) { // already running proposer?
- tprintf("proposer::run: already running\n");
+ LOG << "paxos proposer already running";
return false;
}
stable = false;
- setn();
- accepts.clear();
- v.clear();
+ bool r = false;
+ {
+     // promise is acceptor state, written under acceptor_mutex by
+     // preparereq() and commit(), so read it under that lock as well.
+     // Taking acceptor_mutex while holding proposer_mutex matches the
+     // lock order prepare() already uses.
+     lock al(acceptor_mutex);
+     proposal.n = std::max(promise.n, proposal.n) + 1;
+ }
+ nodes_t accepts;
+ value_t v;
if (prepare(instance, accepts, cur_nodes, v)) {
if (majority(cur_nodes, accepts)) {
- tprintf("paxos::manager: received a majority of prepare responses\n");
+ LOG << "received a majority of prepare responses";
- if (v.size() == 0)
+ if (!v.size())
v = newv;
breakpoint1();
- nodes = accepts;
- accepts.clear();
+ nodes_t nodes;
+ nodes.swap(accepts);
accept(instance, accepts, nodes, v);
if (majority(cur_nodes, accepts)) {
- tprintf("paxos::manager: received a majority of accept responses\n");
+ LOG << "received a majority of accept responses";
breakpoint2();
decide(instance, accepts, v);
r = true;
} else {
- tprintf("paxos::manager: no majority of accept responses\n");
+ LOG << "no majority of accept responses";
}
} else {
- tprintf("paxos::manager: no majority of prepare responses\n");
+ LOG << "no majority of prepare responses";
}
} else {
- tprintf("paxos::manager: prepare is rejected %d\n", stable);
+ LOG << "prepare is rejected " << stable;
}
stable = true;
return r;
}
-// proposer::run() calls prepare to send prepare RPCs to nodes
-// and collect responses. if one of those nodes
-// replies with an oldinstance, return false.
-// otherwise fill in accepts with set of nodes that accepted,
-// set v to the v_a with the highest n_a, and return true.
-bool
-proposer::prepare(unsigned instance, std::vector<std::string> & accepts,
- const std::vector<std::string> & nodes,
- std::string & v)
-{
- struct paxos_protocol::preparearg arg = { instance, my_n };
- struct paxos_protocol::prepareres res;
- prop_t n_a = { 0, "" };
- rpcc *r;
+// Phase 1 (called by run()): send preparereq(instance, proposal) to every
+// node in `nodes`. Nodes that cannot be bound, or that do not answer OK
+// within 100ms, are skipped. Every node that replies accept is appended to
+// `accepts`, and `v` is overwritten with that reply's v_a whenever its n_a
+// is >= the highest n_a seen so far. `v` therefore ends up holding the
+// value of the highest-numbered accepted proposal, which may be empty.
+// If any node reports that `instance` is already decided, that value is
+// committed locally and false is returned; otherwise returns true.
+bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
+ const nodes_t & nodes, value_t & v) {
+ LOG << "sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")";
+ prepareres res;
+ prop_t highest_n_a{0, ""};
for (auto i : nodes) {
- handle h(i);
- if (!(r = h.safebind()))
+ auto cl = rpcc::bind_cached(i);
+ if (!cl)
continue;
- int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
+ int status = cl->call_timeout(paxos_protocol::preparereq, 100ms,
+ res, me, instance, proposal);
if (status == paxos_protocol::OK) {
- if (res.oldinstance) {
- tprintf("commiting old instance!\n");
- acc->commit(instance, res.v_a);
+ LOG << "preparereq response type=" << res.type << " n_a=(" << res.n_a.n
+ << ", " << res.n_a.m << ") " << "v_a=\"" << res.v_a << "\"";
+ if (res.type == prepareres::oldinstance) {
+ LOG << "commiting old instance!";
+ // commit() VERIFYs that the caller holds a lock, hence acceptor_mutex here.
+ lock ml(acceptor_mutex);
+ commit(instance, res.v_a, ml);
return false;
- }
- if (res.accept) {
+ } else if (res.type == prepareres::accept) {
accepts.push_back(i);
- if (res.n_a >= n_a) {
- tprintf("found a newer accepted proposal\n");
+ if (res.n_a >= highest_n_a) {
+ LOG << "found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")";
v = res.v_a;
- n_a = res.n_a;
+ highest_n_a = res.n_a;
}
}
}
return true;
}
-// run() calls this to send out accept RPCs to accepts.
-// fill in accepts with list of nodes that accepted.
-void
-proposer::accept(unsigned instance, std::vector<std::string> & accepts,
- const std::vector<std::string> & nodes, const std::string & v)
-{
- struct paxos_protocol::acceptarg arg = { instance, my_n, v };
- rpcc *r;
+// Phase 2 (called by run()): send acceptreq(instance, proposal, v) to every
+// node in `nodes`. Each node that answers OK with accept == true is appended
+// to `accepts`. Nodes that cannot be bound, or that time out (100ms), are
+// skipped.
+void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
+ const nodes_t & nodes, const value_t & v) {
for (auto i : nodes) {
- handle h(i);
- if (!(r = h.safebind()))
- continue;
- bool accept = false;
- int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg);
- if (status == paxos_protocol::OK && accept)
- accepts.push_back(i);
+ if (auto cl = rpcc::bind_cached(i)) {
+     // Fresh reply slot per node, so a `true` left over from an earlier node
+     // can never be counted for this one. It is named `ok` rather than
+     // `accept` to avoid shadowing this member function.
+     bool ok = false;
+     int status = cl->call_timeout(paxos_protocol::acceptreq, 100ms,
+         ok, me, instance, proposal, v);
+     if (status == paxos_protocol::OK && ok)
+         accepts.push_back(i);
+ }
}
}
-void
-proposer::decide(unsigned instance, const std::vector<std::string> & accepts,
- const std::string & v)
-{
- struct paxos_protocol::decidearg arg = { instance, v };
- rpcc *r;
- for (auto i : accepts) {
- handle h(i);
- if (!(r = h.safebind()))
- continue;
- int res = 0;
- r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, arg);
- }
-}
-
-acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _me,
- const std::string & _value)
- : cfg(_cfg), me (_me), instance_h(0)
-{
- n_h.n = 0;
- n_h.m = me;
- n_a.n = 0;
- n_a.m = me;
- v_a.clear();
-
- l = new log (this, me);
-
- if (instance_h == 0 && _first) {
- values[1] = _value;
- l->loginstance(1, _value);
- instance_h = 1;
- }
-
- pxs = new rpcs((uint32_t)std::stoi(_me));
- pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
- pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
- pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
+// Phase 3 (called by run()): tell every node in `accepts` that `v` was chosen
+// for `instance`. This is best effort: nodes that cannot be bound are skipped,
+// and both the call status and the reply are ignored.
+void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
+ int res = 0;
+ for (auto i : accepts)
+ if (auto cl = rpcc::bind_cached(i))
+ cl->call_timeout(paxos_protocol::decidereq, 100ms, res, me, instance, v);
}
+// Acceptor side of phase 1 (RPC handler). If `instance` is already decided
+// locally, reply oldinstance carrying the decided value. Otherwise, if
+// proposal n is higher than the current promise, promise n, log the promise
+// and reply accept with the currently accepted proposal and value. In all
+// remaining cases reply reject. The RPC status itself is always OK.
paxos_protocol::status
-acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &,
- paxos_protocol::preparearg a)
-{
- lock ml(pxs_mutex);
- r.oldinstance = false;
- r.accept = false;
- r.n_a = n_a;
- r.v_a = v_a;
- if (a.instance <= instance_h) {
- r.oldinstance = true;
- r.v_a = values[a.instance];
- } else if (a.n > n_h) {
- n_h = a.n;
- l->logprop(n_h);
- r.accept = true;
+proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
+ LOG << "instance " << instance << " proposal (" << n.n << ", " << n.m << ")";
+ lock ml(acceptor_mutex);
+ if (instance <= instance_h) {
+ LOG << "old instance " << instance << " has value " << values[instance];
+ r = prepareres{prepareres::oldinstance, accepted, values[instance]};
+ } else if (n > promise) {
+ LOG << "looks good to me";
+ promise = n;
+ l.append(log_proposal{n});
+ r = prepareres{prepareres::accept, accepted, accepted_value};
} else {
- tprintf("I totally rejected this request. Ha.\n");
+ LOG << "I totally rejected this request. Ha.";
+ r = prepareres{prepareres::reject, accepted, accepted_value};
}
+ // Log the reply's actual type. r.oldinstance / r.accept name enumerator
+ // constants of prepareres, not per-reply state, so they would print the
+ // same numbers on every call.
+ LOG << "preparereq is responding with type=" << r.type
+ << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " << "v_a=\"" << r.v_a << "\"";
return paxos_protocol::OK;
}
+// Acceptor side of phase 2 (RPC handler). Returns ERR for any instance other
+// than the next undecided one (instance_h + 1). Otherwise accepts proposal n
+// with value v iff n >= the current promise: it records and logs the accept
+// and sets r = true. If n is below the promise, r is left false.
paxos_protocol::status
-acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a)
-{
- lock ml(pxs_mutex);
+proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
+ lock ml(acceptor_mutex);
r = false;
- if (a.n >= n_h) {
- n_a = a.n;
- v_a = a.v;
- l->logaccept(n_a, v_a);
+ if (instance != instance_h + 1)
+ return paxos_protocol::ERR;
+ if (n >= promise) {
+ accepted = n;
+ accepted_value = v;
+ l.append(log_accept{n, v});
r = true;
}
return paxos_protocol::OK;
}
-// the src argument is only for debugging
+// Learner side (RPC handler): `v` was chosen for `instance`.
+// - The next instance (instance_h + 1) is committed, after VERIFYing that it
+//   matches the value this node accepted.
+// - Already-decided instances are ignored.
+// - A decision for any later instance is fatal (VERIFY(0)).
paxos_protocol::status
-acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a)
-{
- lock ml(pxs_mutex);
- tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
- a.instance, instance_h, v_a.c_str());
- if (a.instance == instance_h + 1) {
- VERIFY(v_a == a.v);
- commit(a.instance, v_a, ml);
- } else if (a.instance <= instance_h) {
- // we are ahead ignore.
+proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
+ lock ml(acceptor_mutex);
+ LOG << "decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value;
+ if (instance == instance_h + 1) {
+ VERIFY(accepted_value == v);
+ commit(instance, v, ml);
+ } else if (instance <= instance_h) {
+ // we are ahead; ignore.
} else {
- // we are behind
+ // we are behind.
VERIFY(0);
}
return paxos_protocol::OK;
}
-void
-acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_lock)
-{
- tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
+// Record `value` as decided for `instance` when it is newer than instance_h.
+// Steps: log it, advance instance_h, store it in values, and reset the
+// per-instance acceptor state (promise, accepted, accepted_value). Then
+// notify the delegate with the lock temporarily released.
+// The caller must pass its held lock on acceptor_mutex.
+void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & acceptor_mutex_lock) {
+ VERIFY(&value != &accepted_value); // value must not alias accepted_value: it is cleared below before value is handed to paxos_commit
+ VERIFY(acceptor_mutex_lock);
+ LOG << "instance=" << instance << " has v=" << value;
if (instance > instance_h) {
- tprintf("commit: highestaccepteinstance = %d\n", instance);
- values[instance] = value;
- l->loginstance(instance, value);
+ LOG << "highestacceptedinstance = " << instance;
+ l.append(log_instance{instance, value});
instance_h = instance;
- n_h.n = 0;
- n_h.m = me;
- n_a.n = 0;
- n_a.m = me;
- v_a.clear();
- if (cfg) {
- pxs_mutex_lock.unlock();
- cfg->paxos_commit(instance, value);
- pxs_mutex_lock.lock();
+ values[instance] = value;
+ accepted = promise = {0, me};
+ accepted_value.clear();
+ if (delegate) {
+ // Release acceptor_mutex while calling out to the delegate.
+ acceptor_mutex_lock.unlock();
+ delegate->paxos_commit(instance, value);
+ acceptor_mutex_lock.lock();
}
}
}
-void
-acceptor::commit(unsigned instance, const std::string & value)
-{
- lock ml(pxs_mutex);
- commit(instance, value, ml);
-}
-
-std::string
-acceptor::dump()
-{
- return l->dump();
-}
-
-void
-acceptor::restore(const std::string & s)
-{
- l->restore(s);
- l->logread();
-}
-
-
-
// For testing purposes
-
-// Call this from your code between phases prepare and accept of proposer
-void
-proposer::breakpoint1()
-{
+// Called by run() between the prepare and accept phases. Exits the process
+// if break1 has been armed by breakpoint(3).
+void proposer_acceptor::breakpoint1() {
if (break1) {
- tprintf("Dying at breakpoint 1!\n");
+ LOG << "Dying at breakpoint 1!";
exit(1);
}
}
-// Call this from your code between phases accept and decide of proposer
-void
-proposer::breakpoint2()
-{
+// Called by run() between the accept and decide phases. Exits the process
+// if break2 has been armed by breakpoint(4).
+void proposer_acceptor::breakpoint2() {
if (break2) {
- tprintf("Dying at breakpoint 2!\n");
+ LOG << "Dying at breakpoint 2!";
exit(1);
}
}
-void
-proposer::breakpoint(int b)
-{
+// Test hook: b == 3 arms breakpoint1(), b == 4 arms breakpoint2(); any other
+// value is ignored.
+void proposer_acceptor::breakpoint(int b) {
if (b == 3) {
- tprintf("Proposer: breakpoint 1\n");
+ LOG << "breakpoint 1";
break1 = true;
} else if (b == 4) {
- tprintf("Proposer: breakpoint 2\n");
+ LOG << "breakpoint 2";
break2 = true;
}
}