Explicit refcounting removed from connection object
[invirt/third/libt4.git] / rpc / rpc.cc
index 32b25ab..2c1f1a5 100644 (file)
@@ -1,8 +1,8 @@
 /*
- The rpcc class handles client-side RPC.  Each rpcc is bound to a
- single RPC server.  The jobs of rpcc include maintaining a connection to
- server, sending RPC requests and waiting for responses, retransmissions,
- at-most-once delivery etc.
+ The rpcc class handles client-side RPC.  Each rpcc is bound to a single RPC
+ server.  The jobs of rpcc include maintaining a connection to server, sending
+ RPC requests and waiting for responses, retransmissions, at-most-once delivery
+ etc.
 
  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
  connections from different rpcc objects.  The jobs of rpcs include accepting
@@ -11,8 +11,8 @@
 
  Both rpcc and rpcs use the connection class as an abstraction for the
  underlying communication channel.  To send an RPC request/reply, one calls
- connection::send() which blocks until data is sent or the connection has failed
- (thus the caller can free the buffer when send() returns).  When a
+ connection::send() which blocks until data is sent or the connection has
+ failed (thus the caller can free the buffer when send() returns).  When a
  request/reply is received, connection makes a callback into the corresponding
  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
 
  number of threads needed to manage these connections; without async IO, at
  least one thread is needed per connection to read data without blocking other
  activities.)  Each rpcs object creates one thread for listening on the server
- port and a pool of threads for executing RPC requests.  The
- thread pool allows us to control the number of threads spawned at the server
- (spawning one thread per request will hurt when the server faces thousands of
- requests).
+ port and a pool of threads for executing RPC requests.  The thread pool allows
+ us to control the number of threads spawned at the server (spawning one thread
+ per request will hurt when the server faces thousands of requests).
 
  In order to delete a connection object, we must maintain a reference count.
- For rpcc,
- multiple client threads might be invoking the rpcc::call() functions and thus
- holding multiple references to the underlying connection object. For rpcs,
- multiple dispatch threads might be holding references to the same connection
- object.  A connection object is deleted only when the underlying connection is
- dead and the reference count reaches zero.
+ For rpcc, multiple client threads might be invoking the rpcc::call() functions
+ and thus holding multiple references to the underlying connection object. For
+ rpcs, multiple dispatch threads might be holding references to the same
+ connection object.  A connection object is deleted only when the underlying
+ connection is dead and the reference count reaches zero.
 
  This version of the RPC library explicitly joins exited threads to make sure
  no outstanding references exist before deleting objects.
@@ -45,9 +43,9 @@
  there are no outstanding calls on the rpcc object.
 
  To delete a rpcs object safely, we do the following in sequence: 1. stop
- accepting new incoming connections. 2. close existing active connections.
- 3.  delete the dispatch thread pool which involves waiting for current active
- RPC handlers to finish.  It is interesting how a thread pool can be deleted
+ accepting new incoming connections. 2. close existing active connections.  3.
+ delete the dispatch thread pool which involves waiting for current active RPC
+ handlers to finish.  It is interesting how a thread pool can be deleted
  without using thread cancellation. The trick is to inject x "poison pills" for
  a thread pool of x threads. Upon getting a poison pill instead of a normal
  task, a worker thread will exit (and thread pool destructor waits to join all
@@ -63,9 +61,6 @@
 #include <netdb.h>
 #include <unistd.h>
 
-const rpcc::TO rpcc::to_max = { 120000 };
-const rpcc::TO rpcc::to_min = { 1000 };
-
 inline void set_rand_seed() {
     auto now = time_point_cast<nanoseconds>(steady_clock::now());
     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
@@ -75,7 +70,7 @@ static sockaddr_in make_sockaddr(const string &hostandport);
 
 rpcc::rpcc(const string & d, bool retrans) :
     dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
-    retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
+    retrans_(retrans), reachable_(true), chan_(), destroy_wait_ (false), xid_rep_done_(-1)
 {
     if(retrans){
         set_rand_seed();
@@ -100,15 +95,14 @@ rpcc::rpcc(const string & d, bool retrans) :
 // IMPORTANT: destruction should happen only when no external threads
 // are blocked inside rpcc or will use rpcc in the future
 rpcc::~rpcc() {
+    cancel();
     IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
-    if(chan_){
+    if(chan_)
         chan_->closeconn();
-        chan_->decref();
-    }
     VERIFY(calls_.size() == 0);
 }
 
-int rpcc::bind(TO to) {
+int rpcc::bind(milliseconds to) {
     unsigned int r;
     int ret = call_timeout(rpc_const::bind, to, r, 0);
     if(ret == 0){
@@ -124,27 +118,29 @@ int rpcc::bind(TO to) {
 // Cancel all outstanding calls
 void rpcc::cancel(void) {
     lock ml(m_);
-    LOG("force callers to fail");
-    for(auto &p : calls_){
-        caller *ca = p.second;
+    if (calls_.size()) {
+        LOG("force callers to fail");
+        for(auto &p : calls_){
+            caller *ca = p.second;
 
-        IF_LEVEL(2) LOG("force caller to fail");
-        {
-            lock cl(ca->m);
-            ca->done = true;
-            ca->intret = rpc_const::cancel_failure;
-            ca->c.notify_one();
+            IF_LEVEL(2) LOG("force caller to fail");
+            {
+                lock cl(ca->m);
+                ca->done = true;
+                ca->intret = rpc_const::cancel_failure;
+                ca->c.notify_one();
+            }
         }
-    }
 
-    while (calls_.size () > 0){
-        destroy_wait_ = true;
-        destroy_wait_c_.wait(ml);
+        while (calls_.size () > 0){
+            destroy_wait_ = true;
+            destroy_wait_c_.wait(ml);
+        }
+        LOG("done");
     }
-    LOG("done");
 }
 
-int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
+int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
 
     caller ca(0, &rep);
     int xid_rep;
@@ -168,20 +164,17 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
         xid_rep = xid_rep_window_.front();
     }
 
-    TO curr_to;
-    auto finaldeadline = steady_clock::now() + milliseconds(to.to),
-        nextdeadline = finaldeadline;
-
-    curr_to.to = to_min.to;
+    milliseconds curr_to = rpc::to_min;
+    auto finaldeadline = steady_clock::now() + to, nextdeadline = finaldeadline;
 
     bool transmit = true;
-    connection *ch = NULL;
+    shared_ptr<connection> ch;
 
-    while (1){
-        if(transmit){
-            get_refconn(&ch);
-            if(ch){
-                if(reachable_) {
+    while (1) {
+        if(transmit) {
+            get_refconn(ch);
+            if (ch) {
+                if (reachable_) {
                     request forgot;
                     {
                         lock ml(m_);
@@ -204,7 +197,7 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
         if(finaldeadline == time_point<steady_clock>::min())
             break;
 
-        nextdeadline = steady_clock::now() + milliseconds(curr_to.to);
+        nextdeadline = steady_clock::now() + curr_to;
         if(nextdeadline > finaldeadline) {
             nextdeadline = finaldeadline;
             finaldeadline = time_point<steady_clock>::min();
@@ -225,12 +218,12 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
             }
         }
 
-        if(retrans_ && (!ch || ch->isdead())){
+        if(retrans_ && (!ch || ch->isdead())) {
             // since connection is dead, retransmit
             // on the new connection
             transmit = true;
         }
-        curr_to.to <<= 1;
+        curr_to *= 2;
     }
 
     {
@@ -264,29 +257,19 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
                     " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
                     ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
 
-    if(ch)
-        ch->decref();
-
     // destruction of req automatically frees its buffer
     return (ca.done? ca.intret : rpc_const::timeout_failure);
 }
 
 void
-rpcc::get_refconn(connection **ch)
+rpcc::get_refconn(shared_ptr<connection> & ch)
 {
     lock ml(chan_m_);
-    if(!chan_ || chan_->isdead()){
-        if(chan_)
-            chan_->decref();
+    if (!chan_ || chan_->isdead())
         chan_ = connect_to_dst(dst_, this, lossytest_);
-    }
-    if(ch && chan_){
-        if(*ch){
-            (*ch)->decref();
-        }
-        *ch = chan_;
-        (*ch)->incref();
-    }
+
+    if (chan_)
+        ch = chan_;
 }
 
 // PollMgr's thread is being used to
@@ -295,7 +278,7 @@ rpcc::get_refconn(connection **ch)
 //
 // this function keeps no reference for connection *c
 bool
-rpcc::got_pdu(connection *, const string & b)
+rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
 {
     unmarshall rep(b, true);
     reply_header h;
@@ -376,21 +359,14 @@ rpcs::~rpcs()
 }
 
 bool
-rpcs::got_pdu(connection *c, const string & b)
+rpcs::got_pdu(const shared_ptr<connection> & c, const string & b)
 {
     if(!reachable_){
         IF_LEVEL(1) LOG("not reachable");
         return true;
     }
 
-    djob_t *j = new djob_t{c, b};
-    c->incref();
-    bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
-    if(!succ || !reachable_){
-        c->decref();
-        delete j;
-    }
-    return succ;
+    return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
 }
 
 void
@@ -427,20 +403,15 @@ rpcs::updatestat(proc_t proc)
     }
 }
 
-void
-rpcs::dispatch(djob_t *j)
-{
-    connection *c = j->conn;
-    unmarshall req(j->buf, true);
-    delete j;
+void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
+    unmarshall req(buf, true);
 
     request_header h;
     req.unpack_header(h);
     proc_t proc = h.proc;
 
-    if(!req.ok()){
-        IF_LEVEL(1) LOG("unmarshall header failed!!!");
-        c->decref();
+    if (!req.ok()) {
+        IF_LEVEL(1) LOG("unmarshall header failed");
         return;
     }
 
@@ -466,7 +437,6 @@ rpcs::dispatch(djob_t *j)
         lock pl(procs_m_);
         if(procs_.count(proc) < 1){
             cerr << "unknown proc " << hex << proc << "." << endl;
-            c->decref();
             VERIFY(0);
             return;
         }
@@ -492,15 +462,11 @@ rpcs::dispatch(djob_t *j)
 
         // save the latest good connection to the client
         {
-            lock rwl(conss_m_);
-            if(conns_.find(h.clt_nonce) == conns_.end()){
-                c->incref();
+            lock rwl(conns_m_);
+            if (conns_.find(h.clt_nonce) == conns_.end())
                 conns_[h.clt_nonce] = c;
-            } else if(conns_[h.clt_nonce]->compare(c) < 0){
-                conns_[h.clt_nonce]->decref();
-                c->incref();
+            else if(conns_[h.clt_nonce]->create_time() < c->create_time())
                 conns_[h.clt_nonce] = c;
-            }
         }
 
         stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
@@ -537,12 +503,9 @@ rpcs::dispatch(djob_t *j)
 
             // get the latest connection to the client
             {
-                lock rwl(conss_m_);
-                if(c->isdead() && c != conns_[h.clt_nonce]){
-                    c->decref();
+                lock rwl(conns_m_);
+                if (c->isdead())
                     c = conns_[h.clt_nonce];
-                    c->incref();
-                }
             }
 
             c->send(rep);
@@ -559,7 +522,6 @@ rpcs::dispatch(djob_t *j)
             c->send(rep);
             break;
     }
-    c->decref();
 }
 
 // rpcs::dispatch calls this when an RPC request arrives.