Lots more clean-ups
[invirt/third/libt4.git] / rpc / connection.cc
index 4681ae9..33e891c 100644 (file)
@@ -2,30 +2,26 @@
 #include "rpc_protocol.h"
 #include <cerrno>
 #include <csignal>
-#include <fcntl.h>
 #include <sys/types.h>
 #include <netinet/tcp.h>
 #include <unistd.h>
-#include <sys/socket.h>
 #include "marshall.h"
 
 connection::connection(chanmgr *m1, int f1, int l1)
 : mgr_(m1), fd_(f1), lossy_(l1)
 {
-    int flags = fcntl(fd_, F_GETFL, NULL);
-    fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
+    fd_.flags() |= O_NONBLOCK;
 
     signal(SIGPIPE, SIG_IGN);
 
     create_time_ = steady_clock::now();
 
-    PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
+    PollMgr::Instance().add_callback(fd_, CB_RDONLY, this);
 }
 
 connection::~connection() {
     VERIFY(dead_);
     VERIFY(!wpdu_.buf.size());
-    close(fd_);
 }
 
 void connection::incref() {
@@ -48,7 +44,7 @@ void connection::closeconn() {
     }
     //after block_remove_fd, select will never wait on fd_
     //and no callbacks will be active
-    PollMgr::Instance()->block_remove_fd(fd_);
+    PollMgr::Instance().block_remove_fd(fd_);
 }
 
 void connection::decref() {
@@ -98,11 +94,11 @@ bool connection::send(const string & b) {
     if (!writepdu()) {
         dead_ = true;
         ml.unlock();
-        PollMgr::Instance()->block_remove_fd(fd_);
+        PollMgr::Instance().block_remove_fd(fd_);
         ml.lock();
     } else if (wpdu_.solong != wpdu_.buf.size()) {
         // should be rare to need to explicitly add write callback
-        PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
+        PollMgr::Instance().add_callback(fd_, CB_WRONLY, this);
         while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
             send_complete_.wait(ml);
     }
@@ -120,11 +116,11 @@ void connection::write_cb(int s) {
     VERIFY(!dead_);
     VERIFY(fd_ == s);
     if (wpdu_.buf.size() == 0) {
-        PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
+        PollMgr::Instance().del_callback(fd_,CB_WRONLY);
         return;
     }
     if (!writepdu()) {
-        PollMgr::Instance()->del_callback(fd_, CB_RDWR);
+        PollMgr::Instance().del_callback(fd_, CB_RDWR);
         dead_ = true;
     } else {
         VERIFY(wpdu_.solong != size_t_max);
@@ -152,7 +148,7 @@ void connection::read_cb(int s) {
 
     if (!succ) {
         IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
-        PollMgr::Instance()->del_callback(fd_,CB_RDWR);
+        PollMgr::Instance().del_callback(fd_,CB_RDWR);
         dead_ = true;
         send_complete_.notify_one();
     }
@@ -188,11 +184,10 @@ bool connection::readpdu() {
     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
     if (!rpdu_.buf.size()) {
         rpc_sz_t sz1;
-        ssize_t n = read(fd_, &sz1, sizeof(sz1));
+        ssize_t n = fd_.read(sz1);
 
-        if (n == 0) {
+        if (n == 0)
             return false;
-        }
 
         if (n < 0) {
             VERIFY(errno!=EAGAIN);
@@ -218,7 +213,7 @@ bool connection::readpdu() {
         rpdu_.solong = sizeof(sz1);
     }
 
-    ssize_t n = read(fd_, &rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
+    ssize_t n = fd_.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
 
     IF_LEVEL(5) LOG("read " << n << " bytes");
 
@@ -234,32 +229,33 @@ bool connection::readpdu() {
 }
 
 tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
-: mgr_(m1), lossy_(lossytest)
+: tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest)
 {
     struct sockaddr_in sin;
     memset(&sin, 0, sizeof(sin));
     sin.sin_family = AF_INET;
     sin.sin_port = hton(port);
 
-    tcp_ = socket(AF_INET, SOCK_STREAM, 0);
-    if (tcp_ < 0) {
-        perror("accept_loop socket:");
-        VERIFY(0);
-    }
+    tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
+    tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
 
-    int yes = 1;
-    setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
-    setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
+    struct timeval timeout = {0, 50000};
+
+    if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeout) < 0)
+        perror("accept_loop setsockopt");
+
+    if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeout) < 0)
+        perror("accept_loop setsockopt");
 
     // careful to exactly match type signature of bind arguments so we don't
     // get std::bind instead
-    if (bind(tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
-        perror("accept_loop tcp bind:");
+    if (bind((int)tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
+        perror("accept_loop bind");
         VERIFY(0);
     }
 
     if (listen(tcp_, 1000) < 0) {
-        perror("listen:");
+        perror("accept_loop listen");
         VERIFY(0);
     }
 
@@ -269,24 +265,19 @@ tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
 
     IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
 
-    if (pipe(pipe_) < 0) {
-        perror("accept_loop pipe:");
-        VERIFY(0);
-    }
+    file_t::pipe(pipe_);
 
-    int flags = fcntl(pipe_[0], F_GETFL, NULL);
-    flags |= O_NONBLOCK;
-    fcntl(pipe_[0], F_SETFL, flags);
+    pipe_[0].flags() |= O_NONBLOCK;
 
     th_ = thread(&tcpsconn::accept_conn, this);
 }
 
 tcpsconn::~tcpsconn()
 {
-    VERIFY(close(pipe_[1]) == 0);
+    pipe_[1].close();
     th_.join();
 
-    //close all the active connections
+    // close all the active connections
     map<int, connection *>::iterator i;
     for (i = conns_.begin(); i != conns_.end(); i++) {
         i->second->closeconn();
@@ -325,40 +316,34 @@ void tcpsconn::process_accept() {
 
 void tcpsconn::accept_conn() {
     fd_set rfds;
-    int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
-
-    try {
-        while (1) {
-            FD_ZERO(&rfds);
-            FD_SET(pipe_[0], &rfds);
-            FD_SET(tcp_, &rfds);
-
-            int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
-
-            if (ret < 0) {
-                if (errno == EINTR) {
-                    continue;
-                } else {
-                    perror("accept_conn select:");
-                    IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
-                    VERIFY(0);
-                }
-            }
-
-            if (FD_ISSET(pipe_[0], &rfds)) {
-                close(pipe_[0]);
-                close(tcp_);
-                return;
-            }
-            else if (FD_ISSET(tcp_, &rfds)) {
-                process_accept();
-            } else {
-                VERIFY(0);
-            }
+    int max_fd = max((int)pipe_[0], (int)tcp_);
+
+    while (1) {
+        FD_ZERO(&rfds);
+        FD_SET(pipe_[0], &rfds);
+        FD_SET(tcp_, &rfds);
+
+        int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
+
+        if (ret < 0 && errno == EINTR)
+            continue;
+        else if (ret < 0) {
+            perror("accept_conn select:");
+            IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
+            VERIFY(0);
+        }
+
+        if (FD_ISSET(pipe_[0], &rfds))
+            return;
+
+        if (!FD_ISSET(tcp_, &rfds))
+            VERIFY(0);
+
+        try {
+            process_accept();
+        } catch (thread_exit_exception e) {
+            break;
         }
-    }
-    catch (thread_exit_exception e)
-    {
     }
 }