X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/8b9d106fcc61fc84712c97d4db060d8302cc63fd..6623ac357055b95ce4fc0cbe9c5dc15524a9f20c:/rpc/connection.cc diff --git a/rpc/connection.cc b/rpc/connection.cc index 6994f53..6c406a4 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -6,8 +6,8 @@ #include #include "marshall.h" -connection::connection(chanmgr *m1, int f1, int l1) -: mgr_(m1), fd_(f1), lossy_(l1) +connection::connection(connection_delegate *m1, socket_t && f1, int l1) +: mgr_(m1), fd_(move(f1)), lossy_(l1) { fd_.flags() |= O_NONBLOCK; @@ -15,7 +15,7 @@ connection::connection(chanmgr *m1, int f1, int l1) create_time_ = steady_clock::now(); - PollMgr::Instance().add_callback(fd_, CB_RDONLY, this); + poll_mgr::shared_mgr.add_callback(fd_, CB_RDONLY, this); } connection::~connection() { @@ -24,6 +24,18 @@ connection::~connection() { VERIFY(!wpdu_.buf.size()); } +shared_ptr connection::to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy) { + socket_t s = socket(AF_INET, SOCK_STREAM, 0); + s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1); + if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { + IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); + close(s); + return nullptr; + } + IF_LEVEL(2) LOG_NONMEMBER("connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); + return make_shared(mgr, move(s), lossy); +} + void connection::closeconn() { { lock ml(m_); @@ -34,7 +46,7 @@ void connection::closeconn() { } //after block_remove_fd, select will never wait on fd_ //and no callbacks will be active - PollMgr::Instance().block_remove_fd(fd_); + poll_mgr::shared_mgr.block_remove_fd(fd_); } bool connection::send(const string & b) { @@ -61,11 +73,11 @@ bool connection::send(const string & b) { if (!writepdu()) { dead_ = true; ml.unlock(); - PollMgr::Instance().block_remove_fd(fd_); + poll_mgr::shared_mgr.block_remove_fd(fd_); ml.lock(); } else if (wpdu_.solong != wpdu_.buf.size()) { // should be rare to need to explicitly add write callback - PollMgr::Instance().add_callback(fd_, CB_WRONLY, this); + poll_mgr::shared_mgr.add_callback(fd_, CB_WRONLY, this); while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size()) send_complete_.wait(ml); } @@ -83,11 +95,11 @@ void connection::write_cb(int s) { VERIFY(!dead_); VERIFY(fd_ == s); if (wpdu_.buf.size() == 0) { - PollMgr::Instance().del_callback(fd_,CB_WRONLY); + poll_mgr::shared_mgr.del_callback(fd_,CB_WRONLY); return; } if (!writepdu()) { - PollMgr::Instance().del_callback(fd_, CB_RDWR); + poll_mgr::shared_mgr.del_callback(fd_, CB_RDWR); dead_ = true; } else { VERIFY(wpdu_.solong != size_t_max); @@ -102,27 +114,25 @@ void connection::write_cb(int s) { void connection::read_cb(int s) { lock ml(m_); VERIFY(fd_ == s); - if (dead_) { + if (dead_) return; - } IF_LEVEL(5) LOG("got data on fd " << s); bool succ = true; - if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) { + if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) succ = readpdu(); - } if (!succ) { IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying"); - PollMgr::Instance().del_callback(fd_,CB_RDWR); + poll_mgr::shared_mgr.del_callback(fd_,CB_RDWR); dead_ = true; send_complete_.notify_one(); } if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) { if (mgr_->got_pdu(shared_from_this(), rpdu_.buf)) { - // chanmgr has successfully consumed the pdu + // connection_delegate has successfully consumed the pdu rpdu_.buf.clear(); rpdu_.solong = 0; } @@ -195,10 +205,10 @@ bool connection::readpdu() { return true; } -tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest) +tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest) : tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest) { - struct sockaddr_in sin; + sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = hton(port); @@ -206,17 +216,15 @@ tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest) tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1); tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1); - struct timeval timeout = {0, 50000}; - - if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeout) < 0) + if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000}) < 0) perror("accept_loop setsockopt"); - if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeout) < 0) + if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000}) < 0) perror("accept_loop setsockopt"); // careful to exactly match type signature of bind arguments so we don't // get std::bind instead - if (bind((int)tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) { + if (bind((int)tcp_, (const sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) { perror("accept_loop bind"); VERIFY(0); } @@ -293,8 +301,7 @@ void tcpsconn::accept_conn() { if (FD_ISSET(pipe_[0], &rfds)) return; - if (!FD_ISSET(tcp_, &rfds)) - VERIFY(0); + VERIFY(FD_ISSET(tcp_, &rfds)); try { process_accept(); @@ -304,16 +311,3 @@ void tcpsconn::accept_conn() { } } -shared_ptr connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) { - int s = socket(AF_INET, SOCK_STREAM, 0); - int yes = 1; - setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); - if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { - IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); - close(s); - return nullptr; - } - IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); - return make_shared(mgr, s, lossy); -} -