More dependency check-ups
authorPeter Iannucci <iannucci@mit.edu>
Mon, 30 Sep 2013 01:02:07 +0000 (21:02 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Mon, 30 Sep 2013 13:05:23 +0000 (09:05 -0400)
26 files changed:
Makefile
endian.h [new file with mode: 0644]
handle.h
lang/verify.h
lock_client.cc
lock_server.cc
lock_tester.cc
log.cc
paxos.cc
paxos.h
rpc/connection.cc
rpc/connection.h
rpc/marshall.cc
rpc/marshall.h
rpc/pollmgr.cc
rpc/rpc.cc
rpc/rpc.h
rpc/rpctest.cc
rpc/thr_pool.cc
rpc/thr_pool.h
rsm.cc
rsm_client.h
rsmtest_client.cc
rsmtest_client.h
threaded_log.h
types.h

index e2a20d9..010818d 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -13,17 +13,13 @@ rpc/librpc.a: rpc/rpc.o rpc/marshall.o rpc/connection.o rpc/pollmgr.o rpc/thr_po
 
 rpc/rpctest: rpc/rpctest.o threaded_log.o rpc/librpc.a
 
 
 rpc/rpctest: rpc/rpctest.o threaded_log.o rpc/librpc.a
 
-lock_demo=lock_demo.o lock_client.o threaded_log.o rsm_client.o handle.o
-lock_demo : $(lock_demo) rpc/librpc.a
+lock_demo : lock_demo.o lock_client.o threaded_log.o rsm_client.o handle.o rpc/librpc.a
 
 
-lock_tester=lock_tester.o lock_client.o threaded_log.o rsm_client.o handle.o
-lock_tester : $(lock_tester) rpc/librpc.a
+lock_tester : lock_tester.o lock_client.o threaded_log.o rsm_client.o handle.o rpc/librpc.a
 
 
-lock_server=lock_smain.o threaded_log.o handle.o rsm.o paxos.o config.o log.o lock_server.o
-lock_server : $(lock_server) rpc/librpc.a
+lock_server : lock_smain.o threaded_log.o handle.o rsm.o paxos.o config.o log.o lock_server.o rpc/librpc.a
 
 
-rsm_tester=rsm_tester.o rsmtest_client.o threaded_log.o
-rsm_tester: $(rsm_tester) rpc/librpc.a
+rsm_tester: rsm_tester.o rsmtest_client.o threaded_log.o rpc/librpc.a
 
 %.o: %.cc
        $(CXX) $(CXXFLAGS) -c $< -o $@
 
 %.o: %.cc
        $(CXX) $(CXXFLAGS) -c $< -o $@
diff --git a/endian.h b/endian.h
new file mode 100644 (file)
index 0000000..7385406
--- /dev/null
+++ b/endian.h
@@ -0,0 +1,41 @@
+#ifndef endian_h
+#define endian_h
+
+#include <cinttypes>
+
+constexpr union { uint32_t i; uint8_t is_little_endian; } endianness{1};
+
+inline uint8_t hton(uint8_t t) { return t; }
+inline int8_t hton(int8_t t) { return t; }
+inline uint16_t hton(uint16_t t) { return htons(t); }
+inline int16_t hton(int16_t t) { return (int16_t)htons((uint16_t)t); }
+inline uint32_t hton(uint32_t t) { return htonl(t); }
+inline int32_t hton(int32_t t) { return (int32_t)htonl((uint32_t)t); }
+inline uint64_t hton(uint64_t t) {
+    if (!endianness.is_little_endian)
+        return t;
+    return (uint64_t)htonl((uint32_t)(t >> 32)) | ((uint64_t)htonl((uint32_t)t) << 32);
+}
+inline int64_t hton(int64_t t) { return (int64_t)hton((uint64_t)t); }
+
+template <class T> inline T ntoh(T t) { return hton(t); }
+
+template <class... Args, size_t... Indices> inline tuple<typename remove_reference<Args>::type...>
+tuple_hton_imp(tuple<Args...> && t, tuple_indices<Indices...>) {
+    return tuple<typename remove_reference<Args>::type...>(hton(get<Indices>(t))...);
+}
+
+template <class... Args> inline tuple<typename remove_reference<Args>::type...>
+hton(tuple<Args...> && t) {
+    using Indices = typename make_tuple_indices<sizeof...(Args)>::type;
+    return tuple_hton_imp(forward<tuple<Args...>>(t), Indices());
+}
+
+#define ENDIAN_SWAPPABLE(_c_) \
+inline _c_ hton(_c_ && t) { \
+    _c_ result; \
+    result._tuple_() = hton(t._tuple_()); \
+    return result; \
+}
+
+#endif
index a513b56..d4b6223 100644 (file)
--- a/handle.h
+++ b/handle.h
@@ -1,5 +1,5 @@
 // manage a cache of RPC connections.
 // manage a cache of RPC connections.
-// assuming cid is a std::string holding the
+// assuming cid is a string holding the
 // host:port of the RPC server you want
 // to talk to:
 //
 // host:port of the RPC server you want
 // to talk to:
 //
index 823a48d..622aaf2 100644 (file)
@@ -1,5 +1,3 @@
-// safe assertions.
-
 #ifndef verify_client_h
 #define verify_client_h
 
 #ifndef verify_client_h
 #define verify_client_h
 
@@ -7,7 +5,7 @@
 #include <cassert>
 
 #ifdef NDEBUG
 #include <cassert>
 
 #ifdef NDEBUG
-#define VERIFY(expr) do { if (!(expr)) abort(); } while (0)
+#define VERIFY(expr) { if (!(expr)) abort(); }
 #else
 #define VERIFY(expr) assert(expr)
 #endif
 #else
 #define VERIFY(expr) assert(expr)
 #endif
index 99dcb5b..de357f1 100644 (file)
@@ -4,7 +4,7 @@
 #include <arpa/inet.h>
 
 void lock_state::wait(lock & mutex_lock) {
 #include <arpa/inet.h>
 
 void lock_state::wait(lock & mutex_lock) {
-    auto self = std::this_thread::get_id();
+    auto self = this_thread::get_id();
     c[self].wait(mutex_lock);
     c.erase(self);
 }
     c[self].wait(mutex_lock);
     c.erase(self);
 }
@@ -80,7 +80,7 @@ int lock_client::stat(lock_protocol::lockid_t lid) {
 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
     lock_state &st = get_lock_state(lid);
     lock sl(st.m);
 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
     lock_state &st = get_lock_state(lid);
     lock sl(st.m);
-    auto self = std::this_thread::get_id();
+    auto self = this_thread::get_id();
 
     // check for reentrancy
     VERIFY(st.state != lock_state::locked || st.held_by != self);
 
     // check for reentrancy
     VERIFY(st.state != lock_state::locked || st.held_by != self);
@@ -145,7 +145,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
     lock_state &st = get_lock_state(lid);
     lock sl(st.m);
 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
     lock_state &st = get_lock_state(lid);
     lock sl(st.m);
-    auto self = std::this_thread::get_id();
+    auto self = this_thread::get_id();
     VERIFY(st.state == lock_state::locked && st.held_by == self);
     st.state = lock_state::free;
     LOG("Lock " << lid << ": free");
     VERIFY(st.state == lock_state::locked && st.held_by == self);
     st.state = lock_state::free;
     LOG("Lock " << lid << ": free");
index d5e85a5..b724140 100644 (file)
@@ -30,8 +30,8 @@ lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
 }
 
 lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
 }
 
 lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) {
-    std::thread(&lock_server::revoker, this).detach();
-    std::thread(&lock_server::retryer, this).detach();
+    thread(&lock_server::revoker, this).detach();
+    thread(&lock_server::retryer, this).detach();
     rsm->set_state_transfer(this);
 }
 
     rsm->set_state_transfer(this);
 }
 
@@ -167,16 +167,14 @@ int lock_server::release(int &, lock_protocol::lockid_t lid, callback_t id, lock
 string lock_server::marshal_state() {
     lock sl(lock_table_lock);
     marshall rep;
 string lock_server::marshal_state() {
     lock sl(lock_table_lock);
     marshall rep;
-    rep << nacquire;
-    rep << lock_table;
-    return rep.str();
+    rep << nacquire << lock_table;
+    return rep.content();
 }
 
 void lock_server::unmarshal_state(string state) {
     lock sl(lock_table_lock);
 }
 
 void lock_server::unmarshal_state(string state) {
     lock sl(lock_table_lock);
-    unmarshall rep(state);
-    rep >> nacquire;
-    rep >> lock_table;
+    unmarshall rep(state, false);
+    rep >> nacquire >> lock_table;
 }
 
 lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
 }
 
 lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
index c192128..f8e2196 100644 (file)
@@ -11,7 +11,7 @@ char log_thread_prefix = 'c';
 
 // must be >= 2
 const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc.
 
 // must be >= 2
 const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc.
-std::string dst;
+string dst;
 lock_client **lc = new lock_client * [nt];
 lock_protocol::lockid_t a = "1";
 lock_protocol::lockid_t b = "2";
 lock_client **lc = new lock_client * [nt];
 lock_protocol::lockid_t a = "1";
 lock_protocol::lockid_t b = "2";
@@ -21,7 +21,7 @@ lock_protocol::lockid_t c = "3";
 // doesn't grant the same lock to both clients.
 // it assumes that lock names are distinct in the first byte.
 int ct[256];
 // doesn't grant the same lock to both clients.
 // it assumes that lock names are distinct in the first byte.
 int ct[256];
-std::mutex count_mutex;
+mutex count_mutex;
 
 void check_grant(lock_protocol::lockid_t lid) {
     lock ml(count_mutex);
 
 void check_grant(lock_protocol::lockid_t lid) {
     lock ml(count_mutex);
@@ -116,7 +116,7 @@ void test5(int i) {
 int
 main(int argc, char *argv[])
 {
 int
 main(int argc, char *argv[])
 {
-    std::thread th[nt];
+    thread th[nt];
     int test = 0;
 
     setvbuf(stdout, NULL, _IONBF, 0);
     int test = 0;
 
     setvbuf(stdout, NULL, _IONBF, 0);
@@ -148,7 +148,7 @@ main(int argc, char *argv[])
     if (!test || test == 2) {
         // test2
         for (int i = 0; i < nt; i++)
     if (!test || test == 2) {
         // test2
         for (int i = 0; i < nt; i++)
-            th[i] = std::thread(test2, i);
+            th[i] = thread(test2, i);
         for (int i = 0; i < nt; i++)
             th[i].join();
     }
         for (int i = 0; i < nt; i++)
             th[i].join();
     }
@@ -157,7 +157,7 @@ main(int argc, char *argv[])
         LOG_NONMEMBER("test 3");
 
         for (int i = 0; i < nt; i++)
         LOG_NONMEMBER("test 3");
 
         for (int i = 0; i < nt; i++)
-            th[i] = std::thread(test3, i);
+            th[i] = thread(test3, i);
         for (int i = 0; i < nt; i++)
             th[i].join();
     }
         for (int i = 0; i < nt; i++)
             th[i].join();
     }
@@ -166,7 +166,7 @@ main(int argc, char *argv[])
         LOG_NONMEMBER("test 4");
 
         for (int i = 0; i < 2; i++)
         LOG_NONMEMBER("test 4");
 
         for (int i = 0; i < 2; i++)
-            th[i] = std::thread(test4, i);
+            th[i] = thread(test4, i);
         for (int i = 0; i < 2; i++)
             th[i].join();
     }
         for (int i = 0; i < 2; i++)
             th[i].join();
     }
@@ -175,7 +175,7 @@ main(int argc, char *argv[])
         LOG_NONMEMBER("test 5");
 
         for (int i = 0; i < nt; i++)
         LOG_NONMEMBER("test 5");
 
         for (int i = 0; i < nt; i++)
-            th[i] = std::thread(test5, i);
+            th[i] = thread(test5, i);
         for (int i = 0; i < nt; i++)
             th[i].join();
     }
         for (int i = 0; i < nt; i++)
             th[i].join();
     }
diff --git a/log.cc b/log.cc
index de00d67..3b881fa 100644 (file)
--- a/log.cc
+++ b/log.cc
@@ -60,14 +60,14 @@ string log::dump() {
 
 void log::restore(string s) {
     LOG("restore: " << s);
 
 void log::restore(string s) {
     LOG("restore: " << s);
-    ofstream f(name, std::ios::trunc);
+    ofstream f(name, ios::trunc);
     f << s;
     f.close();
 }
 
 // XXX should be an atomic operation
 void log::loginstance(unsigned instance, string v) {
     f << s;
     f.close();
 }
 
 // XXX should be an atomic operation
 void log::loginstance(unsigned instance, string v) {
-    ofstream f(name, std::ios::app);
+    ofstream f(name, ios::app);
     f << "done " << instance << " " << v << "\n";
     f.close();
 }
     f << "done " << instance << " " << v << "\n";
     f.close();
 }
@@ -75,7 +75,7 @@ void log::loginstance(unsigned instance, string v) {
 // an acceptor should call logprop(promise) when it
 // receives a prepare to which it responds prepare_ok().
 void log::logprop(prop_t promise) {
 // an acceptor should call logprop(promise) when it
 // receives a prepare to which it responds prepare_ok().
 void log::logprop(prop_t promise) {
-    ofstream f(name, std::ios::app);
+    ofstream f(name, ios::app);
     f << "propseen " << promise.n << " " << promise.m << "\n";
     f.close();
 }
     f << "propseen " << promise.n << " " << promise.m << "\n";
     f.close();
 }
@@ -83,7 +83,7 @@ void log::logprop(prop_t promise) {
 // an acceptor should call logaccept(accepted, accepted_value) when it
 // receives an accept RPC to which it replies accept_ok().
 void log::logaccept(prop_t n, string v) {
 // an acceptor should call logaccept(accepted, accepted_value) when it
 // receives an accept RPC to which it replies accept_ok().
 void log::logaccept(prop_t n, string v) {
-    ofstream f(name, std::ios::app);
+    ofstream f(name, ios::app);
     f << "accepted " << n.n << " " << n.m << " " << v << "\n";
     f.close();
 }
     f << "accepted " << n.n << " " << n.m << " " << v << "\n";
     f.close();
 }
index f9d5785..b39fa5b 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -46,7 +46,7 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const
     }
     stable = false;
     bool r = false;
     }
     stable = false;
     bool r = false;
-    proposal.n = std::max(promise.n, proposal.n) + 1;
+    proposal.n = max(promise.n, proposal.n) + 1;
     nodes_t accepts;
     value_t v = newv;
     if (prepare(instance, accepts, cur_nodes, v)) {
     nodes_t accepts;
     value_t v = newv;
     if (prepare(instance, accepts, cur_nodes, v)) {
diff --git a/paxos.h b/paxos.h
index 426dfef..186daab 100644 (file)
--- a/paxos.h
+++ b/paxos.h
@@ -29,7 +29,7 @@ class proposer_acceptor {
         paxos_change *delegate;
         node_t me;
 
         paxos_change *delegate;
         node_t me;
 
-        rpcs pxs = {(uint32_t)std::stoi(me)};
+        rpcs pxs = {(uint32_t)stoi(me)};
 
         bool break1 = false;
         bool break2 = false;
 
         bool break1 = false;
         bool break2 = false;
index 86d4ec5..55e374a 100644 (file)
@@ -1,11 +1,11 @@
 // std::bind and syscall bind have the same name, so don't use std::bind in this file
 #define LIBT4_NO_FUNCTIONAL
 #include "connection.h"
 // std::bind and syscall bind have the same name, so don't use std::bind in this file
 #define LIBT4_NO_FUNCTIONAL
 #include "connection.h"
+#include <cerrno>
+#include <csignal>
 #include <fcntl.h>
 #include <sys/types.h>
 #include <netinet/tcp.h>
 #include <fcntl.h>
 #include <sys/types.h>
 #include <netinet/tcp.h>
-#include <errno.h>
-#include <signal.h>
 #include <unistd.h>
 #include <sys/socket.h>
 
 #include <unistd.h>
 #include <sys/socket.h>
 
@@ -15,8 +15,7 @@ connection::connection(chanmgr *m1, int f1, int l1)
 : mgr_(m1), fd_(f1), lossy_(l1)
 {
     int flags = fcntl(fd_, F_GETFL, NULL);
 : mgr_(m1), fd_(f1), lossy_(l1)
 {
     int flags = fcntl(fd_, F_GETFL, NULL);
-    flags |= O_NONBLOCK;
-    fcntl(fd_, F_SETFL, flags);
+    fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
 
     signal(SIGPIPE, SIG_IGN);
 
 
     signal(SIGPIPE, SIG_IGN);
 
@@ -27,9 +26,7 @@ connection::connection(chanmgr *m1, int f1, int l1)
 
 connection::~connection() {
     VERIFY(dead_);
 
 connection::~connection() {
     VERIFY(dead_);
-    if (rpdu_.buf)
-        free(rpdu_.buf);
-    VERIFY(!wpdu_.buf);
+    VERIFY(!wpdu_.buf.size());
     close(fd_);
 }
 
     close(fd_);
 }
 
@@ -46,12 +43,10 @@ bool connection::isdead() {
 void connection::closeconn() {
     {
         lock ml(m_);
 void connection::closeconn() {
     {
         lock ml(m_);
-        if (!dead_) {
-            dead_ = true;
-            shutdown(fd_,SHUT_RDWR);
-        } else {
+        if (dead_)
             return;
             return;
-        }
+        dead_ = true;
+        shutdown(fd_,SHUT_RDWR);
     }
     //after block_remove_fd, select will never wait on fd_
     //and no callbacks will be active
     }
     //after block_remove_fd, select will never wait on fd_
     //and no callbacks will be active
@@ -81,18 +76,18 @@ int connection::compare(connection *another) {
     return 0;
 }
 
     return 0;
 }
 
-bool connection::send(char *b, size_t sz) {
+bool connection::send(const string & b) {
     lock ml(m_);
     lock ml(m_);
+
     waiters_++;
     waiters_++;
-    while (!dead_ && wpdu_.buf) {
+    while (!dead_ && wpdu_.buf.size())
         send_wait_.wait(ml);
         send_wait_.wait(ml);
-    }
     waiters_--;
     waiters_--;
-    if (dead_) {
+
+    if (dead_)
         return false;
         return false;
-    }
+
     wpdu_.buf = b;
     wpdu_.buf = b;
-    wpdu_.sz = sz;
     wpdu_.solong = 0;
 
     if (lossy_) {
     wpdu_.solong = 0;
 
     if (lossy_) {
@@ -107,19 +102,15 @@ bool connection::send(char *b, size_t sz) {
         ml.unlock();
         PollMgr::Instance()->block_remove_fd(fd_);
         ml.lock();
         ml.unlock();
         PollMgr::Instance()->block_remove_fd(fd_);
         ml.lock();
-    } else {
-        if (wpdu_.solong == wpdu_.sz) {
-        } else {
-            //should be rare to need to explicitly add write callback
-            PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
-            while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) {
-                send_complete_.wait(ml);
-            }
-        }
+    } else if (wpdu_.solong != wpdu_.buf.size()) {
+        // should be rare to need to explicitly add write callback
+        PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
+        while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
+            send_complete_.wait(ml);
     }
     }
-    bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
-    wpdu_.solong = wpdu_.sz = 0;
-    wpdu_.buf = NULL;
+    bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size());
+    wpdu_.solong = 0;
+    wpdu_.buf.clear();
     if (waiters_ > 0)
         send_wait_.notify_all();
     return ret;
     if (waiters_ > 0)
         send_wait_.notify_all();
     return ret;
@@ -130,7 +121,7 @@ void connection::write_cb(int s) {
     lock ml(m_);
     VERIFY(!dead_);
     VERIFY(fd_ == s);
     lock ml(m_);
     VERIFY(!dead_);
     VERIFY(fd_ == s);
-    if (wpdu_.sz == 0) {
+    if (wpdu_.buf.size() == 0) {
         PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
         return;
     }
         PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
         return;
     }
@@ -139,7 +130,7 @@ void connection::write_cb(int s) {
         dead_ = true;
     } else {
         VERIFY(wpdu_.solong != size_t_max);
         dead_ = true;
     } else {
         VERIFY(wpdu_.solong != size_t_max);
-        if (wpdu_.solong < wpdu_.sz) {
+        if (wpdu_.solong < wpdu_.buf.size()) {
             return;
         }
     }
             return;
         }
     }
@@ -154,41 +145,44 @@ void connection::read_cb(int s) {
         return;
     }
 
         return;
     }
 
+    IF_LEVEL(5) LOG("got data on fd " << s);
+
     bool succ = true;
     bool succ = true;
-    if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) {
+    if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
         succ = readpdu();
     }
 
     if (!succ) {
         succ = readpdu();
     }
 
     if (!succ) {
+        IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
         PollMgr::Instance()->del_callback(fd_,CB_RDWR);
         dead_ = true;
         send_complete_.notify_one();
     }
 
         PollMgr::Instance()->del_callback(fd_,CB_RDWR);
         dead_ = true;
         send_complete_.notify_one();
     }
 
-    if (rpdu_.buf && rpdu_.sz == rpdu_.solong) {
-        if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) {
+    if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
+        if (mgr_->got_pdu(this, rpdu_.buf)) {
             //chanmgr has successfully consumed the pdu
             //chanmgr has successfully consumed the pdu
-            rpdu_.buf = NULL;
-            rpdu_.sz = rpdu_.solong = 0;
+            rpdu_.buf.clear();
+            rpdu_.solong = 0;
         }
     }
 }
 
 bool connection::writepdu() {
     VERIFY(wpdu_.solong != size_t_max);
         }
     }
 }
 
 bool connection::writepdu() {
     VERIFY(wpdu_.solong != size_t_max);
-    if (wpdu_.solong == wpdu_.sz)
+    if (wpdu_.solong == wpdu_.buf.size())
         return true;
 
     if (wpdu_.solong == 0) {
         return true;
 
     if (wpdu_.solong == 0) {
-        uint32_t sz = htonl((uint32_t)wpdu_.sz);
-        bcopy(&sz,wpdu_.buf,sizeof(sz));
+        uint32_t sz = htonl((uint32_t)wpdu_.buf.size() - sizeof(uint32_t));
+        copy((const char *)&sz, (const char *)(&sz+1), &wpdu_.buf[0]);
     }
     }
-    ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong));
+    ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
     if (n < 0) {
         if (errno != EAGAIN) {
             IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno);
             wpdu_.solong = size_t_max;
     if (n < 0) {
         if (errno != EAGAIN) {
             IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno);
             wpdu_.solong = size_t_max;
-            wpdu_.sz = 0;
+            wpdu_.buf.clear();
         }
         return (errno == EAGAIN);
     }
         }
         return (errno == EAGAIN);
     }
@@ -197,7 +191,8 @@ bool connection::writepdu() {
 }
 
 bool connection::readpdu() {
 }
 
 bool connection::readpdu() {
-    if (!rpdu_.sz) {
+    IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
+    if (!rpdu_.buf.size()) {
         uint32_t sz1;
         ssize_t n = read(fd_, &sz1, sizeof(sz1));
 
         uint32_t sz1;
         ssize_t n = read(fd_, &sz1, sizeof(sz1));
 
@@ -211,33 +206,34 @@ bool connection::readpdu() {
         }
 
         if (n > 0 && n != sizeof(sz1)) {
         }
 
         if (n > 0 && n != sizeof(sz1)) {
-            IF_LEVEL(0) LOG("connection::readpdu short read of sz");
+            IF_LEVEL(0) LOG("short read of sz");
             return false;
         }
 
         size_t sz = ntohl(sz1);
 
         if (sz > MAX_PDU) {
             return false;
         }
 
         size_t sz = ntohl(sz1);
 
         if (sz > MAX_PDU) {
-            IF_LEVEL(2) LOG("connection::readpdu read pdu TOO BIG " << sz << " network order=" << hex << sz1);
+            IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
             return false;
         }
 
             return false;
         }
 
-        rpdu_.sz = sz;
-        VERIFY(rpdu_.buf == NULL);
-        rpdu_.buf = (char *)malloc(sz+sizeof(sz1));
-        VERIFY(rpdu_.buf);
-        bcopy(&sz1,rpdu_.buf,sizeof(sz1));
+        IF_LEVEL(5) LOG("read size of datagram = " << sz);
+
+        VERIFY(rpdu_.buf.size() == 0);
+        rpdu_.buf = string(sz+sizeof(sz1), 0);
+        copy((const char *)&sz1, (const char *)(&sz1 + 1), &rpdu_.buf[0]);
         rpdu_.solong = sizeof(sz1);
     }
 
         rpdu_.solong = sizeof(sz1);
     }
 
-    ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong);
+    ssize_t n = read(fd_, &rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
+
+    IF_LEVEL(5) LOG("read " << n << " bytes");
+
     if (n <= 0) {
         if (errno == EAGAIN)
             return true;
     if (n <= 0) {
         if (errno == EAGAIN)
             return true;
-        if (rpdu_.buf)
-            free(rpdu_.buf);
-        rpdu_.buf = NULL;
-        rpdu_.sz = rpdu_.solong = 0;
+        rpdu_.buf.clear();
+        rpdu_.solong = 0;
         return (errno == EAGAIN);
     }
     rpdu_.solong += (size_t)n;
         return (errno == EAGAIN);
     }
     rpdu_.solong += (size_t)n;
@@ -254,7 +250,7 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
 
     tcp_ = socket(AF_INET, SOCK_STREAM, 0);
     if (tcp_ < 0) {
 
     tcp_ = socket(AF_INET, SOCK_STREAM, 0);
     if (tcp_ < 0) {
-        perror("tcpsconn::tcpsconn accept_loop socket:");
+        perror("accept_loop socket:");
         VERIFY(0);
     }
 
         VERIFY(0);
     }
 
@@ -268,7 +264,7 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
     }
 
     if (listen(tcp_, 1000) < 0) {
     }
 
     if (listen(tcp_, 1000) < 0) {
-        perror("tcpsconn::tcpsconn listen:");
+        perror("listen:");
         VERIFY(0);
     }
 
         VERIFY(0);
     }
 
@@ -276,7 +272,7 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
     port_ = ntohs(sin.sin_port);
 
     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
     port_ = ntohs(sin.sin_port);
 
-    IF_LEVEL(2) LOG("tcpsconn::tcpsconn listen on " << port_ << " " << sin.sin_port);
+    IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
 
     if (pipe(pipe_) < 0) {
         perror("accept_loop pipe:");
 
     if (pipe(pipe_) < 0) {
         perror("accept_loop pipe:");
index 2a01e46..882c1e0 100644 (file)
@@ -5,7 +5,6 @@
 #include <sys/types.h>
 #include <arpa/inet.h>
 #include <netinet/in.h>
 #include <sys/types.h>
 #include <arpa/inet.h>
 #include <netinet/in.h>
-#include <cstddef>
 #include "pollmgr.h"
 
 constexpr size_t size_t_max = numeric_limits<size_t>::max();
 #include "pollmgr.h"
 
 constexpr size_t size_t_max = numeric_limits<size_t>::max();
@@ -16,18 +15,15 @@ class connection;
 
 class chanmgr {
     public:
 
 class chanmgr {
     public:
-        virtual bool got_pdu(connection *c, char *b, size_t sz) = 0;
+        virtual bool got_pdu(connection *c, const string & b) = 0;
         virtual ~chanmgr() {}
 };
 
 class connection : public aio_callback {
     public:
         struct charbuf {
         virtual ~chanmgr() {}
 };
 
 class connection : public aio_callback {
     public:
         struct charbuf {
-            charbuf(): buf(NULL), sz(0), solong(0) {}
-            charbuf (char *b, size_t s) : buf(b), sz(s), solong(0){}
-            char *buf;
-            size_t sz;
-            size_t solong; // number of bytes written or read so far
+            string buf;
+            size_t solong = 0; // number of bytes written or read so far
         };
 
         connection(chanmgr *m1, int f1, int lossytest=0);
         };
 
         connection(chanmgr *m1, int f1, int lossytest=0);
@@ -37,7 +33,7 @@ class connection : public aio_callback {
         bool isdead();
         void closeconn();
 
         bool isdead();
         void closeconn();
 
-        bool send(char *b, size_t sz);
+        bool send(const string & b);
         void write_cb(int s);
         void read_cb(int s);
 
         void write_cb(int s);
         void read_cb(int s);
 
@@ -46,6 +42,7 @@ class connection : public aio_callback {
         int ref() { lock rl(ref_m_); return refno_; }
 
         int compare(connection *another);
         int ref() { lock rl(ref_m_); return refno_; }
 
         int compare(connection *another);
+
     private:
 
         bool readpdu();
     private:
 
         bool readpdu();
@@ -62,7 +59,7 @@ class connection : public aio_callback {
 
         int waiters_ = 0;
         int refno_ = 1;
 
         int waiters_ = 0;
         int refno_ = 1;
-        const int lossy_;
+        int lossy_ = 0;
 
         mutex m_;
         mutex ref_m_;
 
         mutex m_;
         mutex ref_m_;
index 5c2b10c..b8371cf 100644 (file)
 #include "types.h"
 #include "marshall.h"
 
 #include "types.h"
 #include "marshall.h"
 
-marshall &
-operator<<(marshall &m, uint8_t x) {
-    m.rawbyte(x);
-    return m;
-}
-
-marshall &
-operator<<(marshall &m, uint16_t x) {
-    x = hton(x);
-    m.rawbytes((char *)&x, 2);
-    return m;
-}
-
-marshall &
-operator<<(marshall &m, uint32_t x) {
-    x = hton(x);
-    m.rawbytes((char *)&x, 4);
-    return m;
-}
-
-marshall & operator<<(marshall &m, int32_t x) { return m << (uint32_t) x; }
-marshall & operator<<(marshall &m, int8_t x) { return m << (uint8_t)x; }
-marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; }
-marshall & operator<<(marshall &m, int16_t x) { return m << (uint16_t)x; }
-marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }
-
-marshall &
-operator<<(marshall &m, const string &s) {
-    m << (unsigned int) s.size();
+MARSHALL_RAW_NETWORK_ORDER_AS(bool, uint8_t)
+MARSHALL_RAW_NETWORK_ORDER(uint8_t)
+MARSHALL_RAW_NETWORK_ORDER(int8_t)
+MARSHALL_RAW_NETWORK_ORDER(uint16_t)
+MARSHALL_RAW_NETWORK_ORDER(int16_t)
+MARSHALL_RAW_NETWORK_ORDER(uint32_t)
+MARSHALL_RAW_NETWORK_ORDER(int32_t)
+MARSHALL_RAW_NETWORK_ORDER_AS(size_t, uint32_t)
+MARSHALL_RAW_NETWORK_ORDER(uint64_t)
+MARSHALL_RAW_NETWORK_ORDER(int64_t)
+
+marshall & operator<<(marshall &m, const string &s) {
+    m << (uint32_t)s.size();
     m.rawbytes(s.data(), s.size());
     return m;
 }
 
     m.rawbytes(s.data(), s.size());
     return m;
 }
 
-void marshall::pack_req_header(const request_header &h) {
-    size_t saved_sz = index_;
-    //leave the first 4-byte empty for channel to fill size of pdu
-    index_ = sizeof(rpc_sz_t);
-    *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep;
-    index_ = saved_sz;
-}
-
-void marshall::pack_reply_header(const reply_header &h) {
-    size_t saved_sz = index_;
-    //leave the first 4-byte empty for channel to fill size of pdu
-    index_ = sizeof(rpc_sz_t);
-    *this << h.xid << h.ret;
-    index_ = saved_sz;
-}
-
-// take the contents from another unmarshall object
-void
-unmarshall::take_in(unmarshall &another)
-{
-    if(buf_)
-        free(buf_);
-    another.take_buf(&buf_, &sz_);
-    index_ = RPC_HEADER_SZ;
-    ok_ = sz_ >= RPC_HEADER_SZ?true:false;
-}
-
-inline bool
-unmarshall::ensure(size_t n) {
-    if (index_+n > sz_)
-        ok_ = false;
-    return ok_;
-}
-
-inline uint8_t
-unmarshall::rawbyte()
-{
-    if (!ensure(1))
-        return 0;
-    return (uint8_t)buf_[index_++];
-}
-
-void
-unmarshall::rawbytes(string &ss, size_t n)
-{
-    VERIFY(ensure(n));
-    ss.assign(buf_+index_, n);
-    index_ += n;
-}
-
-template <class T>
-void
-unmarshall::rawbytes(T &t)
-{
-    const size_t n = sizeof(T);
-    VERIFY(ensure(n));
-    memcpy(&t, buf_+index_, n);
-    t = ntoh(t);
-    index_ += n;
-}
-
-unmarshall & operator>>(unmarshall &u, bool &x) { x = (bool)u.rawbyte(); return u; }
-unmarshall & operator>>(unmarshall &u, uint8_t &x) { x = u.rawbyte(); return u; }
-unmarshall & operator>>(unmarshall &u, int8_t &x) { x = (int8_t)u.rawbyte(); return u; }
-unmarshall & operator>>(unmarshall &u, uint16_t &x) { u.rawbytes<uint16_t>(x); return u; }
-unmarshall & operator>>(unmarshall &u, int16_t &x) { u.rawbytes<int16_t>(x); return u; }
-unmarshall & operator>>(unmarshall &u, uint32_t &x) { u.rawbytes<uint32_t>(x); return u; }
-unmarshall & operator>>(unmarshall &u, int32_t &x) { u.rawbytes<int32_t>(x); return u; }
-unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes<uint32_t>(xx); x = xx; return u; }
-unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes<uint64_t>(x); return u; }
-unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes<int64_t>(x); return u; }
-
 unmarshall & operator>>(unmarshall &u, string &s) {
 unmarshall & operator>>(unmarshall &u, string &s) {
-    unsigned sz = u.grab<unsigned>();
-    if(u.ok())
-        u.rawbytes(s, sz);
+    uint32_t sz = u.grab<uint32_t>();
+    if (u.ok()) {
+        s.resize(sz);
+        u.rawbytes(&s[0], sz);
+    }
     return u;
 }
     return u;
 }
index 98856e4..d7f1dff 100644 (file)
 #define marshall_h
 
 #include "types.h"
 #define marshall_h
 
 #include "types.h"
-#include <cstring>
-#include <cstddef>
-#include <cinttypes>
+
+// for structs or classes containing a MEMBERS declaration
+class marshall;
+class unmarshall;
+#define FORWARD_MARSHALLABLE(_c_) \
+extern unmarshall & operator>>(unmarshall &u, typename remove_reference<_c_>::type &a); \
+extern marshall & operator<<(marshall &m, const _c_ a);
+#define MARSHALLABLE(_c_) \
+inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_(); } \
+inline marshall & operator<<(marshall &m, const _c_ a) { return m << a._tuple_(); }
+
+// for plain old data
+#define MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _d_) \
+marshall & operator<<(marshall &m, _c_ x) { _d_ y = hton((_d_)x); m.rawbytes(&y, sizeof(_d_)); return m; } \
+unmarshall & operator>>(unmarshall &u, _c_ &x) { _d_ y; u.rawbytes(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; }
+
+#define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_)
 
 using proc_t = uint32_t;
 using status_t = int32_t;
 
 struct request_header {
 
 using proc_t = uint32_t;
 using status_t = int32_t;
 
 struct request_header {
-    request_header(int x=0, proc_t p=0, unsigned c=0, unsigned s=0, int xi=0) :
-        xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {}
     int xid;
     proc_t proc;
     unsigned int clt_nonce;
     unsigned int srv_nonce;
     int xid_rep;
     int xid;
     proc_t proc;
     unsigned int clt_nonce;
     unsigned int srv_nonce;
     int xid_rep;
+
+    MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep)
 };
 
 };
 
+FORWARD_MARSHALLABLE(request_header)
+ENDIAN_SWAPPABLE(request_header)
+
 struct reply_header {
 struct reply_header {
-    reply_header(int x=0, int r=0): xid(x), ret(r) {}
     int xid;
     int ret;
     int xid;
     int ret;
-};
-
-template<class T> inline T hton(T t);
-
-constexpr union { uint32_t i; uint8_t is_little_endian; } endianness{1};
 
 
-template<> inline uint8_t hton(uint8_t t) { return t; }
-template<> inline int8_t hton(int8_t t) { return t; }
-template<> inline uint16_t hton(uint16_t t) { return htons(t); }
-template<> inline int16_t hton(int16_t t) { return (int16_t)htons((uint16_t)t); }
-template<> inline uint32_t hton(uint32_t t) { return htonl(t); }
-template<> inline int32_t hton(int32_t t) { return (int32_t)htonl((uint32_t)t); }
-template<> inline uint64_t hton(uint64_t t) {
-    if (!endianness.is_little_endian)
-        return t;
-    return (uint64_t)htonl((uint32_t)(t >> 32)) | ((uint64_t)htonl((uint32_t)t) << 32);
-}
-template<> inline int64_t hton(int64_t t) { return (int64_t)hton((uint64_t)t); }
-template<> inline request_header hton(request_header h) { return {hton(h.xid), hton(h.proc), hton(h.clt_nonce), hton(h.srv_nonce), hton(h.xid_rep)}; }
-template<> inline reply_header hton(reply_header h) { return {hton(h.xid), hton(h.ret)}; }
+    MEMBERS(xid, ret)
+};
 
 
-template <class T> inline T ntoh(T t) { return hton(t); }
+FORWARD_MARSHALLABLE(reply_header)
+ENDIAN_SWAPPABLE(reply_header)
 
 typedef int rpc_sz_t;
 
 
 typedef int rpc_sz_t;
 
-//size of initial buffer allocation
-#define DEFAULT_RPC_SZ 1024
-#define RPC_HEADER_SZ (max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
+const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t);
+const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation
 
 
+// Template parameter pack expansion is not allowed in certain contexts, but
+// brace initializers (for instance, calls to constructors of empty structs)
+// are fair game.  
 struct pass { template <typename... Args> inline pass(Args&&...) {} };
 
 class marshall {
     private:
 struct pass { template <typename... Args> inline pass(Args&&...) {} };
 
 class marshall {
     private:
-        char *buf_;     // Base of the raw bytes buffer (dynamically readjusted)
-        size_t capacity_;  // Capacity of the buffer
-        size_t index_;     // Read/write head position
+        string buf_ = string(DEFAULT_RPC_SZ, 0); // Raw bytes buffer
+        size_t index_ = RPC_HEADER_SZ; // Read/write head position
 
         inline void reserve(size_t n) {
 
         inline void reserve(size_t n) {
-            if((index_+n) > capacity_){
-                capacity_ += max(capacity_, n);
-                VERIFY (buf_ != NULL);
-                buf_ = (char *)realloc(buf_, capacity_);
-                VERIFY(buf_);
-            }
+            if (index_+n > buf_.size())
+                buf_.resize(index_+n);
         }
     public:
         template <typename... Args>
         marshall(const Args&... args) {
         }
     public:
         template <typename... Args>
         marshall(const Args&... args) {
-            buf_ = (char *) malloc(sizeof(char)*DEFAULT_RPC_SZ);
-            VERIFY(buf_);
-            capacity_ = DEFAULT_RPC_SZ;
-            index_ = RPC_HEADER_SZ;
             (void)pass{(*this << args)...};
         }
 
             (void)pass{(*this << args)...};
         }
 
-        ~marshall() {
-            if (buf_)
-                free(buf_);
-        }
-
-        size_t size() { return index_;}
-        char *cstr() { return buf_;}
-        const char *cstr() const { return buf_;}
-
-        void rawbyte(uint8_t x) {
-            reserve(1);
-            buf_[index_++] = (int8_t)x;
-        }
-
-        void rawbytes(const char *p, size_t n) {
+        void rawbytes(const void *p, size_t n) {
             reserve(n);
             reserve(n);
-            memcpy(buf_+index_, p, n);
+            copy((char *)p, (char *)p+n, &buf_[index_]);
             index_ += n;
         }
 
             index_ += n;
         }
 
-        // Return the current content (excluding header) as a string
-        string get_content() {
-            return string(buf_+RPC_HEADER_SZ,index_-RPC_HEADER_SZ);
-        }
-
-        // Return the current content (excluding header) as a string
-        string str() {
-            return get_content();
-        }
-
-        void pack_req_header(const request_header &h);
-        void pack_reply_header(const reply_header &h);
-
-        void take_buf(char **b, size_t *s) {
-            *b = buf_;
-            *s = index_;
-            buf_ = NULL;
-            index_ = 0;
-            return;
+        // with header
+        operator string () const { return buf_.substr(0,index_); }
+        // without header
+        string content() { return buf_.substr(RPC_HEADER_SZ,index_-RPC_HEADER_SZ); }
+
+        template <class T>
+        void pack_header(const T &h) {
+            VERIFY(sizeof(T)+sizeof(rpc_sz_t) <= RPC_HEADER_SZ);
+            size_t saved_sz = index_;
+            index_ = sizeof(rpc_sz_t); // first 4 bytes hold length field
+            *this << h;
+            index_ = saved_sz;
         }
 };
 
         }
 };
 
-marshall& operator<<(marshall &, bool);
-marshall& operator<<(marshall &, uint32_t);
-marshall& operator<<(marshall &, int32_t);
-marshall& operator<<(marshall &, uint8_t);
-marshall& operator<<(marshall &, int8_t);
-marshall& operator<<(marshall &, uint16_t);
-marshall& operator<<(marshall &, int16_t);
-marshall& operator<<(marshall &, uint64_t);
-marshall& operator<<(marshall &, const string &);
+FORWARD_MARSHALLABLE(bool);
+FORWARD_MARSHALLABLE(uint8_t);
+FORWARD_MARSHALLABLE(int8_t);
+FORWARD_MARSHALLABLE(uint16_t);
+FORWARD_MARSHALLABLE(int16_t);
+FORWARD_MARSHALLABLE(uint32_t);
+FORWARD_MARSHALLABLE(int32_t);
+FORWARD_MARSHALLABLE(size_t);
+FORWARD_MARSHALLABLE(uint64_t);
+FORWARD_MARSHALLABLE(int64_t);
+FORWARD_MARSHALLABLE(string &);
 
 template <class A> typename enable_if<is_iterable<A>::value, marshall>::type &
 operator<<(marshall &m, const A &x) {
 
 template <class A> typename enable_if<is_iterable<A>::value, marshall>::type &
 operator<<(marshall &m, const A &x) {
-    m << (unsigned int) x.size();
+    m << (unsigned int)x.size();
     for (const auto &a : x)
         m << a;
     return m;
     for (const auto &a : x)
         m << a;
     return m;
@@ -153,92 +127,48 @@ operator<<(marshall &m, E e) {
     return m << from_enum(e);
 }
 
     return m << from_enum(e);
 }
 
-class unmarshall;
-
-unmarshall& operator>>(unmarshall &, bool &);
-unmarshall& operator>>(unmarshall &, uint8_t &);
-unmarshall& operator>>(unmarshall &, int8_t &);
-unmarshall& operator>>(unmarshall &, uint16_t &);
-unmarshall& operator>>(unmarshall &, int16_t &);
-unmarshall& operator>>(unmarshall &, uint32_t &);
-unmarshall& operator>>(unmarshall &, int32_t &);
-unmarshall& operator>>(unmarshall &, size_t &);
-unmarshall& operator>>(unmarshall &, uint64_t &);
-unmarshall& operator>>(unmarshall &, int64_t &);
-unmarshall& operator>>(unmarshall &, string &);
 template <class E> typename enable_if<is_enum<E>::value, unmarshall>::type &
 operator>>(unmarshall &u, E &e);
 
 class unmarshall {
     private:
 template <class E> typename enable_if<is_enum<E>::value, unmarshall>::type &
 operator>>(unmarshall &u, E &e);
 
 class unmarshall {
     private:
-        char *buf_;
-        size_t sz_;
-        size_t index_;
-        bool ok_;
-
-        inline bool ensure(size_t n);
-    public:
-        unmarshall(): buf_(NULL),sz_(0),index_(0),ok_(false) {}
-        unmarshall(char *b, size_t sz): buf_(b),sz_(sz),index_(),ok_(true) {}
-        unmarshall(const string &s) : buf_(NULL),sz_(0),index_(0),ok_(false)
-        {
-            //take the content which does not exclude a RPC header from a string
-            take_content(s);
-        }
-        ~unmarshall() {
-            if (buf_) free(buf_);
+        string buf_;
+        size_t index_ = 0;
+        bool ok_ = false;
+
+        inline bool ensure(size_t n) {
+            if (index_+n > buf_.size())
+                ok_ = false;
+            return ok_;
         }
         }
-
-        //take contents from another unmarshall object
-        void take_in(unmarshall &another);
-
-        //take the content which does not exclude a RPC header from a string
-        void take_content(const string &s) {
-            sz_ = s.size()+RPC_HEADER_SZ;
-            buf_ = (char *)realloc(buf_,sz_);
-            VERIFY(buf_);
-            index_ = RPC_HEADER_SZ;
-            memcpy(buf_+index_, s.data(), s.size());
-            ok_ = true;
+    public:
+        unmarshall() {}
+        unmarshall(const string &s, bool has_header)
+            : buf_(s),index_(RPC_HEADER_SZ) {
+            if (!has_header)
+                buf_.insert(0, RPC_HEADER_SZ, 0);
+            ok_ = (buf_.size() >= RPC_HEADER_SZ);
         }
 
         bool ok() const { return ok_; }
         }
 
         bool ok() const { return ok_; }
-        char *cstr() { return buf_;}
-        bool okdone() const { return ok_ && index_ == sz_; }
-
-        uint8_t rawbyte();
-        void rawbytes(string &s, size_t n);
-        template <class T> void rawbytes(T &t);
-
-        size_t ind() { return index_;}
-        size_t size() { return sz_;}
-        void take_buf(char **b, size_t *sz) {
-            *b = buf_;
-            *sz = sz_;
-            sz_ = index_ = 0;
-            buf_ = NULL;
-        }
+        bool okdone() const { return ok_ && index_ == buf_.size(); }
 
 
-        void unpack_req_header(request_header *h) {
-            //the first 4-byte is for channel to fill size of pdu
-            index_ = sizeof(rpc_sz_t);
-            *this >> h->xid >> h->proc >> h->clt_nonce >> h->srv_nonce >> h->xid_rep;
-            index_ = RPC_HEADER_SZ;
+        void rawbytes(void * t, size_t n) {
+            VERIFY(ensure(n));
+            copy(&buf_[index_], &buf_[index_+n], (char *)t);
+            index_ += n;
         }
 
         }
 
-        void unpack_reply_header(reply_header *h) {
-            //the first 4-byte is for channel to fill size of pdu
+        template <class T>
+        void unpack_header(T & h) {
+            // first 4 bytes hold length field
+            VERIFY(sizeof(T)+sizeof(rpc_sz_t) <= RPC_HEADER_SZ);
             index_ = sizeof(rpc_sz_t);
             index_ = sizeof(rpc_sz_t);
-            *this >> h->xid >> h->ret;
+            *this >> h;
             index_ = RPC_HEADER_SZ;
         }
 
             index_ = RPC_HEADER_SZ;
         }
 
-        template <class A>
-        inline A grab() {
-            A a;
-            *this >> a;
-            return a;
-        }
+        template <class T> inline T grab() { T t; *this >> t; return t; }
 };
 
 template <class A> typename enable_if<is_iterable<A>::value, unmarshall>::type &
 };
 
 template <class A> typename enable_if<is_iterable<A>::value, unmarshall>::type &
@@ -279,7 +209,7 @@ typedef function<int(unmarshall &, marshall &)> handler;
 // PAI 2013/09/19
 // C++11 does neither of these two things for us:
 // 1) Declare variables using a parameter pack expansion, like so
 // PAI 2013/09/19
 // C++11 does neither of these two things for us:
 // 1) Declare variables using a parameter pack expansion, like so
-//      Args ...args;
+//      Args... args;
 // 2) Call a function with a tuple of the arguments it expects
 //
 // We implement an 'invoke' function for functions of the RPC handler
 // 2) Call a function with a tuple of the arguments it expects
 //
 // We implement an 'invoke' function for functions of the RPC handler
@@ -293,21 +223,6 @@ typedef function<int(unmarshall &, marshall &)> handler;
 // 'invoke' as a parameter which will be ignored, but its type will force the
 // compiler to specialize 'invoke' appropriately.
 
 // 'invoke' as a parameter which will be ignored, but its type will force the
 // compiler to specialize 'invoke' appropriately.
 
-// The following implementation of tuple_indices is redistributed under the MIT
-// License as an insubstantial portion of the LLVM compiler infrastructure.
-
-template <size_t...> struct tuple_indices {};
-template <size_t S, class IntTuple, size_t E> struct make_indices_imp;
-template <size_t S, size_t ...Indices, size_t E> struct make_indices_imp<S, tuple_indices<Indices...>, E> {
-    typedef typename make_indices_imp<S+1, tuple_indices<Indices..., S>, E>::type type;
-};
-template <size_t E, size_t ...Indices> struct make_indices_imp<E, tuple_indices<Indices...>, E> {
-    typedef tuple_indices<Indices...> type;
-};
-template <size_t E, size_t S=0> struct make_tuple_indices {
-    typedef typename make_indices_imp<S, tuple_indices<>, E>::type type;
-};
-
 // This class encapsulates the default response to runtime unmarshalling
 // failures.  The templated wrappers below may optionally use a different
 // class.
 // This class encapsulates the default response to runtime unmarshalling
 // failures.  The templated wrappers below may optionally use a different
 // class.
@@ -324,7 +239,7 @@ struct VerifyOnFailure {
 
 // One for function pointers...
 
 
 // One for function pointers...
 
-template <class F, class R, class RV, class args_type, size_t ...Indices>
+template <class F, class R, class RV, class args_type, size_t... Indices>
 typename enable_if<!is_member_function_pointer<F>::value, RV>::type
 invoke(RV, F f, void *, R & r, args_type & t, tuple_indices<Indices...>) {
     return f(r, move(get<Indices>(t))...);
 typename enable_if<!is_member_function_pointer<F>::value, RV>::type
 invoke(RV, F f, void *, R & r, args_type & t, tuple_indices<Indices...>) {
     return f(r, move(get<Indices>(t))...);
@@ -332,7 +247,7 @@ invoke(RV, F f, void *, R & r, args_type & t, tuple_indices<Indices...>) {
 
 // And one for pointers to member functions...
 
 
 // And one for pointers to member functions...
 
-template <class F, class C, class RV, class R, class args_type, size_t ...Indices>
+template <class F, class C, class RV, class R, class args_type, size_t... Indices>
 typename enable_if<is_member_function_pointer<F>::value, RV>::type
 invoke(RV, F f, C *c, R & r, args_type & t, tuple_indices<Indices...>) {
     return (c->*f)(r, move(get<Indices>(t))...);
 typename enable_if<is_member_function_pointer<F>::value, RV>::type
 invoke(RV, F f, C *c, R & r, args_type & t, tuple_indices<Indices...>) {
     return (c->*f)(r, move(get<Indices>(t))...);
@@ -406,7 +321,7 @@ template <class F, class ErrorHandler, class Signature>
 struct marshalled_func<F, ErrorHandler, function<Signature>> :
     public marshalled_func_imp<F, void, Signature, ErrorHandler> {};
 
 struct marshalled_func<F, ErrorHandler, function<Signature>> :
     public marshalled_func_imp<F, void, Signature, ErrorHandler> {};
 
-template <class ...Args, size_t ...Indices> unmarshall &
+template <class... Args, size_t... Indices> unmarshall &
 tuple_unmarshall_imp(unmarshall & u, tuple<Args &...> t, tuple_indices<Indices...>) {
     (void)pass{(u >> get<Indices>(t))...};
     return u;
 tuple_unmarshall_imp(unmarshall & u, tuple<Args &...> t, tuple_indices<Indices...>) {
     (void)pass{(u >> get<Indices>(t))...};
     return u;
@@ -418,7 +333,7 @@ operator>>(unmarshall & u, tuple<Args &...> && t) {
     return tuple_unmarshall_imp(u, t, Indices());
 }
 
     return tuple_unmarshall_imp(u, t, Indices());
 }
 
-template <class ...Args, size_t ...Indices> marshall &
+template <class... Args, size_t... Indices> marshall &
 tuple_marshall_imp(marshall & m, tuple<Args...> & t, tuple_indices<Indices...>) {
     (void)pass{(m << get<Indices>(t))...};
     return m;
 tuple_marshall_imp(marshall & m, tuple<Args...> & t, tuple_indices<Indices...>) {
     (void)pass{(m << get<Indices>(t))...};
     return m;
@@ -430,9 +345,7 @@ operator<<(marshall & m, tuple<Args...> && t) {
     return tuple_marshall_imp(m, t, Indices());
 }
 
     return tuple_marshall_imp(m, t, Indices());
 }
 
-// for structs or classes containing a MEMBERS declaration
-#define MARSHALLABLE(_c_) \
-inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_(); } \
-inline marshall & operator<<(marshall &m, _c_ a) { return m << a._tuple_(); }
+MARSHALLABLE(request_header)
+MARSHALLABLE(reply_header)
 
 #endif
 
 #endif
index 15fba26..4254b4f 100644 (file)
@@ -6,7 +6,7 @@
 #include "pollmgr.h"
 
 PollMgr *PollMgr::instance = NULL;
 #include "pollmgr.h"
 
 PollMgr *PollMgr::instance = NULL;
-static std::once_flag pollmgr_is_initialized;
+static once_flag pollmgr_is_initialized;
 
 static void
 PollMgrInit()
 
 static void
 PollMgrInit()
@@ -17,7 +17,7 @@ PollMgrInit()
 PollMgr *
 PollMgr::Instance()
 {
 PollMgr *
 PollMgr::Instance()
 {
-    std::call_once(pollmgr_is_initialized, PollMgrInit);
+    call_once(pollmgr_is_initialized, PollMgrInit);
     return instance;
 }
 
     return instance;
 }
 
@@ -27,7 +27,7 @@ PollMgr::PollMgr() : pending_change_(false)
     aio_ = new SelectAIO();
     //aio_ = new EPollAIO();
 
     aio_ = new SelectAIO();
     //aio_ = new EPollAIO();
 
-    th_ = std::thread(&PollMgr::wait_loop, this);
+    th_ = thread(&PollMgr::wait_loop, this);
 }
 
 PollMgr::~PollMgr() [[noreturn]]
 }
 
 PollMgr::~PollMgr() [[noreturn]]
@@ -84,8 +84,8 @@ void
 PollMgr::wait_loop() [[noreturn]]
 {
 
 PollMgr::wait_loop() [[noreturn]]
 {
 
-    std::vector<int> readable;
-    std::vector<int> writable;
+    vector<int> readable;
+    vector<int> writable;
 
     while (1) {
         {
 
     while (1) {
         {
@@ -206,7 +206,7 @@ SelectAIO::unwatch_fd(int fd, poll_flag flag)
 }
 
 void
 }
 
 void
-SelectAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
+SelectAIO::wait_ready(vector<int> *readable, vector<int> *writable)
 {
     fd_set trfds, twfds;
     int high;
 {
     fd_set trfds, twfds;
     int high;
@@ -334,7 +334,7 @@ EPollAIO::is_watched(int fd, poll_flag flag)
 }
 
 void
 }
 
 void
-EPollAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
+EPollAIO::wait_ready(vector<int> *readable, vector<int> *writable)
 {
     int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
     for (int i = 0; i < nfds; i++) {
 {
     int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
     for (int i = 0; i < nfds; i++) {
index 90d9608..62003dd 100644 (file)
@@ -71,6 +71,8 @@ inline void set_rand_seed() {
     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
 }
 
     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
 }
 
+static sockaddr_in make_sockaddr(const string &hostandport);
+
 rpcc::rpcc(const string & d, bool retrans) :
     dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
     retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
 rpcc::rpcc(const string & d, bool retrans) :
     dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
     retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
@@ -86,20 +88,19 @@ rpcc::rpcc(const string & d, bool retrans) :
     }
 
     char *loss_env = getenv("RPC_LOSSY");
     }
 
     char *loss_env = getenv("RPC_LOSSY");
-    if(loss_env != NULL){
+    if(loss_env)
         lossytest_ = atoi(loss_env);
         lossytest_ = atoi(loss_env);
-    }
 
     // xid starts with 1 and latest received reply starts with 0
     xid_rep_window_.push_back(0);
 
 
     // xid starts with 1 and latest received reply starts with 0
     xid_rep_window_.push_back(0);
 
-    IF_LEVEL(2) LOG("rpcc::rpcc cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
+    IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
 }
 
 // IMPORTANT: destruction should happen only when no external threads
 // are blocked inside rpcc or will use rpcc in the future
 rpcc::~rpcc() {
 }
 
 // IMPORTANT: destruction should happen only when no external threads
 // are blocked inside rpcc or will use rpcc in the future
 rpcc::~rpcc() {
-    IF_LEVEL(2) LOG("rpcc::~rpcc delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
+    IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
     if(chan_){
         chan_->closeconn();
         chan_->decref();
     if(chan_){
         chan_->closeconn();
         chan_->decref();
@@ -115,7 +116,7 @@ int rpcc::bind(TO to) {
         bind_done_ = true;
         srv_nonce_ = r;
     } else {
         bind_done_ = true;
         srv_nonce_ = r;
     } else {
-        IF_LEVEL(2) LOG("rpcc::bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
+        IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
     }
     return ret;
 };
     }
     return ret;
 };
@@ -123,11 +124,11 @@ int rpcc::bind(TO to) {
 // Cancel all outstanding calls
 void rpcc::cancel(void) {
     lock ml(m_);
 // Cancel all outstanding calls
 void rpcc::cancel(void) {
     lock ml(m_);
-    LOG("rpcc::cancel: force callers to fail");
+    LOG("force callers to fail");
     for(auto &p : calls_){
         caller *ca = p.second;
 
     for(auto &p : calls_){
         caller *ca = p.second;
 
-        IF_LEVEL(2) LOG("rpcc::cancel: force caller to fail");
+        IF_LEVEL(2) LOG("force caller to fail");
         {
             lock cl(ca->m);
             ca->done = true;
         {
             lock cl(ca->m);
             ca->done = true;
@@ -140,10 +141,10 @@ void rpcc::cancel(void) {
         destroy_wait_ = true;
         destroy_wait_c_.wait(ml);
     }
         destroy_wait_ = true;
         destroy_wait_c_.wait(ml);
     }
-    LOG("rpcc::cancel: done");
+    LOG("done");
 }
 
 }
 
-int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) {
+int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
 
     caller ca(0, &rep);
     int xid_rep;
 
     caller ca(0, &rep);
     int xid_rep;
@@ -152,7 +153,7 @@ int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) {
 
         if((proc != rpc_const::bind && !bind_done_) ||
                 (proc == rpc_const::bind && bind_done_)){
 
         if((proc != rpc_const::bind && !bind_done_) ||
                 (proc == rpc_const::bind && bind_done_)){
-            IF_LEVEL(1) LOG("rpcc::call1 rpcc has not been bound to dst or binding twice");
+            IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
             return rpc_const::bind_failure;
         }
 
             return rpc_const::bind_failure;
         }
 
@@ -163,7 +164,7 @@ int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) {
         ca.xid = xid_++;
         calls_[ca.xid] = &ca;
 
         ca.xid = xid_++;
         calls_[ca.xid] = &ca;
 
-        req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
+        req.pack_header(request_header{ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
         xid_rep = xid_rep_window_.front();
     }
 
         xid_rep = xid_rep_window_.front();
     }
 
@@ -190,11 +191,11 @@ int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) {
                         }
                     }
                     if (forgot.isvalid())
                         }
                     }
                     if (forgot.isvalid())
-                        ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
-                    ch->send(req.cstr(), req.size());
+                        ch->send(forgot.buf);
+                    ch->send(req);
                 }
                 else IF_LEVEL(1) LOG("not reachable");
                 }
                 else IF_LEVEL(1) LOG("not reachable");
-                IF_LEVEL(2) LOG("rpcc::call1 " << clt_nonce_ << " just sent req proc " << hex << proc <<
+                IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc <<
                                 " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
             }
             transmit = false; // only send once on a given channel
                                 " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
             }
             transmit = false; // only send once on a given channel
@@ -212,14 +213,14 @@ int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) {
         {
             lock cal(ca.m);
             while (!ca.done){
         {
             lock cal(ca.m);
             while (!ca.done){
-                IF_LEVEL(2) LOG("rpcc:call1: wait");
+                IF_LEVEL(2) LOG("wait");
                 if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
                 if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
-                    IF_LEVEL(2) LOG("rpcc::call1: timeout");
+                    IF_LEVEL(2) LOG("timeout");
                     break;
                 }
             }
             if(ca.done){
                     break;
                 }
             }
             if(ca.done){
-                IF_LEVEL(2) LOG("rpcc::call1: reply received");
+                IF_LEVEL(2) LOG("reply received");
                 break;
             }
         }
                 break;
             }
         }
@@ -250,7 +251,7 @@ int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) {
     {
         lock ml(m_);
         if (!dup_req_.isvalid()) {
     {
         lock ml(m_);
         if (!dup_req_.isvalid()) {
-            dup_req_.buf.assign(req.cstr(), req.size());
+            dup_req_.buf = req;
             dup_req_.xid = ca.xid;
         }
         if (xid_rep > xid_rep_done_)
             dup_req_.xid = ca.xid;
         }
         if (xid_rep > xid_rep_done_)
@@ -259,7 +260,7 @@ int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) {
 
     lock cal(ca.m);
 
 
     lock cal(ca.m);
 
-    IF_LEVEL(2) LOG("rpcc::call1 " << clt_nonce_ << " call done for req proc " << hex << proc <<
+    IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
                     " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
                     ntohs(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
 
                     " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
                     ntohs(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
 
@@ -294,14 +295,14 @@ rpcc::get_refconn(connection **ch)
 //
 // this function keeps no reference for connection *c
 bool
 //
 // this function keeps no reference for connection *c
 bool
-rpcc::got_pdu(connection *, char *b, size_t sz)
+rpcc::got_pdu(connection *, const string & b)
 {
 {
-    unmarshall rep(b, sz);
+    unmarshall rep(b, true);
     reply_header h;
     reply_header h;
-    rep.unpack_reply_header(&h);
+    rep.unpack_header(h);
 
     if(!rep.ok()){
 
     if(!rep.ok()){
-        IF_LEVEL(1) LOG("rpcc:got_pdu unmarshall header failed!!!");
+        IF_LEVEL(1) LOG("unmarshall header failed!!!");
         return true;
     }
 
         return true;
     }
 
@@ -310,17 +311,17 @@ rpcc::got_pdu(connection *, char *b, size_t sz)
     update_xid_rep(h.xid);
 
     if(calls_.find(h.xid) == calls_.end()){
     update_xid_rep(h.xid);
 
     if(calls_.find(h.xid) == calls_.end()){
-        IF_LEVEL(2) LOG("rpcc::got_pdu xid " << h.xid << " no pending request");
+        IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
         return true;
     }
     caller *ca = calls_[h.xid];
 
     lock cl(ca->m);
     if(!ca->done){
         return true;
     }
     caller *ca = calls_[h.xid];
 
     lock cl(ca->m);
     if(!ca->done){
-        ca->un->take_in(rep);
+        *ca->rep = b;
         ca->intret = h.ret;
         if(ca->intret < 0){
         ca->intret = h.ret;
         if(ca->intret < 0){
-            IF_LEVEL(2) LOG("rpcc::got_pdu: RPC reply error for xid " << h.xid << " intret " << ca->intret);
+            IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret);
         }
         ca->done = 1;
     }
         }
         ca->done = 1;
     }
@@ -353,21 +354,17 @@ compress:
 }
 
 rpcs::rpcs(unsigned int p1, size_t count)
 }
 
 rpcs::rpcs(unsigned int p1, size_t count)
-  : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
+  : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
 {
     set_rand_seed();
     nonce_ = (unsigned int)random();
 {
     set_rand_seed();
     nonce_ = (unsigned int)random();
-    IF_LEVEL(2) LOG("rpcs::rpcs created with nonce " << nonce_);
-
-    char *loss_env = getenv("RPC_LOSSY");
-    if(loss_env != NULL){
-        lossytest_ = atoi(loss_env);
-    }
+    IF_LEVEL(2) LOG("created with nonce " << nonce_);
 
     reg(rpc_const::bind, &rpcs::rpcbind, this);
 
     reg(rpc_const::bind, &rpcs::rpcbind, this);
-    dispatchpool_ = new ThrPool(6,false);
+    dispatchpool_ = new ThrPool(6, false);
 
 
-    listener_ = new tcpsconn(this, port_, lossytest_);
+    char *loss_env = getenv("RPC_LOSSY");
+    listener_ = new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0);
 }
 
 rpcs::~rpcs()
 }
 
 rpcs::~rpcs()
@@ -379,14 +376,14 @@ rpcs::~rpcs()
 }
 
 bool
 }
 
 bool
-rpcs::got_pdu(connection *c, char *b, size_t sz)
+rpcs::got_pdu(connection *c, const string & b)
 {
 {
-        if(!reachable_){
-            IF_LEVEL(1) LOG("rpcss::got_pdu: not reachable");
-            return true;
-        }
+    if(!reachable_){
+        IF_LEVEL(1) LOG("not reachable");
+        return true;
+    }
 
 
-    djob_t *j = new djob_t(c, b, sz);
+    djob_t *j = new djob_t{c, b};
     c->incref();
     bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
     if(!succ || !reachable_){
     c->incref();
     bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
     if(!succ || !reachable_){
@@ -434,32 +431,32 @@ void
 rpcs::dispatch(djob_t *j)
 {
     connection *c = j->conn;
 rpcs::dispatch(djob_t *j)
 {
     connection *c = j->conn;
-    unmarshall req(j->buf, j->sz);
+    unmarshall req(j->buf, true);
     delete j;
 
     request_header h;
     delete j;
 
     request_header h;
-    req.unpack_req_header(&h);
+    req.unpack_header(h);
     proc_t proc = h.proc;
 
     if(!req.ok()){
     proc_t proc = h.proc;
 
     if(!req.ok()){
-        IF_LEVEL(1) LOG("rpcs:dispatch unmarshall header failed!!!");
+        IF_LEVEL(1) LOG("unmarshall header failed!!!");
         c->decref();
         return;
     }
 
         c->decref();
         return;
     }
 
-    IF_LEVEL(2) LOG("rpcs::dispatch: rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
+    IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
                     dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
 
     marshall rep;
                     dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
 
     marshall rep;
-    reply_header rh(h.xid,0);
+    reply_header rh{h.xid,0};
 
     // is client sending to an old instance of server?
     if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
 
     // is client sending to an old instance of server?
     if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
-        IF_LEVEL(2) LOG("rpcs::dispatch: rpc for an old server instance " << h.srv_nonce <<
+        IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
                         " (current " << nonce_ << ") proc " << hex << h.proc);
         rh.ret = rpc_const::oldsrv_failure;
                         " (current " << nonce_ << ") proc " << hex << h.proc);
         rh.ret = rpc_const::oldsrv_failure;
-        rep.pack_reply_header(rh);
-        c->send(rep.cstr(),rep.size());
+        rep.pack_header(rh);
+        c->send(rep);
         return;
     }
 
         return;
     }
 
@@ -468,7 +465,7 @@ rpcs::dispatch(djob_t *j)
     {
         lock pl(procs_m_);
         if(procs_.count(proc) < 1){
     {
         lock pl(procs_m_);
         if(procs_.count(proc) < 1){
-            cerr << "rpcs::dispatch: unknown proc " << hex << proc << "." << endl;
+            cerr << "unknown proc " << hex << proc << "." << endl;
             c->decref();
             VERIFY(0);
             return;
             c->decref();
             VERIFY(0);
             return;
@@ -478,8 +475,7 @@ rpcs::dispatch(djob_t *j)
     }
 
     rpcs::rpcstate_t stat;
     }
 
     rpcs::rpcstate_t stat;
-    char *b1 = nullptr;
-    size_t sz1 = 0;
+    string b1;
 
     if(h.clt_nonce){
         // have i seen this client before?
 
     if(h.clt_nonce){
         // have i seen this client before?
@@ -489,7 +485,7 @@ rpcs::dispatch(djob_t *j)
             if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
             if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
-                IF_LEVEL(2) LOG("rpcs::dispatch: new client " << h.clt_nonce << " xid " << h.xid <<
+                IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
                                 " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
             }
         }
                                 " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
             }
         }
@@ -507,37 +503,36 @@ rpcs::dispatch(djob_t *j)
             }
         }
 
             }
         }
 
-        stat = checkduplicate_and_update(h.clt_nonce, h.xid,
-                                                 h.xid_rep, &b1, &sz1);
+        stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
     } else {
         // this client does not require at most once logic
         stat = NEW;
     }
 
     } else {
         // this client does not require at most once logic
         stat = NEW;
     }
 
-    switch (stat){
+    switch (stat) {
         case NEW: // new request
         case NEW: // new request
-            if(counting_){
+            if (counting_){
                 updatestat(proc);
             }
 
             rh.ret = (*f)(req, rep);
             if (rh.ret == rpc_const::unmarshal_args_failure) {
                 updatestat(proc);
             }
 
             rh.ret = (*f)(req, rep);
             if (rh.ret == rpc_const::unmarshal_args_failure) {
-                cerr << "rpcs::dispatch: failed to unmarshall the arguments. You are " <<
+                cerr << "failed to unmarshall the arguments. You are " <<
                         "probably calling RPC 0x" << hex << proc << " with the wrong " <<
                         "types of arguments." << endl;
                 VERIFY(0);
             }
             VERIFY(rh.ret >= 0);
 
                         "probably calling RPC 0x" << hex << proc << " with the wrong " <<
                         "types of arguments." << endl;
                 VERIFY(0);
             }
             VERIFY(rh.ret >= 0);
 
-            rep.pack_reply_header(rh);
-            rep.take_buf(&b1,&sz1);
+            rep.pack_header(rh);
+            b1 = rep;
 
 
-            IF_LEVEL(2) LOG("rpcs::dispatch: sending and saving reply of size " << sz1 << " for rpc " <<
+            IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
                             h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
 
                             h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
 
-            if(h.clt_nonce > 0){
+            if (h.clt_nonce > 0) {
                 // only record replies for clients that require at-most-once logic
                 // only record replies for clients that require at-most-once logic
-                add_reply(h.clt_nonce, h.xid, b1, sz1);
+                add_reply(h.clt_nonce, h.xid, b1);
             }
 
             // get the latest connection to the client
             }
 
             // get the latest connection to the client
@@ -550,22 +545,18 @@ rpcs::dispatch(djob_t *j)
                 }
             }
 
                 }
             }
 
-            c->send(b1, sz1);
-            if(h.clt_nonce == 0){
-                // reply is not added to at-most-once window, free it
-                free(b1);
-            }
+            c->send(rep);
             break;
         case INPROGRESS: // server is working on this request
             break;
         case DONE: // duplicate and we still have the response
             break;
         case INPROGRESS: // server is working on this request
             break;
         case DONE: // duplicate and we still have the response
-            c->send(b1, sz1);
+            c->send(b1);
             break;
         case FORGOTTEN: // very old request and we don't have the response anymore
             break;
         case FORGOTTEN: // very old request and we don't have the response anymore
-            IF_LEVEL(2) LOG("rpcs::dispatch: very old request " << h.xid << " from " << h.clt_nonce);
+            IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
             rh.ret = rpc_const::atmostonce_failure;
             rh.ret = rpc_const::atmostonce_failure;
-            rep.pack_reply_header(rh);
-            c->send(rep.cstr(),rep.size());
+            rep.pack_header(rh);
+            c->send(rep);
             break;
     }
     c->decref();
             break;
     }
     c->decref();
@@ -583,11 +574,11 @@ rpcs::dispatch(djob_t *j)
 // returns one of:
 //   NEW: never seen this xid before.
 //   INPROGRESS: seen this xid, and still processing it.
 // returns one of:
 //   NEW: never seen this xid before.
 //   INPROGRESS: seen this xid, and still processing it.
-//   DONE: seen this xid, previous reply returned in *b and *sz.
+//   DONE: seen this xid, previous reply returned in b.
 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
 rpcs::rpcstate_t
 rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
 rpcs::rpcstate_t
 rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
-        int xid_rep, char **b, size_t *sz)
+        int xid_rep, string & b)
 {
     lock rwl(reply_window_m_);
 
 {
     lock rwl(reply_window_m_);
 
@@ -602,10 +593,8 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
 
     if (past_xid_rep < xid_rep || past_xid_rep == -1) {
         // scan for deletion candidates
 
     if (past_xid_rep < xid_rep || past_xid_rep == -1) {
         // scan for deletion candidates
-        for (; it != l.end() && it->xid < xid_rep; it++) {
-            if (it->cb_present)
-                free(it->buf);
-        }
+        while (it != l.end() && it->xid < xid_rep)
+            it++;
         l.erase(start, it);
         l.begin()->xid = xid_rep;
     }
         l.erase(start, it);
         l.begin()->xid = xid_rep;
     }
@@ -621,12 +610,10 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
     if (it != l.end() && it->xid == xid) {
         if (it->cb_present) {
             // return information about the remembered reply
     if (it != l.end() && it->xid == xid) {
         if (it->cb_present) {
             // return information about the remembered reply
-            *b = it->buf;
-            *sz = it->sz;
+            b = it->buf;
             return DONE;
             return DONE;
-        } else {
-            return INPROGRESS;
         }
         }
+        return INPROGRESS;
     } else {
         // remember that a new request has arrived
         l.insert(it, reply_t(xid));
     } else {
         // remember that a new request has arrived
         l.insert(it, reply_t(xid));
@@ -635,14 +622,11 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
 }
 
 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
 }
 
 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
-// and passes the return value in b and sz.
-// add_reply() should remember b and sz.
-// free_reply_window() and checkduplicate_and_update is responsible for
-// calling free(b).
-void
-rpcs::add_reply(unsigned int clt_nonce, int xid,
-        char *b, size_t sz)
-{
+// and passes the return value in b.
+// add_reply() should remember b.
+// free_reply_window() and checkduplicate_and_update are responsible for
+// cleaning up the remembered values.
+void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
     lock rwl(reply_window_m_);
     // remember the RPC reply value
     list<reply_t> &l = reply_window_[clt_nonce];
     lock rwl(reply_window_m_);
     // remember the RPC reply value
     list<reply_t> &l = reply_window_[clt_nonce];
@@ -652,38 +636,26 @@ rpcs::add_reply(unsigned int clt_nonce, int xid,
     // there should already be an entry, so whine if there isn't
     if (it == l.end() || it->xid != xid) {
         cerr << "Could not find reply struct in add_reply" << endl;
     // there should already be an entry, so whine if there isn't
     if (it == l.end() || it->xid != xid) {
         cerr << "Could not find reply struct in add_reply" << endl;
-        l.insert(it, reply_t(xid, b, sz));
+        l.insert(it, reply_t(xid, b));
     } else {
     } else {
-        *it = reply_t(xid, b, sz);
+        *it = reply_t(xid, b);
     }
 }
 
 void rpcs::free_reply_window(void) {
     lock rwl(reply_window_m_);
     }
 }
 
 void rpcs::free_reply_window(void) {
     lock rwl(reply_window_m_);
-    for (auto clt : reply_window_) {
-        for (auto it : clt.second){
-            if (it.cb_present)
-                free(it.buf);
-        }
-        clt.second.clear();
-    }
     reply_window_.clear();
 }
 
 int rpcs::rpcbind(unsigned int &r, int) {
     reply_window_.clear();
 }
 
 int rpcs::rpcbind(unsigned int &r, int) {
-    IF_LEVEL(2) LOG("rpcs::rpcbind called return nonce " << nonce_);
+    IF_LEVEL(2) LOG("called return nonce " << nonce_);
     r = nonce_;
     return 0;
 }
 
     r = nonce_;
     return 0;
 }
 
-bool operator<(const sockaddr_in &a, const sockaddr_in &b){
-    return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
-            ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
-             ((a.sin_port < b.sin_port))));
-}
+static sockaddr_in make_sockaddr(const string &host, const string &port);
 
 
-/*---------------auxilary function--------------*/
-sockaddr_in make_sockaddr(const string &hostandport) {
+static sockaddr_in make_sockaddr(const string &hostandport) {
     auto colon = hostandport.find(':');
     if (colon == string::npos)
         return make_sockaddr("127.0.0.1", hostandport);
     auto colon = hostandport.find(':');
     if (colon == string::npos)
         return make_sockaddr("127.0.0.1", hostandport);
@@ -691,7 +663,7 @@ sockaddr_in make_sockaddr(const string &hostandport) {
         return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
 }
 
         return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
 }
 
-sockaddr_in make_sockaddr(const string &host, const string &port) {
+static sockaddr_in make_sockaddr(const string &host, const string &port) {
     sockaddr_in dst;
     bzero(&dst, sizeof(dst));
     dst.sin_family = AF_INET;
     sockaddr_in dst;
     bzero(&dst, sizeof(dst));
     dst.sin_family = AF_INET;
index f01af09..5dabe4b 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -30,10 +30,10 @@ class rpcc : public chanmgr {
 
         //manages per rpc info
         struct caller {
 
         //manages per rpc info
         struct caller {
-            caller(int _xid, unmarshall *_un) : xid(_xid), un(_un) {}
+            caller(int _xid, string *_rep) : xid(_xid), rep(_rep) {}
 
             int xid;
 
             int xid;
-            unmarshall *un;
+            string *rep;
             int intret;
             bool done = false;
             mutex m;
             int intret;
             bool done = false;
             mutex m;
@@ -95,10 +95,9 @@ class rpcc : public chanmgr {
 
         int islossy() { return lossytest_ > 0; }
 
 
         int islossy() { return lossytest_ > 0; }
 
-        int call1(proc_t proc, 
-                marshall &req, unmarshall &rep, TO to);
+        int call1(proc_t proc, marshall &req, string &rep, TO to);
 
 
-        bool got_pdu(connection *c, char *b, size_t sz);
+        bool got_pdu(connection *c, const string & b);
 
         template<class R>
             int call_m(proc_t proc, marshall &req, R & r, TO to);
 
         template<class R>
             int call_m(proc_t proc, marshall &req, R & r, TO to);
@@ -113,8 +112,9 @@ class rpcc : public chanmgr {
 template<class R> int 
 rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) 
 {
 template<class R> int 
 rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) 
 {
-    unmarshall u;
-    int intret = call1(proc, req, u, to);
+    string rep;
+    int intret = call1(proc, req, rep, to);
+    unmarshall u(rep, true);
     if (intret < 0) return intret;
     u >> r;
     if (u.okdone() != true) {
     if (intret < 0) return intret;
     u >> r;
     if (u.okdone() != true) {
@@ -139,8 +139,6 @@ rpcc::call_timeout(proc_t proc, const rpcc::TO to, R & r, const Args&... args)
     return call_m(proc, m, r, to);
 }
 
     return call_m(proc, m, r, to);
 }
 
-bool operator<(const sockaddr_in &a, const sockaddr_in &b);
-
 // rpc server endpoint.
 class rpcs : public chanmgr {
 
 // rpc server endpoint.
 class rpcs : public chanmgr {
 
@@ -158,22 +156,11 @@ class rpcs : public chanmgr {
         // has been sent; in that case buf points to a copy of the reply,
         // and sz holds the size of the reply.
     struct reply_t {
         // has been sent; in that case buf points to a copy of the reply,
         // and sz holds the size of the reply.
     struct reply_t {
-        reply_t (int _xid) {
-            xid = _xid;
-            cb_present = false;
-            buf = NULL;
-            sz = 0;
-        }
-        reply_t (int _xid, char *_buf, size_t _sz) {
-            xid = _xid;
-            cb_present = true;
-            buf = _buf;
-            sz = _sz;
-        }
+        reply_t (int _xid) : xid(_xid), cb_present(false) {}
+        reply_t (int _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {}
         int xid;
         bool cb_present; // whether the reply buffer is valid
         int xid;
         bool cb_present; // whether the reply buffer is valid
-        char *buf;      // the reply buffer
-        size_t sz;         // the size of reply buffer
+        string buf;      // the reply buffer
     };
 
     unsigned int port_;
     };
 
     unsigned int port_;
@@ -185,11 +172,10 @@ class rpcs : public chanmgr {
     map<unsigned int, list<reply_t> > reply_window_;
 
     void free_reply_window(void);
     map<unsigned int, list<reply_t> > reply_window_;
 
     void free_reply_window(void);
-    void add_reply(unsigned int clt_nonce, int xid, char *b, size_t sz);
+    void add_reply(unsigned int clt_nonce, int xid, const string & b);
 
     rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
 
     rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
-            int xid, int rep_xid,
-            char **b, size_t *sz);
+            int xid, int rep_xid, string & b);
 
     void updatestat(proc_t proc);
 
 
     void updatestat(proc_t proc);
 
@@ -201,7 +187,6 @@ class rpcs : public chanmgr {
     size_t curr_counts_;
     map<proc_t, size_t> counts_;
 
     size_t curr_counts_;
     map<proc_t, size_t> counts_;
 
-    int lossytest_; 
     bool reachable_;
 
     // map proc # to function
     bool reachable_;
 
     // map proc # to function
@@ -216,10 +201,8 @@ class rpcs : public chanmgr {
     protected:
 
     struct djob_t {
     protected:
 
     struct djob_t {
-        djob_t (connection *c, char *b, size_t bsz):buf(b),sz(bsz),conn(c) {}
-        char *buf;
-        size_t sz;
         connection *conn;
         connection *conn;
+        string buf;
     };
     void dispatch(djob_t *);
 
     };
     void dispatch(djob_t *);
 
@@ -227,7 +210,7 @@ class rpcs : public chanmgr {
     void reg1(proc_t proc, handler *);
 
     ThrPool* dispatchpool_;
     void reg1(proc_t proc, handler *);
 
     ThrPool* dispatchpool_;
-    tcpsconn* listener_;
+    tcpsconn *listener_;
 
     public:
     rpcs(unsigned int port, size_t counts=0);
 
     public:
     rpcs(unsigned int port, size_t counts=0);
@@ -238,7 +221,7 @@ class rpcs : public chanmgr {
 
     void set_reachable(bool r) { reachable_ = r; }
 
 
     void set_reachable(bool r) { reachable_ = r; }
 
-    bool got_pdu(connection *c, char *b, size_t sz);
+    bool got_pdu(connection *c, const string & b);
 
     template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr);
 };
 
     template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr);
 };
@@ -253,7 +236,4 @@ template<class F, class C> void rpcs::reg(proc_t proc, F f, C *c) {
     reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
 }
 
     reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
 }
 
-sockaddr_in make_sockaddr(const string &hostandport);
-sockaddr_in make_sockaddr(const string &host, const string &port);
-
 #endif
 #endif
index 7217b25..b90d19a 100644 (file)
@@ -80,8 +80,8 @@ testmarshall()
 {
     marshall m;
     request_header rh{1,2,3,4,5};
 {
     marshall m;
     request_header rh{1,2,3,4,5};
-    m.pack_req_header(rh);
-    VERIFY(m.size()==RPC_HEADER_SZ);
+    m.pack_header(rh);
+    VERIFY(((string)m).size()==RPC_HEADER_SZ);
     int i = 12345;
     unsigned long long l = 1223344455L;
     string s = "hallo....";
     int i = 12345;
     unsigned long long l = 1223344455L;
     string s = "hallo....";
@@ -89,14 +89,12 @@ testmarshall()
     m << l;
     m << s;
 
     m << l;
     m << s;
 
-    char *b;
-    size_t sz;
-    m.take_buf(&b,&sz);
-    VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
+    string b = m;
+    VERIFY(b.size() == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
 
 
-    unmarshall un(b,sz);
+    unmarshall un(b, true);
     request_header rh1;
     request_header rh1;
-    un.unpack_req_header(&rh1);
+    un.unpack_header(rh1);
     VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
     int i1;
     unsigned long long l1;
     VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
     int i1;
     unsigned long long l1;
@@ -131,11 +129,11 @@ client1(size_t cl)
         int arg = (random() % 1000);
         int rep;
 
         int arg = (random() % 1000);
         int rep;
 
-        auto start = std::chrono::steady_clock::now();
+        auto start = steady_clock::now();
 
         int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
 
         int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
-        auto end = std::chrono::steady_clock::now();
-        auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
+        auto end = steady_clock::now();
+        auto diff = duration_cast<milliseconds>(end - start).count();
         if (ret != 0)
             cout << diff << " ms have elapsed!!!" << endl;
         VERIFY(ret == 0);
         if (ret != 0)
             cout << diff << " ms have elapsed!!!" << endl;
         VERIFY(ret == 0);
@@ -401,7 +399,7 @@ main(int argc, char *argv[])
 
     if (isclient) {
         // server's address.
 
     if (isclient) {
         // server's address.
-        dst = "127.0.0.1:" + std::to_string(port);
+        dst = "127.0.0.1:" + to_string(port);
 
 
         // start the client.  bind it to the server.
 
 
         // start the client.  bind it to the server.
index 8b9691b..7d3cf7d 100644 (file)
@@ -3,16 +3,14 @@
 // if blocking, then addJob() blocks when queue is full
 // otherwise, addJob() simply returns false when queue is full
 ThrPool::ThrPool(size_t sz, bool blocking)
 // if blocking, then addJob() blocks when queue is full
 // otherwise, addJob() simply returns false when queue is full
 ThrPool::ThrPool(size_t sz, bool blocking)
-: nthreads_(sz),blockadd_(blocking),jobq_(100*sz) 
-{
+: nthreads_(sz),blockadd_(blocking),jobq_(100*sz) {
        for (size_t i=0; i<nthreads_; i++)
         th_.emplace_back(&ThrPool::do_worker, this);
 }
 
 // IMPORTANT: this function can be called only when no external thread 
 // will ever use this thread pool again or is currently blocking on it
        for (size_t i=0; i<nthreads_; i++)
         th_.emplace_back(&ThrPool::do_worker, this);
 }
 
 // IMPORTANT: this function can be called only when no external thread 
 // will ever use this thread pool again or is currently blocking on it
-ThrPool::~ThrPool()
-{
+ThrPool::~ThrPool() {
        for (size_t i=0; i<nthreads_; i++)
                jobq_.enq(job_t());
 
        for (size_t i=0; i<nthreads_; i++)
                jobq_.enq(job_t());
 
@@ -20,15 +18,11 @@ ThrPool::~ThrPool()
         th_[i].join();
 }
 
         th_[i].join();
 }
 
-bool 
-ThrPool::addJob(const job_t &j)
-{
+bool ThrPool::addJob(const job_t &j) {
        return jobq_.enq(j,blockadd_);
 }
 
        return jobq_.enq(j,blockadd_);
 }
 
-void
-ThrPool::do_worker()
-{
+void ThrPool::do_worker() {
     job_t j;
        while (1) {
         jobq_.deq(&j);
     job_t j;
        while (1) {
         jobq_.deq(&j);
index 94ce237..5b95cba 100644 (file)
@@ -4,7 +4,7 @@
 #include "types.h"
 #include "fifo.h"
 
 #include "types.h"
 #include "fifo.h"
 
-typedef std::function<void()> job_t;
+typedef function<void()> job_t;
 
 class ThrPool {
        public:
 
 class ThrPool {
        public:
@@ -18,7 +18,7 @@ class ThrPool {
                bool blockadd_;
 
                fifo<job_t> jobq_;
                bool blockadd_;
 
                fifo<job_t> jobq_;
-               std::vector<std::thread> th_;
+               vector<thread> th_;
 
         void do_worker();
 };
 
         void do_worker();
 };
diff --git a/rsm.cc b/rsm.cc
index f12f9db..843418a 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -86,7 +86,7 @@
 #include "rsm.h"
 #include "rsm_client.h"
 
 #include "rsm.h"
 #include "rsm_client.h"
 
-rsm::rsm(std::string _first, std::string _me) :
+rsm::rsm(string _first, string _me) :
     stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
     partitioned (false), dopartition(false), break1(false), break2(false)
 {
     stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
     partitioned (false), dopartition(false), break1(false), break2(false)
 {
@@ -111,13 +111,13 @@ rsm::rsm(std::string _first, std::string _me) :
     rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
 
     // tester must be on different port, otherwise it may partition itself
     rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
 
     // tester must be on different port, otherwise it may partition itself
-    testsvr = new rpcs((uint32_t)std::stoi(_me) + 1);
+    testsvr = new rpcs((uint32_t)stoi(_me) + 1);
     testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
     testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
 
     {
         lock ml(rsm_mutex);
     testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
     testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
 
     {
         lock ml(rsm_mutex);
-        std::thread(&rsm::recovery, this).detach();
+        thread(&rsm::recovery, this).detach();
     }
 }
 
     }
 }
 
@@ -140,7 +140,7 @@ void rsm::recovery() [[noreturn]] {
                 commit_change(cfg->view_id(), ml);
             } else {
                 ml.unlock();
                 commit_change(cfg->view_id(), ml);
             } else {
                 ml.unlock();
-                std::this_thread::sleep_for(std::chrono::seconds(30)); // XXX make another node in cfg primary?
+                this_thread::sleep_for(seconds(30)); // XXX make another node in cfg primary?
                 ml.lock();
             }
         }
                 ml.lock();
             }
         }
@@ -195,7 +195,7 @@ bool rsm::sync_with_backups(lock & rsm_mutex_lock) {
 
 bool rsm::sync_with_primary(lock & rsm_mutex_lock) {
     // Remember the primary of vid_insync
 
 bool rsm::sync_with_primary(lock & rsm_mutex_lock) {
     // Remember the primary of vid_insync
-    std::string m = primary;
+    string m = primary;
     while (vid_insync == vid_commit) {
         if (statetransfer(m, rsm_mutex_lock))
             break;
     while (vid_insync == vid_commit) {
         if (statetransfer(m, rsm_mutex_lock))
             break;
@@ -208,7 +208,7 @@ bool rsm::sync_with_primary(lock & rsm_mutex_lock) {
  * Call to transfer state from m to the local node.
  * Assumes that rsm_mutex is already held.
  */
  * Call to transfer state from m to the local node.
  * Assumes that rsm_mutex is already held.
  */
-bool rsm::statetransfer(std::string m, lock & rsm_mutex_lock)
+bool rsm::statetransfer(string m, lock & rsm_mutex_lock)
 {
     rsm_protocol::transferres r;
     handle h(m);
 {
     rsm_protocol::transferres r;
     handle h(m);
@@ -225,7 +225,7 @@ bool rsm::statetransfer(std::string m, lock & rsm_mutex_lock)
         rsm_mutex_lock.lock();
     }
     if (cl == 0 || ret != rsm_protocol::OK) {
         rsm_mutex_lock.lock();
     }
     if (cl == 0 || ret != rsm_protocol::OK) {
-        LOG("rsm::statetransfer: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret);
+        LOG("rsm::statetransfer: couldn't reach " << m << " " << hex << cl << " " << dec << ret);
         return false;
     }
     if (stf && last_myvs != r.last) {
         return false;
     }
     if (stf && last_myvs != r.last) {
@@ -236,7 +236,7 @@ bool rsm::statetransfer(std::string m, lock & rsm_mutex_lock)
     return true;
 }
 
     return true;
 }
 
-bool rsm::statetransferdone(std::string m, lock & rsm_mutex_lock) {
+bool rsm::statetransferdone(string m, lock & rsm_mutex_lock) {
     rsm_mutex_lock.unlock();
     handle h(m);
     rpcc *cl = h.safebind();
     rsm_mutex_lock.unlock();
     handle h(m);
     rpcc *cl = h.safebind();
@@ -251,7 +251,7 @@ bool rsm::statetransferdone(std::string m, lock & rsm_mutex_lock) {
 }
 
 
 }
 
 
-bool rsm::join(std::string m, lock & rsm_mutex_lock) {
+bool rsm::join(string m, lock & rsm_mutex_lock) {
     handle h(m);
     int ret = 0;
     string log;
     handle h(m);
     int ret = 0;
     string log;
@@ -269,7 +269,7 @@ bool rsm::join(std::string m, lock & rsm_mutex_lock) {
     }
 
     if (cl == 0 || ret != rsm_protocol::OK) {
     }
 
     if (cl == 0 || ret != rsm_protocol::OK) {
-        LOG("rsm::join: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret);
+        LOG("rsm::join: couldn't reach " << m << " " << hex << cl << " " << dec << ret);
         return false;
     }
     LOG("rsm::join: succeeded " << log);
         return false;
     }
     LOG("rsm::join: succeeded " << log);
@@ -303,18 +303,18 @@ void rsm::commit_change(unsigned vid, lock &) {
 }
 
 
 }
 
 
-void rsm::execute(int procno, std::string req, std::string &r) {
+void rsm::execute(int procno, string req, string &r) {
     LOG("execute");
     handler *h = procs[procno];
     VERIFY(h);
     LOG("execute");
     handler *h = procs[procno];
     VERIFY(h);
-    unmarshall args(req);
+    unmarshall args(req, false);
     marshall rep;
     marshall rep;
-    std::string reps;
+    string reps;
     auto ret = (rsm_protocol::status)(*h)(args, rep);
     marshall rep1;
     rep1 << ret;
     auto ret = (rsm_protocol::status)(*h)(args, rep);
     marshall rep1;
     rep1 << ret;
-    rep1 << rep.str();
-    r = rep1.str();
+    rep1 << rep.content();
+    r = rep1.content();
 }
 
 //
 }
 
 //
@@ -323,11 +323,11 @@ void rsm::execute(int procno, std::string req, std::string &r) {
 // number, and invokes it on all members of the replicated state
 // machine.
 //
 // number, and invokes it on all members of the replicated state
 // machine.
 //
-rsm_client_protocol::status rsm::client_invoke(std::string &r, int procno, std::string req) {
-    LOG("rsm::client_invoke: procno 0x" << std::hex << procno);
+rsm_client_protocol::status rsm::client_invoke(string &r, int procno, string req) {
+    LOG("rsm::client_invoke: procno 0x" << hex << procno);
     lock ml(invoke_mutex);
     lock ml(invoke_mutex);
-    std::vector<std::string> m;
-    std::string myaddr;
+    vector<string> m;
+    string myaddr;
     viewstamp vs;
     {
         lock ml2(rsm_mutex);
     viewstamp vs;
     {
         lock ml2(rsm_mutex);
@@ -377,11 +377,11 @@ rsm_client_protocol::status rsm::client_invoke(std::string &r, int procno, std::
 // the replica must execute requests in order (with no gaps)
 // according to requests' seqno
 
 // the replica must execute requests in order (with no gaps)
 // according to requests' seqno
 
-rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req) {
-    LOG("rsm::invoke: procno 0x" << std::hex << proc);
+rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, string req) {
+    LOG("rsm::invoke: procno 0x" << hex << proc);
     lock ml(invoke_mutex);
     lock ml(invoke_mutex);
-    std::vector<std::string> m;
-    std::string myaddr;
+    vector<string> m;
+    string myaddr;
     {
         lock ml2(rsm_mutex);
         // check if !inviewchange
     {
         lock ml2(rsm_mutex);
         // check if !inviewchange
@@ -402,7 +402,7 @@ rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req)
             return rsm_protocol::ERR;
         myvs++;
     }
             return rsm_protocol::ERR;
         myvs++;
     }
-    std::string r;
+    string r;
     execute(proc, req, r);
     last_myvs = vs;
     breakpoint1();
     execute(proc, req, r);
     last_myvs = vs;
     breakpoint1();
@@ -412,7 +412,7 @@ rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req)
 /**
  * RPC handler: Send back the local node's state to the caller
  */
 /**
  * RPC handler: Send back the local node's state to the caller
  */
-rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, std::string src,
+rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, string src,
         viewstamp last, unsigned vid) {
     lock ml(rsm_mutex);
     LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
         viewstamp last, unsigned vid) {
     lock ml(rsm_mutex);
     LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
@@ -429,7 +429,7 @@ rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, std::string
  * RPC handler: Inform the local node (the primary) that node m has synchronized
  * for view vid
  */
  * RPC handler: Inform the local node (the primary) that node m has synchronized
  * for view vid
  */
-rsm_protocol::status rsm::transferdonereq(int &, std::string m, unsigned vid) {
+rsm_protocol::status rsm::transferdonereq(int &, string m, unsigned vid) {
     lock ml(rsm_mutex);
     if (!insync || vid != vid_insync)
         return rsm_protocol::BUSY;
     lock ml(rsm_mutex);
     if (!insync || vid != vid_insync)
         return rsm_protocol::BUSY;
@@ -442,7 +442,7 @@ rsm_protocol::status rsm::transferdonereq(int &, std::string m, unsigned vid) {
 // a node that wants to join an RSM as a server sends a
 // joinreq to the RSM's current primary; this is the
 // handler for that RPC.
 // a node that wants to join an RSM as a server sends a
 // joinreq to the RSM's current primary; this is the
 // handler for that RPC.
-rsm_protocol::status rsm::joinreq(string & log, std::string m, viewstamp last) {
+rsm_protocol::status rsm::joinreq(string & log, string m, viewstamp last) {
     auto ret = rsm_protocol::OK;
 
     lock ml(rsm_mutex);
     auto ret = rsm_protocol::OK;
 
     lock ml(rsm_mutex);
@@ -481,8 +481,8 @@ rsm_protocol::status rsm::joinreq(string & log, std::string m, viewstamp last) {
  * so the client can switch to a different primary
  * when it existing primary fails
  */
  * so the client can switch to a different primary
  * when it existing primary fails
  */
-rsm_client_protocol::status rsm::client_members(std::vector<std::string> &r, int) {
-    std::vector<std::string> m;
+rsm_client_protocol::status rsm::client_members(vector<string> &r, int) {
+    vector<string> m;
     lock ml(rsm_mutex);
     cfg->get_view(vid_commit, m);
     m.push_back(primary);
     lock ml(rsm_mutex);
     cfg->get_view(vid_commit, m);
     m.push_back(primary);
@@ -495,7 +495,7 @@ rsm_client_protocol::status rsm::client_members(std::vector<std::string> &r, int
 // otherwise, the lowest number node of the previous view.
 // caller should hold rsm_mutex
 void rsm::set_primary(unsigned vid) {
 // otherwise, the lowest number node of the previous view.
 // caller should hold rsm_mutex
 void rsm::set_primary(unsigned vid) {
-    std::vector<std::string> c, p;
+    vector<string> c, p;
     cfg->get_view(vid, c);
     cfg->get_view(vid - 1, p);
     VERIFY (c.size() > 0);
     cfg->get_view(vid, c);
     cfg->get_view(vid - 1, p);
     VERIFY (c.size() > 0);
@@ -528,7 +528,7 @@ bool rsm::amiprimary() {
 
 // assumes caller holds rsm_mutex
 void rsm::net_repair(bool heal, lock &) {
 
 // assumes caller holds rsm_mutex
 void rsm::net_repair(bool heal, lock &) {
-    std::vector<std::string> m;
+    vector<string> m;
     cfg->get_view(vid_commit, m);
     for (unsigned i  = 0; i < m.size(); i++) {
         if (m[i] != cfg->myaddr()) {
     cfg->get_view(vid_commit, m);
     for (unsigned i  = 0; i < m.size(); i++) {
         if (m[i] != cfg->myaddr()) {
index 4a80f60..0b1dc88 100644 (file)
 
 class rsm_client {
     protected:
 
 class rsm_client {
     protected:
-        std::string primary;
-        std::vector<std::string> known_mems;
-        std::mutex rsm_client_mutex;
+        string primary;
+        vector<string> known_mems;
+        mutex rsm_client_mutex;
         void primary_failure(lock & rsm_client_mutex_lock);
         bool init_members(lock & rsm_client_mutex_lock);
     public:
         void primary_failure(lock & rsm_client_mutex_lock);
         bool init_members(lock & rsm_client_mutex_lock);
     public:
-        rsm_client(std::string dst);
-        rsm_protocol::status invoke(unsigned int proc, std::string &rep, const std::string &req);
+        rsm_client(string dst);
+        rsm_protocol::status invoke(unsigned int proc, string &rep, const string &req);
 
         template<class R, class ...Args>
             int call(unsigned int proc, R & r, const Args & ...a1);
 
         template<class R, class ...Args>
             int call(unsigned int proc, R & r, const Args & ...a1);
@@ -31,11 +31,11 @@ class rsm_client {
 
 template<class R>
 int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) {
 
 template<class R>
 int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) {
-    std::string rep;
-    std::string res;
-    int intret = invoke(proc, rep, req.cstr());
+    string rep;
+    string res;
+    int intret = invoke(proc, rep, req);
     VERIFY( intret == rsm_client_protocol::OK );
     VERIFY( intret == rsm_client_protocol::OK );
-    unmarshall u(rep);
+    unmarshall u(rep, false);
     u >> intret;
     if (intret < 0) return intret;
     u >> res;
     u >> intret;
     if (intret < 0) return intret;
     u >> res;
@@ -47,7 +47,7 @@ int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) {
         VERIFY(0);
         return rpc_const::unmarshal_reply_failure;
     }
         VERIFY(0);
         return rpc_const::unmarshal_reply_failure;
     }
-    unmarshall u1(res);
+    unmarshall u1(res, false);
     u1 >> r;
     if(!u1.okdone()) {
         cerr << "rsm_client::call_m: failed to unmarshall the reply." << endl;
     u1 >> r;
     if(!u1.okdone()) {
         cerr << "rsm_client::call_m: failed to unmarshall the reply." << endl;
index 3965fa9..f4238db 100644 (file)
@@ -3,7 +3,7 @@
 #include "rsmtest_client.h"
 #include <arpa/inet.h>
 
 #include "rsmtest_client.h"
 #include <arpa/inet.h>
 
-rsmtest_client::rsmtest_client(std::string dst) : cl(dst) {
+rsmtest_client::rsmtest_client(string dst) : cl(dst) {
     if (cl.bind() < 0)
         cout << "rsmtest_client: call bind" << endl;
 }
     if (cl.bind() < 0)
         cout << "rsmtest_client: call bind" << endl;
 }
index ad3d4c1..e71fede 100644 (file)
@@ -11,7 +11,7 @@ class rsmtest_client {
     protected:
         rpcc cl;
     public:
     protected:
         rpcc cl;
     public:
-        rsmtest_client(std::string d);
+        rsmtest_client(string d);
         virtual ~rsmtest_client() {}
         virtual rsm_test_protocol::status net_repair(int heal);
         virtual rsm_test_protocol::status breakpoint(int b);
         virtual ~rsmtest_client() {}
         virtual rsm_test_protocol::status net_repair(int heal);
         virtual rsm_test_protocol::status breakpoint(int b);
index 706e2b7..750c5d2 100644 (file)
@@ -11,8 +11,7 @@ extern int next_instance_num;
 extern char log_thread_prefix;
 
 namespace std {
 extern char log_thread_prefix;
 
 namespace std {
-    // This is an awful hack.  But sticking this in std:: makes it possible for
-    // ostream_iterator to use it.
+    // Sticking this in std:: makes it possible for ostream_iterator to use it.
     template <class A, class B>
     ostream & operator<<(ostream &o, const pair<A,B> &d) {
         return o << "<" << d.first << "," << d.second << ">";
     template <class A, class B>
     ostream & operator<<(ostream &o, const pair<A,B> &d) {
         return o << "<" << d.first << "," << d.second << ">";
@@ -26,7 +25,7 @@ operator<<(ostream &o, const A &a) {
 }
 
 #define LOG_PREFIX { \
 }
 
 #define LOG_PREFIX { \
-    auto _thread_ = std::this_thread::get_id(); \
+    auto _thread_ = this_thread::get_id(); \
     int _tid_ = thread_name_map[_thread_]; \
     if (_tid_==0) \
         _tid_ = thread_name_map[_thread_] = ++next_thread_num; \
     int _tid_ = thread_name_map[_thread_]; \
     if (_tid_==0) \
         _tid_ = thread_name_map[_thread_] = ++next_thread_num; \
@@ -39,7 +38,7 @@ operator<<(ostream &o, const A &a) {
     int _self_ = instance_name_map[this]; \
     if (_self_==0) \
         _self_ = instance_name_map[this] = ++next_instance_num; \
     int _self_ = instance_name_map[this]; \
     if (_self_==0) \
         _self_ = instance_name_map[this] = ++next_instance_num; \
-    cerr << "#" << setw(2) << _self_; \
+    cerr << "#" << setw(2) << " " << _self_; \
 }
 
 #define LOG_NONMEMBER(_x_) { \
 }
 
 #define LOG_NONMEMBER(_x_) { \
diff --git a/types.h b/types.h
index e6b5895..e78186c 100644 (file)
--- a/types.h
+++ b/types.h
@@ -50,6 +50,7 @@ using std::ostream;
 using std::istream;
 using std::ostream_iterator;
 using std::istream_iterator;
 using std::istream;
 using std::ostream_iterator;
 using std::istream_iterator;
+using std::ios;
 
 #include <limits>
 using std::numeric_limits;
 
 #include <limits>
 using std::numeric_limits;
@@ -77,6 +78,11 @@ using std::stoi;
 
 #include <thread>
 using std::thread;
 
 #include <thread>
 using std::thread;
+using std::call_once;
+using std::once_flag;
+namespace this_thread {
+    using namespace std::this_thread;
+}
 
 #include <tuple>
 using std::tuple;
 
 #include <tuple>
 using std::tuple;
@@ -92,10 +98,12 @@ using std::is_member_function_pointer;
 using std::is_same;
 using std::underlying_type;
 using std::enable_if;
 using std::is_same;
 using std::underlying_type;
 using std::enable_if;
+using std::remove_reference;
 
 #include <utility>
 using std::pair;
 using std::declval;
 
 #include <utility>
 using std::pair;
 using std::declval;
+using std::forward;
 
 #include <vector>
 using std::vector;
 
 #include <vector>
 using std::vector;
@@ -145,4 +153,21 @@ LEXICOGRAPHIC_OPERATOR(_c_, <) LEXICOGRAPHIC_OPERATOR(_c_, <=) \
 LEXICOGRAPHIC_OPERATOR(_c_, >) LEXICOGRAPHIC_OPERATOR(_c_, >=) \
 LEXICOGRAPHIC_OPERATOR(_c_, ==) LEXICOGRAPHIC_OPERATOR(_c_, !=)
 
 LEXICOGRAPHIC_OPERATOR(_c_, >) LEXICOGRAPHIC_OPERATOR(_c_, >=) \
 LEXICOGRAPHIC_OPERATOR(_c_, ==) LEXICOGRAPHIC_OPERATOR(_c_, !=)
 
+// The following implementation of tuple_indices is redistributed under the MIT
+// License as an insubstantial portion of the LLVM compiler infrastructure.
+
+template <size_t...> struct tuple_indices {};
+template <size_t S, class IntTuple, size_t E> struct make_indices_imp;
+template <size_t S, size_t... Indices, size_t E> struct make_indices_imp<S, tuple_indices<Indices...>, E> {
+    typedef typename make_indices_imp<S+1, tuple_indices<Indices..., S>, E>::type type;
+};
+template <size_t E, size_t... Indices> struct make_indices_imp<E, tuple_indices<Indices...>, E> {
+    typedef tuple_indices<Indices...> type;
+};
+template <size_t E, size_t S=0> struct make_tuple_indices {
+    typedef typename make_indices_imp<S, tuple_indices<>, E>::type type;
+};
+
+#include "endian.h"
+
 #endif
 #endif