c[who].notify_one();
}
-int lock_client::last_port = 0;
+unsigned int lock_client::last_port = 0;
lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
lock sl(lock_table_lock);
make_sockaddr(xdst.c_str(), &dstsock);
cl = new rpcc(dstsock);
if (cl->bind() < 0) {
- printf("lock_client: call bind\n");
+ LOG("lock_client: call bind");
}
- srand(time(NULL)^last_port);
- rlock_port = ((rand()%32000) | (0x1 << 10));
+ srandom((uint32_t)time(NULL)^last_port);
+ rlock_port = ((random()%32000) | (0x1 << 10));
const char *hname;
// VERIFY(gethostname(hname, 100) == 0);
hname = "127.0.0.1";
rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
{
lock sl(xid_mutex);
- xid = 0;
+ next_xid = 0;
}
rsmc = new rsm_client(xdst);
releaser_thread = std::thread(&lock_client::releaser, this);
}
-void lock_client::releaser() {
+void lock_client::releaser() [[noreturn]] {
while (1) {
lock_protocol::lockid_t lid;
release_fifo.deq(&lid);
sl.unlock();
int r;
rsmc->call(lock_protocol::release, r, lid, id, st.xid);
+ if (lu)
+ lu->dorelease(lid);
sl.lock();
}
st.state = lock_state::none;
if (st.state == lock_state::none || st.state == lock_state::retrying) {
if (st.state == lock_state::none) {
- lock sl(xid_mutex);
- st.xid = xid++;
+ lock l(xid_mutex);
+ st.xid = next_xid++;
}
st.state = lock_state::acquiring;
LOG("Lock " << lid << ": acquiring");
return rlock_protocol::OK;
}
-rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
+rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) {
lock_state &st = get_lock_state(lid);
lock sl(st.m);
VERIFY(st.state == lock_state::acquiring);
class lock_release_user {
public:
virtual void dorelease(lock_protocol::lockid_t) = 0;
- virtual ~lock_release_user() {};
+ virtual ~lock_release_user() {}
};
using std::string;
std::thread releaser_thread;
rsm_client *rsmc;
class lock_release_user *lu;
- int rlock_port;
+ unsigned int rlock_port;
string hostname;
string id;
mutex xid_mutex;
- lock_protocol::xid_t xid;
+ lock_protocol::xid_t next_xid;
fifo<lock_protocol::lockid_t> release_fifo;
mutex lock_table_lock;
lock_map lock_table;
lock_state &get_lock_state(lock_protocol::lockid_t lid);
public:
- static int last_port;
+ static unsigned int last_port;
lock_client(string xdst, class lock_release_user *l = 0);
- ~lock_client() {};
+ ~lock_client() {}
lock_protocol::status acquire(lock_protocol::lockid_t);
lock_protocol::status release(lock_protocol::lockid_t);
int stat(lock_protocol::lockid_t);
-//
-// Lock demo
-//
-
-#include "lock_protocol.h"
#include "lock_client.h"
-#include "rpc/rpc.h"
-#include <arpa/inet.h>
-#include <vector>
-#include <stdlib.h>
-#include <stdio.h>
+#include "tprintf.h"
char tprintf_thread_prefix = 'd';
}
lock_client *lc = new lock_client(argv[1]);
- printf ("stat returned %d\n", lc->stat("1"));
+ LOG_NONMEMBER("stat returned " << lc->stat("1"));
}
#include "lock_server.h"
#include <sstream>
-#include <stdio.h>
#include <unistd.h>
#include <arpa/inet.h>
#include "lang/verify.h"
return *this;
}
-template <class A, class B>
-ostringstream & operator<<(ostringstream &o, const pair<A,B> &d) {
- o << "<" << d.first << "," << d.second << ">";
- return o;
-}
-
marshall & operator<<(marshall &m, const lock_state &d) {
return m << d.held << d.held_by << d.wanted_by;
}
rsm->set_state_transfer(this);
}
-void lock_server::revoker() {
+void lock_server::revoker() [[noreturn]] {
while (1) {
lock_protocol::lockid_t lid;
revoke_fifo.deq(&lid);
}
}
-void lock_server::retryer() {
+void lock_server::retryer() [[noreturn]] {
while (1) {
lock_protocol::lockid_t lid;
retry_fifo.deq(&lid);
}
}
-int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
+int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
LOG_FUNC_ENTER_SERVER;
holder h = holder(id, xid);
lock_state &st = get_lock_state(lid);
if (!found)
st.wanted_by.push_back(h);
- LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " "));
+ LOG("wanted_by=" << make_iterator_pair(st.wanted_by.begin(), st.wanted_by.end()));
// send revoke if we're first in line
if (st.wanted_by.front() == h)
return lock_protocol::RETRY;
}
-int lock_server::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
+int lock_server::release(int &, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
LOG_FUNC_ENTER_SERVER;
lock_state &st = get_lock_state(lid);
lock sl(st.m);
}
lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
- printf("stat request\n");
+ LOG("stat request for " << lid);
VERIFY(0);
r = nacquire;
return lock_protocol::OK;
#include "rpc/rpc.h"
#include <arpa/inet.h>
#include <stdlib.h>
-#include <stdio.h>
+#include "tprintf.h"
#include <unistd.h>
#include "lock_server.h"
-#include "paxos.h"
#include "rsm.h"
// Main loop of lock_server
setvbuf(stdout, NULL, _IONBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
- srandom(getpid());
+ srandom((uint32_t)getpid());
if(argc != 3){
fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
}
void *
-test2(void *x)
+test2(int i)
{
- int i = * (int *) x;
-
tprintf ("test2: client %d acquire a release a\n", i);
lc[i]->acquire(a);
tprintf ("test2: client %d acquire done\n", i);
}
void *
-test3(void *x)
+test3(int i)
{
- int i = * (int *) x;
-
tprintf ("test3: client %d acquire a release a concurrent\n", i);
for (int j = 0; j < 10; j++) {
lc[i]->acquire(a);
}
void *
-test4(void *x)
+test4(int i)
{
- int i = * (int *) x;
-
tprintf ("test4: thread %d acquire a release a concurrent; same clnt\n", i);
for (int j = 0; j < 10; j++) {
lc[0]->acquire(a);
}
void *
-test5(void *x)
+test5(int i)
{
- int i = * (int *) x;
-
tprintf ("test5: client %d acquire a release a concurrent; same and diff clnt\n", i);
for (int j = 0; j < 10; j++) {
if (i < 5) lc[0]->acquire(a);
setvbuf(stdout, NULL, _IONBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
- srandom(getpid());
+ srandom((uint32_t)getpid());
if(argc < 2) {
fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]);
if(!test || test == 2){
// test2
- for (int i = 0; i < nt; i++) {
- int *a = new int (i);
- th[i] = std::thread(test2, a);
- }
- for (int i = 0; i < nt; i++) {
+ for (int i = 0; i < nt; i++)
+ th[i] = std::thread(test2, i);
+ for (int i = 0; i < nt; i++)
th[i].join();
- }
}
if(!test || test == 3){
tprintf("test 3\n");
// test3
- for (int i = 0; i < nt; i++) {
- int *a = new int (i);
- th[i] = std::thread(test3, a);
- }
- for (int i = 0; i < nt; i++) {
+ for (int i = 0; i < nt; i++)
+ th[i] = std::thread(test3, i);
+ for (int i = 0; i < nt; i++)
th[i].join();
- }
}
if(!test || test == 4){
tprintf("test 4\n");
// test 4
- for (int i = 0; i < 2; i++) {
- int *a = new int (i);
- th[i] = std::thread(test4, a);
- }
- for (int i = 0; i < 2; i++) {
+ for (int i = 0; i < 2; i++)
+ th[i] = std::thread(test4, i);
+ for (int i = 0; i < 2; i++)
th[i].join();
- }
}
if(!test || test == 5){
tprintf("test 5\n");
// test 5
-
- for (int i = 0; i < nt; i++) {
- int *a = new int (i);
- th[i] = std::thread(test5, a);
- }
- for (int i = 0; i < nt; i++) {
+ for (int i = 0; i < nt; i++)
+ th[i] = std::thread(test5, i);
+ for (int i = 0; i < nt; i++)
th[i].join();
- }
}
tprintf ("%s: passed all tests successfully\n", argv[0]);
// paxos_commit to inform higher layers of the agreed value for this
// instance.
-
-bool
-operator> (const prop_t &a, const prop_t &b)
-{
- return (a.n > b.n || (a.n == b.n && a.m > b.m));
+bool operator> (const prop_t &a, const prop_t &b) {
+ return (a.n > b.n || (a.n == b.n && a.m > b.m));
}
-bool
-operator>= (const prop_t &a, const prop_t &b)
-{
- return (a.n > b.n || (a.n == b.n && a.m >= b.m));
+bool operator>= (const prop_t &a, const prop_t &b) {
+ return (a.n > b.n || (a.n == b.n && a.m >= b.m));
}
std::string
-print_members(const std::vector<std::string> &nodes)
-{
- std::string s;
- s.clear();
- for (unsigned i = 0; i < nodes.size(); i++) {
- s += nodes[i];
- if (i < (nodes.size()-1))
- s += ",";
- }
- return s;
+print_members(const std::vector<std::string> &nodes) {
+ std::string s;
+ s.clear();
+ for (unsigned i = 0; i < nodes.size(); i++) {
+ s += nodes[i];
+ if (i < (nodes.size()-1))
+ s += ",";
+ }
+ return s;
}
-bool isamember(std::string m, const std::vector<std::string> &nodes)
-{
- for (unsigned i = 0; i < nodes.size(); i++) {
- if (nodes[i] == m) return 1;
- }
- return 0;
+
+bool isamember(const std::string & m, const std::vector<std::string> & nodes) {
+ for (auto n : nodes) {
+ if (n == m)
+ return 1;
+ }
+ return 0;
}
-bool
-proposer::isrunning()
-{
- bool r;
- lock ml(pxs_mutex);
- r = !stable;
- return r;
+bool proposer::isrunning() {
+ bool r;
+ lock ml(pxs_mutex);
+ r = !stable;
+ return r;
}
// check if the servers in l2 contains a majority of servers in l1
-bool
-proposer::majority(const std::vector<std::string> &l1,
- const std::vector<std::string> &l2)
-{
- unsigned n = 0;
+bool proposer::majority(const std::vector<std::string> &l1,
+ const std::vector<std::string> &l2) {
+ unsigned n = 0;
- for (unsigned i = 0; i < l1.size(); i++) {
- if (isamember(l1[i], l2))
- n++;
- }
- return n >= (l1.size() >> 1) + 1;
+ for (unsigned i = 0; i < l1.size(); i++) {
+ if (isamember(l1[i], l2))
+ n++;
+ }
+ return n >= (l1.size() >> 1) + 1;
}
proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
- std::string _me)
+ const std::string &_me)
: cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
stable (true)
{
- my_n.n = 0;
- my_n.m = me;
+ my_n.n = 0;
+ my_n.m = me;
}
-void
-proposer::setn()
+void proposer::setn()
{
- my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
+ my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
}
-bool
-proposer::run(int instance, std::vector<std::string> cur_nodes, std::string newv)
+bool proposer::run(unsigned instance, const std::vector<std::string> & cur_nodes,
+ const std::string & newv)
{
- std::vector<std::string> accepts;
- std::vector<std::string> nodes;
- std::string v;
- bool r = false;
-
- lock ml(pxs_mutex);
- tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
- print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
- if (!stable) { // already running proposer?
- tprintf("proposer::run: already running\n");
- return false;
- }
- stable = false;
- setn();
- accepts.clear();
- v.clear();
- if (prepare(instance, accepts, cur_nodes, v)) {
-
- if (majority(cur_nodes, accepts)) {
- tprintf("paxos::manager: received a majority of prepare responses\n");
-
- if (v.size() == 0)
- v = newv;
-
- breakpoint1();
-
- nodes = accepts;
- accepts.clear();
- accept(instance, accepts, nodes, v);
-
- if (majority(cur_nodes, accepts)) {
- tprintf("paxos::manager: received a majority of accept responses\n");
-
- breakpoint2();
-
- decide(instance, accepts, v);
- r = true;
- } else {
- tprintf("paxos::manager: no majority of accept responses\n");
- }
+ std::vector<std::string> accepts;
+ std::vector<std::string> nodes;
+ std::string v;
+ bool r = false;
+
+ lock ml(pxs_mutex);
+ tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
+ print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
+ if (!stable) { // already running proposer?
+ tprintf("proposer::run: already running\n");
+ return false;
+ }
+ stable = false;
+ setn();
+ accepts.clear();
+ v.clear();
+ if (prepare(instance, accepts, cur_nodes, v)) {
+
+ if (majority(cur_nodes, accepts)) {
+ tprintf("paxos::manager: received a majority of prepare responses\n");
+
+ if (v.size() == 0)
+ v = newv;
+
+ breakpoint1();
+
+ nodes = accepts;
+ accepts.clear();
+ accept(instance, accepts, nodes, v);
+
+ if (majority(cur_nodes, accepts)) {
+ tprintf("paxos::manager: received a majority of accept responses\n");
+
+ breakpoint2();
+
+ decide(instance, accepts, v);
+ r = true;
+ } else {
+ tprintf("paxos::manager: no majority of accept responses\n");
+ }
+ } else {
+ tprintf("paxos::manager: no majority of prepare responses\n");
+ }
} else {
- tprintf("paxos::manager: no majority of prepare responses\n");
+ tprintf("paxos::manager: prepare is rejected %d\n", stable);
}
- } else {
- tprintf("paxos::manager: prepare is rejected %d\n", stable);
- }
- stable = true;
- return r;
+ stable = true;
+ return r;
}
// proposer::run() calls prepare to send prepare RPCs to nodes
// otherwise fill in accepts with set of nodes that accepted,
// set v to the v_a with the highest n_a, and return true.
bool
-proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
- std::vector<std::string> nodes,
- std::string &v)
+proposer::prepare(unsigned instance, std::vector<std::string> & accepts,
+ const std::vector<std::string> & nodes,
+ std::string & v)
{
struct paxos_protocol::preparearg arg = { instance, my_n };
struct paxos_protocol::prepareres res;
prop_t n_a = { 0, "" };
rpcc *r;
- for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
- handle h(*i);
+ for (auto i : nodes) {
+ handle h(i);
if (!(r = h.safebind()))
continue;
int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
return false;
}
if (res.accept) {
- accepts.push_back(*i);
+ accepts.push_back(i);
if (res.n_a >= n_a) {
tprintf("found a newer accepted proposal\n");
v = res.v_a;
// run() calls this to send out accept RPCs to accepts.
// fill in accepts with list of nodes that accepted.
void
-proposer::accept(unsigned instance, std::vector<std::string> &accepts,
- std::vector<std::string> nodes, std::string v)
+proposer::accept(unsigned instance, std::vector<std::string> & accepts,
+ const std::vector<std::string> & nodes, const std::string & v)
{
struct paxos_protocol::acceptarg arg = { instance, my_n, v };
rpcc *r;
- for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
- handle h(*i);
+ for (auto i : nodes) {
+ handle h(i);
if (!(r = h.safebind()))
continue;
bool accept = false;
int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg);
- if (status == paxos_protocol::OK) {
- if (accept)
- accepts.push_back(*i);
- }
+ if (status == paxos_protocol::OK && accept)
+ accepts.push_back(i);
}
}
void
-proposer::decide(unsigned instance, std::vector<std::string> accepts,
- std::string v)
+proposer::decide(unsigned instance, const std::vector<std::string> & accepts,
+ const std::string & v)
{
struct paxos_protocol::decidearg arg = { instance, v };
rpcc *r;
- for (std::vector<std::string>::iterator i=accepts.begin(); i!=accepts.end(); i++) {
- handle h(*i);
+ for (auto i : accepts) {
+ handle h(i);
if (!(r = h.safebind()))
continue;
int res = 0;
}
}
-acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me,
- std::string _value)
+acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _me,
+ const std::string & _value)
: cfg(_cfg), me (_me), instance_h(0)
{
- n_h.n = 0;
- n_h.m = me;
- n_a.n = 0;
- n_a.m = me;
- v_a.clear();
-
- l = new log (this, me);
-
- if (instance_h == 0 && _first) {
- values[1] = _value;
- l->loginstance(1, _value);
- instance_h = 1;
- }
-
- pxs = new rpcs(atoi(_me.c_str()));
- pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
- pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
- pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
+ n_h.n = 0;
+ n_h.m = me;
+ n_a.n = 0;
+ n_a.m = me;
+ v_a.clear();
+
+ l = new log (this, me);
+
+ if (instance_h == 0 && _first) {
+ values[1] = _value;
+ l->loginstance(1, _value);
+ instance_h = 1;
+ }
+
+ pxs = new rpcs((uint32_t)std::stoi(_me));
+ pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
+ pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
+ pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
}
paxos_protocol::status
-acceptor::preparereq(paxos_protocol::prepareres &r, std::string src, paxos_protocol::preparearg a)
+acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &,
+ paxos_protocol::preparearg a)
{
lock ml(pxs_mutex);
r.oldinstance = false;
}
paxos_protocol::status
-acceptor::acceptreq(bool &r, std::string src, paxos_protocol::acceptarg a)
+acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a)
{
lock ml(pxs_mutex);
r = false;
return paxos_protocol::OK;
}
-// the src argument is only for debug purpose
- paxos_protocol::status
-acceptor::decidereq(int &r, std::string src, paxos_protocol::decidearg a)
+// the src argument is only for debugging
+paxos_protocol::status
+acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a)
{
lock ml(pxs_mutex);
tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
a.instance, instance_h, v_a.c_str());
if (a.instance == instance_h + 1) {
VERIFY(v_a == a.v);
- commit_wo(a.instance, v_a);
+ commit(a.instance, v_a, ml);
} else if (a.instance <= instance_h) {
// we are ahead ignore.
} else {
}
void
-acceptor::commit_wo(unsigned instance, std::string value)
+acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_lock)
{
- //assume pxs_mutex is held
- adopt_lock ml(pxs_mutex);
tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
if (instance > instance_h) {
tprintf("commit: highestaccepteinstance = %d\n", instance);
n_a.m = me;
v_a.clear();
if (cfg) {
- ml.unlock();
+ pxs_mutex_lock.unlock();
cfg->paxos_commit(instance, value);
- ml.lock();
+ pxs_mutex_lock.lock();
}
}
}
void
-acceptor::commit(unsigned instance, std::string value)
+acceptor::commit(unsigned instance, const std::string & value)
{
lock ml(pxs_mutex);
- commit_wo(instance, value);
+ commit(instance, value, ml);
}
std::string
acceptor::dump()
{
- return l->dump();
+ return l->dump();
}
void
-acceptor::restore(std::string s)
+acceptor::restore(const std::string & s)
{
- l->restore(s);
- l->logread();
+ l->restore(s);
+ l->logread();
}
void
proposer::breakpoint1()
{
- if (break1) {
- tprintf("Dying at breakpoint 1!\n");
- exit(1);
- }
+ if (break1) {
+ tprintf("Dying at breakpoint 1!\n");
+ exit(1);
+ }
}
// Call this from your code between phases accept and decide of proposer
void
proposer::breakpoint2()
{
- if (break2) {
- tprintf("Dying at breakpoint 2!\n");
- exit(1);
- }
+ if (break2) {
+ tprintf("Dying at breakpoint 2!\n");
+ exit(1);
+ }
}
void
proposer::breakpoint(int b)
{
- if (b == 3) {
- tprintf("Proposer: breakpoint 1\n");
- break1 = true;
- } else if (b == 4) {
- tprintf("Proposer: breakpoint 2\n");
- break2 = true;
- }
+ if (b == 3) {
+ tprintf("Proposer: breakpoint 1\n");
+ break1 = true;
+ } else if (b == 4) {
+ tprintf("Proposer: breakpoint 2\n");
+ break2 = true;
+ }
}
#include "rpc/rpc.h"
#include "paxos_protocol.h"
#include "log.h"
+#include "lock.h"
class paxos_change {
- public:
- virtual void paxos_commit(unsigned instance, const std::string &v) = 0;
- virtual ~paxos_change() {};
+ public:
+ virtual void paxos_commit(unsigned instance, const std::string & v) = 0;
+ virtual ~paxos_change() {}
};
class acceptor {
- private:
- log *l;
- rpcs *pxs;
- paxos_change *cfg;
- std::string me;
- std::mutex pxs_mutex;
-
- // Acceptor state
- prop_t n_h; // number of the highest proposal seen in a prepare
- prop_t n_a; // number of highest proposal accepted
- std::string v_a; // value of highest proposal accepted
- unsigned instance_h; // number of the highest instance we have decided
- std::map<unsigned,std::string> values; // vals of each instance
-
- void commit_wo(unsigned instance, std::string v);
- paxos_protocol::status preparereq(paxos_protocol::prepareres &r,
- std::string src, paxos_protocol::preparearg a);
- paxos_protocol::status acceptreq(bool &r, std::string src,
- paxos_protocol::acceptarg a);
- paxos_protocol::status decidereq(int &r, std::string src,
- paxos_protocol::decidearg a);
-
- friend class log;
-
- public:
- acceptor(class paxos_change *cfg, bool _first, std::string _me,
- std::string _value);
- ~acceptor() {};
- void commit(unsigned instance, std::string v);
- unsigned instance() { return instance_h; }
- std::string value(unsigned instance) { return values[instance]; }
- std::string dump();
- void restore(std::string);
- rpcs *get_rpcs() { return pxs; };
- prop_t get_n_h() { return n_h; };
- unsigned get_instance_h() { return instance_h; };
+ private:
+ log *l;
+ rpcs *pxs;
+ paxos_change *cfg;
+ std::string me;
+ mutex pxs_mutex;
+
+ // Acceptor state
+ prop_t n_h; // number of the highest proposal seen in a prepare
+ prop_t n_a; // number of highest proposal accepted
+ std::string v_a; // value of highest proposal accepted
+ unsigned instance_h; // number of the highest instance we have decided
+ std::map<unsigned,std::string> values; // vals of each instance
+
+ void commit(unsigned instance, const std::string & v, lock & pxs_mutex_lock);
+ paxos_protocol::status preparereq(paxos_protocol::prepareres & r,
+ const std::string & src, paxos_protocol::preparearg a);
+ paxos_protocol::status acceptreq(bool & r, const std::string & src,
+ paxos_protocol::acceptarg a);
+ paxos_protocol::status decidereq(int & r, const std::string & src,
+ paxos_protocol::decidearg a);
+
+ friend class log;
+
+ public:
+ acceptor(class paxos_change *cfg, bool _first, const std::string & _me,
+ const std::string & _value);
+ ~acceptor() {}
+ void commit(unsigned instance, const std::string & v);
+ unsigned instance() { return instance_h; }
+ const std::string & value(unsigned instance) { return values[instance]; }
+ std::string dump();
+ void restore(const std::string &);
+ rpcs *get_rpcs() { return pxs; }
+ prop_t get_n_h() { return n_h; }
+ unsigned get_instance_h() { return instance_h; }
};
-extern bool isamember(std::string m, const std::vector<std::string> &nodes);
-extern std::string print_members(const std::vector<std::string> &nodes);
+extern bool isamember(const std::string & m, const std::vector<std::string> & nodes);
+extern std::string print_members(const std::vector<std::string> & nodes);
class proposer {
- private:
- log *l;
- paxos_change *cfg;
- acceptor *acc;
- std::string me;
- bool break1;
- bool break2;
-
- std::mutex pxs_mutex;
-
- // Proposer state
- bool stable;
- prop_t my_n; // number of the last proposal used in this instance
-
- void setn();
- bool prepare(unsigned instance, std::vector<std::string> &accepts,
- std::vector<std::string> nodes,
- std::string &v);
- void accept(unsigned instance, std::vector<std::string> &accepts,
- std::vector<std::string> nodes, std::string v);
- void decide(unsigned instance, std::vector<std::string> accepts,
- std::string v);
-
- void breakpoint1();
- void breakpoint2();
- bool majority(const std::vector<std::string> &l1, const std::vector<std::string> &l2);
-
- friend class log;
- public:
- proposer(class paxos_change *cfg, class acceptor *_acceptor, std::string _me);
- ~proposer() {};
- bool run(int instance, std::vector<std::string> cnodes, std::string v);
- bool isrunning();
- void breakpoint(int b);
+ private:
+ log *l;
+ paxos_change *cfg;
+ acceptor *acc;
+ std::string me;
+ bool break1;
+ bool break2;
+
+ mutex pxs_mutex;
+
+ // Proposer state
+ bool stable;
+ prop_t my_n; // number of the last proposal used in this instance
+
+ void setn();
+ bool prepare(unsigned instance, std::vector<std::string> & accepts,
+ const std::vector<std::string> & nodes,
+ std::string & v);
+ void accept(unsigned instance, std::vector<std::string> & accepts,
+ const std::vector<std::string> & nodes, const std::string & v);
+ void decide(unsigned instance, const std::vector<std::string> & accepts,
+ const std::string & v);
+
+ void breakpoint1();
+ void breakpoint2();
+ bool majority(const std::vector<std::string> & l1, const std::vector<std::string> & l2);
+
+ friend class log;
+ public:
+ proposer(class paxos_change *cfg, class acceptor *_acceptor, const std::string &_me);
+ ~proposer() {}
+ bool run(unsigned instance, const std::vector<std::string> & cnodes, const std::string & v);
+ bool isrunning();
+ void breakpoint(int b);
};
template<class T>
class fifo {
public:
- fifo(int limit=0) : max_(limit) {};
+ fifo(size_t limit=0) : max_(limit) {}
bool enq(T, bool blocking=true);
void deq(T *);
bool size() {
lock ml(m_);
return q_.size();
- };
+ }
private:
std::list<T> q_;
mutex m_;
cond non_empty_c_; // q went non-empty
cond has_space_c_; // q is not longer overfull
- unsigned int max_; // maximum capacity of the queue, block enq threads if exceeds this limit
+ size_t max_; // maximum capacity of the queue, block enq threads if exceeds this limit
};
template<class T> bool
#include "lang/verify.h"
struct request_header {
- request_header(int x=0, int p=0, int c=0, int s=0, int xi=0) :
+ request_header(int x=0, int p=0, unsigned c=0, unsigned s=0, int xi=0) :
xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {}
int xid;
int proc;
unsigned int clt_nonce;
unsigned int srv_nonce;
int xid_rep;
- request_header hton() const {
- return {
- htonl(xid), htonl(proc), htonl(clt_nonce), htonl(srv_nonce), htonl(xid_rep)
- };
- }
};
struct reply_header {
reply_header(int x=0, int r=0): xid(x), ret(r) {}
int xid;
int ret;
- reply_header hton() const {
- return {
- htonl(xid), htonl(ret)
- };
- }
};
+template<class T> inline T hton(T t);
+
+constexpr union { uint32_t i; uint8_t is_little_endian; } endianness{1};
+
+template<> inline uint8_t hton(uint8_t t) { return t; }
+template<> inline int8_t hton(int8_t t) { return t; }
+template<> inline uint16_t hton(uint16_t t) { return htons(t); }
+template<> inline int16_t hton(int16_t t) { return (int16_t)htons((uint16_t)t); }
+template<> inline uint32_t hton(uint32_t t) { return htonl(t); }
+template<> inline int32_t hton(int32_t t) { return (int32_t)htonl((uint32_t)t); }
+template<> inline uint64_t hton(uint64_t t) {
+ if (!endianness.is_little_endian)
+ return t;
+ return (uint64_t)htonl((uint32_t)(t >> 32)) | ((uint64_t)htonl((uint32_t)t) << 32);
+}
+template<> inline int64_t hton(int64_t t) { return (int64_t)hton((uint64_t)t); }
+template<> inline request_header hton(request_header h) { return {hton(h.xid), hton(h.proc), hton(h.clt_nonce), hton(h.srv_nonce), hton(h.xid_rep)}; }
+template<> inline reply_header hton(reply_header h) { return {hton(h.xid), hton(h.ret)}; }
+
+template <class T> inline T ntoh(T t) { return hton(t); }
+
typedef int rpc_sz_t;
//size of initial buffer allocation
free(buf_);
}
- int size() { return index_;}
+ size_t size() { return index_;}
char *cstr() { return buf_;}
const char *cstr() const { return buf_;}
- void rawbyte(unsigned char x) {
+ void rawbyte(uint8_t x) {
reserve(1);
- buf_[index_++] = x;
+ buf_[index_++] = (int8_t)x;
}
- void rawbytes(const char *p, int n) {
+ void rawbytes(const char *p, size_t n) {
reserve(n);
memcpy(buf_+index_, p, n);
index_ += n;
void pack_req_header(const request_header &h);
void pack_reply_header(const reply_header &h);
- void take_buf(char **b, int *s) {
+ void take_buf(char **b, size_t *s) {
*b = buf_;
*s = index_;
buf_ = NULL;
};
marshall& operator<<(marshall &, bool);
-marshall& operator<<(marshall &, unsigned int);
-marshall& operator<<(marshall &, int);
-marshall& operator<<(marshall &, unsigned char);
-marshall& operator<<(marshall &, char);
-marshall& operator<<(marshall &, unsigned short);
-marshall& operator<<(marshall &, short);
-marshall& operator<<(marshall &, unsigned long long);
+marshall& operator<<(marshall &, uint32_t);
+marshall& operator<<(marshall &, int32_t);
+marshall& operator<<(marshall &, uint8_t);
+marshall& operator<<(marshall &, int8_t);
+marshall& operator<<(marshall &, uint16_t);
+marshall& operator<<(marshall &, int16_t);
+marshall& operator<<(marshall &, uint64_t);
marshall& operator<<(marshall &, const std::string &);
template <class A> marshall &
return m;
}
+class unmarshall;
+
+unmarshall& operator>>(unmarshall &, bool &);
+unmarshall& operator>>(unmarshall &, uint8_t &);
+unmarshall& operator>>(unmarshall &, int8_t &);
+unmarshall& operator>>(unmarshall &, uint16_t &);
+unmarshall& operator>>(unmarshall &, int16_t &);
+unmarshall& operator>>(unmarshall &, uint32_t &);
+unmarshall& operator>>(unmarshall &, int32_t &);
+unmarshall& operator>>(unmarshall &, size_t &);
+unmarshall& operator>>(unmarshall &, uint64_t &);
+unmarshall& operator>>(unmarshall &, int64_t &);
+unmarshall& operator>>(unmarshall &, std::string &);
+
class unmarshall {
private:
char *buf_;
- int sz_;
- int index_;
+ size_t sz_;
+ size_t index_;
bool ok_;
inline bool ensure(size_t n);
public:
unmarshall(): buf_(NULL),sz_(0),index_(0),ok_(false) {}
- unmarshall(char *b, int sz): buf_(b),sz_(sz),index_(),ok_(true) {}
+ unmarshall(char *b, size_t sz): buf_(b),sz_(sz),index_(),ok_(true) {}
unmarshall(const std::string &s) : buf_(NULL),sz_(0),index_(0),ok_(false)
{
//take the content which does not exclude a RPC header from a string
char *cstr() { return buf_;}
bool okdone() const { return ok_ && index_ == sz_; }
- unsigned int rawbyte();
+ uint8_t rawbyte();
void rawbytes(std::string &s, size_t n);
+ template <class T> void rawbytes(T &t);
- int ind() { return index_;}
- int size() { return sz_;}
- void unpack(int *); //non-const ref
- void take_buf(char **b, int *sz) {
+ size_t ind() { return index_;}
+ size_t size() { return sz_;}
+ void take_buf(char **b, size_t *sz) {
*b = buf_;
*sz = sz_;
sz_ = index_ = 0;
void unpack_req_header(request_header *h) {
//the first 4-byte is for channel to fill size of pdu
index_ = sizeof(rpc_sz_t);
- unpack(&h->xid);
- unpack(&h->proc);
- unpack((int *)&h->clt_nonce);
- unpack((int *)&h->srv_nonce);
- unpack(&h->xid_rep);
+ *this >> h->xid >> h->proc >> h->clt_nonce >> h->srv_nonce >> h->xid_rep;
index_ = RPC_HEADER_SZ;
}
void unpack_reply_header(reply_header *h) {
//the first 4-byte is for channel to fill size of pdu
index_ = sizeof(rpc_sz_t);
- unpack(&h->xid);
- unpack(&h->ret);
+ *this >> h->xid >> h->ret;
index_ = RPC_HEADER_SZ;
}
PollMgr *PollMgr::instance = NULL;
static std::once_flag pollmgr_is_initialized;
-void
+static void
PollMgrInit()
{
PollMgr::instance = new PollMgr();
th_ = std::thread(&PollMgr::wait_loop, this);
}
-PollMgr::~PollMgr()
+PollMgr::~PollMgr() [[noreturn]]
{
//never kill me!!!
VERIFY(0);
}
void
-PollMgr::wait_loop()
+PollMgr::wait_loop() [[noreturn]]
{
std::vector<int> readable;
#include "lock.h"
#include "jsl_log.h"
+#include "tprintf.h"
#include "lang/verify.h"
const rpcc::TO rpcc::to_max = { 120000 };
const rpcc::TO rpcc::to_min = { 1000 };
-rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
+rpcc::caller::caller(int xxid, unmarshall *xun)
: xid(xxid), un(xun), done(false)
{
}
void set_rand_seed()
{
auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
- srandom((int)now.time_since_epoch().count()^((int)getpid()));
+ srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
}
rpcc::rpcc(sockaddr_in d, bool retrans) :
{
if(retrans){
set_rand_seed();
- clt_nonce_ = random();
+ clt_nonce_ = (unsigned int)random();
} else {
// special client nonce 0 means this client does not
// require at-most-once logic from the server
int
rpcc::bind(TO to)
{
- int r;
+ unsigned int r;
int ret = call_timeout(rpc_const::bind, to, r, 0);
if(ret == 0){
lock ml(m_);
rpcc::cancel(void)
{
lock ml(m_);
- printf("rpcc::cancel: force callers to fail\n");
- std::map<int,caller*>::iterator iter;
- for(iter = calls_.begin(); iter != calls_.end(); iter++){
- caller *ca = iter->second;
+ tprintf("rpcc::cancel: force callers to fail");
+ for(auto &p : calls_){
+ caller *ca = p.second;
jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
{
destroy_wait_ = true;
destroy_wait_c_.wait(ml);
}
- printf("rpcc::cancel: done\n");
+ tprintf("rpcc::cancel: done");
}
int
{
caller ca(0, &rep);
- int xid_rep;
+ int xid_rep;
{
lock ml(m_);
ca.xid = xid_++;
calls_[ca.xid] = &ca;
- req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
+ req.pack_req_header({ca.xid, (int)proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
xid_rep = xid_rep_window_.front();
}
}
else jsl_log(JSL_DBG_1, "not reachable\n");
jsl_log(JSL_DBG_2,
- "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n",
+ "rpcc::call1 %u just sent req proc %x xid %d clt_nonce %d\n",
clt_nonce_, proc, ca.xid, clt_nonce_);
}
transmit = false; // only send once on a given channel
lock cal(ca.m);
jsl_log(JSL_DBG_2,
- "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n",
+ "rpcc::call1 %u call done for req proc %x xid %d %s:%d done? %d ret %d \n",
clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
ntohs(dst_.sin_port), ca.done, ca.intret);
//
// this function keeps no reference for connection *c
bool
-rpcc::got_pdu(connection *c, char *b, size_t sz)
+rpcc::got_pdu(connection *, char *b, size_t sz)
{
unmarshall rep(b, sz);
reply_header h;
// assumes thread holds mutex m
void
-rpcc::update_xid_rep(unsigned int xid)
+rpcc::update_xid_rep(int xid)
{
- std::list<unsigned int>::iterator it;
-
if(xid <= xid_rep_window_.front()){
return;
}
- for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
+ for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
if(*it > xid){
xid_rep_window_.insert(it, xid);
goto compress;
xid_rep_window_.push_back(xid);
compress:
- it = xid_rep_window_.begin();
+ auto it = xid_rep_window_.begin();
for (it++; it != xid_rep_window_.end(); it++){
while (xid_rep_window_.front() + 1 == *it)
xid_rep_window_.pop_front();
}
}
-rpcs::rpcs(unsigned int p1, int count)
+rpcs::rpcs(unsigned int p1, size_t count)
: port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
{
set_rand_seed();
- nonce_ = random();
+ nonce_ = (unsigned int)random();
jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
char *loss_env = getenv("RPC_LOSSY");
counts_[proc]++;
curr_counts_--;
if(curr_counts_ == 0){
- std::map<int, int>::iterator i;
- printf("RPC STATS: ");
- for (i = counts_.begin(); i != counts_.end(); i++){
- printf("%x:%d ", i->first, i->second);
- }
- printf("\n");
+ tprintf("RPC STATS: ");
+ for (auto i = counts_.begin(); i != counts_.end(); i++)
+ tprintf("%x:%lu ", i->first, i->second);
lock rwl(reply_window_m_);
std::map<unsigned int,std::list<reply_t> >::iterator clt;
- unsigned int totalrep = 0, maxrep = 0;
+ size_t totalrep = 0, maxrep = 0;
for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
totalrep += clt->second.size();
if(clt->second.size() > maxrep)
maxrep = clt->second.size();
}
- jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n",
+ jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %lu max per client %lu\n",
(int) reply_window_.size()-1, totalrep, maxrep);
curr_counts_ = counting_;
}
request_header h;
req.unpack_req_header(&h);
- int proc = h.proc;
+ unsigned int proc = (unsigned int)h.proc;
if(!req.ok()){
jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
}
jsl_log(JSL_DBG_2,
- "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
+ "rpcs::dispatch: rpc %d (proc %x, last_rep %d) from clt %u for srv instance %u \n",
h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
marshall rep;
}
rpcs::rpcstate_t stat;
- char *b1;
- int sz1;
+ char *b1 = nullptr;
+ size_t sz1 = 0;
if(h.clt_nonce){
// have i seen this client before?
rep.take_buf(&b1,&sz1);
jsl_log(JSL_DBG_2,
- "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
+ "rpcs::dispatch: sending and saving reply of size %lu for rpc %d, proc %x ret %d, clt %u\n",
sz1, h.xid, proc, rh.ret, h.clt_nonce);
if(h.clt_nonce > 0){
c->send(b1, sz1);
break;
case FORGOTTEN: // very old request and we don't have the response anymore
- jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
+ jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %d from %u\n",
h.xid, h.clt_nonce);
rh.ret = rpc_const::atmostonce_failure;
rep.pack_reply_header(rh);
// DONE: seen this xid, previous reply returned in *b and *sz.
// FORGOTTEN: might have seen this xid, but deleted previous reply.
rpcs::rpcstate_t
-rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
- unsigned int xid_rep, char **b, int *sz)
+rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
+ int xid_rep, char **b, size_t *sz)
{
lock rwl(reply_window_m_);
VERIFY(l.size() > 0);
VERIFY(xid >= xid_rep);
- unsigned int past_xid_rep = l.begin()->xid;
+ int past_xid_rep = l.begin()->xid;
std::list<reply_t>::iterator start = l.begin(), it;
it = ++start;
- if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) {
+ if (past_xid_rep < xid_rep || past_xid_rep == -1) {
// scan for deletion candidates
for (; it != l.end() && it->xid < xid_rep; it++) {
if (it->cb_present)
l.begin()->xid = xid_rep;
}
- if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1)
+ if (xid < past_xid_rep && past_xid_rep != -1)
return FORGOTTEN;
// skip non-deletion candidates
// free_reply_window() and checkduplicate_and_update is responsible for
// calling free(b).
void
-rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
- char *b, int sz)
+rpcs::add_reply(unsigned int clt_nonce, int xid,
+ char *b, size_t sz)
{
lock rwl(reply_window_m_);
// remember the RPC reply value
void
rpcs::free_reply_window(void)
{
- std::map<unsigned int,std::list<reply_t> >::iterator clt;
- std::list<reply_t>::iterator it;
-
lock rwl(reply_window_m_);
- for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
- for (it = clt->second.begin(); it != clt->second.end(); it++){
+ for (auto clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
+ for (auto it = clt->second.begin(); it != clt->second.end(); it++){
if (it->cb_present)
free(it->buf);
}
// rpc handler
int
-rpcs::rpcbind(int &r, int a)
+rpcs::rpcbind(unsigned int &r, int)
{
jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
r = nonce_;
marshall &
operator<<(marshall &m, uint16_t x) {
- x = htons(x);
+ x = hton(x);
m.rawbytes((char *)&x, 2);
return m;
}
marshall &
operator<<(marshall &m, uint32_t x) {
- x = htonl(x);
+ x = hton(x);
m.rawbytes((char *)&x, 4);
return m;
}
-marshall & operator<<(marshall &m, int x) { return m << (unsigned int) x; }
-marshall & operator<<(marshall &m, char x) { return m << (uint8_t)x; }
+marshall & operator<<(marshall &m, int32_t x) { return m << (uint32_t) x; }
+marshall & operator<<(marshall &m, int8_t x) { return m << (uint8_t)x; }
marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; }
-marshall & operator<<(marshall &m, short x) { return m << (unsigned short) x; }
+marshall & operator<<(marshall &m, int16_t x) { return m << (uint16_t)x; }
marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }
marshall &
}
void marshall::pack_req_header(const request_header &h) {
- int saved_sz = index_;
+ size_t saved_sz = index_;
//leave the first 4-byte empty for channel to fill size of pdu
index_ = sizeof(rpc_sz_t);
*this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep;
}
void marshall::pack_reply_header(const reply_header &h) {
- int saved_sz = index_;
+ size_t saved_sz = index_;
//leave the first 4-byte empty for channel to fill size of pdu
index_ = sizeof(rpc_sz_t);
*this << h.xid << h.ret;
index_ = saved_sz;
}
-void
-unmarshall::unpack(int *x)
-{
- (*x) = (rawbyte() & 0xff) << 24;
- (*x) |= (rawbyte() & 0xff) << 16;
- (*x) |= (rawbyte() & 0xff) << 8;
- (*x) |= rawbyte() & 0xff;
-}
-
// take the contents from another unmarshall object
void
unmarshall::take_in(unmarshall &another)
return ok_;
}
-unsigned int
+inline uint8_t
unmarshall::rawbyte()
{
if (!ensure(1))
return 0;
- return buf_[index_++];
+ return (uint8_t)buf_[index_++];
}
void
index_ += n;
}
-unmarshall &
-operator>>(unmarshall &u, bool &x)
-{
- x = (bool)u.rawbyte();
- return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, unsigned char &x)
-{
- x = (unsigned char)u.rawbyte();
- return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, char &x)
-{
- x = (char)u.rawbyte();
- return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, unsigned short &x)
-{
- x = (u.rawbyte() & 0xff) << 8;
- x |= u.rawbyte() & 0xff;
- return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, short &x)
-{
- x = (u.rawbyte() & 0xff) << 8;
- x |= u.rawbyte() & 0xff;
- return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, unsigned int &x)
-{
- x = (u.rawbyte() & 0xff) << 24;
- x |= (u.rawbyte() & 0xff) << 16;
- x |= (u.rawbyte() & 0xff) << 8;
- x |= u.rawbyte() & 0xff;
- return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, int &x)
-{
- x = (u.rawbyte() & 0xff) << 24;
- x |= (u.rawbyte() & 0xff) << 16;
- x |= (u.rawbyte() & 0xff) << 8;
- x |= u.rawbyte() & 0xff;
- return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, unsigned long long &x)
+template <class T>
+void
+unmarshall::rawbytes(T &t)
{
- unsigned int h, l;
- u >> h;
- u >> l;
- x = l | ((unsigned long long) h << 32);
- return u;
+ const size_t n = sizeof(T);
+ VERIFY(ensure(n));
+ memcpy(&t, buf_+index_, n);
+ t = ntoh(t);
+ index_ += n;
}
-unmarshall &
-operator>>(unmarshall &u, std::string &s)
-{
- unsigned sz;
- u >> sz;
+unmarshall & operator>>(unmarshall &u, bool &x) { x = (bool)u.rawbyte(); return u; }
+unmarshall & operator>>(unmarshall &u, uint8_t &x) { x = u.rawbyte(); return u; }
+unmarshall & operator>>(unmarshall &u, int8_t &x) { x = (int8_t)u.rawbyte(); return u; }
+unmarshall & operator>>(unmarshall &u, uint16_t &x) { u.rawbytes<uint16_t>(x); return u; }
+unmarshall & operator>>(unmarshall &u, int16_t &x) { u.rawbytes<int16_t>(x); return u; }
+unmarshall & operator>>(unmarshall &u, uint32_t &x) { u.rawbytes<uint32_t>(x); return u; }
+unmarshall & operator>>(unmarshall &u, int32_t &x) { u.rawbytes<int32_t>(x); return u; }
+unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes<uint32_t>(xx); x = xx; return u; }
+unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes<uint64_t>(x); return u; }
+unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes<int64_t>(x); return u; }
+unmarshall & operator>>(unmarshall &u, std::string &s) {
+ unsigned sz = u.grab<unsigned>();
if(u.ok())
u.rawbytes(s, sz);
return u;
/*---------------auxilary function--------------*/
void
-make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
-
- char host[200];
- const char *localhost = "127.0.0.1";
- const char *port = index(hostandport, ':');
- if(port == NULL){
- memcpy(host, localhost, strlen(localhost)+1);
- port = hostandport;
- } else {
- memcpy(host, hostandport, port-hostandport);
- host[port-hostandport] = '\0';
- port++;
- }
-
- make_sockaddr(host, port, dst);
-
+make_sockaddr(const std::string &hostandport, struct sockaddr_in *dst) {
+ auto colon = hostandport.find(':');
+ if (colon == std::string::npos)
+ make_sockaddr("127.0.0.1", hostandport, dst);
+ else
+ make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1), dst);
}
void
-make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
-
- in_addr_t a;
-
+make_sockaddr(const std::string &host, const std::string &port, struct sockaddr_in *dst) {
bzero(dst, sizeof(*dst));
dst->sin_family = AF_INET;
- a = inet_addr(host);
- if(a != INADDR_NONE){
- dst->sin_addr.s_addr = a;
- } else {
- struct hostent *hp = gethostbyname(host);
- if(hp == 0 || hp->h_length != 4){
- fprintf(stderr, "cannot find host name %s\n", host);
+ struct in_addr a{inet_addr(host.c_str())};
+
+ if(a.s_addr != INADDR_NONE)
+ dst->sin_addr.s_addr = a.s_addr;
+ else {
+ struct hostent *hp = gethostbyname(host.c_str());
+
+ if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
+ fprintf(stderr, "cannot find host name %s\n", host.c_str());
exit(1);
}
- dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
+ memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
+ dst->sin_addr.s_addr = a.s_addr;
}
- dst->sin_port = htons(atoi(port));
+ dst->sin_port = hton((uint16_t)std::stoi(port));
}
//manages per rpc info
struct caller {
- caller(unsigned int xxid, unmarshall *un);
+ caller(int xxid, unmarshall *un);
~caller();
- unsigned int xid;
+ int xid;
unmarshall *un;
int intret;
bool done;
};
void get_refconn(connection **ch);
- void update_xid_rep(unsigned int xid);
+ void update_xid_rep(int xid);
sockaddr_in dst_;
unsigned int clt_nonce_;
unsigned int srv_nonce_;
bool bind_done_;
- unsigned int xid_;
+ int xid_;
int lossytest_;
bool retrans_;
bool reachable_;
std::condition_variable destroy_wait_c_;
std::map<int, caller *> calls_;
- std::list<unsigned int> xid_rep_window_;
+ std::list<int> xid_rep_window_;
struct request {
request() { clear(); }
// has been sent; in that case buf points to a copy of the reply,
// and sz holds the size of the reply.
struct reply_t {
- reply_t (unsigned int _xid) {
+ reply_t (int _xid) {
xid = _xid;
cb_present = false;
buf = NULL;
sz = 0;
}
- reply_t (unsigned int _xid, char *_buf, int _sz) {
+ reply_t (int _xid, char *_buf, size_t _sz) {
xid = _xid;
cb_present = true;
buf = _buf;
sz = _sz;
}
- unsigned int xid;
+ int xid;
bool cb_present; // whether the reply buffer is valid
char *buf; // the reply buffer
- int sz; // the size of reply buffer
+ size_t sz; // the size of reply buffer
};
- int port_;
+ unsigned int port_;
unsigned int nonce_;
// provide at most once semantics by maintaining a window of replies
std::map<unsigned int, std::list<reply_t> > reply_window_;
void free_reply_window(void);
- void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz);
+ void add_reply(unsigned int clt_nonce, int xid, char *b, size_t sz);
rpcstate_t checkduplicate_and_update(unsigned int clt_nonce,
- unsigned int xid, unsigned int rep_xid,
- char **b, int *sz);
+ int xid, int rep_xid,
+ char **b, size_t *sz);
void updatestat(unsigned int proc);
std::map<unsigned int, connection *> conns_;
// counting
- const int counting_;
- int curr_counts_;
- std::map<int, int> counts_;
+ const size_t counting_;
+ size_t curr_counts_;
+ std::map<unsigned int, size_t> counts_;
int lossytest_;
bool reachable_;
// map proc # to function
- std::map<int, handler *> procs_;
+ std::map<unsigned int, handler *> procs_;
std::mutex procs_m_; // protect insert/delete to procs[]
std::mutex count_m_; //protect modification of counts
protected:
struct djob_t {
- djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {}
+ djob_t (connection *c, char *b, size_t bsz):buf(b),sz(bsz),conn(c) {}
char *buf;
- int sz;
+ size_t sz;
connection *conn;
};
void dispatch(djob_t *);
tcpsconn* listener_;
public:
- rpcs(unsigned int port, int counts=0);
+ rpcs(unsigned int port, size_t counts=0);
~rpcs();
- inline int port() { return listener_->port(); }
+ inline unsigned int port() { return listener_->port(); }
//RPC handler for clients binding
- int rpcbind(int &r, int a);
+ int rpcbind(unsigned int &r, int a);
void set_reachable(bool r) { reachable_ = r; }
reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
}
-void make_sockaddr(const char *hostandport, struct sockaddr_in *dst);
-void make_sockaddr(const char *host, const char *port,
- struct sockaddr_in *dst);
+void make_sockaddr(const std::string &hostandport, struct sockaddr_in *dst);
+void make_sockaddr(const std::string &host, const std::string &port, struct
+ sockaddr_in *dst);
#endif
#define NUM_CL 2
+char tprintf_thread_prefix = 'r';
+
rpcs *server; // server rpc object
rpcc *clients[NUM_CL]; // client rpc object
struct sockaddr_in dst; //server's ip address
int handle_22(std::string & r, const std::string a, const std::string b);
int handle_fast(int &r, const int a);
int handle_slow(int &r, const int a);
- int handle_bigrep(std::string &r, const int a);
+ int handle_bigrep(std::string &r, const size_t a);
};
// a handler. a and b are arguments, r is the result.
}
int
-srv::handle_bigrep(std::string &r, const int len)
+srv::handle_bigrep(std::string &r, const size_t len)
{
- r = std::string(len, 'x');
+ r = std::string((size_t)len, 'x');
return 0;
}
void startserver()
{
- server = new rpcs(port);
+ server = new rpcs((unsigned int)port);
server->reg(22, &srv::handle_22, &service);
server->reg(23, &srv::handle_fast, &service);
server->reg(24, &srv::handle_slow, &service);
m << s;
char *b;
- int sz;
+ size_t sz;
m.take_buf(&b,&sz);
- VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)));
+ VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
unmarshall un(b,sz);
request_header rh1;
}
void
-client1(int cl)
+client1(size_t cl)
{
// test concurrency.
- int which_cl = ((unsigned long) cl ) % NUM_CL;
+ size_t which_cl = cl % NUM_CL;
for(int i = 0; i < 100; i++){
int arg = (random() % 2000);
int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
auto end = std::chrono::steady_clock::now();
- int diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
+ auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
if (ret != 0)
- printf("%d ms have elapsed!!!\n", diff);
+ printf("%d ms have elapsed!!!\n", (int)diff);
VERIFY(ret == 0);
VERIFY(rep == (which ? arg+1 : arg+2));
}
}
void
-client2(int cl)
+client2(size_t cl)
{
- int which_cl = ((unsigned long) cl ) % NUM_CL;
+ size_t which_cl = cl % NUM_CL;
time_t t1;
time(&t1);
// specify a timeout value to an RPC that should succeed (tcp)
{
std::string arg(1000, 'x');
- std::string rep;
- c->call_timeout(22, rpcc::to(3000), rep, arg, (std::string)"x");
- VERIFY(rep.size() == 1001);
+ std::string rep2;
+ c->call_timeout(22, rpcc::to(3000), rep2, arg, (std::string)"x");
+ VERIFY(rep2.size() == 1001);
printf(" -- no spurious timeout .. ok\n");
}
}
void
-concurrent_test(int nt)
+concurrent_test(size_t nt)
{
// create threads that make lots of calls in parallel,
// to test thread synchronization for concurrent calls
// and dispatches.
- printf("start concurrent_test (%d threads) ...", nt);
+ printf("start concurrent_test (%lu threads) ...", nt);
std::vector<std::thread> th(nt);
- for(int i = 0; i < nt; i++){
+
+ for(size_t i = 0; i < nt; i++)
th[i] = std::thread(client1, i);
- }
- for(int i = 0; i < nt; i++){
+ for(size_t i = 0; i < nt; i++)
th[i].join();
- }
+
printf(" OK\n");
}
VERIFY(clients[i]->bind()==0);
}
- int nt = 1;
+ size_t nt = 1;
+
std::vector<std::thread> th(nt);
- for(int i = 0; i < nt; i++){
+
+ for(size_t i = 0; i < nt; i++)
th[i] = std::thread(client2, i);
- }
- for(int i = 0; i < nt; i++){
+
+ for(size_t i = 0; i < nt; i++)
th[i].join();
- }
+
printf(".. OK\n");
VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
}
printf(" -- delete existing rpc client, create replacement rpc client .. ok\n");
- int nt = 10;
- printf(" -- concurrent test on new rpc client w/ %d threads ..", nt);
+ size_t nt = 10;
+ printf(" -- concurrent test on new rpc client w/ %lu threads ..", nt);
std::vector<std::thread> th(nt);
- for(int i = 0; i < nt; i++){
+
+ for(size_t i = 0; i < nt; i++)
th[i] = std::thread(client3, client);
- }
- for(int i = 0; i < nt; i++){
+ for(size_t i = 0; i < nt; i++)
th[i].join();
- }
+
printf("ok\n");
delete server;
VERIFY (client->bind() >= 0);
printf(" -- delete existing rpc client and server, create replacements.. ok\n");
- printf(" -- concurrent test on new client and server w/ %d threads ..", nt);
- for(int i = 0; i < nt; i++){
+ printf(" -- concurrent test on new client and server w/ %lu threads ..", nt);
+
+ for(size_t i = 0; i < nt; i++)
th[i] = std::thread(client3, client);
- }
- for(int i = 0; i < nt; i++){
+ for(size_t i = 0; i < nt; i++)
th[i].join();
- }
+
printf("ok\n");
printf("failure_test OK\n");
bool isclient = false;
bool isserver = false;
- srandom(getpid());
+ srandom((uint32_t)getpid());
port = 20000 + (getpid() % 10000);
- char ch = 0;
+ int ch = 0;
while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
switch (ch) {
case 'c':
break;
case 'l':
VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+ break;
default:
break;
}
// if blocking, then addJob() blocks when queue is full
// otherwise, addJob() simply returns false when queue is full
-ThrPool::ThrPool(int sz, bool blocking)
+ThrPool::ThrPool(size_t sz, bool blocking)
: nthreads_(sz),blockadd_(blocking),jobq_(100*sz)
{
- for (int i=0; i<nthreads_; i++)
+ for (size_t i=0; i<nthreads_; i++)
th_.emplace_back(&ThrPool::do_worker, this);
}
// will ever use this thread pool again or is currently blocking on it
ThrPool::~ThrPool()
{
- for (int i=0; i<nthreads_; i++)
+ for (size_t i=0; i<nthreads_; i++)
jobq_.enq(job_t());
- for (int i=0; i<nthreads_; i++)
+ for (size_t i=0; i<nthreads_; i++)
th_[i].join();
}
class ThrPool {
public:
- ThrPool(int sz, bool blocking=true);
+ ThrPool(size_t sz, bool blocking=true);
~ThrPool();
bool addJob(const job_t &j);
private:
- int nthreads_;
+ size_t nthreads_;
bool blockadd_;
fifo<job_t> jobq_;
}
}
-template <class A>
-std::ostream & operator<<(std::ostream &o, const std::vector<A> &d) {
- o << "[";
- for (typename std::vector<A>::const_iterator i=d.begin(); i!=d.end(); i++) {
- o << *i;
- if (i+1 != d.end())
- o << ", ";
- }
- o << "]";
- return o;
-}
-
bool rsm::sync_with_backups() {
adopt_lock ml(rsm_mutex);
ml.unlock();
insync = true;
cfg->get_view(vid_insync, backups);
backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
- LOG("rsm::sync_with_backups " << backups);
+ LOG("rsm::sync_with_backups " << make_iterator_pair(backups.begin(), backups.end()));
sync_cond.wait(ml);
insync = false;
return true;
public:
virtual std::string marshal_state() = 0;
virtual void unmarshal_state(std::string) = 0;
- virtual ~rsm_state_transfer() {};
+ virtual ~rsm_state_transfer() {}
};
#endif
extern int next_instance_num;
extern char tprintf_thread_prefix;
+template <class A>
+struct iterator_pair : public std::pair<A, A> {
+ explicit iterator_pair(const A & first, const A & second) : std::pair<A, A>(first, second) {}
+};
+
+template <class A>
+const struct iterator_pair<A> make_iterator_pair(const A & first, const A & second) {
+ return iterator_pair<A>(first, second);
+}
+
+template <class A, class B>
+std::ostream & operator<<(std::ostream &o, const std::pair<A,B> &d) {
+ o << "<" << d.first << "," << d.second << ">";
+ return o;
+}
+
+template <class A>
+std::ostream & operator<<(std::ostream &o, const iterator_pair<A> &d) {
+ o << "[";
+ for (auto i=d.first; i!=d.second; i++) {
+ o << *i;
+ auto j(i);
+ if (++j != d.second)
+ o << ", ";
+ }
+ o << "]";
+ return o;
+}
+
#define LOG_PREFIX { \
cerr_mutex.lock(); \
- auto self = std::this_thread::get_id(); \
- int tid = thread_name_map[self]; \
- if (tid==0) \
- tid = thread_name_map[self] = ++next_thread_num; \
- auto utime = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count() % 1000000000; \
- std::cerr << std::left << std::setw(9) << utime << " "; \
- std::cerr << tprintf_thread_prefix << std::left << std::setw(2) << tid; \
- std::cerr << " " << std::setw(24) << __FILE__ << " " << std::setw(18) << __func__; \
+ auto _thread_ = std::this_thread::get_id(); \
+ int _tid_ = thread_name_map[_thread_]; \
+ if (_tid_==0) \
+ _tid_ = thread_name_map[_thread_] = ++next_thread_num; \
+ auto _utime_ = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count() % 1000000000; \
+ std::cerr << std::setfill('0') << std::dec << std::left << std::setw(9) << _utime_ << " "; \
+ std::cerr << tprintf_thread_prefix << std::left << std::setw(2) << _tid_; \
+ std::cerr << " " << std::setw(20) << __FILE__ << " " << std::setw(18) << __func__; \
}
#define LOG_THIS_POINTER { \
- int self = instance_name_map[this]; \
- if (self==0) \
- self = instance_name_map[this] = ++next_instance_num; \
- std::cerr << "#" << std::setw(2) << self; \
+ int _self_ = instance_name_map[this]; \
+ if (_self_==0) \
+ _self_ = instance_name_map[this] = ++next_instance_num; \
+ std::cerr << "#" << std::setw(2) << _self_; \
}
#define LOG_SUFFIX { \
cerr_mutex.unlock(); \
}
-#define LOG_NONMEMBER(x) { \
+#define LOG_NONMEMBER(_x_) { \
LOG_PREFIX; \
- std::cerr << x << std::endl; \
+ std::cerr << _x_ << std::endl; \
LOG_SUFFIX; \
}
-#define LOG(x) { \
+#define LOG(_x_) { \
LOG_PREFIX; \
LOG_THIS_POINTER; \
- std::cerr << x << std::endl; \
+ std::cerr << _x_ << std::endl; \
LOG_SUFFIX; \
}
-#define JOIN(from,to,sep) ({ \
- ostringstream oss; \
- for(auto i=from;i!=to;i++) \
- oss << *i << sep; \
- oss.str(); \
-})
#define LOG_FUNC_ENTER { \
LOG_PREFIX; \
LOG_THIS_POINTER; \
LOG_SUFFIX; \
}
-#define tprintf(args...) { \
- int len = snprintf(NULL, 0, args); \
- char buf[len+1]; \
- buf[len] = '\0'; \
- snprintf(buf, len+1, args); \
+#define tprintf(...) { \
+ char *buf = nullptr; \
+ int len = asprintf(&buf, __VA_ARGS__); \
if (buf[len-1]=='\n') \
buf[len-1] = '\0'; \
LOG_NONMEMBER(buf); \
+ free(buf); \
}
#endif