X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/a4175b2e216a20b86cc872dea8a08005c60617a5..2546a41ad36fdc9ef6471cb35a1d56930ae1b527:/rpc/connection.cc diff --git a/rpc/connection.cc b/rpc/connection.cc index fec7a4c..db6a3ea 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -25,7 +25,7 @@ connection::connection(chanmgr *m1, int f1, int l1) signal(SIGPIPE, SIG_IGN); - VERIFY(gettimeofday(&create_time_, NULL) == 0); + create_time_ = std::chrono::steady_clock::now(); PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this); } @@ -98,19 +98,15 @@ connection::ref() int connection::compare(connection *another) { - if (create_time_.tv_sec > another->create_time_.tv_sec) + if (create_time_ > another->create_time_) return 1; - if (create_time_.tv_sec < another->create_time_.tv_sec) - return -1; - if (create_time_.tv_usec > another->create_time_.tv_usec) - return 1; - if (create_time_.tv_usec < another->create_time_.tv_usec) + if (create_time_ < another->create_time_) return -1; return 0; } bool -connection::send(char *b, int sz) +connection::send(char *b, size_t sz) { lock ml(m_); waiters_++; @@ -137,12 +133,12 @@ connection::send(char *b, int sz) ml.unlock(); PollMgr::Instance()->block_remove_fd(fd_); ml.lock(); - }else{ + } else { if (wpdu_.solong == wpdu_.sz) { - }else{ + } else { //should be rare to need to explicitly add write callback PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); - while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) { + while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) { send_complete_.wait(ml); } } @@ -170,7 +166,7 @@ connection::write_cb(int s) PollMgr::Instance()->del_callback(fd_, CB_RDWR); dead_ = true; } else { - VERIFY(wpdu_.solong >= 0); + VERIFY(wpdu_.solong != size_t_max); if (wpdu_.solong < wpdu_.sz) { return; } @@ -211,24 +207,24 @@ connection::read_cb(int s) bool connection::writepdu() { - VERIFY(wpdu_.solong >= 0); + VERIFY(wpdu_.solong != size_t_max); if (wpdu_.solong == wpdu_.sz) return true; if (wpdu_.solong == 0) { - int sz = htonl(wpdu_.sz); + uint32_t sz = htonl((uint32_t)wpdu_.sz); bcopy(&sz,wpdu_.buf,sizeof(sz)); } - int n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong)); + ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong)); if (n < 0) { if (errno != EAGAIN) { jsl_log(JSL_DBG_1, "connection::writepdu fd_ %d failure errno=%d\n", fd_, errno); - wpdu_.solong = -1; + wpdu_.solong = size_t_max; wpdu_.sz = 0; } return (errno == EAGAIN); } - wpdu_.solong += n; + wpdu_.solong += (size_t)n; return true; } @@ -236,8 +232,8 @@ bool connection::readpdu() { if (!rpdu_.sz) { - int sz, sz1; - int n = read(fd_, &sz1, sizeof(sz1)); + uint32_t sz1; + ssize_t n = read(fd_, &sz1, sizeof(sz1)); if (n == 0) { return false; @@ -248,29 +244,29 @@ connection::readpdu() return false; } - if (n >0 && n!= sizeof(sz)) { + if (n > 0 && n != sizeof(sz1)) { jsl_log(JSL_DBG_OFF, "connection::readpdu short read of sz\n"); return false; } - sz = ntohl(sz1); + size_t sz = ntohl(sz1); if (sz > MAX_PDU) { char *tmpb = (char *)&sz1; - jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz, + jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %lu network order=%x %x %x %x %x\n", sz, sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]); return false; } rpdu_.sz = sz; VERIFY(rpdu_.buf == NULL); - rpdu_.buf = (char *)malloc(sz+sizeof(sz)); + rpdu_.buf = (char *)malloc(sz+sizeof(sz1)); VERIFY(rpdu_.buf); - bcopy(&sz1,rpdu_.buf,sizeof(sz)); - rpdu_.solong = sizeof(sz); + bcopy(&sz1,rpdu_.buf,sizeof(sz1)); + rpdu_.solong = sizeof(sz1); } - int n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong); + ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong); if (n <= 0) { if (errno == EAGAIN) return true; @@ -280,11 +276,11 @@ connection::readpdu() rpdu_.sz = rpdu_.solong = 0; return (errno == EAGAIN); } - rpdu_.solong += n; + rpdu_.solong += (size_t)n; return true; } -tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) +tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) : mgr_(m1), lossy_(lossytest) { struct sockaddr_in sin; @@ -293,7 +289,7 @@ tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) sin.sin_port = htons(port); tcp_ = socket(AF_INET, SOCK_STREAM, 0); - if(tcp_ < 0){ + if (tcp_ < 0) { perror("tcpsconn::tcpsconn accept_loop socket:"); VERIFY(0); } @@ -302,12 +298,12 @@ tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); - if(bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0){ + if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) { perror("accept_loop tcp bind:"); VERIFY(0); } - if(listen(tcp_, 1000) < 0) { + if (listen(tcp_, 1000) < 0) { perror("tcpsconn::tcpsconn listen:"); VERIFY(0); } @@ -359,21 +355,21 @@ tcpsconn::process_accept() s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)); connection *ch = new connection(mgr_, s1, lossy_); - // garbage collect all dead connections with refcount of 1 - std::map::iterator i; - for (i = conns_.begin(); i != conns_.end();) { - if (i->second->isdead() && i->second->ref() == 1) { - jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n", - i->second->channo()); - i->second->decref(); - // Careful not to reuse i right after erase. (i++) will - // be evaluated before the erase call because in C++, - // there is a sequence point before a function call. - // See http://en.wikipedia.org/wiki/Sequence_point. - conns_.erase(i++); - } else - ++i; - } + // garbage collect all dead connections with refcount of 1 + std::map::iterator i; + for (i = conns_.begin(); i != conns_.end();) { + if (i->second->isdead() && i->second->ref() == 1) { + jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n", + i->second->channo()); + i->second->decref(); + // Careful not to reuse i right after erase. (i++) will + // be evaluated before the erase call because in C++, + // there is a sequence point before a function call. + // See http://en.wikipedia.org/wiki/Sequence_point. + conns_.erase(i++); + } else + ++i; + } conns_[ch->channo()] = ch; } @@ -385,7 +381,6 @@ tcpsconn::accept_conn() int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_; try { - while (1) { FD_ZERO(&rfds); FD_SET(pipe_[0], &rfds); @@ -417,17 +412,16 @@ tcpsconn::accept_conn() } catch (thread_exit_exception e) { - return; } } connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) { - int s= socket(AF_INET, SOCK_STREAM, 0); + int s = socket(AF_INET, SOCK_STREAM, 0); int yes = 1; setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); - if(connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { + if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n", inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port)); close(s);