From: Peter Iannucci Date: Mon, 23 Sep 2013 14:07:40 +0000 (-0400) Subject: Cleanups X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/f2170465073de34adf89161d4287182b518352c4 Cleanups --- diff --git a/lock_client.cc b/lock_client.cc index d996b40..035d80b 100644 --- a/lock_client.cc +++ b/lock_client.cc @@ -39,7 +39,7 @@ void lock_state::signal(std::thread::id who) { c[who].notify_one(); } -int lock_client::last_port = 0; +unsigned int lock_client::last_port = 0; lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) { lock sl(lock_table_lock); @@ -53,11 +53,11 @@ lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu) { make_sockaddr(xdst.c_str(), &dstsock); cl = new rpcc(dstsock); if (cl->bind() < 0) { - printf("lock_client: call bind\n"); + LOG("lock_client: call bind"); } - srand(time(NULL)^last_port); - rlock_port = ((rand()%32000) | (0x1 << 10)); + srandom((uint32_t)time(NULL)^last_port); + rlock_port = ((random()%32000) | (0x1 << 10)); const char *hname; // VERIFY(gethostname(hname, 100) == 0); hname = "127.0.0.1"; @@ -70,13 +70,13 @@ lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu) { rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this); { lock sl(xid_mutex); - xid = 0; + next_xid = 0; } rsmc = new rsm_client(xdst); releaser_thread = std::thread(&lock_client::releaser, this); } -void lock_client::releaser() { +void lock_client::releaser() [[noreturn]] { while (1) { lock_protocol::lockid_t lid; release_fifo.deq(&lid); @@ -90,6 +90,8 @@ void lock_client::releaser() { sl.unlock(); int r; rsmc->call(lock_protocol::release, r, lid, id, st.xid); + if (lu) + lu->dorelease(lid); sl.lock(); } st.state = lock_state::none; @@ -123,8 +125,8 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { if (st.state == lock_state::none || st.state == lock_state::retrying) { if (st.state == lock_state::none) { - lock sl(xid_mutex); - st.xid = xid++; + lock l(xid_mutex); + st.xid = next_xid++; } st.state = lock_state::acquiring; LOG("Lock " << lid << ": acquiring"); @@ -216,7 +218,7 @@ rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_ return rlock_protocol::OK; } -rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) { +rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) { lock_state &st = get_lock_state(lid); lock sl(st.m); VERIFY(st.state == lock_state::acquiring); diff --git a/lock_client.h b/lock_client.h index 7b5edf6..541cc23 100644 --- a/lock_client.h +++ b/lock_client.h @@ -16,7 +16,7 @@ class lock_release_user { public: virtual void dorelease(lock_protocol::lockid_t) = 0; - virtual ~lock_release_user() {}; + virtual ~lock_release_user() {} }; using std::string; @@ -57,19 +57,19 @@ class lock_client { std::thread releaser_thread; rsm_client *rsmc; class lock_release_user *lu; - int rlock_port; + unsigned int rlock_port; string hostname; string id; mutex xid_mutex; - lock_protocol::xid_t xid; + lock_protocol::xid_t next_xid; fifo release_fifo; mutex lock_table_lock; lock_map lock_table; lock_state &get_lock_state(lock_protocol::lockid_t lid); public: - static int last_port; + static unsigned int last_port; lock_client(string xdst, class lock_release_user *l = 0); - ~lock_client() {}; + ~lock_client() {} lock_protocol::status acquire(lock_protocol::lockid_t); lock_protocol::status release(lock_protocol::lockid_t); int stat(lock_protocol::lockid_t); diff --git a/lock_demo.cc b/lock_demo.cc index 90f0047..3a85949 100644 --- a/lock_demo.cc +++ b/lock_demo.cc @@ -1,14 +1,5 @@ -// -// Lock demo -// - -#include "lock_protocol.h" #include "lock_client.h" -#include "rpc/rpc.h" -#include -#include -#include -#include +#include "tprintf.h" char tprintf_thread_prefix = 'd'; @@ -21,5 +12,5 @@ main(int argc, char *argv[]) } lock_client *lc = new lock_client(argv[1]); - printf ("stat returned %d\n", lc->stat("1")); + LOG_NONMEMBER("stat returned " << lc->stat("1")); } diff --git a/lock_server.cc b/lock_server.cc index f5a1fc4..a82231e 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -2,7 +2,6 @@ #include "lock_server.h" #include -#include #include #include #include "lang/verify.h" @@ -32,12 +31,6 @@ lock_state& lock_state::operator=(const lock_state& o) { return *this; } -template -ostringstream & operator<<(ostringstream &o, const pair &d) { - o << "<" << d.first << "," << d.second << ">"; - return o; -} - marshall & operator<<(marshall &m, const lock_state &d) { return m << d.held << d.held_by << d.wanted_by; } @@ -59,7 +52,7 @@ lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) { rsm->set_state_transfer(this); } -void lock_server::revoker() { +void lock_server::revoker() [[noreturn]] { while (1) { lock_protocol::lockid_t lid; revoke_fifo.deq(&lid); @@ -87,7 +80,7 @@ void lock_server::revoker() { } } -void lock_server::retryer() { +void lock_server::retryer() [[noreturn]] { while (1) { lock_protocol::lockid_t lid; retry_fifo.deq(&lid); @@ -119,7 +112,7 @@ void lock_server::retryer() { } } -int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) { +int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) { LOG_FUNC_ENTER_SERVER; holder h = holder(id, xid); lock_state &st = get_lock_state(lid); @@ -168,7 +161,7 @@ int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_pr if (!found) st.wanted_by.push_back(h); - LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " ")); + LOG("wanted_by=" << make_iterator_pair(st.wanted_by.begin(), st.wanted_by.end())); // send revoke if we're first in line if (st.wanted_by.front() == h) @@ -177,7 +170,7 @@ int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_pr return lock_protocol::RETRY; } -int lock_server::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) { +int lock_server::release(int &, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) { LOG_FUNC_ENTER_SERVER; lock_state &st = get_lock_state(lid); lock sl(st.m); @@ -206,7 +199,7 @@ void lock_server::unmarshal_state(string state) { } lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) { - printf("stat request\n"); + LOG("stat request for " << lid); VERIFY(0); r = nacquire; return lock_protocol::OK; diff --git a/lock_smain.cc b/lock_smain.cc index 086186e..363f886 100644 --- a/lock_smain.cc +++ b/lock_smain.cc @@ -1,10 +1,9 @@ #include "rpc/rpc.h" #include #include -#include +#include "tprintf.h" #include #include "lock_server.h" -#include "paxos.h" #include "rsm.h" // Main loop of lock_server @@ -17,7 +16,7 @@ main(int argc, char *argv[]) setvbuf(stdout, NULL, _IONBF, 0); setvbuf(stderr, NULL, _IONBF, 0); - srandom(getpid()); + srandom((uint32_t)getpid()); if(argc != 3){ fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]); diff --git a/lock_tester.cc b/lock_tester.cc index 5c78c90..f4e68bd 100644 --- a/lock_tester.cc +++ b/lock_tester.cc @@ -81,10 +81,8 @@ test1(void) } void * -test2(void *x) +test2(int i) { - int i = * (int *) x; - tprintf ("test2: client %d acquire a release a\n", i); lc[i]->acquire(a); tprintf ("test2: client %d acquire done\n", i); @@ -98,10 +96,8 @@ test2(void *x) } void * -test3(void *x) +test3(int i) { - int i = * (int *) x; - tprintf ("test3: client %d acquire a release a concurrent\n", i); for (int j = 0; j < 10; j++) { lc[i]->acquire(a); @@ -114,10 +110,8 @@ test3(void *x) } void * -test4(void *x) +test4(int i) { - int i = * (int *) x; - tprintf ("test4: thread %d acquire a release a concurrent; same clnt\n", i); for (int j = 0; j < 10; j++) { lc[0]->acquire(a); @@ -130,10 +124,8 @@ test4(void *x) } void * -test5(void *x) +test5(int i) { - int i = * (int *) x; - tprintf ("test5: client %d acquire a release a concurrent; same and diff clnt\n", i); for (int j = 0; j < 10; j++) { if (i < 5) lc[0]->acquire(a); @@ -155,7 +147,7 @@ main(int argc, char *argv[]) setvbuf(stdout, NULL, _IONBF, 0); setvbuf(stderr, NULL, _IONBF, 0); - srandom(getpid()); + srandom((uint32_t)getpid()); if(argc < 2) { fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]); @@ -181,53 +173,40 @@ main(int argc, char *argv[]) if(!test || test == 2){ // test2 - for (int i = 0; i < nt; i++) { - int *a = new int (i); - th[i] = std::thread(test2, a); - } - for (int i = 0; i < nt; i++) { + for (int i = 0; i < nt; i++) + th[i] = std::thread(test2, i); + for (int i = 0; i < nt; i++) th[i].join(); - } } if(!test || test == 3){ tprintf("test 3\n"); // test3 - for (int i = 0; i < nt; i++) { - int *a = new int (i); - th[i] = std::thread(test3, a); - } - for (int i = 0; i < nt; i++) { + for (int i = 0; i < nt; i++) + th[i] = std::thread(test3, i); + for (int i = 0; i < nt; i++) th[i].join(); - } } if(!test || test == 4){ tprintf("test 4\n"); // test 4 - for (int i = 0; i < 2; i++) { - int *a = new int (i); - th[i] = std::thread(test4, a); - } - for (int i = 0; i < 2; i++) { + for (int i = 0; i < 2; i++) + th[i] = std::thread(test4, i); + for (int i = 0; i < 2; i++) th[i].join(); - } } if(!test || test == 5){ tprintf("test 5\n"); // test 5 - - for (int i = 0; i < nt; i++) { - int *a = new int (i); - th[i] = std::thread(test5, a); - } - for (int i = 0; i < nt; i++) { + for (int i = 0; i < nt; i++) + th[i] = std::thread(test5, i); + for (int i = 0; i < nt; i++) th[i].join(); - } } tprintf ("%s: passed all tests successfully\n", argv[0]); diff --git a/paxos.cc b/paxos.cc index b0ec640..83bf4f1 100644 --- a/paxos.cc +++ b/paxos.cc @@ -14,129 +14,119 @@ // paxos_commit to inform higher layers of the agreed value for this // instance. - -bool -operator> (const prop_t &a, const prop_t &b) -{ - return (a.n > b.n || (a.n == b.n && a.m > b.m)); +bool operator> (const prop_t &a, const prop_t &b) { + return (a.n > b.n || (a.n == b.n && a.m > b.m)); } -bool -operator>= (const prop_t &a, const prop_t &b) -{ - return (a.n > b.n || (a.n == b.n && a.m >= b.m)); +bool operator>= (const prop_t &a, const prop_t &b) { + return (a.n > b.n || (a.n == b.n && a.m >= b.m)); } std::string -print_members(const std::vector &nodes) -{ - std::string s; - s.clear(); - for (unsigned i = 0; i < nodes.size(); i++) { - s += nodes[i]; - if (i < (nodes.size()-1)) - s += ","; - } - return s; +print_members(const std::vector &nodes) { + std::string s; + s.clear(); + for (unsigned i = 0; i < nodes.size(); i++) { + s += nodes[i]; + if (i < (nodes.size()-1)) + s += ","; + } + return s; } -bool isamember(std::string m, const std::vector &nodes) -{ - for (unsigned i = 0; i < nodes.size(); i++) { - if (nodes[i] == m) return 1; - } - return 0; + +bool isamember(const std::string & m, const std::vector & nodes) { + for (auto n : nodes) { + if (n == m) + return 1; + } + return 0; } -bool -proposer::isrunning() -{ - bool r; - lock ml(pxs_mutex); - r = !stable; - return r; +bool proposer::isrunning() { + bool r; + lock ml(pxs_mutex); + r = !stable; + return r; } // check if the servers in l2 contains a majority of servers in l1 -bool -proposer::majority(const std::vector &l1, - const std::vector &l2) -{ - unsigned n = 0; +bool proposer::majority(const std::vector &l1, + const std::vector &l2) { + unsigned n = 0; - for (unsigned i = 0; i < l1.size(); i++) { - if (isamember(l1[i], l2)) - n++; - } - return n >= (l1.size() >> 1) + 1; + for (unsigned i = 0; i < l1.size(); i++) { + if (isamember(l1[i], l2)) + n++; + } + return n >= (l1.size() >> 1) + 1; } proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, - std::string _me) + const std::string &_me) : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false), stable (true) { - my_n.n = 0; - my_n.m = me; + my_n.n = 0; + my_n.m = me; } -void -proposer::setn() +void proposer::setn() { - my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1; + my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1; } -bool -proposer::run(int instance, std::vector cur_nodes, std::string newv) +bool proposer::run(unsigned instance, const std::vector & cur_nodes, + const std::string & newv) { - std::vector accepts; - std::vector nodes; - std::string v; - bool r = false; - - lock ml(pxs_mutex); - tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n", - print_members(cur_nodes).c_str(), instance, newv.c_str(), stable); - if (!stable) { // already running proposer? - tprintf("proposer::run: already running\n"); - return false; - } - stable = false; - setn(); - accepts.clear(); - v.clear(); - if (prepare(instance, accepts, cur_nodes, v)) { - - if (majority(cur_nodes, accepts)) { - tprintf("paxos::manager: received a majority of prepare responses\n"); - - if (v.size() == 0) - v = newv; - - breakpoint1(); - - nodes = accepts; - accepts.clear(); - accept(instance, accepts, nodes, v); - - if (majority(cur_nodes, accepts)) { - tprintf("paxos::manager: received a majority of accept responses\n"); - - breakpoint2(); - - decide(instance, accepts, v); - r = true; - } else { - tprintf("paxos::manager: no majority of accept responses\n"); - } + std::vector accepts; + std::vector nodes; + std::string v; + bool r = false; + + lock ml(pxs_mutex); + tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n", + print_members(cur_nodes).c_str(), instance, newv.c_str(), stable); + if (!stable) { // already running proposer? + tprintf("proposer::run: already running\n"); + return false; + } + stable = false; + setn(); + accepts.clear(); + v.clear(); + if (prepare(instance, accepts, cur_nodes, v)) { + + if (majority(cur_nodes, accepts)) { + tprintf("paxos::manager: received a majority of prepare responses\n"); + + if (v.size() == 0) + v = newv; + + breakpoint1(); + + nodes = accepts; + accepts.clear(); + accept(instance, accepts, nodes, v); + + if (majority(cur_nodes, accepts)) { + tprintf("paxos::manager: received a majority of accept responses\n"); + + breakpoint2(); + + decide(instance, accepts, v); + r = true; + } else { + tprintf("paxos::manager: no majority of accept responses\n"); + } + } else { + tprintf("paxos::manager: no majority of prepare responses\n"); + } } else { - tprintf("paxos::manager: no majority of prepare responses\n"); + tprintf("paxos::manager: prepare is rejected %d\n", stable); } - } else { - tprintf("paxos::manager: prepare is rejected %d\n", stable); - } - stable = true; - return r; + stable = true; + return r; } // proposer::run() calls prepare to send prepare RPCs to nodes @@ -145,16 +135,16 @@ proposer::run(int instance, std::vector cur_nodes, std::string newv // otherwise fill in accepts with set of nodes that accepted, // set v to the v_a with the highest n_a, and return true. bool -proposer::prepare(unsigned instance, std::vector &accepts, - std::vector nodes, - std::string &v) +proposer::prepare(unsigned instance, std::vector & accepts, + const std::vector & nodes, + std::string & v) { struct paxos_protocol::preparearg arg = { instance, my_n }; struct paxos_protocol::prepareres res; prop_t n_a = { 0, "" }; rpcc *r; - for (std::vector::iterator i=nodes.begin(); i!=nodes.end(); i++) { - handle h(*i); + for (auto i : nodes) { + handle h(i); if (!(r = h.safebind())) continue; int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg); @@ -165,7 +155,7 @@ proposer::prepare(unsigned instance, std::vector &accepts, return false; } if (res.accept) { - accepts.push_back(*i); + accepts.push_back(i); if (res.n_a >= n_a) { tprintf("found a newer accepted proposal\n"); v = res.v_a; @@ -180,32 +170,30 @@ proposer::prepare(unsigned instance, std::vector &accepts, // run() calls this to send out accept RPCs to accepts. // fill in accepts with list of nodes that accepted. void -proposer::accept(unsigned instance, std::vector &accepts, - std::vector nodes, std::string v) +proposer::accept(unsigned instance, std::vector & accepts, + const std::vector & nodes, const std::string & v) { struct paxos_protocol::acceptarg arg = { instance, my_n, v }; rpcc *r; - for (std::vector::iterator i=nodes.begin(); i!=nodes.end(); i++) { - handle h(*i); + for (auto i : nodes) { + handle h(i); if (!(r = h.safebind())) continue; bool accept = false; int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg); - if (status == paxos_protocol::OK) { - if (accept) - accepts.push_back(*i); - } + if (status == paxos_protocol::OK && accept) + accepts.push_back(i); } } void -proposer::decide(unsigned instance, std::vector accepts, - std::string v) +proposer::decide(unsigned instance, const std::vector & accepts, + const std::string & v) { struct paxos_protocol::decidearg arg = { instance, v }; rpcc *r; - for (std::vector::iterator i=accepts.begin(); i!=accepts.end(); i++) { - handle h(*i); + for (auto i : accepts) { + handle h(i); if (!(r = h.safebind())) continue; int res = 0; @@ -213,32 +201,33 @@ proposer::decide(unsigned instance, std::vector accepts, } } -acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me, - std::string _value) +acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _me, + const std::string & _value) : cfg(_cfg), me (_me), instance_h(0) { - n_h.n = 0; - n_h.m = me; - n_a.n = 0; - n_a.m = me; - v_a.clear(); - - l = new log (this, me); - - if (instance_h == 0 && _first) { - values[1] = _value; - l->loginstance(1, _value); - instance_h = 1; - } - - pxs = new rpcs(atoi(_me.c_str())); - pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this); - pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this); - pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this); + n_h.n = 0; + n_h.m = me; + n_a.n = 0; + n_a.m = me; + v_a.clear(); + + l = new log (this, me); + + if (instance_h == 0 && _first) { + values[1] = _value; + l->loginstance(1, _value); + instance_h = 1; + } + + pxs = new rpcs((uint32_t)std::stoi(_me)); + pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this); + pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this); + pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this); } paxos_protocol::status -acceptor::preparereq(paxos_protocol::prepareres &r, std::string src, paxos_protocol::preparearg a) +acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &, + paxos_protocol::preparearg a) { lock ml(pxs_mutex); r.oldinstance = false; @@ -259,7 +248,7 @@ acceptor::preparereq(paxos_protocol::prepareres &r, std::string src, paxos_proto } paxos_protocol::status -acceptor::acceptreq(bool &r, std::string src, paxos_protocol::acceptarg a) +acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a) { lock ml(pxs_mutex); r = false; @@ -272,16 +261,16 @@ acceptor::acceptreq(bool &r, std::string src, paxos_protocol::acceptarg a) return paxos_protocol::OK; } -// the src argument is only for debug purpose - paxos_protocol::status -acceptor::decidereq(int &r, std::string src, paxos_protocol::decidearg a) +// the src argument is only for debugging +paxos_protocol::status +acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a) { lock ml(pxs_mutex); tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", a.instance, instance_h, v_a.c_str()); if (a.instance == instance_h + 1) { VERIFY(v_a == a.v); - commit_wo(a.instance, v_a); + commit(a.instance, v_a, ml); } else if (a.instance <= instance_h) { // we are ahead ignore. } else { @@ -292,10 +281,8 @@ acceptor::decidereq(int &r, std::string src, paxos_protocol::decidearg a) } void -acceptor::commit_wo(unsigned instance, std::string value) +acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_lock) { - //assume pxs_mutex is held - adopt_lock ml(pxs_mutex); tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str()); if (instance > instance_h) { tprintf("commit: highestaccepteinstance = %d\n", instance); @@ -308,31 +295,31 @@ acceptor::commit_wo(unsigned instance, std::string value) n_a.m = me; v_a.clear(); if (cfg) { - ml.unlock(); + pxs_mutex_lock.unlock(); cfg->paxos_commit(instance, value); - ml.lock(); + pxs_mutex_lock.lock(); } } } void -acceptor::commit(unsigned instance, std::string value) +acceptor::commit(unsigned instance, const std::string & value) { lock ml(pxs_mutex); - commit_wo(instance, value); + commit(instance, value, ml); } std::string acceptor::dump() { - return l->dump(); + return l->dump(); } void -acceptor::restore(std::string s) +acceptor::restore(const std::string & s) { - l->restore(s); - l->logread(); + l->restore(s); + l->logread(); } @@ -343,30 +330,30 @@ acceptor::restore(std::string s) void proposer::breakpoint1() { - if (break1) { - tprintf("Dying at breakpoint 1!\n"); - exit(1); - } + if (break1) { + tprintf("Dying at breakpoint 1!\n"); + exit(1); + } } // Call this from your code between phases accept and decide of proposer void proposer::breakpoint2() { - if (break2) { - tprintf("Dying at breakpoint 2!\n"); - exit(1); - } + if (break2) { + tprintf("Dying at breakpoint 2!\n"); + exit(1); + } } void proposer::breakpoint(int b) { - if (b == 3) { - tprintf("Proposer: breakpoint 1\n"); - break1 = true; - } else if (b == 4) { - tprintf("Proposer: breakpoint 2\n"); - break2 = true; - } + if (b == 3) { + tprintf("Proposer: breakpoint 1\n"); + break1 = true; + } else if (b == 4) { + tprintf("Proposer: breakpoint 2\n"); + break2 = true; + } } diff --git a/paxos.h b/paxos.h index 170292a..9650de1 100644 --- a/paxos.h +++ b/paxos.h @@ -6,91 +6,92 @@ #include "rpc/rpc.h" #include "paxos_protocol.h" #include "log.h" +#include "lock.h" class paxos_change { - public: - virtual void paxos_commit(unsigned instance, const std::string &v) = 0; - virtual ~paxos_change() {}; + public: + virtual void paxos_commit(unsigned instance, const std::string & v) = 0; + virtual ~paxos_change() {} }; class acceptor { - private: - log *l; - rpcs *pxs; - paxos_change *cfg; - std::string me; - std::mutex pxs_mutex; - - // Acceptor state - prop_t n_h; // number of the highest proposal seen in a prepare - prop_t n_a; // number of highest proposal accepted - std::string v_a; // value of highest proposal accepted - unsigned instance_h; // number of the highest instance we have decided - std::map values; // vals of each instance - - void commit_wo(unsigned instance, std::string v); - paxos_protocol::status preparereq(paxos_protocol::prepareres &r, - std::string src, paxos_protocol::preparearg a); - paxos_protocol::status acceptreq(bool &r, std::string src, - paxos_protocol::acceptarg a); - paxos_protocol::status decidereq(int &r, std::string src, - paxos_protocol::decidearg a); - - friend class log; - - public: - acceptor(class paxos_change *cfg, bool _first, std::string _me, - std::string _value); - ~acceptor() {}; - void commit(unsigned instance, std::string v); - unsigned instance() { return instance_h; } - std::string value(unsigned instance) { return values[instance]; } - std::string dump(); - void restore(std::string); - rpcs *get_rpcs() { return pxs; }; - prop_t get_n_h() { return n_h; }; - unsigned get_instance_h() { return instance_h; }; + private: + log *l; + rpcs *pxs; + paxos_change *cfg; + std::string me; + mutex pxs_mutex; + + // Acceptor state + prop_t n_h; // number of the highest proposal seen in a prepare + prop_t n_a; // number of highest proposal accepted + std::string v_a; // value of highest proposal accepted + unsigned instance_h; // number of the highest instance we have decided + std::map values; // vals of each instance + + void commit(unsigned instance, const std::string & v, lock & pxs_mutex_lock); + paxos_protocol::status preparereq(paxos_protocol::prepareres & r, + const std::string & src, paxos_protocol::preparearg a); + paxos_protocol::status acceptreq(bool & r, const std::string & src, + paxos_protocol::acceptarg a); + paxos_protocol::status decidereq(int & r, const std::string & src, + paxos_protocol::decidearg a); + + friend class log; + + public: + acceptor(class paxos_change *cfg, bool _first, const std::string & _me, + const std::string & _value); + ~acceptor() {} + void commit(unsigned instance, const std::string & v); + unsigned instance() { return instance_h; } + const std::string & value(unsigned instance) { return values[instance]; } + std::string dump(); + void restore(const std::string &); + rpcs *get_rpcs() { return pxs; } + prop_t get_n_h() { return n_h; } + unsigned get_instance_h() { return instance_h; } }; -extern bool isamember(std::string m, const std::vector &nodes); -extern std::string print_members(const std::vector &nodes); +extern bool isamember(const std::string & m, const std::vector & nodes); +extern std::string print_members(const std::vector & nodes); class proposer { - private: - log *l; - paxos_change *cfg; - acceptor *acc; - std::string me; - bool break1; - bool break2; - - std::mutex pxs_mutex; - - // Proposer state - bool stable; - prop_t my_n; // number of the last proposal used in this instance - - void setn(); - bool prepare(unsigned instance, std::vector &accepts, - std::vector nodes, - std::string &v); - void accept(unsigned instance, std::vector &accepts, - std::vector nodes, std::string v); - void decide(unsigned instance, std::vector accepts, - std::string v); - - void breakpoint1(); - void breakpoint2(); - bool majority(const std::vector &l1, const std::vector &l2); - - friend class log; - public: - proposer(class paxos_change *cfg, class acceptor *_acceptor, std::string _me); - ~proposer() {}; - bool run(int instance, std::vector cnodes, std::string v); - bool isrunning(); - void breakpoint(int b); + private: + log *l; + paxos_change *cfg; + acceptor *acc; + std::string me; + bool break1; + bool break2; + + mutex pxs_mutex; + + // Proposer state + bool stable; + prop_t my_n; // number of the last proposal used in this instance + + void setn(); + bool prepare(unsigned instance, std::vector & accepts, + const std::vector & nodes, + std::string & v); + void accept(unsigned instance, std::vector & accepts, + const std::vector & nodes, const std::string & v); + void decide(unsigned instance, const std::vector & accepts, + const std::string & v); + + void breakpoint1(); + void breakpoint2(); + bool majority(const std::vector & l1, const std::vector & l2); + + friend class log; + public: + proposer(class paxos_change *cfg, class acceptor *_acceptor, const std::string &_me); + ~proposer() {} + bool run(unsigned instance, const std::vector & cnodes, const std::string & v); + bool isrunning(); + void breakpoint(int b); }; diff --git a/rpc/fifo.h b/rpc/fifo.h index 93a79cf..dde514d 100644 --- a/rpc/fifo.h +++ b/rpc/fifo.h @@ -8,20 +8,20 @@ template class fifo { public: - fifo(int limit=0) : max_(limit) {}; + fifo(size_t limit=0) : max_(limit) {} bool enq(T, bool blocking=true); void deq(T *); bool size() { lock ml(m_); return q_.size(); - }; + } private: std::list q_; mutex m_; cond non_empty_c_; // q went non-empty cond has_space_c_; // q is not longer overfull - unsigned int max_; // maximum capacity of the queue, block enq threads if exceeds this limit + size_t max_; // maximum capacity of the queue, block enq threads if exceeds this limit }; template bool diff --git a/rpc/marshall.h b/rpc/marshall.h index 7a85d6b..676a682 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -13,31 +13,42 @@ #include "lang/verify.h" struct request_header { - request_header(int x=0, int p=0, int c=0, int s=0, int xi=0) : + request_header(int x=0, int p=0, unsigned c=0, unsigned s=0, int xi=0) : xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {} int xid; int proc; unsigned int clt_nonce; unsigned int srv_nonce; int xid_rep; - request_header hton() const { - return { - htonl(xid), htonl(proc), htonl(clt_nonce), htonl(srv_nonce), htonl(xid_rep) - }; - } }; struct reply_header { reply_header(int x=0, int r=0): xid(x), ret(r) {} int xid; int ret; - reply_header hton() const { - return { - htonl(xid), htonl(ret) - }; - } }; +template inline T hton(T t); + +constexpr union { uint32_t i; uint8_t is_little_endian; } endianness{1}; + +template<> inline uint8_t hton(uint8_t t) { return t; } +template<> inline int8_t hton(int8_t t) { return t; } +template<> inline uint16_t hton(uint16_t t) { return htons(t); } +template<> inline int16_t hton(int16_t t) { return (int16_t)htons((uint16_t)t); } +template<> inline uint32_t hton(uint32_t t) { return htonl(t); } +template<> inline int32_t hton(int32_t t) { return (int32_t)htonl((uint32_t)t); } +template<> inline uint64_t hton(uint64_t t) { + if (!endianness.is_little_endian) + return t; + return (uint64_t)htonl((uint32_t)(t >> 32)) | ((uint64_t)htonl((uint32_t)t) << 32); +} +template<> inline int64_t hton(int64_t t) { return (int64_t)hton((uint64_t)t); } +template<> inline request_header hton(request_header h) { return {hton(h.xid), hton(h.proc), hton(h.clt_nonce), hton(h.srv_nonce), hton(h.xid_rep)}; } +template<> inline reply_header hton(reply_header h) { return {hton(h.xid), hton(h.ret)}; } + +template inline T ntoh(T t) { return hton(t); } + typedef int rpc_sz_t; //size of initial buffer allocation @@ -76,16 +87,16 @@ class marshall { free(buf_); } - int size() { return index_;} + size_t size() { return index_;} char *cstr() { return buf_;} const char *cstr() const { return buf_;} - void rawbyte(unsigned char x) { + void rawbyte(uint8_t x) { reserve(1); - buf_[index_++] = x; + buf_[index_++] = (int8_t)x; } - void rawbytes(const char *p, int n) { + void rawbytes(const char *p, size_t n) { reserve(n); memcpy(buf_+index_, p, n); index_ += n; @@ -104,7 +115,7 @@ class marshall { void pack_req_header(const request_header &h); void pack_reply_header(const reply_header &h); - void take_buf(char **b, int *s) { + void take_buf(char **b, size_t *s) { *b = buf_; *s = index_; buf_ = NULL; @@ -114,13 +125,13 @@ class marshall { }; marshall& operator<<(marshall &, bool); -marshall& operator<<(marshall &, unsigned int); -marshall& operator<<(marshall &, int); -marshall& operator<<(marshall &, unsigned char); -marshall& operator<<(marshall &, char); -marshall& operator<<(marshall &, unsigned short); -marshall& operator<<(marshall &, short); -marshall& operator<<(marshall &, unsigned long long); +marshall& operator<<(marshall &, uint32_t); +marshall& operator<<(marshall &, int32_t); +marshall& operator<<(marshall &, uint8_t); +marshall& operator<<(marshall &, int8_t); +marshall& operator<<(marshall &, uint16_t); +marshall& operator<<(marshall &, int16_t); +marshall& operator<<(marshall &, uint64_t); marshall& operator<<(marshall &, const std::string &); template marshall & @@ -138,17 +149,31 @@ operator<<(marshall &m, const std::pair &d) { return m; } +class unmarshall; + +unmarshall& operator>>(unmarshall &, bool &); +unmarshall& operator>>(unmarshall &, uint8_t &); +unmarshall& operator>>(unmarshall &, int8_t &); +unmarshall& operator>>(unmarshall &, uint16_t &); +unmarshall& operator>>(unmarshall &, int16_t &); +unmarshall& operator>>(unmarshall &, uint32_t &); +unmarshall& operator>>(unmarshall &, int32_t &); +unmarshall& operator>>(unmarshall &, size_t &); +unmarshall& operator>>(unmarshall &, uint64_t &); +unmarshall& operator>>(unmarshall &, int64_t &); +unmarshall& operator>>(unmarshall &, std::string &); + class unmarshall { private: char *buf_; - int sz_; - int index_; + size_t sz_; + size_t index_; bool ok_; inline bool ensure(size_t n); public: unmarshall(): buf_(NULL),sz_(0),index_(0),ok_(false) {} - unmarshall(char *b, int sz): buf_(b),sz_(sz),index_(),ok_(true) {} + unmarshall(char *b, size_t sz): buf_(b),sz_(sz),index_(),ok_(true) {} unmarshall(const std::string &s) : buf_(NULL),sz_(0),index_(0),ok_(false) { //take the content which does not exclude a RPC header from a string @@ -175,13 +200,13 @@ class unmarshall { char *cstr() { return buf_;} bool okdone() const { return ok_ && index_ == sz_; } - unsigned int rawbyte(); + uint8_t rawbyte(); void rawbytes(std::string &s, size_t n); + template void rawbytes(T &t); - int ind() { return index_;} - int size() { return sz_;} - void unpack(int *); //non-const ref - void take_buf(char **b, int *sz) { + size_t ind() { return index_;} + size_t size() { return sz_;} + void take_buf(char **b, size_t *sz) { *b = buf_; *sz = sz_; sz_ = index_ = 0; @@ -191,19 +216,14 @@ class unmarshall { void unpack_req_header(request_header *h) { //the first 4-byte is for channel to fill size of pdu index_ = sizeof(rpc_sz_t); - unpack(&h->xid); - unpack(&h->proc); - unpack((int *)&h->clt_nonce); - unpack((int *)&h->srv_nonce); - unpack(&h->xid_rep); + *this >> h->xid >> h->proc >> h->clt_nonce >> h->srv_nonce >> h->xid_rep; index_ = RPC_HEADER_SZ; } void unpack_reply_header(reply_header *h) { //the first 4-byte is for channel to fill size of pdu index_ = sizeof(rpc_sz_t); - unpack(&h->xid); - unpack(&h->ret); + *this >> h->xid >> h->ret; index_ = RPC_HEADER_SZ; } diff --git a/rpc/pollmgr.cc b/rpc/pollmgr.cc index 33aeae2..919a286 100644 --- a/rpc/pollmgr.cc +++ b/rpc/pollmgr.cc @@ -10,7 +10,7 @@ PollMgr *PollMgr::instance = NULL; static std::once_flag pollmgr_is_initialized; -void +static void PollMgrInit() { PollMgr::instance = new PollMgr(); @@ -32,7 +32,7 @@ PollMgr::PollMgr() : pending_change_(false) th_ = std::thread(&PollMgr::wait_loop, this); } -PollMgr::~PollMgr() +PollMgr::~PollMgr() [[noreturn]] { //never kill me!!! VERIFY(0); @@ -83,7 +83,7 @@ PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c) } void -PollMgr::wait_loop() +PollMgr::wait_loop() [[noreturn]] { std::vector readable; diff --git a/rpc/rpc.cc b/rpc/rpc.cc index f0c4b77..5e43547 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -64,12 +64,13 @@ #include "lock.h" #include "jsl_log.h" +#include "tprintf.h" #include "lang/verify.h" const rpcc::TO rpcc::to_max = { 120000 }; const rpcc::TO rpcc::to_min = { 1000 }; -rpcc::caller::caller(unsigned int xxid, unmarshall *xun) +rpcc::caller::caller(int xxid, unmarshall *xun) : xid(xxid), un(xun), done(false) { } @@ -82,7 +83,7 @@ inline void set_rand_seed() { auto now = std::chrono::time_point_cast(std::chrono::steady_clock::now()); - srandom((int)now.time_since_epoch().count()^((int)getpid())); + srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid()); } rpcc::rpcc(sockaddr_in d, bool retrans) : @@ -91,7 +92,7 @@ rpcc::rpcc(sockaddr_in d, bool retrans) : { if(retrans){ set_rand_seed(); - clt_nonce_ = random(); + clt_nonce_ = (unsigned int)random(); } else { // special client nonce 0 means this client does not // require at-most-once logic from the server @@ -127,7 +128,7 @@ rpcc::~rpcc() int rpcc::bind(TO to) { - int r; + unsigned int r; int ret = call_timeout(rpc_const::bind, to, r, 0); if(ret == 0){ lock ml(m_); @@ -145,10 +146,9 @@ rpcc::bind(TO to) rpcc::cancel(void) { lock ml(m_); - printf("rpcc::cancel: force callers to fail\n"); - std::map::iterator iter; - for(iter = calls_.begin(); iter != calls_.end(); iter++){ - caller *ca = iter->second; + tprintf("rpcc::cancel: force callers to fail"); + for(auto &p : calls_){ + caller *ca = p.second; jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n"); { @@ -163,7 +163,7 @@ rpcc::cancel(void) destroy_wait_ = true; destroy_wait_c_.wait(ml); } - printf("rpcc::cancel: done\n"); + tprintf("rpcc::cancel: done"); } int @@ -172,7 +172,7 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep, { caller ca(0, &rep); - int xid_rep; + int xid_rep; { lock ml(m_); @@ -189,7 +189,7 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep, ca.xid = xid_++; calls_[ca.xid] = &ca; - req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()}); + req.pack_req_header({ca.xid, (int)proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()}); xid_rep = xid_rep_window_.front(); } @@ -223,7 +223,7 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep, } else jsl_log(JSL_DBG_1, "not reachable\n"); jsl_log(JSL_DBG_2, - "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n", + "rpcc::call1 %u just sent req proc %x xid %d clt_nonce %d\n", clt_nonce_, proc, ca.xid, clt_nonce_); } transmit = false; // only send once on a given channel @@ -289,7 +289,7 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep, lock cal(ca.m); jsl_log(JSL_DBG_2, - "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n", + "rpcc::call1 %u call done for req proc %x xid %d %s:%d done? %d ret %d \n", clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr), ntohs(dst_.sin_port), ca.done, ca.intret); @@ -324,7 +324,7 @@ rpcc::get_refconn(connection **ch) // // this function keeps no reference for connection *c bool -rpcc::got_pdu(connection *c, char *b, size_t sz) +rpcc::got_pdu(connection *, char *b, size_t sz) { unmarshall rep(b, sz); reply_header h; @@ -361,15 +361,13 @@ rpcc::got_pdu(connection *c, char *b, size_t sz) // assumes thread holds mutex m void -rpcc::update_xid_rep(unsigned int xid) +rpcc::update_xid_rep(int xid) { - std::list::iterator it; - if(xid <= xid_rep_window_.front()){ return; } - for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){ + for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){ if(*it > xid){ xid_rep_window_.insert(it, xid); goto compress; @@ -378,18 +376,18 @@ rpcc::update_xid_rep(unsigned int xid) xid_rep_window_.push_back(xid); compress: - it = xid_rep_window_.begin(); + auto it = xid_rep_window_.begin(); for (it++; it != xid_rep_window_.end(); it++){ while (xid_rep_window_.front() + 1 == *it) xid_rep_window_.pop_front(); } } -rpcs::rpcs(unsigned int p1, int count) +rpcs::rpcs(unsigned int p1, size_t count) : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true) { set_rand_seed(); - nonce_ = random(); + nonce_ = (unsigned int)random(); jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_); char *loss_env = getenv("RPC_LOSSY"); @@ -445,23 +443,20 @@ rpcs::updatestat(unsigned int proc) counts_[proc]++; curr_counts_--; if(curr_counts_ == 0){ - std::map::iterator i; - printf("RPC STATS: "); - for (i = counts_.begin(); i != counts_.end(); i++){ - printf("%x:%d ", i->first, i->second); - } - printf("\n"); + tprintf("RPC STATS: "); + for (auto i = counts_.begin(); i != counts_.end(); i++) + tprintf("%x:%lu ", i->first, i->second); lock rwl(reply_window_m_); std::map >::iterator clt; - unsigned int totalrep = 0, maxrep = 0; + size_t totalrep = 0, maxrep = 0; for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){ totalrep += clt->second.size(); if(clt->second.size() > maxrep) maxrep = clt->second.size(); } - jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n", + jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %lu max per client %lu\n", (int) reply_window_.size()-1, totalrep, maxrep); curr_counts_ = counting_; } @@ -476,7 +471,7 @@ rpcs::dispatch(djob_t *j) request_header h; req.unpack_req_header(&h); - int proc = h.proc; + unsigned int proc = (unsigned int)h.proc; if(!req.ok()){ jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n"); @@ -485,7 +480,7 @@ rpcs::dispatch(djob_t *j) } jsl_log(JSL_DBG_2, - "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n", + "rpcs::dispatch: rpc %d (proc %x, last_rep %d) from clt %u for srv instance %u \n", h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce); marshall rep; @@ -518,8 +513,8 @@ rpcs::dispatch(djob_t *j) } rpcs::rpcstate_t stat; - char *b1; - int sz1; + char *b1 = nullptr; + size_t sz1 = 0; if(h.clt_nonce){ // have i seen this client before? @@ -575,7 +570,7 @@ rpcs::dispatch(djob_t *j) rep.take_buf(&b1,&sz1); jsl_log(JSL_DBG_2, - "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n", + "rpcs::dispatch: sending and saving reply of size %lu for rpc %d, proc %x ret %d, clt %u\n", sz1, h.xid, proc, rh.ret, h.clt_nonce); if(h.clt_nonce > 0){ @@ -605,7 +600,7 @@ rpcs::dispatch(djob_t *j) c->send(b1, sz1); break; case FORGOTTEN: // very old request and we don't have the response anymore - jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n", + jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %d from %u\n", h.xid, h.clt_nonce); rh.ret = rpc_const::atmostonce_failure; rep.pack_reply_header(rh); @@ -630,8 +625,8 @@ rpcs::dispatch(djob_t *j) // DONE: seen this xid, previous reply returned in *b and *sz. // FORGOTTEN: might have seen this xid, but deleted previous reply. rpcs::rpcstate_t -rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid, - unsigned int xid_rep, char **b, int *sz) +rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid, + int xid_rep, char **b, size_t *sz) { lock rwl(reply_window_m_); @@ -640,12 +635,12 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid, VERIFY(l.size() > 0); VERIFY(xid >= xid_rep); - unsigned int past_xid_rep = l.begin()->xid; + int past_xid_rep = l.begin()->xid; std::list::iterator start = l.begin(), it; it = ++start; - if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) { + if (past_xid_rep < xid_rep || past_xid_rep == -1) { // scan for deletion candidates for (; it != l.end() && it->xid < xid_rep; it++) { if (it->cb_present) @@ -655,7 +650,7 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid, l.begin()->xid = xid_rep; } - if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1) + if (xid < past_xid_rep && past_xid_rep != -1) return FORGOTTEN; // skip non-deletion candidates @@ -685,8 +680,8 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid, // free_reply_window() and checkduplicate_and_update is responsible for // calling free(b). void -rpcs::add_reply(unsigned int clt_nonce, unsigned int xid, - char *b, int sz) +rpcs::add_reply(unsigned int clt_nonce, int xid, + char *b, size_t sz) { lock rwl(reply_window_m_); // remember the RPC reply value @@ -706,12 +701,9 @@ rpcs::add_reply(unsigned int clt_nonce, unsigned int xid, void rpcs::free_reply_window(void) { - std::map >::iterator clt; - std::list::iterator it; - lock rwl(reply_window_m_); - for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){ - for (it = clt->second.begin(); it != clt->second.end(); it++){ + for (auto clt = reply_window_.begin(); clt != reply_window_.end(); clt++){ + for (auto it = clt->second.begin(); it != clt->second.end(); it++){ if (it->cb_present) free(it->buf); } @@ -722,7 +714,7 @@ rpcs::free_reply_window(void) // rpc handler int -rpcs::rpcbind(int &r, int a) +rpcs::rpcbind(unsigned int &r, int) { jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_); r = nonce_; @@ -737,22 +729,22 @@ operator<<(marshall &m, uint8_t x) { marshall & operator<<(marshall &m, uint16_t x) { - x = htons(x); + x = hton(x); m.rawbytes((char *)&x, 2); return m; } marshall & operator<<(marshall &m, uint32_t x) { - x = htonl(x); + x = hton(x); m.rawbytes((char *)&x, 4); return m; } -marshall & operator<<(marshall &m, int x) { return m << (unsigned int) x; } -marshall & operator<<(marshall &m, char x) { return m << (uint8_t)x; } +marshall & operator<<(marshall &m, int32_t x) { return m << (uint32_t) x; } +marshall & operator<<(marshall &m, int8_t x) { return m << (uint8_t)x; } marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; } -marshall & operator<<(marshall &m, short x) { return m << (unsigned short) x; } +marshall & operator<<(marshall &m, int16_t x) { return m << (uint16_t)x; } marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; } marshall & @@ -763,7 +755,7 @@ operator<<(marshall &m, const std::string &s) { } void marshall::pack_req_header(const request_header &h) { - int saved_sz = index_; + size_t saved_sz = index_; //leave the first 4-byte empty for channel to fill size of pdu index_ = sizeof(rpc_sz_t); *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep; @@ -771,22 +763,13 @@ void marshall::pack_req_header(const request_header &h) { } void marshall::pack_reply_header(const reply_header &h) { - int saved_sz = index_; + size_t saved_sz = index_; //leave the first 4-byte empty for channel to fill size of pdu index_ = sizeof(rpc_sz_t); *this << h.xid << h.ret; index_ = saved_sz; } -void -unmarshall::unpack(int *x) -{ - (*x) = (rawbyte() & 0xff) << 24; - (*x) |= (rawbyte() & 0xff) << 16; - (*x) |= (rawbyte() & 0xff) << 8; - (*x) |= rawbyte() & 0xff; -} - // take the contents from another unmarshall object void unmarshall::take_in(unmarshall &another) @@ -805,12 +788,12 @@ unmarshall::ensure(size_t n) { return ok_; } -unsigned int +inline uint8_t unmarshall::rawbyte() { if (!ensure(1)) return 0; - return buf_[index_++]; + return (uint8_t)buf_[index_++]; } void @@ -821,78 +804,29 @@ unmarshall::rawbytes(std::string &ss, size_t n) index_ += n; } -unmarshall & -operator>>(unmarshall &u, bool &x) -{ - x = (bool)u.rawbyte(); - return u; -} - -unmarshall & -operator>>(unmarshall &u, unsigned char &x) -{ - x = (unsigned char)u.rawbyte(); - return u; -} - -unmarshall & -operator>>(unmarshall &u, char &x) -{ - x = (char)u.rawbyte(); - return u; -} - -unmarshall & -operator>>(unmarshall &u, unsigned short &x) -{ - x = (u.rawbyte() & 0xff) << 8; - x |= u.rawbyte() & 0xff; - return u; -} - -unmarshall & -operator>>(unmarshall &u, short &x) -{ - x = (u.rawbyte() & 0xff) << 8; - x |= u.rawbyte() & 0xff; - return u; -} - -unmarshall & -operator>>(unmarshall &u, unsigned int &x) -{ - x = (u.rawbyte() & 0xff) << 24; - x |= (u.rawbyte() & 0xff) << 16; - x |= (u.rawbyte() & 0xff) << 8; - x |= u.rawbyte() & 0xff; - return u; -} - -unmarshall & -operator>>(unmarshall &u, int &x) -{ - x = (u.rawbyte() & 0xff) << 24; - x |= (u.rawbyte() & 0xff) << 16; - x |= (u.rawbyte() & 0xff) << 8; - x |= u.rawbyte() & 0xff; - return u; -} - -unmarshall & -operator>>(unmarshall &u, unsigned long long &x) +template +void +unmarshall::rawbytes(T &t) { - unsigned int h, l; - u >> h; - u >> l; - x = l | ((unsigned long long) h << 32); - return u; + const size_t n = sizeof(T); + VERIFY(ensure(n)); + memcpy(&t, buf_+index_, n); + t = ntoh(t); + index_ += n; } -unmarshall & -operator>>(unmarshall &u, std::string &s) -{ - unsigned sz; - u >> sz; +unmarshall & operator>>(unmarshall &u, bool &x) { x = (bool)u.rawbyte(); return u; } +unmarshall & operator>>(unmarshall &u, uint8_t &x) { x = u.rawbyte(); return u; } +unmarshall & operator>>(unmarshall &u, int8_t &x) { x = (int8_t)u.rawbyte(); return u; } +unmarshall & operator>>(unmarshall &u, uint16_t &x) { u.rawbytes(x); return u; } +unmarshall & operator>>(unmarshall &u, int16_t &x) { u.rawbytes(x); return u; } +unmarshall & operator>>(unmarshall &u, uint32_t &x) { u.rawbytes(x); return u; } +unmarshall & operator>>(unmarshall &u, int32_t &x) { u.rawbytes(x); return u; } +unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes(xx); x = xx; return u; } +unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes(x); return u; } +unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes(x); return u; } +unmarshall & operator>>(unmarshall &u, std::string &s) { + unsigned sz = u.grab(); if(u.ok()) u.rawbytes(s, sz); return u; @@ -906,42 +840,32 @@ bool operator<(const sockaddr_in &a, const sockaddr_in &b){ /*---------------auxilary function--------------*/ void -make_sockaddr(const char *hostandport, struct sockaddr_in *dst){ - - char host[200]; - const char *localhost = "127.0.0.1"; - const char *port = index(hostandport, ':'); - if(port == NULL){ - memcpy(host, localhost, strlen(localhost)+1); - port = hostandport; - } else { - memcpy(host, hostandport, port-hostandport); - host[port-hostandport] = '\0'; - port++; - } - - make_sockaddr(host, port, dst); - +make_sockaddr(const std::string &hostandport, struct sockaddr_in *dst) { + auto colon = hostandport.find(':'); + if (colon == std::string::npos) + make_sockaddr("127.0.0.1", hostandport, dst); + else + make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1), dst); } void -make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){ - - in_addr_t a; - +make_sockaddr(const std::string &host, const std::string &port, struct sockaddr_in *dst) { bzero(dst, sizeof(*dst)); dst->sin_family = AF_INET; - a = inet_addr(host); - if(a != INADDR_NONE){ - dst->sin_addr.s_addr = a; - } else { - struct hostent *hp = gethostbyname(host); - if(hp == 0 || hp->h_length != 4){ - fprintf(stderr, "cannot find host name %s\n", host); + struct in_addr a{inet_addr(host.c_str())}; + + if(a.s_addr != INADDR_NONE) + dst->sin_addr.s_addr = a.s_addr; + else { + struct hostent *hp = gethostbyname(host.c_str()); + + if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) { + fprintf(stderr, "cannot find host name %s\n", host.c_str()); exit(1); } - dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr; + memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t)); + dst->sin_addr.s_addr = a.s_addr; } - dst->sin_port = htons(atoi(port)); + dst->sin_port = hton((uint16_t)std::stoi(port)); } diff --git a/rpc/rpc.h b/rpc/rpc.h index 1348dc8..c0420a5 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -36,10 +36,10 @@ class rpcc : public chanmgr { //manages per rpc info struct caller { - caller(unsigned int xxid, unmarshall *un); + caller(int xxid, unmarshall *un); ~caller(); - unsigned int xid; + int xid; unmarshall *un; int intret; bool done; @@ -48,14 +48,14 @@ class rpcc : public chanmgr { }; void get_refconn(connection **ch); - void update_xid_rep(unsigned int xid); + void update_xid_rep(int xid); sockaddr_in dst_; unsigned int clt_nonce_; unsigned int srv_nonce_; bool bind_done_; - unsigned int xid_; + int xid_; int lossytest_; bool retrans_; bool reachable_; @@ -69,7 +69,7 @@ class rpcc : public chanmgr { std::condition_variable destroy_wait_c_; std::map calls_; - std::list xid_rep_window_; + std::list xid_rep_window_; struct request { request() { clear(); } @@ -167,25 +167,25 @@ class rpcs : public chanmgr { // has been sent; in that case buf points to a copy of the reply, // and sz holds the size of the reply. struct reply_t { - reply_t (unsigned int _xid) { + reply_t (int _xid) { xid = _xid; cb_present = false; buf = NULL; sz = 0; } - reply_t (unsigned int _xid, char *_buf, int _sz) { + reply_t (int _xid, char *_buf, size_t _sz) { xid = _xid; cb_present = true; buf = _buf; sz = _sz; } - unsigned int xid; + int xid; bool cb_present; // whether the reply buffer is valid char *buf; // the reply buffer - int sz; // the size of reply buffer + size_t sz; // the size of reply buffer }; - int port_; + unsigned int port_; unsigned int nonce_; // provide at most once semantics by maintaining a window of replies @@ -194,11 +194,11 @@ class rpcs : public chanmgr { std::map > reply_window_; void free_reply_window(void); - void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz); + void add_reply(unsigned int clt_nonce, int xid, char *b, size_t sz); rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, - unsigned int xid, unsigned int rep_xid, - char **b, int *sz); + int xid, int rep_xid, + char **b, size_t *sz); void updatestat(unsigned int proc); @@ -206,15 +206,15 @@ class rpcs : public chanmgr { std::map conns_; // counting - const int counting_; - int curr_counts_; - std::map counts_; + const size_t counting_; + size_t curr_counts_; + std::map counts_; int lossytest_; bool reachable_; // map proc # to function - std::map procs_; + std::map procs_; std::mutex procs_m_; // protect insert/delete to procs[] std::mutex count_m_; //protect modification of counts @@ -225,9 +225,9 @@ class rpcs : public chanmgr { protected: struct djob_t { - djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {} + djob_t (connection *c, char *b, size_t bsz):buf(b),sz(bsz),conn(c) {} char *buf; - int sz; + size_t sz; connection *conn; }; void dispatch(djob_t *); @@ -239,11 +239,11 @@ class rpcs : public chanmgr { tcpsconn* listener_; public: - rpcs(unsigned int port, int counts=0); + rpcs(unsigned int port, size_t counts=0); ~rpcs(); - inline int port() { return listener_->port(); } + inline unsigned int port() { return listener_->port(); } //RPC handler for clients binding - int rpcbind(int &r, int a); + int rpcbind(unsigned int &r, int a); void set_reachable(bool r) { reachable_ = r; } @@ -262,8 +262,8 @@ template void rpcs::reg(unsigned int proc, F f, C *c) { reg1(proc, marshalled_func::wrap(f, c)); } -void make_sockaddr(const char *hostandport, struct sockaddr_in *dst); -void make_sockaddr(const char *host, const char *port, - struct sockaddr_in *dst); +void make_sockaddr(const std::string &hostandport, struct sockaddr_in *dst); +void make_sockaddr(const std::string &host, const std::string &port, struct + sockaddr_in *dst); #endif diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index c381745..dbb10c6 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -14,6 +14,8 @@ #define NUM_CL 2 +char tprintf_thread_prefix = 'r'; + rpcs *server; // server rpc object rpcc *clients[NUM_CL]; // client rpc object struct sockaddr_in dst; //server's ip address @@ -27,7 +29,7 @@ class srv { int handle_22(std::string & r, const std::string a, const std::string b); int handle_fast(int &r, const int a); int handle_slow(int &r, const int a); - int handle_bigrep(std::string &r, const int a); + int handle_bigrep(std::string &r, const size_t a); }; // a handler. a and b are arguments, r is the result. @@ -60,9 +62,9 @@ srv::handle_slow(int &r, const int a) } int -srv::handle_bigrep(std::string &r, const int len) +srv::handle_bigrep(std::string &r, const size_t len) { - r = std::string(len, 'x'); + r = std::string((size_t)len, 'x'); return 0; } @@ -70,7 +72,7 @@ srv service; void startserver() { - server = new rpcs(port); + server = new rpcs((unsigned int)port); server->reg(22, &srv::handle_22, &service); server->reg(23, &srv::handle_fast, &service); server->reg(24, &srv::handle_slow, &service); @@ -92,9 +94,9 @@ testmarshall() m << s; char *b; - int sz; + size_t sz; m.take_buf(&b,&sz); - VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int))); + VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)); unmarshall un(b,sz); request_header rh1; @@ -111,10 +113,10 @@ testmarshall() } void -client1(int cl) +client1(size_t cl) { // test concurrency. - int which_cl = ((unsigned long) cl ) % NUM_CL; + size_t which_cl = cl % NUM_CL; for(int i = 0; i < 100; i++){ int arg = (random() % 2000); @@ -138,18 +140,18 @@ client1(int cl) int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg); auto end = std::chrono::steady_clock::now(); - int diff = std::chrono::duration_cast(end - start).count(); + auto diff = std::chrono::duration_cast(end - start).count(); if (ret != 0) - printf("%d ms have elapsed!!!\n", diff); + printf("%d ms have elapsed!!!\n", (int)diff); VERIFY(ret == 0); VERIFY(rep == (which ? arg+1 : arg+2)); } } void -client2(int cl) +client2(size_t cl) { - int which_cl = ((unsigned long) cl ) % NUM_CL; + size_t which_cl = cl % NUM_CL; time_t t1; time(&t1); @@ -208,9 +210,9 @@ simple_tests(rpcc *c) // specify a timeout value to an RPC that should succeed (tcp) { std::string arg(1000, 'x'); - std::string rep; - c->call_timeout(22, rpcc::to(3000), rep, arg, (std::string)"x"); - VERIFY(rep.size() == 1001); + std::string rep2; + c->call_timeout(22, rpcc::to(3000), rep2, arg, (std::string)"x"); + VERIFY(rep2.size() == 1001); printf(" -- no spurious timeout .. ok\n"); } @@ -236,21 +238,21 @@ simple_tests(rpcc *c) } void -concurrent_test(int nt) +concurrent_test(size_t nt) { // create threads that make lots of calls in parallel, // to test thread synchronization for concurrent calls // and dispatches. - printf("start concurrent_test (%d threads) ...", nt); + printf("start concurrent_test (%lu threads) ...", nt); std::vector th(nt); - for(int i = 0; i < nt; i++){ + + for(size_t i = 0; i < nt; i++) th[i] = std::thread(client1, i); - } - for(int i = 0; i < nt; i++){ + for(size_t i = 0; i < nt; i++) th[i].join(); - } + printf(" OK\n"); } @@ -271,14 +273,16 @@ lossy_test() VERIFY(clients[i]->bind()==0); } - int nt = 1; + size_t nt = 1; + std::vector th(nt); - for(int i = 0; i < nt; i++){ + + for(size_t i = 0; i < nt; i++) th[i] = std::thread(client2, i); - } - for(int i = 0; i < nt; i++){ + + for(size_t i = 0; i < nt; i++) th[i].join(); - } + printf(".. OK\n"); VERIFY(setenv("RPC_LOSSY", "0", 1) == 0); } @@ -319,17 +323,17 @@ failure_test() printf(" -- delete existing rpc client, create replacement rpc client .. ok\n"); - int nt = 10; - printf(" -- concurrent test on new rpc client w/ %d threads ..", nt); + size_t nt = 10; + printf(" -- concurrent test on new rpc client w/ %lu threads ..", nt); std::vector th(nt); - for(int i = 0; i < nt; i++){ + + for(size_t i = 0; i < nt; i++) th[i] = std::thread(client3, client); - } - for(int i = 0; i < nt; i++){ + for(size_t i = 0; i < nt; i++) th[i].join(); - } + printf("ok\n"); delete server; @@ -340,14 +344,14 @@ failure_test() VERIFY (client->bind() >= 0); printf(" -- delete existing rpc client and server, create replacements.. ok\n"); - printf(" -- concurrent test on new client and server w/ %d threads ..", nt); - for(int i = 0; i < nt; i++){ + printf(" -- concurrent test on new client and server w/ %lu threads ..", nt); + + for(size_t i = 0; i < nt; i++) th[i] = std::thread(client3, client); - } - for(int i = 0; i < nt; i++){ + for(size_t i = 0; i < nt; i++) th[i].join(); - } + printf("ok\n"); printf("failure_test OK\n"); @@ -364,10 +368,10 @@ main(int argc, char *argv[]) bool isclient = false; bool isserver = false; - srandom(getpid()); + srandom((uint32_t)getpid()); port = 20000 + (getpid() % 10000); - char ch = 0; + int ch = 0; while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) { switch (ch) { case 'c': @@ -384,6 +388,7 @@ main(int argc, char *argv[]) break; case 'l': VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); + break; default: break; } diff --git a/rpc/thr_pool.cc b/rpc/thr_pool.cc index 73f94f4..ff3557c 100644 --- a/rpc/thr_pool.cc +++ b/rpc/thr_pool.cc @@ -5,10 +5,10 @@ // if blocking, then addJob() blocks when queue is full // otherwise, addJob() simply returns false when queue is full -ThrPool::ThrPool(int sz, bool blocking) +ThrPool::ThrPool(size_t sz, bool blocking) : nthreads_(sz),blockadd_(blocking),jobq_(100*sz) { - for (int i=0; i job_t; class ThrPool { public: - ThrPool(int sz, bool blocking=true); + ThrPool(size_t sz, bool blocking=true); ~ThrPool(); bool addJob(const job_t &j); private: - int nthreads_; + size_t nthreads_; bool blockadd_; fifo jobq_; diff --git a/rsm.cc b/rsm.cc index 65f60c7..8e597d6 100644 --- a/rsm.cc +++ b/rsm.cc @@ -173,18 +173,6 @@ void rsm::recovery() [[noreturn]] { } } -template -std::ostream & operator<<(std::ostream &o, const std::vector &d) { - o << "["; - for (typename std::vector::const_iterator i=d.begin(); i!=d.end(); i++) { - o << *i; - if (i+1 != d.end()) - o << ", "; - } - o << "]"; - return o; -} - bool rsm::sync_with_backups() { adopt_lock ml(rsm_mutex); ml.unlock(); @@ -204,7 +192,7 @@ bool rsm::sync_with_backups() { insync = true; cfg->get_view(vid_insync, backups); backups.erase(find(backups.begin(), backups.end(), cfg->myaddr())); - LOG("rsm::sync_with_backups " << backups); + LOG("rsm::sync_with_backups " << make_iterator_pair(backups.begin(), backups.end())); sync_cond.wait(ml); insync = false; return true; diff --git a/rsm_state_transfer.h b/rsm_state_transfer.h index 6c7e0e4..62a130c 100644 --- a/rsm_state_transfer.h +++ b/rsm_state_transfer.h @@ -5,7 +5,7 @@ class rsm_state_transfer { public: virtual std::string marshal_state() = 0; virtual void unmarshal_state(std::string) = 0; - virtual ~rsm_state_transfer() {}; + virtual ~rsm_state_transfer() {} }; #endif diff --git a/tprintf.h b/tprintf.h index c61626a..41539fe 100644 --- a/tprintf.h +++ b/tprintf.h @@ -14,44 +14,67 @@ extern std::map instance_name_map; extern int next_instance_num; extern char tprintf_thread_prefix; +template +struct iterator_pair : public std::pair { + explicit iterator_pair(const A & first, const A & second) : std::pair(first, second) {} +}; + +template +const struct iterator_pair make_iterator_pair(const A & first, const A & second) { + return iterator_pair(first, second); +} + +template +std::ostream & operator<<(std::ostream &o, const std::pair &d) { + o << "<" << d.first << "," << d.second << ">"; + return o; +} + +template +std::ostream & operator<<(std::ostream &o, const iterator_pair &d) { + o << "["; + for (auto i=d.first; i!=d.second; i++) { + o << *i; + auto j(i); + if (++j != d.second) + o << ", "; + } + o << "]"; + return o; +} + #define LOG_PREFIX { \ cerr_mutex.lock(); \ - auto self = std::this_thread::get_id(); \ - int tid = thread_name_map[self]; \ - if (tid==0) \ - tid = thread_name_map[self] = ++next_thread_num; \ - auto utime = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count() % 1000000000; \ - std::cerr << std::left << std::setw(9) << utime << " "; \ - std::cerr << tprintf_thread_prefix << std::left << std::setw(2) << tid; \ - std::cerr << " " << std::setw(24) << __FILE__ << " " << std::setw(18) << __func__; \ + auto _thread_ = std::this_thread::get_id(); \ + int _tid_ = thread_name_map[_thread_]; \ + if (_tid_==0) \ + _tid_ = thread_name_map[_thread_] = ++next_thread_num; \ + auto _utime_ = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count() % 1000000000; \ + std::cerr << std::setfill('0') << std::dec << std::left << std::setw(9) << _utime_ << " "; \ + std::cerr << tprintf_thread_prefix << std::left << std::setw(2) << _tid_; \ + std::cerr << " " << std::setw(20) << __FILE__ << " " << std::setw(18) << __func__; \ } #define LOG_THIS_POINTER { \ - int self = instance_name_map[this]; \ - if (self==0) \ - self = instance_name_map[this] = ++next_instance_num; \ - std::cerr << "#" << std::setw(2) << self; \ + int _self_ = instance_name_map[this]; \ + if (_self_==0) \ + _self_ = instance_name_map[this] = ++next_instance_num; \ + std::cerr << "#" << std::setw(2) << _self_; \ } #define LOG_SUFFIX { \ cerr_mutex.unlock(); \ } -#define LOG_NONMEMBER(x) { \ +#define LOG_NONMEMBER(_x_) { \ LOG_PREFIX; \ - std::cerr << x << std::endl; \ + std::cerr << _x_ << std::endl; \ LOG_SUFFIX; \ } -#define LOG(x) { \ +#define LOG(_x_) { \ LOG_PREFIX; \ LOG_THIS_POINTER; \ - std::cerr << x << std::endl; \ + std::cerr << _x_ << std::endl; \ LOG_SUFFIX; \ } -#define JOIN(from,to,sep) ({ \ - ostringstream oss; \ - for(auto i=from;i!=to;i++) \ - oss << *i << sep; \ - oss.str(); \ -}) #define LOG_FUNC_ENTER { \ LOG_PREFIX; \ LOG_THIS_POINTER; \ @@ -75,14 +98,13 @@ extern char tprintf_thread_prefix; LOG_SUFFIX; \ } -#define tprintf(args...) { \ - int len = snprintf(NULL, 0, args); \ - char buf[len+1]; \ - buf[len] = '\0'; \ - snprintf(buf, len+1, args); \ +#define tprintf(...) { \ + char *buf = nullptr; \ + int len = asprintf(&buf, __VA_ARGS__); \ if (buf[len-1]=='\n') \ buf[len-1] = '\0'; \ LOG_NONMEMBER(buf); \ + free(buf); \ } #endif