Removed tabs and unneeded constructor
[invirt/third/libt4.git] / rpc / connection.cc
index 86d4ec5..4681ae9 100644 (file)
@@ -1,22 +1,19 @@
-// std::bind and syscall bind have the same name, so don't use std::bind in this file
-#define LIBT4_NO_FUNCTIONAL
 #include "connection.h"
+#include "rpc_protocol.h"
+#include <cerrno>
+#include <csignal>
 #include <fcntl.h>
 #include <sys/types.h>
 #include <netinet/tcp.h>
-#include <errno.h>
-#include <signal.h>
 #include <unistd.h>
 #include <sys/socket.h>
-
-#define MAX_PDU (10<<20) //maximum PDF is 10M
+#include "marshall.h"
 
 connection::connection(chanmgr *m1, int f1, int l1)
 : mgr_(m1), fd_(f1), lossy_(l1)
 {
     int flags = fcntl(fd_, F_GETFL, NULL);
-    flags |= O_NONBLOCK;
-    fcntl(fd_, F_SETFL, flags);
+    fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
 
     signal(SIGPIPE, SIG_IGN);
 
@@ -27,9 +24,7 @@ connection::connection(chanmgr *m1, int f1, int l1)
 
 connection::~connection() {
     VERIFY(dead_);
-    if (rpdu_.buf)
-        free(rpdu_.buf);
-    VERIFY(!wpdu_.buf);
+    VERIFY(!wpdu_.buf.size());
     close(fd_);
 }
 
@@ -46,12 +41,10 @@ bool connection::isdead() {
 void connection::closeconn() {
     {
         lock ml(m_);
-        if (!dead_) {
-            dead_ = true;
-            shutdown(fd_,SHUT_RDWR);
-        } else {
+        if (dead_)
             return;
-        }
+        dead_ = true;
+        shutdown(fd_,SHUT_RDWR);
     }
     //after block_remove_fd, select will never wait on fd_
     //and no callbacks will be active
@@ -81,23 +74,23 @@ int connection::compare(connection *another) {
     return 0;
 }
 
-bool connection::send(char *b, size_t sz) {
+bool connection::send(const string & b) {
     lock ml(m_);
+
     waiters_++;
-    while (!dead_ && wpdu_.buf) {
+    while (!dead_ && wpdu_.buf.size())
         send_wait_.wait(ml);
-    }
     waiters_--;
-    if (dead_) {
+
+    if (dead_)
         return false;
-    }
+
     wpdu_.buf = b;
-    wpdu_.sz = sz;
     wpdu_.solong = 0;
 
     if (lossy_) {
         if ((random()%100) < lossy_) {
-            IF_LEVEL(1) LOG("connection::send LOSSY TEST shutdown fd_ " << fd_);
+            IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_);
             shutdown(fd_,SHUT_RDWR);
         }
     }
@@ -107,19 +100,15 @@ bool connection::send(char *b, size_t sz) {
         ml.unlock();
         PollMgr::Instance()->block_remove_fd(fd_);
         ml.lock();
-    } else {
-        if (wpdu_.solong == wpdu_.sz) {
-        } else {
-            //should be rare to need to explicitly add write callback
-            PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
-            while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) {
-                send_complete_.wait(ml);
-            }
-        }
+    } else if (wpdu_.solong != wpdu_.buf.size()) {
+        // should be rare to need to explicitly add write callback
+        PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
+        while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
+            send_complete_.wait(ml);
     }
-    bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
-    wpdu_.solong = wpdu_.sz = 0;
-    wpdu_.buf = NULL;
+    bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size());
+    wpdu_.solong = 0;
+    wpdu_.buf.clear();
     if (waiters_ > 0)
         send_wait_.notify_all();
     return ret;
@@ -130,7 +119,7 @@ void connection::write_cb(int s) {
     lock ml(m_);
     VERIFY(!dead_);
     VERIFY(fd_ == s);
-    if (wpdu_.sz == 0) {
+    if (wpdu_.buf.size() == 0) {
         PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
         return;
     }
@@ -139,7 +128,7 @@ void connection::write_cb(int s) {
         dead_ = true;
     } else {
         VERIFY(wpdu_.solong != size_t_max);
-        if (wpdu_.solong < wpdu_.sz) {
+        if (wpdu_.solong < wpdu_.buf.size()) {
             return;
         }
     }
@@ -154,41 +143,40 @@ void connection::read_cb(int s) {
         return;
     }
 
+    IF_LEVEL(5) LOG("got data on fd " << s);
+
     bool succ = true;
-    if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) {
+    if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
         succ = readpdu();
     }
 
     if (!succ) {
+        IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
         PollMgr::Instance()->del_callback(fd_,CB_RDWR);
         dead_ = true;
         send_complete_.notify_one();
     }
 
-    if (rpdu_.buf && rpdu_.sz == rpdu_.solong) {
-        if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) {
+    if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
+        if (mgr_->got_pdu(this, rpdu_.buf)) {
             //chanmgr has successfully consumed the pdu
-            rpdu_.buf = NULL;
-            rpdu_.sz = rpdu_.solong = 0;
+            rpdu_.buf.clear();
+            rpdu_.solong = 0;
         }
     }
 }
 
 bool connection::writepdu() {
     VERIFY(wpdu_.solong != size_t_max);
-    if (wpdu_.solong == wpdu_.sz)
+    if (wpdu_.solong == wpdu_.buf.size())
         return true;
 
-    if (wpdu_.solong == 0) {
-        uint32_t sz = htonl((uint32_t)wpdu_.sz);
-        bcopy(&sz,wpdu_.buf,sizeof(sz));
-    }
-    ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong));
+    ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
     if (n < 0) {
         if (errno != EAGAIN) {
-            IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno);
+            IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno);
             wpdu_.solong = size_t_max;
-            wpdu_.sz = 0;
+            wpdu_.buf.clear();
         }
         return (errno == EAGAIN);
     }
@@ -197,8 +185,9 @@ bool connection::writepdu() {
 }
 
 bool connection::readpdu() {
-    if (!rpdu_.sz) {
-        uint32_t sz1;
+    IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
+    if (!rpdu_.buf.size()) {
+        rpc_sz_t sz1;
         ssize_t n = read(fd_, &sz1, sizeof(sz1));
 
         if (n == 0) {
@@ -211,50 +200,50 @@ bool connection::readpdu() {
         }
 
         if (n > 0 && n != sizeof(sz1)) {
-            IF_LEVEL(0) LOG("connection::readpdu short read of sz");
+            IF_LEVEL(0) LOG("short read of sz");
             return false;
         }
 
-        size_t sz = ntohl(sz1);
+        size_t sz = ntoh(sz1);
 
         if (sz > MAX_PDU) {
-            IF_LEVEL(2) LOG("connection::readpdu read pdu TOO BIG " << sz << " network order=" << hex << sz1);
+            IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
             return false;
         }
 
-        rpdu_.sz = sz;
-        VERIFY(rpdu_.buf == NULL);
-        rpdu_.buf = (char *)malloc(sz+sizeof(sz1));
-        VERIFY(rpdu_.buf);
-        bcopy(&sz1,rpdu_.buf,sizeof(sz1));
+        IF_LEVEL(5) LOG("read size of datagram = " << sz);
+
+        VERIFY(rpdu_.buf.size() == 0);
+        rpdu_.buf = string(sz+sizeof(sz1), 0);
         rpdu_.solong = sizeof(sz1);
     }
 
-    ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong);
+    ssize_t n = read(fd_, &rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
+
+    IF_LEVEL(5) LOG("read " << n << " bytes");
+
     if (n <= 0) {
         if (errno == EAGAIN)
             return true;
-        if (rpdu_.buf)
-            free(rpdu_.buf);
-        rpdu_.buf = NULL;
-        rpdu_.sz = rpdu_.solong = 0;
+        rpdu_.buf.clear();
+        rpdu_.solong = 0;
         return (errno == EAGAIN);
     }
     rpdu_.solong += (size_t)n;
     return true;
 }
 
-tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
+tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
 : mgr_(m1), lossy_(lossytest)
 {
     struct sockaddr_in sin;
     memset(&sin, 0, sizeof(sin));
     sin.sin_family = AF_INET;
-    sin.sin_port = htons(port);
+    sin.sin_port = hton(port);
 
     tcp_ = socket(AF_INET, SOCK_STREAM, 0);
     if (tcp_ < 0) {
-        perror("tcpsconn::tcpsconn accept_loop socket:");
+        perror("accept_loop socket:");
         VERIFY(0);
     }
 
@@ -262,21 +251,23 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
     setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
     setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
 
-    if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
+    // careful to exactly match type signature of bind arguments so we don't
+    // get std::bind instead
+    if (bind(tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
         perror("accept_loop tcp bind:");
         VERIFY(0);
     }
 
     if (listen(tcp_, 1000) < 0) {
-        perror("tcpsconn::tcpsconn listen:");
+        perror("listen:");
         VERIFY(0);
     }
 
     socklen_t addrlen = sizeof(sin);
     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
-    port_ = ntohs(sin.sin_port);
+    port_ = ntoh(sin.sin_port);
 
-    IF_LEVEL(2) LOG("tcpsconn::tcpsconn listen on " << port_ << " " << sin.sin_port);
+    IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
 
     if (pipe(pipe_) < 0) {
         perror("accept_loop pipe:");
@@ -312,7 +303,7 @@ void tcpsconn::process_accept() {
         throw thread_exit_exception();
     }
 
-    IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntohs(sin.sin_port));
+    IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
     connection *ch = new connection(mgr_, s1, lossy_);
 
     // garbage collect all dead connections with refcount of 1
@@ -349,7 +340,7 @@ void tcpsconn::accept_conn() {
                     continue;
                 } else {
                     perror("accept_conn select:");
-                    IF_LEVEL(0) LOG("tcpsconn::accept_conn failure errno " << errno);
+                    IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
                     VERIFY(0);
                 }
             }
@@ -376,11 +367,11 @@ connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
     int yes = 1;
     setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
-        IF_LEVEL(1) LOG_NONMEMBER("rpcc::connect_to_dst failed to " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
+        IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
         close(s);
         return NULL;
     }
-    IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
+    IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
     return new connection(mgr, s, lossy);
 }