// paxos_commit to inform higher layers of the agreed value for this
// instance.
-
-bool
-operator> (const prop_t &a, const prop_t &b)
-{
- return (a.n > b.n || (a.n == b.n && a.m > b.m));
+// Strict ordering on proposal numbers: n decides, and m breaks the tie when
+// both n values are equal.
+bool operator> (const prop_t &a, const prop_t &b) {
+    if (a.n == b.n)
+        return a.m > b.m;
+    return a.n > b.n;
}
-bool
-operator>= (const prop_t &a, const prop_t &b)
-{
- return (a.n > b.n || (a.n == b.n && a.m >= b.m));
+// Non-strict ordering on proposal numbers: n decides, and m is compared with
+// >= when both n values are equal.
+bool operator>= (const prop_t &a, const prop_t &b) {
+    if (a.n == b.n)
+        return a.m >= b.m;
+    return a.n > b.n;
}
std::string
-print_members(const std::vector<std::string> &nodes)
-{
- std::string s;
- s.clear();
- for (unsigned i = 0; i < nodes.size(); i++) {
- s += nodes[i];
- if (i < (nodes.size()-1))
- s += ",";
- }
- return s;
+print_members(const std::vector<std::string> &nodes) {
+    // Comma-join the node names: a "," goes in front of every entry except
+    // the first, so there is no leading or trailing separator.
+    std::string joined;
+    bool first = true;
+    for (const std::string &node : nodes) {
+        if (!first)
+            joined += ",";
+        joined += node;
+        first = false;
+    }
+    return joined;
}
-bool isamember(std::string m, const std::vector<std::string> &nodes)
-{
- for (unsigned i = 0; i < nodes.size(); i++) {
- if (nodes[i] == m) return 1;
- }
- return 0;
+
+// Report whether m occurs in nodes (linear scan, exact string comparison).
+bool isamember(const std::string & m, const std::vector<std::string> & nodes) {
+    // Bind by const reference: `auto n` would copy every string in nodes
+    // while scanning.
+    for (const auto & n : nodes) {
+        if (n == m)
+            return true;
+    }
+    return false;
}
-bool
-proposer::isrunning()
-{
- bool r;
- lock ml(pxs_mutex);
- r = !stable;
- return r;
+bool proposer::isrunning() {
+    // Take pxs_mutex before reading stable; "running" is simply "not stable".
+    lock ml(pxs_mutex);
+    return !stable;
}
// check if the servers in l2 contains a majority of servers in l1
-bool
-proposer::majority(const std::vector<std::string> &l1,
- const std::vector<std::string> &l2)
-{
- unsigned n = 0;
+bool proposer::majority(const std::vector<std::string> &l1,
+        const std::vector<std::string> &l2) {
+    // Tally the members of l1 that also occur in l2.
+    unsigned hits = 0;
- for (unsigned i = 0; i < l1.size(); i++) {
- if (isamember(l1[i], l2))
- n++;
- }
- return n >= (l1.size() >> 1) + 1;
+    for (const std::string &server : l1) {
+        if (isamember(server, l2))
+            hits++;
+    }
+    // A majority means strictly more than half of l1.
+    return hits > l1.size() / 2;
}
proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
- std::string _me)
+        const std::string &_me)
: cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
stable (true)
{
- my_n.n = 0;
- my_n.m = me;
+    // Seed this proposer's proposal number: m is set to me and n starts at 0.
+    my_n.m = me;
+    my_n.n = 0;
}
-void
-proposer::setn()
+void proposer::setn()
{
- my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
+    // Advance my_n.n to one past the larger of acc->get_n_h().n and the
+    // current my_n.n.
+    if (acc->get_n_h().n + 1 > my_n.n + 1)
+        my_n.n = acc->get_n_h().n + 1;
+    else
+        my_n.n = my_n.n + 1;
}
-bool
-proposer::run(int instance, std::vector<std::string> cur_nodes, std::string newv)
+// Drive one proposal attempt for `instance` over cur_nodes. newv is proposed
+// unless prepare() left a non-empty value in v, in which case that value is
+// carried forward instead.
+// Returns true only when both the prepare phase and the accept phase gathered
+// a majority of cur_nodes and decide() was called. Returns false otherwise,
+// including when stable is already false on entry (a run is in progress).
+bool proposer::run(unsigned instance, const std::vector<std::string> & cur_nodes,
+ const std::string & newv)
{
- std::vector<std::string> accepts;
- std::vector<std::string> nodes;
- std::string v;
- bool r = false;
-
- lock ml(pxs_mutex);
- tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
- print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
- if (!stable) { // already running proposer?
- tprintf("proposer::run: already running\n");
- return false;
- }
- stable = false;
- setn();
- accepts.clear();
- v.clear();
- if (prepare(instance, accepts, cur_nodes, v)) {
-
- if (majority(cur_nodes, accepts)) {
- tprintf("paxos::manager: received a majority of prepare responses\n");
-
- if (v.size() == 0)
- v = newv;
-
- breakpoint1();
-
- nodes = accepts;
- accepts.clear();
- accept(instance, accepts, nodes, v);
-
- if (majority(cur_nodes, accepts)) {
- tprintf("paxos::manager: received a majority of accept responses\n");
-
- breakpoint2();
-
- decide(instance, accepts, v);
- r = true;
- } else {
- tprintf("paxos::manager: no majority of accept responses\n");
- }
+ std::vector<std::string> accepts;
+ std::vector<std::string> nodes;
+ std::string v;
+ bool r = false;
+
+ lock ml(pxs_mutex);
+ tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
+ print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
+ if (!stable) { // already running proposer?
+ tprintf("proposer::run: already running\n");
+ return false;
+ }
+ // stable is cleared for the duration of this attempt and set back to true
+ // just before returning.
+ stable = false;
+ setn();
+ accepts.clear();
+ v.clear();
+ if (prepare(instance, accepts, cur_nodes, v)) {
+
+ if (majority(cur_nodes, accepts)) {
+ tprintf("paxos::manager: received a majority of prepare responses\n");
+
+ // Nothing came back from prepare(), so propose the caller's value.
+ if (v.size() == 0)
+ v = newv;
+
+ breakpoint1();
+
+ // The accept phase is sent only to the nodes collected by prepare();
+ // accepts is then reused to collect the accept-phase responders.
+ nodes = accepts;
+ accepts.clear();
+ accept(instance, accepts, nodes, v);
+
+ if (majority(cur_nodes, accepts)) {
+ tprintf("paxos::manager: received a majority of accept responses\n");
+
+ breakpoint2();
+
+ decide(instance, accepts, v);
+ r = true;
+ } else {
+ tprintf("paxos::manager: no majority of accept responses\n");
+ }
+ } else {
+ tprintf("paxos::manager: no majority of prepare responses\n");
+ }
} else {
- tprintf("paxos::manager: no majority of prepare responses\n");
+ tprintf("paxos::manager: prepare is rejected %d\n", stable);
}
- } else {
- tprintf("paxos::manager: prepare is rejected %d\n", stable);
- }
- stable = true;
- return r;
+ stable = true;
+ return r;
}
// proposer::run() calls prepare to send prepare RPCs to nodes
// otherwise fill in accepts with set of nodes that accepted,
// set v to the v_a with the highest n_a, and return true.
bool
-proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
- std::vector<std::string> nodes,
- std::string &v)
+proposer::prepare(unsigned instance, std::vector<std::string> & accepts,
+ const std::vector<std::string> & nodes,
+ std::string & v)
{
struct paxos_protocol::preparearg arg = { instance, my_n };
struct paxos_protocol::prepareres res;
prop_t n_a = { 0, "" };
rpcc *r;
- for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
- handle h(*i);
+ for (auto i : nodes) {
+ handle h(i);
if (!(r = h.safebind()))
continue;
int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
return false;
}
if (res.accept) {
- accepts.push_back(*i);
+ accepts.push_back(i);
if (res.n_a >= n_a) {
tprintf("found a newer accepted proposal\n");
v = res.v_a;
// run() calls this to send out accept RPCs to accepts.
// fill in accepts with list of nodes that accepted.
void
-proposer::accept(unsigned instance, std::vector<std::string> &accepts,
- std::vector<std::string> nodes, std::string v)
+proposer::accept(unsigned instance, std::vector<std::string> & accepts,
+        const std::vector<std::string> & nodes, const std::string & v)
{
struct paxos_protocol::acceptarg arg = { instance, my_n, v };
rpcc *r;
- for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
- handle h(*i);
+    // Bind each node name by const reference; `auto i` would copy the string
+    // on every iteration. Nodes whose handle cannot be bound are skipped.
+    for (const auto & i : nodes) {
+        handle h(i);
if (!(r = h.safebind()))
continue;
bool accept = false;
int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg);
- if (status == paxos_protocol::OK) {
- if (accept)
- accepts.push_back(*i);
- }
+        // Record the node only when the RPC returned OK and the peer accepted.
+        if (status == paxos_protocol::OK && accept)
+            accepts.push_back(i);
}
}
void
-proposer::decide(unsigned instance, std::vector<std::string> accepts,
- std::string v)
+proposer::decide(unsigned instance, const std::vector<std::string> & accepts,
+ const std::string & v)
{
struct paxos_protocol::decidearg arg = { instance, v };
rpcc *r;
- for (std::vector<std::string>::iterator i=accepts.begin(); i!=accepts.end(); i++) {
- handle h(*i);
+ for (auto i : accepts) {
+ handle h(i);
if (!(r = h.safebind()))
continue;
int res = 0;
}
}
-acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me,
- std::string _value)
+acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _me,
+        const std::string & _value)
: cfg(_cfg), me (_me), instance_h(0)
{
- n_h.n = 0;
- n_h.m = me;
- n_a.n = 0;
- n_a.m = me;
- v_a.clear();
-
- l = new log (this, me);
-
- if (instance_h == 0 && _first) {
- values[1] = _value;
- l->loginstance(1, _value);
- instance_h = 1;
- }
-
- pxs = new rpcs(atoi(_me.c_str()));
- pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
- pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
- pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
+    // Both proposal numbers start at n = 0 with m = me, and no value accepted.
+    n_h.n = 0;
+    n_h.m = me;
+    n_a.n = 0;
+    n_a.m = me;
+    v_a.clear();
+
+    l = new log (this, me);
+
+    // When _first is set, instance 1 is recorded with _value (in values and
+    // in the log) and becomes instance_h.
+    if (instance_h == 0 && _first) {
+        values[1] = _value;
+        l->loginstance(1, _value);
+        instance_h = 1;
+    }
+
+    // _me is parsed as a number and handed to rpcs. Unlike atoi, std::stoi
+    // throws when _me does not begin with a number. A named cast replaces the
+    // C-style cast so the int -> uint32_t conversion is explicit.
+    pxs = new rpcs(static_cast<uint32_t>(std::stoi(_me)));
+    pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
+    pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
+    pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
}
paxos_protocol::status
-acceptor::preparereq(paxos_protocol::prepareres &r, std::string src, paxos_protocol::preparearg a)
+acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &,
+ paxos_protocol::preparearg a)
{
lock ml(pxs_mutex);
r.oldinstance = false;
}
paxos_protocol::status
-acceptor::acceptreq(bool &r, std::string src, paxos_protocol::acceptarg a)
+acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a)
{
lock ml(pxs_mutex);
r = false;
return paxos_protocol::OK;
}
-// the src argument is only for debug purpose
- paxos_protocol::status
-acceptor::decidereq(int &r, std::string src, paxos_protocol::decidearg a)
+// the unnamed std::string argument (formerly src) is only for debugging
+paxos_protocol::status
+acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a)
{
lock ml(pxs_mutex);
tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
a.instance, instance_h, v_a.c_str());
if (a.instance == instance_h + 1) {
VERIFY(v_a == a.v);
- commit_wo(a.instance, v_a);
+ commit(a.instance, v_a, ml);
} else if (a.instance <= instance_h) {
// we are ahead ignore.
} else {
}
void
-acceptor::commit_wo(unsigned instance, std::string value)
+acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_lock)
{
- //assume pxs_mutex is held
- adopt_lock ml(pxs_mutex);
tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
if (instance > instance_h) {
tprintf("commit: highestaccepteinstance = %d\n", instance);
n_a.m = me;
v_a.clear();
if (cfg) {
- ml.unlock();
+ pxs_mutex_lock.unlock();
cfg->paxos_commit(instance, value);
- ml.lock();
+ pxs_mutex_lock.lock();
}
}
}
void
-acceptor::commit(unsigned instance, std::string value)
+acceptor::commit(unsigned instance, const std::string & value)
{
lock ml(pxs_mutex);
- commit_wo(instance, value);
+ // Delegate to the three-argument overload, handing it the lock just taken
+ // on pxs_mutex.
+ commit(instance, value, ml);
}
std::string
acceptor::dump()
{
- return l->dump();
+ // Forward to the log object l and return its dump() result unchanged.
+ return l->dump();
}
void
-acceptor::restore(std::string s)
+acceptor::restore(const std::string & s)
{
- l->restore(s);
- l->logread();
+ // Hand s to the log's restore(), then call logread() on the same log.
+ l->restore(s);
+ l->logread();
}
void
proposer::breakpoint1()
{
- if (break1) {
- tprintf("Dying at breakpoint 1!\n");
- exit(1);
- }
+    // Nothing to do unless breakpoint 1 has been armed via breakpoint().
+    if (!break1)
+        return;
+    tprintf("Dying at breakpoint 1!\n");
+    exit(1);
}
// Call this from your code between phases accept and decide of proposer
void
proposer::breakpoint2()
{
- if (break2) {
- tprintf("Dying at breakpoint 2!\n");
- exit(1);
- }
+    // Nothing to do unless breakpoint 2 has been armed via breakpoint().
+    if (!break2)
+        return;
+    tprintf("Dying at breakpoint 2!\n");
+    exit(1);
}
void
proposer::breakpoint(int b)
{
- if (b == 3) {
- tprintf("Proposer: breakpoint 1\n");
- break1 = true;
- } else if (b == 4) {
- tprintf("Proposer: breakpoint 2\n");
- break2 = true;
- }
+    // b == 3 arms breakpoint 1 and b == 4 arms breakpoint 2; any other value
+    // is ignored.
+    switch (b) {
+    case 3:
+        tprintf("Proposer: breakpoint 1\n");
+        break1 = true;
+        break;
+    case 4:
+        tprintf("Proposer: breakpoint 2\n");
+        break2 = true;
+        break;
+    default:
+        break;
+    }
}