#include "paxos.h"
#include "handle.h"
-#include <stdio.h>
-#include "tprintf.h"
+#include "threaded_log.h"
#include "lang/verify.h"
#include "lock.h"
+using std::stoi;
+
// This module implements the proposer and acceptor of the Paxos
// distributed algorithm as described by Lamport's "Paxos Made
// Simple". To kick off an instance of Paxos, the caller supplies a
return (a.n > b.n || (a.n == b.n && a.m >= b.m));
}
-std::string
-print_members(const std::vector<std::string> &nodes) {
- std::string s;
+string
+print_members(const vector<string> &nodes) {
+ string s;
s.clear();
for (unsigned i = 0; i < nodes.size(); i++) {
s += nodes[i];
}
-bool isamember(const std::string & m, const std::vector<std::string> & nodes) {
+bool isamember(const string & m, const vector<string> & nodes) {
for (auto n : nodes) {
if (n == m)
return 1;
}
// check if the servers in l2 contains a majority of servers in l1
-bool proposer::majority(const std::vector<std::string> &l1,
- const std::vector<std::string> &l2) {
+bool proposer::majority(const vector<string> &l1, const vector<string> &l2) {
unsigned n = 0;
for (unsigned i = 0; i < l1.size(); i++) {
return n >= (l1.size() >> 1) + 1;
}
-proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
- const std::string &_me)
+proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, const string &_me)
: cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
stable (true)
{
my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
}
-bool proposer::run(unsigned instance, const std::vector<std::string> & cur_nodes,
- const std::string & newv)
+bool proposer::run(unsigned instance, const vector<string> & cur_nodes, const string & newv)
{
- std::vector<std::string> accepts;
- std::vector<std::string> nodes;
- std::string v;
+ vector<string> accepts;
+ vector<string> nodes;
+ string v;
bool r = false;
lock ml(pxs_mutex);
- tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
- print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
+ LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable);
if (!stable) { // already running proposer?
- tprintf("proposer::run: already running\n");
+ LOG("proposer::run: already running");
return false;
}
stable = false;
if (prepare(instance, accepts, cur_nodes, v)) {
if (majority(cur_nodes, accepts)) {
- tprintf("paxos::manager: received a majority of prepare responses\n");
+ LOG("paxos::manager: received a majority of prepare responses");
if (v.size() == 0)
v = newv;
accept(instance, accepts, nodes, v);
if (majority(cur_nodes, accepts)) {
- tprintf("paxos::manager: received a majority of accept responses\n");
+ LOG("paxos::manager: received a majority of accept responses");
breakpoint2();
decide(instance, accepts, v);
r = true;
} else {
- tprintf("paxos::manager: no majority of accept responses\n");
+ LOG("paxos::manager: no majority of accept responses");
}
} else {
- tprintf("paxos::manager: no majority of prepare responses\n");
+ LOG("paxos::manager: no majority of prepare responses");
}
} else {
- tprintf("paxos::manager: prepare is rejected %d\n", stable);
+ LOG("paxos::manager: prepare is rejected " << stable);
}
stable = true;
return r;
// otherwise fill in accepts with set of nodes that accepted,
// set v to the v_a with the highest n_a, and return true.
bool
-proposer::prepare(unsigned instance, std::vector<std::string> & accepts,
- const std::vector<std::string> & nodes,
- std::string & v)
+proposer::prepare(unsigned instance, vector<string> & accepts,
+ const vector<string> & nodes,
+ string & v)
{
struct paxos_protocol::preparearg arg = { instance, my_n };
struct paxos_protocol::prepareres res;
int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
if (status == paxos_protocol::OK) {
if (res.oldinstance) {
- tprintf("commiting old instance!\n");
+ LOG("commiting old instance!");
acc->commit(instance, res.v_a);
return false;
}
if (res.accept) {
accepts.push_back(i);
if (res.n_a >= n_a) {
- tprintf("found a newer accepted proposal\n");
+ LOG("found a newer accepted proposal");
v = res.v_a;
n_a = res.n_a;
}
// run() calls this to send out accept RPCs to accepts.
// fill in accepts with list of nodes that accepted.
void
-proposer::accept(unsigned instance, std::vector<std::string> & accepts,
- const std::vector<std::string> & nodes, const std::string & v)
+proposer::accept(unsigned instance, vector<string> & accepts,
+ const vector<string> & nodes, const string & v)
{
struct paxos_protocol::acceptarg arg = { instance, my_n, v };
rpcc *r;
}
void
-proposer::decide(unsigned instance, const std::vector<std::string> & accepts,
- const std::string & v)
+proposer::decide(unsigned instance, const vector<string> & accepts,
+ const string & v)
{
struct paxos_protocol::decidearg arg = { instance, v };
rpcc *r;
}
}
-acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _me,
- const std::string & _value)
+acceptor::acceptor(class paxos_change *_cfg, bool _first, const string & _me,
+ const string & _value)
: cfg(_cfg), me (_me), instance_h(0)
{
n_h.n = 0;
instance_h = 1;
}
- pxs = new rpcs((uint32_t)std::stoi(_me));
+ pxs = new rpcs((uint32_t)stoi(_me));
pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
}
paxos_protocol::status
-acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &,
+acceptor::preparereq(paxos_protocol::prepareres & r, const string &,
paxos_protocol::preparearg a)
{
lock ml(pxs_mutex);
l->logprop(n_h);
r.accept = true;
} else {
- tprintf("I totally rejected this request. Ha.\n");
+ LOG("I totally rejected this request. Ha.");
}
return paxos_protocol::OK;
}
paxos_protocol::status
-acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a)
+acceptor::acceptreq(bool & r, const string &, paxos_protocol::acceptarg a)
{
lock ml(pxs_mutex);
r = false;
// the src argument is only for debugging
paxos_protocol::status
-acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a)
+acceptor::decidereq(int &, const string &, paxos_protocol::decidearg a)
{
lock ml(pxs_mutex);
- tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
- a.instance, instance_h, v_a.c_str());
+ LOG("decidereq for accepted instance " << a.instance << " (my instance " << instance_h << ") v=" << v_a);
if (a.instance == instance_h + 1) {
VERIFY(v_a == a.v);
commit(a.instance, v_a, ml);
}
void
-acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_lock)
+acceptor::commit(unsigned instance, const string & value, lock & pxs_mutex_lock)
{
- tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
+ LOG("acceptor::commit: instance=" << instance << " has v=" << value);
if (instance > instance_h) {
- tprintf("commit: highestaccepteinstance = %d\n", instance);
+ LOG("commit: highestaccepteinstance = " << instance);
values[instance] = value;
l->loginstance(instance, value);
instance_h = instance;
}
void
-acceptor::commit(unsigned instance, const std::string & value)
+acceptor::commit(unsigned instance, const string & value)
{
lock ml(pxs_mutex);
commit(instance, value, ml);
}
-std::string
+string
acceptor::dump()
{
return l->dump();
}
void
-acceptor::restore(const std::string & s)
+acceptor::restore(const string & s)
{
l->restore(s);
l->logread();
proposer::breakpoint1()
{
if (break1) {
- tprintf("Dying at breakpoint 1!\n");
+ LOG("Dying at breakpoint 1!");
exit(1);
}
}
proposer::breakpoint2()
{
if (break2) {
- tprintf("Dying at breakpoint 2!\n");
+ LOG("Dying at breakpoint 2!");
exit(1);
}
}
proposer::breakpoint(int b)
{
if (b == 3) {
- tprintf("Proposer: breakpoint 1\n");
+ LOG("Proposer: breakpoint 1");
break1 = true;
} else if (b == 4) {
- tprintf("Proposer: breakpoint 2\n");
+ LOG("Proposer: breakpoint 2");
break2 = true;
}
}