X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/5d99dbf06a14904944f5593c63705934bdfdcfb7..16e7c282c6fcec8189425bd15ec9e8a4a0ee857d:/rpc/connection.cc?ds=inline diff --git a/rpc/connection.cc b/rpc/connection.cc index 55e374a..4681ae9 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -1,6 +1,5 @@ -// std::bind and syscall bind have the same name, so don't use std::bind in this file -#define LIBT4_NO_FUNCTIONAL #include "connection.h" +#include "rpc_protocol.h" #include #include #include @@ -8,8 +7,7 @@ #include #include #include - -#define MAX_PDU (10<<20) //maximum PDF is 10M +#include "marshall.h" connection::connection(chanmgr *m1, int f1, int l1) : mgr_(m1), fd_(f1), lossy_(l1) @@ -92,7 +90,7 @@ bool connection::send(const string & b) { if (lossy_) { if ((random()%100) < lossy_) { - IF_LEVEL(1) LOG("connection::send LOSSY TEST shutdown fd_ " << fd_); + IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_); shutdown(fd_,SHUT_RDWR); } } @@ -173,14 +171,10 @@ bool connection::writepdu() { if (wpdu_.solong == wpdu_.buf.size()) return true; - if (wpdu_.solong == 0) { - uint32_t sz = htonl((uint32_t)wpdu_.buf.size() - sizeof(uint32_t)); - copy((const char *)&sz, (const char *)(&sz+1), &wpdu_.buf[0]); - } ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong)); if (n < 0) { if (errno != EAGAIN) { - IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno); + IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno); wpdu_.solong = size_t_max; wpdu_.buf.clear(); } @@ -193,7 +187,7 @@ bool connection::writepdu() { bool connection::readpdu() { IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size()); if (!rpdu_.buf.size()) { - uint32_t sz1; + rpc_sz_t sz1; ssize_t n = read(fd_, &sz1, sizeof(sz1)); if (n == 0) { @@ -210,7 +204,7 @@ bool connection::readpdu() { return false; } - size_t sz = ntohl(sz1); + size_t sz = ntoh(sz1); if (sz > MAX_PDU) { IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1); @@ -221,7 +215,6 @@ bool connection::readpdu() { VERIFY(rpdu_.buf.size() == 0); rpdu_.buf = string(sz+sizeof(sz1), 0); - copy((const char *)&sz1, (const char *)(&sz1 + 1), &rpdu_.buf[0]); rpdu_.solong = sizeof(sz1); } @@ -240,13 +233,13 @@ bool connection::readpdu() { return true; } -tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) +tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest) : mgr_(m1), lossy_(lossytest) { struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; - sin.sin_port = htons(port); + sin.sin_port = hton(port); tcp_ = socket(AF_INET, SOCK_STREAM, 0); if (tcp_ < 0) { @@ -258,7 +251,9 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); - if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) { + // careful to exactly match type signature of bind arguments so we don't + // get std::bind instead + if (bind(tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) { perror("accept_loop tcp bind:"); VERIFY(0); } @@ -270,7 +265,7 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) socklen_t addrlen = sizeof(sin); VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0); - port_ = ntohs(sin.sin_port); + port_ = ntoh(sin.sin_port); IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port); @@ -308,7 +303,7 @@ void tcpsconn::process_accept() { throw thread_exit_exception(); } - IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntohs(sin.sin_port)); + IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port)); connection *ch = new connection(mgr_, s1, lossy_); // garbage collect all dead connections with refcount of 1 @@ -345,7 +340,7 @@ void tcpsconn::accept_conn() { continue; } else { perror("accept_conn select:"); - IF_LEVEL(0) LOG("tcpsconn::accept_conn failure errno " << errno); + IF_LEVEL(0) LOG("accept_conn failure errno " << errno); VERIFY(0); } } @@ -372,11 +367,11 @@ connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) { int yes = 1; setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { - IF_LEVEL(1) LOG_NONMEMBER("rpcc::connect_to_dst failed to " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port)); + IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); close(s); return NULL; } - IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port)); + IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); return new connection(mgr, s, lossy); }