#include "paxos.h"
#include "handle.h"
-// #include <signal.h>
#include <stdio.h>
#include "tprintf.h"
#include "lang/verify.h"
+#include "lock.h"
// This module implements the proposer and acceptor of the Paxos
// distributed algorithm as described by Lamport's "Paxos Made
proposer::isrunning()
{
bool r;
- ScopedLock ml(pxs_mutex);
+ lock ml(pxs_mutex);
r = !stable;
return r;
}
// check if the servers in l2 contain a majority of the servers in l1
bool
-proposer::majority(const std::vector<std::string> &l1,
+proposer::majority(const std::vector<std::string> &l1,
const std::vector<std::string> &l2)
{
unsigned n = 0;
return n >= (l1.size() >> 1) + 1;
}
-proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
+proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
std::string _me)
- : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
+ : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
stable (true)
{
my_n.n = 0;
std::string v;
bool r = false;
- ScopedLock ml(pxs_mutex);
+ lock ml(pxs_mutex);
tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
if (!stable) { // already running proposer?
// otherwise fill in accepts with set of nodes that accepted,
// set v to the v_a with the highest n_a, and return true.
bool
-proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
+proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
std::vector<std::string> nodes,
std::string &v)
{
handle h(*i);
if (!(r = h.safebind()))
continue;
- int status = r->call(paxos_protocol::preparereq, me, arg, res, rpcc::to(1000));
+ int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
if (status == paxos_protocol::OK) {
if (res.oldinstance) {
tprintf("commiting old instance!\n");
if (!(r = h.safebind()))
continue;
bool accept = false;
- int status = r->call(paxos_protocol::acceptreq, me, arg, accept, rpcc::to(1000));
+ int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg);
if (status == paxos_protocol::OK) {
if (accept)
accepts.push_back(*i);
}
void
-proposer::decide(unsigned instance, std::vector<std::string> accepts,
+proposer::decide(unsigned instance, std::vector<std::string> accepts,
std::string v)
{
struct paxos_protocol::decidearg arg = { instance, v };
if (!(r = h.safebind()))
continue;
int res = 0;
- r->call(paxos_protocol::decidereq, me, arg, res, rpcc::to(1000));
+ r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, arg);
}
}
-acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me,
+acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me,
std::string _value)
: cfg(_cfg), me (_me), instance_h(0)
{
}
pxs = new rpcs(atoi(_me.c_str()));
- pxs->reg(paxos_protocol::preparereq, this, &acceptor::preparereq);
- pxs->reg(paxos_protocol::acceptreq, this, &acceptor::acceptreq);
- pxs->reg(paxos_protocol::decidereq, this, &acceptor::decidereq);
+ pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
+ pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
+ pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
}
paxos_protocol::status
-acceptor::preparereq(std::string src, paxos_protocol::preparearg a,
- paxos_protocol::prepareres &r)
+acceptor::preparereq(paxos_protocol::prepareres &r, std::string src, paxos_protocol::preparearg a)
{
- ScopedLock ml(pxs_mutex);
+ lock ml(pxs_mutex);
r.oldinstance = false;
r.accept = false;
r.n_a = n_a;
}
paxos_protocol::status
-acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r)
+acceptor::acceptreq(bool &r, std::string src, paxos_protocol::acceptarg a)
{
- ScopedLock ml(pxs_mutex);
+ lock ml(pxs_mutex);
r = false;
if (a.n >= n_h) {
n_a = a.n;
}
// the src argument is only for debug purpose
-paxos_protocol::status
-acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r)
+ paxos_protocol::status
+acceptor::decidereq(int &r, std::string src, paxos_protocol::decidearg a)
{
- ScopedLock ml(pxs_mutex);
- tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
- a.instance, instance_h, v_a.c_str());
- if (a.instance == instance_h + 1) {
- VERIFY(v_a == a.v);
- commit_wo(a.instance, v_a);
- } else if (a.instance <= instance_h) {
- // we are ahead ignore.
- } else {
- // we are behind
- VERIFY(0);
- }
- return paxos_protocol::OK;
+ lock ml(pxs_mutex);
+ tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
+ a.instance, instance_h, v_a.c_str());
+ if (a.instance == instance_h + 1) {
+ VERIFY(v_a == a.v);
+ commit_wo(a.instance, v_a);
+ } else if (a.instance <= instance_h) {
+ // we are ahead ignore.
+ } else {
+ // we are behind
+ VERIFY(0);
+ }
+ return paxos_protocol::OK;
}
void
acceptor::commit_wo(unsigned instance, std::string value)
{
- //assume pxs_mutex is held
- tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
- if (instance > instance_h) {
- tprintf("commit: highestaccepteinstance = %d\n", instance);
- values[instance] = value;
- l->loginstance(instance, value);
- instance_h = instance;
- n_h.n = 0;
- n_h.m = me;
- n_a.n = 0;
- n_a.m = me;
- v_a.clear();
- if (cfg) {
- pxs_mutex.release();
- cfg->paxos_commit(instance, value);
- pxs_mutex.acquire();
+ //assume pxs_mutex is held
+ adopt_lock ml(pxs_mutex);
+ tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
+ if (instance > instance_h) {
+ tprintf("commit: highestaccepteinstance = %d\n", instance);
+ values[instance] = value;
+ l->loginstance(instance, value);
+ instance_h = instance;
+ n_h.n = 0;
+ n_h.m = me;
+ n_a.n = 0;
+ n_a.m = me;
+ v_a.clear();
+ if (cfg) {
+ ml.unlock();
+ cfg->paxos_commit(instance, value);
+ ml.lock();
+ }
}
- }
}
void
acceptor::commit(unsigned instance, std::string value)
{
- ScopedLock ml(pxs_mutex);
- commit_wo(instance, value);
+ lock ml(pxs_mutex);
+ commit_wo(instance, value);
}
std::string