Explicit refcounting removed from connection object
authorPeter Iannucci <iannucci@mit.edu>
Thu, 10 Oct 2013 16:39:39 +0000 (12:39 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Thu, 10 Oct 2013 16:39:39 +0000 (12:39 -0400)
rpc/connection.cc
rpc/connection.h
rpc/rpc.cc
rpc/rpc.h
types.h

index 33e891c..cc9f03c 100644 (file)
@@ -20,20 +20,11 @@ connection::connection(chanmgr *m1, int f1, int l1)
 }
 
 connection::~connection() {
+    closeconn();
     VERIFY(dead_);
     VERIFY(!wpdu_.buf.size());
 }
 
-void connection::incref() {
-    lock rl(ref_m_);
-    refno_++;
-}
-
-bool connection::isdead() {
-    lock ml(m_);
-    return dead_;
-}
-
 void connection::closeconn() {
     {
         lock ml(m_);
@@ -47,29 +38,6 @@ void connection::closeconn() {
     PollMgr::Instance().block_remove_fd(fd_);
 }
 
-void connection::decref() {
-    bool dead = false;
-    {
-        lock rl(ref_m_);
-        refno_--;
-        VERIFY(refno_>=0);
-        if (refno_==0) {
-            lock ml(m_);
-            dead = dead_;
-        }
-    }
-    if (dead)
-        delete this;
-}
-
-int connection::compare(connection *another) {
-    if (create_time_ > another->create_time_)
-        return 1;
-    if (create_time_ < another->create_time_)
-        return -1;
-    return 0;
-}
-
 bool connection::send(const string & b) {
     lock ml(m_);
 
@@ -131,7 +99,7 @@ void connection::write_cb(int s) {
     send_complete_.notify_one();
 }
 
-//fd_ is ready to be read
+// fd_ is ready to be read
 void connection::read_cb(int s) {
     lock ml(m_);
     VERIFY(fd_ == s);
@@ -154,8 +122,8 @@ void connection::read_cb(int s) {
     }
 
     if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
-        if (mgr_->got_pdu(this, rpdu_.buf)) {
-            //chanmgr has successfully consumed the pdu
+        if (mgr_->got_pdu(shared_from_this(), rpdu_.buf)) {
+            // chanmgr has successfully consumed the pdu
             rpdu_.buf.clear();
             rpdu_.solong = 0;
         }
@@ -277,12 +245,8 @@ tcpsconn::~tcpsconn()
     pipe_[1].close();
     th_.join();
 
-    // close all the active connections
-    map<int, connection *>::iterator i;
-    for (i = conns_.begin(); i != conns_.end(); i++) {
-        i->second->closeconn();
-        i->second->decref();
-    }
+    for (auto & i : conns_)
+        i.second->closeconn();
 }
 
 void tcpsconn::process_accept() {
@@ -295,19 +259,13 @@ void tcpsconn::process_accept() {
     }
 
     IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
-    connection *ch = new connection(mgr_, s1, lossy_);
+    auto ch = make_shared<connection>(mgr_, s1, lossy_);
 
-    // garbage collect all dead connections with refcount of 1
+    // garbage collect dead connections
     for (auto i = conns_.begin(); i != conns_.end();) {
-        if (i->second->isdead() && i->second->ref() == 1) {
-            IF_LEVEL(2) LOG("accept_loop garbage collected fd=" << i->second->channo());
-            i->second->decref();
-            // Careful not to reuse i right after erase. (i++) will
-            // be evaluated before the erase call because in C++,
-            // there is a sequence point before a function call.
-            // See http://en.wikipedia.org/wiki/Sequence_point.
+        if (i->second->isdead())
             conns_.erase(i++);
-        } else
+        else
             ++i;
     }
 
@@ -347,16 +305,16 @@ void tcpsconn::accept_conn() {
     }
 }
 
-connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
+shared_ptr<connection> connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
     int s = socket(AF_INET, SOCK_STREAM, 0);
     int yes = 1;
     setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
         IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
         close(s);
-        return NULL;
+        return nullptr;
     }
     IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
-    return new connection(mgr, s, lossy);
+    return make_shared<connection>(mgr, s, lossy);
 }
 
index 3e19a93..7032b8c 100644 (file)
@@ -16,11 +16,11 @@ class connection;
 
 class chanmgr {
     public:
-        virtual bool got_pdu(connection *c, const string & b) = 0;
+        virtual bool got_pdu(const shared_ptr<connection> & c, const string & b) = 0;
         virtual ~chanmgr() {}
 };
 
-class connection : public aio_callback {
+class connection : public aio_callback, public enable_shared_from_this<connection> {
     public:
         struct charbuf {
             string buf;
@@ -31,18 +31,14 @@ class connection : public aio_callback {
         ~connection();
 
         int channo() { return fd_; }
-        bool isdead();
+        bool isdead() { lock ml(m_); return dead_; }
         void closeconn();
 
         bool send(const string & b);
         void write_cb(int s);
         void read_cb(int s);
 
-        void incref();
-        void decref();
-        int ref() { lock rl(ref_m_); return refno_; }
-
-        int compare(connection *another);
+        time_point<steady_clock> create_time() const { return create_time_; }
 
     private:
 
@@ -59,11 +55,9 @@ class connection : public aio_callback {
         time_point<steady_clock> create_time_;
 
         int waiters_ = 0;
-        int refno_ = 1;
         int lossy_ = 0;
 
         mutex m_;
-        mutex ref_m_;
         cond send_complete_;
         cond send_wait_;
 };
@@ -83,7 +77,7 @@ class tcpsconn {
         socket_t tcp_; // listens for connections
         chanmgr *mgr_;
         int lossy_;
-        map<int, connection *> conns_;
+        map<int, shared_ptr<connection>> conns_;
 
         void process_accept();
 };
@@ -95,5 +89,5 @@ struct bundle {
     int lossy;
 };
 
-connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0);
+shared_ptr<connection> connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0);
 #endif
index 47ac775..2c1f1a5 100644 (file)
@@ -70,7 +70,7 @@ static sockaddr_in make_sockaddr(const string &hostandport);
 
 rpcc::rpcc(const string & d, bool retrans) :
     dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
-    retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
+    retrans_(retrans), reachable_(true), chan_(), destroy_wait_ (false), xid_rep_done_(-1)
 {
     if(retrans){
         set_rand_seed();
@@ -95,11 +95,10 @@ rpcc::rpcc(const string & d, bool retrans) :
 // IMPORTANT: destruction should happen only when no external threads
 // are blocked inside rpcc or will use rpcc in the future
 rpcc::~rpcc() {
+    cancel();
     IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
-    if(chan_){
+    if(chan_)
         chan_->closeconn();
-        chan_->decref();
-    }
     VERIFY(calls_.size() == 0);
 }
 
@@ -119,24 +118,26 @@ int rpcc::bind(milliseconds to) {
 // Cancel all outstanding calls
 void rpcc::cancel(void) {
     lock ml(m_);
-    LOG("force callers to fail");
-    for(auto &p : calls_){
-        caller *ca = p.second;
+    if (calls_.size()) {
+        LOG("force callers to fail");
+        for(auto &p : calls_){
+            caller *ca = p.second;
 
-        IF_LEVEL(2) LOG("force caller to fail");
-        {
-            lock cl(ca->m);
-            ca->done = true;
-            ca->intret = rpc_const::cancel_failure;
-            ca->c.notify_one();
+            IF_LEVEL(2) LOG("force caller to fail");
+            {
+                lock cl(ca->m);
+                ca->done = true;
+                ca->intret = rpc_const::cancel_failure;
+                ca->c.notify_one();
+            }
         }
-    }
 
-    while (calls_.size () > 0){
-        destroy_wait_ = true;
-        destroy_wait_c_.wait(ml);
+        while (calls_.size () > 0){
+            destroy_wait_ = true;
+            destroy_wait_c_.wait(ml);
+        }
+        LOG("done");
     }
-    LOG("done");
 }
 
 int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
@@ -167,13 +168,13 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
     auto finaldeadline = steady_clock::now() + to, nextdeadline = finaldeadline;
 
     bool transmit = true;
-    connection *ch = NULL;
+    shared_ptr<connection> ch;
 
-    while (1){
-        if(transmit){
-            get_refconn(&ch);
-            if(ch){
-                if(reachable_) {
+    while (1) {
+        if(transmit) {
+            get_refconn(ch);
+            if (ch) {
+                if (reachable_) {
                     request forgot;
                     {
                         lock ml(m_);
@@ -217,7 +218,7 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
             }
         }
 
-        if(retrans_ && (!ch || ch->isdead())){
+        if(retrans_ && (!ch || ch->isdead())) {
             // since connection is dead, retransmit
             // on the new connection
             transmit = true;
@@ -256,29 +257,19 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
                     " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
                     ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
 
-    if(ch)
-        ch->decref();
-
     // destruction of req automatically frees its buffer
     return (ca.done? ca.intret : rpc_const::timeout_failure);
 }
 
 void
-rpcc::get_refconn(connection **ch)
+rpcc::get_refconn(shared_ptr<connection> & ch)
 {
     lock ml(chan_m_);
-    if(!chan_ || chan_->isdead()){
-        if(chan_)
-            chan_->decref();
+    if (!chan_ || chan_->isdead())
         chan_ = connect_to_dst(dst_, this, lossytest_);
-    }
-    if(ch && chan_){
-        if(*ch){
-            (*ch)->decref();
-        }
-        *ch = chan_;
-        (*ch)->incref();
-    }
+
+    if (chan_)
+        ch = chan_;
 }
 
 // PollMgr's thread is being used to
@@ -287,7 +278,7 @@ rpcc::get_refconn(connection **ch)
 //
 // this function keeps no reference for connection *c
 bool
-rpcc::got_pdu(connection *, const string & b)
+rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
 {
     unmarshall rep(b, true);
     reply_header h;
@@ -368,21 +359,14 @@ rpcs::~rpcs()
 }
 
 bool
-rpcs::got_pdu(connection *c, const string & b)
+rpcs::got_pdu(const shared_ptr<connection> & c, const string & b)
 {
     if(!reachable_){
         IF_LEVEL(1) LOG("not reachable");
         return true;
     }
 
-    djob_t *j = new djob_t{c, b};
-    c->incref();
-    bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
-    if(!succ || !reachable_){
-        c->decref();
-        delete j;
-    }
-    return succ;
+    return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
 }
 
 void
@@ -419,20 +403,15 @@ rpcs::updatestat(proc_t proc)
     }
 }
 
-void
-rpcs::dispatch(djob_t *j)
-{
-    connection *c = j->conn;
-    unmarshall req(j->buf, true);
-    delete j;
+void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
+    unmarshall req(buf, true);
 
     request_header h;
     req.unpack_header(h);
     proc_t proc = h.proc;
 
-    if(!req.ok()){
-        IF_LEVEL(1) LOG("unmarshall header failed!!!");
-        c->decref();
+    if (!req.ok()) {
+        IF_LEVEL(1) LOG("unmarshall header failed");
         return;
     }
 
@@ -458,7 +437,6 @@ rpcs::dispatch(djob_t *j)
         lock pl(procs_m_);
         if(procs_.count(proc) < 1){
             cerr << "unknown proc " << hex << proc << "." << endl;
-            c->decref();
             VERIFY(0);
             return;
         }
@@ -485,14 +463,10 @@ rpcs::dispatch(djob_t *j)
         // save the latest good connection to the client
         {
             lock rwl(conns_m_);
-            if(conns_.find(h.clt_nonce) == conns_.end()){
-                c->incref();
+            if (conns_.find(h.clt_nonce) == conns_.end())
                 conns_[h.clt_nonce] = c;
-            } else if(conns_[h.clt_nonce]->compare(c) < 0){
-                conns_[h.clt_nonce]->decref();
-                c->incref();
+            else if(conns_[h.clt_nonce]->create_time() < c->create_time())
                 conns_[h.clt_nonce] = c;
-            }
         }
 
         stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
@@ -530,11 +504,8 @@ rpcs::dispatch(djob_t *j)
             // get the latest connection to the client
             {
                 lock rwl(conns_m_);
-                if(c->isdead() && c != conns_[h.clt_nonce]){
-                    c->decref();
+                if (c->isdead())
                     c = conns_[h.clt_nonce];
-                    c->incref();
-                }
             }
 
             c->send(rep);
@@ -551,7 +522,6 @@ rpcs::dispatch(djob_t *j)
             c->send(rep);
             break;
     }
-    c->decref();
 }
 
 // rpcs::dispatch calls this when an RPC request arrives.
index 19ec96a..9ec2fd8 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -45,7 +45,7 @@ class rpcc : public chanmgr {
             cond c;
         };
 
-        void get_refconn(connection **ch);
+        void get_refconn(shared_ptr<connection> & ch);
         void update_xid_rep(int xid);
 
 
@@ -58,7 +58,7 @@ class rpcc : public chanmgr {
         bool retrans_;
         bool reachable_;
 
-        connection *chan_;
+        shared_ptr<connection> chan_;
 
         mutex m_; // protect insert/delete to calls[]
         mutex chan_m_;
@@ -95,7 +95,7 @@ class rpcc : public chanmgr {
 
         void cancel();
 
-        bool got_pdu(connection *c, const string & b);
+        bool got_pdu(const shared_ptr<connection> & c, const string & b);
 
         template<class R, typename ...Args>
             inline int call(proc_t proc, R & r, const Args&... args);
@@ -175,7 +175,7 @@ class rpcs : public chanmgr {
     void updatestat(proc_t proc);
 
     // latest connection to the client
-    map<unsigned int, connection *> conns_;
+    map<unsigned int, shared_ptr<connection>> conns_;
 
     // counting
     const size_t counting_;
@@ -195,11 +195,7 @@ class rpcs : public chanmgr {
 
     protected:
 
-    struct djob_t {
-        connection *conn;
-        string buf;
-    };
-    void dispatch(djob_t *);
+    void dispatch(shared_ptr<connection> c, const string & buf);
 
     // internal handler registration
     void reg1(proc_t proc, handler *);
@@ -216,7 +212,7 @@ class rpcs : public chanmgr {
 
     void set_reachable(bool r) { reachable_ = r; }
 
-    bool got_pdu(connection *c, const string & b);
+    bool got_pdu(const shared_ptr<connection> & c, const string & b);
 
     struct ReturnOnFailure {
         static inline int unmarshall_args_failure() {
diff --git a/types.h b/types.h
index 0897649..b326ef7 100644 (file)
--- a/types.h
+++ b/types.h
@@ -59,6 +59,12 @@ using std::list;
 #include <map>
 using std::map;
 
+#include <memory>
+using std::enable_shared_from_this;
+using std::make_shared;
+using std::shared_ptr;
+using std::unique_ptr;
+
 #include <mutex>
 using std::mutex;
 using lock = std::unique_lock<std::mutex>;