Working on g++ compatibility
authorPeter Iannucci <iannucci@mit.edu>
Fri, 1 Nov 2013 17:33:54 +0000 (13:33 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Fri, 1 Nov 2013 17:33:54 +0000 (13:33 -0400)
26 files changed:
Makefile.osx
config.cc
config.h
lock_client.cc
lock_client.h
lock_server.cc
lock_server.h
lock_tester.cc
log.h
paxos.cc
paxos.h
paxos_protocol.h
rpc/connection.cc
rpc/connection.h
rpc/file.h
rpc/marshall.h
rpc/marshall_wrap.h
rpc/poll_mgr.cc
rpc/rpc.cc
rpc/rpc.h
rpc/rpc_protocol.h
rpc/rpctest.cc
rsm.cc
rsm.h
rsm_protocol.h
types.h

index f69851c..80d85a3 100644 (file)
@@ -1,13 +1,15 @@
 PEDANTRY = -Weverything -pedantic-errors -Werror -Wno-c++98-compat \
-                  -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes \
-                  -Wmissing-declarations -Wno-weak-vtables -Wno-global-constructors \
-                  -Wno-exit-time-destructors
-#OPTFLAGS = -ftrapv -O4
-OPTFLAGS =
-CXXFLAGS = -ggdb -MMD -I. -std=c++11 -stdlib=libc++ $(PEDANTRY) $(OPTFLAGS)
-LDFLAGS = -stdlib=libc++ $(OPTFLAGS)
-CXX = clang++
-CC = clang++
+                  -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes \
+                  -Wmissing-declarations -Wno-weak-vtables -Wno-global-constructors \
+                  -Wno-exit-time-destructors -pedantic -Wall -Wextra -Weffc++
+OPTFLAGS = -O0 -fno-omit-frame-pointer #-fsanitize=address ,thread,undefined -fsanitize-memory-track-origins 
+STDLIB = -stdlib=libc++ 
+#STDLIB = 
+CXX = clang++-mp-3.4
+#CXX = g++-mp-4.8
+CXXFLAGS = -std=c++11 -ggdb3 -MMD -I. $(STDLIB) $(PEDANTRY) $(OPTFLAGS)
+LDFLAGS = -std=c++11 $(STDLIB) $(OPTFLAGS)
+CC := $(CXX)
 EXTRA_TARGETS = signatures
 
 socketfilterfw=/usr/libexec/ApplicationFirewall/socketfilterfw
index 38c4c05..2b40078 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -145,7 +145,7 @@ bool config::remove(const string &m, lock &cfg_mutex_lock) {
     return r;
 }
 
-void config::heartbeater() [[noreturn]] {
+void config::heartbeater() {
     lock cfg_mutex_lock(cfg_mutex);
 
     while (1) {
index 73940a0..7124a6e 100644 (file)
--- a/config.h
+++ b/config.h
@@ -41,7 +41,7 @@ class config : public paxos_change {
         void restore(const string &s);
         bool add(const string &, unsigned view_id);
         bool ismember(const string &m, unsigned view_id);
-        void heartbeater(void);
+        void heartbeater NORETURN ();
         void paxos_commit(unsigned instance, const string &v);
         rpcs *get_rpcs() { return paxos.get_rpcs(); }
         void breakpoint(int b) { paxos.breakpoint(b); }
index 8864ce9..3c5aa89 100644 (file)
@@ -46,7 +46,7 @@ lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xi
     rlsrpc->start();
 }
 
-void lock_client::releaser() [[noreturn]] {
+void lock_client::releaser() {
     while (1) {
         lock_protocol::lockid_t lid;
         release_fifo.deq(&lid);
index 728fbf7..73dfffe 100644 (file)
@@ -63,7 +63,7 @@ class lock_client {
         lock_protocol::status acquire(lock_protocol::lockid_t);
         lock_protocol::status release(lock_protocol::lockid_t);
         int stat(lock_protocol::lockid_t);
-        void releaser();
+        void releaser NORETURN ();
         rlock_protocol::status revoke_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t);
         rlock_protocol::status retry_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t);
 };
index 522a917..0c3a6e9 100644 (file)
@@ -34,7 +34,7 @@ lock_server::lock_server(rsm *r) : rsm_ (r) {
     rsm_->set_state_transfer(this);
 }
 
-void lock_server::revoker() [[noreturn]] {
+void lock_server::revoker () {
     while (1) {
         lock_protocol::lockid_t lid;
         revoke_fifo.deq(&lid);
@@ -62,7 +62,7 @@ void lock_server::revoker() [[noreturn]] {
     }
 }
 
-void lock_server::retryer() [[noreturn]] {
+void lock_server::retryer() {
     while (1) {
         lock_protocol::lockid_t lid;
         retry_fifo.deq(&lid);
index 6ba4902..d3ec580 100644 (file)
@@ -22,7 +22,7 @@ public:
     MEMBERS(held, held_by, wanted_by)
 };
 
-MARSHALLABLE(lock_state)
+MARSHALLABLE_STRUCT(lock_state)
 
 typedef map<lock_protocol::lockid_t, lock_state> lock_map;
 
@@ -37,8 +37,8 @@ class lock_server : public rsm_state_transfer {
         rsm *rsm_;
     public:
         lock_server(rsm *r = 0);
-        void revoker();
-        void retryer();
+        void revoker NORETURN ();
+        void retryer NORETURN ();
         string marshal_state();
         void unmarshal_state(const string & state);
         lock_protocol::status acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
index 0204a71..f535d8f 100644 (file)
@@ -10,17 +10,17 @@ char log_thread_prefix = 'c';
 
 // must be >= 2
 const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc.
-string dst;
-lock_client **lc = new lock_client * [nt];
-lock_protocol::lockid_t a = "1";
-lock_protocol::lockid_t b = "2";
-lock_protocol::lockid_t c = "3";
+static string dst;
+static lock_client **lc = new lock_client * [nt];
+static lock_protocol::lockid_t a = "1";
+static lock_protocol::lockid_t b = "2";
+static lock_protocol::lockid_t c = "3";
 
 // check_grant() and check_release() check that the lock server
 // doesn't grant the same lock to both clients.
 // it assumes that lock names are distinct in the first byte.
-int ct[256];
-mutex count_mutex;
+static int ct[256];
+static mutex count_mutex;
 
 void check_grant(lock_protocol::lockid_t lid) {
     lock ml(count_mutex);
diff --git a/log.h b/log.h
index e8acd4a..201cb80 100644 (file)
--- a/log.h
+++ b/log.h
@@ -24,4 +24,4 @@ class log {
         void logaccept(prop_t n_a, string v);
 };
 
-#endif /* log_h */
+#endif
index ab60302..a88a7a5 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -7,7 +7,7 @@ bool isamember(const node_t & m, const nodes_t & nodes) {
 
 // check if l2 contains a majority of the elements of l1
 bool majority(const nodes_t &l1, const nodes_t &l2) {
-    auto overlap = (size_t)count_if(l1.begin(), l1.end(), bind(isamember, _1, l2));
+    auto overlap = (size_t)count_if(l1.begin(), l1.end(), std::bind(isamember, _1, l2));
     return overlap >= (l1.size() >> 1) + 1;
 }
 
diff --git a/paxos.h b/paxos.h
index 8c9fc8f..db18e6c 100644 (file)
--- a/paxos.h
+++ b/paxos.h
@@ -45,7 +45,7 @@ class proposer_acceptor {
         map<unsigned,value_t> values;   // vals of each instance
 
         friend class log;
-        log l = {this, me};
+        class log l = {this, me};
 
         void commit(unsigned instance, const value_t & v);
         void commit(unsigned instance, const value_t & v, lock & pxs_mutex_lock);
index 1f5fd3e..c61e2eb 100644 (file)
@@ -12,7 +12,7 @@ struct prop_t {
     LEXICOGRAPHIC_COMPARISON(prop_t)
 };
 
-MARSHALLABLE(prop_t)
+MARSHALLABLE_STRUCT(prop_t)
 
 namespace paxos_protocol {
     enum status : rpc_protocol::status { OK, ERR };
@@ -35,6 +35,6 @@ namespace paxos_protocol {
     REMOTE_PROCEDURE(4, heartbeat, (int &, string, unsigned));
 };
 
-MARSHALLABLE(paxos_protocol::prepareres)
+MARSHALLABLE_STRUCT(paxos_protocol::prepareres)
 
 #endif
index 358a2af..b29e136 100644 (file)
@@ -6,25 +6,32 @@
 #include <unistd.h>
 #include "marshall.h"
 
-connection::connection(connection_delegate *m1, socket_t && f1, int l1)
-: mgr_(m1), fd_(move(f1)), lossy_(l1)
+connection::connection(connection_delegate * delegate, socket_t && f1, int l1)
+: fd(move(f1)), delegate_(delegate), lossy_(l1)
 {
-    fd_.flags() |= O_NONBLOCK;
+    fd.flags() |= O_NONBLOCK;
 
     signal(SIGPIPE, SIG_IGN);
 
-    create_time_ = steady_clock::now();
-
-    poll_mgr::shared_mgr.add_callback(fd_, CB_RDONLY, this);
+    poll_mgr::shared_mgr.add_callback(fd, CB_RDONLY, this);
 }
 
 connection::~connection() {
-    closeconn();
+    {
+        lock ml(m_);
+        if (dead_)
+            return;
+        dead_ = true;
+        shutdown(fd,SHUT_RDWR);
+    }
+    // after block_remove_fd, select will never wait on fd and no callbacks
+    // will be active
+    poll_mgr::shared_mgr.block_remove_fd(fd);
     VERIFY(dead_);
     VERIFY(!wpdu_.buf.size());
 }
 
-shared_ptr<connection> connection::to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy) {
+shared_ptr<connection> connection::to_dst(const sockaddr_in &dst, connection_delegate * delegate, int lossy) {
     socket_t s = socket(AF_INET, SOCK_STREAM, 0);
     s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
@@ -33,20 +40,7 @@ shared_ptr<connection> connection::to_dst(const sockaddr_in &dst, connection_del
         return nullptr;
     }
     IF_LEVEL(2) LOG_NONMEMBER("connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
-    return make_shared<connection>(mgr, move(s), lossy);
-}
-
-void connection::closeconn() {
-    {
-        lock ml(m_);
-        if (dead_)
-            return;
-        dead_ = true;
-        shutdown(fd_,SHUT_RDWR);
-    }
-    //after block_remove_fd, select will never wait on fd_
-    //and no callbacks will be active
-    poll_mgr::shared_mgr.block_remove_fd(fd_);
+    return make_shared<connection>(delegate, move(s), lossy);
 }
 
 bool connection::send(const string & b) {
@@ -65,19 +59,19 @@ bool connection::send(const string & b) {
 
     if (lossy_) {
         if ((random()%100) < lossy_) {
-            IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_);
-            shutdown(fd_,SHUT_RDWR);
+            IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd " << fd);
+            shutdown(fd,SHUT_RDWR);
         }
     }
 
     if (!writepdu()) {
         dead_ = true;
         ml.unlock();
-        poll_mgr::shared_mgr.block_remove_fd(fd_);
+        poll_mgr::shared_mgr.block_remove_fd(fd);
         ml.lock();
     } else if (wpdu_.solong != wpdu_.buf.size()) {
         // should be rare to need to explicitly add write callback
-        poll_mgr::shared_mgr.add_callback(fd_, CB_WRONLY, this);
+        poll_mgr::shared_mgr.add_callback(fd, CB_WRONLY, this);
         while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
             send_complete_.wait(ml);
     }
@@ -89,17 +83,17 @@ bool connection::send(const string & b) {
     return ret;
 }
 
-//fd_ is ready to be written
+// fd is ready to be written
 void connection::write_cb(int s) {
     lock ml(m_);
     VERIFY(!dead_);
-    VERIFY(fd_ == s);
+    VERIFY(fd == s);
     if (wpdu_.buf.size() == 0) {
-        poll_mgr::shared_mgr.del_callback(fd_,CB_WRONLY);
+        poll_mgr::shared_mgr.del_callback(fd, CB_WRONLY);
         return;
     }
     if (!writepdu()) {
-        poll_mgr::shared_mgr.del_callback(fd_, CB_RDWR);
+        poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
         dead_ = true;
     } else {
         VERIFY(wpdu_.solong != size_t_max);
@@ -110,28 +104,26 @@ void connection::write_cb(int s) {
     send_complete_.notify_one();
 }
 
-// fd_ is ready to be read
+// fd is ready to be read
 void connection::read_cb(int s) {
     lock ml(m_);
-    VERIFY(fd_ == s);
+    VERIFY(fd == s);
     if (dead_)
         return;
 
     IF_LEVEL(5) LOG("got data on fd " << s);
 
-    bool succ = true;
-    if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size())
-        succ = readpdu();
-
-    if (!succ) {
-        IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
-        poll_mgr::shared_mgr.del_callback(fd_,CB_RDWR);
-        dead_ = true;
-        send_complete_.notify_one();
+    if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
+        if (!readpdu()) {
+            IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
+            poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
+            dead_ = true;
+            send_complete_.notify_one();
+        }
     }
 
     if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
-        if (mgr_->got_pdu(shared_from_this(), rpdu_.buf)) {
+        if (delegate_->got_pdu(shared_from_this(), rpdu_.buf)) {
             // connection_delegate has successfully consumed the pdu
             rpdu_.buf.clear();
             rpdu_.solong = 0;
@@ -144,10 +136,10 @@ bool connection::writepdu() {
     if (wpdu_.solong == wpdu_.buf.size())
         return true;
 
-    ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
+    ssize_t n = write(fd, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
     if (n < 0) {
         if (errno != EAGAIN) {
-            IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno);
+            IF_LEVEL(1) LOG("writepdu fd " << fd << " failure errno=" << errno);
             wpdu_.solong = size_t_max;
             wpdu_.buf.clear();
         }
@@ -161,7 +153,7 @@ bool connection::readpdu() {
     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
     if (!rpdu_.buf.size()) {
         rpc_protocol::rpc_sz_t sz1;
-        ssize_t n = fd_.read(sz1);
+        ssize_t n = fd.read(sz1);
 
         if (n == 0)
             return false;
@@ -185,12 +177,11 @@ bool connection::readpdu() {
 
         IF_LEVEL(5) LOG("read size of datagram = " << sz);
 
-        VERIFY(rpdu_.buf.size() == 0);
-        rpdu_.buf = string(sz+sizeof(sz1), 0);
+        rpdu_.buf.assign(sz+sizeof(sz1), 0);
         rpdu_.solong = sizeof(sz1);
     }
 
-    ssize_t n = fd_.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
+    ssize_t n = fd.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
 
     IF_LEVEL(5) LOG("read " << n << " bytes");
 
@@ -199,27 +190,25 @@ bool connection::readpdu() {
             return true;
         rpdu_.buf.clear();
         rpdu_.solong = 0;
-        return (errno == EAGAIN);
+        return false;
     }
     rpdu_.solong += (size_t)n;
     return true;
 }
 
-tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest)
-: tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest)
+connection_listener::connection_listener(connection_delegate * delegate, in_port_t port, int lossytest)
+: tcp_(socket(AF_INET, SOCK_STREAM, 0)), delegate_(delegate), lossy_(lossytest)
 {
-    sockaddr_in sin{}; // zero initialize
-    sin.sin_family = AF_INET;
-    sin.sin_port = hton(port);
-
     tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
     tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
     tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000});
     tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000});
 
-    // careful to exactly match type signature of bind arguments so we don't
-    // get std::bind instead
-    if (bind((int)tcp_, (const sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
+    sockaddr_in sin{}; // zero initialize
+    sin.sin_family = AF_INET;
+    sin.sin_port = hton(port);
+
+    if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
         perror("accept_loop bind");
         VERIFY(0);
     }
@@ -238,25 +227,21 @@ tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest)
     poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this);
 }
 
-tcpsconn::~tcpsconn()
-{
+connection_listener::~connection_listener() {
     poll_mgr::shared_mgr.block_remove_fd(tcp_);
-
-    for (auto & i : conns_)
-        i.second->closeconn();
 }
 
-void tcpsconn::read_cb(int) {
+void connection_listener::read_cb(int) {
     sockaddr_in sin;
     socklen_t slen = sizeof(sin);
     int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
     if (s1 < 0) {
-        perror("tcpsconn::accept_conn error");
+        perror("connection_listener::accept_conn error");
         throw thread_exit_exception();
     }
 
     IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
-    auto ch = make_shared<connection>(mgr_, s1, lossy_);
+    auto ch = make_shared<connection>(delegate_, s1, lossy_);
 
     // garbage collect dead connections
     for (auto i = conns_.begin(); i != conns_.end();) {
@@ -266,5 +251,5 @@ void tcpsconn::read_cb(int) {
             ++i;
     }
 
-    conns_[ch->channo()] = ch;
+    conns_[s1] = ch;
 }
index 87d17e4..8f7d494 100644 (file)
@@ -21,24 +21,18 @@ class connection_delegate {
 
 class connection : private aio_callback, public enable_shared_from_this<connection> {
     public:
-        struct charbuf {
-            string buf;
-            size_t solong = 0; // number of bytes written or read so far
-        };
-
-        connection(connection_delegate *m1, socket_t && f1, int lossytest=0);
+        connection(connection_delegate * delegate, socket_t && f1, int lossytest=0);
         ~connection();
 
-        int channo() { return fd_; }
-        bool isdead() { lock ml(m_); return dead_; }
-        void closeconn();
+        bool isdead() { return dead_; }
 
         bool send(const string & b);
 
-        time_point<steady_clock> create_time() const { return create_time_; }
-
         static shared_ptr<connection> to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy=0);
 
+        const time_point<steady_clock> create_time = steady_clock::now();
+        const file_t fd;
+
     private:
         void write_cb(int s);
         void read_cb(int s);
@@ -46,15 +40,17 @@ class connection : private aio_callback, public enable_shared_from_this<connecti
         bool readpdu();
         bool writepdu();
 
-        connection_delegate *mgr_;
-        const file_t fd_;
+        connection_delegate * delegate_;
         bool dead_ = false;
 
+        struct charbuf {
+            string buf;
+            size_t solong = 0; // number of bytes written or read so far
+        };
+
         charbuf wpdu_;
         charbuf rpdu_;
 
-        time_point<steady_clock> create_time_;
-
         int waiters_ = 0;
         int lossy_ = 0;
 
@@ -63,10 +59,10 @@ class connection : private aio_callback, public enable_shared_from_this<connecti
         cond send_wait_;
 };
 
-class tcpsconn : private aio_callback {
+class connection_listener : private aio_callback {
     public:
-        tcpsconn(connection_delegate *m1, in_port_t port, int lossytest=0);
-        ~tcpsconn();
+        connection_listener(connection_delegate * delegate, in_port_t port, int lossytest=0);
+        ~connection_listener();
         inline in_port_t port() { return port_; }
     private:
         void write_cb(int) {}
@@ -76,7 +72,7 @@ class tcpsconn : private aio_callback {
         mutex m_;
 
         socket_t tcp_; // listens for connections
-        connection_delegate *mgr_;
+        connection_delegate * delegate_;
         int lossy_;
         map<int, shared_ptr<connection>> conns_;
 };
index 75c0d6e..393b98a 100644 (file)
@@ -22,7 +22,7 @@ class file_t {
     public:
         inline file_t(int fd=-1) : fd_(fd) {}
         inline file_t(const file_t &) = delete;
-        inline file_t(file_t && other) : fd_(-1) { std::swap(fd_, other.fd_); }
+        inline file_t(file_t && other) : fd_(-1) { swap(fd_, other.fd_); }
         inline ~file_t() { if (fd_ != -1) ::close(fd_); }
         static inline void pipe(file_t *ends) {
             int fds[2];
@@ -31,7 +31,7 @@ class file_t {
             ends[1].fd_ = fds[1];
         }
         inline operator int() const { if (fd_ == -1) throw "no fd"; return fd_; }
-        inline flags_t flags() const { return *this; }
+        inline flags_t flags() const { return {*this}; }
         inline void close() {
             ::close(fd_);
             fd_ = -1;
index 6412612..6e0c94a 100644 (file)
@@ -149,13 +149,13 @@ operator>>(unmarshall & u, tuple<Args &...> && t) {
 //
 
 // Implements struct marshalling via tuple marshalling of members.
-#define MARSHALLABLE(_c_) \
+#define MARSHALLABLE_STRUCT(_c_) \
 inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_(); } \
 inline marshall & operator<<(marshall &m, const _c_ a) { return m << a._tuple_(); }
 
 // our first two marshallable structs...
-MARSHALLABLE(rpc_protocol::request_header)
-MARSHALLABLE(rpc_protocol::reply_header)
+MARSHALLABLE_STRUCT(rpc_protocol::request_header)
+MARSHALLABLE_STRUCT(rpc_protocol::reply_header)
 
 //
 // Marshalling for STL containers
index 8d5b15a..8e10a75 100644 (file)
@@ -85,7 +85,7 @@ struct marshalled_func_imp<F, C, RV(R&, Args...), ErrorHandler> {
         return new handler([=](unmarshall &u, marshall &m) -> RV {
             // Unmarshall each argument with the correct type and store the
             // result in a tuple.
-            ArgsStorage t = {u._grab<typename decay<Args>::type>()...};
+            ArgsStorage t{u._grab<typename decay<Args>::type>()...};
             // Verify successful unmarshalling of the entire input stream.
             if (!u.okdone())
                 return (RV)ErrorHandler::unmarshall_args_failure();
index 94b24e3..dc42274 100644 (file)
@@ -14,7 +14,7 @@ class wait_manager {
         virtual void watch_fd(int fd, poll_flag flag) = 0;
         virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
         virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
-        virtual ~wait_manager() throw() {}
+        virtual ~wait_manager() noexcept {}
 };
 
 class SelectAIO : public wait_manager {
index 964102f..0c3a97d 100644 (file)
@@ -1,56 +1,56 @@
-/*
- The rpcc class handles client-side RPC.  Each rpcc is bound to a single RPC
- server.  The jobs of rpcc include maintaining a connection to server, sending
- RPC requests and waiting for responses, retransmissions, at-most-once delivery
- etc.
-
- The rpcs class handles the server side of RPC.  Each rpcs handles multiple
- connections from different rpcc objects.  The jobs of rpcs include accepting
- connections, dispatching requests to registered RPC handlers, at-most-once
- delivery etc.
-
- Both rpcc and rpcs use the connection class as an abstraction for the
- underlying communication channel.  To send an RPC request/reply, one calls
- connection::send() which blocks until data is sent or the connection has
- failed (thus the caller can free the buffer when send() returns).  When a
- request/reply is received, connection makes a callback into the corresponding
- rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
-
- Thread organization:
- rpcc uses application threads to send RPC requests and blocks to receive the
- reply or error. All connections use a single PollMgr object to perform async
- socket IO.  PollMgr creates a single thread to examine the readiness of socket
- file descriptors and informs the corresponding connection whenever a socket is
- ready to be read or written.  (We use asynchronous socket IO to reduce the
- number of threads needed to manage these connections; without async IO, at
- least one thread is needed per connection to read data without blocking other
- activities.)  Each rpcs object creates one thread for listening on the server
- port and a pool of threads for executing RPC requests.  The thread pool allows
- us to control the number of threads spawned at the server (spawning one thread
- per request will hurt when the server faces thousands of requests).
-
- In order to delete a connection object, we must maintain a reference count.
- For rpcc, multiple client threads might be invoking the rpcc::call() functions
- and thus holding multiple references to the underlying connection object. For
- rpcs, multiple dispatch threads might be holding references to the same
- connection object.  A connection object is deleted only when the underlying
- connection is dead and the reference count reaches zero.
-
- This version of the RPC library explicitly joins exited threads to make sure
- no outstanding references exist before deleting objects.
-
- To delete a rpcc object safely, the users of the library must ensure that
- there are no outstanding calls on the rpcc object.
-
- To delete a rpcs object safely, we do the following in sequence: 1. stop
- accepting new incoming connections. 2. close existing active connections.  3.
- delete the dispatch thread pool which involves waiting for current active RPC
- handlers to finish.  It is interesting how a thread pool can be deleted
- without using thread cancellation. The trick is to inject x "poison pills" for
- a thread pool of x threads. Upon getting a poison pill instead of a normal
- task, a worker thread will exit (and thread pool destructor waits to join all
- x exited worker threads).
- */
+//
+// The rpcc class handles client-side RPC.  Each rpcc is bound to a single RPC
+// server.  The jobs of rpcc include maintaining a connection to server, sending
+// RPC requests and waiting for responses, retransmissions, at-most-once delivery
+// etc.
+//
+// The rpcs class handles the server side of RPC.  Each rpcs handles multiple
+// connections from different rpcc objects.  The jobs of rpcs include accepting
+// connections, dispatching requests to registered RPC handlers, at-most-once
+// delivery etc.
+//
+// Both rpcc and rpcs use the connection class as an abstraction for the
+// underlying communication channel.  To send an RPC request/reply, one calls
+// connection::send() which blocks until data is sent or the connection has
+// failed (thus the caller can free the buffer when send() returns).  When a
+// request/reply is received, connection makes a callback into the corresponding
+// rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
+//
+// Thread organization:
+// rpcc uses application threads to send RPC requests and blocks to receive the
+// reply or error. All connections use a single PollMgr object to perform async
+// socket IO.  PollMgr creates a single thread to examine the readiness of socket
+// file descriptors and informs the corresponding connection whenever a socket is
+// ready to be read or written.  (We use asynchronous socket IO to reduce the
+// number of threads needed to manage these connections; without async IO, at
+// least one thread is needed per connection to read data without blocking other
+// activities.)  Each rpcs object creates one thread for listening on the server
+// port and a pool of threads for executing RPC requests.  The thread pool allows
+// us to control the number of threads spawned at the server (spawning one thread
+// per request will hurt when the server faces thousands of requests).
+//
+// In order to delete a connection object, we must maintain a reference count.
+// For rpcc, multiple client threads might be invoking the rpcc::call() functions
+// and thus holding multiple references to the underlying connection object. For
+// rpcs, multiple dispatch threads might be holding references to the same
+// connection object.  A connection object is deleted only when the underlying
+// connection is dead and the reference count reaches zero.
+//
+// This version of the RPC library explicitly joins exited threads to make sure
+// no outstanding references exist before deleting objects.
+//
+// To delete a rpcc object safely, the users of the library must ensure that
+// there are no outstanding calls on the rpcc object.
+//
+// To delete a rpcs object safely, we do the following in sequence: 1. stop
+// accepting new incoming connections. 2. close existing active connections.  3.
+// delete the dispatch thread pool which involves waiting for current active RPC
+// handlers to finish.  It is interesting how a thread pool can be deleted
+// without using thread cancellation. The trick is to inject x "poison pills" for
+// a thread pool of x threads. Upon getting a poison pill instead of a normal
+// task, a worker thread will exit (and thread pool destructor waits to join all
+// x exited worker threads).
+//
 
 #include "rpc.h"
 
@@ -58,6 +58,7 @@
 #include <netinet/tcp.h>
 #include <netdb.h>
 #include <unistd.h>
+#include <string.h>
 
 inline void set_rand_seed() {
     auto now = time_point_cast<nanoseconds>(steady_clock::now());
@@ -72,7 +73,7 @@ rpcc::rpcc(const string & d, bool retrans) :
 {
     if (retrans) {
         set_rand_seed();
-        clt_nonce_ = (unsigned int)random();
+        clt_nonce_ = (nonce_t)random();
     } else {
         // special client nonce 0 means this client does not
         // require at-most-once logic from the server
@@ -94,15 +95,14 @@ rpcc::rpcc(const string & d, bool retrans) :
 // are blocked inside rpcc or will use rpcc in the future
 rpcc::~rpcc() {
     cancel();
-    IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
-    if (chan_)
-        chan_->closeconn();
+    IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " chan " << (chan_?(int)chan_->fd:-1));
+    chan_.reset();
     VERIFY(calls_.size() == 0);
 }
 
 int rpcc::bind(milliseconds to) {
-    unsigned int r;
-    int ret = call_timeout(rpc_protocol::bind, to, r, 0);
+    nonce_t r;
+    int ret = call_timeout(rpc_protocol::bind, to, r);
     if (ret == 0) {
         lock ml(m_);
         bind_done_ = true;
@@ -140,7 +140,7 @@ void rpcc::cancel(void) {
 int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) {
 
     caller ca(0, &rep);
-    int xid_rep;
+    xid_t xid_rep;
     {
         lock ml(m_);
 
@@ -325,11 +325,11 @@ compress:
     }
 }
 
-rpcs::rpcs(in_port_t p1, size_t count)
-  : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
+rpcs::rpcs(in_port_t p1)
+  : port_(p1), reachable_ (true)
 {
     set_rand_seed();
-    nonce_ = (unsigned int)random();
+    nonce_ = (nonce_t)random();
     IF_LEVEL(2) LOG("created with nonce " << nonce_);
 
     reg(rpc_protocol::bind, &rpcs::rpcbind, this);
@@ -338,7 +338,7 @@ rpcs::rpcs(in_port_t p1, size_t count)
 
 void rpcs::start() {
     char *loss_env = getenv("RPC_LOSSY");
-    listener_ = unique_ptr<tcpsconn>(new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0));
+    listener_.reset(new connection_listener(this, port_, loss_env ? atoi(loss_env) : 0));
 }
 
 rpcs::~rpcs() {
@@ -354,7 +354,7 @@ bool rpcs::got_pdu(const shared_ptr<connection> & c, const string & b) {
         return true;
     }
 
-    return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
+    return dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, c, b));
 }
 
 void rpcs::reg1(proc_id_t proc, handler *h) {
@@ -364,29 +364,6 @@ void rpcs::reg1(proc_id_t proc, handler *h) {
     VERIFY(procs_.count(proc) >= 1);
 }
 
-void rpcs::updatestat(proc_id_t proc) {
-    lock cl(count_m_);
-    counts_[proc]++;
-    curr_counts_--;
-    if (curr_counts_ == 0) {
-        LOG("RPC STATS: ");
-        for (auto i = counts_.begin(); i != counts_.end(); i++)
-            LOG(hex << i->first << ":" << dec << i->second);
-
-        lock rwl(reply_window_m_);
-
-        size_t totalrep = 0, maxrep = 0;
-        for (auto clt : reply_window_) {
-            totalrep += clt.second.size();
-            if (clt.second.size() > maxrep)
-                maxrep = clt.second.size();
-        }
-        IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
-                        totalrep << " max per client " << maxrep);
-        curr_counts_ = counting_;
-    }
-}
-
 void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
     unmarshall req(buf, true);
 
@@ -440,7 +417,7 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
                 IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
-                                " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
+                                " chan " << c->fd << ", total clients " << (reply_window_.size()-1));
             }
         }
 
@@ -449,7 +426,7 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
             lock rwl(conns_m_);
             if (conns_.find(h.clt_nonce) == conns_.end())
                 conns_[h.clt_nonce] = c;
-            else if (conns_[h.clt_nonce]->create_time() < c->create_time())
+            else if (conns_[h.clt_nonce]->create_time < c->create_time)
                 conns_[h.clt_nonce] = c;
         }
 
@@ -461,9 +438,6 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
 
     switch (stat) {
         case NEW: // new request
-            if (counting_)
-                updatestat(proc);
-
             rh.ret = (*f)(req, rep);
             if (rh.ret == rpc_protocol::unmarshal_args_failure) {
                 LOG("failed to unmarshall the arguments. You are " <<
@@ -522,8 +496,8 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
 //   DONE: seen this xid, previous reply returned in b.
 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
 rpcs::rpcstate_t
-rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
-        int xid_rep, string & b)
+rpcs::checkduplicate_and_update(nonce_t clt_nonce, xid_t xid,
+        xid_t xid_rep, string & b)
 {
     lock rwl(reply_window_m_);
 
@@ -532,7 +506,7 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
     VERIFY(l.size() > 0);
     VERIFY(xid >= xid_rep);
 
-    int past_xid_rep = l.begin()->xid;
+    xid_t past_xid_rep = l.begin()->xid;
 
     list<reply_t>::iterator start = l.begin(), it = ++start;
 
@@ -571,7 +545,7 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
 // add_reply() should remember b.
 // free_reply_window() and checkduplicate_and_update are responsible for
 // cleaning up the remembered values.
-void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
+void rpcs::add_reply(nonce_t clt_nonce, xid_t xid, const string & b) {
     lock rwl(reply_window_m_);
     // remember the RPC reply value
     list<reply_t> &l = reply_window_[clt_nonce];
@@ -592,7 +566,7 @@ void rpcs::free_reply_window(void) {
     reply_window_.clear();
 }
 
-int rpcs::rpcbind(unsigned int &r, int) {
+int rpcs::rpcbind(nonce_t &r) {
     IF_LEVEL(2) LOG("called return nonce " << nonce_);
     r = nonce_;
     return 0;
index 7b65101..3ae7737 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -37,10 +37,12 @@ class rpcc : private connection_delegate {
         using proc_id_t = rpc_protocol::proc_id_t;
         template <class S>
         using proc_t = rpc_protocol::proc_t<S>;
+        using nonce_t = rpc_protocol::nonce_t;
+        using xid_t = rpc_protocol::xid_t;
 
         // manages per rpc info
         struct caller {
-            caller(int _xid, string *_rep) : xid(_xid), rep(_rep) {}
+            caller(xid_t _xid, string *_rep) : xid(_xid), rep(_rep) {}
 
             int xid;
             string *rep;
@@ -51,14 +53,14 @@ class rpcc : private connection_delegate {
         };
 
         void get_refconn(shared_ptr<connection> & ch);
-        void update_xid_rep(int xid);
+        void update_xid_rep(xid_t xid);
 
 
         sockaddr_in dst_;
-        unsigned int clt_nonce_;
-        unsigned int srv_nonce_;
+        nonce_t clt_nonce_;
+        nonce_t srv_nonce_;
         bool bind_done_;
-        int xid_;
+        xid_t xid_;
         int lossytest_;
         bool retrans_;
         bool reachable_;
@@ -72,13 +74,13 @@ class rpcc : private connection_delegate {
         cond destroy_wait_c_;
 
         map<int, caller *> calls_;
-        list<int> xid_rep_window_;
+        list<xid_t> xid_rep_window_;
 
         struct request {
             void clear() { buf.clear(); xid = -1; }
             bool isvalid() { return xid != -1; }
             string buf;
-            int xid = -1;
+            xid_t xid = -1;
         };
         request dup_req_;
         int xid_rep_done_;
@@ -108,7 +110,7 @@ class rpcc : private connection_delegate {
         rpcc(const string & d, bool retrans=true);
         ~rpcc();
 
-        unsigned int id() { return clt_nonce_; }
+        nonce_t id() { return clt_nonce_; }
 
         int bind(milliseconds to = rpc::to_max);
 
@@ -135,6 +137,8 @@ class rpcs : private connection_delegate {
         using proc_id_t = rpc_protocol::proc_id_t;
         template <class S>
         using proc_t = rpc_protocol::proc_t<S>;
+        using nonce_t = rpc_protocol::nonce_t;
+        using xid_t = rpc_protocol::xid_t;
 
         typedef enum {
             NEW,  // new RPC, not a duplicate
@@ -148,36 +152,29 @@ class rpcs : private connection_delegate {
         // has been sent; in that case buf points to a copy of the reply,
         // and sz holds the size of the reply.
         struct reply_t {
-            reply_t (int _xid) : xid(_xid), cb_present(false) {}
-            reply_t (int _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {}
-            int xid;
+            reply_t (xid_t _xid) : xid(_xid), cb_present(false) {}
+            reply_t (xid_t _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {}
+            xid_t xid;
             bool cb_present; // whether the reply buffer is valid
             string buf;      // the reply buffer
         };
 
         in_port_t port_;
-        unsigned int nonce_;
+        nonce_t nonce_;
 
         // provide at most once semantics by maintaining a window of replies
         // per client that that client hasn't acknowledged receiving yet.
         // indexed by client nonce.
-        map<unsigned int, list<reply_t>> reply_window_;
+        map<nonce_t, list<reply_t>> reply_window_;
 
         void free_reply_window(void);
-        void add_reply(unsigned int clt_nonce, int xid, const string & b);
-
-        rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
-                int xid, int rep_xid, string & b);
+        void add_reply(nonce_t clt_nonce, xid_t xid, const string & b);
 
-        void updatestat(proc_id_t proc);
+        rpcstate_t checkduplicate_and_update(nonce_t clt_nonce, xid_t xid,
+                xid_t rep_xid, string & b);
 
         // latest connection to the client
-        map<unsigned int, shared_ptr<connection>> conns_;
-
-        // counting
-        const size_t counting_;
-        size_t curr_counts_;
-        map<proc_id_t, size_t> counts_;
+        map<nonce_t, shared_ptr<connection>> conns_;
 
         bool reachable_;
 
@@ -185,7 +182,6 @@ class rpcs : private connection_delegate {
         map<proc_id_t, handler *> procs_;
 
         mutex procs_m_; // protect insert/delete to procs[]
-        mutex count_m_;  // protect modification of counts
         mutex reply_window_m_; // protect reply window et al
         mutex conns_m_; // protect conns_
 
@@ -195,21 +191,21 @@ class rpcs : private connection_delegate {
         void reg1(proc_id_t proc, handler *);
 
         unique_ptr<thread_pool> dispatchpool_;
-        unique_ptr<tcpsconn> listener_;
+        unique_ptr<connection_listener> listener_;
 
         // RPC handler for clients binding
-        rpc_protocol::status rpcbind(unsigned int &r, int a);
+        rpc_protocol::status rpcbind(nonce_t &r);
 
         bool got_pdu(const shared_ptr<connection> & c, const string & b);
 
     public:
 
-        rpcs(in_port_t port, size_t counts=0);
+        rpcs(in_port_t port);
         ~rpcs();
 
         void set_reachable(bool r) { reachable_ = r; }
 
-        template<class P, class F, class C=void> void reg(proc_t<P> proc, F f, C *c=nullptr) {
+        template<class P, class F, class C=void> inline void reg(proc_t<P> proc, F f, C *c=nullptr) {
             static_assert(is_valid_registration<P, F>::value, "RPC handler registered with incorrect argument types");
             struct ReturnOnFailure {
                 static inline int unmarshall_args_failure() {
index 881de9b..4a3ff32 100644 (file)
@@ -8,6 +8,8 @@ namespace rpc_protocol {
 
     using status = int32_t;
     using rpc_sz_t = uint32_t;
+    using nonce_t = uint32_t;
+    using xid_t = int32_t;
 
     enum : status {
         timeout_failure = -1,
@@ -20,17 +22,17 @@ namespace rpc_protocol {
     };
 
     struct request_header {
-        int xid;
+        xid_t xid;
         proc_id_t proc;
-        unsigned int clt_nonce;
-        unsigned int srv_nonce;
-        int xid_rep;
+        nonce_t clt_nonce;
+        nonce_t srv_nonce;
+        xid_t xid_rep;
 
         MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep)
     };
 
     struct reply_header {
-        int xid;
+        xid_t xid;
         int ret;
 
         MEMBERS(xid, ret)
@@ -44,13 +46,13 @@ namespace rpc_protocol {
 
     const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t);
     const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation
-    const size_t MAX_PDU = 10<<20; //maximum PDF is 10M
+    const size_t MAX_PDU = 10<<20; // maximum PDF is 10M
 
-#define REMOTE_PROCEDURE_BASE(_base_) enum proc_no : ::rpc_protocol::proc_id_t { base = _base_ };
-#define REMOTE_PROCEDURE(_offset_, _name_, _args_) static constexpr ::rpc_protocol::proc_t<status _args_> _name_{base + _offset_};
+#define REMOTE_PROCEDURE_BASE(_base_) static constexpr rpc_protocol::proc_id_t base = _base_;
+#define REMOTE_PROCEDURE(_offset_, _name_, _args_) static constexpr rpc_protocol::proc_t<status _args_> _name_{base + _offset_};
 
     REMOTE_PROCEDURE_BASE(0);
-    REMOTE_PROCEDURE(1, bind, (unsigned int &, int)); // handler number reserved for bind
+    REMOTE_PROCEDURE(1, bind, (nonce_t &)); // handler number reserved for bind
 };
 
 ENDIAN_SWAPPABLE(rpc_protocol::request_header)
index 1963ada..4dd2af2 100644 (file)
@@ -6,15 +6,16 @@
 #include <arpa/inet.h>
 #include <getopt.h>
 #include <unistd.h>
+#include <string.h>
 
 #define NUM_CL 2
 
 char log_thread_prefix = 'r';
 
-rpcs *server;  // server rpc object
-rpcc *clients[NUM_CL];  // client rpc object
-string dst; //server's ip address
-in_port_t port;
+static rpcs *server;  // server rpc object
+static rpcc *clients[NUM_CL];  // client rpc object
+static string dst; //server's ip address
+static in_port_t port;
 
 // server-side handlers. they must be methods of some class
 // to simplify rpcs::reg(). a server process can have handlers
@@ -60,11 +61,11 @@ int srv::handle_slow(int &r, const int a) {
 }
 
 int srv::handle_bigrep(string &r, const size_t len) {
-    r = string((size_t)len, 'x');
+    r = string(len, 'x');
     return 0;
 }
 
-srv service;
+static srv service;
 
 void startserver() {
     server = new rpcs(port);
@@ -82,13 +83,15 @@ void testmarshall() {
     VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ);
     int i = 12345;
     unsigned long long l = 1223344455L;
+    size_t sz = 101010101;
     string s = "hallo....";
     m << i;
     m << l;
     m << s;
+    m << sz;
 
     string b = m;
-    VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
+    VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)+sizeof(uint32_t));
 
     unmarshall un(b, true);
     rpc_protocol::request_header rh1;
@@ -97,11 +100,13 @@ void testmarshall() {
     int i1;
     unsigned long long l1;
     string s1;
+    size_t sz1;
     un >> i1;
     un >> l1;
     un >> s1;
+    un >> sz1;
     VERIFY(un.okdone());
-    VERIFY(i1==i && l1==l && s1==s);
+    VERIFY(i1==i && l1==l && s1==s && sz1==sz);
 }
 
 void client1(size_t cl) {
@@ -199,6 +204,7 @@ void simple_tests(rpcc *c) {
     // huge RPC
     string big(1000000, 'x');
     intret = c->call(srv_protocol::_22, rep, big, (string)"z");
+    VERIFY(intret == 0);
     VERIFY(rep.size() == 1000001);
     cout << "   -- huge 1M rpc request .. ok" << endl;
 
diff --git a/rsm.cc b/rsm.cc
index 7e90b03..956f45d 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -121,7 +121,7 @@ void rsm::reg1(rpc_protocol::proc_id_t proc, handler *h) {
 }
 
 // The recovery thread runs this function
-void rsm::recovery() [[noreturn]] {
+void rsm::recovery() {
     bool r = true;
     lock ml(rsm_mutex);
 
@@ -356,6 +356,9 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id
         }
     }
     execute(procno, req, r);
+    for (size_t i=0; i<r.size(); i++) {
+        LOG(hex << setfill('0') << setw(2) << (unsigned int)(unsigned char)r[i]);
+    }
     last_myvs = vs;
     return rsm_client_protocol::OK;
 }
@@ -467,9 +470,8 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last
 }
 
 //
-// RPC handler: Send back all the nodes this local knows about to client
-// so the client can switch to a different primary
-// when it existing primary fails
+// RPC handler: Responds with the list of known nodes for fall-back on a
+// primary failure
 //
 rsm_client_protocol::status rsm::client_members(vector<string> &r, int) {
     vector<string> m;
@@ -512,12 +514,9 @@ bool rsm::amiprimary() {
 }
 
 
-// Testing server
-
-// Simulate partitions
+// Test RPCs -- simulate partitions and failures
 
-// assumes caller holds rsm_mutex
-void rsm::net_repair(bool heal, lock &) {
+void rsm::net_repair(bool heal, lock &/*rsm_mutex_lock*/) {
     vector<string> m;
     cfg->get_view(vid_commit, m);
     for (unsigned i  = 0; i < m.size(); i++) {
@@ -534,15 +533,12 @@ rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r,
     lock ml(rsm_mutex);
     LOG("heal " << heal << " (dopartition " <<
             dopartition << ", partitioned " << partitioned << ")");
-    if (heal) {
+    if (heal)
         net_repair(heal, ml);
-        partitioned = false;
-    } else {
+    else
         dopartition = true;
-        partitioned = false;
-    }
-    r = rsm_test_protocol::OK;
-    return r;
+    partitioned = false;
+    return r = rsm_test_protocol::OK;
 }
 
 // simulate failure at breakpoint 1 and 2
diff --git a/rsm.h b/rsm.h
index b402bab..14dc011 100644 (file)
--- a/rsm.h
+++ b/rsm.h
@@ -72,7 +72,7 @@ class rsm : public config_view_change {
 
         bool amiprimary();
         void set_state_transfer(rsm_state_transfer *_stf) { stf = _stf; }
-        void recovery();
+        void recovery NORETURN ();
         void commit_change(unsigned vid);
 
         template<class P, class F, class C=void> void reg(rpc_protocol::proc_t<P> proc, F f, C *c=nullptr) {
index d64c0af..9cd60bd 100644 (file)
@@ -20,7 +20,7 @@ struct viewstamp {
     LEXICOGRAPHIC_COMPARISON(viewstamp)
 };
 
-MARSHALLABLE(viewstamp)
+MARSHALLABLE_STRUCT(viewstamp)
 
 namespace rsm_protocol {
     enum status : rpc_protocol::status { OK, ERR, BUSY};
@@ -39,7 +39,7 @@ namespace rsm_protocol {
     REMOTE_PROCEDURE(4, joinreq, (string &, string, viewstamp));
 };
 
-MARSHALLABLE(rsm_protocol::transferres)
+MARSHALLABLE_STRUCT(rsm_protocol::transferres)
 
 namespace rsm_test_protocol {
     enum status : rpc_protocol::status {OK, ERR};
diff --git a/types.h b/types.h
index 6e6f0f6..ede859f 100644 (file)
--- a/types.h
+++ b/types.h
@@ -5,41 +5,40 @@
 
 #include <algorithm>
 using std::copy;
-using std::move;
+using std::count_if;
+using std::find;
 using std::max;
 using std::min;
 using std::min_element;
-using std::find;
-using std::count_if;
+using std::move;
+using std::swap;
 
 #include <condition_variable>
 using cond = std::condition_variable;
 using std::cv_status;
 
 #include <chrono>
-using std::chrono::seconds;
-using std::chrono::milliseconds;
+using std::chrono::duration_cast;
 using std::chrono::microseconds;
+using std::chrono::milliseconds;
 using std::chrono::nanoseconds;
+using std::chrono::seconds;
 using std::chrono::steady_clock;
 using std::chrono::system_clock;
-using std::chrono::duration_cast;
-using std::chrono::time_point_cast;
 using std::chrono::time_point;
+using std::chrono::time_point_cast;
 
 #include <exception>
 using std::exception;
 
 #include <fstream>
-using std::ofstream;
 using std::ifstream;
+using std::ofstream;
 
-#ifndef LIBT4_NO_FUNCTIONAL
 #include <functional>
+// std::bind conflicts with BIND(2)
 using std::function;
-using std::bind;
 using std::placeholders::_1;
-#endif
 
 #include <iomanip>
 #include <iostream>
@@ -217,4 +216,14 @@ struct pass { template <typename... Args> inline pass(Args&&...) {} };
 
 #include "endian.h"
 
+#ifndef __has_attribute
+#define __has_attribute(x) 0
+#endif
+
+#if __has_attribute(noreturn)
+#define NORETURN [[noreturn]]
+#else
+#define NORETURN
+#endif
+
 #endif