X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/24bebc0ecf83446c7371eff69042322aab34976a..e478ac59e66e89cbc174e781ac715c8644539947:/rpc/connection.cc diff --git a/rpc/connection.cc b/rpc/connection.cc index 86d4ec5..4681ae9 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -1,22 +1,19 @@ -// std::bind and syscall bind have the same name, so don't use std::bind in this file -#define LIBT4_NO_FUNCTIONAL #include "connection.h" +#include "rpc_protocol.h" +#include +#include #include #include #include -#include -#include #include #include - -#define MAX_PDU (10<<20) //maximum PDF is 10M +#include "marshall.h" connection::connection(chanmgr *m1, int f1, int l1) : mgr_(m1), fd_(f1), lossy_(l1) { int flags = fcntl(fd_, F_GETFL, NULL); - flags |= O_NONBLOCK; - fcntl(fd_, F_SETFL, flags); + fcntl(fd_, F_SETFL, flags | O_NONBLOCK); signal(SIGPIPE, SIG_IGN); @@ -27,9 +24,7 @@ connection::connection(chanmgr *m1, int f1, int l1) connection::~connection() { VERIFY(dead_); - if (rpdu_.buf) - free(rpdu_.buf); - VERIFY(!wpdu_.buf); + VERIFY(!wpdu_.buf.size()); close(fd_); } @@ -46,12 +41,10 @@ bool connection::isdead() { void connection::closeconn() { { lock ml(m_); - if (!dead_) { - dead_ = true; - shutdown(fd_,SHUT_RDWR); - } else { + if (dead_) return; - } + dead_ = true; + shutdown(fd_,SHUT_RDWR); } //after block_remove_fd, select will never wait on fd_ //and no callbacks will be active @@ -81,23 +74,23 @@ int connection::compare(connection *another) { return 0; } -bool connection::send(char *b, size_t sz) { +bool connection::send(const string & b) { lock ml(m_); + waiters_++; - while (!dead_ && wpdu_.buf) { + while (!dead_ && wpdu_.buf.size()) send_wait_.wait(ml); - } waiters_--; - if (dead_) { + + if (dead_) return false; - } + wpdu_.buf = b; - wpdu_.sz = sz; wpdu_.solong = 0; if (lossy_) { if ((random()%100) < lossy_) { - IF_LEVEL(1) LOG("connection::send LOSSY TEST shutdown fd_ " << fd_); + IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_); shutdown(fd_,SHUT_RDWR); } } @@ -107,19 +100,15 @@ bool connection::send(char *b, size_t sz) { ml.unlock(); PollMgr::Instance()->block_remove_fd(fd_); ml.lock(); - } else { - if (wpdu_.solong == wpdu_.sz) { - } else { - //should be rare to need to explicitly add write callback - PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); - while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) { - send_complete_.wait(ml); - } - } + } else if (wpdu_.solong != wpdu_.buf.size()) { + // should be rare to need to explicitly add write callback + PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); + while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size()) + send_complete_.wait(ml); } - bool ret = (!dead_ && wpdu_.solong == wpdu_.sz); - wpdu_.solong = wpdu_.sz = 0; - wpdu_.buf = NULL; + bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size()); + wpdu_.solong = 0; + wpdu_.buf.clear(); if (waiters_ > 0) send_wait_.notify_all(); return ret; @@ -130,7 +119,7 @@ void connection::write_cb(int s) { lock ml(m_); VERIFY(!dead_); VERIFY(fd_ == s); - if (wpdu_.sz == 0) { + if (wpdu_.buf.size() == 0) { PollMgr::Instance()->del_callback(fd_,CB_WRONLY); return; } @@ -139,7 +128,7 @@ void connection::write_cb(int s) { dead_ = true; } else { VERIFY(wpdu_.solong != size_t_max); - if (wpdu_.solong < wpdu_.sz) { + if (wpdu_.solong < wpdu_.buf.size()) { return; } } @@ -154,41 +143,40 @@ void connection::read_cb(int s) { return; } + IF_LEVEL(5) LOG("got data on fd " << s); + bool succ = true; - if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) { + if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) { succ = readpdu(); } if (!succ) { + IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying"); PollMgr::Instance()->del_callback(fd_,CB_RDWR); dead_ = true; send_complete_.notify_one(); } - if (rpdu_.buf && rpdu_.sz == rpdu_.solong) { - if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) { + if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) { + if (mgr_->got_pdu(this, rpdu_.buf)) { //chanmgr has successfully consumed the pdu - rpdu_.buf = NULL; - rpdu_.sz = rpdu_.solong = 0; + rpdu_.buf.clear(); + rpdu_.solong = 0; } } } bool connection::writepdu() { VERIFY(wpdu_.solong != size_t_max); - if (wpdu_.solong == wpdu_.sz) + if (wpdu_.solong == wpdu_.buf.size()) return true; - if (wpdu_.solong == 0) { - uint32_t sz = htonl((uint32_t)wpdu_.sz); - bcopy(&sz,wpdu_.buf,sizeof(sz)); - } - ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong)); + ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong)); if (n < 0) { if (errno != EAGAIN) { - IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno); + IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno); wpdu_.solong = size_t_max; - wpdu_.sz = 0; + wpdu_.buf.clear(); } return (errno == EAGAIN); } @@ -197,8 +185,9 @@ bool connection::writepdu() { } bool connection::readpdu() { - if (!rpdu_.sz) { - uint32_t sz1; + IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size()); + if (!rpdu_.buf.size()) { + rpc_sz_t sz1; ssize_t n = read(fd_, &sz1, sizeof(sz1)); if (n == 0) { @@ -211,50 +200,50 @@ bool connection::readpdu() { } if (n > 0 && n != sizeof(sz1)) { - IF_LEVEL(0) LOG("connection::readpdu short read of sz"); + IF_LEVEL(0) LOG("short read of sz"); return false; } - size_t sz = ntohl(sz1); + size_t sz = ntoh(sz1); if (sz > MAX_PDU) { - IF_LEVEL(2) LOG("connection::readpdu read pdu TOO BIG " << sz << " network order=" << hex << sz1); + IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1); return false; } - rpdu_.sz = sz; - VERIFY(rpdu_.buf == NULL); - rpdu_.buf = (char *)malloc(sz+sizeof(sz1)); - VERIFY(rpdu_.buf); - bcopy(&sz1,rpdu_.buf,sizeof(sz1)); + IF_LEVEL(5) LOG("read size of datagram = " << sz); + + VERIFY(rpdu_.buf.size() == 0); + rpdu_.buf = string(sz+sizeof(sz1), 0); rpdu_.solong = sizeof(sz1); } - ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong); + ssize_t n = read(fd_, &rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong); + + IF_LEVEL(5) LOG("read " << n << " bytes"); + if (n <= 0) { if (errno == EAGAIN) return true; - if (rpdu_.buf) - free(rpdu_.buf); - rpdu_.buf = NULL; - rpdu_.sz = rpdu_.solong = 0; + rpdu_.buf.clear(); + rpdu_.solong = 0; return (errno == EAGAIN); } rpdu_.solong += (size_t)n; return true; } -tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) +tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest) : mgr_(m1), lossy_(lossytest) { struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; - sin.sin_port = htons(port); + sin.sin_port = hton(port); tcp_ = socket(AF_INET, SOCK_STREAM, 0); if (tcp_ < 0) { - perror("tcpsconn::tcpsconn accept_loop socket:"); + perror("accept_loop socket:"); VERIFY(0); } @@ -262,21 +251,23 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); - if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) { + // careful to exactly match type signature of bind arguments so we don't + // get std::bind instead + if (bind(tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) { perror("accept_loop tcp bind:"); VERIFY(0); } if (listen(tcp_, 1000) < 0) { - perror("tcpsconn::tcpsconn listen:"); + perror("listen:"); VERIFY(0); } socklen_t addrlen = sizeof(sin); VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0); - port_ = ntohs(sin.sin_port); + port_ = ntoh(sin.sin_port); - IF_LEVEL(2) LOG("tcpsconn::tcpsconn listen on " << port_ << " " << sin.sin_port); + IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port); if (pipe(pipe_) < 0) { perror("accept_loop pipe:"); @@ -312,7 +303,7 @@ void tcpsconn::process_accept() { throw thread_exit_exception(); } - IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntohs(sin.sin_port)); + IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port)); connection *ch = new connection(mgr_, s1, lossy_); // garbage collect all dead connections with refcount of 1 @@ -349,7 +340,7 @@ void tcpsconn::accept_conn() { continue; } else { perror("accept_conn select:"); - IF_LEVEL(0) LOG("tcpsconn::accept_conn failure errno " << errno); + IF_LEVEL(0) LOG("accept_conn failure errno " << errno); VERIFY(0); } } @@ -376,11 +367,11 @@ connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) { int yes = 1; setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { - IF_LEVEL(1) LOG_NONMEMBER("rpcc::connect_to_dst failed to " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port)); + IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); close(s); return NULL; } - IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port)); + IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); return new connection(mgr, s, lossy); }