X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/5fd8cc8409d0efadc07dfe8d6774ad9ff477663d..4b9798f44ae94deabf87dd534337b55259272950:/rpc/connection.cc diff --git a/rpc/connection.cc b/rpc/connection.cc index c22ad45..6e865e8 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -6,127 +6,112 @@ #include #include -#include "method_thread.h" #include "connection.h" -#include "slock.h" #include "pollmgr.h" #include "jsl_log.h" -#include "gettime.h" #include "lang/verify.h" +#include "lock.h" #define MAX_PDU (10<<20) //maximum PDF is 10M -connection::connection(chanmgr *m1, int f1, int l1) +connection::connection(chanmgr *m1, int f1, int l1) : mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1) { - int flags = fcntl(fd_, F_GETFL, NULL); - flags |= O_NONBLOCK; - fcntl(fd_, F_SETFL, flags); + int flags = fcntl(fd_, F_GETFL, NULL); + flags |= O_NONBLOCK; + fcntl(fd_, F_SETFL, flags); + + signal(SIGPIPE, SIG_IGN); - signal(SIGPIPE, SIG_IGN); - VERIFY(pthread_mutex_init(&m_,0)==0); - VERIFY(pthread_mutex_init(&ref_m_,0)==0); - VERIFY(pthread_cond_init(&send_wait_,0)==0); - VERIFY(pthread_cond_init(&send_complete_,0)==0); - - VERIFY(gettimeofday(&create_time_, NULL) == 0); + create_time_ = std::chrono::steady_clock::now(); - PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this); + PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this); } connection::~connection() { - VERIFY(dead_); - VERIFY(pthread_mutex_destroy(&m_)== 0); - VERIFY(pthread_mutex_destroy(&ref_m_)== 0); - VERIFY(pthread_cond_destroy(&send_wait_) == 0); - VERIFY(pthread_cond_destroy(&send_complete_) == 0); - if (rpdu_.buf) - free(rpdu_.buf); - VERIFY(!wpdu_.buf); - close(fd_); + VERIFY(dead_); + if (rpdu_.buf) + free(rpdu_.buf); + VERIFY(!wpdu_.buf); + close(fd_); } void connection::incref() { - ScopedLock ml(&ref_m_); - refno_++; + lock rl(ref_m_); + refno_++; } bool connection::isdead() { - ScopedLock ml(&m_); - return dead_; + lock ml(m_); + return dead_; } void connection::closeconn() { - { - ScopedLock ml(&m_); - if (!dead_) { - dead_ = true; - shutdown(fd_,SHUT_RDWR); - }else{ - return; - } - } - //after block_remove_fd, select will never wait on fd_ - //and no callbacks will be active - PollMgr::Instance()->block_remove_fd(fd_); + { + lock ml(m_); + if (!dead_) { + dead_ = true; + shutdown(fd_,SHUT_RDWR); + } else { + return; + } + } + //after block_remove_fd, select will never wait on fd_ + //and no callbacks will be active + PollMgr::Instance()->block_remove_fd(fd_); } void connection::decref() { - VERIFY(pthread_mutex_lock(&ref_m_)==0); - refno_ --; - VERIFY(refno_>=0); - if (refno_==0) { - VERIFY(pthread_mutex_lock(&m_)==0); - if (dead_) { - VERIFY(pthread_mutex_unlock(&ref_m_)==0); - VERIFY(pthread_mutex_unlock(&m_)==0); - delete this; - return; - } - VERIFY(pthread_mutex_unlock(&m_)==0); - } - pthread_mutex_unlock(&ref_m_); + bool dead = false; + { + lock rl(ref_m_); + refno_--; + VERIFY(refno_>=0); + if (refno_==0) { + lock ml(m_); + dead = dead_; + } + } + if (dead) { + delete this; + } } int connection::ref() { - ScopedLock rl(&ref_m_); + lock rl(ref_m_); return refno_; } int connection::compare(connection *another) { - if (create_time_.tv_sec > another->create_time_.tv_sec) - return 1; - if (create_time_.tv_sec < another->create_time_.tv_sec) - return -1; - if (create_time_.tv_usec > another->create_time_.tv_usec) - return 1; - if (create_time_.tv_usec < another->create_time_.tv_usec) - return -1; - return 0; + if (create_time_ > another->create_time_) + return 1; + if (create_time_ < another->create_time_) + return -1; + return 0; } bool connection::send(char *b, int sz) { - ScopedLock ml(&m_); + lock ml(m_); waiters_++; while (!dead_ && wpdu_.buf) { - VERIFY(pthread_cond_wait(&send_wait_, &m_)==0); + send_wait_.wait(ml); } waiters_--; if (dead_) { @@ -145,16 +130,16 @@ connection::send(char *b, int sz) if (!writepdu()) { dead_ = true; - VERIFY(pthread_mutex_unlock(&m_) == 0); + ml.unlock(); PollMgr::Instance()->block_remove_fd(fd_); - VERIFY(pthread_mutex_lock(&m_) == 0); - }else{ + ml.lock(); + } else { if (wpdu_.solong == wpdu_.sz) { - }else{ + } else { //should be rare to need to explicitly add write callback PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) { - VERIFY(pthread_cond_wait(&send_complete_,&m_) == 0); + send_complete_.wait(ml); } } } @@ -162,7 +147,7 @@ connection::send(char *b, int sz) wpdu_.solong = wpdu_.sz = 0; wpdu_.buf = NULL; if (waiters_ > 0) - pthread_cond_broadcast(&send_wait_); + send_wait_.notify_all(); return ret; } @@ -170,7 +155,7 @@ connection::send(char *b, int sz) void connection::write_cb(int s) { - ScopedLock ml(&m_); + lock ml(m_); VERIFY(!dead_); VERIFY(fd_ == s); if (wpdu_.sz == 0) { @@ -180,20 +165,20 @@ connection::write_cb(int s) if (!writepdu()) { PollMgr::Instance()->del_callback(fd_, CB_RDWR); dead_ = true; - }else{ + } else { VERIFY(wpdu_.solong >= 0); if (wpdu_.solong < wpdu_.sz) { return; } - } - pthread_cond_signal(&send_complete_); + } + send_complete_.notify_one(); } //fd_ is ready to be read void connection::read_cb(int s) { - ScopedLock ml(&m_); + lock ml(m_); VERIFY(fd_ == s); if (dead_) { return; @@ -207,7 +192,7 @@ connection::read_cb(int s) if (!succ) { PollMgr::Instance()->del_callback(fd_,CB_RDWR); dead_ = true; - pthread_cond_signal(&send_complete_); + send_complete_.notify_one(); } if (rpdu_.buf && rpdu_.sz == rpdu_.solong) { @@ -268,7 +253,7 @@ connection::readpdu() if (sz > MAX_PDU) { char *tmpb = (char *)&sz1; - jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz, + jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz, sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]); return false; } @@ -295,19 +280,16 @@ connection::readpdu() return true; } -tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) +tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) : mgr_(m1), lossy_(lossytest) { - - VERIFY(pthread_mutex_init(&m_,NULL) == 0); - struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(port); tcp_ = socket(AF_INET, SOCK_STREAM, 0); - if(tcp_ < 0){ + if (tcp_ < 0) { perror("tcpsconn::tcpsconn accept_loop socket:"); VERIFY(0); } @@ -316,21 +298,21 @@ tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); - if(bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0){ + if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) { perror("accept_loop tcp bind:"); VERIFY(0); } - if(listen(tcp_, 1000) < 0) { + if (listen(tcp_, 1000) < 0) { perror("tcpsconn::tcpsconn listen:"); VERIFY(0); } - socklen_t addrlen = sizeof(sin); - VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0); - port_ = ntohs(sin.sin_port); + socklen_t addrlen = sizeof(sin); + VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0); + port_ = ntohs(sin.sin_port); - jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_, + jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_, sin.sin_port); if (pipe(pipe_) < 0) { @@ -342,20 +324,20 @@ tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) flags |= O_NONBLOCK; fcntl(pipe_[0], F_SETFL, flags); - VERIFY((th_ = method_thread(this, false, &tcpsconn::accept_conn)) != 0); + th_ = std::thread(&tcpsconn::accept_conn, this); } tcpsconn::~tcpsconn() { VERIFY(close(pipe_[1]) == 0); - VERIFY(pthread_join(th_, NULL) == 0); + th_.join(); //close all the active connections std::map::iterator i; for (i = conns_.begin(); i != conns_.end(); i++) { i->second->closeconn(); i->second->decref(); - } + } } void @@ -363,31 +345,31 @@ tcpsconn::process_accept() { sockaddr_in sin; socklen_t slen = sizeof(sin); - int s1 = accept(tcp_, (sockaddr *)&sin, &slen); + int s1 = accept(tcp_, (sockaddr *)&sin, &slen); if (s1 < 0) { perror("tcpsconn::accept_conn error"); - pthread_exit(NULL); + throw thread_exit_exception(); } - jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n", + jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n", s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)); connection *ch = new connection(mgr_, s1, lossy_); - // garbage collect all dead connections with refcount of 1 - std::map::iterator i; - for (i = conns_.begin(); i != conns_.end();) { - if (i->second->isdead() && i->second->ref() == 1) { - jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n", - i->second->channo()); - i->second->decref(); - // Careful not to reuse i right after erase. (i++) will - // be evaluated before the erase call because in C++, - // there is a sequence point before a function call. - // See http://en.wikipedia.org/wiki/Sequence_point. - conns_.erase(i++); - } else - ++i; - } + // garbage collect all dead connections with refcount of 1 + std::map::iterator i; + for (i = conns_.begin(); i != conns_.end();) { + if (i->second->isdead() && i->second->ref() == 1) { + jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n", + i->second->channo()); + i->second->decref(); + // Careful not to reuse i right after erase. (i++) will + // be evaluated before the erase call because in C++, + // there is a sequence point before a function call. + // See http://en.wikipedia.org/wiki/Sequence_point. + conns_.erase(i++); + } else + ++i; + } conns_[ch->channo()] = ch; } @@ -398,44 +380,49 @@ tcpsconn::accept_conn() fd_set rfds; int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_; - while (1) { - FD_ZERO(&rfds); - FD_SET(pipe_[0], &rfds); - FD_SET(tcp_, &rfds); - - int ret = select(max_fd+1, &rfds, NULL, NULL, NULL); - - if (ret < 0) { - if (errno == EINTR) { - continue; - } else { - perror("accept_conn select:"); - jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno); - VERIFY(0); - } - } - - if (FD_ISSET(pipe_[0], &rfds)) { - close(pipe_[0]); - close(tcp_); - return; - } - else if (FD_ISSET(tcp_, &rfds)) { - process_accept(); - } else { - VERIFY(0); - } - } + try { + while (1) { + FD_ZERO(&rfds); + FD_SET(pipe_[0], &rfds); + FD_SET(tcp_, &rfds); + + int ret = select(max_fd+1, &rfds, NULL, NULL, NULL); + + if (ret < 0) { + if (errno == EINTR) { + continue; + } else { + perror("accept_conn select:"); + jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno); + VERIFY(0); + } + } + + if (FD_ISSET(pipe_[0], &rfds)) { + close(pipe_[0]); + close(tcp_); + return; + } + else if (FD_ISSET(tcp_, &rfds)) { + process_accept(); + } else { + VERIFY(0); + } + } + } + catch (thread_exit_exception e) + { + } } connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) { - int s= socket(AF_INET, SOCK_STREAM, 0); + int s = socket(AF_INET, SOCK_STREAM, 0); int yes = 1; setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); - if(connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { - jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n", + if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { + jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n", inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port)); close(s); return NULL; @@ -445,4 +432,3 @@ connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) return new connection(mgr, s, lossy); } -