Lots more clean-ups
[invirt/third/libt4.git] / rpc / rpc.cc
index d53776a..47ac775 100644 (file)
@@ -1,8 +1,8 @@
 /*
 /*
- The rpcc class handles client-side RPC.  Each rpcc is bound to a
- single RPC server.  The jobs of rpcc include maintaining a connection to
- server, sending RPC requests and waiting for responses, retransmissions,
- at-most-once delivery etc.
+ The rpcc class handles client-side RPC.  Each rpcc is bound to a single RPC
+ server.  The jobs of rpcc include maintaining a connection to server, sending
+ RPC requests and waiting for responses, retransmissions, at-most-once delivery
+ etc.
 
  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
  connections from different rpcc objects.  The jobs of rpcs include accepting
 
  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
  connections from different rpcc objects.  The jobs of rpcs include accepting
@@ -11,8 +11,8 @@
 
  Both rpcc and rpcs use the connection class as an abstraction for the
  underlying communication channel.  To send an RPC request/reply, one calls
 
  Both rpcc and rpcs use the connection class as an abstraction for the
  underlying communication channel.  To send an RPC request/reply, one calls
- connection::send() which blocks until data is sent or the connection has failed
- (thus the caller can free the buffer when send() returns).  When a
+ connection::send() which blocks until data is sent or the connection has
+ failed (thus the caller can free the buffer when send() returns).  When a
  request/reply is received, connection makes a callback into the corresponding
  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
 
  request/reply is received, connection makes a callback into the corresponding
  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
 
  number of threads needed to manage these connections; without async IO, at
  least one thread is needed per connection to read data without blocking other
  activities.)  Each rpcs object creates one thread for listening on the server
  number of threads needed to manage these connections; without async IO, at
  least one thread is needed per connection to read data without blocking other
  activities.)  Each rpcs object creates one thread for listening on the server
- port and a pool of threads for executing RPC requests.  The
- thread pool allows us to control the number of threads spawned at the server
- (spawning one thread per request will hurt when the server faces thousands of
- requests).
+ port and a pool of threads for executing RPC requests.  The thread pool allows
+ us to control the number of threads spawned at the server (spawning one thread
+ per request will hurt when the server faces thousands of requests).
 
  In order to delete a connection object, we must maintain a reference count.
 
  In order to delete a connection object, we must maintain a reference count.
- For rpcc,
- multiple client threads might be invoking the rpcc::call() functions and thus
- holding multiple references to the underlying connection object. For rpcs,
- multiple dispatch threads might be holding references to the same connection
- object.  A connection object is deleted only when the underlying connection is
- dead and the reference count reaches zero.
+ For rpcc, multiple client threads might be invoking the rpcc::call() functions
+ and thus holding multiple references to the underlying connection object. For
+ rpcs, multiple dispatch threads might be holding references to the same
+ connection object.  A connection object is deleted only when the underlying
+ connection is dead and the reference count reaches zero.
 
  This version of the RPC library explicitly joins exited threads to make sure
  no outstanding references exist before deleting objects.
 
  This version of the RPC library explicitly joins exited threads to make sure
  no outstanding references exist before deleting objects.
  there are no outstanding calls on the rpcc object.
 
  To delete a rpcs object safely, we do the following in sequence: 1. stop
  there are no outstanding calls on the rpcc object.
 
  To delete a rpcs object safely, we do the following in sequence: 1. stop
- accepting new incoming connections. 2. close existing active connections.
- 3.  delete the dispatch thread pool which involves waiting for current active
- RPC handlers to finish.  It is interesting how a thread pool can be deleted
+ accepting new incoming connections. 2. close existing active connections.  3.
+ delete the dispatch thread pool which involves waiting for current active RPC
+ handlers to finish.  It is interesting how a thread pool can be deleted
  without using thread cancellation. The trick is to inject x "poison pills" for
  a thread pool of x threads. Upon getting a poison pill instead of a normal
  task, a worker thread will exit (and thread pool destructor waits to join all
  x exited worker threads).
  */
 
  without using thread cancellation. The trick is to inject x "poison pills" for
  a thread pool of x threads. Upon getting a poison pill instead of a normal
  task, a worker thread will exit (and thread pool destructor waits to join all
  x exited worker threads).
  */
 
+#include "types.h"
 #include "rpc.h"
 
 #include <sys/types.h>
 #include "rpc.h"
 
 #include <sys/types.h>
 #include <netinet/tcp.h>
 #include <netdb.h>
 #include <unistd.h>
 #include <netinet/tcp.h>
 #include <netdb.h>
 #include <unistd.h>
-#include "lock.h"
 
 
-#include "jsl_log.h"
-#include "lang/verify.h"
-
-const rpcc::TO rpcc::to_max = { 120000 };
-const rpcc::TO rpcc::to_min = { 1000 };
-
-rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
-: xid(xxid), un(xun), done(false)
-{
+inline void set_rand_seed() {
+    auto now = time_point_cast<nanoseconds>(steady_clock::now());
+    srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
 }
 
 }
 
-rpcc::caller::~caller()
-{
-}
+static sockaddr_in make_sockaddr(const string &hostandport);
 
 
-inline
-void set_rand_seed()
-{
-    auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
-    srandom((int)now.time_since_epoch().count()^((int)getpid()));
-}
-
-rpcc::rpcc(sockaddr_in d, bool retrans) :
-    dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
+rpcc::rpcc(const string & d, bool retrans) :
+    dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
     retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
 {
     if(retrans){
         set_rand_seed();
     retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
 {
     if(retrans){
         set_rand_seed();
-        clt_nonce_ = random();
+        clt_nonce_ = (unsigned int)random();
     } else {
         // special client nonce 0 means this client does not
         // require at-most-once logic from the server
     } else {
         // special client nonce 0 means this client does not
         // require at-most-once logic from the server
@@ -100,23 +83,19 @@ rpcc::rpcc(sockaddr_in d, bool retrans) :
     }
 
     char *loss_env = getenv("RPC_LOSSY");
     }
 
     char *loss_env = getenv("RPC_LOSSY");
-    if(loss_env != NULL){
+    if(loss_env)
         lossytest_ = atoi(loss_env);
         lossytest_ = atoi(loss_env);
-    }
 
     // xid starts with 1 and latest received reply starts with 0
     xid_rep_window_.push_back(0);
 
 
     // xid starts with 1 and latest received reply starts with 0
     xid_rep_window_.push_back(0);
 
-    jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
-            clt_nonce_, lossytest_);
+    IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
 }
 
 // IMPORTANT: destruction should happen only when no external threads
 // are blocked inside rpcc or will use rpcc in the future
 }
 
 // IMPORTANT: destruction should happen only when no external threads
 // are blocked inside rpcc or will use rpcc in the future
-rpcc::~rpcc()
-{
-    jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
-            clt_nonce_, chan_?chan_->channo():-1);
+rpcc::~rpcc() {
+    IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
     if(chan_){
         chan_->closeconn();
         chan_->decref();
     if(chan_){
         chan_->closeconn();
         chan_->decref();
@@ -124,33 +103,27 @@ rpcc::~rpcc()
     VERIFY(calls_.size() == 0);
 }
 
     VERIFY(calls_.size() == 0);
 }
 
-int
-rpcc::bind(TO to)
-{
-    int r;
-    int ret = call(rpc_const::bind, 0, r, to);
+int rpcc::bind(milliseconds to) {
+    unsigned int r;
+    int ret = call_timeout(rpc_const::bind, to, r, 0);
     if(ret == 0){
         lock ml(m_);
         bind_done_ = true;
         srv_nonce_ = r;
     } else {
     if(ret == 0){
         lock ml(m_);
         bind_done_ = true;
         srv_nonce_ = r;
     } else {
-        jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
-                inet_ntoa(dst_.sin_addr), ret);
+        IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
     }
     return ret;
 };
 
 // Cancel all outstanding calls
     }
     return ret;
 };
 
 // Cancel all outstanding calls
-    void
-rpcc::cancel(void)
-{
+void rpcc::cancel(void) {
     lock ml(m_);
     lock ml(m_);
-    printf("rpcc::cancel: force callers to fail\n");
-    std::map<int,caller*>::iterator iter;
-    for(iter = calls_.begin(); iter != calls_.end(); iter++){
-        caller *ca = iter->second;
+    LOG("force callers to fail");
+    for(auto &p : calls_){
+        caller *ca = p.second;
 
 
-        jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
+        IF_LEVEL(2) LOG("force caller to fail");
         {
             lock cl(ca->m);
             ca->done = true;
         {
             lock cl(ca->m);
             ca->done = true;
@@ -163,22 +136,19 @@ rpcc::cancel(void)
         destroy_wait_ = true;
         destroy_wait_c_.wait(ml);
     }
         destroy_wait_ = true;
         destroy_wait_c_.wait(ml);
     }
-    printf("rpcc::cancel: done\n");
+    LOG("done");
 }
 
 }
 
-int
-rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
-        TO to)
-{
+int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
 
     caller ca(0, &rep);
 
     caller ca(0, &rep);
-        int xid_rep;
+    int xid_rep;
     {
         lock ml(m_);
 
         if((proc != rpc_const::bind && !bind_done_) ||
                 (proc == rpc_const::bind && bind_done_)){
     {
         lock ml(m_);
 
         if((proc != rpc_const::bind && !bind_done_) ||
                 (proc == rpc_const::bind && bind_done_)){
-            jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
+            IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
             return rpc_const::bind_failure;
         }
 
             return rpc_const::bind_failure;
         }
 
@@ -189,19 +159,12 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
         ca.xid = xid_++;
         calls_[ca.xid] = &ca;
 
         ca.xid = xid_++;
         calls_[ca.xid] = &ca;
 
-        req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
-                             xid_rep_window_.front());
-        req.pack_req_header(h);
-                xid_rep = xid_rep_window_.front();
+        req.pack_header(request_header{ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
+        xid_rep = xid_rep_window_.front();
     }
 
     }
 
-    TO curr_to;
-    std::chrono::time_point<std::chrono::steady_clock> finaldeadline =
-        std::chrono::steady_clock::now() +
-        std::chrono::milliseconds(to.to),
-        nextdeadline;
-
-    curr_to.to = to_min.to;
+    milliseconds curr_to = rpc::to_min;
+    auto finaldeadline = steady_clock::now() + to, nextdeadline = finaldeadline;
 
     bool transmit = true;
     connection *ch = NULL;
 
     bool transmit = true;
     connection *ch = NULL;
@@ -220,37 +183,36 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
                         }
                     }
                     if (forgot.isvalid())
                         }
                     }
                     if (forgot.isvalid())
-                        ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
-                    ch->send(req.cstr(), req.size());
+                        ch->send(forgot.buf);
+                    ch->send(req);
                 }
                 }
-                else jsl_log(JSL_DBG_1, "not reachable\n");
-                jsl_log(JSL_DBG_2,
-                        "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n",
-                        clt_nonce_, proc, ca.xid, clt_nonce_);
+                else IF_LEVEL(1) LOG("not reachable");
+                IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc <<
+                                " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
             }
             transmit = false; // only send once on a given channel
         }
 
             }
             transmit = false; // only send once on a given channel
         }
 
-        if(finaldeadline == std::chrono::time_point<std::chrono::steady_clock>::min())
+        if(finaldeadline == time_point<steady_clock>::min())
             break;
 
             break;
 
-        nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to);
+        nextdeadline = steady_clock::now() + curr_to;
         if(nextdeadline > finaldeadline) {
             nextdeadline = finaldeadline;
         if(nextdeadline > finaldeadline) {
             nextdeadline = finaldeadline;
-            finaldeadline = std::chrono::time_point<std::chrono::steady_clock>::min();
+            finaldeadline = time_point<steady_clock>::min();
         }
 
         {
             lock cal(ca.m);
             while (!ca.done){
         }
 
         {
             lock cal(ca.m);
             while (!ca.done){
-                jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
-                if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){
-                    jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
+                IF_LEVEL(2) LOG("wait");
+                if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
+                    IF_LEVEL(2) LOG("timeout");
                     break;
                 }
             }
             if(ca.done){
                     break;
                 }
             }
             if(ca.done){
-                jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
+                IF_LEVEL(2) LOG("reply received");
                 break;
             }
         }
                 break;
             }
         }
@@ -260,7 +222,7 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
             // on the new connection
             transmit = true;
         }
             // on the new connection
             transmit = true;
         }
-        curr_to.to <<= 1;
+        curr_to *= 2;
     }
 
     {
     }
 
     {
@@ -281,7 +243,7 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
     {
         lock ml(m_);
         if (!dup_req_.isvalid()) {
     {
         lock ml(m_);
         if (!dup_req_.isvalid()) {
-            dup_req_.buf.assign(req.cstr(), req.size());
+            dup_req_.buf = req;
             dup_req_.xid = ca.xid;
         }
         if (xid_rep > xid_rep_done_)
             dup_req_.xid = ca.xid;
         }
         if (xid_rep > xid_rep_done_)
@@ -290,10 +252,9 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
 
     lock cal(ca.m);
 
 
     lock cal(ca.m);
 
-    jsl_log(JSL_DBG_2,
-            "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n",
-            clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
-            ntohs(dst_.sin_port), ca.done, ca.intret);
+    IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
+                    " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
+                    ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
 
     if(ch)
         ch->decref();
 
     if(ch)
         ch->decref();
@@ -326,14 +287,14 @@ rpcc::get_refconn(connection **ch)
 //
 // this function keeps no reference for connection *c
 bool
 //
 // this function keeps no reference for connection *c
 bool
-rpcc::got_pdu(connection *c, char *b, int sz)
+rpcc::got_pdu(connection *, const string & b)
 {
 {
-    unmarshall rep(b, sz);
+    unmarshall rep(b, true);
     reply_header h;
     reply_header h;
-    rep.unpack_reply_header(&h);
+    rep.unpack_header(h);
 
     if(!rep.ok()){
 
     if(!rep.ok()){
-        jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
+        IF_LEVEL(1) LOG("unmarshall header failed!!!");
         return true;
     }
 
         return true;
     }
 
@@ -342,18 +303,17 @@ rpcc::got_pdu(connection *c, char *b, int sz)
     update_xid_rep(h.xid);
 
     if(calls_.find(h.xid) == calls_.end()){
     update_xid_rep(h.xid);
 
     if(calls_.find(h.xid) == calls_.end()){
-        jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
+        IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
         return true;
     }
     caller *ca = calls_[h.xid];
 
     lock cl(ca->m);
     if(!ca->done){
         return true;
     }
     caller *ca = calls_[h.xid];
 
     lock cl(ca->m);
     if(!ca->done){
-        ca->un->take_in(rep);
+        *ca->rep = b;
         ca->intret = h.ret;
         if(ca->intret < 0){
         ca->intret = h.ret;
         if(ca->intret < 0){
-            jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
-                    h.xid, ca->intret);
+            IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret);
         }
         ca->done = 1;
     }
         }
         ca->done = 1;
     }
@@ -363,15 +323,13 @@ rpcc::got_pdu(connection *c, char *b, int sz)
 
 // assumes thread holds mutex m
 void
 
 // assumes thread holds mutex m
 void
-rpcc::update_xid_rep(unsigned int xid)
+rpcc::update_xid_rep(int xid)
 {
 {
-    std::list<unsigned int>::iterator it;
-
     if(xid <= xid_rep_window_.front()){
         return;
     }
 
     if(xid <= xid_rep_window_.front()){
         return;
     }
 
-    for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
+    for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
         if(*it > xid){
             xid_rep_window_.insert(it, xid);
             goto compress;
         if(*it > xid){
             xid_rep_window_.insert(it, xid);
             goto compress;
@@ -380,30 +338,25 @@ rpcc::update_xid_rep(unsigned int xid)
     xid_rep_window_.push_back(xid);
 
 compress:
     xid_rep_window_.push_back(xid);
 
 compress:
-    it = xid_rep_window_.begin();
+    auto it = xid_rep_window_.begin();
     for (it++; it != xid_rep_window_.end(); it++){
         while (xid_rep_window_.front() + 1 == *it)
             xid_rep_window_.pop_front();
     }
 }
 
     for (it++; it != xid_rep_window_.end(); it++){
         while (xid_rep_window_.front() + 1 == *it)
             xid_rep_window_.pop_front();
     }
 }
 
-
-rpcs::rpcs(unsigned int p1, int count)
-  : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
+rpcs::rpcs(in_port_t p1, size_t count)
+  : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
 {
     set_rand_seed();
 {
     set_rand_seed();
-    nonce_ = random();
-    jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
+    nonce_ = (unsigned int)random();
+    IF_LEVEL(2) LOG("created with nonce " << nonce_);
 
 
-    char *loss_env = getenv("RPC_LOSSY");
-    if(loss_env != NULL){
-        lossytest_ = atoi(loss_env);
-    }
-
-    reg(rpc_const::bind, this, &rpcs::rpcbind);
-    dispatchpool_ = new ThrPool(6,false);
+    reg(rpc_const::bind, &rpcs::rpcbind, this);
+    dispatchpool_ = new ThrPool(6, false);
 
 
-    listener_ = new tcpsconn(this, port_, lossytest_);
+    char *loss_env = getenv("RPC_LOSSY");
+    listener_ = new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0);
 }
 
 rpcs::~rpcs()
 }
 
 rpcs::~rpcs()
@@ -415,16 +368,16 @@ rpcs::~rpcs()
 }
 
 bool
 }
 
 bool
-rpcs::got_pdu(connection *c, char *b, int sz)
+rpcs::got_pdu(connection *c, const string & b)
 {
 {
-        if(!reachable_){
-            jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
-            return true;
-        }
+    if(!reachable_){
+        IF_LEVEL(1) LOG("not reachable");
+        return true;
+    }
 
 
-    djob_t *j = new djob_t(c, b, sz);
+    djob_t *j = new djob_t{c, b};
     c->incref();
     c->incref();
-    bool succ = dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, j));
+    bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
     if(!succ || !reachable_){
         c->decref();
         delete j;
     if(!succ || !reachable_){
         c->decref();
         delete j;
@@ -433,7 +386,7 @@ rpcs::got_pdu(connection *c, char *b, int sz)
 }
 
 void
 }
 
 void
-rpcs::reg1(unsigned int proc, handler *h)
+rpcs::reg1(proc_t proc, handler *h)
 {
     lock pl(procs_m_);
     VERIFY(procs_.count(proc) == 0);
 {
     lock pl(procs_m_);
     VERIFY(procs_.count(proc) == 0);
@@ -442,30 +395,26 @@ rpcs::reg1(unsigned int proc, handler *h)
 }
 
 void
 }
 
 void
-rpcs::updatestat(unsigned int proc)
+rpcs::updatestat(proc_t proc)
 {
     lock cl(count_m_);
     counts_[proc]++;
     curr_counts_--;
     if(curr_counts_ == 0){
 {
     lock cl(count_m_);
     counts_[proc]++;
     curr_counts_--;
     if(curr_counts_ == 0){
-        std::map<int, int>::iterator i;
-        printf("RPC STATS: ");
-        for (i = counts_.begin(); i != counts_.end(); i++){
-            printf("%x:%d ", i->first, i->second);
-        }
-        printf("\n");
+        LOG("RPC STATS: ");
+        for (auto i = counts_.begin(); i != counts_.end(); i++)
+            LOG(hex << i->first << ":" << dec << i->second);
 
         lock rwl(reply_window_m_);
 
         lock rwl(reply_window_m_);
-        std::map<unsigned int,std::list<reply_t> >::iterator clt;
 
 
-        unsigned int totalrep = 0, maxrep = 0;
-        for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
-            totalrep += clt->second.size();
-            if(clt->second.size() > maxrep)
-                maxrep = clt->second.size();
+        size_t totalrep = 0, maxrep = 0;
+        for (auto clt : reply_window_) {
+            totalrep += clt.second.size();
+            if(clt.second.size() > maxrep)
+                maxrep = clt.second.size();
         }
         }
-        jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n",
-                        (int) reply_window_.size()-1, totalrep, maxrep);
+        IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
+                        totalrep << " max per client " << maxrep);
         curr_counts_ = counting_;
     }
 }
         curr_counts_ = counting_;
     }
 }
@@ -474,34 +423,32 @@ void
 rpcs::dispatch(djob_t *j)
 {
     connection *c = j->conn;
 rpcs::dispatch(djob_t *j)
 {
     connection *c = j->conn;
-    unmarshall req(j->buf, j->sz);
+    unmarshall req(j->buf, true);
     delete j;
 
     delete j;
 
-    req_header h;
-    req.unpack_req_header(&h);
-    int proc = h.proc;
+    request_header h;
+    req.unpack_header(h);
+    proc_t proc = h.proc;
 
     if(!req.ok()){
 
     if(!req.ok()){
-        jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
+        IF_LEVEL(1) LOG("unmarshall header failed!!!");
         c->decref();
         return;
     }
 
         c->decref();
         return;
     }
 
-    jsl_log(JSL_DBG_2,
-            "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
-            h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
+    IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
+                    dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
 
     marshall rep;
 
     marshall rep;
-    reply_header rh(h.xid,0);
+    reply_header rh{h.xid,0};
 
     // is client sending to an old instance of server?
     if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
 
     // is client sending to an old instance of server?
     if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
-        jsl_log(JSL_DBG_2,
-                "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
-                h.srv_nonce, nonce_, h.proc);
+        IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
+                        " (current " << nonce_ << ") proc " << hex << h.proc);
         rh.ret = rpc_const::oldsrv_failure;
         rh.ret = rpc_const::oldsrv_failure;
-        rep.pack_reply_header(rh);
-        c->send(rep.cstr(),rep.size());
+        rep.pack_header(rh);
+        c->send(rep);
         return;
     }
 
         return;
     }
 
@@ -510,10 +457,9 @@ rpcs::dispatch(djob_t *j)
     {
         lock pl(procs_m_);
         if(procs_.count(proc) < 1){
     {
         lock pl(procs_m_);
         if(procs_.count(proc) < 1){
-            fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
-                proc);
+            cerr << "unknown proc " << hex << proc << "." << endl;
             c->decref();
             c->decref();
-                        VERIFY(0);
+            VERIFY(0);
             return;
         }
 
             return;
         }
 
@@ -521,8 +467,7 @@ rpcs::dispatch(djob_t *j)
     }
 
     rpcs::rpcstate_t stat;
     }
 
     rpcs::rpcstate_t stat;
-    char *b1;
-    int sz1;
+    string b1;
 
     if(h.clt_nonce){
         // have i seen this client before?
 
     if(h.clt_nonce){
         // have i seen this client before?
@@ -532,15 +477,14 @@ rpcs::dispatch(djob_t *j)
             if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
             if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
-                jsl_log(JSL_DBG_2,
-                        "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
-                        h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
+                IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
+                                " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
             }
         }
 
         // save the latest good connection to the client
         {
             }
         }
 
         // save the latest good connection to the client
         {
-            lock rwl(conss_m_);
+            lock rwl(conns_m_);
             if(conns_.find(h.clt_nonce) == conns_.end()){
                 c->incref();
                 conns_[h.clt_nonce] = c;
             if(conns_.find(h.clt_nonce) == conns_.end()){
                 c->incref();
                 conns_[h.clt_nonce] = c;
@@ -551,44 +495,41 @@ rpcs::dispatch(djob_t *j)
             }
         }
 
             }
         }
 
-        stat = checkduplicate_and_update(h.clt_nonce, h.xid,
-                                                 h.xid_rep, &b1, &sz1);
+        stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
     } else {
         // this client does not require at most once logic
         stat = NEW;
     }
 
     } else {
         // this client does not require at most once logic
         stat = NEW;
     }
 
-    switch (stat){
+    switch (stat) {
         case NEW: // new request
         case NEW: // new request
-            if(counting_){
+            if (counting_){
                 updatestat(proc);
             }
 
                 updatestat(proc);
             }
 
-            rh.ret = f->fn(req, rep);
-                        if (rh.ret == rpc_const::unmarshal_args_failure) {
-                                fprintf(stderr, "rpcs::dispatch: failed to"
-                                       " unmarshall the arguments. You are"
-                                       " probably calling RPC 0x%x with wrong"
-                                       " types of arguments.\n", proc);
-                                VERIFY(0);
-                        }
+            rh.ret = (*f)(req, rep);
+            if (rh.ret == rpc_const::unmarshal_args_failure) {
+                cerr << "failed to unmarshall the arguments. You are " <<
+                        "probably calling RPC 0x" << hex << proc << " with the wrong " <<
+                        "types of arguments." << endl;
+                VERIFY(0);
+            }
             VERIFY(rh.ret >= 0);
 
             VERIFY(rh.ret >= 0);
 
-            rep.pack_reply_header(rh);
-            rep.take_buf(&b1,&sz1);
+            rep.pack_header(rh);
+            b1 = rep;
 
 
-            jsl_log(JSL_DBG_2,
-                    "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
-                    sz1, h.xid, proc, rh.ret, h.clt_nonce);
+            IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
+                            h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
 
 
-            if(h.clt_nonce > 0){
+            if (h.clt_nonce > 0) {
                 // only record replies for clients that require at-most-once logic
                 // only record replies for clients that require at-most-once logic
-                add_reply(h.clt_nonce, h.xid, b1, sz1);
+                add_reply(h.clt_nonce, h.xid, b1);
             }
 
             // get the latest connection to the client
             {
             }
 
             // get the latest connection to the client
             {
-                lock rwl(conss_m_);
+                lock rwl(conns_m_);
                 if(c->isdead() && c != conns_[h.clt_nonce]){
                     c->decref();
                     c = conns_[h.clt_nonce];
                 if(c->isdead() && c != conns_[h.clt_nonce]){
                     c->decref();
                     c = conns_[h.clt_nonce];
@@ -596,23 +537,18 @@ rpcs::dispatch(djob_t *j)
                 }
             }
 
                 }
             }
 
-            c->send(b1, sz1);
-            if(h.clt_nonce == 0){
-                // reply is not added to at-most-once window, free it
-                free(b1);
-            }
+            c->send(rep);
             break;
         case INPROGRESS: // server is working on this request
             break;
         case DONE: // duplicate and we still have the response
             break;
         case INPROGRESS: // server is working on this request
             break;
         case DONE: // duplicate and we still have the response
-            c->send(b1, sz1);
+            c->send(b1);
             break;
         case FORGOTTEN: // very old request and we don't have the response anymore
             break;
         case FORGOTTEN: // very old request and we don't have the response anymore
-            jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
-                    h.xid, h.clt_nonce);
+            IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
             rh.ret = rpc_const::atmostonce_failure;
             rh.ret = rpc_const::atmostonce_failure;
-            rep.pack_reply_header(rh);
-            c->send(rep.cstr(),rep.size());
+            rep.pack_header(rh);
+            c->send(rep);
             break;
     }
     c->decref();
             break;
     }
     c->decref();
@@ -630,35 +566,32 @@ rpcs::dispatch(djob_t *j)
 // returns one of:
 //   NEW: never seen this xid before.
 //   INPROGRESS: seen this xid, and still processing it.
 // returns one of:
 //   NEW: never seen this xid before.
 //   INPROGRESS: seen this xid, and still processing it.
-//   DONE: seen this xid, previous reply returned in *b and *sz.
+//   DONE: seen this xid, previous reply returned in b.
 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
 rpcs::rpcstate_t
 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
 rpcs::rpcstate_t
-rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
-        unsigned int xid_rep, char **b, int *sz)
+rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
+        int xid_rep, string & b)
 {
     lock rwl(reply_window_m_);
 
 {
     lock rwl(reply_window_m_);
 
-    std::list<reply_t> &l = reply_window_[clt_nonce];
+    list<reply_t> &l = reply_window_[clt_nonce];
 
     VERIFY(l.size() > 0);
     VERIFY(xid >= xid_rep);
 
 
     VERIFY(l.size() > 0);
     VERIFY(xid >= xid_rep);
 
-    unsigned int past_xid_rep = l.begin()->xid;
+    int past_xid_rep = l.begin()->xid;
 
 
-    std::list<reply_t>::iterator start = l.begin(), it;
-    it = ++start;
+    list<reply_t>::iterator start = l.begin(), it = ++start;
 
 
-    if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) {
+    if (past_xid_rep < xid_rep || past_xid_rep == -1) {
         // scan for deletion candidates
         // scan for deletion candidates
-        for (; it != l.end() && it->xid < xid_rep; it++) {
-            if (it->cb_present)
-                free(it->buf);
-        }
+        while (it != l.end() && it->xid < xid_rep)
+            it++;
         l.erase(start, it);
         l.begin()->xid = xid_rep;
     }
 
         l.erase(start, it);
         l.begin()->xid = xid_rep;
     }
 
-    if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1)
+    if (xid < past_xid_rep && past_xid_rep != -1)
         return FORGOTTEN;
 
     // skip non-deletion candidates
         return FORGOTTEN;
 
     // skip non-deletion candidates
@@ -669,12 +602,10 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
     if (it != l.end() && it->xid == xid) {
         if (it->cb_present) {
             // return information about the remembered reply
     if (it != l.end() && it->xid == xid) {
         if (it->cb_present) {
             // return information about the remembered reply
-            *b = it->buf;
-            *sz = it->sz;
+            b = it->buf;
             return DONE;
             return DONE;
-        } else {
-            return INPROGRESS;
         }
         }
+        return INPROGRESS;
     } else {
         // remember that a new request has arrived
         l.insert(it, reply_t(xid));
     } else {
         // remember that a new request has arrived
         l.insert(it, reply_t(xid));
@@ -683,336 +614,66 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
 }
 
 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
 }
 
 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
-// and passes the return value in b and sz.
-// add_reply() should remember b and sz.
-// free_reply_window() and checkduplicate_and_update is responsible for
-// calling free(b).
-void
-rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
-        char *b, int sz)
-{
+// and passes the return value in b.
+// add_reply() should remember b.
+// free_reply_window() and checkduplicate_and_update are responsible for
+// cleaning up the remembered values.
+void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
     lock rwl(reply_window_m_);
     // remember the RPC reply value
     lock rwl(reply_window_m_);
     // remember the RPC reply value
-    std::list<reply_t> &l = reply_window_[clt_nonce];
-    std::list<reply_t>::iterator it = l.begin();
+    list<reply_t> &l = reply_window_[clt_nonce];
+    list<reply_t>::iterator it = l.begin();
     // skip to our place in the list
     for (it++; it != l.end() && it->xid < xid; it++);
     // there should already be an entry, so whine if there isn't
     if (it == l.end() || it->xid != xid) {
     // skip to our place in the list
     for (it++; it != l.end() && it->xid < xid; it++);
     // there should already be an entry, so whine if there isn't
     if (it == l.end() || it->xid != xid) {
-        fprintf(stderr, "Could not find reply struct in add_reply");
-        l.insert(it, reply_t(xid, b, sz));
+        cerr << "Could not find reply struct in add_reply" << endl;
+        l.insert(it, reply_t(xid, b));
     } else {
     } else {
-        *it = reply_t(xid, b, sz);
+        *it = reply_t(xid, b);
     }
 }
 
     }
 }
 
-void
-rpcs::free_reply_window(void)
-{
-    std::map<unsigned int,std::list<reply_t> >::iterator clt;
-    std::list<reply_t>::iterator it;
-
+void rpcs::free_reply_window(void) {
     lock rwl(reply_window_m_);
     lock rwl(reply_window_m_);
-    for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
-        for (it = clt->second.begin(); it != clt->second.end(); it++){
-            if (it->cb_present)
-                free(it->buf);
-        }
-        clt->second.clear();
-    }
     reply_window_.clear();
 }
 
     reply_window_.clear();
 }
 
-// rpc handler
-int
-rpcs::rpcbind(int a, int &r)
-{
-    jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
+int rpcs::rpcbind(unsigned int &r, int) {
+    IF_LEVEL(2) LOG("called return nonce " << nonce_);
     r = nonce_;
     return 0;
 }
 
     r = nonce_;
     return 0;
 }
 
-void
-marshall::rawbyte(unsigned char x)
-{
-    if(_ind >= _capa){
-        _capa *= 2;
-        VERIFY (_buf != NULL);
-        _buf = (char *)realloc(_buf, _capa);
-        VERIFY(_buf);
-    }
-    _buf[_ind++] = x;
-}
-
-void
-marshall::rawbytes(const char *p, int n)
-{
-    if((_ind+n) > _capa){
-        _capa = _capa > n? 2*_capa:(_capa+n);
-        VERIFY (_buf != NULL);
-        _buf = (char *)realloc(_buf, _capa);
-        VERIFY(_buf);
-    }
-    memcpy(_buf+_ind, p, n);
-    _ind += n;
-}
-
-marshall &
-operator<<(marshall &m, bool x)
-{
-    m.rawbyte(x);
-    return m;
-}
+static sockaddr_in make_sockaddr(const string &host, const string &port);
 
 
-marshall &
-operator<<(marshall &m, unsigned char x)
-{
-    m.rawbyte(x);
-    return m;
-}
-
-marshall &
-operator<<(marshall &m, char x)
-{
-    m << (unsigned char) x;
-    return m;
-}
-
-
-marshall &
-operator<<(marshall &m, unsigned short x)
-{
-    m.rawbyte((x >> 8) & 0xff);
-    m.rawbyte(x & 0xff);
-    return m;
-}
-
-marshall &
-operator<<(marshall &m, short x)
-{
-    m << (unsigned short) x;
-    return m;
-}
-
-marshall &
-operator<<(marshall &m, unsigned int x)
-{
-    // network order is big-endian
-    m.rawbyte((x >> 24) & 0xff);
-    m.rawbyte((x >> 16) & 0xff);
-    m.rawbyte((x >> 8) & 0xff);
-    m.rawbyte(x & 0xff);
-    return m;
-}
-
-marshall &
-operator<<(marshall &m, int x)
-{
-    m << (unsigned int) x;
-    return m;
-}
-
-marshall &
-operator<<(marshall &m, const std::string &s)
-{
-    m << (unsigned int) s.size();
-    m.rawbytes(s.data(), s.size());
-    return m;
-}
-
-marshall &
-operator<<(marshall &m, unsigned long long x)
-{
-    m << (unsigned int) (x >> 32);
-    m << (unsigned int) x;
-    return m;
-}
-
-void
-marshall::pack(int x)
-{
-    rawbyte((x >> 24) & 0xff);
-    rawbyte((x >> 16) & 0xff);
-    rawbyte((x >> 8) & 0xff);
-    rawbyte(x & 0xff);
-}
-
-void
-unmarshall::unpack(int *x)
-{
-    (*x) = (rawbyte() & 0xff) << 24;
-    (*x) |= (rawbyte() & 0xff) << 16;
-    (*x) |= (rawbyte() & 0xff) << 8;
-    (*x) |= rawbyte() & 0xff;
-}
-
-// take the contents from another unmarshall object
-void
-unmarshall::take_in(unmarshall &another)
-{
-    if(_buf)
-        free(_buf);
-    another.take_buf(&_buf, &_sz);
-    _ind = RPC_HEADER_SZ;
-    _ok = _sz >= RPC_HEADER_SZ?true:false;
-}
-
-bool
-unmarshall::okdone()
-{
-    if(ok() && _ind == _sz){
-        return true;
-    } else {
-        return false;
-    }
-}
-
-unsigned int
-unmarshall::rawbyte()
-{
-    char c = 0;
-    if(_ind >= _sz)
-        _ok = false;
+static sockaddr_in make_sockaddr(const string &hostandport) {
+    auto colon = hostandport.find(':');
+    if (colon == string::npos)
+        return make_sockaddr("127.0.0.1", hostandport);
     else
     else
-        c = _buf[_ind++];
-    return c;
+        return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
 }
 
 }
 
-unmarshall &
-operator>>(unmarshall &u, bool &x)
-{
-    x = (bool) u.rawbyte() ;
-    return u;
-}
+static sockaddr_in make_sockaddr(const string &host, const string &port) {
+    sockaddr_in dst;
+    bzero(&dst, sizeof(dst));
+    dst.sin_family = AF_INET;
 
 
-unmarshall &
-operator>>(unmarshall &u, unsigned char &x)
-{
-    x = (unsigned char) u.rawbyte() ;
-    return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, char &x)
-{
-    x = (char) u.rawbyte();
-    return u;
-}
+    struct in_addr a{inet_addr(host.c_str())};
 
 
+    if(a.s_addr != INADDR_NONE)
+        dst.sin_addr.s_addr = a.s_addr;
+    else {
+        struct hostent *hp = gethostbyname(host.c_str());
 
 
-unmarshall &
-operator>>(unmarshall &u, unsigned short &x)
-{
-    x = (u.rawbyte() & 0xff) << 8;
-    x |= u.rawbyte() & 0xff;
-    return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, short &x)
-{
-    x = (u.rawbyte() & 0xff) << 8;
-    x |= u.rawbyte() & 0xff;
-    return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, unsigned int &x)
-{
-    x = (u.rawbyte() & 0xff) << 24;
-    x |= (u.rawbyte() & 0xff) << 16;
-    x |= (u.rawbyte() & 0xff) << 8;
-    x |= u.rawbyte() & 0xff;
-    return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, int &x)
-{
-    x = (u.rawbyte() & 0xff) << 24;
-    x |= (u.rawbyte() & 0xff) << 16;
-    x |= (u.rawbyte() & 0xff) << 8;
-    x |= u.rawbyte() & 0xff;
-    return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, unsigned long long &x)
-{
-    unsigned int h, l;
-    u >> h;
-    u >> l;
-    x = l | ((unsigned long long) h << 32);
-    return u;
-}
-
-unmarshall &
-operator>>(unmarshall &u, std::string &s)
-{
-    unsigned sz;
-    u >> sz;
-    if(u.ok())
-        u.rawbytes(s, sz);
-    return u;
-}
-
-void
-unmarshall::rawbytes(std::string &ss, unsigned int n)
-{
-    if((_ind+n) > (unsigned)_sz){
-        _ok = false;
-    } else {
-        std::string tmps = std::string(_buf+_ind, n);
-        swap(ss, tmps);
-        VERIFY(ss.size() == n);
-        _ind += n;
-    }
-}
-
-bool operator<(const sockaddr_in &a, const sockaddr_in &b){
-    return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
-            ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
-             ((a.sin_port < b.sin_port))));
-}
-
-/*---------------auxilary function--------------*/
-void
-make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
-
-    char host[200];
-    const char *localhost = "127.0.0.1";
-    const char *port = index(hostandport, ':');
-    if(port == NULL){
-        memcpy(host, localhost, strlen(localhost)+1);
-        port = hostandport;
-    } else {
-        memcpy(host, hostandport, port-hostandport);
-        host[port-hostandport] = '\0';
-        port++;
-    }
-
-    make_sockaddr(host, port, dst);
-
-}
-
-void
-make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
-
-    in_addr_t a;
-
-    bzero(dst, sizeof(*dst));
-    dst->sin_family = AF_INET;
-
-    a = inet_addr(host);
-    if(a != INADDR_NONE){
-        dst->sin_addr.s_addr = a;
-    } else {
-        struct hostent *hp = gethostbyname(host);
-        if(hp == 0 || hp->h_length != 4){
-            fprintf(stderr, "cannot find host name %s\n", host);
+        if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
+            cerr << "cannot find host name " << host << endl;
             exit(1);
         }
             exit(1);
         }
-        dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
+        memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
+        dst.sin_addr.s_addr = a.s_addr;
     }
     }
-    dst->sin_port = htons(atoi(port));
+    dst.sin_port = hton((in_port_t)stoi(port));
+    return dst;
 }
 }