X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/3abd3952c1f4441f0dd6eae9883b2d01ed9cd56b..6623ac357055b95ce4fc0cbe9c5dc15524a9f20c:/rpc/connection.cc diff --git a/rpc/connection.cc b/rpc/connection.cc index 33e891c..6c406a4 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -2,13 +2,12 @@ #include "rpc_protocol.h" #include #include -#include #include #include #include "marshall.h" -connection::connection(chanmgr *m1, int f1, int l1) -: mgr_(m1), fd_(f1), lossy_(l1) +connection::connection(connection_delegate *m1, socket_t && f1, int l1) +: mgr_(m1), fd_(move(f1)), lossy_(l1) { fd_.flags() |= O_NONBLOCK; @@ -16,22 +15,25 @@ connection::connection(chanmgr *m1, int f1, int l1) create_time_ = steady_clock::now(); - PollMgr::Instance().add_callback(fd_, CB_RDONLY, this); + poll_mgr::shared_mgr.add_callback(fd_, CB_RDONLY, this); } connection::~connection() { + closeconn(); VERIFY(dead_); VERIFY(!wpdu_.buf.size()); } -void connection::incref() { - lock rl(ref_m_); - refno_++; -} - -bool connection::isdead() { - lock ml(m_); - return dead_; +shared_ptr connection::to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy) { + socket_t s = socket(AF_INET, SOCK_STREAM, 0); + s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1); + if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { + IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); + close(s); + return nullptr; + } + IF_LEVEL(2) LOG_NONMEMBER("connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); + return make_shared(mgr, move(s), lossy); } void connection::closeconn() { @@ -44,30 +46,7 @@ void connection::closeconn() { } //after block_remove_fd, select will never wait on fd_ //and no callbacks will be active - PollMgr::Instance().block_remove_fd(fd_); -} - -void connection::decref() { - bool dead = false; - { - lock rl(ref_m_); - refno_--; - VERIFY(refno_>=0); - if (refno_==0) { - lock ml(m_); - dead = dead_; - } - } - if (dead) - delete this; -} - -int connection::compare(connection *another) { - if (create_time_ > another->create_time_) - return 1; - if (create_time_ < another->create_time_) - return -1; - return 0; + poll_mgr::shared_mgr.block_remove_fd(fd_); } bool connection::send(const string & b) { @@ -94,11 +73,11 @@ bool connection::send(const string & b) { if (!writepdu()) { dead_ = true; ml.unlock(); - PollMgr::Instance().block_remove_fd(fd_); + poll_mgr::shared_mgr.block_remove_fd(fd_); ml.lock(); } else if (wpdu_.solong != wpdu_.buf.size()) { // should be rare to need to explicitly add write callback - PollMgr::Instance().add_callback(fd_, CB_WRONLY, this); + poll_mgr::shared_mgr.add_callback(fd_, CB_WRONLY, this); while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size()) send_complete_.wait(ml); } @@ -116,11 +95,11 @@ void connection::write_cb(int s) { VERIFY(!dead_); VERIFY(fd_ == s); if (wpdu_.buf.size() == 0) { - PollMgr::Instance().del_callback(fd_,CB_WRONLY); + poll_mgr::shared_mgr.del_callback(fd_,CB_WRONLY); return; } if (!writepdu()) { - PollMgr::Instance().del_callback(fd_, CB_RDWR); + poll_mgr::shared_mgr.del_callback(fd_, CB_RDWR); dead_ = true; } else { VERIFY(wpdu_.solong != size_t_max); @@ -131,31 +110,29 @@ void connection::write_cb(int s) { send_complete_.notify_one(); } -//fd_ is ready to be read +// fd_ is ready to be read void connection::read_cb(int s) { lock ml(m_); VERIFY(fd_ == s); - if (dead_) { + if (dead_) return; - } IF_LEVEL(5) LOG("got data on fd " << s); bool succ = true; - if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) { + if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) succ = readpdu(); - } if (!succ) { IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying"); - PollMgr::Instance().del_callback(fd_,CB_RDWR); + poll_mgr::shared_mgr.del_callback(fd_,CB_RDWR); dead_ = true; send_complete_.notify_one(); } if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) { - if (mgr_->got_pdu(this, rpdu_.buf)) { - //chanmgr has successfully consumed the pdu + if (mgr_->got_pdu(shared_from_this(), rpdu_.buf)) { + // connection_delegate has successfully consumed the pdu rpdu_.buf.clear(); rpdu_.solong = 0; } @@ -228,10 +205,10 @@ bool connection::readpdu() { return true; } -tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest) +tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest) : tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest) { - struct sockaddr_in sin; + sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = hton(port); @@ -239,17 +216,15 @@ tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest) tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1); tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1); - struct timeval timeout = {0, 50000}; - - if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeout) < 0) + if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000}) < 0) perror("accept_loop setsockopt"); - if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeout) < 0) + if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000}) < 0) perror("accept_loop setsockopt"); // careful to exactly match type signature of bind arguments so we don't // get std::bind instead - if (bind((int)tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) { + if (bind((int)tcp_, (const sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) { perror("accept_loop bind"); VERIFY(0); } @@ -277,12 +252,8 @@ tcpsconn::~tcpsconn() pipe_[1].close(); th_.join(); - // close all the active connections - map::iterator i; - for (i = conns_.begin(); i != conns_.end(); i++) { - i->second->closeconn(); - i->second->decref(); - } + for (auto & i : conns_) + i.second->closeconn(); } void tcpsconn::process_accept() { @@ -295,19 +266,13 @@ void tcpsconn::process_accept() { } IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port)); - connection *ch = new connection(mgr_, s1, lossy_); + auto ch = make_shared(mgr_, s1, lossy_); - // garbage collect all dead connections with refcount of 1 + // garbage collect dead connections for (auto i = conns_.begin(); i != conns_.end();) { - if (i->second->isdead() && i->second->ref() == 1) { - IF_LEVEL(2) LOG("accept_loop garbage collected fd=" << i->second->channo()); - i->second->decref(); - // Careful not to reuse i right after erase. (i++) will - // be evaluated before the erase call because in C++, - // there is a sequence point before a function call. - // See http://en.wikipedia.org/wiki/Sequence_point. + if (i->second->isdead()) conns_.erase(i++); - } else + else ++i; } @@ -336,8 +301,7 @@ void tcpsconn::accept_conn() { if (FD_ISSET(pipe_[0], &rfds)) return; - if (!FD_ISSET(tcp_, &rfds)) - VERIFY(0); + VERIFY(FD_ISSET(tcp_, &rfds)); try { process_accept(); @@ -347,16 +311,3 @@ void tcpsconn::accept_conn() { } } -connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) { - int s = socket(AF_INET, SOCK_STREAM, 0); - int yes = 1; - setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); - if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { - IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); - close(s); - return NULL; - } - IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); - return new connection(mgr, s, lossy); -} -