Explicit refcounting removed from connection object
[invirt/third/libt4.git] / rpc / rpc.cc
index 47ac775..2c1f1a5 100644 (file)
@@ -70,7 +70,7 @@ static sockaddr_in make_sockaddr(const string &hostandport);
 
 rpcc::rpcc(const string & d, bool retrans) :
     dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
-    retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
+    retrans_(retrans), reachable_(true), chan_(), destroy_wait_ (false), xid_rep_done_(-1)
 {
     if(retrans){
         set_rand_seed();
@@ -95,11 +95,10 @@ rpcc::rpcc(const string & d, bool retrans) :
 // IMPORTANT: destruction should happen only when no external threads
 // are blocked inside rpcc or will use rpcc in the future
 rpcc::~rpcc() {
+    cancel();
     IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
-    if(chan_){
+    if(chan_)
         chan_->closeconn();
-        chan_->decref();
-    }
     VERIFY(calls_.size() == 0);
 }
 
@@ -119,24 +118,26 @@ int rpcc::bind(milliseconds to) {
 // Cancel all outstanding calls
 void rpcc::cancel(void) {
     lock ml(m_);
-    LOG("force callers to fail");
-    for(auto &p : calls_){
-        caller *ca = p.second;
+    if (calls_.size()) {
+        LOG("force callers to fail");
+        for(auto &p : calls_){
+            caller *ca = p.second;
 
-        IF_LEVEL(2) LOG("force caller to fail");
-        {
-            lock cl(ca->m);
-            ca->done = true;
-            ca->intret = rpc_const::cancel_failure;
-            ca->c.notify_one();
+            IF_LEVEL(2) LOG("force caller to fail");
+            {
+                lock cl(ca->m);
+                ca->done = true;
+                ca->intret = rpc_const::cancel_failure;
+                ca->c.notify_one();
+            }
         }
-    }
 
-    while (calls_.size () > 0){
-        destroy_wait_ = true;
-        destroy_wait_c_.wait(ml);
+        while (calls_.size () > 0){
+            destroy_wait_ = true;
+            destroy_wait_c_.wait(ml);
+        }
+        LOG("done");
     }
-    LOG("done");
 }
 
 int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
@@ -167,13 +168,13 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
     auto finaldeadline = steady_clock::now() + to, nextdeadline = finaldeadline;
 
     bool transmit = true;
-    connection *ch = NULL;
+    shared_ptr<connection> ch;
 
-    while (1){
-        if(transmit){
-            get_refconn(&ch);
-            if(ch){
-                if(reachable_) {
+    while (1) {
+        if(transmit) {
+            get_refconn(ch);
+            if (ch) {
+                if (reachable_) {
                     request forgot;
                     {
                         lock ml(m_);
@@ -217,7 +218,7 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
             }
         }
 
-        if(retrans_ && (!ch || ch->isdead())){
+        if(retrans_ && (!ch || ch->isdead())) {
             // since connection is dead, retransmit
             // on the new connection
             transmit = true;
@@ -256,29 +257,19 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
                     " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
                     ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
 
-    if(ch)
-        ch->decref();
-
     // destruction of req automatically frees its buffer
     return (ca.done? ca.intret : rpc_const::timeout_failure);
 }
 
 void
-rpcc::get_refconn(connection **ch)
+rpcc::get_refconn(shared_ptr<connection> & ch)
 {
     lock ml(chan_m_);
-    if(!chan_ || chan_->isdead()){
-        if(chan_)
-            chan_->decref();
+    if (!chan_ || chan_->isdead())
         chan_ = connect_to_dst(dst_, this, lossytest_);
-    }
-    if(ch && chan_){
-        if(*ch){
-            (*ch)->decref();
-        }
-        *ch = chan_;
-        (*ch)->incref();
-    }
+
+    if (chan_)
+        ch = chan_;
 }
 
 // PollMgr's thread is being used to
@@ -287,7 +278,7 @@ rpcc::get_refconn(connection **ch)
 //
 // this function keeps no reference for connection *c
 bool
-rpcc::got_pdu(connection *, const string & b)
+rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
 {
     unmarshall rep(b, true);
     reply_header h;
@@ -368,21 +359,14 @@ rpcs::~rpcs()
 }
 
 bool
-rpcs::got_pdu(connection *c, const string & b)
+rpcs::got_pdu(const shared_ptr<connection> & c, const string & b)
 {
     if(!reachable_){
         IF_LEVEL(1) LOG("not reachable");
         return true;
     }
 
-    djob_t *j = new djob_t{c, b};
-    c->incref();
-    bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
-    if(!succ || !reachable_){
-        c->decref();
-        delete j;
-    }
-    return succ;
+    return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
 }
 
 void
@@ -419,20 +403,15 @@ rpcs::updatestat(proc_t proc)
     }
 }
 
-void
-rpcs::dispatch(djob_t *j)
-{
-    connection *c = j->conn;
-    unmarshall req(j->buf, true);
-    delete j;
+void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
+    unmarshall req(buf, true);
 
     request_header h;
     req.unpack_header(h);
     proc_t proc = h.proc;
 
-    if(!req.ok()){
-        IF_LEVEL(1) LOG("unmarshall header failed!!!");
-        c->decref();
+    if (!req.ok()) {
+        IF_LEVEL(1) LOG("unmarshall header failed");
         return;
     }
 
@@ -458,7 +437,6 @@ rpcs::dispatch(djob_t *j)
         lock pl(procs_m_);
         if(procs_.count(proc) < 1){
             cerr << "unknown proc " << hex << proc << "." << endl;
-            c->decref();
             VERIFY(0);
             return;
         }
@@ -485,14 +463,10 @@ rpcs::dispatch(djob_t *j)
         // save the latest good connection to the client
         {
             lock rwl(conns_m_);
-            if(conns_.find(h.clt_nonce) == conns_.end()){
-                c->incref();
+            if (conns_.find(h.clt_nonce) == conns_.end())
                 conns_[h.clt_nonce] = c;
-            } else if(conns_[h.clt_nonce]->compare(c) < 0){
-                conns_[h.clt_nonce]->decref();
-                c->incref();
+            else if(conns_[h.clt_nonce]->create_time() < c->create_time())
                 conns_[h.clt_nonce] = c;
-            }
         }
 
         stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
@@ -530,11 +504,8 @@ rpcs::dispatch(djob_t *j)
             // get the latest connection to the client
             {
                 lock rwl(conns_m_);
-                if(c->isdead() && c != conns_[h.clt_nonce]){
-                    c->decref();
+                if (c->isdead())
                     c = conns_[h.clt_nonce];
-                    c->incref();
-                }
             }
 
             c->send(rep);
@@ -551,7 +522,6 @@ rpcs::dispatch(djob_t *j)
             c->send(rep);
             break;
     }
-    c->decref();
 }
 
 // rpcs::dispatch calls this when an RPC request arrives.