Cleanups
authorPeter Iannucci <iannucci@mit.edu>
Mon, 23 Sep 2013 14:07:40 +0000 (10:07 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Wed, 25 Sep 2013 05:50:43 +0000 (01:50 -0400)
19 files changed:
lock_client.cc
lock_client.h
lock_demo.cc
lock_server.cc
lock_smain.cc
lock_tester.cc
paxos.cc
paxos.h
rpc/fifo.h
rpc/marshall.h
rpc/pollmgr.cc
rpc/rpc.cc
rpc/rpc.h
rpc/rpctest.cc
rpc/thr_pool.cc
rpc/thr_pool.h
rsm.cc
rsm_state_transfer.h
tprintf.h

index d996b40..035d80b 100644 (file)
@@ -39,7 +39,7 @@ void lock_state::signal(std::thread::id who) {
         c[who].notify_one();
 }
 
-int lock_client::last_port = 0;
+unsigned int lock_client::last_port = 0;
 
 lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
     lock sl(lock_table_lock);
@@ -53,11 +53,11 @@ lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu) {
     make_sockaddr(xdst.c_str(), &dstsock);
     cl = new rpcc(dstsock);
     if (cl->bind() < 0) {
-        printf("lock_client: call bind\n");
+        LOG("lock_client: call bind");
     }
 
-    srand(time(NULL)^last_port);
-    rlock_port = ((rand()%32000) | (0x1 << 10));
+    srandom((uint32_t)time(NULL)^last_port);
+    rlock_port = ((random()%32000) | (0x1 << 10));
     const char *hname;
     // VERIFY(gethostname(hname, 100) == 0);
     hname = "127.0.0.1";
@@ -70,13 +70,13 @@ lock_client::lock_client(string xdst, class lock_release_user *_lu) : lu(_lu) {
     rlsrpc->reg(rlock_protocol::retry, &lock_client::retry_handler, this);
     {
         lock sl(xid_mutex);
-        xid = 0;
+        next_xid = 0;
     }
     rsmc = new rsm_client(xdst);
     releaser_thread = std::thread(&lock_client::releaser, this);
 }
 
-void lock_client::releaser() {
+void lock_client::releaser() [[noreturn]] {
     while (1) {
         lock_protocol::lockid_t lid;
         release_fifo.deq(&lid);
@@ -90,6 +90,8 @@ void lock_client::releaser() {
             sl.unlock();
             int r;
             rsmc->call(lock_protocol::release, r, lid, id, st.xid);
+            if (lu)
+                lu->dorelease(lid);
             sl.lock();
         }
         st.state = lock_state::none;
@@ -123,8 +125,8 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
 
         if (st.state == lock_state::none || st.state == lock_state::retrying) {
             if (st.state == lock_state::none) {
-                lock sl(xid_mutex);
-                st.xid = xid++;
+                lock l(xid_mutex);
+                st.xid = next_xid++;
             }
             st.state = lock_state::acquiring;
             LOG("Lock " << lid << ": acquiring");
@@ -216,7 +218,7 @@ rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_
     return rlock_protocol::OK;
 }
 
-rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
+rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) {
     lock_state &st = get_lock_state(lid);
     lock sl(st.m);
     VERIFY(st.state == lock_state::acquiring);
index 7b5edf6..541cc23 100644 (file)
@@ -16,7 +16,7 @@
 class lock_release_user {
     public:
         virtual void dorelease(lock_protocol::lockid_t) = 0;
-        virtual ~lock_release_user() {};
+        virtual ~lock_release_user() {}
 };
 
 using std::string;
@@ -57,19 +57,19 @@ class lock_client {
         std::thread releaser_thread;
         rsm_client *rsmc;
         class lock_release_user *lu;
-        int rlock_port;
+        unsigned int rlock_port;
         string hostname;
         string id;
         mutex xid_mutex;
-        lock_protocol::xid_t xid;
+        lock_protocol::xid_t next_xid;
         fifo<lock_protocol::lockid_t> release_fifo;
         mutex lock_table_lock;
         lock_map lock_table;
         lock_state &get_lock_state(lock_protocol::lockid_t lid);
     public:
-        static int last_port;
+        static unsigned int last_port;
         lock_client(string xdst, class lock_release_user *l = 0);
-        ~lock_client() {};
+        ~lock_client() {}
         lock_protocol::status acquire(lock_protocol::lockid_t);
         lock_protocol::status release(lock_protocol::lockid_t);
         int stat(lock_protocol::lockid_t);
index 90f0047..3a85949 100644 (file)
@@ -1,14 +1,5 @@
-//
-// Lock demo
-//
-
-#include "lock_protocol.h"
 #include "lock_client.h"
-#include "rpc/rpc.h"
-#include <arpa/inet.h>
-#include <vector>
-#include <stdlib.h>
-#include <stdio.h>
+#include "tprintf.h"
 
 char tprintf_thread_prefix = 'd';
 
@@ -21,5 +12,5 @@ main(int argc, char *argv[])
     }
 
     lock_client *lc = new lock_client(argv[1]);
-    printf ("stat returned %d\n", lc->stat("1"));
+    LOG_NONMEMBER("stat returned " << lc->stat("1"));
 }
index f5a1fc4..a82231e 100644 (file)
@@ -2,7 +2,6 @@
 
 #include "lock_server.h"
 #include <sstream>
-#include <stdio.h>
 #include <unistd.h>
 #include <arpa/inet.h>
 #include "lang/verify.h"
@@ -32,12 +31,6 @@ lock_state& lock_state::operator=(const lock_state& o) {
     return *this;
 }
 
-template <class A, class B>
-ostringstream & operator<<(ostringstream &o, const pair<A,B> &d) {
-    o << "<" << d.first << "," << d.second << ">";
-    return o;
-}
-
 marshall & operator<<(marshall &m, const lock_state &d) {
        return m << d.held << d.held_by << d.wanted_by;
 }
@@ -59,7 +52,7 @@ lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
     rsm->set_state_transfer(this);
 }
 
-void lock_server::revoker() {
+void lock_server::revoker() [[noreturn]] {
     while (1) {
         lock_protocol::lockid_t lid;
         revoke_fifo.deq(&lid);
@@ -87,7 +80,7 @@ void lock_server::revoker() {
     }
 }
 
-void lock_server::retryer() {
+void lock_server::retryer() [[noreturn]] {
     while (1) {
         lock_protocol::lockid_t lid;
         retry_fifo.deq(&lid);
@@ -119,7 +112,7 @@ void lock_server::retryer() {
     }
 }
 
-int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
+int lock_server::acquire(int &, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
     LOG_FUNC_ENTER_SERVER;
     holder h = holder(id, xid);
     lock_state &st = get_lock_state(lid);
@@ -168,7 +161,7 @@ int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_pr
     if (!found)
         st.wanted_by.push_back(h);
 
-    LOG("wanted_by=" << JOIN(st.wanted_by.begin(), st.wanted_by.end(), " "));
+    LOG("wanted_by=" << make_iterator_pair(st.wanted_by.begin(), st.wanted_by.end()));
 
     // send revoke if we're first in line
     if (st.wanted_by.front() == h)
@@ -177,7 +170,7 @@ int lock_server::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_pr
     return lock_protocol::RETRY;
 }
 
-int lock_server::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
+int lock_server::release(int &, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
     LOG_FUNC_ENTER_SERVER;
     lock_state &st = get_lock_state(lid);
     lock sl(st.m);
@@ -206,7 +199,7 @@ void lock_server::unmarshal_state(string state) {
 }
 
 lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
-    printf("stat request\n");
+    LOG("stat request for " << lid);
     VERIFY(0);
     r = nacquire;
     return lock_protocol::OK;
index 086186e..363f886 100644 (file)
@@ -1,10 +1,9 @@
 #include "rpc/rpc.h"
 #include <arpa/inet.h>
 #include <stdlib.h>
-#include <stdio.h>
+#include "tprintf.h"
 #include <unistd.h>
 #include "lock_server.h"
-#include "paxos.h"
 #include "rsm.h"
 
 // Main loop of lock_server
@@ -17,7 +16,7 @@ main(int argc, char *argv[])
     setvbuf(stdout, NULL, _IONBF, 0);
     setvbuf(stderr, NULL, _IONBF, 0);
 
-    srandom(getpid());
+    srandom((uint32_t)getpid());
 
     if(argc != 3){
         fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
index 5c78c90..f4e68bd 100644 (file)
@@ -81,10 +81,8 @@ test1(void)
 }
 
 void *
-test2(void *x) 
+test2(int i) 
 {
-    int i = * (int *) x;
-
     tprintf ("test2: client %d acquire a release a\n", i);
     lc[i]->acquire(a);
     tprintf ("test2: client %d acquire done\n", i);
@@ -98,10 +96,8 @@ test2(void *x)
 }
 
 void *
-test3(void *x)
+test3(int i)
 {
-    int i = * (int *) x;
-
     tprintf ("test3: client %d acquire a release a concurrent\n", i);
     for (int j = 0; j < 10; j++) {
         lc[i]->acquire(a);
@@ -114,10 +110,8 @@ test3(void *x)
 }
 
 void *
-test4(void *x)
+test4(int i)
 {
-    int i = * (int *) x;
-
     tprintf ("test4: thread %d acquire a release a concurrent; same clnt\n", i);
     for (int j = 0; j < 10; j++) {
         lc[0]->acquire(a);
@@ -130,10 +124,8 @@ test4(void *x)
 }
 
 void *
-test5(void *x)
+test5(int i)
 {
-    int i = * (int *) x;
-
     tprintf ("test5: client %d acquire a release a concurrent; same and diff clnt\n", i);
     for (int j = 0; j < 10; j++) {
         if (i < 5)  lc[0]->acquire(a);
@@ -155,7 +147,7 @@ main(int argc, char *argv[])
 
     setvbuf(stdout, NULL, _IONBF, 0);
     setvbuf(stderr, NULL, _IONBF, 0);
-    srandom(getpid());
+    srandom((uint32_t)getpid());
 
     if(argc < 2) {
         fprintf(stderr, "Usage: %s [host:]port [test]\n", argv[0]);
@@ -181,53 +173,40 @@ main(int argc, char *argv[])
 
     if(!test || test == 2){
         // test2
-        for (int i = 0; i < nt; i++) {
-            int *a = new int (i);
-            th[i] = std::thread(test2, a);
-        }
-        for (int i = 0; i < nt; i++) {
+        for (int i = 0; i < nt; i++)
+            th[i] = std::thread(test2, i);
+        for (int i = 0; i < nt; i++)
             th[i].join();
-        }
     }
 
     if(!test || test == 3){
         tprintf("test 3\n");
 
         // test3
-        for (int i = 0; i < nt; i++) {
-            int *a = new int (i);
-            th[i] = std::thread(test3, a);
-        }
-        for (int i = 0; i < nt; i++) {
+        for (int i = 0; i < nt; i++)
+            th[i] = std::thread(test3, i);
+        for (int i = 0; i < nt; i++)
             th[i].join();
-        }
     }
 
     if(!test || test == 4){
         tprintf("test 4\n");
 
         // test 4
-        for (int i = 0; i < 2; i++) {
-            int *a = new int (i);
-            th[i] = std::thread(test4, a);
-        }
-        for (int i = 0; i < 2; i++) {
+        for (int i = 0; i < 2; i++)
+            th[i] = std::thread(test4, i);
+        for (int i = 0; i < 2; i++)
             th[i].join();
-        }
     }
 
     if(!test || test == 5){
         tprintf("test 5\n");
 
         // test 5
-
-        for (int i = 0; i < nt; i++) {
-            int *a = new int (i);
-            th[i] = std::thread(test5, a);
-        }
-        for (int i = 0; i < nt; i++) {
+        for (int i = 0; i < nt; i++)
+            th[i] = std::thread(test5, i);
+        for (int i = 0; i < nt; i++)
             th[i].join();
-        }
     }
 
     tprintf ("%s: passed all tests successfully\n", argv[0]);
index b0ec640..83bf4f1 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
 // paxos_commit to inform higher layers of the agreed value for this
 // instance.
 
-
-bool
-operator> (const prop_t &a, const prop_t &b)
-{
-  return (a.n > b.n || (a.n == b.n && a.m > b.m));
+bool operator> (const prop_t &a, const prop_t &b) {
+    return (a.n > b.n || (a.n == b.n && a.m > b.m));
 }
 
-bool
-operator>= (const prop_t &a, const prop_t &b)
-{
-  return (a.n > b.n || (a.n == b.n && a.m >= b.m));
+bool operator>= (const prop_t &a, const prop_t &b) {
+    return (a.n > b.n || (a.n == b.n && a.m >= b.m));
 }
 
 std::string
-print_members(const std::vector<std::string> &nodes)
-{
-  std::string s;
-  s.clear();
-  for (unsigned i = 0; i < nodes.size(); i++) {
-    s += nodes[i];
-    if (i < (nodes.size()-1))
-      s += ",";
-  }
-  return s;
+print_members(const std::vector<std::string> &nodes) {
+    std::string s;
+    s.clear();
+    for (unsigned i = 0; i < nodes.size(); i++) {
+        s += nodes[i];
+        if (i < (nodes.size()-1))
+            s += ",";
+    }
+    return s;
 }
 
-bool isamember(std::string m, const std::vector<std::string> &nodes)
-{
-  for (unsigned i = 0; i < nodes.size(); i++) {
-    if (nodes[i] == m) return 1;
-  }
-  return 0;
+
+bool isamember(const std::string & m, const std::vector<std::string> & nodes) {
+    for (auto n : nodes) {
+        if (n == m)
+            return 1;
+    }
+    return 0;
 }
 
-bool
-proposer::isrunning()
-{
-  bool r;
-  lock ml(pxs_mutex);
-  r = !stable;
-  return r;
+bool proposer::isrunning() {
+    bool r;
+    lock ml(pxs_mutex);
+    r = !stable;
+    return r;
 }
 
 // check if the servers in l2 contains a majority of servers in l1
-bool
-proposer::majority(const std::vector<std::string> &l1,
-               const std::vector<std::string> &l2)
-{
-  unsigned n = 0;
+bool proposer::majority(const std::vector<std::string> &l1,
+               const std::vector<std::string> &l2) {
+    unsigned n = 0;
 
-  for (unsigned i = 0; i < l1.size(); i++) {
-    if (isamember(l1[i], l2))
-      n++;
-  }
-  return n >= (l1.size() >> 1) + 1;
+    for (unsigned i = 0; i < l1.size(); i++) {
+        if (isamember(l1[i], l2))
+            n++;
+    }
+    return n >= (l1.size() >> 1) + 1;
 }
 
 proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
-                  std::string _me)
+        const std::string &_me)
   : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
     stable (true)
 {
-  my_n.n = 0;
-  my_n.m = me;
+    my_n.n = 0;
+    my_n.m = me;
 }
 
-void
-proposer::setn()
+void proposer::setn()
 {
-  my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
+    my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
 }
 
-bool
-proposer::run(int instance, std::vector<std::string> cur_nodes, std::string newv)
+bool proposer::run(unsigned instance, const std::vector<std::string> & cur_nodes,
+        const std::string & newv)
 {
-  std::vector<std::string> accepts;
-  std::vector<std::string> nodes;
-  std::string v;
-  bool r = false;
-
-  lock ml(pxs_mutex);
-  tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
-        print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
-  if (!stable) {  // already running proposer?
-    tprintf("proposer::run: already running\n");
-    return false;
-  }
-  stable = false;
-  setn();
-  accepts.clear();
-  v.clear();
-  if (prepare(instance, accepts, cur_nodes, v)) {
-
-    if (majority(cur_nodes, accepts)) {
-      tprintf("paxos::manager: received a majority of prepare responses\n");
-
-      if (v.size() == 0)
-       v = newv;
-
-      breakpoint1();
-
-      nodes = accepts;
-      accepts.clear();
-      accept(instance, accepts, nodes, v);
-
-      if (majority(cur_nodes, accepts)) {
-       tprintf("paxos::manager: received a majority of accept responses\n");
-
-       breakpoint2();
-
-       decide(instance, accepts, v);
-       r = true;
-      } else {
-       tprintf("paxos::manager: no majority of accept responses\n");
-      }
+    std::vector<std::string> accepts;
+    std::vector<std::string> nodes;
+    std::string v;
+    bool r = false;
+
+    lock ml(pxs_mutex);
+    tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
+            print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
+    if (!stable) {  // already running proposer?
+        tprintf("proposer::run: already running\n");
+        return false;
+    }
+    stable = false;
+    setn();
+    accepts.clear();
+    v.clear();
+    if (prepare(instance, accepts, cur_nodes, v)) {
+
+        if (majority(cur_nodes, accepts)) {
+            tprintf("paxos::manager: received a majority of prepare responses\n");
+
+            if (v.size() == 0)
+                v = newv;
+
+            breakpoint1();
+
+            nodes = accepts;
+            accepts.clear();
+            accept(instance, accepts, nodes, v);
+
+            if (majority(cur_nodes, accepts)) {
+                tprintf("paxos::manager: received a majority of accept responses\n");
+
+                breakpoint2();
+
+                decide(instance, accepts, v);
+                r = true;
+            } else {
+                tprintf("paxos::manager: no majority of accept responses\n");
+            }
+        } else {
+            tprintf("paxos::manager: no majority of prepare responses\n");
+        }
     } else {
-      tprintf("paxos::manager: no majority of prepare responses\n");
+        tprintf("paxos::manager: prepare is rejected %d\n", stable);
     }
-  } else {
-    tprintf("paxos::manager: prepare is rejected %d\n", stable);
-  }
-  stable = true;
-  return r;
+    stable = true;
+    return r;
 }
 
 // proposer::run() calls prepare to send prepare RPCs to nodes
@@ -145,16 +135,16 @@ proposer::run(int instance, std::vector<std::string> cur_nodes, std::string newv
 // otherwise fill in accepts with set of nodes that accepted,
 // set v to the v_a with the highest n_a, and return true.
 bool
-proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
-         std::vector<std::string> nodes,
-         std::string &v)
+proposer::prepare(unsigned instance, std::vector<std::string> & accepts,
+        const std::vector<std::string> & nodes,
+        std::string & v)
 {
     struct paxos_protocol::preparearg arg = { instance, my_n };
     struct paxos_protocol::prepareres res;
     prop_t n_a = { 0, "" };
     rpcc *r;
-    for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
-        handle h(*i);
+    for (auto i : nodes) {
+        handle h(i);
         if (!(r = h.safebind()))
             continue;
         int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
@@ -165,7 +155,7 @@ proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
                 return false;
             }
             if (res.accept) {
-                accepts.push_back(*i);
+                accepts.push_back(i);
                 if (res.n_a >= n_a) {
                     tprintf("found a newer accepted proposal\n");
                     v = res.v_a;
@@ -180,32 +170,30 @@ proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
 // run() calls this to send out accept RPCs to accepts.
 // fill in accepts with list of nodes that accepted.
 void
-proposer::accept(unsigned instance, std::vector<std::string> &accepts,
-        std::vector<std::string> nodes, std::string v)
+proposer::accept(unsigned instance, std::vector<std::string> & accepts,
+        const std::vector<std::string> & nodes, const std::string & v)
 {
     struct paxos_protocol::acceptarg arg = { instance, my_n, v };
     rpcc *r;
-    for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
-        handle h(*i);
+    for (auto i : nodes) {
+        handle h(i);
         if (!(r = h.safebind()))
             continue;
         bool accept = false;
         int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg);
-        if (status == paxos_protocol::OK) {
-            if (accept)
-                accepts.push_back(*i);
-        }
+        if (status == paxos_protocol::OK && accept)
+            accepts.push_back(i);
     }
 }
 
 void
-proposer::decide(unsigned instance, std::vector<std::string> accepts,
-             std::string v)
+proposer::decide(unsigned instance, const std::vector<std::string> & accepts,
+        const std::string & v)
 {
     struct paxos_protocol::decidearg arg = { instance, v };
     rpcc *r;
-    for (std::vector<std::string>::iterator i=accepts.begin(); i!=accepts.end(); i++) {
-        handle h(*i);
+    for (auto i : accepts) {
+        handle h(i);
         if (!(r = h.safebind()))
             continue;
         int res = 0;
@@ -213,32 +201,33 @@ proposer::decide(unsigned instance, std::vector<std::string> accepts,
     }
 }
 
-acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me,
-            std::string _value)
+acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _me,
+        const std::string & _value)
   : cfg(_cfg), me (_me), instance_h(0)
 {
-  n_h.n = 0;
-  n_h.m = me;
-  n_a.n = 0;
-  n_a.m = me;
-  v_a.clear();
-
-  l = new log (this, me);
-
-  if (instance_h == 0 && _first) {
-    values[1] = _value;
-    l->loginstance(1, _value);
-    instance_h = 1;
-  }
-
-  pxs = new rpcs(atoi(_me.c_str()));
-  pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
-  pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
-  pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
+    n_h.n = 0;
+    n_h.m = me;
+    n_a.n = 0;
+    n_a.m = me;
+    v_a.clear();
+
+    l = new log (this, me);
+
+    if (instance_h == 0 && _first) {
+        values[1] = _value;
+        l->loginstance(1, _value);
+        instance_h = 1;
+    }
+
+    pxs = new rpcs((uint32_t)std::stoi(_me));
+    pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
+    pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
+    pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
 }
 
 paxos_protocol::status
-acceptor::preparereq(paxos_protocol::prepareres &r, std::string src, paxos_protocol::preparearg a)
+acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &,
+        paxos_protocol::preparearg a)
 {
     lock ml(pxs_mutex);
     r.oldinstance = false;
@@ -259,7 +248,7 @@ acceptor::preparereq(paxos_protocol::prepareres &r, std::string src, paxos_proto
 }
 
 paxos_protocol::status
-acceptor::acceptreq(bool &r, std::string src, paxos_protocol::acceptarg a)
+acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a)
 {
     lock ml(pxs_mutex);
     r = false;
@@ -272,16 +261,16 @@ acceptor::acceptreq(bool &r, std::string src, paxos_protocol::acceptarg a)
     return paxos_protocol::OK;
 }
 
-// the src argument is only for debug purpose
-    paxos_protocol::status
-acceptor::decidereq(int &r, std::string src, paxos_protocol::decidearg a)
+// the src argument is only for debugging
+paxos_protocol::status
+acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a)
 {
     lock ml(pxs_mutex);
     tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
             a.instance, instance_h, v_a.c_str());
     if (a.instance == instance_h + 1) {
         VERIFY(v_a == a.v);
-        commit_wo(a.instance, v_a);
+        commit(a.instance, v_a, ml);
     } else if (a.instance <= instance_h) {
         // we are ahead ignore.
     } else {
@@ -292,10 +281,8 @@ acceptor::decidereq(int &r, std::string src, paxos_protocol::decidearg a)
 }
 
 void
-acceptor::commit_wo(unsigned instance, std::string value)
+acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_lock)
 {
-    //assume pxs_mutex is held
-    adopt_lock ml(pxs_mutex);
     tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
     if (instance > instance_h) {
         tprintf("commit: highestaccepteinstance = %d\n", instance);
@@ -308,31 +295,31 @@ acceptor::commit_wo(unsigned instance, std::string value)
         n_a.m = me;
         v_a.clear();
         if (cfg) {
-            ml.unlock();
+            pxs_mutex_lock.unlock();
             cfg->paxos_commit(instance, value);
-            ml.lock();
+            pxs_mutex_lock.lock();
         }
     }
 }
 
 void
-acceptor::commit(unsigned instance, std::string value)
+acceptor::commit(unsigned instance, const std::string & value)
 {
     lock ml(pxs_mutex);
-    commit_wo(instance, value);
+    commit(instance, value, ml);
 }
 
 std::string
 acceptor::dump()
 {
-  return l->dump();
+    return l->dump();
 }
 
 void
-acceptor::restore(std::string s)
+acceptor::restore(const std::string & s)
 {
-  l->restore(s);
-  l->logread();
+    l->restore(s);
+    l->logread();
 }
 
 
@@ -343,30 +330,30 @@ acceptor::restore(std::string s)
 void
 proposer::breakpoint1()
 {
-  if (break1) {
-    tprintf("Dying at breakpoint 1!\n");
-    exit(1);
-  }
+    if (break1) {
+        tprintf("Dying at breakpoint 1!\n");
+        exit(1);
+    }
 }
 
 // Call this from your code between phases accept and decide of proposer
 void
 proposer::breakpoint2()
 {
-  if (break2) {
-    tprintf("Dying at breakpoint 2!\n");
-    exit(1);
-  }
+    if (break2) {
+        tprintf("Dying at breakpoint 2!\n");
+        exit(1);
+    }
 }
 
 void
 proposer::breakpoint(int b)
 {
-  if (b == 3) {
-    tprintf("Proposer: breakpoint 1\n");
-    break1 = true;
-  } else if (b == 4) {
-    tprintf("Proposer: breakpoint 2\n");
-    break2 = true;
-  }
+    if (b == 3) {
+        tprintf("Proposer: breakpoint 1\n");
+        break1 = true;
+    } else if (b == 4) {
+        tprintf("Proposer: breakpoint 2\n");
+        break2 = true;
+    }
 }
diff --git a/paxos.h b/paxos.h
index 170292a..9650de1 100644 (file)
--- a/paxos.h
+++ b/paxos.h
@@ -6,91 +6,92 @@
 #include "rpc/rpc.h"
 #include "paxos_protocol.h"
 #include "log.h"
+#include "lock.h"
 
 
 class paxos_change {
- public:
-  virtual void paxos_commit(unsigned instance, const std::string &v) = 0;
-  virtual ~paxos_change() {};
+    public:
+        virtual void paxos_commit(unsigned instance, const std::string & v) = 0;
+        virtual ~paxos_change() {}
 };
 
 class acceptor {
- private:
-  log *l;
-  rpcs *pxs;
-  paxos_change *cfg;
-  std::string me;
-  std::mutex pxs_mutex;
-
-  // Acceptor state
-  prop_t n_h;          // number of the highest proposal seen in a prepare
-  prop_t n_a;          // number of highest proposal accepted
-  std::string v_a;     // value of highest proposal accepted
-  unsigned instance_h; // number of the highest instance we have decided
-  std::map<unsigned,std::string> values;       // vals of each instance
-
-  void commit_wo(unsigned instance, std::string v);
-  paxos_protocol::status preparereq(paxos_protocol::prepareres &r,
-          std::string src, paxos_protocol::preparearg a);
-  paxos_protocol::status acceptreq(bool &r, std::string src,
-          paxos_protocol::acceptarg a);
-  paxos_protocol::status decidereq(int &r, std::string src,
-          paxos_protocol::decidearg a);
-
-  friend class log;
-
- public:
-  acceptor(class paxos_change *cfg, bool _first, std::string _me,
-       std::string _value);
-  ~acceptor() {};
-  void commit(unsigned instance, std::string v);
-  unsigned instance() { return instance_h; }
-  std::string value(unsigned instance) { return values[instance]; }
-  std::string dump();
-  void restore(std::string);
-  rpcs *get_rpcs() { return pxs; };
-  prop_t get_n_h() { return n_h; };
-  unsigned get_instance_h() { return instance_h; };
+    private:
+        log *l;
+        rpcs *pxs;
+        paxos_change *cfg;
+        std::string me;
+        mutex pxs_mutex;
+
+        // Acceptor state
+        prop_t n_h;            // number of the highest proposal seen in a prepare
+        prop_t n_a;            // number of highest proposal accepted
+        std::string v_a;       // value of highest proposal accepted
+        unsigned instance_h;   // number of the highest instance we have decided
+        std::map<unsigned,std::string> values; // vals of each instance
+
+        void commit(unsigned instance, const std::string & v, lock & pxs_mutex_lock);
+        paxos_protocol::status preparereq(paxos_protocol::prepareres & r,
+                const std::string & src, paxos_protocol::preparearg a);
+        paxos_protocol::status acceptreq(bool & r, const std::string & src,
+                paxos_protocol::acceptarg a);
+        paxos_protocol::status decidereq(int & r, const std::string & src,
+                paxos_protocol::decidearg a);
+
+        friend class log;
+
+    public:
+        acceptor(class paxos_change *cfg, bool _first, const std::string & _me,
+                const std::string & _value);
+        ~acceptor() {}
+        void commit(unsigned instance, const std::string & v);
+        unsigned instance() { return instance_h; }
+        const std::string & value(unsigned instance) { return values[instance]; }
+        std::string dump();
+        void restore(const std::string &);
+        rpcs *get_rpcs() { return pxs; }
+        prop_t get_n_h() { return n_h; }
+        unsigned get_instance_h() { return instance_h; }
 };
 
-extern bool isamember(std::string m, const std::vector<std::string> &nodes);
-extern std::string print_members(const std::vector<std::string> &nodes);
+extern bool isamember(const std::string & m, const std::vector<std::string> & nodes);
+extern std::string print_members(const std::vector<std::string> & nodes);
 
 class proposer {
- private:
-  log *l;
-  paxos_change *cfg;
-  acceptor *acc;
-  std::string me;
-  bool break1;
-  bool break2;
-
-  std::mutex pxs_mutex;
-
-  // Proposer state
-  bool stable;
-  prop_t my_n;         // number of the last proposal used in this instance
-
-  void setn();
-  bool prepare(unsigned instance, std::vector<std::string> &accepts,
-         std::vector<std::string> nodes,
-         std::string &v);
-  void accept(unsigned instance, std::vector<std::string> &accepts,
-        std::vector<std::string> nodes, std::string v);
-  void decide(unsigned instance, std::vector<std::string> accepts,
-        std::string v);
-
-  void breakpoint1();
-  void breakpoint2();
-  bool majority(const std::vector<std::string> &l1, const std::vector<std::string> &l2);
-
-  friend class log;
- public:
-  proposer(class paxos_change *cfg, class acceptor *_acceptor, std::string _me);
-  ~proposer() {};
-  bool run(int instance, std::vector<std::string> cnodes, std::string v);
-  bool isrunning();
-  void breakpoint(int b);
+    private:
+        log *l;
+        paxos_change *cfg;
+        acceptor *acc;
+        std::string me;
+        bool break1;
+        bool break2;
+
+        mutex pxs_mutex;
+
+        // Proposer state
+        bool stable;
+        prop_t my_n;           // number of the last proposal used in this instance
+
+        void setn();
+        bool prepare(unsigned instance, std::vector<std::string> & accepts,
+                const std::vector<std::string> & nodes,
+                std::string & v);
+        void accept(unsigned instance, std::vector<std::string> & accepts,
+                const std::vector<std::string> & nodes, const std::string & v);
+        void decide(unsigned instance, const std::vector<std::string> & accepts,
+                const std::string & v);
+
+        void breakpoint1();
+        void breakpoint2();
+        bool majority(const std::vector<std::string> & l1, const std::vector<std::string> & l2);
+
+        friend class log;
+    public:
+        proposer(class paxos_change *cfg, class acceptor *_acceptor, const std::string &_me);
+        ~proposer() {}
+        bool run(unsigned instance, const std::vector<std::string> & cnodes, const std::string & v);
+        bool isrunning();
+        void breakpoint(int b);
 };
 
 
index 93a79cf..dde514d 100644 (file)
@@ -8,20 +8,20 @@
 template<class T>
 class fifo {
        public:
-               fifo(int limit=0) : max_(limit) {};
+               fifo(size_t limit=0) : max_(limit) {}
                bool enq(T, bool blocking=true);
                void deq(T *);
                bool size() {
             lock ml(m_);
             return q_.size();
-        };
+        }
 
        private:
                std::list<T> q_;
         mutex m_;
         cond non_empty_c_; // q went non-empty
                cond has_space_c_; // q is not longer overfull
-               unsigned int max_; // maximum capacity of the queue, block enq threads if exceeds this limit
+               size_t max_; // maximum capacity of the queue, block enq threads if exceeds this limit
 };
 
 template<class T> bool
index 7a85d6b..676a682 100644 (file)
 #include "lang/verify.h"
 
 struct request_header {
-    request_header(int x=0, int p=0, int c=0, int s=0, int xi=0) :
+    request_header(int x=0, int p=0, unsigned c=0, unsigned s=0, int xi=0) :
         xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {}
     int xid;
     int proc;
     unsigned int clt_nonce;
     unsigned int srv_nonce;
     int xid_rep;
-    request_header hton() const {
-        return {
-            htonl(xid), htonl(proc), htonl(clt_nonce), htonl(srv_nonce), htonl(xid_rep)
-        };
-    }
 };
 
 struct reply_header {
     reply_header(int x=0, int r=0): xid(x), ret(r) {}
     int xid;
     int ret;
-    reply_header hton() const {
-        return {
-            htonl(xid), htonl(ret)
-        };
-    }
 };
 
+template<class T> inline T hton(T t);
+
+constexpr union { uint32_t i; uint8_t is_little_endian; } endianness{1};
+
+template<> inline uint8_t hton(uint8_t t) { return t; }
+template<> inline int8_t hton(int8_t t) { return t; }
+template<> inline uint16_t hton(uint16_t t) { return htons(t); }
+template<> inline int16_t hton(int16_t t) { return (int16_t)htons((uint16_t)t); }
+template<> inline uint32_t hton(uint32_t t) { return htonl(t); }
+template<> inline int32_t hton(int32_t t) { return (int32_t)htonl((uint32_t)t); }
+template<> inline uint64_t hton(uint64_t t) {
+    if (!endianness.is_little_endian)
+        return t;
+    return (uint64_t)htonl((uint32_t)(t >> 32)) | ((uint64_t)htonl((uint32_t)t) << 32);
+}
+template<> inline int64_t hton(int64_t t) { return (int64_t)hton((uint64_t)t); }
+template<> inline request_header hton(request_header h) { return {hton(h.xid), hton(h.proc), hton(h.clt_nonce), hton(h.srv_nonce), hton(h.xid_rep)}; }
+template<> inline reply_header hton(reply_header h) { return {hton(h.xid), hton(h.ret)}; }
+
+template <class T> inline T ntoh(T t) { return hton(t); }
+
 typedef int rpc_sz_t;
 
 //size of initial buffer allocation
@@ -76,16 +87,16 @@ class marshall {
                 free(buf_);
         }
 
-        int size() { return index_;}
+        size_t size() { return index_;}
         char *cstr() { return buf_;}
         const char *cstr() const { return buf_;}
 
-        void rawbyte(unsigned char x) {
+        void rawbyte(uint8_t x) {
             reserve(1);
-            buf_[index_++] = x;
+            buf_[index_++] = (int8_t)x;
         }
 
-        void rawbytes(const char *p, int n) {
+        void rawbytes(const char *p, size_t n) {
             reserve(n);
             memcpy(buf_+index_, p, n);
             index_ += n;
@@ -104,7 +115,7 @@ class marshall {
         void pack_req_header(const request_header &h);
         void pack_reply_header(const reply_header &h);
 
-        void take_buf(char **b, int *s) {
+        void take_buf(char **b, size_t *s) {
             *b = buf_;
             *s = index_;
             buf_ = NULL;
@@ -114,13 +125,13 @@ class marshall {
 };
 
 marshall& operator<<(marshall &, bool);
-marshall& operator<<(marshall &, unsigned int);
-marshall& operator<<(marshall &, int);
-marshall& operator<<(marshall &, unsigned char);
-marshall& operator<<(marshall &, char);
-marshall& operator<<(marshall &, unsigned short);
-marshall& operator<<(marshall &, short);
-marshall& operator<<(marshall &, unsigned long long);
+marshall& operator<<(marshall &, uint32_t);
+marshall& operator<<(marshall &, int32_t);
+marshall& operator<<(marshall &, uint8_t);
+marshall& operator<<(marshall &, int8_t);
+marshall& operator<<(marshall &, uint16_t);
+marshall& operator<<(marshall &, int16_t);
+marshall& operator<<(marshall &, uint64_t);
 marshall& operator<<(marshall &, const std::string &);
 
 template <class A> marshall &
@@ -138,17 +149,31 @@ operator<<(marshall &m, const std::pair<A,B> &d) {
     return m;
 }
 
+class unmarshall;
+
+unmarshall& operator>>(unmarshall &, bool &);
+unmarshall& operator>>(unmarshall &, uint8_t &);
+unmarshall& operator>>(unmarshall &, int8_t &);
+unmarshall& operator>>(unmarshall &, uint16_t &);
+unmarshall& operator>>(unmarshall &, int16_t &);
+unmarshall& operator>>(unmarshall &, uint32_t &);
+unmarshall& operator>>(unmarshall &, int32_t &);
+unmarshall& operator>>(unmarshall &, size_t &);
+unmarshall& operator>>(unmarshall &, uint64_t &);
+unmarshall& operator>>(unmarshall &, int64_t &);
+unmarshall& operator>>(unmarshall &, std::string &);
+
 class unmarshall {
     private:
         char *buf_;
-        int sz_;
-        int index_;
+        size_t sz_;
+        size_t index_;
         bool ok_;
 
         inline bool ensure(size_t n);
     public:
         unmarshall(): buf_(NULL),sz_(0),index_(0),ok_(false) {}
-        unmarshall(char *b, int sz): buf_(b),sz_(sz),index_(),ok_(true) {}
+        unmarshall(char *b, size_t sz): buf_(b),sz_(sz),index_(),ok_(true) {}
         unmarshall(const std::string &s) : buf_(NULL),sz_(0),index_(0),ok_(false)
         {
             //take the content which does not exclude a RPC header from a string
@@ -175,13 +200,13 @@ class unmarshall {
         char *cstr() { return buf_;}
         bool okdone() const { return ok_ && index_ == sz_; }
 
-        unsigned int rawbyte();
+        uint8_t rawbyte();
         void rawbytes(std::string &s, size_t n);
+        template <class T> void rawbytes(T &t);
 
-        int ind() { return index_;}
-        int size() { return sz_;}
-        void unpack(int *); //non-const ref
-        void take_buf(char **b, int *sz) {
+        size_t ind() { return index_;}
+        size_t size() { return sz_;}
+        void take_buf(char **b, size_t *sz) {
             *b = buf_;
             *sz = sz_;
             sz_ = index_ = 0;
@@ -191,19 +216,14 @@ class unmarshall {
         void unpack_req_header(request_header *h) {
             //the first 4-byte is for channel to fill size of pdu
             index_ = sizeof(rpc_sz_t);
-            unpack(&h->xid);
-            unpack(&h->proc);
-            unpack((int *)&h->clt_nonce);
-            unpack((int *)&h->srv_nonce);
-            unpack(&h->xid_rep);
+            *this >> h->xid >> h->proc >> h->clt_nonce >> h->srv_nonce >> h->xid_rep;
             index_ = RPC_HEADER_SZ;
         }
 
         void unpack_reply_header(reply_header *h) {
             //the first 4-byte is for channel to fill size of pdu
             index_ = sizeof(rpc_sz_t);
-            unpack(&h->xid);
-            unpack(&h->ret);
+            *this >> h->xid >> h->ret;
             index_ = RPC_HEADER_SZ;
         }
 
index 33aeae2..919a286 100644 (file)
@@ -10,7 +10,7 @@
 PollMgr *PollMgr::instance = NULL;
 static std::once_flag pollmgr_is_initialized;
 
-void
+static void
 PollMgrInit()
 {
        PollMgr::instance = new PollMgr();
@@ -32,7 +32,7 @@ PollMgr::PollMgr() : pending_change_(false)
     th_ = std::thread(&PollMgr::wait_loop, this);
 }
 
-PollMgr::~PollMgr()
+PollMgr::~PollMgr() [[noreturn]]
 {
        //never kill me!!!
        VERIFY(0);
@@ -83,7 +83,7 @@ PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
 }
 
 void
-PollMgr::wait_loop()
+PollMgr::wait_loop() [[noreturn]]
 {
 
        std::vector<int> readable;
index f0c4b77..5e43547 100644 (file)
 #include "lock.h"
 
 #include "jsl_log.h"
+#include "tprintf.h"
 #include "lang/verify.h"
 
 const rpcc::TO rpcc::to_max = { 120000 };
 const rpcc::TO rpcc::to_min = { 1000 };
 
-rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
+rpcc::caller::caller(int xxid, unmarshall *xun)
 : xid(xxid), un(xun), done(false)
 {
 }
@@ -82,7 +83,7 @@ inline
 void set_rand_seed()
 {
     auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
-    srandom((int)now.time_since_epoch().count()^((int)getpid()));
+    srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
 }
 
 rpcc::rpcc(sockaddr_in d, bool retrans) :
@@ -91,7 +92,7 @@ rpcc::rpcc(sockaddr_in d, bool retrans) :
 {
     if(retrans){
         set_rand_seed();
-        clt_nonce_ = random();
+        clt_nonce_ = (unsigned int)random();
     } else {
         // special client nonce 0 means this client does not
         // require at-most-once logic from the server
@@ -127,7 +128,7 @@ rpcc::~rpcc()
 int
 rpcc::bind(TO to)
 {
-    int r;
+    unsigned int r;
     int ret = call_timeout(rpc_const::bind, to, r, 0);
     if(ret == 0){
         lock ml(m_);
@@ -145,10 +146,9 @@ rpcc::bind(TO to)
 rpcc::cancel(void)
 {
     lock ml(m_);
-    printf("rpcc::cancel: force callers to fail\n");
-    std::map<int,caller*>::iterator iter;
-    for(iter = calls_.begin(); iter != calls_.end(); iter++){
-        caller *ca = iter->second;
+    tprintf("rpcc::cancel: force callers to fail");
+    for(auto &p : calls_){
+        caller *ca = p.second;
 
         jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
         {
@@ -163,7 +163,7 @@ rpcc::cancel(void)
         destroy_wait_ = true;
         destroy_wait_c_.wait(ml);
     }
-    printf("rpcc::cancel: done\n");
+    tprintf("rpcc::cancel: done");
 }
 
 int
@@ -172,7 +172,7 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
 {
 
     caller ca(0, &rep);
-        int xid_rep;
+    int xid_rep;
     {
         lock ml(m_);
 
@@ -189,7 +189,7 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
         ca.xid = xid_++;
         calls_[ca.xid] = &ca;
 
-        req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
+        req.pack_req_header({ca.xid, (int)proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
         xid_rep = xid_rep_window_.front();
     }
 
@@ -223,7 +223,7 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
                 }
                 else jsl_log(JSL_DBG_1, "not reachable\n");
                 jsl_log(JSL_DBG_2,
-                        "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n",
+                        "rpcc::call1 %u just sent req proc %x xid %d clt_nonce %d\n",
                         clt_nonce_, proc, ca.xid, clt_nonce_);
             }
             transmit = false; // only send once on a given channel
@@ -289,7 +289,7 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
     lock cal(ca.m);
 
     jsl_log(JSL_DBG_2,
-            "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n",
+            "rpcc::call1 %u call done for req proc %x xid %d %s:%d done? %d ret %d \n",
             clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
             ntohs(dst_.sin_port), ca.done, ca.intret);
 
@@ -324,7 +324,7 @@ rpcc::get_refconn(connection **ch)
 //
 // this function keeps no reference for connection *c
 bool
-rpcc::got_pdu(connection *c, char *b, size_t sz)
+rpcc::got_pdu(connection *, char *b, size_t sz)
 {
     unmarshall rep(b, sz);
     reply_header h;
@@ -361,15 +361,13 @@ rpcc::got_pdu(connection *c, char *b, size_t sz)
 
 // assumes thread holds mutex m
 void
-rpcc::update_xid_rep(unsigned int xid)
+rpcc::update_xid_rep(int xid)
 {
-    std::list<unsigned int>::iterator it;
-
     if(xid <= xid_rep_window_.front()){
         return;
     }
 
-    for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
+    for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
         if(*it > xid){
             xid_rep_window_.insert(it, xid);
             goto compress;
@@ -378,18 +376,18 @@ rpcc::update_xid_rep(unsigned int xid)
     xid_rep_window_.push_back(xid);
 
 compress:
-    it = xid_rep_window_.begin();
+    auto it = xid_rep_window_.begin();
     for (it++; it != xid_rep_window_.end(); it++){
         while (xid_rep_window_.front() + 1 == *it)
             xid_rep_window_.pop_front();
     }
 }
 
-rpcs::rpcs(unsigned int p1, int count)
+rpcs::rpcs(unsigned int p1, size_t count)
   : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
 {
     set_rand_seed();
-    nonce_ = random();
+    nonce_ = (unsigned int)random();
     jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
 
     char *loss_env = getenv("RPC_LOSSY");
@@ -445,23 +443,20 @@ rpcs::updatestat(unsigned int proc)
     counts_[proc]++;
     curr_counts_--;
     if(curr_counts_ == 0){
-        std::map<int, int>::iterator i;
-        printf("RPC STATS: ");
-        for (i = counts_.begin(); i != counts_.end(); i++){
-            printf("%x:%d ", i->first, i->second);
-        }
-        printf("\n");
+        tprintf("RPC STATS: ");
+        for (auto i = counts_.begin(); i != counts_.end(); i++)
+            tprintf("%x:%lu ", i->first, i->second);
 
         lock rwl(reply_window_m_);
         std::map<unsigned int,std::list<reply_t> >::iterator clt;
 
-        unsigned int totalrep = 0, maxrep = 0;
+        size_t totalrep = 0, maxrep = 0;
         for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
             totalrep += clt->second.size();
             if(clt->second.size() > maxrep)
                 maxrep = clt->second.size();
         }
-        jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n",
+        jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %lu max per client %lu\n",
                         (int) reply_window_.size()-1, totalrep, maxrep);
         curr_counts_ = counting_;
     }
@@ -476,7 +471,7 @@ rpcs::dispatch(djob_t *j)
 
     request_header h;
     req.unpack_req_header(&h);
-    int proc = h.proc;
+    unsigned int proc = (unsigned int)h.proc;
 
     if(!req.ok()){
         jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
@@ -485,7 +480,7 @@ rpcs::dispatch(djob_t *j)
     }
 
     jsl_log(JSL_DBG_2,
-            "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
+            "rpcs::dispatch: rpc %d (proc %x, last_rep %d) from clt %u for srv instance %u \n",
             h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
 
     marshall rep;
@@ -518,8 +513,8 @@ rpcs::dispatch(djob_t *j)
     }
 
     rpcs::rpcstate_t stat;
-    char *b1;
-    int sz1;
+    char *b1 = nullptr;
+    size_t sz1 = 0;
 
     if(h.clt_nonce){
         // have i seen this client before?
@@ -575,7 +570,7 @@ rpcs::dispatch(djob_t *j)
             rep.take_buf(&b1,&sz1);
 
             jsl_log(JSL_DBG_2,
-                    "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
+                    "rpcs::dispatch: sending and saving reply of size %lu for rpc %d, proc %x ret %d, clt %u\n",
                     sz1, h.xid, proc, rh.ret, h.clt_nonce);
 
             if(h.clt_nonce > 0){
@@ -605,7 +600,7 @@ rpcs::dispatch(djob_t *j)
             c->send(b1, sz1);
             break;
         case FORGOTTEN: // very old request and we don't have the response anymore
-            jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
+            jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %d from %u\n",
                     h.xid, h.clt_nonce);
             rh.ret = rpc_const::atmostonce_failure;
             rep.pack_reply_header(rh);
@@ -630,8 +625,8 @@ rpcs::dispatch(djob_t *j)
 //   DONE: seen this xid, previous reply returned in *b and *sz.
 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
 rpcs::rpcstate_t
-rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
-        unsigned int xid_rep, char **b, int *sz)
+rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
+        int xid_rep, char **b, size_t *sz)
 {
     lock rwl(reply_window_m_);
 
@@ -640,12 +635,12 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
     VERIFY(l.size() > 0);
     VERIFY(xid >= xid_rep);
 
-    unsigned int past_xid_rep = l.begin()->xid;
+    int past_xid_rep = l.begin()->xid;
 
     std::list<reply_t>::iterator start = l.begin(), it;
     it = ++start;
 
-    if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) {
+    if (past_xid_rep < xid_rep || past_xid_rep == -1) {
         // scan for deletion candidates
         for (; it != l.end() && it->xid < xid_rep; it++) {
             if (it->cb_present)
@@ -655,7 +650,7 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
         l.begin()->xid = xid_rep;
     }
 
-    if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1)
+    if (xid < past_xid_rep && past_xid_rep != -1)
         return FORGOTTEN;
 
     // skip non-deletion candidates
@@ -685,8 +680,8 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
 // free_reply_window() and checkduplicate_and_update is responsible for
 // calling free(b).
 void
-rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
-        char *b, int sz)
+rpcs::add_reply(unsigned int clt_nonce, int xid,
+        char *b, size_t sz)
 {
     lock rwl(reply_window_m_);
     // remember the RPC reply value
@@ -706,12 +701,9 @@ rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
 void
 rpcs::free_reply_window(void)
 {
-    std::map<unsigned int,std::list<reply_t> >::iterator clt;
-    std::list<reply_t>::iterator it;
-
     lock rwl(reply_window_m_);
-    for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
-        for (it = clt->second.begin(); it != clt->second.end(); it++){
+    for (auto clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
+        for (auto it = clt->second.begin(); it != clt->second.end(); it++){
             if (it->cb_present)
                 free(it->buf);
         }
@@ -722,7 +714,7 @@ rpcs::free_reply_window(void)
 
 // rpc handler
 int
-rpcs::rpcbind(int &r, int a)
+rpcs::rpcbind(unsigned int &r, int)
 {
     jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
     r = nonce_;
@@ -737,22 +729,22 @@ operator<<(marshall &m, uint8_t x) {
 
 marshall &
 operator<<(marshall &m, uint16_t x) {
-    x = htons(x);
+    x = hton(x);
     m.rawbytes((char *)&x, 2);
     return m;
 }
 
 marshall &
 operator<<(marshall &m, uint32_t x) {
-    x = htonl(x);
+    x = hton(x);
     m.rawbytes((char *)&x, 4);
     return m;
 }
 
-marshall & operator<<(marshall &m, int x) { return m << (unsigned int) x; }
-marshall & operator<<(marshall &m, char x) { return m << (uint8_t)x; }
+marshall & operator<<(marshall &m, int32_t x) { return m << (uint32_t) x; }
+marshall & operator<<(marshall &m, int8_t x) { return m << (uint8_t)x; }
 marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; }
-marshall & operator<<(marshall &m, short x) { return m << (unsigned short) x; }
+marshall & operator<<(marshall &m, int16_t x) { return m << (uint16_t)x; }
 marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }
 
 marshall &
@@ -763,7 +755,7 @@ operator<<(marshall &m, const std::string &s) {
 }
 
 void marshall::pack_req_header(const request_header &h) {
-    int saved_sz = index_;
+    size_t saved_sz = index_;
     //leave the first 4-byte empty for channel to fill size of pdu
     index_ = sizeof(rpc_sz_t);
     *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep;
@@ -771,22 +763,13 @@ void marshall::pack_req_header(const request_header &h) {
 }
 
 void marshall::pack_reply_header(const reply_header &h) {
-    int saved_sz = index_;
+    size_t saved_sz = index_;
     //leave the first 4-byte empty for channel to fill size of pdu
     index_ = sizeof(rpc_sz_t);
     *this << h.xid << h.ret;
     index_ = saved_sz;
 }
 
-void
-unmarshall::unpack(int *x)
-{
-    (*x) = (rawbyte() & 0xff) << 24;
-    (*x) |= (rawbyte() & 0xff) << 16;
-    (*x) |= (rawbyte() & 0xff) << 8;
-    (*x) |= rawbyte() & 0xff;
-}
-
 // take the contents from another unmarshall object
 void
 unmarshall::take_in(unmarshall &another)
@@ -805,12 +788,12 @@ unmarshall::ensure(size_t n) {
     return ok_;
 }
 
-unsigned int
+inline uint8_t
 unmarshall::rawbyte()
 {
     if (!ensure(1))
         return 0;
-    return buf_[index_++];
+    return (uint8_t)buf_[index_++];
 }
 
 void
@@ -821,78 +804,29 @@ unmarshall::rawbytes(std::string &ss, size_t n)
     index_ += n;
 }
 
-unmarshall &
-operator>>(unmarshall &u, bool &x)
-{
-    x = (bool)u.rawbyte();
-    return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, unsigned char &x)
-{
-    x = (unsigned char)u.rawbyte();
-    return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, char &x)
-{
-    x = (char)u.rawbyte();
-    return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, unsigned short &x)
-{
-    x = (u.rawbyte() & 0xff) << 8;
-    x |= u.rawbyte() & 0xff;
-    return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, short &x)
-{
-    x = (u.rawbyte() & 0xff) << 8;
-    x |= u.rawbyte() & 0xff;
-    return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, unsigned int &x)
-{
-    x = (u.rawbyte() & 0xff) << 24;
-    x |= (u.rawbyte() & 0xff) << 16;
-    x |= (u.rawbyte() & 0xff) << 8;
-    x |= u.rawbyte() & 0xff;
-    return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, int &x)
-{
-    x = (u.rawbyte() & 0xff) << 24;
-    x |= (u.rawbyte() & 0xff) << 16;
-    x |= (u.rawbyte() & 0xff) << 8;
-    x |= u.rawbyte() & 0xff;
-    return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, unsigned long long &x)
+template <class T>
+void
+unmarshall::rawbytes(T &t)
 {
-    unsigned int h, l;
-    u >> h;
-    u >> l;
-    x = l | ((unsigned long long) h << 32);
-    return u;
+    const size_t n = sizeof(T);
+    VERIFY(ensure(n));
+    memcpy(&t, buf_+index_, n);
+    t = ntoh(t);
+    index_ += n;
 }
 
-unmarshall &
-operator>>(unmarshall &u, std::string &s)
-{
-    unsigned sz;
-    u >> sz;
+unmarshall & operator>>(unmarshall &u, bool &x) { x = (bool)u.rawbyte(); return u; }
+unmarshall & operator>>(unmarshall &u, uint8_t &x) { x = u.rawbyte(); return u; }
+unmarshall & operator>>(unmarshall &u, int8_t &x) { x = (int8_t)u.rawbyte(); return u; }
+unmarshall & operator>>(unmarshall &u, uint16_t &x) { u.rawbytes<uint16_t>(x); return u; }
+unmarshall & operator>>(unmarshall &u, int16_t &x) { u.rawbytes<int16_t>(x); return u; }
+unmarshall & operator>>(unmarshall &u, uint32_t &x) { u.rawbytes<uint32_t>(x); return u; }
+unmarshall & operator>>(unmarshall &u, int32_t &x) { u.rawbytes<int32_t>(x); return u; }
+unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes<uint32_t>(xx); x = xx; return u; }
+unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes<uint64_t>(x); return u; }
+unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes<int64_t>(x); return u; }
+unmarshall & operator>>(unmarshall &u, std::string &s) {
+    unsigned sz = u.grab<unsigned>();
     if(u.ok())
         u.rawbytes(s, sz);
     return u;
@@ -906,42 +840,32 @@ bool operator<(const sockaddr_in &a, const sockaddr_in &b){
 
 /*---------------auxilary function--------------*/
 void
-make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
-
-    char host[200];
-    const char *localhost = "127.0.0.1";
-    const char *port = index(hostandport, ':');
-    if(port == NULL){
-        memcpy(host, localhost, strlen(localhost)+1);
-        port = hostandport;
-    } else {
-        memcpy(host, hostandport, port-hostandport);
-        host[port-hostandport] = '\0';
-        port++;
-    }
-
-    make_sockaddr(host, port, dst);
-
+make_sockaddr(const std::string &hostandport, struct sockaddr_in *dst) {
+    auto colon = hostandport.find(':');
+    if (colon == std::string::npos)
+        make_sockaddr("127.0.0.1", hostandport, dst);
+    else
+        make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1), dst);
 }
 
 void
-make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
-
-    in_addr_t a;
-
+make_sockaddr(const std::string &host, const std::string &port, struct sockaddr_in *dst) {
     bzero(dst, sizeof(*dst));
     dst->sin_family = AF_INET;
 
-    a = inet_addr(host);
-    if(a != INADDR_NONE){
-        dst->sin_addr.s_addr = a;
-    } else {
-        struct hostent *hp = gethostbyname(host);
-        if(hp == 0 || hp->h_length != 4){
-            fprintf(stderr, "cannot find host name %s\n", host);
+    struct in_addr a{inet_addr(host.c_str())};
+
+    if(a.s_addr != INADDR_NONE)
+        dst->sin_addr.s_addr = a.s_addr;
+    else {
+        struct hostent *hp = gethostbyname(host.c_str());
+
+        if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
+            fprintf(stderr, "cannot find host name %s\n", host.c_str());
             exit(1);
         }
-        dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
+        memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
+        dst->sin_addr.s_addr = a.s_addr;
     }
-    dst->sin_port = htons(atoi(port));
+    dst->sin_port = hton((uint16_t)std::stoi(port));
 }
index 1348dc8..c0420a5 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -36,10 +36,10 @@ class rpcc : public chanmgr {
 
         //manages per rpc info
         struct caller {
-            caller(unsigned int xxid, unmarshall *un);
+            caller(int xxid, unmarshall *un);
             ~caller();
 
-            unsigned int xid;
+            int xid;
             unmarshall *un;
             int intret;
             bool done;
@@ -48,14 +48,14 @@ class rpcc : public chanmgr {
         };
 
         void get_refconn(connection **ch);
-        void update_xid_rep(unsigned int xid);
+        void update_xid_rep(int xid);
 
 
         sockaddr_in dst_;
         unsigned int clt_nonce_;
         unsigned int srv_nonce_;
         bool bind_done_;
-        unsigned int xid_;
+        int xid_;
         int lossytest_;
         bool retrans_;
         bool reachable_;
@@ -69,7 +69,7 @@ class rpcc : public chanmgr {
         std::condition_variable destroy_wait_c_;
 
         std::map<int, caller *> calls_;
-        std::list<unsigned int> xid_rep_window_;
+        std::list<int> xid_rep_window_;
 
         struct request {
             request() { clear(); }
@@ -167,25 +167,25 @@ class rpcs : public chanmgr {
         // has been sent; in that case buf points to a copy of the reply,
         // and sz holds the size of the reply.
     struct reply_t {
-        reply_t (unsigned int _xid) {
+        reply_t (int _xid) {
             xid = _xid;
             cb_present = false;
             buf = NULL;
             sz = 0;
         }
-        reply_t (unsigned int _xid, char *_buf, int _sz) {
+        reply_t (int _xid, char *_buf, size_t _sz) {
             xid = _xid;
             cb_present = true;
             buf = _buf;
             sz = _sz;
         }
-        unsigned int xid;
+        int xid;
         bool cb_present; // whether the reply buffer is valid
         char *buf;      // the reply buffer
-        int sz;         // the size of reply buffer
+        size_t sz;         // the size of reply buffer
     };
 
-    int port_;
+    unsigned int port_;
     unsigned int nonce_;
 
     // provide at most once semantics by maintaining a window of replies
@@ -194,11 +194,11 @@ class rpcs : public chanmgr {
     std::map<unsigned int, std::list<reply_t> > reply_window_;
 
     void free_reply_window(void);
-    void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz);
+    void add_reply(unsigned int clt_nonce, int xid, char *b, size_t sz);
 
     rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
-            unsigned int xid, unsigned int rep_xid,
-            char **b, int *sz);
+            int xid, int rep_xid,
+            char **b, size_t *sz);
 
     void updatestat(unsigned int proc);
 
@@ -206,15 +206,15 @@ class rpcs : public chanmgr {
     std::map<unsigned int, connection *> conns_;
 
     // counting
-    const int counting_;
-    int curr_counts_;
-    std::map<int, int> counts_;
+    const size_t counting_;
+    size_t curr_counts_;
+    std::map<unsigned int, size_t> counts_;
 
     int lossytest_; 
     bool reachable_;
 
     // map proc # to function
-    std::map<int, handler *> procs_;
+    std::map<unsigned int, handler *> procs_;
 
     std::mutex procs_m_; // protect insert/delete to procs[]
     std::mutex count_m_;  //protect modification of counts
@@ -225,9 +225,9 @@ class rpcs : public chanmgr {
     protected:
 
     struct djob_t {
-        djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {}
+        djob_t (connection *c, char *b, size_t bsz):buf(b),sz(bsz),conn(c) {}
         char *buf;
-        int sz;
+        size_t sz;
         connection *conn;
     };
     void dispatch(djob_t *);
@@ -239,11 +239,11 @@ class rpcs : public chanmgr {
     tcpsconn* listener_;
 
     public:
-    rpcs(unsigned int port, int counts=0);
+    rpcs(unsigned int port, size_t counts=0);
     ~rpcs();
-    inline int port() { return listener_->port(); }
+    inline unsigned int port() { return listener_->port(); }
     //RPC handler for clients binding
-    int rpcbind(int &r, int a);
+    int rpcbind(unsigned int &r, int a);
 
     void set_reachable(bool r) { reachable_ = r; }
 
@@ -262,8 +262,8 @@ template<class F, class C> void rpcs::reg(unsigned int proc, F f, C *c) {
     reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
 }
 
-void make_sockaddr(const char *hostandport, struct sockaddr_in *dst);
-void make_sockaddr(const char *host, const char *port,
-        struct sockaddr_in *dst);
+void make_sockaddr(const std::string &hostandport, struct sockaddr_in *dst);
+void make_sockaddr(const std::string &host, const std::string &port, struct
+        sockaddr_in *dst);
 
 #endif
index c381745..dbb10c6 100644 (file)
@@ -14,6 +14,8 @@
 
 #define NUM_CL 2
 
+char tprintf_thread_prefix = 'r';
+
 rpcs *server;  // server rpc object
 rpcc *clients[NUM_CL];  // client rpc object
 struct sockaddr_in dst; //server's ip address
@@ -27,7 +29,7 @@ class srv {
                int handle_22(std::string & r, const std::string a, const std::string b);
                int handle_fast(int &r, const int a);
                int handle_slow(int &r, const int a);
-               int handle_bigrep(std::string &r, const int a);
+               int handle_bigrep(std::string &r, const size_t a);
 };
 
 // a handler. a and b are arguments, r is the result.
@@ -60,9 +62,9 @@ srv::handle_slow(int &r, const int a)
 }
 
 int
-srv::handle_bigrep(std::string &r, const int len)
+srv::handle_bigrep(std::string &r, const size_t len)
 {
-       r = std::string(len, 'x');
+       r = std::string((size_t)len, 'x');
        return 0;
 }
 
@@ -70,7 +72,7 @@ srv service;
 
 void startserver()
 {
-       server = new rpcs(port);
+       server = new rpcs((unsigned int)port);
        server->reg(22, &srv::handle_22, &service);
        server->reg(23, &srv::handle_fast, &service);
        server->reg(24, &srv::handle_slow, &service);
@@ -92,9 +94,9 @@ testmarshall()
        m << s;
 
        char *b;
-       int sz;
+       size_t sz;
        m.take_buf(&b,&sz);
-       VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)));
+       VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
 
        unmarshall un(b,sz);
        request_header rh1;
@@ -111,10 +113,10 @@ testmarshall()
 }
 
 void
-client1(int cl)
+client1(size_t cl)
 {
        // test concurrency.
-       int which_cl = ((unsigned long) cl ) % NUM_CL;
+       size_t which_cl = cl % NUM_CL;
 
        for(int i = 0; i < 100; i++){
                int arg = (random() % 2000);
@@ -138,18 +140,18 @@ client1(int cl)
 
                int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
                auto end = std::chrono::steady_clock::now();
-               int diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
+               auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
                if (ret != 0)
-                       printf("%d ms have elapsed!!!\n", diff);
+                       printf("%d ms have elapsed!!!\n", (int)diff);
                VERIFY(ret == 0);
                VERIFY(rep == (which ? arg+1 : arg+2));
        }
 }
 
 void
-client2(int cl)
+client2(size_t cl)
 {
-       int which_cl = ((unsigned long) cl ) % NUM_CL;
+       size_t which_cl = cl % NUM_CL;
 
        time_t t1;
        time(&t1);
@@ -208,9 +210,9 @@ simple_tests(rpcc *c)
        // specify a timeout value to an RPC that should succeed (tcp)
        {
                std::string arg(1000, 'x');
-               std::string rep;
-               c->call_timeout(22, rpcc::to(3000), rep, arg, (std::string)"x");
-               VERIFY(rep.size() == 1001);
+               std::string rep2;
+               c->call_timeout(22, rpcc::to(3000), rep2, arg, (std::string)"x");
+               VERIFY(rep2.size() == 1001);
                printf("   -- no spurious timeout .. ok\n");
        }
 
@@ -236,21 +238,21 @@ simple_tests(rpcc *c)
 }
 
 void 
-concurrent_test(int nt)
+concurrent_test(size_t nt)
 {
        // create threads that make lots of calls in parallel,
        // to test thread synchronization for concurrent calls
        // and dispatches.
-       printf("start concurrent_test (%d threads) ...", nt);
+       printf("start concurrent_test (%lu threads) ...", nt);
 
     std::vector<std::thread> th(nt);
-       for(int i = 0; i < nt; i++){
+
+       for(size_t i = 0; i < nt; i++)
         th[i] = std::thread(client1, i);
-       }
 
-       for(int i = 0; i < nt; i++){
+       for(size_t i = 0; i < nt; i++)
         th[i].join();
-       }
+
        printf(" OK\n");
 }
 
@@ -271,14 +273,16 @@ lossy_test()
                VERIFY(clients[i]->bind()==0);
        }
 
-       int nt = 1;
+       size_t nt = 1;
+
     std::vector<std::thread> th(nt);
-       for(int i = 0; i < nt; i++){
+
+       for(size_t i = 0; i < nt; i++)
         th[i] = std::thread(client2, i);
-       }
-       for(int i = 0; i < nt; i++){
+
+       for(size_t i = 0; i < nt; i++)
         th[i].join();
-       }
+
        printf(".. OK\n");
        VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
 }
@@ -319,17 +323,17 @@ failure_test()
        printf("   -- delete existing rpc client, create replacement rpc client .. ok\n");
 
 
-       int nt = 10;
-       printf("   -- concurrent test on new rpc client w/ %d threads ..", nt);
+       size_t nt = 10;
+       printf("   -- concurrent test on new rpc client w/ %lu threads ..", nt);
 
     std::vector<std::thread> th(nt);
-       for(int i = 0; i < nt; i++){
+
+       for(size_t i = 0; i < nt; i++)
         th[i] = std::thread(client3, client);
-       }
 
-       for(int i = 0; i < nt; i++){
+       for(size_t i = 0; i < nt; i++)
         th[i].join();
-       }
+
        printf("ok\n");
 
        delete server;
@@ -340,14 +344,14 @@ failure_test()
        VERIFY (client->bind() >= 0);
        printf("   -- delete existing rpc client and server, create replacements.. ok\n");
 
-       printf("   -- concurrent test on new client and server w/ %d threads ..", nt);
-       for(int i = 0; i < nt; i++){
+       printf("   -- concurrent test on new client and server w/ %lu threads ..", nt);
+
+       for(size_t i = 0; i < nt; i++)
         th[i] = std::thread(client3, client);
-       }
 
-       for(int i = 0; i < nt; i++){
+       for(size_t i = 0; i < nt; i++)
         th[i].join();
-       }
+
        printf("ok\n");
 
        printf("failure_test OK\n");
@@ -364,10 +368,10 @@ main(int argc, char *argv[])
        bool isclient = false;
        bool isserver = false;
 
-       srandom(getpid());
+       srandom((uint32_t)getpid());
        port = 20000 + (getpid() % 10000);
 
-       char ch = 0;
+       int ch = 0;
        while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
                switch (ch) {
                        case 'c':
@@ -384,6 +388,7 @@ main(int argc, char *argv[])
                                break;
                        case 'l':
                                VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+                break;
                        default:
                                break;
                }
index 73f94f4..ff3557c 100644 (file)
@@ -5,10 +5,10 @@
 
 // if blocking, then addJob() blocks when queue is full
 // otherwise, addJob() simply returns false when queue is full
-ThrPool::ThrPool(int sz, bool blocking)
+ThrPool::ThrPool(size_t sz, bool blocking)
 : nthreads_(sz),blockadd_(blocking),jobq_(100*sz) 
 {
-       for (int i=0; i<nthreads_; i++)
+       for (size_t i=0; i<nthreads_; i++)
         th_.emplace_back(&ThrPool::do_worker, this);
 }
 
@@ -16,10 +16,10 @@ ThrPool::ThrPool(int sz, bool blocking)
 // will ever use this thread pool again or is currently blocking on it
 ThrPool::~ThrPool()
 {
-       for (int i=0; i<nthreads_; i++)
+       for (size_t i=0; i<nthreads_; i++)
                jobq_.enq(job_t());
 
-       for (int i=0; i<nthreads_; i++)
+       for (size_t i=0; i<nthreads_; i++)
         th_[i].join();
 }
 
index 2a1a749..4ea1bd4 100644 (file)
@@ -10,13 +10,13 @@ typedef std::function<void()> job_t;
 
 class ThrPool {
        public:
-               ThrPool(int sz, bool blocking=true);
+               ThrPool(size_t sz, bool blocking=true);
                ~ThrPool();
 
                bool addJob(const job_t &j);
 
        private:
-               int nthreads_;
+        size_t nthreads_;
                bool blockadd_;
 
                fifo<job_t> jobq_;
diff --git a/rsm.cc b/rsm.cc
index 65f60c7..8e597d6 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -173,18 +173,6 @@ void rsm::recovery() [[noreturn]] {
     }
 }
 
-template <class A>
-std::ostream & operator<<(std::ostream &o, const std::vector<A> &d) {
-    o << "[";
-    for (typename std::vector<A>::const_iterator i=d.begin(); i!=d.end(); i++) {
-        o << *i;
-        if (i+1 != d.end())
-            o << ", ";
-    }
-    o << "]";
-    return o;
-}
-
 bool rsm::sync_with_backups() {
     adopt_lock ml(rsm_mutex);
     ml.unlock();
@@ -204,7 +192,7 @@ bool rsm::sync_with_backups() {
     insync = true;
     cfg->get_view(vid_insync, backups);
     backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
-    LOG("rsm::sync_with_backups " << backups);
+    LOG("rsm::sync_with_backups " << make_iterator_pair(backups.begin(), backups.end()));
     sync_cond.wait(ml);
     insync = false;
     return true;
index 6c7e0e4..62a130c 100644 (file)
@@ -5,7 +5,7 @@ class rsm_state_transfer {
  public:
   virtual std::string marshal_state() = 0;
   virtual void unmarshal_state(std::string) = 0;
-  virtual ~rsm_state_transfer() {};
+  virtual ~rsm_state_transfer() {}
 };
 
 #endif
index c61626a..41539fe 100644 (file)
--- a/tprintf.h
+++ b/tprintf.h
@@ -14,44 +14,67 @@ extern std::map<void *, int> instance_name_map;
 extern int next_instance_num;
 extern char tprintf_thread_prefix;
 
+template <class A>
+struct iterator_pair : public std::pair<A, A> {
+    explicit iterator_pair(const A & first, const A & second) : std::pair<A, A>(first, second) {}
+};
+
+template <class A>
+const struct iterator_pair<A> make_iterator_pair(const A & first, const A & second) {
+    return iterator_pair<A>(first, second);
+}
+
+template <class A, class B>
+std::ostream & operator<<(std::ostream &o, const std::pair<A,B> &d) {
+    o << "<" << d.first << "," << d.second << ">";
+    return o;
+}
+
+template <class A>
+std::ostream & operator<<(std::ostream &o, const iterator_pair<A> &d) {
+    o << "[";
+    for (auto i=d.first; i!=d.second; i++) {
+        o << *i;
+        auto j(i);
+        if (++j != d.second)
+            o << ", ";
+    }
+    o << "]";
+    return o;
+}
+
 #define LOG_PREFIX { \
     cerr_mutex.lock(); \
-    auto self = std::this_thread::get_id(); \
-    int tid = thread_name_map[self]; \
-    if (tid==0) \
-        tid = thread_name_map[self] = ++next_thread_num; \
-    auto utime = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count() % 1000000000; \
-    std::cerr << std::left << std::setw(9) << utime << " "; \
-    std::cerr << tprintf_thread_prefix << std::left << std::setw(2) << tid; \
-    std::cerr << " " << std::setw(24) << __FILE__ << " " << std::setw(18) << __func__; \
+    auto _thread_ = std::this_thread::get_id(); \
+    int _tid_ = thread_name_map[_thread_]; \
+    if (_tid_==0) \
+        _tid_ = thread_name_map[_thread_] = ++next_thread_num; \
+    auto _utime_ = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count() % 1000000000; \
+    std::cerr << std::setfill('0') << std::dec << std::left << std::setw(9) << _utime_ << " "; \
+    std::cerr << tprintf_thread_prefix << std::left << std::setw(2) << _tid_; \
+    std::cerr << " " << std::setw(20) << __FILE__ << " " << std::setw(18) << __func__; \
 }
 #define LOG_THIS_POINTER { \
-    int self = instance_name_map[this]; \
-    if (self==0) \
-        self = instance_name_map[this] = ++next_instance_num; \
-    std::cerr << "#" << std::setw(2) << self; \
+    int _self_ = instance_name_map[this]; \
+    if (_self_==0) \
+        _self_ = instance_name_map[this] = ++next_instance_num; \
+    std::cerr << "#" << std::setw(2) << _self_; \
 }
 #define LOG_SUFFIX { \
     cerr_mutex.unlock(); \
 }
 
-#define LOG_NONMEMBER(x) { \
+#define LOG_NONMEMBER(_x_) { \
     LOG_PREFIX; \
-    std::cerr << x << std::endl; \
+    std::cerr << _x_ << std::endl; \
     LOG_SUFFIX; \
 }
-#define LOG(x) { \
+#define LOG(_x_) { \
     LOG_PREFIX; \
     LOG_THIS_POINTER; \
-    std::cerr << x << std::endl; \
+    std::cerr << _x_ << std::endl; \
     LOG_SUFFIX; \
 }
-#define JOIN(from,to,sep) ({ \
-    ostringstream oss; \
-    for(auto i=from;i!=to;i++) \
-        oss << *i << sep; \
-    oss.str(); \
-})
 #define LOG_FUNC_ENTER { \
     LOG_PREFIX; \
     LOG_THIS_POINTER; \
@@ -75,14 +98,13 @@ extern char tprintf_thread_prefix;
     LOG_SUFFIX; \
 }
 
-#define tprintf(args...) { \
-    int len = snprintf(NULL, 0, args); \
-    char buf[len+1]; \
-    buf[len] = '\0'; \
-    snprintf(buf, len+1, args); \
+#define tprintf(...) { \
+    char *buf = nullptr; \
+    int len = asprintf(&buf, __VA_ARGS__); \
     if (buf[len-1]=='\n') \
         buf[len-1] = '\0'; \
     LOG_NONMEMBER(buf); \
+    free(buf); \
 }
 
 #endif