Lots more clean-ups
authorPeter Iannucci <iannucci@mit.edu>
Thu, 10 Oct 2013 16:35:48 +0000 (12:35 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Thu, 10 Oct 2013 16:35:48 +0000 (12:35 -0400)
13 files changed:
config.cc
handle.cc
paxos.cc
rpc/connection.cc
rpc/connection.h
rpc/file.h [new file with mode: 0644]
rpc/pollmgr.cc
rpc/pollmgr.h
rpc/rpc.cc
rpc/rpc.h
rpc/rpctest.cc
rsm.cc
rsm_client.cc

index 5373007..35654d8 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -204,7 +204,7 @@ config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
     cfg_mutex_lock.unlock();
     int r = 0, ret = rpc_const::bind_failure;
     if (rpcc *cl = h.safebind())
     cfg_mutex_lock.unlock();
     int r = 0, ret = rpc_const::bind_failure;
     if (rpcc *cl = h.safebind())
-        ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(100), r, me, vid);
+        ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
     cfg_mutex_lock.lock();
 
     heartbeat_t res = OK;
     cfg_mutex_lock.lock();
 
     heartbeat_t res = OK;
index 5287a35..1cb5cc2 100644 (file)
--- a/handle.cc
+++ b/handle.cc
@@ -40,7 +40,7 @@ rpcc * handle::safebind() {
     // value to support the assumption.
     // 
     // With RPC_LOSSY=5, tests may fail due to delays and time outs.
     // value to support the assumption.
     // 
     // With RPC_LOSSY=5, tests may fail due to delays and time outs.
-    int ret = cl->bind(rpcc::to(1000));
+    int ret = cl->bind(milliseconds(1000));
     if (ret < 0) {
         LOG("bind failure! " << h->m << " " << ret);
         delete cl;
     if (ret < 0) {
         LOG("bind failure! " << h->m << " " << ret);
         delete cl;
index b83a044..8b00ad8 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -94,7 +94,7 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
         if (!r)
             continue;
         auto status = (paxos_protocol::status)r->call_timeout(
         if (!r)
             continue;
         auto status = (paxos_protocol::status)r->call_timeout(
-                paxos_protocol::preparereq, rpcc::to(100), res, me, instance, proposal);
+                paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
         if (status == paxos_protocol::OK) {
             if (res.oldinstance) {
                 LOG("commiting old instance!");
         if (status == paxos_protocol::OK) {
             if (res.oldinstance) {
                 LOG("commiting old instance!");
@@ -125,7 +125,7 @@ void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
             continue;
         bool accept = false;
         int status = r->call_timeout(
             continue;
         bool accept = false;
         int status = r->call_timeout(
-                paxos_protocol::acceptreq, rpcc::to(100), accept, me, instance, proposal, v);
+                paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v);
         if (status == paxos_protocol::OK && accept)
             accepts.push_back(i);
     }
         if (status == paxos_protocol::OK && accept)
             accepts.push_back(i);
     }
@@ -138,7 +138,7 @@ void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const
         if (!r)
             continue;
         int res = 0;
         if (!r)
             continue;
         int res = 0;
-        r->call_timeout(paxos_protocol::decidereq, rpcc::to(100), res, me, instance, v);
+        r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
     }
 }
 
     }
 }
 
index 4681ae9..33e891c 100644 (file)
@@ -2,30 +2,26 @@
 #include "rpc_protocol.h"
 #include <cerrno>
 #include <csignal>
 #include "rpc_protocol.h"
 #include <cerrno>
 #include <csignal>
-#include <fcntl.h>
 #include <sys/types.h>
 #include <netinet/tcp.h>
 #include <unistd.h>
 #include <sys/types.h>
 #include <netinet/tcp.h>
 #include <unistd.h>
-#include <sys/socket.h>
 #include "marshall.h"
 
 connection::connection(chanmgr *m1, int f1, int l1)
 : mgr_(m1), fd_(f1), lossy_(l1)
 {
 #include "marshall.h"
 
 connection::connection(chanmgr *m1, int f1, int l1)
 : mgr_(m1), fd_(f1), lossy_(l1)
 {
-    int flags = fcntl(fd_, F_GETFL, NULL);
-    fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
+    fd_.flags() |= O_NONBLOCK;
 
     signal(SIGPIPE, SIG_IGN);
 
     create_time_ = steady_clock::now();
 
 
     signal(SIGPIPE, SIG_IGN);
 
     create_time_ = steady_clock::now();
 
-    PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
+    PollMgr::Instance().add_callback(fd_, CB_RDONLY, this);
 }
 
 connection::~connection() {
     VERIFY(dead_);
     VERIFY(!wpdu_.buf.size());
 }
 
 connection::~connection() {
     VERIFY(dead_);
     VERIFY(!wpdu_.buf.size());
-    close(fd_);
 }
 
 void connection::incref() {
 }
 
 void connection::incref() {
@@ -48,7 +44,7 @@ void connection::closeconn() {
     }
     //after block_remove_fd, select will never wait on fd_
     //and no callbacks will be active
     }
     //after block_remove_fd, select will never wait on fd_
     //and no callbacks will be active
-    PollMgr::Instance()->block_remove_fd(fd_);
+    PollMgr::Instance().block_remove_fd(fd_);
 }
 
 void connection::decref() {
 }
 
 void connection::decref() {
@@ -98,11 +94,11 @@ bool connection::send(const string & b) {
     if (!writepdu()) {
         dead_ = true;
         ml.unlock();
     if (!writepdu()) {
         dead_ = true;
         ml.unlock();
-        PollMgr::Instance()->block_remove_fd(fd_);
+        PollMgr::Instance().block_remove_fd(fd_);
         ml.lock();
     } else if (wpdu_.solong != wpdu_.buf.size()) {
         // should be rare to need to explicitly add write callback
         ml.lock();
     } else if (wpdu_.solong != wpdu_.buf.size()) {
         // should be rare to need to explicitly add write callback
-        PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
+        PollMgr::Instance().add_callback(fd_, CB_WRONLY, this);
         while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
             send_complete_.wait(ml);
     }
         while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
             send_complete_.wait(ml);
     }
@@ -120,11 +116,11 @@ void connection::write_cb(int s) {
     VERIFY(!dead_);
     VERIFY(fd_ == s);
     if (wpdu_.buf.size() == 0) {
     VERIFY(!dead_);
     VERIFY(fd_ == s);
     if (wpdu_.buf.size() == 0) {
-        PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
+        PollMgr::Instance().del_callback(fd_,CB_WRONLY);
         return;
     }
     if (!writepdu()) {
         return;
     }
     if (!writepdu()) {
-        PollMgr::Instance()->del_callback(fd_, CB_RDWR);
+        PollMgr::Instance().del_callback(fd_, CB_RDWR);
         dead_ = true;
     } else {
         VERIFY(wpdu_.solong != size_t_max);
         dead_ = true;
     } else {
         VERIFY(wpdu_.solong != size_t_max);
@@ -152,7 +148,7 @@ void connection::read_cb(int s) {
 
     if (!succ) {
         IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
 
     if (!succ) {
         IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
-        PollMgr::Instance()->del_callback(fd_,CB_RDWR);
+        PollMgr::Instance().del_callback(fd_,CB_RDWR);
         dead_ = true;
         send_complete_.notify_one();
     }
         dead_ = true;
         send_complete_.notify_one();
     }
@@ -188,11 +184,10 @@ bool connection::readpdu() {
     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
     if (!rpdu_.buf.size()) {
         rpc_sz_t sz1;
     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
     if (!rpdu_.buf.size()) {
         rpc_sz_t sz1;
-        ssize_t n = read(fd_, &sz1, sizeof(sz1));
+        ssize_t n = fd_.read(sz1);
 
 
-        if (n == 0) {
+        if (n == 0)
             return false;
             return false;
-        }
 
         if (n < 0) {
             VERIFY(errno!=EAGAIN);
 
         if (n < 0) {
             VERIFY(errno!=EAGAIN);
@@ -218,7 +213,7 @@ bool connection::readpdu() {
         rpdu_.solong = sizeof(sz1);
     }
 
         rpdu_.solong = sizeof(sz1);
     }
 
-    ssize_t n = read(fd_, &rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
+    ssize_t n = fd_.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
 
     IF_LEVEL(5) LOG("read " << n << " bytes");
 
 
     IF_LEVEL(5) LOG("read " << n << " bytes");
 
@@ -234,32 +229,33 @@ bool connection::readpdu() {
 }
 
 tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
 }
 
 tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
-: mgr_(m1), lossy_(lossytest)
+: tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest)
 {
     struct sockaddr_in sin;
     memset(&sin, 0, sizeof(sin));
     sin.sin_family = AF_INET;
     sin.sin_port = hton(port);
 
 {
     struct sockaddr_in sin;
     memset(&sin, 0, sizeof(sin));
     sin.sin_family = AF_INET;
     sin.sin_port = hton(port);
 
-    tcp_ = socket(AF_INET, SOCK_STREAM, 0);
-    if (tcp_ < 0) {
-        perror("accept_loop socket:");
-        VERIFY(0);
-    }
+    tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
+    tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
 
 
-    int yes = 1;
-    setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
-    setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
+    struct timeval timeout = {0, 50000};
+
+    if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeout) < 0)
+        perror("accept_loop setsockopt");
+
+    if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeout) < 0)
+        perror("accept_loop setsockopt");
 
     // careful to exactly match type signature of bind arguments so we don't
     // get std::bind instead
 
     // careful to exactly match type signature of bind arguments so we don't
     // get std::bind instead
-    if (bind(tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
-        perror("accept_loop tcp bind:");
+    if (bind((int)tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
+        perror("accept_loop bind");
         VERIFY(0);
     }
 
     if (listen(tcp_, 1000) < 0) {
         VERIFY(0);
     }
 
     if (listen(tcp_, 1000) < 0) {
-        perror("listen:");
+        perror("accept_loop listen");
         VERIFY(0);
     }
 
         VERIFY(0);
     }
 
@@ -269,24 +265,19 @@ tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
 
     IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
 
 
     IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
 
-    if (pipe(pipe_) < 0) {
-        perror("accept_loop pipe:");
-        VERIFY(0);
-    }
+    file_t::pipe(pipe_);
 
 
-    int flags = fcntl(pipe_[0], F_GETFL, NULL);
-    flags |= O_NONBLOCK;
-    fcntl(pipe_[0], F_SETFL, flags);
+    pipe_[0].flags() |= O_NONBLOCK;
 
     th_ = thread(&tcpsconn::accept_conn, this);
 }
 
 tcpsconn::~tcpsconn()
 {
 
     th_ = thread(&tcpsconn::accept_conn, this);
 }
 
 tcpsconn::~tcpsconn()
 {
-    VERIFY(close(pipe_[1]) == 0);
+    pipe_[1].close();
     th_.join();
 
     th_.join();
 
-    //close all the active connections
+    // close all the active connections
     map<int, connection *>::iterator i;
     for (i = conns_.begin(); i != conns_.end(); i++) {
         i->second->closeconn();
     map<int, connection *>::iterator i;
     for (i = conns_.begin(); i != conns_.end(); i++) {
         i->second->closeconn();
@@ -325,40 +316,34 @@ void tcpsconn::process_accept() {
 
 void tcpsconn::accept_conn() {
     fd_set rfds;
 
 void tcpsconn::accept_conn() {
     fd_set rfds;
-    int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
-
-    try {
-        while (1) {
-            FD_ZERO(&rfds);
-            FD_SET(pipe_[0], &rfds);
-            FD_SET(tcp_, &rfds);
-
-            int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
-
-            if (ret < 0) {
-                if (errno == EINTR) {
-                    continue;
-                } else {
-                    perror("accept_conn select:");
-                    IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
-                    VERIFY(0);
-                }
-            }
-
-            if (FD_ISSET(pipe_[0], &rfds)) {
-                close(pipe_[0]);
-                close(tcp_);
-                return;
-            }
-            else if (FD_ISSET(tcp_, &rfds)) {
-                process_accept();
-            } else {
-                VERIFY(0);
-            }
+    int max_fd = max((int)pipe_[0], (int)tcp_);
+
+    while (1) {
+        FD_ZERO(&rfds);
+        FD_SET(pipe_[0], &rfds);
+        FD_SET(tcp_, &rfds);
+
+        int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
+
+        if (ret < 0 && errno == EINTR)
+            continue;
+        else if (ret < 0) {
+            perror("accept_conn select:");
+            IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
+            VERIFY(0);
+        }
+
+        if (FD_ISSET(pipe_[0], &rfds))
+            return;
+
+        if (!FD_ISSET(tcp_, &rfds))
+            VERIFY(0);
+
+        try {
+            process_accept();
+        } catch (thread_exit_exception e) {
+            break;
         }
         }
-    }
-    catch (thread_exit_exception e)
-    {
     }
 }
 
     }
 }
 
index 1eb625b..3e19a93 100644 (file)
@@ -6,6 +6,7 @@
 #include <arpa/inet.h>
 #include <netinet/in.h>
 #include "pollmgr.h"
 #include <arpa/inet.h>
 #include <netinet/in.h>
 #include "pollmgr.h"
+#include "file.h"
 
 constexpr size_t size_t_max = numeric_limits<size_t>::max();
 
 
 constexpr size_t size_t_max = numeric_limits<size_t>::max();
 
@@ -49,7 +50,7 @@ class connection : public aio_callback {
         bool writepdu();
 
         chanmgr *mgr_;
         bool writepdu();
 
         chanmgr *mgr_;
-        const int fd_;
+        const file_t fd_;
         bool dead_ = false;
 
         charbuf wpdu_;
         bool dead_ = false;
 
         charbuf wpdu_;
@@ -77,9 +78,9 @@ class tcpsconn {
         in_port_t port_;
         mutex m_;
         thread th_;
         in_port_t port_;
         mutex m_;
         thread th_;
-        int pipe_[2];
+        file_t pipe_[2];
 
 
-        int tcp_; //file desciptor for accepting connection
+        socket_t tcp_; // listens for connections
         chanmgr *mgr_;
         int lossy_;
         map<int, connection *> conns_;
         chanmgr *mgr_;
         int lossy_;
         map<int, connection *> conns_;
diff --git a/rpc/file.h b/rpc/file.h
new file mode 100644 (file)
index 0000000..75c0d6e
--- /dev/null
@@ -0,0 +1,56 @@
+#ifndef file_h
+#define file_h
+
+#include <fcntl.h>
+#include <unistd.h>
+#include "types.h"
+#include <sys/socket.h>
+
+class file_t {
+    private:
+        int fd_;
+
+        class flags_t {
+            private:
+                const file_t & f_;
+                int flags_;
+            public:
+                flags_t(const file_t & f) : f_(f), flags_(fcntl(f_.fd_, F_GETFL, NULL)) { }
+                ~flags_t() { fcntl(f_.fd_, F_SETFL, flags_); }
+                operator int & () { return flags_; }
+        };
+    public:
+        inline file_t(int fd=-1) : fd_(fd) {}
+        inline file_t(const file_t &) = delete;
+        inline file_t(file_t && other) : fd_(-1) { std::swap(fd_, other.fd_); }
+        inline ~file_t() { if (fd_ != -1) ::close(fd_); }
+        static inline void pipe(file_t *ends) {
+            int fds[2];
+            VERIFY(::pipe(fds) == 0);
+            ends[0].fd_ = fds[0];
+            ends[1].fd_ = fds[1];
+        }
+        inline operator int() const { if (fd_ == -1) throw "no fd"; return fd_; }
+        inline flags_t flags() const { return *this; }
+        inline void close() {
+            ::close(fd_);
+            fd_ = -1;
+        }
+        template <class T>
+        inline ssize_t read(T & t) const { return ::read(fd_, &t, sizeof(T)); }
+        inline ssize_t read(void * t, size_t n) const { return ::read(fd_, t, n); }
+        template <class T>
+        inline ssize_t write(const T & t) const { return ::write(fd_, &t, sizeof(T)); }
+        inline ssize_t write(const void * t, size_t n) const { return ::write(fd_, t, n); }
+};
+
+class socket_t : public file_t {
+    public:
+        socket_t(int fd=-1) : file_t(fd) {}
+        template <class T>
+        int setsockopt(int level, int option, T && value) {
+            return ::setsockopt(*this, level, option, &value, sizeof(T));
+        }
+};
+
+#endif
index a938284..4acff93 100644 (file)
@@ -1,46 +1,77 @@
 #include "types.h"
 #include <errno.h>
 #include "types.h"
 #include <errno.h>
-#include <fcntl.h>
-#include <unistd.h>
+#include <sys/select.h>
+#include "file.h"
 
 
-#include "pollmgr.h"
+#ifdef __linux__
+#include <sys/epoll.h>
+#endif
 
 
-PollMgr *PollMgr::instance = NULL;
-static once_flag pollmgr_is_initialized;
+#include "pollmgr.h"
 
 
-static void
-PollMgrInit()
-{
-    PollMgr::instance = new PollMgr();
-}
+static PollMgr instance;
+
+PollMgr & PollMgr::Instance() { return instance; }
+
+class wait_manager {
+    public:
+        virtual void watch_fd(int fd, poll_flag flag) = 0;
+        virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
+        virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
+        virtual ~wait_manager() throw() {}
+};
+
+class SelectAIO : public wait_manager {
+    public :
+        SelectAIO();
+        ~SelectAIO() {}
+        void watch_fd(int fd, poll_flag flag);
+        bool unwatch_fd(int fd, poll_flag flag);
+        void wait_ready(vector<int> & readable, vector<int> & writable);
+
+    private:
+        fd_set rfds_, wfds_;
+        int highfds_;
+        file_t pipe_[2];
+        mutex m_;
+};
 
 
-PollMgr *
-PollMgr::Instance()
-{
-    call_once(pollmgr_is_initialized, PollMgrInit);
-    return instance;
-}
+#ifdef __linux__ 
+class EPollAIO : public wait_manager {
+    public:
+        EPollAIO() {}
+        ~EPollAIO() throw() { }
+        void watch_fd(int fd, poll_flag flag);
+        bool unwatch_fd(int fd, poll_flag flag);
+        void wait_ready(vector<int> & readable, vector<int> & writable);
+
+    private:
+        file_t poll_ = epoll_create(MAX_POLL_FDS);
+        struct epoll_event ready_[MAX_POLL_FDS];
+        vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
+};
+#endif
 
 
-PollMgr::PollMgr() : pending_change_(false)
-{
-    bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
-    aio_ = new SelectAIO();
-    //aio_ = new EPollAIO();
 
 
+PollMgr::PollMgr() : aio_(new SelectAIO()) {
     th_ = thread(&PollMgr::wait_loop, this);
 }
 
     th_ = thread(&PollMgr::wait_loop, this);
 }
 
-PollMgr::~PollMgr() [[noreturn]]
+PollMgr::~PollMgr()
 {
 {
-    //never kill me!!!
-    VERIFY(0);
+    lock ml(m_);
+    for (auto p : callbacks_)
+        aio_->unwatch_fd(p.first, CB_RDWR);
+    pending_change_ = true;
+    shutdown_ = true;
+    changedone_c_.wait(ml);
+    delete aio_;
+    th_.join();
 }
 
 void
 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
 {
 }
 
 void
 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
 {
-    VERIFY(fd < MAX_POLL_FDS);
-
     lock ml(m_);
     aio_->watch_fd(fd, flag);
 
     lock ml(m_);
     aio_->watch_fd(fd, flag);
 
@@ -48,44 +79,26 @@ PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
     callbacks_[fd] = ch;
 }
 
     callbacks_[fd] = ch;
 }
 
-//remove all callbacks related to fd
-//the return guarantees that callbacks related to fd
-//will never be called again
-void
-PollMgr::block_remove_fd(int fd)
-{
+// Remove all callbacks related to fd.  After this returns, we guarantee that
+// callbacks related to fd will never be called again.
+void PollMgr::block_remove_fd(int fd) {
     lock ml(m_);
     aio_->unwatch_fd(fd, CB_RDWR);
     pending_change_ = true;
     changedone_c_.wait(ml);
     lock ml(m_);
     aio_->unwatch_fd(fd, CB_RDWR);
     pending_change_ = true;
     changedone_c_.wait(ml);
-    callbacks_[fd] = NULL;
+    callbacks_[fd] = nullptr;
 }
 
 }
 
-void
-PollMgr::del_callback(int fd, poll_flag flag)
-{
+void PollMgr::del_callback(int fd, poll_flag flag) {
     lock ml(m_);
     lock ml(m_);
-    if (aio_->unwatch_fd(fd, flag)) {
-        callbacks_[fd] = NULL;
-    }
+    if (aio_->unwatch_fd(fd, flag))
+        callbacks_[fd] = nullptr;
 }
 
 }
 
-bool
-PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
-{
-    lock ml(m_);
-    if (!callbacks_[fd] || callbacks_[fd]!=c)
-        return false;
-
-    return aio_->is_watched(fd, flag);
-}
-
-void
-PollMgr::wait_loop() [[noreturn]]
-{
-
+void PollMgr::wait_loop() {
     vector<int> readable;
     vector<int> writable;
     vector<int> readable;
     vector<int> writable;
+    aio_callback * cb;
 
     while (1) {
         {
 
     while (1) {
         {
@@ -93,121 +106,80 @@ PollMgr::wait_loop() [[noreturn]]
             if (pending_change_) {
                 pending_change_ = false;
                 changedone_c_.notify_all();
             if (pending_change_) {
                 pending_change_ = false;
                 changedone_c_.notify_all();
+                if (shutdown_)
+                    break;
             }
         }
         readable.clear();
         writable.clear();
             }
         }
         readable.clear();
         writable.clear();
-        aio_->wait_ready(&readable,&writable);
-
-        if (!readable.size() && !writable.size()) {
-            continue;
-        } 
-        //no locking of m_
-        //because no add_callback() and del_callback should 
-        //modify callbacks_[fd] while the fd is not dead
-        for (unsigned int i = 0; i < readable.size(); i++) {
-            int fd = readable[i];
-            if (callbacks_[fd])
-                callbacks_[fd]->read_cb(fd);
+        aio_->wait_ready(readable, writable);
+
+        for (auto fd : readable) {
+            { lock ml(m_); cb = callbacks_[fd]; }
+            if (cb) cb->read_cb(fd);
         }
 
         }
 
-        for (unsigned int i = 0; i < writable.size(); i++) {
-            int fd = writable[i];
-            if (callbacks_[fd])
-                callbacks_[fd]->write_cb(fd);
+        for (auto fd : writable) {
+            { lock ml(m_); cb = callbacks_[fd]; }
+            if (cb) cb->write_cb(fd);
         }
     }
 }
 
         }
     }
 }
 
-SelectAIO::SelectAIO() : highfds_(0)
+SelectAIO::SelectAIO()
 {
     FD_ZERO(&rfds_);
     FD_ZERO(&wfds_);
 
 {
     FD_ZERO(&rfds_);
     FD_ZERO(&wfds_);
 
-    VERIFY(pipe(pipefd_) == 0);
-    FD_SET(pipefd_[0], &rfds_);
-    highfds_ = pipefd_[0];
+    file_t::pipe(pipe_);
 
 
-    int flags = fcntl(pipefd_[0], F_GETFL, NULL);
-    flags |= O_NONBLOCK;
-    fcntl(pipefd_[0], F_SETFL, flags);
-}
+    FD_SET(pipe_[0], &rfds_);
+    highfds_ = pipe_[0];
 
 
-SelectAIO::~SelectAIO()
-{
+    pipe_[0].flags() |= O_NONBLOCK;
 }
 
 }
 
-void
-SelectAIO::watch_fd(int fd, poll_flag flag)
-{
+void SelectAIO::watch_fd(int fd, poll_flag flag) {
+    VERIFY(fd < MAX_POLL_FDS);
+
     lock ml(m_);
     if (highfds_ <= fd) 
         highfds_ = fd;
 
     lock ml(m_);
     if (highfds_ <= fd) 
         highfds_ = fd;
 
-    if (flag == CB_RDONLY) {
-        FD_SET(fd,&rfds_);
-    }else if (flag == CB_WRONLY) {
-        FD_SET(fd,&wfds_);
-    }else {
+    if (flag & CB_RDONLY)
         FD_SET(fd,&rfds_);
         FD_SET(fd,&rfds_);
+
+    if (flag & CB_WRONLY)
         FD_SET(fd,&wfds_);
         FD_SET(fd,&wfds_);
-    }
 
 
-    char tmp = 1;
-    VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
+    VERIFY(pipe_[1].write((char)1)==1);
 }
 
 }
 
-bool
-SelectAIO::is_watched(int fd, poll_flag flag)
-{
-    lock ml(m_);
-    if (flag == CB_RDONLY) {
-        return FD_ISSET(fd,&rfds_);
-    }else if (flag == CB_WRONLY) {
-        return FD_ISSET(fd,&wfds_);
-    }else{
-        return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
-    }
-}
+bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
+    VERIFY(fd < MAX_POLL_FDS);
 
 
-bool 
-SelectAIO::unwatch_fd(int fd, poll_flag flag)
-{
     lock ml(m_);
     lock ml(m_);
-    if (flag == CB_RDONLY) {
+    VERIFY((flag & ~CB_RDWR) == 0);
+    if (flag & CB_RDONLY)
         FD_CLR(fd, &rfds_);
         FD_CLR(fd, &rfds_);
-    }else if (flag == CB_WRONLY) {
+    if (flag & CB_WRONLY)
         FD_CLR(fd, &wfds_);
         FD_CLR(fd, &wfds_);
-    }else if (flag == CB_RDWR) {
-        FD_CLR(fd, &wfds_);
-        FD_CLR(fd, &rfds_);
-    }else{
-        VERIFY(0);
-    }
 
 
-    if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
-        if (fd == highfds_) {
-            int newh = pipefd_[0];
-            for (int i = 0; i <= highfds_; i++) {
-                if (FD_ISSET(i, &rfds_)) {
-                    newh = i;
-                }else if (FD_ISSET(i, &wfds_)) {
-                    newh = i;
-                }
-            }
-            highfds_ = newh;
-        }
-    }
-    if (flag == CB_RDWR) {
-        char tmp = 1;
-        VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
+    int newh = pipe_[0];
+    for (int i = 0; i <= highfds_; i++) {
+        if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
+            newh = i;
     }
     }
+    highfds_ = newh;
+
+    if (flag == CB_RDWR)
+        VERIFY(pipe_[1].write((char)1)==1);
+
     return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
 }
 
     return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
 }
 
-void
-SelectAIO::wait_ready(vector<int> *readable, vector<int> *writable)
-{
+void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
+
     fd_set trfds, twfds;
     int high;
 
     fd_set trfds, twfds;
     int high;
 
@@ -220,130 +192,89 @@ SelectAIO::wait_ready(vector<int> *readable, vector<int> *writable)
 
     int ret = select(high+1, &trfds, &twfds, NULL, NULL);
 
 
     int ret = select(high+1, &trfds, &twfds, NULL, NULL);
 
-    if (ret < 0) {
-        if (errno == EINTR) {
-            return;
-        } else {
-            perror("select:");
-            IF_LEVEL(0) LOG("select_loop failure errno " << errno);
-            VERIFY(0);
-        }
+    if (ret < 0 && errno == EINTR)
+        return;
+    else if (ret < 0) {
+        perror("select:");
+        IF_LEVEL(0) LOG("select_loop failure errno " << errno);
+        VERIFY(0);
     }
 
     for (int fd = 0; fd <= high; fd++) {
     }
 
     for (int fd = 0; fd <= high; fd++) {
-        if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
+        if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
             char tmp;
             char tmp;
-            VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
+            VERIFY(pipe_[0].read(tmp)==1);
             VERIFY(tmp==1);
             VERIFY(tmp==1);
-        }else {
-            if (FD_ISSET(fd, &twfds)) {
-                writable->push_back(fd);
-            }
-            if (FD_ISSET(fd, &trfds)) {
-                readable->push_back(fd);
-            }
+        } else {
+            if (FD_ISSET(fd, &twfds))
+                writable.push_back(fd);
+
+            if (FD_ISSET(fd, &trfds))
+                readable.push_back(fd);
         }
     }
 }
 
 #ifdef __linux__ 
 
         }
     }
 }
 
 #ifdef __linux__ 
 
-EPollAIO::EPollAIO()
-{
-    pollfd_ = epoll_create(MAX_POLL_FDS);
-    VERIFY(pollfd_ >= 0);
-    bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
-}
-
-EPollAIO::~EPollAIO()
-{
-    close(pollfd_);
-}
+void EPollAIO::watch_fd(int fd_, poll_flag flag) {
+    size_t fd = (size_t)fd_;
 
 
-static inline
-int poll_flag_to_event(poll_flag flag)
-{
-    int f;
-    if (flag == CB_RDONLY) {
-        f = EPOLLIN;
-    }else if (flag == CB_WRONLY) {
-        f = EPOLLOUT;
-    }else { //flag == CB_RDWR
-        f = EPOLLIN | EPOLLOUT;
-    }
-    return f;
-}
-
-void
-EPollAIO::watch_fd(int fd, poll_flag flag)
-{
     VERIFY(fd < MAX_POLL_FDS);
 
     struct epoll_event ev;
     VERIFY(fd < MAX_POLL_FDS);
 
     struct epoll_event ev;
-    int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
-    fdstatus_[fd] |= (int)flag;
+    int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
+    fdstatus_[fd] |= (unsigned)flag;
 
     ev.events = EPOLLET;
 
     ev.events = EPOLLET;
-    ev.data.fd = fd;
+    ev.data.fd = fd_;
 
 
-    if (fdstatus_[fd] & CB_RDONLY) {
+    if (fdstatus_[fd] & CB_RDONLY)
         ev.events |= EPOLLIN;
         ev.events |= EPOLLIN;
-    }
-    if (fdstatus_[fd] & CB_WRONLY) {
+
+    if (fdstatus_[fd] & CB_WRONLY)
         ev.events |= EPOLLOUT;
         ev.events |= EPOLLOUT;
-    }
 
 
-    if (flag == CB_RDWR) {
+    if (flag == CB_RDWR)
         VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
         VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
-    }
 
 
-    VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+    VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
 }
 
 }
 
-bool 
-EPollAIO::unwatch_fd(int fd, poll_flag flag)
-{
+bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
+    size_t fd = (size_t)fd_;
+
     VERIFY(fd < MAX_POLL_FDS);
     VERIFY(fd < MAX_POLL_FDS);
-    fdstatus_[fd] &= ~(int)flag;
+    fdstatus_[fd] &= ~(unsigned)flag;
 
     struct epoll_event ev;
 
     struct epoll_event ev;
-    int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
+    int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
 
     ev.events = EPOLLET;
 
     ev.events = EPOLLET;
-    ev.data.fd = fd;
+    ev.data.fd = fd_;
 
 
-    if (fdstatus_[fd] & CB_RDONLY) {
+    if (fdstatus_[fd] & CB_RDONLY)
         ev.events |= EPOLLIN;
         ev.events |= EPOLLIN;
-    }
-    if (fdstatus_[fd] & CB_WRONLY) {
+
+    if (fdstatus_[fd] & CB_WRONLY)
         ev.events |= EPOLLOUT;
         ev.events |= EPOLLOUT;
-    }
 
 
-    if (flag == CB_RDWR) {
+    if (flag == CB_RDWR)
         VERIFY(op == EPOLL_CTL_DEL);
         VERIFY(op == EPOLL_CTL_DEL);
-    }
-    VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+
+    VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
     return (op == EPOLL_CTL_DEL);
 }
 
     return (op == EPOLL_CTL_DEL);
 }
 
-bool
-EPollAIO::is_watched(int fd, poll_flag flag)
-{
-    VERIFY(fd < MAX_POLL_FDS);
-    return ((fdstatus_[fd] & CB_MASK) == flag);
-}
+void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
 
 
-void
-EPollAIO::wait_ready(vector<int> *readable, vector<int> *writable)
-{
-    int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
+    int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
     for (int i = 0; i < nfds; i++) {
     for (int i = 0; i < nfds; i++) {
-        if (ready_[i].events & EPOLLIN) {
-            readable->push_back(ready_[i].data.fd);
-        }
-        if (ready_[i].events & EPOLLOUT) {
-            writable->push_back(ready_[i].data.fd);
-        }
+        if (ready_[i].events & EPOLLIN)
+            readable.push_back(ready_[i].data.fd);
+
+        if (ready_[i].events & EPOLLOUT)
+            writable.push_back(ready_[i].data.fd);
     }
 }
 
     }
 }
 
index 2da3167..ede35f8 100644 (file)
@@ -2,11 +2,6 @@
 #define pollmgr_h 
 
 #include "types.h"
 #define pollmgr_h 
 
 #include "types.h"
-#include <sys/select.h>
-
-#ifdef __linux__
-#include <sys/epoll.h>
-#endif
 
 #define MAX_POLL_FDS 128
 
 
 #define MAX_POLL_FDS 128
 
@@ -18,15 +13,6 @@ typedef enum {
     CB_MASK = ~0x11,
 } poll_flag;
 
     CB_MASK = ~0x11,
 } poll_flag;
 
-class aio_mgr {
-    public:
-        virtual void watch_fd(int fd, poll_flag flag) = 0;
-        virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
-        virtual bool is_watched(int fd, poll_flag flag) = 0;
-        virtual void wait_ready(vector<int> *readable, vector<int> *writable) = 0;
-        virtual ~aio_mgr() {}
-};
-
 class aio_callback {
     public:
         virtual void read_cb(int fd) = 0;
 class aio_callback {
     public:
         virtual void read_cb(int fd) = 0;
@@ -39,69 +25,22 @@ class PollMgr {
         PollMgr();
         ~PollMgr();
 
         PollMgr();
         ~PollMgr();
 
-        static PollMgr *Instance();
-        static PollMgr *CreateInst();
+        static PollMgr & Instance();
 
         void add_callback(int fd, poll_flag flag, aio_callback *ch);
         void del_callback(int fd, poll_flag flag);
 
         void add_callback(int fd, poll_flag flag, aio_callback *ch);
         void del_callback(int fd, poll_flag flag);
-        bool has_callback(int fd, poll_flag flag, aio_callback *ch);
         void block_remove_fd(int fd);
         void wait_loop();
 
         void block_remove_fd(int fd);
         void wait_loop();
 
-
-        static PollMgr *instance;
-        static int useful;
-        static int useless;
-
     private:
         mutex m_;
         cond changedone_c_;
     private:
         mutex m_;
         cond changedone_c_;
-        thread th_;
-
-        aio_callback *callbacks_[MAX_POLL_FDS];
-        aio_mgr *aio_;
-        bool pending_change_;
-
-};
-
-class SelectAIO : public aio_mgr {
-    public :
-
-        SelectAIO();
-        ~SelectAIO();
-        void watch_fd(int fd, poll_flag flag);
-        bool unwatch_fd(int fd, poll_flag flag);
-        bool is_watched(int fd, poll_flag flag);
-        void wait_ready(vector<int> *readable, vector<int> *writable);
 
 
-    private:
-
-        fd_set rfds_;
-        fd_set wfds_;
-        int highfds_;
-        int pipefd_[2];
-
-        mutex m_;
-
-};
-
-#ifdef __linux__ 
-class EPollAIO : public aio_mgr {
-    public:
-        EPollAIO();
-        ~EPollAIO();
-        void watch_fd(int fd, poll_flag flag);
-        bool unwatch_fd(int fd, poll_flag flag);
-        bool is_watched(int fd, poll_flag flag);
-        void wait_ready(vector<int> *readable, vector<int> *writable);
-
-    private:
-        int pollfd_;
-        struct epoll_event ready_[MAX_POLL_FDS];
-        int fdstatus_[MAX_POLL_FDS];
+        map<int, aio_callback *> callbacks_;
+        class wait_manager *aio_;
+        bool pending_change_=false, shutdown_=false;
 
 
+        thread th_;
 };
 };
-#endif /* __linux */
-
-#endif /* pollmgr_h */
 
 
+#endif
index 32b25ab..47ac775 100644 (file)
@@ -1,8 +1,8 @@
 /*
 /*
- The rpcc class handles client-side RPC.  Each rpcc is bound to a
- single RPC server.  The jobs of rpcc include maintaining a connection to
- server, sending RPC requests and waiting for responses, retransmissions,
- at-most-once delivery etc.
+ The rpcc class handles client-side RPC.  Each rpcc is bound to a single RPC
+ server.  The jobs of rpcc include maintaining a connection to server, sending
+ RPC requests and waiting for responses, retransmissions, at-most-once delivery
+ etc.
 
  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
  connections from different rpcc objects.  The jobs of rpcs include accepting
 
  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
  connections from different rpcc objects.  The jobs of rpcs include accepting
@@ -11,8 +11,8 @@
 
  Both rpcc and rpcs use the connection class as an abstraction for the
  underlying communication channel.  To send an RPC request/reply, one calls
 
  Both rpcc and rpcs use the connection class as an abstraction for the
  underlying communication channel.  To send an RPC request/reply, one calls
- connection::send() which blocks until data is sent or the connection has failed
- (thus the caller can free the buffer when send() returns).  When a
+ connection::send() which blocks until data is sent or the connection has
+ failed (thus the caller can free the buffer when send() returns).  When a
  request/reply is received, connection makes a callback into the corresponding
  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
 
  request/reply is received, connection makes a callback into the corresponding
  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
 
  number of threads needed to manage these connections; without async IO, at
  least one thread is needed per connection to read data without blocking other
  activities.)  Each rpcs object creates one thread for listening on the server
  number of threads needed to manage these connections; without async IO, at
  least one thread is needed per connection to read data without blocking other
  activities.)  Each rpcs object creates one thread for listening on the server
- port and a pool of threads for executing RPC requests.  The
- thread pool allows us to control the number of threads spawned at the server
- (spawning one thread per request will hurt when the server faces thousands of
- requests).
+ port and a pool of threads for executing RPC requests.  The thread pool allows
+ us to control the number of threads spawned at the server (spawning one thread
+ per request will hurt when the server faces thousands of requests).
 
  In order to delete a connection object, we must maintain a reference count.
 
  In order to delete a connection object, we must maintain a reference count.
- For rpcc,
- multiple client threads might be invoking the rpcc::call() functions and thus
- holding multiple references to the underlying connection object. For rpcs,
- multiple dispatch threads might be holding references to the same connection
- object.  A connection object is deleted only when the underlying connection is
- dead and the reference count reaches zero.
+ For rpcc, multiple client threads might be invoking the rpcc::call() functions
+ and thus holding multiple references to the underlying connection object. For
+ rpcs, multiple dispatch threads might be holding references to the same
+ connection object.  A connection object is deleted only when the underlying
+ connection is dead and the reference count reaches zero.
 
  This version of the RPC library explicitly joins exited threads to make sure
  no outstanding references exist before deleting objects.
 
  This version of the RPC library explicitly joins exited threads to make sure
  no outstanding references exist before deleting objects.
@@ -45,9 +43,9 @@
  there are no outstanding calls on the rpcc object.
 
  To delete a rpcs object safely, we do the following in sequence: 1. stop
  there are no outstanding calls on the rpcc object.
 
  To delete a rpcs object safely, we do the following in sequence: 1. stop
- accepting new incoming connections. 2. close existing active connections.
- 3.  delete the dispatch thread pool which involves waiting for current active
- RPC handlers to finish.  It is interesting how a thread pool can be deleted
+ accepting new incoming connections. 2. close existing active connections.  3.
+ delete the dispatch thread pool which involves waiting for current active RPC
+ handlers to finish.  It is interesting how a thread pool can be deleted
  without using thread cancellation. The trick is to inject x "poison pills" for
  a thread pool of x threads. Upon getting a poison pill instead of a normal
  task, a worker thread will exit (and thread pool destructor waits to join all
  without using thread cancellation. The trick is to inject x "poison pills" for
  a thread pool of x threads. Upon getting a poison pill instead of a normal
  task, a worker thread will exit (and thread pool destructor waits to join all
@@ -63,9 +61,6 @@
 #include <netdb.h>
 #include <unistd.h>
 
 #include <netdb.h>
 #include <unistd.h>
 
-const rpcc::TO rpcc::to_max = { 120000 };
-const rpcc::TO rpcc::to_min = { 1000 };
-
 inline void set_rand_seed() {
     auto now = time_point_cast<nanoseconds>(steady_clock::now());
     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
 inline void set_rand_seed() {
     auto now = time_point_cast<nanoseconds>(steady_clock::now());
     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
@@ -108,7 +103,7 @@ rpcc::~rpcc() {
     VERIFY(calls_.size() == 0);
 }
 
     VERIFY(calls_.size() == 0);
 }
 
-int rpcc::bind(TO to) {
+int rpcc::bind(milliseconds to) {
     unsigned int r;
     int ret = call_timeout(rpc_const::bind, to, r, 0);
     if(ret == 0){
     unsigned int r;
     int ret = call_timeout(rpc_const::bind, to, r, 0);
     if(ret == 0){
@@ -144,7 +139,7 @@ void rpcc::cancel(void) {
     LOG("done");
 }
 
     LOG("done");
 }
 
-int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
+int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
 
     caller ca(0, &rep);
     int xid_rep;
 
     caller ca(0, &rep);
     int xid_rep;
@@ -168,11 +163,8 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
         xid_rep = xid_rep_window_.front();
     }
 
         xid_rep = xid_rep_window_.front();
     }
 
-    TO curr_to;
-    auto finaldeadline = steady_clock::now() + milliseconds(to.to),
-        nextdeadline = finaldeadline;
-
-    curr_to.to = to_min.to;
+    milliseconds curr_to = rpc::to_min;
+    auto finaldeadline = steady_clock::now() + to, nextdeadline = finaldeadline;
 
     bool transmit = true;
     connection *ch = NULL;
 
     bool transmit = true;
     connection *ch = NULL;
@@ -204,7 +196,7 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
         if(finaldeadline == time_point<steady_clock>::min())
             break;
 
         if(finaldeadline == time_point<steady_clock>::min())
             break;
 
-        nextdeadline = steady_clock::now() + milliseconds(curr_to.to);
+        nextdeadline = steady_clock::now() + curr_to;
         if(nextdeadline > finaldeadline) {
             nextdeadline = finaldeadline;
             finaldeadline = time_point<steady_clock>::min();
         if(nextdeadline > finaldeadline) {
             nextdeadline = finaldeadline;
             finaldeadline = time_point<steady_clock>::min();
@@ -230,7 +222,7 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
             // on the new connection
             transmit = true;
         }
             // on the new connection
             transmit = true;
         }
-        curr_to.to <<= 1;
+        curr_to *= 2;
     }
 
     {
     }
 
     {
@@ -492,7 +484,7 @@ rpcs::dispatch(djob_t *j)
 
         // save the latest good connection to the client
         {
 
         // save the latest good connection to the client
         {
-            lock rwl(conss_m_);
+            lock rwl(conns_m_);
             if(conns_.find(h.clt_nonce) == conns_.end()){
                 c->incref();
                 conns_[h.clt_nonce] = c;
             if(conns_.find(h.clt_nonce) == conns_.end()){
                 c->incref();
                 conns_[h.clt_nonce] = c;
@@ -537,7 +529,7 @@ rpcs::dispatch(djob_t *j)
 
             // get the latest connection to the client
             {
 
             // get the latest connection to the client
             {
-                lock rwl(conss_m_);
+                lock rwl(conns_m_);
                 if(c->isdead() && c != conns_[h.clt_nonce]){
                     c->decref();
                     c = conns_[h.clt_nonce];
                 if(c->isdead() && c != conns_[h.clt_nonce]){
                     c->decref();
                     c = conns_[h.clt_nonce];
index 065cabc..19ec96a 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
 #include "marshall_wrap.h"
 #include "connection.h"
 
 #include "marshall_wrap.h"
 #include "connection.h"
 
+namespace rpc {
+    static constexpr milliseconds to_max{12000};
+    static constexpr milliseconds to_min{100};
+}
+
 class rpc_const {
     public:
         static const unsigned int bind = 1;   // handler number reserved for bind
 class rpc_const {
     public:
         static const unsigned int bind = 1;   // handler number reserved for bind
@@ -26,7 +31,6 @@ class rpc_const {
 // manages a xid space per destination socket
 // threaded: multiple threads can be sending RPCs,
 class rpcc : public chanmgr {
 // manages a xid space per destination socket
 // threaded: multiple threads can be sending RPCs,
 class rpcc : public chanmgr {
-
     private:
 
         //manages per rpc info
     private:
 
         //manages per rpc info
@@ -66,52 +70,42 @@ class rpcc : public chanmgr {
         list<int> xid_rep_window_;
 
         struct request {
         list<int> xid_rep_window_;
 
         struct request {
-            request() { clear(); }
             void clear() { buf.clear(); xid = -1; }
             bool isvalid() { return xid != -1; }
             string buf;
             void clear() { buf.clear(); xid = -1; }
             bool isvalid() { return xid != -1; }
             string buf;
-            int xid;
+            int xid = -1;
         };
         };
-        struct request dup_req_;
+        request dup_req_;
         int xid_rep_done_;
         int xid_rep_done_;
+
+        int call1(proc_t proc, marshall &req, string &rep, milliseconds to);
+
+        template<class R>
+            int call_m(proc_t proc, marshall &req, R & r, milliseconds to);
     public:
 
         rpcc(const string & d, bool retrans=true);
         ~rpcc();
 
     public:
 
         rpcc(const string & d, bool retrans=true);
         ~rpcc();
 
-        struct TO {
-            int to;
-        };
-        static const TO to_max;
-        static const TO to_min;
-        static TO to(int x) { TO t; t.to = x; return t;}
-
         unsigned int id() { return clt_nonce_; }
 
         unsigned int id() { return clt_nonce_; }
 
-        int bind(TO to = to_max);
+        int bind(milliseconds to = rpc::to_max);
 
         void set_reachable(bool r) { reachable_ = r; }
 
         void cancel();
 
 
         void set_reachable(bool r) { reachable_ = r; }
 
         void cancel();
 
-        int islossy() { return lossytest_ > 0; }
-
-        int call1(proc_t proc, marshall &req, string &rep, TO to);
-
         bool got_pdu(connection *c, const string & b);
 
         bool got_pdu(connection *c, const string & b);
 
-        template<class R>
-            int call_m(proc_t proc, marshall &req, R & r, TO to);
-
         template<class R, typename ...Args>
             inline int call(proc_t proc, R & r, const Args&... args);
 
         template<class R, typename ...Args>
         template<class R, typename ...Args>
             inline int call(proc_t proc, R & r, const Args&... args);
 
         template<class R, typename ...Args>
-            inline int call_timeout(proc_t proc, TO to, R & r, const Args&... args);
+            inline int call_timeout(proc_t proc, milliseconds to, R & r, const Args&... args);
 };
 
 template<class R> int 
 };
 
 template<class R> int 
-rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) 
+rpcc::call_m(proc_t proc, marshall &req, R & r, milliseconds to) 
 {
     string rep;
     int intret = call1(proc, req, rep, to);
 {
     string rep;
     int intret = call1(proc, req, rep, to);
@@ -130,11 +124,11 @@ rpcc::call_m(proc_t proc, marshall &req, R & r, TO to)
 template<class R, typename... Args> inline int
 rpcc::call(proc_t proc, R & r, const Args&... args)
 {
 template<class R, typename... Args> inline int
 rpcc::call(proc_t proc, R & r, const Args&... args)
 {
-    return call_timeout(proc, rpcc::to_max, r, args...);
+    return call_timeout(proc, rpc::to_max, r, args...);
 }
 
 template<class R, typename... Args> inline int
 }
 
 template<class R, typename... Args> inline int
-rpcc::call_timeout(proc_t proc, const rpcc::TO to, R & r, const Args&... args)
+rpcc::call_timeout(proc_t proc, const milliseconds to, R & r, const Args&... args)
 {
     marshall m{args...};
     return call_m(proc, m, r, to);
 {
     marshall m{args...};
     return call_m(proc, m, r, to);
@@ -196,7 +190,7 @@ class rpcs : public chanmgr {
     mutex procs_m_; // protect insert/delete to procs[]
     mutex count_m_;  //protect modification of counts
     mutex reply_window_m_; // protect reply window et al
     mutex procs_m_; // protect insert/delete to procs[]
     mutex count_m_;  //protect modification of counts
     mutex reply_window_m_; // protect reply window et al
-    mutex conss_m_; // protect conns_
+    mutex conns_m_; // protect conns_
 
 
     protected:
 
 
     protected:
@@ -224,17 +218,15 @@ class rpcs : public chanmgr {
 
     bool got_pdu(connection *c, const string & b);
 
 
     bool got_pdu(connection *c, const string & b);
 
-    template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr);
-};
+    struct ReturnOnFailure {
+        static inline int unmarshall_args_failure() {
+            return rpc_const::unmarshal_args_failure;
+        }
+    };
 
 
-struct ReturnOnFailure {
-    static inline int unmarshall_args_failure() {
-        return rpc_const::unmarshal_args_failure;
+    template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr) {
+        reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
     }
 };
 
     }
 };
 
-template<class F, class C> void rpcs::reg(proc_t proc, F f, C *c) {
-    reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
-}
-
 #endif
 #endif
index 2f58e5d..723df82 100644 (file)
@@ -166,7 +166,7 @@ client3(void *xx)
 
     for(int i = 0; i < 4; i++){
         int rep = 0;
 
     for(int i = 0; i < 4; i++){
         int rep = 0;
-        int ret = c->call_timeout(24, rpcc::to(300), rep, i);
+        int ret = c->call_timeout(24, milliseconds(300), rep, i);
         VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
     }
 }
         VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
     }
 }
@@ -187,14 +187,14 @@ simple_tests(rpcc *c)
     cout << "   -- string concat RPC .. ok" << endl;
 
     // small request, big reply (perhaps req via UDP, reply via TCP)
     cout << "   -- string concat RPC .. ok" << endl;
 
     // small request, big reply (perhaps req via UDP, reply via TCP)
-    intret = c->call_timeout(25, rpcc::to(20000), rep, 70000);
+    intret = c->call_timeout(25, milliseconds(20000), rep, 70000);
     VERIFY(intret == 0);
     VERIFY(rep.size() == 70000);
     cout << "   -- small request, big reply .. ok" << endl;
 
     // specify a timeout value to an RPC that should succeed (udp)
     int xx = 0;
     VERIFY(intret == 0);
     VERIFY(rep.size() == 70000);
     cout << "   -- small request, big reply .. ok" << endl;
 
     // specify a timeout value to an RPC that should succeed (udp)
     int xx = 0;
-    intret = c->call_timeout(23, rpcc::to(300), xx, 77);
+    intret = c->call_timeout(23, milliseconds(300), xx, 77);
     VERIFY(intret == 0 && xx == 78);
     cout << "   -- no spurious timeout .. ok" << endl;
 
     VERIFY(intret == 0 && xx == 78);
     cout << "   -- no spurious timeout .. ok" << endl;
 
@@ -202,7 +202,7 @@ simple_tests(rpcc *c)
     {
         string arg(1000, 'x');
         string rep2;
     {
         string arg(1000, 'x');
         string rep2;
-        c->call_timeout(22, rpcc::to(300), rep2, arg, (string)"x");
+        c->call_timeout(22, milliseconds(300), rep2, arg, (string)"x");
         VERIFY(rep2.size() == 1001);
         cout << "   -- no spurious timeout .. ok" << endl;
     }
         VERIFY(rep2.size() == 1001);
         cout << "   -- no spurious timeout .. ok" << endl;
     }
@@ -217,7 +217,7 @@ simple_tests(rpcc *c)
     string non_existent = "127.0.0.1:7661";
     rpcc *c1 = new rpcc(non_existent);
     time_t t0 = time(0);
     string non_existent = "127.0.0.1:7661";
     rpcc *c1 = new rpcc(non_existent);
     time_t t0 = time(0);
-    intret = c1->bind(rpcc::to(300));
+    intret = c1->bind(milliseconds(300));
     time_t t1 = time(0);
     VERIFY(intret < 0 && (t1 - t0) <= 4);
     cout << "   -- rpc timeout .. ok" << endl;
     time_t t1 = time(0);
     VERIFY(intret < 0 && (t1 - t0) <= 4);
     cout << "   -- rpc timeout .. ok" << endl;
@@ -285,7 +285,7 @@ failure_test()
     delete server;
 
     client1 = new rpcc(dst);
     delete server;
 
     client1 = new rpcc(dst);
-    VERIFY (client1->bind(rpcc::to(3000)) < 0);
+    VERIFY (client1->bind(milliseconds(3000)) < 0);
     cout << "   -- create new client and try to bind to failed server .. failed ok" << endl;
 
     delete client1;
     cout << "   -- create new client and try to bind to failed server .. failed ok" << endl;
 
     delete client1;
diff --git a/rsm.cc b/rsm.cc
index 81f0e4c..54713cb 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -214,7 +214,7 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
         rsm_mutex_lock.unlock();
         cl = h.safebind();
         if (cl) {
         rsm_mutex_lock.unlock();
         cl = h.safebind();
         if (cl) {
-            ret = cl->call_timeout(rsm_protocol::transferreq, rpcc::to(100),
+            ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100),
                     r, cfg->myaddr(), last_myvs, vid_insync);
         }
         rsm_mutex_lock.lock();
                     r, cfg->myaddr(), last_myvs, vid_insync);
         }
         rsm_mutex_lock.lock();
@@ -257,7 +257,7 @@ bool rsm::join(const string & m, lock & rsm_mutex_lock) {
         rsm_mutex_lock.unlock();
         cl = h.safebind();
         if (cl != 0) {
         rsm_mutex_lock.unlock();
         cl = h.safebind();
         if (cl != 0) {
-            ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(12000), log,
+            ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log,
                     cfg->myaddr(), last_myvs);
         }
         rsm_mutex_lock.lock();
                     cfg->myaddr(), last_myvs);
         }
         rsm_mutex_lock.lock();
@@ -347,7 +347,7 @@ rsm_client_protocol::status rsm::client_invoke(string & r, int procno, const str
             if (!cl)
                 return rsm_client_protocol::BUSY;
             int ignored_rval;
             if (!cl)
                 return rsm_client_protocol::BUSY;
             int ignored_rval;
-            auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, rpcc::to(100), ignored_rval, procno, vs, req);
+            auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req);
             LOG("Invoke returned " << ret);
             if (ret != rsm_protocol::OK)
                 return rsm_client_protocol::BUSY;
             LOG("Invoke returned " << ret);
             if (ret != rsm_protocol::OK)
                 return rsm_client_protocol::BUSY;
index 01098d6..ae88169 100644 (file)
@@ -26,7 +26,7 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string &rep, const st
         rpcc *cl = h.safebind();
         auto ret = rsm_client_protocol::OK;
         if (cl)
         rpcc *cl = h.safebind();
         auto ret = rsm_client_protocol::OK;
         if (cl)
-            ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, rpcc::to(500), rep, proc, req);
+            ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, milliseconds(500), rep, proc, req);
         ml.lock();
 
         if (!cl)
         ml.lock();
 
         if (!cl)
@@ -61,7 +61,7 @@ bool rsm_client::init_members(lock & rsm_client_mutex_lock) {
         rsm_client_mutex_lock.unlock();
         cl = h.safebind();
         if (cl)
         rsm_client_mutex_lock.unlock();
         cl = h.safebind();
         if (cl)
-            ret = cl->call_timeout(rsm_client_protocol::members, rpcc::to(100), known_mems, 0);
+            ret = cl->call_timeout(rsm_client_protocol::members, milliseconds(100), known_mems, 0);
         rsm_client_mutex_lock.lock();
     }
     if (cl == 0 || ret != rsm_protocol::OK)
         rsm_client_mutex_lock.lock();
     }
     if (cl == 0 || ret != rsm_protocol::OK)