X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/61809b48ade4c21b1b01931d520aa2abc7507032..a4175b2e216a20b86cc872dea8a08005c60617a5:/rpc/connection.cc?ds=sidebyside diff --git a/rpc/connection.cc b/rpc/connection.cc index c22ad45..fec7a4c 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -6,127 +6,116 @@ #include #include -#include "method_thread.h" #include "connection.h" -#include "slock.h" #include "pollmgr.h" #include "jsl_log.h" -#include "gettime.h" #include "lang/verify.h" +#include "lock.h" #define MAX_PDU (10<<20) //maximum PDF is 10M -connection::connection(chanmgr *m1, int f1, int l1) +connection::connection(chanmgr *m1, int f1, int l1) : mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1) { - int flags = fcntl(fd_, F_GETFL, NULL); - flags |= O_NONBLOCK; - fcntl(fd_, F_SETFL, flags); + int flags = fcntl(fd_, F_GETFL, NULL); + flags |= O_NONBLOCK; + fcntl(fd_, F_SETFL, flags); + + signal(SIGPIPE, SIG_IGN); - signal(SIGPIPE, SIG_IGN); - VERIFY(pthread_mutex_init(&m_,0)==0); - VERIFY(pthread_mutex_init(&ref_m_,0)==0); - VERIFY(pthread_cond_init(&send_wait_,0)==0); - VERIFY(pthread_cond_init(&send_complete_,0)==0); - - VERIFY(gettimeofday(&create_time_, NULL) == 0); + VERIFY(gettimeofday(&create_time_, NULL) == 0); - PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this); + PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this); } connection::~connection() { - VERIFY(dead_); - VERIFY(pthread_mutex_destroy(&m_)== 0); - VERIFY(pthread_mutex_destroy(&ref_m_)== 0); - VERIFY(pthread_cond_destroy(&send_wait_) == 0); - VERIFY(pthread_cond_destroy(&send_complete_) == 0); - if (rpdu_.buf) - free(rpdu_.buf); - VERIFY(!wpdu_.buf); - close(fd_); + VERIFY(dead_); + if (rpdu_.buf) + free(rpdu_.buf); + VERIFY(!wpdu_.buf); + close(fd_); } void connection::incref() { - ScopedLock ml(&ref_m_); - refno_++; + lock rl(ref_m_); + refno_++; } bool connection::isdead() { - ScopedLock ml(&m_); - return dead_; + lock ml(m_); + return dead_; } void connection::closeconn() { - { - ScopedLock ml(&m_); - if (!dead_) { - dead_ = true; - shutdown(fd_,SHUT_RDWR); - }else{ - return; - } - } - //after block_remove_fd, select will never wait on fd_ - //and no callbacks will be active - PollMgr::Instance()->block_remove_fd(fd_); + { + lock ml(m_); + if (!dead_) { + dead_ = true; + shutdown(fd_,SHUT_RDWR); + } else { + return; + } + } + //after block_remove_fd, select will never wait on fd_ + //and no callbacks will be active + PollMgr::Instance()->block_remove_fd(fd_); } void connection::decref() { - VERIFY(pthread_mutex_lock(&ref_m_)==0); - refno_ --; - VERIFY(refno_>=0); - if (refno_==0) { - VERIFY(pthread_mutex_lock(&m_)==0); - if (dead_) { - VERIFY(pthread_mutex_unlock(&ref_m_)==0); - VERIFY(pthread_mutex_unlock(&m_)==0); - delete this; - return; - } - VERIFY(pthread_mutex_unlock(&m_)==0); - } - pthread_mutex_unlock(&ref_m_); + bool dead = false; + { + lock rl(ref_m_); + refno_--; + VERIFY(refno_>=0); + if (refno_==0) { + lock ml(m_); + dead = dead_; + } + } + if (dead) { + delete this; + } } int connection::ref() { - ScopedLock rl(&ref_m_); + lock rl(ref_m_); return refno_; } int connection::compare(connection *another) { - if (create_time_.tv_sec > another->create_time_.tv_sec) - return 1; - if (create_time_.tv_sec < another->create_time_.tv_sec) - return -1; - if (create_time_.tv_usec > another->create_time_.tv_usec) - return 1; - if (create_time_.tv_usec < another->create_time_.tv_usec) - return -1; - return 0; + if (create_time_.tv_sec > another->create_time_.tv_sec) + return 1; + if (create_time_.tv_sec < another->create_time_.tv_sec) + return -1; + if (create_time_.tv_usec > another->create_time_.tv_usec) + return 1; + if (create_time_.tv_usec < another->create_time_.tv_usec) + return -1; + return 0; } bool connection::send(char *b, int sz) { - ScopedLock ml(&m_); + lock ml(m_); waiters_++; while (!dead_ && wpdu_.buf) { - VERIFY(pthread_cond_wait(&send_wait_, &m_)==0); + send_wait_.wait(ml); } waiters_--; if (dead_) { @@ -145,16 +134,16 @@ connection::send(char *b, int sz) if (!writepdu()) { dead_ = true; - VERIFY(pthread_mutex_unlock(&m_) == 0); + ml.unlock(); PollMgr::Instance()->block_remove_fd(fd_); - VERIFY(pthread_mutex_lock(&m_) == 0); + ml.lock(); }else{ if (wpdu_.solong == wpdu_.sz) { }else{ //should be rare to need to explicitly add write callback PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) { - VERIFY(pthread_cond_wait(&send_complete_,&m_) == 0); + send_complete_.wait(ml); } } } @@ -162,7 +151,7 @@ connection::send(char *b, int sz) wpdu_.solong = wpdu_.sz = 0; wpdu_.buf = NULL; if (waiters_ > 0) - pthread_cond_broadcast(&send_wait_); + send_wait_.notify_all(); return ret; } @@ -170,7 +159,7 @@ connection::send(char *b, int sz) void connection::write_cb(int s) { - ScopedLock ml(&m_); + lock ml(m_); VERIFY(!dead_); VERIFY(fd_ == s); if (wpdu_.sz == 0) { @@ -180,20 +169,20 @@ connection::write_cb(int s) if (!writepdu()) { PollMgr::Instance()->del_callback(fd_, CB_RDWR); dead_ = true; - }else{ + } else { VERIFY(wpdu_.solong >= 0); if (wpdu_.solong < wpdu_.sz) { return; } - } - pthread_cond_signal(&send_complete_); + } + send_complete_.notify_one(); } //fd_ is ready to be read void connection::read_cb(int s) { - ScopedLock ml(&m_); + lock ml(m_); VERIFY(fd_ == s); if (dead_) { return; @@ -207,7 +196,7 @@ connection::read_cb(int s) if (!succ) { PollMgr::Instance()->del_callback(fd_,CB_RDWR); dead_ = true; - pthread_cond_signal(&send_complete_); + send_complete_.notify_one(); } if (rpdu_.buf && rpdu_.sz == rpdu_.solong) { @@ -268,7 +257,7 @@ connection::readpdu() if (sz > MAX_PDU) { char *tmpb = (char *)&sz1; - jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz, + jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz, sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]); return false; } @@ -295,12 +284,9 @@ connection::readpdu() return true; } -tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) +tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) : mgr_(m1), lossy_(lossytest) { - - VERIFY(pthread_mutex_init(&m_,NULL) == 0); - struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; @@ -326,11 +312,11 @@ tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) VERIFY(0); } - socklen_t addrlen = sizeof(sin); - VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0); - port_ = ntohs(sin.sin_port); + socklen_t addrlen = sizeof(sin); + VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0); + port_ = ntohs(sin.sin_port); - jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_, + jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_, sin.sin_port); if (pipe(pipe_) < 0) { @@ -342,20 +328,20 @@ tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) flags |= O_NONBLOCK; fcntl(pipe_[0], F_SETFL, flags); - VERIFY((th_ = method_thread(this, false, &tcpsconn::accept_conn)) != 0); + th_ = std::thread(&tcpsconn::accept_conn, this); } tcpsconn::~tcpsconn() { VERIFY(close(pipe_[1]) == 0); - VERIFY(pthread_join(th_, NULL) == 0); + th_.join(); //close all the active connections std::map::iterator i; for (i = conns_.begin(); i != conns_.end(); i++) { i->second->closeconn(); i->second->decref(); - } + } } void @@ -363,13 +349,13 @@ tcpsconn::process_accept() { sockaddr_in sin; socklen_t slen = sizeof(sin); - int s1 = accept(tcp_, (sockaddr *)&sin, &slen); + int s1 = accept(tcp_, (sockaddr *)&sin, &slen); if (s1 < 0) { perror("tcpsconn::accept_conn error"); - pthread_exit(NULL); + throw thread_exit_exception(); } - jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n", + jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n", s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)); connection *ch = new connection(mgr_, s1, lossy_); @@ -398,34 +384,41 @@ tcpsconn::accept_conn() fd_set rfds; int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_; - while (1) { - FD_ZERO(&rfds); - FD_SET(pipe_[0], &rfds); - FD_SET(tcp_, &rfds); - - int ret = select(max_fd+1, &rfds, NULL, NULL, NULL); - - if (ret < 0) { - if (errno == EINTR) { - continue; - } else { - perror("accept_conn select:"); - jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno); - VERIFY(0); - } - } - - if (FD_ISSET(pipe_[0], &rfds)) { - close(pipe_[0]); - close(tcp_); - return; - } - else if (FD_ISSET(tcp_, &rfds)) { - process_accept(); - } else { - VERIFY(0); - } - } + try { + + while (1) { + FD_ZERO(&rfds); + FD_SET(pipe_[0], &rfds); + FD_SET(tcp_, &rfds); + + int ret = select(max_fd+1, &rfds, NULL, NULL, NULL); + + if (ret < 0) { + if (errno == EINTR) { + continue; + } else { + perror("accept_conn select:"); + jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno); + VERIFY(0); + } + } + + if (FD_ISSET(pipe_[0], &rfds)) { + close(pipe_[0]); + close(tcp_); + return; + } + else if (FD_ISSET(tcp_, &rfds)) { + process_accept(); + } else { + VERIFY(0); + } + } + } + catch (thread_exit_exception e) + { + return; + } } connection * @@ -435,7 +428,7 @@ connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) int yes = 1; setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); if(connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { - jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n", + jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n", inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port)); close(s); return NULL; @@ -445,4 +438,3 @@ connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) return new connection(mgr, s, lossy); } -