Lots more clean-ups
[invirt/third/libt4.git] / rpc / rpc.cc
index 32b25ab..47ac775 100644 (file)
@@ -1,8 +1,8 @@
 /*
 /*
- The rpcc class handles client-side RPC.  Each rpcc is bound to a
- single RPC server.  The jobs of rpcc include maintaining a connection to
- server, sending RPC requests and waiting for responses, retransmissions,
- at-most-once delivery etc.
+ The rpcc class handles client-side RPC.  Each rpcc is bound to a single RPC
+ server.  The jobs of rpcc include maintaining a connection to server, sending
+ RPC requests and waiting for responses, retransmissions, at-most-once delivery
+ etc.
 
  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
  connections from different rpcc objects.  The jobs of rpcs include accepting
 
  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
  connections from different rpcc objects.  The jobs of rpcs include accepting
@@ -11,8 +11,8 @@
 
  Both rpcc and rpcs use the connection class as an abstraction for the
  underlying communication channel.  To send an RPC request/reply, one calls
 
  Both rpcc and rpcs use the connection class as an abstraction for the
  underlying communication channel.  To send an RPC request/reply, one calls
- connection::send() which blocks until data is sent or the connection has failed
- (thus the caller can free the buffer when send() returns).  When a
+ connection::send() which blocks until data is sent or the connection has
+ failed (thus the caller can free the buffer when send() returns).  When a
  request/reply is received, connection makes a callback into the corresponding
  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
 
  request/reply is received, connection makes a callback into the corresponding
  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
 
  number of threads needed to manage these connections; without async IO, at
  least one thread is needed per connection to read data without blocking other
  activities.)  Each rpcs object creates one thread for listening on the server
  number of threads needed to manage these connections; without async IO, at
  least one thread is needed per connection to read data without blocking other
  activities.)  Each rpcs object creates one thread for listening on the server
- port and a pool of threads for executing RPC requests.  The
- thread pool allows us to control the number of threads spawned at the server
- (spawning one thread per request will hurt when the server faces thousands of
- requests).
+ port and a pool of threads for executing RPC requests.  The thread pool allows
+ us to control the number of threads spawned at the server (spawning one thread
+ per request will hurt when the server faces thousands of requests).
 
  In order to delete a connection object, we must maintain a reference count.
 
  In order to delete a connection object, we must maintain a reference count.
- For rpcc,
- multiple client threads might be invoking the rpcc::call() functions and thus
- holding multiple references to the underlying connection object. For rpcs,
- multiple dispatch threads might be holding references to the same connection
- object.  A connection object is deleted only when the underlying connection is
- dead and the reference count reaches zero.
+ For rpcc, multiple client threads might be invoking the rpcc::call() functions
+ and thus holding multiple references to the underlying connection object. For
+ rpcs, multiple dispatch threads might be holding references to the same
+ connection object.  A connection object is deleted only when the underlying
+ connection is dead and the reference count reaches zero.
 
  This version of the RPC library explicitly joins exited threads to make sure
  no outstanding references exist before deleting objects.
 
  This version of the RPC library explicitly joins exited threads to make sure
  no outstanding references exist before deleting objects.
@@ -45,9 +43,9 @@
  there are no outstanding calls on the rpcc object.
 
  To delete a rpcs object safely, we do the following in sequence: 1. stop
  there are no outstanding calls on the rpcc object.
 
  To delete a rpcs object safely, we do the following in sequence: 1. stop
- accepting new incoming connections. 2. close existing active connections.
- 3.  delete the dispatch thread pool which involves waiting for current active
- RPC handlers to finish.  It is interesting how a thread pool can be deleted
+ accepting new incoming connections. 2. close existing active connections.  3.
+ delete the dispatch thread pool which involves waiting for current active RPC
+ handlers to finish.  It is interesting how a thread pool can be deleted
  without using thread cancellation. The trick is to inject x "poison pills" for
  a thread pool of x threads. Upon getting a poison pill instead of a normal
  task, a worker thread will exit (and thread pool destructor waits to join all
  without using thread cancellation. The trick is to inject x "poison pills" for
  a thread pool of x threads. Upon getting a poison pill instead of a normal
  task, a worker thread will exit (and thread pool destructor waits to join all
@@ -63,9 +61,6 @@
 #include <netdb.h>
 #include <unistd.h>
 
 #include <netdb.h>
 #include <unistd.h>
 
-const rpcc::TO rpcc::to_max = { 120000 };
-const rpcc::TO rpcc::to_min = { 1000 };
-
 inline void set_rand_seed() {
     auto now = time_point_cast<nanoseconds>(steady_clock::now());
     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
 inline void set_rand_seed() {
     auto now = time_point_cast<nanoseconds>(steady_clock::now());
     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
@@ -108,7 +103,7 @@ rpcc::~rpcc() {
     VERIFY(calls_.size() == 0);
 }
 
     VERIFY(calls_.size() == 0);
 }
 
-int rpcc::bind(TO to) {
+int rpcc::bind(milliseconds to) {
     unsigned int r;
     int ret = call_timeout(rpc_const::bind, to, r, 0);
     if(ret == 0){
     unsigned int r;
     int ret = call_timeout(rpc_const::bind, to, r, 0);
     if(ret == 0){
@@ -144,7 +139,7 @@ void rpcc::cancel(void) {
     LOG("done");
 }
 
     LOG("done");
 }
 
-int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
+int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
 
     caller ca(0, &rep);
     int xid_rep;
 
     caller ca(0, &rep);
     int xid_rep;
@@ -168,11 +163,8 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
         xid_rep = xid_rep_window_.front();
     }
 
         xid_rep = xid_rep_window_.front();
     }
 
-    TO curr_to;
-    auto finaldeadline = steady_clock::now() + milliseconds(to.to),
-        nextdeadline = finaldeadline;
-
-    curr_to.to = to_min.to;
+    milliseconds curr_to = rpc::to_min;
+    auto finaldeadline = steady_clock::now() + to, nextdeadline = finaldeadline;
 
     bool transmit = true;
     connection *ch = NULL;
 
     bool transmit = true;
     connection *ch = NULL;
@@ -204,7 +196,7 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
         if(finaldeadline == time_point<steady_clock>::min())
             break;
 
         if(finaldeadline == time_point<steady_clock>::min())
             break;
 
-        nextdeadline = steady_clock::now() + milliseconds(curr_to.to);
+        nextdeadline = steady_clock::now() + curr_to;
         if(nextdeadline > finaldeadline) {
             nextdeadline = finaldeadline;
             finaldeadline = time_point<steady_clock>::min();
         if(nextdeadline > finaldeadline) {
             nextdeadline = finaldeadline;
             finaldeadline = time_point<steady_clock>::min();
@@ -230,7 +222,7 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
             // on the new connection
             transmit = true;
         }
             // on the new connection
             transmit = true;
         }
-        curr_to.to <<= 1;
+        curr_to *= 2;
     }
 
     {
     }
 
     {
@@ -492,7 +484,7 @@ rpcs::dispatch(djob_t *j)
 
         // save the latest good connection to the client
         {
 
         // save the latest good connection to the client
         {
-            lock rwl(conss_m_);
+            lock rwl(conns_m_);
             if(conns_.find(h.clt_nonce) == conns_.end()){
                 c->incref();
                 conns_[h.clt_nonce] = c;
             if(conns_.find(h.clt_nonce) == conns_.end()){
                 c->incref();
                 conns_[h.clt_nonce] = c;
@@ -537,7 +529,7 @@ rpcs::dispatch(djob_t *j)
 
             // get the latest connection to the client
             {
 
             // get the latest connection to the client
             {
-                lock rwl(conss_m_);
+                lock rwl(conns_m_);
                 if(c->isdead() && c != conns_[h.clt_nonce]){
                     c->decref();
                     c = conns_[h.clt_nonce];
                 if(c->isdead() && c != conns_[h.clt_nonce]){
                     c->decref();
                     c = conns_[h.clt_nonce];