Fixed two major bugs in paxos.cc.
[invirt/third/libt4.git] / rpc / connection.cc
index 55e374a..4681ae9 100644 (file)
@@ -1,6 +1,5 @@
-// std::bind and syscall bind have the same name, so don't use std::bind in this file
-#define LIBT4_NO_FUNCTIONAL
 #include "connection.h"
+#include "rpc_protocol.h"
 #include <cerrno>
 #include <csignal>
 #include <fcntl.h>
@@ -8,8 +7,7 @@
 #include <netinet/tcp.h>
 #include <unistd.h>
 #include <sys/socket.h>
-
-#define MAX_PDU (10<<20) //maximum PDF is 10M
+#include "marshall.h"
 
 connection::connection(chanmgr *m1, int f1, int l1)
 : mgr_(m1), fd_(f1), lossy_(l1)
@@ -92,7 +90,7 @@ bool connection::send(const string & b) {
 
     if (lossy_) {
         if ((random()%100) < lossy_) {
-            IF_LEVEL(1) LOG("connection::send LOSSY TEST shutdown fd_ " << fd_);
+            IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_);
             shutdown(fd_,SHUT_RDWR);
         }
     }
@@ -173,14 +171,10 @@ bool connection::writepdu() {
     if (wpdu_.solong == wpdu_.buf.size())
         return true;
 
-    if (wpdu_.solong == 0) {
-        uint32_t sz = htonl((uint32_t)wpdu_.buf.size() - sizeof(uint32_t));
-        copy((const char *)&sz, (const char *)(&sz+1), &wpdu_.buf[0]);
-    }
     ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
     if (n < 0) {
         if (errno != EAGAIN) {
-            IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno);
+            IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno);
             wpdu_.solong = size_t_max;
             wpdu_.buf.clear();
         }
@@ -193,7 +187,7 @@ bool connection::writepdu() {
 bool connection::readpdu() {
     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
     if (!rpdu_.buf.size()) {
-        uint32_t sz1;
+        rpc_sz_t sz1;
         ssize_t n = read(fd_, &sz1, sizeof(sz1));
 
         if (n == 0) {
@@ -210,7 +204,7 @@ bool connection::readpdu() {
             return false;
         }
 
-        size_t sz = ntohl(sz1);
+        size_t sz = ntoh(sz1);
 
         if (sz > MAX_PDU) {
             IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
@@ -221,7 +215,6 @@ bool connection::readpdu() {
 
         VERIFY(rpdu_.buf.size() == 0);
         rpdu_.buf = string(sz+sizeof(sz1), 0);
-        copy((const char *)&sz1, (const char *)(&sz1 + 1), &rpdu_.buf[0]);
         rpdu_.solong = sizeof(sz1);
     }
 
@@ -240,13 +233,13 @@ bool connection::readpdu() {
     return true;
 }
 
-tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
+tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
 : mgr_(m1), lossy_(lossytest)
 {
     struct sockaddr_in sin;
     memset(&sin, 0, sizeof(sin));
     sin.sin_family = AF_INET;
-    sin.sin_port = htons(port);
+    sin.sin_port = hton(port);
 
     tcp_ = socket(AF_INET, SOCK_STREAM, 0);
     if (tcp_ < 0) {
@@ -258,7 +251,9 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
     setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
     setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
 
-    if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
+    // careful to exactly match type signature of bind arguments so we don't
+    // get std::bind instead
+    if (bind(tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
         perror("accept_loop tcp bind:");
         VERIFY(0);
     }
@@ -270,7 +265,7 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
 
     socklen_t addrlen = sizeof(sin);
     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
-    port_ = ntohs(sin.sin_port);
+    port_ = ntoh(sin.sin_port);
 
     IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
 
@@ -308,7 +303,7 @@ void tcpsconn::process_accept() {
         throw thread_exit_exception();
     }
 
-    IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntohs(sin.sin_port));
+    IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
     connection *ch = new connection(mgr_, s1, lossy_);
 
     // garbage collect all dead connections with refcount of 1
@@ -345,7 +340,7 @@ void tcpsconn::accept_conn() {
                     continue;
                 } else {
                     perror("accept_conn select:");
-                    IF_LEVEL(0) LOG("tcpsconn::accept_conn failure errno " << errno);
+                    IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
                     VERIFY(0);
                 }
             }
@@ -372,11 +367,11 @@ connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
     int yes = 1;
     setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
-        IF_LEVEL(1) LOG_NONMEMBER("rpcc::connect_to_dst failed to " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
+        IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
         close(s);
         return NULL;
     }
-    IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
+    IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
     return new connection(mgr, s, lossy);
 }