X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/46fb2b4bbe3a0a8516ab04cfafa895a882c70f86..5d99dbf06a14904944f5593c63705934bdfdcfb7:/rpc/connection.cc?ds=sidebyside diff --git a/rpc/connection.cc b/rpc/connection.cc index 86d4ec5..55e374a 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -1,11 +1,11 @@ // std::bind and syscall bind have the same name, so don't use std::bind in this file #define LIBT4_NO_FUNCTIONAL #include "connection.h" +#include +#include #include #include #include -#include -#include #include #include @@ -15,8 +15,7 @@ connection::connection(chanmgr *m1, int f1, int l1) : mgr_(m1), fd_(f1), lossy_(l1) { int flags = fcntl(fd_, F_GETFL, NULL); - flags |= O_NONBLOCK; - fcntl(fd_, F_SETFL, flags); + fcntl(fd_, F_SETFL, flags | O_NONBLOCK); signal(SIGPIPE, SIG_IGN); @@ -27,9 +26,7 @@ connection::connection(chanmgr *m1, int f1, int l1) connection::~connection() { VERIFY(dead_); - if (rpdu_.buf) - free(rpdu_.buf); - VERIFY(!wpdu_.buf); + VERIFY(!wpdu_.buf.size()); close(fd_); } @@ -46,12 +43,10 @@ bool connection::isdead() { void connection::closeconn() { { lock ml(m_); - if (!dead_) { - dead_ = true; - shutdown(fd_,SHUT_RDWR); - } else { + if (dead_) return; - } + dead_ = true; + shutdown(fd_,SHUT_RDWR); } //after block_remove_fd, select will never wait on fd_ //and no callbacks will be active @@ -81,18 +76,18 @@ int connection::compare(connection *another) { return 0; } -bool connection::send(char *b, size_t sz) { +bool connection::send(const string & b) { lock ml(m_); + waiters_++; - while (!dead_ && wpdu_.buf) { + while (!dead_ && wpdu_.buf.size()) send_wait_.wait(ml); - } waiters_--; - if (dead_) { + + if (dead_) return false; - } + wpdu_.buf = b; - wpdu_.sz = sz; wpdu_.solong = 0; if (lossy_) { @@ -107,19 +102,15 @@ bool connection::send(char *b, size_t sz) { ml.unlock(); PollMgr::Instance()->block_remove_fd(fd_); ml.lock(); - } else { - if (wpdu_.solong == wpdu_.sz) { - } else { - //should be rare to need to explicitly add write callback - PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); - while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) { - send_complete_.wait(ml); - } - } + } else if (wpdu_.solong != wpdu_.buf.size()) { + // should be rare to need to explicitly add write callback + PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); + while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size()) + send_complete_.wait(ml); } - bool ret = (!dead_ && wpdu_.solong == wpdu_.sz); - wpdu_.solong = wpdu_.sz = 0; - wpdu_.buf = NULL; + bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size()); + wpdu_.solong = 0; + wpdu_.buf.clear(); if (waiters_ > 0) send_wait_.notify_all(); return ret; @@ -130,7 +121,7 @@ void connection::write_cb(int s) { lock ml(m_); VERIFY(!dead_); VERIFY(fd_ == s); - if (wpdu_.sz == 0) { + if (wpdu_.buf.size() == 0) { PollMgr::Instance()->del_callback(fd_,CB_WRONLY); return; } @@ -139,7 +130,7 @@ void connection::write_cb(int s) { dead_ = true; } else { VERIFY(wpdu_.solong != size_t_max); - if (wpdu_.solong < wpdu_.sz) { + if (wpdu_.solong < wpdu_.buf.size()) { return; } } @@ -154,41 +145,44 @@ void connection::read_cb(int s) { return; } + IF_LEVEL(5) LOG("got data on fd " << s); + bool succ = true; - if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) { + if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) { succ = readpdu(); } if (!succ) { + IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying"); PollMgr::Instance()->del_callback(fd_,CB_RDWR); dead_ = true; send_complete_.notify_one(); } - if (rpdu_.buf && rpdu_.sz == rpdu_.solong) { - if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) { + if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) { + if (mgr_->got_pdu(this, rpdu_.buf)) { //chanmgr has successfully consumed the pdu - rpdu_.buf = NULL; - rpdu_.sz = rpdu_.solong = 0; + rpdu_.buf.clear(); + rpdu_.solong = 0; } } } bool connection::writepdu() { VERIFY(wpdu_.solong != size_t_max); - if (wpdu_.solong == wpdu_.sz) + if (wpdu_.solong == wpdu_.buf.size()) return true; if (wpdu_.solong == 0) { - uint32_t sz = htonl((uint32_t)wpdu_.sz); - bcopy(&sz,wpdu_.buf,sizeof(sz)); + uint32_t sz = htonl((uint32_t)wpdu_.buf.size() - sizeof(uint32_t)); + copy((const char *)&sz, (const char *)(&sz+1), &wpdu_.buf[0]); } - ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong)); + ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong)); if (n < 0) { if (errno != EAGAIN) { IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno); wpdu_.solong = size_t_max; - wpdu_.sz = 0; + wpdu_.buf.clear(); } return (errno == EAGAIN); } @@ -197,7 +191,8 @@ bool connection::writepdu() { } bool connection::readpdu() { - if (!rpdu_.sz) { + IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size()); + if (!rpdu_.buf.size()) { uint32_t sz1; ssize_t n = read(fd_, &sz1, sizeof(sz1)); @@ -211,33 +206,34 @@ bool connection::readpdu() { } if (n > 0 && n != sizeof(sz1)) { - IF_LEVEL(0) LOG("connection::readpdu short read of sz"); + IF_LEVEL(0) LOG("short read of sz"); return false; } size_t sz = ntohl(sz1); if (sz > MAX_PDU) { - IF_LEVEL(2) LOG("connection::readpdu read pdu TOO BIG " << sz << " network order=" << hex << sz1); + IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1); return false; } - rpdu_.sz = sz; - VERIFY(rpdu_.buf == NULL); - rpdu_.buf = (char *)malloc(sz+sizeof(sz1)); - VERIFY(rpdu_.buf); - bcopy(&sz1,rpdu_.buf,sizeof(sz1)); + IF_LEVEL(5) LOG("read size of datagram = " << sz); + + VERIFY(rpdu_.buf.size() == 0); + rpdu_.buf = string(sz+sizeof(sz1), 0); + copy((const char *)&sz1, (const char *)(&sz1 + 1), &rpdu_.buf[0]); rpdu_.solong = sizeof(sz1); } - ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong); + ssize_t n = read(fd_, &rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong); + + IF_LEVEL(5) LOG("read " << n << " bytes"); + if (n <= 0) { if (errno == EAGAIN) return true; - if (rpdu_.buf) - free(rpdu_.buf); - rpdu_.buf = NULL; - rpdu_.sz = rpdu_.solong = 0; + rpdu_.buf.clear(); + rpdu_.solong = 0; return (errno == EAGAIN); } rpdu_.solong += (size_t)n; @@ -254,7 +250,7 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) tcp_ = socket(AF_INET, SOCK_STREAM, 0); if (tcp_ < 0) { - perror("tcpsconn::tcpsconn accept_loop socket:"); + perror("accept_loop socket:"); VERIFY(0); } @@ -268,7 +264,7 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) } if (listen(tcp_, 1000) < 0) { - perror("tcpsconn::tcpsconn listen:"); + perror("listen:"); VERIFY(0); } @@ -276,7 +272,7 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0); port_ = ntohs(sin.sin_port); - IF_LEVEL(2) LOG("tcpsconn::tcpsconn listen on " << port_ << " " << sin.sin_port); + IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port); if (pipe(pipe_) < 0) { perror("accept_loop pipe:");