Lots more clean-ups
authorPeter Iannucci <iannucci@mit.edu>
Thu, 10 Oct 2013 16:35:48 +0000 (12:35 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Thu, 10 Oct 2013 16:35:48 +0000 (12:35 -0400)
13 files changed:
config.cc
handle.cc
paxos.cc
rpc/connection.cc
rpc/connection.h
rpc/file.h [new file with mode: 0644]
rpc/pollmgr.cc
rpc/pollmgr.h
rpc/rpc.cc
rpc/rpc.h
rpc/rpctest.cc
rsm.cc
rsm_client.cc

index 5373007..35654d8 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -204,7 +204,7 @@ config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
     cfg_mutex_lock.unlock();
     int r = 0, ret = rpc_const::bind_failure;
     if (rpcc *cl = h.safebind())
-        ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(100), r, me, vid);
+        ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
     cfg_mutex_lock.lock();
 
     heartbeat_t res = OK;
index 5287a35..1cb5cc2 100644 (file)
--- a/handle.cc
+++ b/handle.cc
@@ -40,7 +40,7 @@ rpcc * handle::safebind() {
     // value to support the assumption.
     // 
     // With RPC_LOSSY=5, tests may fail due to delays and time outs.
-    int ret = cl->bind(rpcc::to(1000));
+    int ret = cl->bind(milliseconds(1000));
     if (ret < 0) {
         LOG("bind failure! " << h->m << " " << ret);
         delete cl;
index b83a044..8b00ad8 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -94,7 +94,7 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
         if (!r)
             continue;
         auto status = (paxos_protocol::status)r->call_timeout(
-                paxos_protocol::preparereq, rpcc::to(100), res, me, instance, proposal);
+                paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
         if (status == paxos_protocol::OK) {
             if (res.oldinstance) {
                 LOG("commiting old instance!");
@@ -125,7 +125,7 @@ void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
             continue;
         bool accept = false;
         int status = r->call_timeout(
-                paxos_protocol::acceptreq, rpcc::to(100), accept, me, instance, proposal, v);
+                paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v);
         if (status == paxos_protocol::OK && accept)
             accepts.push_back(i);
     }
@@ -138,7 +138,7 @@ void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const
         if (!r)
             continue;
         int res = 0;
-        r->call_timeout(paxos_protocol::decidereq, rpcc::to(100), res, me, instance, v);
+        r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
     }
 }
 
index 4681ae9..33e891c 100644 (file)
@@ -2,30 +2,26 @@
 #include "rpc_protocol.h"
 #include <cerrno>
 #include <csignal>
-#include <fcntl.h>
 #include <sys/types.h>
 #include <netinet/tcp.h>
 #include <unistd.h>
-#include <sys/socket.h>
 #include "marshall.h"
 
 connection::connection(chanmgr *m1, int f1, int l1)
 : mgr_(m1), fd_(f1), lossy_(l1)
 {
-    int flags = fcntl(fd_, F_GETFL, NULL);
-    fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
+    fd_.flags() |= O_NONBLOCK;
 
     signal(SIGPIPE, SIG_IGN);
 
     create_time_ = steady_clock::now();
 
-    PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
+    PollMgr::Instance().add_callback(fd_, CB_RDONLY, this);
 }
 
 connection::~connection() {
     VERIFY(dead_);
     VERIFY(!wpdu_.buf.size());
-    close(fd_);
 }
 
 void connection::incref() {
@@ -48,7 +44,7 @@ void connection::closeconn() {
     }
     //after block_remove_fd, select will never wait on fd_
     //and no callbacks will be active
-    PollMgr::Instance()->block_remove_fd(fd_);
+    PollMgr::Instance().block_remove_fd(fd_);
 }
 
 void connection::decref() {
@@ -98,11 +94,11 @@ bool connection::send(const string & b) {
     if (!writepdu()) {
         dead_ = true;
         ml.unlock();
-        PollMgr::Instance()->block_remove_fd(fd_);
+        PollMgr::Instance().block_remove_fd(fd_);
         ml.lock();
     } else if (wpdu_.solong != wpdu_.buf.size()) {
         // should be rare to need to explicitly add write callback
-        PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
+        PollMgr::Instance().add_callback(fd_, CB_WRONLY, this);
         while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
             send_complete_.wait(ml);
     }
@@ -120,11 +116,11 @@ void connection::write_cb(int s) {
     VERIFY(!dead_);
     VERIFY(fd_ == s);
     if (wpdu_.buf.size() == 0) {
-        PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
+        PollMgr::Instance().del_callback(fd_,CB_WRONLY);
         return;
     }
     if (!writepdu()) {
-        PollMgr::Instance()->del_callback(fd_, CB_RDWR);
+        PollMgr::Instance().del_callback(fd_, CB_RDWR);
         dead_ = true;
     } else {
         VERIFY(wpdu_.solong != size_t_max);
@@ -152,7 +148,7 @@ void connection::read_cb(int s) {
 
     if (!succ) {
         IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
-        PollMgr::Instance()->del_callback(fd_,CB_RDWR);
+        PollMgr::Instance().del_callback(fd_,CB_RDWR);
         dead_ = true;
         send_complete_.notify_one();
     }
@@ -188,11 +184,10 @@ bool connection::readpdu() {
     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
     if (!rpdu_.buf.size()) {
         rpc_sz_t sz1;
-        ssize_t n = read(fd_, &sz1, sizeof(sz1));
+        ssize_t n = fd_.read(sz1);
 
-        if (n == 0) {
+        if (n == 0)
             return false;
-        }
 
         if (n < 0) {
             VERIFY(errno!=EAGAIN);
@@ -218,7 +213,7 @@ bool connection::readpdu() {
         rpdu_.solong = sizeof(sz1);
     }
 
-    ssize_t n = read(fd_, &rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
+    ssize_t n = fd_.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
 
     IF_LEVEL(5) LOG("read " << n << " bytes");
 
@@ -234,32 +229,33 @@ bool connection::readpdu() {
 }
 
 tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
-: mgr_(m1), lossy_(lossytest)
+: tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest)
 {
     struct sockaddr_in sin;
     memset(&sin, 0, sizeof(sin));
     sin.sin_family = AF_INET;
     sin.sin_port = hton(port);
 
-    tcp_ = socket(AF_INET, SOCK_STREAM, 0);
-    if (tcp_ < 0) {
-        perror("accept_loop socket:");
-        VERIFY(0);
-    }
+    tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
+    tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
 
-    int yes = 1;
-    setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
-    setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
+    struct timeval timeout = {0, 50000};
+
+    if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeout) < 0)
+        perror("accept_loop setsockopt");
+
+    if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeout) < 0)
+        perror("accept_loop setsockopt");
 
     // careful to exactly match type signature of bind arguments so we don't
     // get std::bind instead
-    if (bind(tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
-        perror("accept_loop tcp bind:");
+    if (bind((int)tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
+        perror("accept_loop bind");
         VERIFY(0);
     }
 
     if (listen(tcp_, 1000) < 0) {
-        perror("listen:");
+        perror("accept_loop listen");
         VERIFY(0);
     }
 
@@ -269,24 +265,19 @@ tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
 
     IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
 
-    if (pipe(pipe_) < 0) {
-        perror("accept_loop pipe:");
-        VERIFY(0);
-    }
+    file_t::pipe(pipe_);
 
-    int flags = fcntl(pipe_[0], F_GETFL, NULL);
-    flags |= O_NONBLOCK;
-    fcntl(pipe_[0], F_SETFL, flags);
+    pipe_[0].flags() |= O_NONBLOCK;
 
     th_ = thread(&tcpsconn::accept_conn, this);
 }
 
 tcpsconn::~tcpsconn()
 {
-    VERIFY(close(pipe_[1]) == 0);
+    pipe_[1].close();
     th_.join();
 
-    //close all the active connections
+    // close all the active connections
     map<int, connection *>::iterator i;
     for (i = conns_.begin(); i != conns_.end(); i++) {
         i->second->closeconn();
@@ -325,40 +316,34 @@ void tcpsconn::process_accept() {
 
 void tcpsconn::accept_conn() {
     fd_set rfds;
-    int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
-
-    try {
-        while (1) {
-            FD_ZERO(&rfds);
-            FD_SET(pipe_[0], &rfds);
-            FD_SET(tcp_, &rfds);
-
-            int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
-
-            if (ret < 0) {
-                if (errno == EINTR) {
-                    continue;
-                } else {
-                    perror("accept_conn select:");
-                    IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
-                    VERIFY(0);
-                }
-            }
-
-            if (FD_ISSET(pipe_[0], &rfds)) {
-                close(pipe_[0]);
-                close(tcp_);
-                return;
-            }
-            else if (FD_ISSET(tcp_, &rfds)) {
-                process_accept();
-            } else {
-                VERIFY(0);
-            }
+    int max_fd = max((int)pipe_[0], (int)tcp_);
+
+    while (1) {
+        FD_ZERO(&rfds);
+        FD_SET(pipe_[0], &rfds);
+        FD_SET(tcp_, &rfds);
+
+        int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
+
+        if (ret < 0 && errno == EINTR)
+            continue;
+        else if (ret < 0) {
+            perror("accept_conn select:");
+            IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
+            VERIFY(0);
+        }
+
+        if (FD_ISSET(pipe_[0], &rfds))
+            return;
+
+        if (!FD_ISSET(tcp_, &rfds))
+            VERIFY(0);
+
+        try {
+            process_accept();
+        } catch (thread_exit_exception e) {
+            break;
         }
-    }
-    catch (thread_exit_exception e)
-    {
     }
 }
 
index 1eb625b..3e19a93 100644 (file)
@@ -6,6 +6,7 @@
 #include <arpa/inet.h>
 #include <netinet/in.h>
 #include "pollmgr.h"
+#include "file.h"
 
 constexpr size_t size_t_max = numeric_limits<size_t>::max();
 
@@ -49,7 +50,7 @@ class connection : public aio_callback {
         bool writepdu();
 
         chanmgr *mgr_;
-        const int fd_;
+        const file_t fd_;
         bool dead_ = false;
 
         charbuf wpdu_;
@@ -77,9 +78,9 @@ class tcpsconn {
         in_port_t port_;
         mutex m_;
         thread th_;
-        int pipe_[2];
+        file_t pipe_[2];
 
-        int tcp_; //file desciptor for accepting connection
+        socket_t tcp_; // listens for connections
         chanmgr *mgr_;
         int lossy_;
         map<int, connection *> conns_;
diff --git a/rpc/file.h b/rpc/file.h
new file mode 100644 (file)
index 0000000..75c0d6e
--- /dev/null
@@ -0,0 +1,56 @@
+#ifndef file_h
+#define file_h
+
+#include <fcntl.h>
+#include <unistd.h>
+#include "types.h"
+#include <sys/socket.h>
+
+class file_t {
+    private:
+        int fd_;
+
+        class flags_t {
+            private:
+                const file_t & f_;
+                int flags_;
+            public:
+                flags_t(const file_t & f) : f_(f), flags_(fcntl(f_.fd_, F_GETFL, NULL)) { }
+                ~flags_t() { fcntl(f_.fd_, F_SETFL, flags_); }
+                operator int & () { return flags_; }
+        };
+    public:
+        inline file_t(int fd=-1) : fd_(fd) {}
+        inline file_t(const file_t &) = delete;
+        inline file_t(file_t && other) : fd_(-1) { std::swap(fd_, other.fd_); }
+        inline ~file_t() { if (fd_ != -1) ::close(fd_); }
+        static inline void pipe(file_t *ends) {
+            int fds[2];
+            VERIFY(::pipe(fds) == 0);
+            ends[0].fd_ = fds[0];
+            ends[1].fd_ = fds[1];
+        }
+        inline operator int() const { if (fd_ == -1) throw "no fd"; return fd_; }
+        inline flags_t flags() const { return *this; }
+        inline void close() {
+            ::close(fd_);
+            fd_ = -1;
+        }
+        template <class T>
+        inline ssize_t read(T & t) const { return ::read(fd_, &t, sizeof(T)); }
+        inline ssize_t read(void * t, size_t n) const { return ::read(fd_, t, n); }
+        template <class T>
+        inline ssize_t write(const T & t) const { return ::write(fd_, &t, sizeof(T)); }
+        inline ssize_t write(const void * t, size_t n) const { return ::write(fd_, t, n); }
+};
+
+class socket_t : public file_t {
+    public:
+        socket_t(int fd=-1) : file_t(fd) {}
+        template <class T>
+        int setsockopt(int level, int option, T && value) {
+            return ::setsockopt(*this, level, option, &value, sizeof(T));
+        }
+};
+
+#endif
index a938284..4acff93 100644 (file)
@@ -1,46 +1,77 @@
 #include "types.h"
 #include <errno.h>
-#include <fcntl.h>
-#include <unistd.h>
+#include <sys/select.h>
+#include "file.h"
 
-#include "pollmgr.h"
+#ifdef __linux__
+#include <sys/epoll.h>
+#endif
 
-PollMgr *PollMgr::instance = NULL;
-static once_flag pollmgr_is_initialized;
+#include "pollmgr.h"
 
-static void
-PollMgrInit()
-{
-    PollMgr::instance = new PollMgr();
-}
+static PollMgr instance;
+
+PollMgr & PollMgr::Instance() { return instance; }
+
+class wait_manager {
+    public:
+        virtual void watch_fd(int fd, poll_flag flag) = 0;
+        virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
+        virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
+        virtual ~wait_manager() throw() {}
+};
+
+class SelectAIO : public wait_manager {
+    public :
+        SelectAIO();
+        ~SelectAIO() {}
+        void watch_fd(int fd, poll_flag flag);
+        bool unwatch_fd(int fd, poll_flag flag);
+        void wait_ready(vector<int> & readable, vector<int> & writable);
+
+    private:
+        fd_set rfds_, wfds_;
+        int highfds_;
+        file_t pipe_[2];
+        mutex m_;
+};
 
-PollMgr *
-PollMgr::Instance()
-{
-    call_once(pollmgr_is_initialized, PollMgrInit);
-    return instance;
-}
+#ifdef __linux__ 
+class EPollAIO : public wait_manager {
+    public:
+        EPollAIO() {}
+        ~EPollAIO() throw() { }
+        void watch_fd(int fd, poll_flag flag);
+        bool unwatch_fd(int fd, poll_flag flag);
+        void wait_ready(vector<int> & readable, vector<int> & writable);
+
+    private:
+        file_t poll_ = epoll_create(MAX_POLL_FDS);
+        struct epoll_event ready_[MAX_POLL_FDS];
+        vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
+};
+#endif
 
-PollMgr::PollMgr() : pending_change_(false)
-{
-    bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
-    aio_ = new SelectAIO();
-    //aio_ = new EPollAIO();
 
+PollMgr::PollMgr() : aio_(new SelectAIO()) {
     th_ = thread(&PollMgr::wait_loop, this);
 }
 
-PollMgr::~PollMgr() [[noreturn]]
+PollMgr::~PollMgr()
 {
-    //never kill me!!!
-    VERIFY(0);
+    lock ml(m_);
+    for (auto p : callbacks_)
+        aio_->unwatch_fd(p.first, CB_RDWR);
+    pending_change_ = true;
+    shutdown_ = true;
+    changedone_c_.wait(ml);
+    delete aio_;
+    th_.join();
 }
 
 void
 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
 {
-    VERIFY(fd < MAX_POLL_FDS);
-
     lock ml(m_);
     aio_->watch_fd(fd, flag);
 
@@ -48,44 +79,26 @@ PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
     callbacks_[fd] = ch;
 }
 
-//remove all callbacks related to fd
-//the return guarantees that callbacks related to fd
-//will never be called again
-void
-PollMgr::block_remove_fd(int fd)
-{
+// Remove all callbacks related to fd.  After this returns, we guarantee that
+// callbacks related to fd will never be called again.
+void PollMgr::block_remove_fd(int fd) {
     lock ml(m_);
     aio_->unwatch_fd(fd, CB_RDWR);
     pending_change_ = true;
     changedone_c_.wait(ml);
-    callbacks_[fd] = NULL;
+    callbacks_[fd] = nullptr;
 }
 
-void
-PollMgr::del_callback(int fd, poll_flag flag)
-{
+void PollMgr::del_callback(int fd, poll_flag flag) {
     lock ml(m_);
-    if (aio_->unwatch_fd(fd, flag)) {
-        callbacks_[fd] = NULL;
-    }
+    if (aio_->unwatch_fd(fd, flag))
+        callbacks_[fd] = nullptr;
 }
 
-bool
-PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
-{
-    lock ml(m_);
-    if (!callbacks_[fd] || callbacks_[fd]!=c)
-        return false;
-
-    return aio_->is_watched(fd, flag);
-}
-
-void
-PollMgr::wait_loop() [[noreturn]]
-{
-
+void PollMgr::wait_loop() {
     vector<int> readable;
     vector<int> writable;
+    aio_callback * cb;
 
     while (1) {
         {
@@ -93,121 +106,80 @@ PollMgr::wait_loop() [[noreturn]]
             if (pending_change_) {
                 pending_change_ = false;
                 changedone_c_.notify_all();
+                if (shutdown_)
+                    break;
             }
         }
         readable.clear();
         writable.clear();
-        aio_->wait_ready(&readable,&writable);
-
-        if (!readable.size() && !writable.size()) {
-            continue;
-        } 
-        //no locking of m_
-        //because no add_callback() and del_callback should 
-        //modify callbacks_[fd] while the fd is not dead
-        for (unsigned int i = 0; i < readable.size(); i++) {
-            int fd = readable[i];
-            if (callbacks_[fd])
-                callbacks_[fd]->read_cb(fd);
+        aio_->wait_ready(readable, writable);
+
+        for (auto fd : readable) {
+            { lock ml(m_); cb = callbacks_[fd]; }
+            if (cb) cb->read_cb(fd);
         }
 
-        for (unsigned int i = 0; i < writable.size(); i++) {
-            int fd = writable[i];
-            if (callbacks_[fd])
-                callbacks_[fd]->write_cb(fd);
+        for (auto fd : writable) {
+            { lock ml(m_); cb = callbacks_[fd]; }
+            if (cb) cb->write_cb(fd);
         }
     }
 }
 
-SelectAIO::SelectAIO() : highfds_(0)
+SelectAIO::SelectAIO()
 {
     FD_ZERO(&rfds_);
     FD_ZERO(&wfds_);
 
-    VERIFY(pipe(pipefd_) == 0);
-    FD_SET(pipefd_[0], &rfds_);
-    highfds_ = pipefd_[0];
+    file_t::pipe(pipe_);
 
-    int flags = fcntl(pipefd_[0], F_GETFL, NULL);
-    flags |= O_NONBLOCK;
-    fcntl(pipefd_[0], F_SETFL, flags);
-}
+    FD_SET(pipe_[0], &rfds_);
+    highfds_ = pipe_[0];
 
-SelectAIO::~SelectAIO()
-{
+    pipe_[0].flags() |= O_NONBLOCK;
 }
 
-void
-SelectAIO::watch_fd(int fd, poll_flag flag)
-{
+void SelectAIO::watch_fd(int fd, poll_flag flag) {
+    VERIFY(fd < MAX_POLL_FDS);
+
     lock ml(m_);
     if (highfds_ <= fd) 
         highfds_ = fd;
 
-    if (flag == CB_RDONLY) {
-        FD_SET(fd,&rfds_);
-    }else if (flag == CB_WRONLY) {
-        FD_SET(fd,&wfds_);
-    }else {
+    if (flag & CB_RDONLY)
         FD_SET(fd,&rfds_);
+
+    if (flag & CB_WRONLY)
         FD_SET(fd,&wfds_);
-    }
 
-    char tmp = 1;
-    VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
+    VERIFY(pipe_[1].write((char)1)==1);
 }
 
-bool
-SelectAIO::is_watched(int fd, poll_flag flag)
-{
-    lock ml(m_);
-    if (flag == CB_RDONLY) {
-        return FD_ISSET(fd,&rfds_);
-    }else if (flag == CB_WRONLY) {
-        return FD_ISSET(fd,&wfds_);
-    }else{
-        return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
-    }
-}
+bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
+    VERIFY(fd < MAX_POLL_FDS);
 
-bool 
-SelectAIO::unwatch_fd(int fd, poll_flag flag)
-{
     lock ml(m_);
-    if (flag == CB_RDONLY) {
+    VERIFY((flag & ~CB_RDWR) == 0);
+    if (flag & CB_RDONLY)
         FD_CLR(fd, &rfds_);
-    }else if (flag == CB_WRONLY) {
+    if (flag & CB_WRONLY)
         FD_CLR(fd, &wfds_);
-    }else if (flag == CB_RDWR) {
-        FD_CLR(fd, &wfds_);
-        FD_CLR(fd, &rfds_);
-    }else{
-        VERIFY(0);
-    }
 
-    if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
-        if (fd == highfds_) {
-            int newh = pipefd_[0];
-            for (int i = 0; i <= highfds_; i++) {
-                if (FD_ISSET(i, &rfds_)) {
-                    newh = i;
-                }else if (FD_ISSET(i, &wfds_)) {
-                    newh = i;
-                }
-            }
-            highfds_ = newh;
-        }
-    }
-    if (flag == CB_RDWR) {
-        char tmp = 1;
-        VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
+    int newh = pipe_[0];
+    for (int i = 0; i <= highfds_; i++) {
+        if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
+            newh = i;
     }
+    highfds_ = newh;
+
+    if (flag == CB_RDWR)
+        VERIFY(pipe_[1].write((char)1)==1);
+
     return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
 }
 
-void
-SelectAIO::wait_ready(vector<int> *readable, vector<int> *writable)
-{
+void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
+
     fd_set trfds, twfds;
     int high;
 
@@ -220,130 +192,89 @@ SelectAIO::wait_ready(vector<int> *readable, vector<int> *writable)
 
     int ret = select(high+1, &trfds, &twfds, NULL, NULL);
 
-    if (ret < 0) {
-        if (errno == EINTR) {
-            return;
-        } else {
-            perror("select:");
-            IF_LEVEL(0) LOG("select_loop failure errno " << errno);
-            VERIFY(0);
-        }
+    if (ret < 0 && errno == EINTR)
+        return;
+    else if (ret < 0) {
+        perror("select:");
+        IF_LEVEL(0) LOG("select_loop failure errno " << errno);
+        VERIFY(0);
     }
 
     for (int fd = 0; fd <= high; fd++) {
-        if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
+        if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
             char tmp;
-            VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
+            VERIFY(pipe_[0].read(tmp)==1);
             VERIFY(tmp==1);
-        }else {
-            if (FD_ISSET(fd, &twfds)) {
-                writable->push_back(fd);
-            }
-            if (FD_ISSET(fd, &trfds)) {
-                readable->push_back(fd);
-            }
+        } else {
+            if (FD_ISSET(fd, &twfds))
+                writable.push_back(fd);
+
+            if (FD_ISSET(fd, &trfds))
+                readable.push_back(fd);
         }
     }
 }
 
 #ifdef __linux__ 
 
-EPollAIO::EPollAIO()
-{
-    pollfd_ = epoll_create(MAX_POLL_FDS);
-    VERIFY(pollfd_ >= 0);
-    bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
-}
-
-EPollAIO::~EPollAIO()
-{
-    close(pollfd_);
-}
+void EPollAIO::watch_fd(int fd_, poll_flag flag) {
+    size_t fd = (size_t)fd_;
 
-static inline
-int poll_flag_to_event(poll_flag flag)
-{
-    int f;
-    if (flag == CB_RDONLY) {
-        f = EPOLLIN;
-    }else if (flag == CB_WRONLY) {
-        f = EPOLLOUT;
-    }else { //flag == CB_RDWR
-        f = EPOLLIN | EPOLLOUT;
-    }
-    return f;
-}
-
-void
-EPollAIO::watch_fd(int fd, poll_flag flag)
-{
     VERIFY(fd < MAX_POLL_FDS);
 
     struct epoll_event ev;
-    int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
-    fdstatus_[fd] |= (int)flag;
+    int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
+    fdstatus_[fd] |= (unsigned)flag;
 
     ev.events = EPOLLET;
-    ev.data.fd = fd;
+    ev.data.fd = fd_;
 
-    if (fdstatus_[fd] & CB_RDONLY) {
+    if (fdstatus_[fd] & CB_RDONLY)
         ev.events |= EPOLLIN;
-    }
-    if (fdstatus_[fd] & CB_WRONLY) {
+
+    if (fdstatus_[fd] & CB_WRONLY)
         ev.events |= EPOLLOUT;
-    }
 
-    if (flag == CB_RDWR) {
+    if (flag == CB_RDWR)
         VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
-    }
 
-    VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+    VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
 }
 
-bool 
-EPollAIO::unwatch_fd(int fd, poll_flag flag)
-{
+bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
+    size_t fd = (size_t)fd_;
+
     VERIFY(fd < MAX_POLL_FDS);
-    fdstatus_[fd] &= ~(int)flag;
+    fdstatus_[fd] &= ~(unsigned)flag;
 
     struct epoll_event ev;
-    int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
+    int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
 
     ev.events = EPOLLET;
-    ev.data.fd = fd;
+    ev.data.fd = fd_;
 
-    if (fdstatus_[fd] & CB_RDONLY) {
+    if (fdstatus_[fd] & CB_RDONLY)
         ev.events |= EPOLLIN;
-    }
-    if (fdstatus_[fd] & CB_WRONLY) {
+
+    if (fdstatus_[fd] & CB_WRONLY)
         ev.events |= EPOLLOUT;
-    }
 
-    if (flag == CB_RDWR) {
+    if (flag == CB_RDWR)
         VERIFY(op == EPOLL_CTL_DEL);
-    }
-    VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+
+    VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
     return (op == EPOLL_CTL_DEL);
 }
 
-bool
-EPollAIO::is_watched(int fd, poll_flag flag)
-{
-    VERIFY(fd < MAX_POLL_FDS);
-    return ((fdstatus_[fd] & CB_MASK) == flag);
-}
+void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
 
-void
-EPollAIO::wait_ready(vector<int> *readable, vector<int> *writable)
-{
-    int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
+    int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
     for (int i = 0; i < nfds; i++) {
-        if (ready_[i].events & EPOLLIN) {
-            readable->push_back(ready_[i].data.fd);
-        }
-        if (ready_[i].events & EPOLLOUT) {
-            writable->push_back(ready_[i].data.fd);
-        }
+        if (ready_[i].events & EPOLLIN)
+            readable.push_back(ready_[i].data.fd);
+
+        if (ready_[i].events & EPOLLOUT)
+            writable.push_back(ready_[i].data.fd);
     }
 }
 
index 2da3167..ede35f8 100644 (file)
@@ -2,11 +2,6 @@
 #define pollmgr_h 
 
 #include "types.h"
-#include <sys/select.h>
-
-#ifdef __linux__
-#include <sys/epoll.h>
-#endif
 
 #define MAX_POLL_FDS 128
 
@@ -18,15 +13,6 @@ typedef enum {
     CB_MASK = ~0x11,
 } poll_flag;
 
-class aio_mgr {
-    public:
-        virtual void watch_fd(int fd, poll_flag flag) = 0;
-        virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
-        virtual bool is_watched(int fd, poll_flag flag) = 0;
-        virtual void wait_ready(vector<int> *readable, vector<int> *writable) = 0;
-        virtual ~aio_mgr() {}
-};
-
 class aio_callback {
     public:
         virtual void read_cb(int fd) = 0;
@@ -39,69 +25,22 @@ class PollMgr {
         PollMgr();
         ~PollMgr();
 
-        static PollMgr *Instance();
-        static PollMgr *CreateInst();
+        static PollMgr & Instance();
 
         void add_callback(int fd, poll_flag flag, aio_callback *ch);
         void del_callback(int fd, poll_flag flag);
-        bool has_callback(int fd, poll_flag flag, aio_callback *ch);
         void block_remove_fd(int fd);
         void wait_loop();
 
-
-        static PollMgr *instance;
-        static int useful;
-        static int useless;
-
     private:
         mutex m_;
         cond changedone_c_;
-        thread th_;
-
-        aio_callback *callbacks_[MAX_POLL_FDS];
-        aio_mgr *aio_;
-        bool pending_change_;
-
-};
-
-class SelectAIO : public aio_mgr {
-    public :
-
-        SelectAIO();
-        ~SelectAIO();
-        void watch_fd(int fd, poll_flag flag);
-        bool unwatch_fd(int fd, poll_flag flag);
-        bool is_watched(int fd, poll_flag flag);
-        void wait_ready(vector<int> *readable, vector<int> *writable);
 
-    private:
-
-        fd_set rfds_;
-        fd_set wfds_;
-        int highfds_;
-        int pipefd_[2];
-
-        mutex m_;
-
-};
-
-#ifdef __linux__ 
-class EPollAIO : public aio_mgr {
-    public:
-        EPollAIO();
-        ~EPollAIO();
-        void watch_fd(int fd, poll_flag flag);
-        bool unwatch_fd(int fd, poll_flag flag);
-        bool is_watched(int fd, poll_flag flag);
-        void wait_ready(vector<int> *readable, vector<int> *writable);
-
-    private:
-        int pollfd_;
-        struct epoll_event ready_[MAX_POLL_FDS];
-        int fdstatus_[MAX_POLL_FDS];
+        map<int, aio_callback *> callbacks_;
+        class wait_manager *aio_;
+        bool pending_change_=false, shutdown_=false;
 
+        thread th_;
 };
-#endif /* __linux */
-
-#endif /* pollmgr_h */
 
+#endif
index 32b25ab..47ac775 100644 (file)
@@ -1,8 +1,8 @@
 /*
- The rpcc class handles client-side RPC.  Each rpcc is bound to a
- single RPC server.  The jobs of rpcc include maintaining a connection to
- server, sending RPC requests and waiting for responses, retransmissions,
- at-most-once delivery etc.
+ The rpcc class handles client-side RPC.  Each rpcc is bound to a single RPC
+ server.  The jobs of rpcc include maintaining a connection to server, sending
+ RPC requests and waiting for responses, retransmissions, at-most-once delivery
+ etc.
 
  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
  connections from different rpcc objects.  The jobs of rpcs include accepting
@@ -11,8 +11,8 @@
 
  Both rpcc and rpcs use the connection class as an abstraction for the
  underlying communication channel.  To send an RPC request/reply, one calls
- connection::send() which blocks until data is sent or the connection has failed
- (thus the caller can free the buffer when send() returns).  When a
+ connection::send() which blocks until data is sent or the connection has
+ failed (thus the caller can free the buffer when send() returns).  When a
  request/reply is received, connection makes a callback into the corresponding
  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
 
  number of threads needed to manage these connections; without async IO, at
  least one thread is needed per connection to read data without blocking other
  activities.)  Each rpcs object creates one thread for listening on the server
- port and a pool of threads for executing RPC requests.  The
- thread pool allows us to control the number of threads spawned at the server
- (spawning one thread per request will hurt when the server faces thousands of
- requests).
+ port and a pool of threads for executing RPC requests.  The thread pool allows
+ us to control the number of threads spawned at the server (spawning one thread
+ per request will hurt when the server faces thousands of requests).
 
  In order to delete a connection object, we must maintain a reference count.
- For rpcc,
- multiple client threads might be invoking the rpcc::call() functions and thus
- holding multiple references to the underlying connection object. For rpcs,
- multiple dispatch threads might be holding references to the same connection
- object.  A connection object is deleted only when the underlying connection is
- dead and the reference count reaches zero.
+ For rpcc, multiple client threads might be invoking the rpcc::call() functions
+ and thus holding multiple references to the underlying connection object. For
+ rpcs, multiple dispatch threads might be holding references to the same
+ connection object.  A connection object is deleted only when the underlying
+ connection is dead and the reference count reaches zero.
 
  This version of the RPC library explicitly joins exited threads to make sure
  no outstanding references exist before deleting objects.
@@ -45,9 +43,9 @@
  there are no outstanding calls on the rpcc object.
 
  To delete a rpcs object safely, we do the following in sequence: 1. stop
- accepting new incoming connections. 2. close existing active connections.
- 3.  delete the dispatch thread pool which involves waiting for current active
- RPC handlers to finish.  It is interesting how a thread pool can be deleted
+ accepting new incoming connections. 2. close existing active connections.  3.
+ delete the dispatch thread pool which involves waiting for current active RPC
+ handlers to finish.  It is interesting how a thread pool can be deleted
  without using thread cancellation. The trick is to inject x "poison pills" for
  a thread pool of x threads. Upon getting a poison pill instead of a normal
  task, a worker thread will exit (and thread pool destructor waits to join all
@@ -63,9 +61,6 @@
 #include <netdb.h>
 #include <unistd.h>
 
-const rpcc::TO rpcc::to_max = { 120000 };
-const rpcc::TO rpcc::to_min = { 1000 };
-
 inline void set_rand_seed() {
     auto now = time_point_cast<nanoseconds>(steady_clock::now());
     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
@@ -108,7 +103,7 @@ rpcc::~rpcc() {
     VERIFY(calls_.size() == 0);
 }
 
-int rpcc::bind(TO to) {
+int rpcc::bind(milliseconds to) {
     unsigned int r;
     int ret = call_timeout(rpc_const::bind, to, r, 0);
     if(ret == 0){
@@ -144,7 +139,7 @@ void rpcc::cancel(void) {
     LOG("done");
 }
 
-int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
+int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
 
     caller ca(0, &rep);
     int xid_rep;
@@ -168,11 +163,8 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
         xid_rep = xid_rep_window_.front();
     }
 
-    TO curr_to;
-    auto finaldeadline = steady_clock::now() + milliseconds(to.to),
-        nextdeadline = finaldeadline;
-
-    curr_to.to = to_min.to;
+    milliseconds curr_to = rpc::to_min;
+    auto finaldeadline = steady_clock::now() + to, nextdeadline = finaldeadline;
 
     bool transmit = true;
     connection *ch = NULL;
@@ -204,7 +196,7 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
         if(finaldeadline == time_point<steady_clock>::min())
             break;
 
-        nextdeadline = steady_clock::now() + milliseconds(curr_to.to);
+        nextdeadline = steady_clock::now() + curr_to;
         if(nextdeadline > finaldeadline) {
             nextdeadline = finaldeadline;
             finaldeadline = time_point<steady_clock>::min();
@@ -230,7 +222,7 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
             // on the new connection
             transmit = true;
         }
-        curr_to.to <<= 1;
+        curr_to *= 2;
     }
 
     {
@@ -492,7 +484,7 @@ rpcs::dispatch(djob_t *j)
 
         // save the latest good connection to the client
         {
-            lock rwl(conss_m_);
+            lock rwl(conns_m_);
             if(conns_.find(h.clt_nonce) == conns_.end()){
                 c->incref();
                 conns_[h.clt_nonce] = c;
@@ -537,7 +529,7 @@ rpcs::dispatch(djob_t *j)
 
             // get the latest connection to the client
             {
-                lock rwl(conss_m_);
+                lock rwl(conns_m_);
                 if(c->isdead() && c != conns_[h.clt_nonce]){
                     c->decref();
                     c = conns_[h.clt_nonce];
index 065cabc..19ec96a 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
 #include "marshall_wrap.h"
 #include "connection.h"
 
+namespace rpc {
+    static constexpr milliseconds to_max{12000};
+    static constexpr milliseconds to_min{100};
+}
+
 class rpc_const {
     public:
         static const unsigned int bind = 1;   // handler number reserved for bind
@@ -26,7 +31,6 @@ class rpc_const {
 // manages a xid space per destination socket
 // threaded: multiple threads can be sending RPCs,
 class rpcc : public chanmgr {
-
     private:
 
         //manages per rpc info
@@ -66,52 +70,42 @@ class rpcc : public chanmgr {
         list<int> xid_rep_window_;
 
         struct request {
-            request() { clear(); }
             void clear() { buf.clear(); xid = -1; }
             bool isvalid() { return xid != -1; }
             string buf;
-            int xid;
+            int xid = -1;
         };
-        struct request dup_req_;
+        request dup_req_;
         int xid_rep_done_;
+
+        int call1(proc_t proc, marshall &req, string &rep, milliseconds to);
+
+        template<class R>
+            int call_m(proc_t proc, marshall &req, R & r, milliseconds to);
     public:
 
         rpcc(const string & d, bool retrans=true);
         ~rpcc();
 
-        struct TO {
-            int to;
-        };
-        static const TO to_max;
-        static const TO to_min;
-        static TO to(int x) { TO t; t.to = x; return t;}
-
         unsigned int id() { return clt_nonce_; }
 
-        int bind(TO to = to_max);
+        int bind(milliseconds to = rpc::to_max);
 
         void set_reachable(bool r) { reachable_ = r; }
 
         void cancel();
 
-        int islossy() { return lossytest_ > 0; }
-
-        int call1(proc_t proc, marshall &req, string &rep, TO to);
-
         bool got_pdu(connection *c, const string & b);
 
-        template<class R>
-            int call_m(proc_t proc, marshall &req, R & r, TO to);
-
         template<class R, typename ...Args>
             inline int call(proc_t proc, R & r, const Args&... args);
 
         template<class R, typename ...Args>
-            inline int call_timeout(proc_t proc, TO to, R & r, const Args&... args);
+            inline int call_timeout(proc_t proc, milliseconds to, R & r, const Args&... args);
 };
 
 template<class R> int 
-rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) 
+rpcc::call_m(proc_t proc, marshall &req, R & r, milliseconds to) 
 {
     string rep;
     int intret = call1(proc, req, rep, to);
@@ -130,11 +124,11 @@ rpcc::call_m(proc_t proc, marshall &req, R & r, TO to)
 template<class R, typename... Args> inline int
 rpcc::call(proc_t proc, R & r, const Args&... args)
 {
-    return call_timeout(proc, rpcc::to_max, r, args...);
+    return call_timeout(proc, rpc::to_max, r, args...);
 }
 
 template<class R, typename... Args> inline int
-rpcc::call_timeout(proc_t proc, const rpcc::TO to, R & r, const Args&... args)
+rpcc::call_timeout(proc_t proc, const milliseconds to, R & r, const Args&... args)
 {
     marshall m{args...};
     return call_m(proc, m, r, to);
@@ -196,7 +190,7 @@ class rpcs : public chanmgr {
     mutex procs_m_; // protect insert/delete to procs[]
     mutex count_m_;  //protect modification of counts
     mutex reply_window_m_; // protect reply window et al
-    mutex conss_m_; // protect conns_
+    mutex conns_m_; // protect conns_
 
 
     protected:
@@ -224,17 +218,15 @@ class rpcs : public chanmgr {
 
     bool got_pdu(connection *c, const string & b);
 
-    template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr);
-};
+    struct ReturnOnFailure {
+        static inline int unmarshall_args_failure() {
+            return rpc_const::unmarshal_args_failure;
+        }
+    };
 
-struct ReturnOnFailure {
-    static inline int unmarshall_args_failure() {
-        return rpc_const::unmarshal_args_failure;
+    template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr) {
+        reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
     }
 };
 
-template<class F, class C> void rpcs::reg(proc_t proc, F f, C *c) {
-    reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
-}
-
 #endif
index 2f58e5d..723df82 100644 (file)
@@ -166,7 +166,7 @@ client3(void *xx)
 
     for(int i = 0; i < 4; i++){
         int rep = 0;
-        int ret = c->call_timeout(24, rpcc::to(300), rep, i);
+        int ret = c->call_timeout(24, milliseconds(300), rep, i);
         VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
     }
 }
@@ -187,14 +187,14 @@ simple_tests(rpcc *c)
     cout << "   -- string concat RPC .. ok" << endl;
 
     // small request, big reply (perhaps req via UDP, reply via TCP)
-    intret = c->call_timeout(25, rpcc::to(20000), rep, 70000);
+    intret = c->call_timeout(25, milliseconds(20000), rep, 70000);
     VERIFY(intret == 0);
     VERIFY(rep.size() == 70000);
     cout << "   -- small request, big reply .. ok" << endl;
 
     // specify a timeout value to an RPC that should succeed (udp)
     int xx = 0;
-    intret = c->call_timeout(23, rpcc::to(300), xx, 77);
+    intret = c->call_timeout(23, milliseconds(300), xx, 77);
     VERIFY(intret == 0 && xx == 78);
     cout << "   -- no spurious timeout .. ok" << endl;
 
@@ -202,7 +202,7 @@ simple_tests(rpcc *c)
     {
         string arg(1000, 'x');
         string rep2;
-        c->call_timeout(22, rpcc::to(300), rep2, arg, (string)"x");
+        c->call_timeout(22, milliseconds(300), rep2, arg, (string)"x");
         VERIFY(rep2.size() == 1001);
         cout << "   -- no spurious timeout .. ok" << endl;
     }
@@ -217,7 +217,7 @@ simple_tests(rpcc *c)
     string non_existent = "127.0.0.1:7661";
     rpcc *c1 = new rpcc(non_existent);
     time_t t0 = time(0);
-    intret = c1->bind(rpcc::to(300));
+    intret = c1->bind(milliseconds(300));
     time_t t1 = time(0);
     VERIFY(intret < 0 && (t1 - t0) <= 4);
     cout << "   -- rpc timeout .. ok" << endl;
@@ -285,7 +285,7 @@ failure_test()
     delete server;
 
     client1 = new rpcc(dst);
-    VERIFY (client1->bind(rpcc::to(3000)) < 0);
+    VERIFY (client1->bind(milliseconds(3000)) < 0);
     cout << "   -- create new client and try to bind to failed server .. failed ok" << endl;
 
     delete client1;
diff --git a/rsm.cc b/rsm.cc
index 81f0e4c..54713cb 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -214,7 +214,7 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
         rsm_mutex_lock.unlock();
         cl = h.safebind();
         if (cl) {
-            ret = cl->call_timeout(rsm_protocol::transferreq, rpcc::to(100),
+            ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100),
                     r, cfg->myaddr(), last_myvs, vid_insync);
         }
         rsm_mutex_lock.lock();
@@ -257,7 +257,7 @@ bool rsm::join(const string & m, lock & rsm_mutex_lock) {
         rsm_mutex_lock.unlock();
         cl = h.safebind();
         if (cl != 0) {
-            ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(12000), log,
+            ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log,
                     cfg->myaddr(), last_myvs);
         }
         rsm_mutex_lock.lock();
@@ -347,7 +347,7 @@ rsm_client_protocol::status rsm::client_invoke(string & r, int procno, const str
             if (!cl)
                 return rsm_client_protocol::BUSY;
             int ignored_rval;
-            auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, rpcc::to(100), ignored_rval, procno, vs, req);
+            auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req);
             LOG("Invoke returned " << ret);
             if (ret != rsm_protocol::OK)
                 return rsm_client_protocol::BUSY;
index 01098d6..ae88169 100644 (file)
@@ -26,7 +26,7 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string &rep, const st
         rpcc *cl = h.safebind();
         auto ret = rsm_client_protocol::OK;
         if (cl)
-            ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, rpcc::to(500), rep, proc, req);
+            ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, milliseconds(500), rep, proc, req);
         ml.lock();
 
         if (!cl)
@@ -61,7 +61,7 @@ bool rsm_client::init_members(lock & rsm_client_mutex_lock) {
         rsm_client_mutex_lock.unlock();
         cl = h.safebind();
         if (cl)
-            ret = cl->call_timeout(rsm_client_protocol::members, rpcc::to(100), known_mems, 0);
+            ret = cl->call_timeout(rsm_client_protocol::members, milliseconds(100), known_mems, 0);
         rsm_client_mutex_lock.lock();
     }
     if (cl == 0 || ret != rsm_protocol::OK)