From 3615d9bf0b254442e2fddee45475dbd634cf703a Mon Sep 17 00:00:00 2001 From: Peter Iannucci Date: Thu, 10 Oct 2013 12:39:39 -0400 Subject: [PATCH] Explicit refcounting removed from connection object --- rpc/connection.cc | 68 +++++++------------------------- rpc/connection.h | 18 +++------ rpc/rpc.cc | 112 ++++++++++++++++++++--------------------------------- rpc/rpc.h | 16 +++----- types.h | 6 +++ 5 files changed, 72 insertions(+), 148 deletions(-) diff --git a/rpc/connection.cc b/rpc/connection.cc index 33e891c..cc9f03c 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -20,20 +20,11 @@ connection::connection(chanmgr *m1, int f1, int l1) } connection::~connection() { + closeconn(); VERIFY(dead_); VERIFY(!wpdu_.buf.size()); } -void connection::incref() { - lock rl(ref_m_); - refno_++; -} - -bool connection::isdead() { - lock ml(m_); - return dead_; -} - void connection::closeconn() { { lock ml(m_); @@ -47,29 +38,6 @@ void connection::closeconn() { PollMgr::Instance().block_remove_fd(fd_); } -void connection::decref() { - bool dead = false; - { - lock rl(ref_m_); - refno_--; - VERIFY(refno_>=0); - if (refno_==0) { - lock ml(m_); - dead = dead_; - } - } - if (dead) - delete this; -} - -int connection::compare(connection *another) { - if (create_time_ > another->create_time_) - return 1; - if (create_time_ < another->create_time_) - return -1; - return 0; -} - bool connection::send(const string & b) { lock ml(m_); @@ -131,7 +99,7 @@ void connection::write_cb(int s) { send_complete_.notify_one(); } -//fd_ is ready to be read +// fd_ is ready to be read void connection::read_cb(int s) { lock ml(m_); VERIFY(fd_ == s); @@ -154,8 +122,8 @@ void connection::read_cb(int s) { } if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) { - if (mgr_->got_pdu(this, rpdu_.buf)) { - //chanmgr has successfully consumed the pdu + if (mgr_->got_pdu(shared_from_this(), rpdu_.buf)) { + // chanmgr has successfully consumed the pdu rpdu_.buf.clear(); rpdu_.solong = 0; } @@ -277,12 +245,8 @@ tcpsconn::~tcpsconn() pipe_[1].close(); th_.join(); - // close all the active connections - map::iterator i; - for (i = conns_.begin(); i != conns_.end(); i++) { - i->second->closeconn(); - i->second->decref(); - } + for (auto & i : conns_) + i.second->closeconn(); } void tcpsconn::process_accept() { @@ -295,19 +259,13 @@ void tcpsconn::process_accept() { } IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port)); - connection *ch = new connection(mgr_, s1, lossy_); + auto ch = make_shared(mgr_, s1, lossy_); - // garbage collect all dead connections with refcount of 1 + // garbage collect dead connections for (auto i = conns_.begin(); i != conns_.end();) { - if (i->second->isdead() && i->second->ref() == 1) { - IF_LEVEL(2) LOG("accept_loop garbage collected fd=" << i->second->channo()); - i->second->decref(); - // Careful not to reuse i right after erase. (i++) will - // be evaluated before the erase call because in C++, - // there is a sequence point before a function call. - // See http://en.wikipedia.org/wiki/Sequence_point. + if (i->second->isdead()) conns_.erase(i++); - } else + else ++i; } @@ -347,16 +305,16 @@ void tcpsconn::accept_conn() { } } -connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) { +shared_ptr connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) { int s = socket(AF_INET, SOCK_STREAM, 0); int yes = 1; setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); close(s); - return NULL; + return nullptr; } IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); - return new connection(mgr, s, lossy); + return make_shared(mgr, s, lossy); } diff --git a/rpc/connection.h b/rpc/connection.h index 3e19a93..7032b8c 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -16,11 +16,11 @@ class connection; class chanmgr { public: - virtual bool got_pdu(connection *c, const string & b) = 0; + virtual bool got_pdu(const shared_ptr & c, const string & b) = 0; virtual ~chanmgr() {} }; -class connection : public aio_callback { +class connection : public aio_callback, public enable_shared_from_this { public: struct charbuf { string buf; @@ -31,18 +31,14 @@ class connection : public aio_callback { ~connection(); int channo() { return fd_; } - bool isdead(); + bool isdead() { lock ml(m_); return dead_; } void closeconn(); bool send(const string & b); void write_cb(int s); void read_cb(int s); - void incref(); - void decref(); - int ref() { lock rl(ref_m_); return refno_; } - - int compare(connection *another); + time_point create_time() const { return create_time_; } private: @@ -59,11 +55,9 @@ class connection : public aio_callback { time_point create_time_; int waiters_ = 0; - int refno_ = 1; int lossy_ = 0; mutex m_; - mutex ref_m_; cond send_complete_; cond send_wait_; }; @@ -83,7 +77,7 @@ class tcpsconn { socket_t tcp_; // listens for connections chanmgr *mgr_; int lossy_; - map conns_; + map> conns_; void process_accept(); }; @@ -95,5 +89,5 @@ struct bundle { int lossy; }; -connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0); +shared_ptr connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0); #endif diff --git a/rpc/rpc.cc b/rpc/rpc.cc index 47ac775..2c1f1a5 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -70,7 +70,7 @@ static sockaddr_in make_sockaddr(const string &hostandport); rpcc::rpcc(const string & d, bool retrans) : dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0), - retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1) + retrans_(retrans), reachable_(true), chan_(), destroy_wait_ (false), xid_rep_done_(-1) { if(retrans){ set_rand_seed(); @@ -95,11 +95,10 @@ rpcc::rpcc(const string & d, bool retrans) : // IMPORTANT: destruction should happen only when no external threads // are blocked inside rpcc or will use rpcc in the future rpcc::~rpcc() { + cancel(); IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1)); - if(chan_){ + if(chan_) chan_->closeconn(); - chan_->decref(); - } VERIFY(calls_.size() == 0); } @@ -119,24 +118,26 @@ int rpcc::bind(milliseconds to) { // Cancel all outstanding calls void rpcc::cancel(void) { lock ml(m_); - LOG("force callers to fail"); - for(auto &p : calls_){ - caller *ca = p.second; + if (calls_.size()) { + LOG("force callers to fail"); + for(auto &p : calls_){ + caller *ca = p.second; - IF_LEVEL(2) LOG("force caller to fail"); - { - lock cl(ca->m); - ca->done = true; - ca->intret = rpc_const::cancel_failure; - ca->c.notify_one(); + IF_LEVEL(2) LOG("force caller to fail"); + { + lock cl(ca->m); + ca->done = true; + ca->intret = rpc_const::cancel_failure; + ca->c.notify_one(); + } } - } - while (calls_.size () > 0){ - destroy_wait_ = true; - destroy_wait_c_.wait(ml); + while (calls_.size () > 0){ + destroy_wait_ = true; + destroy_wait_c_.wait(ml); + } + LOG("done"); } - LOG("done"); } int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) { @@ -167,13 +168,13 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) { auto finaldeadline = steady_clock::now() + to, nextdeadline = finaldeadline; bool transmit = true; - connection *ch = NULL; + shared_ptr ch; - while (1){ - if(transmit){ - get_refconn(&ch); - if(ch){ - if(reachable_) { + while (1) { + if(transmit) { + get_refconn(ch); + if (ch) { + if (reachable_) { request forgot; { lock ml(m_); @@ -217,7 +218,7 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) { } } - if(retrans_ && (!ch || ch->isdead())){ + if(retrans_ && (!ch || ch->isdead())) { // since connection is dead, retransmit // on the new connection transmit = true; @@ -256,29 +257,19 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) { " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" << ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret); - if(ch) - ch->decref(); - // destruction of req automatically frees its buffer return (ca.done? ca.intret : rpc_const::timeout_failure); } void -rpcc::get_refconn(connection **ch) +rpcc::get_refconn(shared_ptr & ch) { lock ml(chan_m_); - if(!chan_ || chan_->isdead()){ - if(chan_) - chan_->decref(); + if (!chan_ || chan_->isdead()) chan_ = connect_to_dst(dst_, this, lossytest_); - } - if(ch && chan_){ - if(*ch){ - (*ch)->decref(); - } - *ch = chan_; - (*ch)->incref(); - } + + if (chan_) + ch = chan_; } // PollMgr's thread is being used to @@ -287,7 +278,7 @@ rpcc::get_refconn(connection **ch) // // this function keeps no reference for connection *c bool -rpcc::got_pdu(connection *, const string & b) +rpcc::got_pdu(const shared_ptr &, const string & b) { unmarshall rep(b, true); reply_header h; @@ -368,21 +359,14 @@ rpcs::~rpcs() } bool -rpcs::got_pdu(connection *c, const string & b) +rpcs::got_pdu(const shared_ptr & c, const string & b) { if(!reachable_){ IF_LEVEL(1) LOG("not reachable"); return true; } - djob_t *j = new djob_t{c, b}; - c->incref(); - bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j)); - if(!succ || !reachable_){ - c->decref(); - delete j; - } - return succ; + return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b)); } void @@ -419,20 +403,15 @@ rpcs::updatestat(proc_t proc) } } -void -rpcs::dispatch(djob_t *j) -{ - connection *c = j->conn; - unmarshall req(j->buf, true); - delete j; +void rpcs::dispatch(shared_ptr c, const string & buf) { + unmarshall req(buf, true); request_header h; req.unpack_header(h); proc_t proc = h.proc; - if(!req.ok()){ - IF_LEVEL(1) LOG("unmarshall header failed!!!"); - c->decref(); + if (!req.ok()) { + IF_LEVEL(1) LOG("unmarshall header failed"); return; } @@ -458,7 +437,6 @@ rpcs::dispatch(djob_t *j) lock pl(procs_m_); if(procs_.count(proc) < 1){ cerr << "unknown proc " << hex << proc << "." << endl; - c->decref(); VERIFY(0); return; } @@ -485,14 +463,10 @@ rpcs::dispatch(djob_t *j) // save the latest good connection to the client { lock rwl(conns_m_); - if(conns_.find(h.clt_nonce) == conns_.end()){ - c->incref(); + if (conns_.find(h.clt_nonce) == conns_.end()) conns_[h.clt_nonce] = c; - } else if(conns_[h.clt_nonce]->compare(c) < 0){ - conns_[h.clt_nonce]->decref(); - c->incref(); + else if(conns_[h.clt_nonce]->create_time() < c->create_time()) conns_[h.clt_nonce] = c; - } } stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1); @@ -530,11 +504,8 @@ rpcs::dispatch(djob_t *j) // get the latest connection to the client { lock rwl(conns_m_); - if(c->isdead() && c != conns_[h.clt_nonce]){ - c->decref(); + if (c->isdead()) c = conns_[h.clt_nonce]; - c->incref(); - } } c->send(rep); @@ -551,7 +522,6 @@ rpcs::dispatch(djob_t *j) c->send(rep); break; } - c->decref(); } // rpcs::dispatch calls this when an RPC request arrives. diff --git a/rpc/rpc.h b/rpc/rpc.h index 19ec96a..9ec2fd8 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -45,7 +45,7 @@ class rpcc : public chanmgr { cond c; }; - void get_refconn(connection **ch); + void get_refconn(shared_ptr & ch); void update_xid_rep(int xid); @@ -58,7 +58,7 @@ class rpcc : public chanmgr { bool retrans_; bool reachable_; - connection *chan_; + shared_ptr chan_; mutex m_; // protect insert/delete to calls[] mutex chan_m_; @@ -95,7 +95,7 @@ class rpcc : public chanmgr { void cancel(); - bool got_pdu(connection *c, const string & b); + bool got_pdu(const shared_ptr & c, const string & b); template inline int call(proc_t proc, R & r, const Args&... args); @@ -175,7 +175,7 @@ class rpcs : public chanmgr { void updatestat(proc_t proc); // latest connection to the client - map conns_; + map> conns_; // counting const size_t counting_; @@ -195,11 +195,7 @@ class rpcs : public chanmgr { protected: - struct djob_t { - connection *conn; - string buf; - }; - void dispatch(djob_t *); + void dispatch(shared_ptr c, const string & buf); // internal handler registration void reg1(proc_t proc, handler *); @@ -216,7 +212,7 @@ class rpcs : public chanmgr { void set_reachable(bool r) { reachable_ = r; } - bool got_pdu(connection *c, const string & b); + bool got_pdu(const shared_ptr & c, const string & b); struct ReturnOnFailure { static inline int unmarshall_args_failure() { diff --git a/types.h b/types.h index 0897649..b326ef7 100644 --- a/types.h +++ b/types.h @@ -59,6 +59,12 @@ using std::list; #include using std::map; +#include +using std::enable_shared_from_this; +using std::make_shared; +using std::shared_ptr; +using std::unique_ptr; + #include using std::mutex; using lock = std::unique_lock; -- 1.7.9.5