So many changes. Broken. iannucci
authorPeter Iannucci <iannucci@mit.edu>
Fri, 18 Jul 2014 17:48:07 +0000 (13:48 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Fri, 18 Jul 2014 17:48:07 +0000 (13:48 -0400)
50 files changed:
Makefile
Makefile.osx
config.cc
debug.cc [moved from threaded_log.cc with 71% similarity]
include/config.h [moved from config.h with 83% similarity]
include/debug.h [moved from threaded_log.h with 88% similarity]
include/endian.h [moved from endian.h with 98% similarity]
include/lock_client.h [moved from lock_client.h with 63% similarity]
include/lock_protocol.h [moved from lock_protocol.h with 93% similarity]
include/lock_server.h [new file with mode: 0644]
include/log.h [new file with mode: 0644]
include/maybe.h [moved from maybe.h with 100% similarity]
include/paxos.h [moved from paxos.h with 69% similarity]
include/paxos_protocol.h [moved from paxos_protocol.h with 83% similarity]
include/rpc/connection.h [moved from rpc/connection.h with 92% similarity]
include/rpc/fifo.h [moved from rpc/fifo.h with 74% similarity]
include/rpc/file.h [moved from rpc/file.h with 57% similarity]
include/rpc/marshall.h [new file with mode: 0644]
include/rpc/marshall_wrap.h [moved from rpc/marshall_wrap.h with 97% similarity]
include/rpc/poll_mgr.h [moved from rpc/poll_mgr.h with 97% similarity]
include/rpc/rpc.h [moved from rpc/rpc.h with 93% similarity]
include/rpc/rpc_protocol.h [moved from rpc/rpc_protocol.h with 92% similarity]
include/rpc/thread_pool.h [moved from rpc/thread_pool.h with 92% similarity]
include/rsm.h [moved from rsm.h with 93% similarity]
include/rsm_client.h [new file with mode: 0644]
include/rsm_protocol.h [moved from rsm_protocol.h with 95% similarity]
include/rsmtest_client.h [moved from rsmtest_client.h with 87% similarity]
include/t4.h [moved from t4.h with 91% similarity]
include/types.h [moved from types.h with 78% similarity]
lock_client.cc
lock_server.cc
lock_server.h [deleted file]
lock_smain.cc
lock_tester.cc
log.cc
log.h [deleted file]
marshall.py [new file with mode: 0755]
paxos.cc
rpc/connection.cc
rpc/marshall.h [deleted file]
rpc/poll_mgr.cc
rpc/rpc.cc
rpc/rpctest.cc
rpc/thread_pool.cc
rsm.cc
rsm_client.cc
rsm_client.h [deleted file]
rsm_tester.cc
rsmtest_client.cc
t4.cc

index 5dd06c5..e2c8d5e 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -1,31 +1,50 @@
-CXXFLAGS ?= -g -MMD -Werror -I. -std=c++1y
-LDFLAGS ?= 
-CXX ?= g++
-CC ?= g++
-EXTRA_TARGETS ?=
+CLANGXX ?= clang++
+GXX ?= g++
+USE_CLANG ?= 1
+CC = $(CXX)
 
-all: lock_server lock_tester rsm_tester rpc/rpctest $(EXTRA_TARGETS)
+OPTFLAGS = -O3
+CXXFLAGS = -ggdb3 -MMD -I. -std=c++1y $(STDLIB) $(PEDANTRY) $(OPTFLAGS)
+LDFLAGS = -std=c++1y $(STDLIB) $(OPTFLAGS)
 
-rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/poll_mgr.o rpc/thread_pool.o
-       rm -f $@
-       ar cq $@ $^
-       ranlib rpc/librpc.a
+POSTBUILD ?=
 
-rpc/rpctest: rpc/rpctest.o threaded_log.o rpc/librpc.a t4.o
+PRODUCTS = lock_server lock_tester rsm_tester rpc/rpctest
 
-lock_tester : lock_tester.o lock_client.o threaded_log.o rsm_client.o rpc/librpc.a t4.o
+ifeq "$(USE_CLANG)" "1"
 
-lock_server : lock_smain.o threaded_log.o rsm.o paxos.o config.o log.o lock_server.o rpc/librpc.a t4.o
+PEDANTRY = -Werror -Weverything -Wall -Wextra -pedantic-errors -pedantic \
+                  -Wno-c++98-compat-pedantic -Wno-padded -Weffc++ \
+                  -Wno-non-virtual-dtor -Wno-weak-vtables
+STDLIB = -stdlib=libc++
+CXX = $(CLANGXX)
 
-rsm_tester: rsm_tester.o rsmtest_client.o threaded_log.o rpc/librpc.a t4.o
+else
 
-%.o: %.cc
-       $(CXX) $(CXXFLAGS) -c $< -o $@
+PEDANTRY = -pedantic -Wall -Wextra -fno-default-inline -Werror
+STDLIB = -pthread
+CXX = $(GXX)
+
+endif
+
+all: $(PRODUCTS) $(POSTBUILD)
+
+LIBRPC_OBJECTS = rpc/rpc.o rpc/connection.o rpc/poll_mgr.o rpc/thread_pool.o
+
+rpc/librpc.a: $(foreach x,$(LIBRPC_OBJECTS),rpc/librpc.a($(x)))
+
+rpc/rpctest: rpc/rpctest.o debug.o rpc/librpc.a t4.o
+
+lock_tester : lock_tester.o lock_client.o debug.o rsm_client.o rpc/librpc.a t4.o
+
+lock_server : lock_smain.o debug.o rsm.o paxos.o config.o log.o lock_server.o rpc/librpc.a t4.o
+
+rsm_tester: rsm_tester.o rsmtest_client.o debug.o rpc/librpc.a t4.o
 
 -include *.d
 -include rpc/*.d
 
 clean_files=rpc/rpctest rpc/*.o *.d rpc/*.d rpc/librpc.a *.o config *.log lock_server lock_tester rsm_tester
 .PHONY: clean $(EXTRA_TARGETS)
-clean: 
-       rm -rf $(clean_files)
+clean:
+       -rm -rf $(clean_files)
index 92638c7..fd7d6ac 100644 (file)
@@ -1,31 +1,9 @@
-USE_CLANG = 1
+CLANGXX = clang++-mp-3.4
+GXX = g++-mp-4.8
 
-PEDANTRY =
-STDLIB =
-OPTFLAGS = -O3 #-fno-omit-frame-pointer -fsanitize=address ,thread,undefined -fsanitize-memory-track-origins 
-CXXFLAGS = -std=c++1y -ggdb3 -MMD -I. $(STDLIB) $(PEDANTRY) $(OPTFLAGS)
-LDFLAGS = -std=c++1y $(STDLIB) $(OPTFLAGS)
-
-ifeq "$(USE_CLANG)" "1"
-
-PEDANTRY += \
-       -Weverything -pedantic-errors -Werror -Wno-c++98-compat-pedantic \
-       -Wno-padded -pedantic -Wall -Wextra -Weffc++
-STDLIB += -stdlib=libc++
-CXX = clang++-mp-3.4
-
-else
-
-PEDANTRY += -pedantic -Wall -Wextra -fno-default-inline -Werror
-STDLIB += -pthread
-CXX = g++-mp-4.8
-
-endif
-
-CC := $(CXX)
-EXTRA_TARGETS = signatures
+POSTBUILD = signatures
 
 socketfilterfw=/usr/libexec/ApplicationFirewall/socketfilterfw
 signatures : lock_server lock_tester rpc/rpctest
-       echo $^ | sudo xargs -n 1 $(socketfilterfw) -s || true
-       echo $^ | sudo xargs -n 1 $(socketfilterfw) --unblockapp || true
+       echo $(realpath $^) | sudo xargs -n 1 $(socketfilterfw) -s || true
+       echo $(realpath $^) | sudo xargs -n 1 $(socketfilterfw) --unblockapp || true
index 8a7bf65..b2d2e5c 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -1,38 +1,35 @@
-#include "config.h"
+#include "include/config.h"
 
 using std::vector;
-
-// The config module maintains views. As a node joins or leaves a
-// view, the next view will be the same as previous view, except with
-// the new node added or removed. The first view contains only node
-// 1. If node 2 joins after the first node (it will download the views
-// from node 1), it will learn about view 1 with the first node as the
-// only member.  It will then invoke Paxos to create the next view.
-// It will tell Paxos to ask the nodes in view 1 to agree on the value
-// {1, 2}.  If Paxos returns success, then it moves to view 2 with
-// {1,2} as the members. When node 3 joins, the config module runs
-// Paxos with the nodes in view 2 and the proposed value to be
-// {1,2,3}. And so on.  When a node discovers that some node of the
-// current view is not responding, it kicks off Paxos to propose a new
-// value (the current view minus the node that isn't responding). The
-// config module uses Paxos to create a total order of views, and it
-// is ensured that the majority of the previous view agrees to the
-// next view.  The Paxos log contains all the values (i.e., views)
-// agreed on.
+using namespace std::chrono;
+
+// The config module maintains views. As a node joins or leaves a view, the
+// next view will be the same as previous view, except with the new node added
+// or removed. The first view contains only node 1. If node 2 joins after the
+// first node (it will download the views from node 1), it will learn about
+// view 1 with the first node as the only member.  It will then invoke Paxos to
+// create the next view.  It will tell Paxos to ask the nodes in view 1 to
+// agree on the value {1, 2}.  If Paxos returns success, then it moves to view
+// 2 with {1,2} as the members. When node 3 joins, the config module runs Paxos
+// with the nodes in view 2 and the proposed value to be {1,2,3}. And so on.
+// When a node discovers that some node of the current view is not responding,
+// it kicks off Paxos to propose a new value (the current view minus the node
+// that isn't responding). The config module uses Paxos to create a total order
+// of views, and it is ensured that the majority of the previous view agrees to
+// the next view.  The Paxos log contains all the values (i.e., views) agreed
+// on.
 //
-// The RSM module informs config to add nodes. The config module
-// runs a heartbeater thread that checks in with nodes.  If a node
-// doesn't respond, the config module will invoke Paxos's proposer to
-// remove the node.  Higher layers will learn about this change when a
-// Paxos acceptor accepts the new proposed value through
-// paxos_commit().
+// The RSM module informs config to add nodes. The config module runs a
+// heartbeater thread that checks in with nodes.  If a node doesn't respond,
+// the config module will invoke Paxos's proposer to remove the node.  Higher
+// layers will learn about this change when a Paxos acceptor accepts the new
+// proposed value through paxos_commit().
 //
-// To be able to bring other nodes up to date to the latest formed
-// view, each node will have a complete history of all view numbers
-// and their values that it knows about. At any time a node can reboot
-// and when it re-joins, it may be many views behind; by remembering
-// all views, the other nodes can bring this re-joined node up to
-// date.
+// To be able to bring other nodes up to date to the latest formed view, each
+// node will have a complete history of all view numbers and their values that
+// it knows about. At any time a node can reboot and when it re-joins, it may
+// be many views behind; by remembering all views, the other nodes can bring
+// this re-joined node up to date.
 
 config_view_change::~config_view_change() {}
 
@@ -52,23 +49,23 @@ void config::restore(const string & s) {
     reconstruct(cfg_mutex_lock);
 }
 
-void config::get_view(unsigned instance, vector<string> & m) {
+vector<string> config::get_view(unsigned instance) {
     lock cfg_mutex_lock(cfg_mutex);
-    get_view(instance, m, cfg_mutex_lock);
+    return get_view(instance, cfg_mutex_lock);
 }
 
-void config::get_view(unsigned instance, vector<string> & m, lock & cfg_mutex_lock) {
+vector<string> config::get_view(unsigned instance, lock & cfg_mutex_lock) {
     VERIFY(cfg_mutex_lock);
     string value = paxos.value(instance);
     LOG << "get_view(" << instance << "): returns " << value;
-    m = explode(value);
+    return explode(value);
 }
 
 void config::reconstruct(lock & cfg_mutex_lock) {
     VERIFY(cfg_mutex_lock);
     my_view_id = paxos.instance();
     if (my_view_id > 0) {
-        get_view(my_view_id, mems, cfg_mutex_lock);
+        mems = get_view(my_view_id, cfg_mutex_lock);
         LOG << "view " << my_view_id << " " << mems;
     }
 }
@@ -99,9 +96,7 @@ void config::paxos_commit(unsigned instance, const string & value) {
 
 bool config::ismember(const string & m, unsigned vid) {
     lock cfg_mutex_lock(cfg_mutex);
-    vector<string> v;
-    get_view(vid, v, cfg_mutex_lock);
-    return isamember(m, v);
+    return isamember(m, get_view(vid, cfg_mutex_lock));
 }
 
 bool config::add(const string & new_m, unsigned vid) {
@@ -146,13 +141,12 @@ void config::heartbeater() {
     lock cfg_mutex_lock(cfg_mutex);
 
     while (1) {
-        auto next_timeout = steady_clock::now() + milliseconds(300);
+        auto next_timeout = steady_clock::now() + 300ms;
         LOG << "go to sleep";
         config_cond.wait_until(cfg_mutex_lock, next_timeout);
 
         unsigned vid = my_view_id;
-        vector<string> cmems;
-        get_view(vid, cmems, cfg_mutex_lock);
+        vector<string> cmems = get_view(vid, cfg_mutex_lock);
         LOG << "current membership " << cmems;
 
         if (!isamember(me, cmems)) {
@@ -180,10 +174,10 @@ void config::heartbeater() {
     }
 }
 
-paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) {
+paxos_protocol::status config::heartbeat(paxos_protocol::view_t & r, string m, paxos_protocol::view_t vid) {
     lock cfg_mutex_lock(cfg_mutex);
-    r = (int) my_view_id;
-    LOG<< "heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id;
+    r = my_view_id;
+    LOG << "heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id;
     if (vid == my_view_id)
         return paxos_protocol::OK;
     else if (paxos.isrunning()) {
@@ -195,13 +189,13 @@ paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) {
 
 config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) {
     VERIFY(cfg_mutex_lock);
-    unsigned vid = my_view_id;
+    paxos_protocol::view_t vid = my_view_id, r;
     LOG << "heartbeat to " << m << " (" << vid << ")";
 
     cfg_mutex_lock.unlock();
-    int r = 0, ret = rpc_protocol::bind_failure;
+    int ret = rpc_protocol::bind_failure;
     if (auto cl = rpcc::bind_cached(m))
-        ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
+        ret = cl->call_timeout(paxos_protocol::heartbeat, 100ms, r, me, vid);
     cfg_mutex_lock.lock();
 
     heartbeat_t res = OK;
similarity index 71%
rename from threaded_log.cc
rename to debug.cc
index 3218e33..784a054 100644 (file)
+++ b/debug.cc
@@ -1,6 +1,6 @@
-#include "t4.h"
-#include "types.h"
-#include "threaded_log.h"
+#include "include/t4.h"
+#include "include/types.h"
+#include "include/debug.h"
 
 using namespace std::chrono;
 
@@ -9,8 +9,7 @@ locked_ostream && _log_prefix(locked_ostream && f, const string & file, const st
     int tid = global->thread_name_map[thread];
     if (tid==0)
         tid = global->thread_name_map[thread] = ++global->next_thread_num;
-    auto utime = duration_cast<microseconds>(
-            system_clock::now().time_since_epoch()).count() % 1000000000;
+    auto utime = (system_clock::now().time_since_epoch() / 1us) % 1000000000;
     f << std::setfill('0') << std::dec << std::left << std::setw(9) << utime << " ";
     f << std::setfill(' ') << global->log_thread_prefix << std::left << std::setw(2) << tid;
     f << " " << std::setw(20) << file << " " << std::setw(18) << func;
@@ -32,3 +31,12 @@ int _log_debug_level() {
 lock _log_lock() {
     return lock(global->log_mutex);
 }
+
+string hex_string(const string & s) {
+    string bytes;
+    for (char ch : s) {
+        bytes.push_back("0123456789abcdef"[(uint8_t)ch >> 4]);
+        bytes.push_back("0123456789abcdef"[(uint8_t)ch & 15]);
+    }
+    return bytes;
+}
similarity index 83%
rename from config.h
rename to include/config.h
index 8d0c836..ce8990d 100644 (file)
--- a/config.h
@@ -1,8 +1,8 @@
 #ifndef config_h
 #define config_h
 
-#include "types.h"
-#include "paxos.h"
+#include "include/types.h"
+#include "include/paxos.h"
 
 class config_view_change {
     public:
@@ -20,8 +20,8 @@ class config : public paxos_change {
         std::vector<string> mems;
         std::mutex cfg_mutex;
         cond config_cond;
-        paxos_protocol::status heartbeat(int & r, string m, unsigned instance);
-        void get_view(unsigned instance, std::vector<string> & m, lock & cfg_mutex_lock);
+        paxos_protocol::status heartbeat(paxos_protocol::view_t & r, string m, unsigned instance);
+        std::vector<string> get_view(unsigned instance, lock & cfg_mutex_lock);
         bool remove(const string &, lock & cfg_mutex_lock);
         void reconstruct(lock & cfg_mutex_lock);
         typedef enum {
@@ -35,7 +35,7 @@ class config : public paxos_change {
         unsigned view_id() { return my_view_id; }
         const string & myaddr() const { return me; }
         string dump() { return paxos.dump(); }
-        void get_view(unsigned instance, std::vector<string> & m);
+        std::vector<string> get_view(unsigned instance);
         void restore(const string & s);
         bool add(const string &, unsigned view_id);
         bool ismember(const string & m, unsigned view_id);
similarity index 88%
rename from threaded_log.h
rename to include/debug.h
index 9a83bfb..f42be6b 100644 (file)
@@ -1,5 +1,5 @@
-#ifndef threaded_log_h
-#define threaded_log_h
+#ifndef debug_h
+#define debug_h
 
 #include <string>
 #include <ostream>
@@ -20,7 +20,10 @@ lock _log_lock();
 
 #define LOG_NONMEMBER _log_prefix(locked_ostream{std::cerr, _log_lock()}, __FILE__, __func__)
 #define LOG           _log_member(LOG_NONMEMBER, (const void *)this)
+#define DEBUG_LOG LOG
 
 #define IF_LEVEL(level) if(_log_debug_level() >= abs(level))
 
+string hex_string(const string & s);
+
 #endif
similarity index 98%
rename from endian.h
rename to include/endian.h
index 7c78bbc..450eb5d 100644 (file)
--- a/endian.h
@@ -2,6 +2,7 @@
 #define endian_h
 
 #include <cinttypes>
+#include <arpa/inet.h>
 
 constexpr union { uint32_t i; uint8_t is_little_endian; } endianness{1};
 
similarity index 63%
rename from lock_client.h
rename to include/lock_client.h
index 9e449f4..64848b5 100644 (file)
@@ -5,59 +5,50 @@
 
 #ifdef __cplusplus
 
-#include "types.h"
-#include "lock_protocol.h"
-#include "rpc/fifo.h"
-#include "rsm_client.h"
-#include "maybe.h"
-
-class lock_release_user {
-    public:
-        virtual void dorelease(lock_protocol::lockid_t) = 0;
-        virtual ~lock_release_user() {}
-};
-
-class lock_state {
-public:
-    enum {
-        none = 0,
-        retrying,
-        free,
-        locked,
-        acquiring,
-        releasing
-    } state = none;
-    std::thread::id held_by;
-    std::list<thread::id> wanted_by;
-    std::mutex m;
-    std::map<thread::id, cond> c;
-    lock_protocol::xid_t xid;
-    void wait(lock & mutex_lock);
-    void signal();
-    void signal(thread::id who);
-};
-
-typedef std::map<lock_protocol::lockid_t, lock_state> lock_map;
+#include "include/types.h"
+#include "include/lock_protocol.h"
+#include "include/rpc/fifo.h"
+#include "include/rsm_client.h"
+#include "include/maybe.h"
 
 // Clients that caches locks.  The server can revoke locks using
 // lock_revoke_server.
 class lock_client {
     private:
+        class lock_state {
+        public:
+            enum {
+                none = 0,
+                retrying,
+                free,
+                locked,
+                acquiring,
+                releasing
+            } state = none;
+            std::thread::id held_by;
+            std::list<thread::id> wanted_by;
+            std::mutex m;
+            std::map<thread::id, cond> c;
+            lock_protocol::xid_t xid;
+            void wait(lock & mutex_lock);
+            void signal();
+            void signal(thread::id who);
+        };
+
         unique_ptr<rpcs> rlsrpc;
         thread releaser_thread;
         unique_ptr<rsm_client> rsmc;
-        lock_release_user *lu;
         in_port_t rlock_port;
         string hostname;
         string id;
         std::mutex xid_mutex;
-        lock_protocol::xid_t next_xid;
+        lock_protocol::xid_t next_xid=0;
         fifo<maybe<lock_protocol::lockid_t>> release_fifo;
         std::mutex lock_table_lock;
-        lock_map lock_table;
+        std::map<lock_protocol::lockid_t, lock_state> lock_table;
         lock_state & get_lock_state(lock_protocol::lockid_t lid);
     public:
-        lock_client(string xdst, lock_release_user *l = 0);
+        lock_client(string xdst);
         ~lock_client();
         lock_protocol::status acquire(lock_protocol::lockid_t);
         lock_protocol::status release(lock_protocol::lockid_t);
similarity index 93%
rename from lock_protocol.h
rename to include/lock_protocol.h
index bd691a0..dd924cb 100644 (file)
@@ -1,8 +1,8 @@
 #ifndef lock_protocol_h
 #define lock_protocol_h
 
-#include "types.h"
-#include "rpc/rpc.h"
+#include "include/types.h"
+#include "include/rpc/rpc.h"
 
 typedef string callback_t;
 
diff --git a/include/lock_server.h b/include/lock_server.h
new file mode 100644 (file)
index 0000000..74b271a
--- /dev/null
@@ -0,0 +1,41 @@
+#ifndef lock_server_h
+#define lock_server_h
+
+#include "include/types.h"
+#include "include/lock_protocol.h"
+#include "include/rsm.h"
+#include "include/rpc/fifo.h"
+
+class lock_server : private rsm_state_transfer {
+    private:
+        using holder_t=std::pair<callback_t, lock_protocol::xid_t>;
+
+        struct lock_state {
+            inline lock_state() {}
+            lock_state(const lock_state & other);
+            bool held=false;
+            holder_t held_by;
+            std::list<holder_t> wanted_by;
+            std::map<callback_t, lock_protocol::xid_t> old_requests;
+            std::mutex m;
+
+            MEMBERS(held, held_by, wanted_by)
+        };
+
+        int nacquire;
+        std::mutex lock_table_lock;
+        std::map<lock_protocol::lockid_t, lock_state> lock_table;
+        lock_state & get_lock_state(lock_protocol::lockid_t lid);
+        fifo<lock_protocol::lockid_t> retry_fifo, revoke_fifo;
+        rsm *rsm_;
+        string marshall_state();
+        void unmarshall_state(const string & state);
+        void revoker NORETURN ();
+        void retryer NORETURN ();
+    public:
+        lock_server(rsm & r);
+        lock_protocol::status acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
+        lock_protocol::status release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
+};
+
+#endif
diff --git a/include/log.h b/include/log.h
new file mode 100644 (file)
index 0000000..cc3603b
--- /dev/null
@@ -0,0 +1,38 @@
+#ifndef log_h
+#define log_h
+
+#include "include/types.h"
+#include "include/paxos_protocol.h"
+#include "include/rpc/marshall.h"
+
+class log {
+    private:
+        string name;
+        std::map<string, std::function<void(unmarshall &)>> handlers;
+
+    public:
+        log(string _me);
+        string read();
+        void write(string s);
+        void replay();
+
+        struct label : public string { label(const char * s) : string(s) {} };
+#define LABEL(_x_) inline operator log::label () const { return _x_; }
+
+        // XXX should be an atomic operation
+        template <class T>
+        void append(const T & t) {
+            std::ofstream(name, std::ios::app) << marshall{label(t), t};
+        }
+
+        template <class T>
+        void handler(std::function<void(T)> h) {
+            handlers[label(T())] = [h, this] (unmarshall & from) {
+                auto entry = from.get<T>();
+                h(entry);
+                DEBUG_LOG << entry;
+            };
+        }
+};
+
+#endif
similarity index 100%
rename from maybe.h
rename to include/maybe.h
similarity index 69%
rename from paxos.h
rename to include/paxos.h
index 20e249c..9b54c05 100644 (file)
--- a/paxos.h
@@ -1,10 +1,10 @@
 #ifndef paxos_h
 #define paxos_h
 
-#include "types.h"
-#include "rpc/rpc.h"
-#include "paxos_protocol.h"
-#include "log.h"
+#include "include/types.h"
+#include "include/rpc/rpc.h"
+#include "include/paxos_protocol.h"
+#include "include/log.h"
 
 using prepareres = paxos_protocol::prepareres;
 using node_t = paxos_protocol::node_t;
@@ -43,8 +43,7 @@ class proposer_acceptor {
         unsigned instance_h = 0;    // number of the highest instance we have decided
         std::map<unsigned,value_t> values;   // vals of each instance
 
-        friend class log;
-        class log l = {this, me};
+        class log l = {me};
 
         void commit(unsigned instance, const value_t & v, lock & acceptor_mutex_lock);
 
@@ -59,12 +58,39 @@ class proposer_acceptor {
         void breakpoint1();
         void breakpoint2();
 
+        // Log a committed paxos instance
+        struct log_instance {
+            unsigned number;
+            string value;
+            MEMBERS(number, value)
+            LABEL("done")
+        };
+
+        // Log the highest proposal number that the local paxos acceptor has
+        // ever seen; called from paxos when responding to preparereq with
+        // accept
+        struct log_proposal {
+            prop_t promise;
+            MEMBERS(promise)
+            LABEL("propseen")
+        };
+
+        // Log the highest proposal (proposal number and value) that the local
+        // paxos acceptor accept has ever accepted; called from paxos when
+        // responding to acceptreq with true
+        struct log_accept {
+            prop_t number;
+            string value;
+            MEMBERS(number, value)
+            LABEL("accepted")
+        };
+
     public:
         proposer_acceptor(paxos_change *delegate, bool _first, const node_t & _me, const value_t & _value);
         unsigned instance() { return instance_h; }
         const value_t & value(unsigned instance) { return values[instance]; }
-        string dump() { return l.dump(); }
-        void restore(const string & s) { l.restore(s); l.logread(); }
+        string dump() { return l.read(); }
+        void restore(const string & s) { l.write(s); l.replay(); }
         rpcs *get_rpcs() { return &pxs; }
 
         bool run(unsigned instance, const nodes_t & cnodes, const value_t & v);
similarity index 83%
rename from paxos_protocol.h
rename to include/paxos_protocol.h
index 3e1fbcd..f3c50e2 100644 (file)
@@ -1,8 +1,8 @@
 #ifndef paxos_protocol_h
 #define paxos_protocol_h
 
-#include "types.h"
-#include "rpc/rpc.h"
+#include "include/types.h"
+#include "include/rpc/rpc.h"
 
 struct prop_t {
     unsigned n;
@@ -23,13 +23,14 @@ namespace paxos_protocol {
     };
     using node_t = string;
     using nodes_t = std::vector<node_t>;
+    using view_t = unsigned;
     using value_t = string;
 
     REMOTE_PROCEDURE_BASE(0x11000);
     REMOTE_PROCEDURE(1, preparereq, (prepareres &, node_t, unsigned, prop_t));
     REMOTE_PROCEDURE(2, acceptreq, (bool &, node_t, unsigned, prop_t, value_t));
     REMOTE_PROCEDURE(3, decidereq, (int &, node_t, unsigned, value_t));
-    REMOTE_PROCEDURE(4, heartbeat, (int &, string, unsigned));
+    REMOTE_PROCEDURE(4, heartbeat, (view_t &, string, view_t));
 }
 
 #endif
similarity index 92%
rename from rpc/connection.h
rename to include/rpc/connection.h
index 03e92da..a6a7ffe 100644 (file)
@@ -1,13 +1,13 @@
 #ifndef connection_h
 #define connection_h
 
-#include "types.h"
+#include "include/types.h"
 #include <arpa/inet.h>
 #include <netinet/in.h>
-#include "t4.h"
-#include "poll_mgr.h"
-#include "file.h"
-#include "threaded_log.h"
+#include "include/t4.h"
+#include "include/rpc/poll_mgr.h"
+#include "include/rpc/file.h"
+#include "include/debug.h"
 
 class connection;
 
@@ -32,7 +32,7 @@ class connection : private aio_callback, public std::enable_shared_from_this<con
         static shared_ptr<connection> to_dst(const sockaddr_in & dst, connection_delegate *mgr, int lossy=0);
 
         const time_point create_time = steady_clock::now();
-        const file_t fd;
+        socket_t fd;
 
     private:
         void write_cb(int s);
similarity index 74%
rename from rpc/fifo.h
rename to include/rpc/fifo.h
index 9e4933a..dd3d8c1 100644 (file)
@@ -1,12 +1,12 @@
 #ifndef fifo_h
 #define fifo_h
 
-#include "types.h"
+#include "include/types.h"
 
 template<class T>
 class fifo {
     public:
-        fifo(size_t limit=0) : max_(limit) {}
+        fifo(size_t max_size=0) : max_(max_size) {}
 
         bool enq(T e, bool blocking=true) {
             lock ml(m_);
@@ -34,9 +34,9 @@ class fifo {
     private:
         std::list<T> q_;
         std::mutex m_;
-        cond non_empty_c_; // q went non-empty
-        cond has_space_c_; // q is not longer overfull
-        size_t max_; // maximum capacity of the queue, block enq threads if exceeds this limit
+        cond non_empty_c_;
+        cond has_space_c_;
+        size_t max_;
 };
 
 #endif
similarity index 57%
rename from rpc/file.h
rename to include/rpc/file.h
index 9ea313f..fd6d580 100644 (file)
@@ -3,10 +3,12 @@
 
 #include <fcntl.h>
 #include <unistd.h>
-#include "types.h"
+#include "include/types.h"
 #include <sys/socket.h>
 
 class file_t {
+    protected:
+        static const int nofile = -1;
     private:
         int fd_;
 
@@ -20,37 +22,41 @@ class file_t {
                 operator int & () { return flags_; }
         };
     public:
-        inline file_t(int fd=-1) : fd_(fd) {}
+        inline file_t(int fd=nofile) : fd_(fd) {}
         inline file_t(const file_t &) = delete;
-        inline file_t(file_t && other) : fd_(-1) { std::swap(fd_, other.fd_); }
-        inline ~file_t() { if (fd_ != -1) ::close(fd_); }
+        inline file_t(file_t && other) : fd_(nofile) { std::swap(fd_, other.fd_); }
+        inline ~file_t() { close(); }
         static inline void pipe(file_t *ends) {
             int fds[2];
+            ends[0].close();
+            ends[1].close();
             VERIFY(::pipe(fds) == 0);
             ends[0].fd_ = fds[0];
             ends[1].fd_ = fds[1];
         }
-        inline operator int() const { if (fd_ == -1) throw "no fd"; return fd_; }
+        inline operator int() const { if (fd_ == nofile) throw "no fd"; return fd_; }
         inline flags_t flags() const { return {*this}; }
         inline void close() {
-            ::close(fd_);
-            fd_ = -1;
+            if (fd_ != nofile)
+                ::close(fd_);
+            fd_ = nofile;
         }
-        template <class T>
-        inline ssize_t read(T & t) const { return ::read(fd_, &t, sizeof(T)); }
+        template <class T> inline typename std::enable_if<std::is_pod<T>::value, ssize_t>::type
+        read(T & t) const { return ::read(fd_, &t, sizeof(T)); }
         inline ssize_t read(void * t, size_t n) const { return ::read(fd_, t, n); }
-        template <class T>
-        inline ssize_t write(const T & t) const { return ::write(fd_, &t, sizeof(T)); }
+        template <class T> inline typename std::enable_if<std::is_pod<T>::value, ssize_t>::type
+        write(const T & t) const { return ::write(fd_, &t, sizeof(T)); }
         inline ssize_t write(const void * t, size_t n) const { return ::write(fd_, t, n); }
 };
 
 class socket_t : public file_t {
     public:
-        socket_t(int fd=-1) : file_t(fd) {}
+        socket_t(int fd=nofile) : file_t(fd) {}
         template <class T>
         int setsockopt(int level, int option, T && value) {
             return ::setsockopt(*this, level, option, &value, sizeof(T));
         }
+        inline int shutdown(int how) { return ::shutdown(*this, how); }
 };
 
 #endif
diff --git a/include/rpc/marshall.h b/include/rpc/marshall.h
new file mode 100644 (file)
index 0000000..8b31791
--- /dev/null
@@ -0,0 +1,187 @@
+#ifndef marshall_h
+#define marshall_h
+
+#include "include/types.h"
+#include "include/rpc/rpc_protocol.h"
+
+class marshall : public string {
+    public:
+        template <typename... Args>
+        marshall(const Args & ... args) {
+            UNPACK_STATEMENT(*this << args);
+        }
+
+        template <class T>
+        static inline string datagram(const T & h, const marshall & payload="") {
+            using namespace rpc_protocol;
+            marshall m{rpc_sz_t(payload.size() + RPC_HEADER_SZ - sizeof(rpc_sz_t)), (T)h};
+            VERIFY(m.size() <= RPC_HEADER_SZ); // Datagram header too large
+            m.resize(RPC_HEADER_SZ);
+            return m + payload;
+        }
+};
+
+class unmarshall : string {
+    private:
+        string::const_iterator next = cbegin();
+
+    public:
+        template <typename... Args>
+        inline unmarshall(const string & s, Args && ...args) : string(s) {
+            UNPACK_STATEMENT(*this >> args);
+        }
+
+        template <class H, typename... Args>
+        static inline unmarshall datagram(const string & datagram, H & h, Args && ... args) {
+            rpc_protocol::rpc_sz_t s;
+            VERIFY(unmarshall(datagram, s, h)); // Datagram header too large
+            return unmarshall(datagram.substr(rpc_protocol::RPC_HEADER_SZ), args...);
+        }
+
+        template <class S> inline S get() { S s; *this >> s; return s; }
+        inline bool ok() const { return next <= end(); }
+        inline bool okdone() const { return next == end(); }
+        inline operator bool() const { return ok(); }
+        inline void read(void * t, size_t n) {
+            auto from = next;
+            next += string::const_iterator::difference_type(n);
+            if (next <= end())
+                std::copy(from, next, (char *)t);
+        }
+};
+
+//
+// Marshalling for plain old data
+//
+
+#ifndef MARSHALL_RAW_NETWORK_ORDER_AS
+#define MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _d_) \
+inline marshall & operator<<(marshall & m, _c_ x) { _d_ y = hton((_d_)x); m.append((const char *)&y, sizeof(_d_)); return m; } \
+inline unmarshall & operator>>(unmarshall & u, _c_ & x) { _d_ y; u.read(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; }
+#endif
+
+#define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_)
+
+MARSHALL_RAW_NETWORK_ORDER_AS(bool, uint8_t)
+MARSHALL_RAW_NETWORK_ORDER(uint8_t)
+MARSHALL_RAW_NETWORK_ORDER(int8_t)
+MARSHALL_RAW_NETWORK_ORDER(uint16_t)
+MARSHALL_RAW_NETWORK_ORDER(int16_t)
+MARSHALL_RAW_NETWORK_ORDER(uint32_t)
+MARSHALL_RAW_NETWORK_ORDER(int32_t)
+MARSHALL_RAW_NETWORK_ORDER_AS(size_t, uint64_t)
+MARSHALL_RAW_NETWORK_ORDER(uint64_t)
+MARSHALL_RAW_NETWORK_ORDER(int64_t)
+
+//
+// Marshalling for tuples (used to implement marshalling for structs)
+//
+
+// In order to iterate over the tuple elements, we first need a template
+// parameter pack containing the tuple's indices.  The function templates named
+// *_imp below accept an empty tag struct as their last argument, and use its
+// template arguments to index the tuple.  The operator<< overloads instantiate
+// the appropriate tag struct to make this possible.
+
+template <class... Args, size_t... Indices> inline marshall &
+tuple_marshall_imp(marshall & m, tuple<Args...> & t, std::index_sequence<Indices...>) {
+    UNPACK_STATEMENT(m << std::get<Indices>(t));
+    return m;
+}
+
+template <class... Args> inline marshall &
+operator<<(marshall & m, tuple<Args...> && t) {
+    return tuple_marshall_imp(m, t, std::index_sequence_for<Args...>{});
+}
+
+template <class... Args, size_t... Indices> inline unmarshall &
+tuple_unmarshall_imp(unmarshall & u, tuple<Args & ...> t, std::index_sequence<Indices...>) {
+    UNPACK_STATEMENT(u >> std::get<Indices>(t));
+    return u;
+}
+
+template <class... Args> inline unmarshall &
+operator>>(unmarshall & u, tuple<Args & ...> && t) {
+    return tuple_unmarshall_imp(u, t, std::index_sequence_for<Args...>{});
+}
+
+//
+// Marshalling for structs or classes containing a MEMBERS declaration
+//
+
+// Implements struct marshalling via tuple marshalling of members.
+template <class A> inline typename
+enable_if<is_tuple_convertible<A>::value, unmarshall>::type &
+operator>>(unmarshall & u, A & a) { return u >> _tuple_(a); }
+
+template <class T> inline typename
+enable_if<is_tuple_convertible<T>::value, marshall>::type &
+operator<<(marshall & m, const T a) { return m << _tuple_(a); }
+
+//
+// Marshalling for STL containers
+//
+
+// this overload is visible for type A only if A::cbegin and A::cend exist
+template <class A> inline typename
+enable_if<is_const_iterable<A>::value, marshall>::type &
+operator<<(marshall & m, const A & x) {
+    m << (uint32_t)x.size();
+    for (const auto & a : x)
+        m << a;
+    return m;
+}
+
+// visible for type A if A::emplace_back(a) makes sense
+template <class A> inline typename
+enable_if<supports_emplace_back<A>::value, unmarshall>::type &
+operator>>(unmarshall & u, A & x) {
+    uint32_t n = u.get<uint32_t>();
+    x.clear();
+    while (n--)
+        x.emplace_back(u.get<typename A::value_type>());
+    return u;
+}
+
+// std::map<A, B>
+template <class A, class B> inline unmarshall &
+operator>>(unmarshall & u, std::map<A,B> & x) {
+    uint32_t n = u.get<uint32_t>();
+    x.clear();
+    while (n--)
+        x.emplace(u.get<std::pair<A,B>>());
+    return u;
+}
+
+// std::string
+inline marshall & operator<<(marshall & m, const string & s) {
+    m << (uint32_t)s.size();
+    m.append(s);
+    return m;
+}
+
+inline unmarshall & operator>>(unmarshall & u, string & s) {
+    uint32_t sz;
+    if (u >> sz) {
+        s.resize(sz);
+        u.read(&s[0], sz);
+    }
+    return u;
+}
+
+//
+// Marshalling for strongly-typed enums
+//
+
+template <class E> typename enable_if<std::is_enum<E>::value, marshall>::type &
+operator<<(marshall & m, E e) {
+    return m << typename std::underlying_type<E>::type(e);
+}
+
+template <class E> typename enable_if<std::is_enum<E>::value, unmarshall>::type &
+operator>>(unmarshall & u, E & e) {
+    e = E(u.get<typename std::underlying_type<E>::type>());
+    return u;
+}
+
+#endif
similarity index 97%
rename from rpc/marshall_wrap.h
rename to include/rpc/marshall_wrap.h
index 04f5268..497c882 100644 (file)
@@ -1,7 +1,7 @@
 #ifndef marshall_wrap_h
 #define marshall_wrap_h
 
-#include "marshall.h"
+#include "include/rpc/marshall.h"
 
 typedef std::function<rpc_protocol::status(unmarshall &&, marshall &)> handler;
 
@@ -73,7 +73,7 @@ struct marshalled_func_imp<F, C, RV(R &, Args...), ErrorHandler> {
         return new handler([=](unmarshall && u, marshall & m) -> RV {
             // Unmarshall each argument with the correct type and store the
             // result in a tuple.
-            ArgsStorage t{u._grab<typename std::decay<Args>::type>()...};
+            ArgsStorage t{u.get<typename std::decay<Args>::type>()...};
             // Verify successful unmarshalling of the entire input stream.
             if (!u.okdone())
                 return (RV)ErrorHandler::unmarshall_args_failure();
similarity index 97%
rename from rpc/poll_mgr.h
rename to include/rpc/poll_mgr.h
index 6fe66c0..55ffefb 100644 (file)
@@ -1,7 +1,7 @@
 #ifndef poll_mgr_h
 #define poll_mgr_h 
 
-#include "types.h"
+#include "include/types.h"
 
 #define MAX_POLL_FDS 128
 
similarity index 93%
rename from rpc/rpc.h
rename to include/rpc/rpc.h
index 9464e57..b4fc68f 100644 (file)
--- a/rpc/rpc.h
@@ -1,20 +1,20 @@
 #ifndef rpc_h
 #define rpc_h
 
-#include "types.h"
+#include "include/types.h"
 
-#include "rpc_protocol.h"
-#include "thread_pool.h"
-#include "marshall.h"
-#include "marshall_wrap.h"
-#include "connection.h"
-#include "threaded_log.h"
+#include "include/rpc/rpc_protocol.h"
+#include "include/rpc/thread_pool.h"
+#include "include/rpc/marshall.h"
+#include "include/rpc/marshall_wrap.h"
+#include "include/rpc/connection.h"
+#include "include/debug.h"
 
 using std::chrono::milliseconds;
 
 namespace rpc {
-    static constexpr milliseconds to_max{12000};
-    static constexpr milliseconds to_min{100};
+    static constexpr auto to_max = milliseconds(12000);
+    static constexpr auto to_min = milliseconds(100);
 }
 
 template<class P, class R, class ...Args>
@@ -126,7 +126,9 @@ class rpcc : private connection_delegate {
             string rep;
             int intret = call_marshalled(proc, to, rep, marshall(args...));
             if (intret >= 0) {
-                VERIFY(unmarshall(rep, true, r).okdone()); // guaranteed by static type checking
+                rpc_protocol::reply_header rh;
+                // guaranteed by static type checking?
+                VERIFY(unmarshall::datagram(rep, rh, r).okdone());
             }
             return intret;
         }
similarity index 92%
rename from rpc/rpc_protocol.h
rename to include/rpc/rpc_protocol.h
index 4f03937..a717beb 100644 (file)
@@ -1,13 +1,13 @@
 #ifndef rpc_protocol_h
 #define rpc_protocol_h
 
-#include "types.h"
+#include "include/types.h"
 
 namespace rpc_protocol {
     using proc_id_t = uint32_t;
 
     using status = int32_t;
-    using rpc_sz_t = uint32_t;
+    using rpc_sz_t = uint64_t;
     using nonce_t = uint32_t;
     using xid_t = int32_t;
 
@@ -51,7 +51,6 @@ namespace rpc_protocol {
 
     union header_t { request_header req; reply_header rep; };
     const size_t RPC_HEADER_SZ = sizeof(header_t) + sizeof(rpc_sz_t);
-    const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation
     const size_t MAX_PDU = 10<<20; // maximum PDF is 10M
 
 #define REMOTE_PROCEDURE_BASE(_base_) \
similarity index 92%
rename from rpc/thread_pool.h
rename to include/rpc/thread_pool.h
index 98ec655..0acb486 100644 (file)
@@ -3,7 +3,7 @@
 
 #include <functional>
 #include <vector>
-#include "fifo.h"
+#include "include/rpc/fifo.h"
 
 typedef std::function<void()> job_t;
 
similarity index 93%
rename from rsm.h
rename to include/rsm.h
index 487a856..68f1729 100644 (file)
--- a/rsm.h
@@ -3,16 +3,16 @@
 #ifndef rsm_h
 #define rsm_h
 
-#include "types.h"
-#include "rsm_protocol.h"
-#include "rpc/rpc.h"
+#include "include/types.h"
+#include "include/rsm_protocol.h"
+#include "include/rpc/rpc.h"
 #include <arpa/inet.h>
-#include "config.h"
+#include "include/config.h"
 
 class rsm_state_transfer {
     public:
-        virtual string marshal_state() = 0;
-        virtual void unmarshal_state(const string &) = 0;
+        virtual string marshall_state() = 0;
+        virtual void unmarshall_state(const string &) = 0;
         virtual ~rsm_state_transfer();
 };
 
diff --git a/include/rsm_client.h b/include/rsm_client.h
new file mode 100644 (file)
index 0000000..407775b
--- /dev/null
@@ -0,0 +1,47 @@
+#ifndef rsm_client_h
+#define rsm_client_h
+
+#include "include/types.h"
+#include "include/rsm_protocol.h"
+
+//
+// rsm client interface.
+//
+// The client stubs package up an rpc, and then call the invoke procedure
+// on the replicated state machine passing the RPC as an argument.  This way
+// the replicated state machine isn't service specific; any server can use it.
+//
+
+class rsm_client {
+    protected:
+        string primary;
+        std::vector<string> known_mems;
+        std::mutex rsm_client_mutex;
+        bool init_members(lock & rsm_client_mutex_lock);
+        rsm_protocol::status invoke(rpc_protocol::proc_id_t proc, string & rep, const string & req);
+        template<class R> int call_marshalled(rpc_protocol::proc_t & proc, R & r, const marshall & req);
+    public:
+        rsm_client(string dst);
+
+        template<class P, class R, class ...Args>
+        inline int call(rpc_protocol::proc_checked_t<P> proc, R & r, const Args & ...a1) {
+            static_assert(is_valid_call<P, R, Args...>::value, "RSM method invoked with incorrect argument types");
+            return call_marshalled(proc, r, marshall(a1...));
+        }
+};
+
+template<class R>
+int rsm_client::call_marshalled(rpc_protocol::proc_t & proc, R & r, const marshall & req) {
+    string rep, res;
+    int intret = invoke(proc.id, rep, req);
+    VERIFY( intret == rsm_client_protocol::OK );
+    auto u = unmarshall(rep, intret);
+    if (intret < 0) return intret;
+    if ((u >> res).okdone() && unmarshall(res, r).okdone())
+        return intret;
+    LOG << "failed to unmarshall reply \"" << hex_string(rep) << "\" for proceduce "
+        << proc.name << " (0x" << std::hex << proc.id << ").";
+    return rpc_protocol::unmarshall_reply_failure;
+}
+
+#endif
similarity index 95%
rename from rsm_protocol.h
rename to include/rsm_protocol.h
index 5a5b7dd..7743f7f 100644 (file)
@@ -1,8 +1,8 @@
 #ifndef rsm_protocol_h
 #define rsm_protocol_h
 
-#include "types.h"
-#include "rpc/rpc.h"
+#include "include/types.h"
+#include "include/rpc/rpc.h"
 
 namespace rsm_client_protocol {
     enum status : rpc_protocol::status {OK, ERR, NOTPRIMARY, BUSY};
similarity index 87%
rename from rsmtest_client.h
rename to include/rsmtest_client.h
index e7add37..cb611e7 100644 (file)
@@ -3,8 +3,8 @@
 #ifndef rsmtest_client_h
 #define rsmtest_client_h
 
-#include "types.h"
-#include "rsm_protocol.h"
+#include "include/types.h"
+#include "include/rsm_protocol.h"
 
 // Client interface to the rsmtest server
 class rsmtest_client {
similarity index 91%
rename from t4.h
rename to include/t4.h
index 19eaddc..7222c0d 100644 (file)
--- a/t4.h
@@ -1,8 +1,8 @@
 #ifndef t4_h
 #define t4_h
 
-#include "types.h"
-#include "rpc/poll_mgr.h"
+#include "include/types.h"
+#include "include/rpc/poll_mgr.h"
 
 struct t4_state {
     std::mutex log_mutex;
similarity index 78%
rename from types.h
rename to include/types.h
index 53f496d..be6cdd5 100644 (file)
--- a/types.h
@@ -54,13 +54,6 @@ template<class A> struct supports_emplace_back<A,
     decltype(std::declval<A &>().emplace_back(std::declval<typename A::value_type>()), void())
 > : true_type {};
 
-template<typename E> using enum_type_t = typename enable_if<
-    std::is_enum<E>::value, typename std::underlying_type<E>::type>::type;
-
-template<typename E> constexpr inline enum_type_t<E> from_enum(E e) noexcept { return (enum_type_t<E>)e; }
-template<typename E> constexpr inline E to_enum(enum_type_t<E> value) noexcept { return (E)value; }
-
-
 template <class A, typename I=void> struct is_tuple_convertible : false_type {};
 
 template<class A> struct is_tuple_convertible<A,
@@ -120,6 +113,14 @@ operator<<(std::ostream & o, const A & a) {
 inline auto _tuple_() { return std::tie(__VA_ARGS__); } \
 inline auto _tuple_() const { return std::tie(__VA_ARGS__); }
 
+template <class T> inline auto _tuple_(T & t) { return t._tuple_(); }
+
+// specialized tuple adapter for std::pair
+
+template <class A, class B> struct is_tuple_convertible<std::pair<A, B>> : true_type {};
+template <class A, class B> inline auto _tuple_(std::pair<A, B> & t) { return std::tie(t.first, t.second); }
+template <class A, class B> inline auto _tuple_(const std::pair<A, B> & t) { return std::tie(t.first, t.second); }
+
 // struct ordering and comparison operations; requires the use of MEMBERS.
 // usage:
 //
@@ -138,7 +139,9 @@ LEXICOGRAPHIC_OPERATOR(_c_, ==) LEXICOGRAPHIC_OPERATOR(_c_, !=)
 // are fair game.  
 struct pass { template <typename... Args> inline pass(Args && ...) {} };
 
-#include "endian.h"
+#define UNPACK_STATEMENT(_x_) (void)pass{(_x_)...}
+
+#include "include/endian.h"
 
 #ifndef __has_attribute
 #define __has_attribute(x) 0
@@ -150,4 +153,18 @@ struct pass { template <typename... Args> inline pass(Args && ...) {} };
 #define NORETURN
 #endif
 
+template <class... Args, size_t... Indices> inline void
+tuple_ostream_imp(std::ostream & m, tuple<Args...> & t, std::index_sequence<Indices...>) {
+    UNPACK_STATEMENT(m << std::get<Indices>(t));
+}
+
+template <class... Args> inline std::ostream &
+operator<<(std::ostream & m, tuple<Args...> && t) {
+    tuple_ostream_imp(m, t, std::index_sequence_for<Args...>{});
+    return m;
+}
+
+template <class T> inline typename std::enable_if<is_tuple_convertible<T>::value, std::ostream &>::type
+operator<<(std::ostream & os, const T & t) { return os << _tuple_(t); }
+
 #endif
index ca21d9d..4b26a91 100644 (file)
@@ -1,31 +1,31 @@
 // RPC stubs for clients to talk to lock_server, and cache the locks.
 
-#include "lock_client.h"
+#include "include/lock_client.h"
 #include <arpa/inet.h>
 
-void lock_state::wait(lock & mutex_lock) {
+void lock_client::lock_state::wait(lock & mutex_lock) {
     auto self = std::this_thread::get_id();
     c[self].wait(mutex_lock);
     c.erase(self);
 }
 
-void lock_state::signal() {
+void lock_client::lock_state::signal() {
     // signal anyone
     if (c.begin() != c.end())
         c.begin()->second.notify_one();
 }
 
-void lock_state::signal(thread::id who) {
+void lock_client::lock_state::signal(thread::id who) {
     if (c.count(who))
         c[who].notify_one();
 }
 
-lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
+lock_client::lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
     lock sl(lock_table_lock);
     return lock_table[lid]; // creates the lock if it doesn't already exist
 }
 
-lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
+lock_client::lock_client(string xdst) {
     rlock_port = std::uniform_int_distribution<in_port_t>(1024,32000+1024)(global->random_generator);
     id = "127.0.0.1:" + std::to_string(rlock_port);
     rlsrpc = std::make_unique<rpcs>(rlock_port);
@@ -54,8 +54,6 @@ void lock_client::releaser() {
             sl.unlock();
             int r;
             rsmc->call(lock_protocol::release, r, lid, id, st.xid);
-            if (lu)
-                lu->dorelease(lid);
             sl.lock();
         }
         st.state = lock_state::none;
index 86e5ad2..1241409 100644 (file)
@@ -1,27 +1,17 @@
 // the caching lock server implementation
 
-#include "lock_server.h"
+#include "include/lock_server.h"
 #include <unistd.h>
 #include <arpa/inet.h>
 
-lock_state::lock_state():
-    held(false)
-{
+lock_server::lock_state::lock_state(const lock_state & other) {
+    held = other.held;
+    held_by = other.held_by;
+    wanted_by = other.wanted_by;
+    old_requests = other.old_requests;
 }
 
-lock_state::lock_state(const lock_state & other) {
-    *this = other;
-}
-
-lock_state & lock_state::operator=(const lock_state & o) {
-    held = o.held;
-    held_by = o.held_by;
-    wanted_by = o.wanted_by;
-    old_requests = o.old_requests;
-    return *this;
-}
-
-lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
+lock_server::lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
     lock sl(lock_table_lock);
     // this will create the lock if it doesn't already exist
     return lock_table[lid];
@@ -151,12 +141,12 @@ lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, c
     return lock_protocol::OK;
 }
 
-string lock_server::marshal_state() {
+string lock_server::marshall_state() {
     lock sl(lock_table_lock);
-    return marshall(nacquire, lock_table).content();
+    return marshall(nacquire, lock_table);
 }
 
-void lock_server::unmarshal_state(const string & state) {
+void lock_server::unmarshall_state(const string & state) {
     lock sl(lock_table_lock);
-    unmarshall(state, false, nacquire, lock_table);
+    unmarshall(state, nacquire, lock_table);
 }
diff --git a/lock_server.h b/lock_server.h
deleted file mode 100644 (file)
index d182876..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-#ifndef lock_server_h
-#define lock_server_h
-
-#include "types.h"
-#include "lock_protocol.h"
-#include "rsm.h"
-#include "rpc/fifo.h"
-
-typedef std::pair<callback_t, lock_protocol::xid_t> holder_t;
-
-class lock_state {
-public:
-    lock_state();
-    lock_state(const lock_state & other);
-    bool held;
-    holder_t held_by;
-    std::list<holder_t> wanted_by;
-    std::map<callback_t, lock_protocol::xid_t> old_requests;
-    std::mutex m;
-    lock_state & operator=(const lock_state &);
-
-    MEMBERS(held, held_by, wanted_by)
-};
-
-typedef std::map<lock_protocol::lockid_t, lock_state> lock_map;
-
-class lock_server : private rsm_state_transfer {
-    private:
-        int nacquire;
-        std::mutex lock_table_lock;
-        lock_map lock_table;
-        lock_state & get_lock_state(lock_protocol::lockid_t lid);
-        fifo<lock_protocol::lockid_t> retry_fifo;
-        fifo<lock_protocol::lockid_t> revoke_fifo;
-        rsm *rsm_;
-        string marshal_state();
-        void unmarshal_state(const string & state);
-        void revoker NORETURN ();
-        void retryer NORETURN ();
-    public:
-        lock_server(rsm & r);
-        lock_protocol::status acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
-        lock_protocol::status release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
-};
-
-#endif
index 64f7985..52e6c4f 100644 (file)
@@ -1,4 +1,6 @@
-#include "lock_server.h"
+#include "include/lock_server.h"
+
+using namespace std::chrono;
 
 int main(int argc, char *argv[]) {
     global = new t4_state('s');
@@ -16,5 +18,5 @@ int main(int argc, char *argv[]) {
     rsm.start();
 
     while(1)
-        std::this_thread::sleep_for(milliseconds(1000));
+        std::this_thread::sleep_for(1000ms);
 }
index fbdeeb4..63c0722 100644 (file)
@@ -2,10 +2,12 @@
 // Lock server tester
 //
 
-#include "lock_client.h"
+#include "include/lock_client.h"
 #include <arpa/inet.h>
 #include <unistd.h>
 
+using namespace std::chrono;
+
 // must be >= 2
 const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc.
 static lock_client *lc[nt];
@@ -65,7 +67,7 @@ static void test2(int i) {
     lc[i]->acquire(a);
     LOG_NONMEMBER << "test2: client " << i << " acquire done";
     check_grant(a);
-    std::this_thread::sleep_for(milliseconds(100));
+    std::this_thread::sleep_for(100ms);
     LOG_NONMEMBER << "test2: client " << i << " release";
     check_release(a);
     lc[i]->release(a);
diff --git a/log.cc b/log.cc
index decb827..36f10a3 100644 (file)
--- a/log.cc
+++ b/log.cc
@@ -1,89 +1,31 @@
-#include "log.h"
-#include "paxos.h"
+#include "include/log.h"
+#include "include/paxos.h"
 
-// Paxos must maintain some durable state (i.e., that survives power
-// failures) to run Paxos correct.  This module implements a log with
-// all durable state to run Paxos.  Since the values chosen correspond
-// to views, the log contains all views since the beginning of time.
+// Maintains durable state (i.e. surviving power failures) needed for correct
+// operation of Paxos as a log.
 
-log::log(proposer_acceptor *_acc, string _me) : pxs (_acc) {
-    name = "paxos-" + _me + ".log";
-    logread();
-}
+log::log(string _me) : name("paxos-" + _me + ".log") {}
 
-void log::logread(void) {
-    std::ifstream from(name);
+void log::replay() {
+    auto from = unmarshall(read());
     string type;
-    unsigned instance;
 
-    LOG << "logread";
+    DEBUG_LOG << "Replaying paxos log from disk";
     while (from >> type) {
-        if (type == "done") {
-            string v;
-            from >> instance;
-            from.get();
-            getline(from, v);
-            pxs->values[instance] = v;
-            pxs->instance_h = instance;
-            LOG << "logread: instance: " << instance << " w. v = "
-                << pxs->values[instance];
-            pxs->accepted_value.clear();
-            pxs->promise.n = 0;
-            pxs->accepted.n = 0;
-        } else if (type == "propseen") {
-            from >> pxs->promise.n >> pxs->promise.m;
-            LOG << "logread: high update: " << pxs->promise.n << "(" << pxs->promise.m << ")";
-        } else if (type == "accepted") {
-            string v;
-            from >> pxs->accepted.n >> pxs->accepted.m;
-            from.get();
-            getline(from, v);
-            pxs->accepted_value = v;
-            LOG << "logread: prop update " << pxs->accepted.n << "(" << pxs->accepted.m << ") with v = " << pxs->accepted_value;
+        if (handlers.count(type)) {
+            handlers[type](from);
         } else {
-            LOG << "logread: unknown log record";
+            DEBUG_LOG << "unknown log record";
             VERIFY(0);
         }
-    } 
-    from.close();
-}
-
-string log::dump() {
-    std::ifstream from(name);
-    string res;
-    string v;
-    while (std::getline(from, v))
-        res += v + "\n";
-    from.close();
-    return res;
-}
-
-void log::restore(string s) {
-    LOG << "restore: " << s;
-    std::ofstream f(name, std::ios::trunc);
-    f << s;
-    f.close();
-}
-
-// XXX should be an atomic operation
-void log::loginstance(unsigned instance, string v) {
-    std::ofstream f(name, std::ios::app);
-    f << "done " << instance << " " << v << "\n";
-    f.close();
+    }
 }
 
-// an acceptor should call logprop(promise) when it
-// receives a prepare to which it responds prepare_ok().
-void log::logprop(prop_t promise) {
-    std::ofstream f(name, std::ios::app);
-    f << "propseen " << promise.n << " " << promise.m << "\n";
-    f.close();
+string log::read() {
+    return (std::stringstream() << std::ifstream(name).rdbuf()).str();
 }
 
-// an acceptor should call logaccept(accepted, accepted_value) when it
-// receives an accept RPC to which it replies accept_ok().
-void log::logaccept(prop_t n, string v) {
-    std::ofstream f(name, std::ios::app);
-    f << "accepted " << n.n << " " << n.m << " " << v << "\n";
-    f.close();
+void log::write(string s) {
+    DEBUG_LOG << s;
+    std::ofstream(name, std::ios::trunc) << s;
 }
diff --git a/log.h b/log.h
deleted file mode 100644 (file)
index 201cb80..0000000
--- a/log.h
+++ /dev/null
@@ -1,27 +0,0 @@
-#ifndef log_h
-#define log_h
-
-#include "types.h"
-#include "paxos_protocol.h"
-
-class proposer_acceptor;
-
-class log {
-    private:
-        string name;
-        proposer_acceptor *pxs;
-    public:
-        log (proposer_acceptor*, string _me);
-        string dump();
-        void restore(string s);
-        void logread(void);
-        // Log a committed paxos instance
-        void loginstance(unsigned instance, string v);
-        // Log the highest proposal number that the local paxos acceptor has ever seen
-        void logprop(prop_t n_h);
-        // Log the proposal (proposal number and value) that the local paxos acceptor 
-        // accept has ever accepted
-        void logaccept(prop_t n_a, string v);
-};
-
-#endif
diff --git a/marshall.py b/marshall.py
new file mode 100755 (executable)
index 0000000..c4936fa
--- /dev/null
@@ -0,0 +1,132 @@
+import os, numpy as np
+from clang.cindex import *
+import subprocess
+
+libt4_path = '/Users/peteriannucci/invirt/third/libt4/'
+OPTFLAGS = ['-O3']
+STDLIB = ['-stdlib=libc++']
+PEDANTRY = ['-Werror', '-Weverything', '-Wall', '-Wextra', '-pedantic-errors', '-pedantic',
+                   '-Wno-c++98-compat-pedantic', '-Wno-padded', '-Weffc++',
+                   '-Wno-non-virtual-dtor', '-Wno-weak-vtables']
+CXXFLAGS = ['-x', 'c++', '-I', libt4_path, '-std=c++1y'] + STDLIB + PEDANTRY + OPTFLAGS
+CXXFLAGS += ['-DMARSHALL_RAW_NETWORK_ORDER_AS(a,b)=static void macro_marshall_##a##_as_##b(a first, b second);']
+
+target = 'paxos.cc'
+
+libdir = subprocess.Popen(['llvm-config-mp-3.4', '--libdir'], stdout=subprocess.PIPE).communicate()[0].strip()
+Config.set_library_path(libdir)
+index = Index.create()
+tu = index.parse(os.path.join(libt4_path, target), args=CXXFLAGS)
+
+def dive_all(cursor, kinds, pred=None, depth_limit=-1):
+    if cursor.kind in kinds and (pred is None or pred(cursor)):
+        yield cursor
+    if depth_limit != 0:
+        for c in cursor.get_children():
+            for d in dive_all(c, kinds, pred, depth_limit-1):
+                yield d
+
+def get_label(cursor):
+    try:
+        cursor = dive_all(cursor, (CursorKind.CONVERSION_FUNCTION,), lambda c: c.displayname == 'operator label()', 1).next()
+        cursor = dive_all(cursor, (CursorKind.STRING_LITERAL,)).next()
+        return eval(cursor.get_tokens().next().spelling)
+    except:
+        return None
+
+def get_members(cursor):
+    try:
+        adapter = dive_all(cursor, (CursorKind.CXX_METHOD,), lambda c: c.displayname == '_tuple_()', 1).next()
+        fields = {x.displayname:x for x in dive_all(cursor, (CursorKind.FIELD_DECL,))}
+        return [fields[m.displayname] for m in dive_all(adapter, (CursorKind.MEMBER_REF_EXPR,))]
+    except:
+        return None
+
+class Marshallable:
+    def __init__(self, m, u):
+        self.marshall = m
+        self.unmarshall = u
+
+def MarshallRawNetworkOrderAs(what, as_what):
+    as_what = np.dtype(as_what).newbyteorder('B')
+    def m(x):
+        return np.array([x], as_what).tostring()
+    def u(s):
+        return np.fromstring(s[:as_what.itemsize], as_what).astype(what)[0], s[as_what.itemsize:]
+    mu = Marshallable(m, u)
+    mu.__repr__ = lambda : '<Marshallable(%s)>' % np.dtype(what).name
+    return mu
+
+def RecordClass(displayname, members):
+    member_names = [f.displayname for f in members]
+    member_types = [f.type.get_canonical().spelling for f in members]
+    member_types_nice = [f.type.spelling for f in members]
+    member_marshallers = [types[t] for t in member_types]
+    class Record(object):
+        def __init__(self, *args, **kwargs):
+            if args:
+                assert len(args) == len(member_names)
+                self.__dict__.update(dict(zip(member_names, args)))
+            else:
+                assert set(kwargs.keys()) == set(member_names)
+                self.__dict__.update(kwargs)
+        __init__.__doc__ = 'Required arguments:\n' + '\n'.join('%s (%s)' % (n,t) for t,n in zip(member_types_nice, member_names))
+        @staticmethod
+        def marshall(self):
+            return ''.join(marshaller.marshall(getattr(self, name)) for name, marshaller in zip(member_names, member_marshallers))
+        @staticmethod
+        def unmarshall(s):
+            member_values = []
+            for marshaller in member_marshallers:
+                member_value, s = marshaller.unmarshall(s)
+                member_values.append(member_value)
+            return Record(**dict(zip(member_names, member_values))), s
+        def __repr__(self):
+            return '<%s ' % displayname + ' '.join('%s=%s' % (k, repr(getattr(self, k))) for k in member_names) + '>'
+    Record.__name__ = displayname
+    return Record
+
+class MarshallString:
+    def __init__(self):
+        pass
+    def marshall(self, s):
+        return types['unsigned int'].marshall(len(s)) + s
+    def unmarshall(self, s):
+        l, s = types['unsigned int'].unmarshall(s)
+        return s[:l], s[l:]
+
+types = {}
+labels = {}
+types['std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >'] = MarshallString()
+
+numpy_type = {'bool': np.bool, 'char': np.int8, 'signed char': np.int8, 'unsigned char': np.uint8, 'short': np.short, 'unsigned short': np.ushort,
+              'int': np.intc, 'unsigned int': np.uintc, 'long': np.int_, 'unsigned long': np.uint, 'long long': np.longlong, 'unsigned long long': np.ulonglong}
+
+def processDeclaration(cursor):
+    if cursor.kind == CursorKind.ENUM_DECL:
+        enum_type = cursor.enum_type.get_canonical().spelling
+        if enum_type != '<dependent type>':
+            processDeclaration(cursor.enum_type.get_canonical().get_declaration())
+            assert enum_type in types, enum_type
+            types[cursor.type.get_canonical().spelling] = types[enum_type]
+    elif cursor.kind == CursorKind.FUNCTION_DECL:
+        if cursor.displayname.startswith('macro_marshall_'):
+            what, as_what = [x.type.get_canonical().spelling for x in cursor.get_arguments()]
+            types[what] = MarshallRawNetworkOrderAs(numpy_type[what], numpy_type[as_what])
+    else:
+        members = get_members(cursor)
+        label = get_label(cursor)
+        if members is not None:
+            for f in members:
+                field_type = f.type.get_canonical().spelling
+                processDeclaration(f.type.get_canonical().get_declaration())
+                assert field_type in types, field_type
+            types[cursor.type.get_canonical().spelling] = RecordClass(cursor.displayname, members)
+        if label is not None:
+            labels[cursor.type.get_canonical().spelling] = label
+
+for cursor in dive_all(tu.cursor, (CursorKind.FUNCTION_DECL,)):
+    processDeclaration(cursor)
+
+for cursor in dive_all(tu.cursor, set([CursorKind.STRUCT_DECL, CursorKind.CLASS_DECL, CursorKind.ENUM_DECL])):
+    processDeclaration(cursor)
index c7f2d1d..108dfad 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -1,6 +1,7 @@
-#include "paxos.h"
+#include "include/paxos.h"
 
 using namespace std::placeholders;
+using namespace std::chrono;
 
 paxos_change::~paxos_change() {}
 
@@ -28,12 +29,24 @@ proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
         bool _first, const node_t & _me, const value_t & _value)
     : delegate(_delegate), me (_me)
 {
-    // at this point, the log has already been replayed
-    if (instance_h == 0 && _first) {
-        values[1] = _value;
-        l.loginstance(1, _value);
-        instance_h = 1;
-    }
+    l.handler<log_instance>([this] (auto entry) {
+        instance_h = entry.number;
+        values[entry.number] = entry.value;
+        accepted = promise = {0, me};
+        accepted_value.clear();
+    });
+    l.handler<log_proposal>([this] (auto entry) {
+        promise = entry.promise;
+    });
+    l.handler<log_accept>([this] (auto entry) {
+        accepted = entry.number;
+        accepted_value = entry.value;
+    });
+
+    if (instance_h == 0 && _first)
+        l.append(log_instance{1, _value});
+
+    l.replay();
 
     pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
     pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
@@ -96,7 +109,7 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
         auto cl = rpcc::bind_cached(i);
         if (!cl)
             continue;
-        int status = cl->call_timeout(paxos_protocol::preparereq, milliseconds(100),
+        int status = cl->call_timeout(paxos_protocol::preparereq, 100ms,
                 res, me, instance, proposal);
         if (status == paxos_protocol::OK) {
             LOG << "preparereq response type=" << res.type << " n_a=(" << res.n_a.n
@@ -124,7 +137,7 @@ void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
     bool accept = false;
     for (auto i : nodes) {
         if (auto cl = rpcc::bind_cached(i)) {
-            int status = cl->call_timeout(paxos_protocol::acceptreq, milliseconds(100),
+            int status = cl->call_timeout(paxos_protocol::acceptreq, 100ms,
                     accept, me, instance, proposal, v);
             if (status == paxos_protocol::OK && accept)
                 accepts.push_back(i);
@@ -136,7 +149,7 @@ void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const
     int res = 0;
     for (auto i : accepts)
         if (auto cl = rpcc::bind_cached(i))
-            cl->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
+            cl->call_timeout(paxos_protocol::decidereq, 100ms, res, me, instance, v);
 }
 
 paxos_protocol::status
@@ -149,7 +162,7 @@ proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance,
     } else if (n > promise) {
         LOG << "looks good to me";
         promise = n;
-        l.logprop(promise);
+        l.append(log_proposal{n});
         r = prepareres{prepareres::accept, accepted, accepted_value};
     } else {
         LOG << "I totally rejected this request.  Ha.";
@@ -169,7 +182,7 @@ proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t
     if (n >= promise) {
         accepted = n;
         accepted_value = v;
-        l.logaccept(accepted, accepted_value);
+        l.append(log_accept{n, v});
         r = true;
     }
     return paxos_protocol::OK;
@@ -197,9 +210,9 @@ void proposer_acceptor::commit(unsigned instance, const value_t & value, lock &
     LOG << "instance=" << instance << " has v=" << value;
     if (instance > instance_h) {
         LOG << "highestacceptedinstance = " << instance;
-        values[instance] = value;
-        l.loginstance(instance, value);
+        l.append(log_instance{instance, value});
         instance_h = instance;
+        values[instance] = value;
         accepted = promise = {0, me};
         accepted_value.clear();
         if (delegate) {
index 8966cc2..7a6371a 100644 (file)
@@ -1,10 +1,10 @@
-#include "connection.h"
-#include "rpc_protocol.h"
+#include "include/rpc/connection.h"
+#include "include/rpc/rpc_protocol.h"
 #include <cerrno>
 #include <csignal>
 #include <netinet/tcp.h>
 #include <unistd.h>
-#include "marshall.h"
+#include "include/rpc/marshall.h"
 
 connection_delegate::~connection_delegate() {}
 
@@ -24,13 +24,12 @@ connection::~connection() {
         if (dead_)
             return;
         dead_ = true;
-        shutdown(fd,SHUT_RDWR);
+        fd.shutdown(SHUT_RDWR);
     }
     // after block_remove_fd, select will never wait on fd and no callbacks
     // will be active
     global->shared_mgr.block_remove_fd(fd);
-    VERIFY(dead_);
-    VERIFY(wpdu_.status == unused);
+    VERIFY(dead_ && wpdu_.status == unused);
 }
 
 shared_ptr<connection> connection::to_dst(const sockaddr_in & dst, connection_delegate * delegate, int lossy) {
@@ -58,7 +57,7 @@ bool connection::send(const string & b) {
 
     if (std::bernoulli_distribution(lossy_*.01)(global->random_generator)) {
         IF_LEVEL(1) LOG << "send LOSSY TEST shutdown fd " << fd;
-        shutdown(fd,SHUT_RDWR);
+        fd.shutdown(SHUT_RDWR);
     }
 
     if (!writepdu()) {
@@ -103,7 +102,7 @@ bool connection::writepdu() {
     if (wpdu_.cursor == wpdu_.buf.size())
         return true;
 
-    ssize_t n = write(fd, &wpdu_.buf[wpdu_.cursor], (wpdu_.buf.size()-wpdu_.cursor));
+    ssize_t n = fd.write(&wpdu_.buf[wpdu_.cursor], (wpdu_.buf.size()-wpdu_.cursor));
     if (n < 0) {
         if (errno != EAGAIN) {
             IF_LEVEL(1) LOG << "writepdu fd " << fd << " failure errno=" << errno;
@@ -189,8 +188,8 @@ bool connection::readpdu() {
 connection_listener::connection_listener(connection_delegate * delegate, in_port_t port, int lossytest)
 : tcp_(socket(AF_INET, SOCK_STREAM, 0)), delegate_(delegate), lossy_(lossytest)
 {
-    tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
-    tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
+    tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, int{1});
+    tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, int{1});
     tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000});
     tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000});
 
diff --git a/rpc/marshall.h b/rpc/marshall.h
deleted file mode 100644 (file)
index bf3cbc7..0000000
+++ /dev/null
@@ -1,244 +0,0 @@
-#ifndef marshall_h
-#define marshall_h
-
-#include "types.h"
-#include "rpc_protocol.h"
-
-class marshall {
-    private:
-        string buf_ = string(rpc_protocol::DEFAULT_RPC_SZ, 0);
-        size_t index_ = rpc_protocol::RPC_HEADER_SZ;
-
-    public:
-        template <typename... Args>
-        marshall(const Args & ... args) {
-            (void)pass{(*this << args)...};
-        }
-
-        void write(const void *p, size_t n) {
-            if (index_+n > buf_.size())
-                buf_.resize(index_+n);
-            std::copy((char *)p, (char *)p+n, &buf_[index_]);
-            index_ += n;
-        }
-
-        // with header
-        inline operator string() const { return buf_.substr(0,index_); }
-        // without header
-        inline string content() const { return buf_.substr(rpc_protocol::RPC_HEADER_SZ,index_-rpc_protocol::RPC_HEADER_SZ); }
-
-        // letting S be a defaulted template parameter forces the compiler to
-        // delay looking up operator<<(marshall &, rpc_sz_t) until we define it
-        // (i.e. we define an operator for marshalling uint32_t)
-        template <class T, class S=rpc_protocol::rpc_sz_t> inline void
-        write_header(const T & h) {
-            VERIFY(sizeof(T)+sizeof(S) <= rpc_protocol::RPC_HEADER_SZ);
-            size_t saved_sz = index_;
-            index_ = 0;
-            *this << (S)(saved_sz - sizeof(S)) << (T)h;
-            index_ = saved_sz;
-        }
-};
-
-class unmarshall {
-    private:
-        string buf_;
-        size_t index_ = rpc_protocol::RPC_HEADER_SZ;
-        bool ok_ = false;
-
-    public:
-        template <typename... Args>
-        unmarshall(const string & s, bool has_header, Args && ... args)
-            : buf_(s) {
-            if (!has_header)
-                buf_.insert(0, rpc_protocol::RPC_HEADER_SZ, 0);
-            ok_ = (buf_.size() >= rpc_protocol::RPC_HEADER_SZ);
-            (void)pass{(*this >> args)...};
-        }
-
-        inline bool ok() const { return ok_; }
-        inline bool okdone() const { return ok_ && index_ == buf_.size(); }
-
-        void read(void * t, size_t n) {
-            if (index_+n > buf_.size())
-                ok_ = false;
-            if (ok_) {
-                std::copy(&buf_[index_], &buf_[index_+n], (char *)t);
-                index_ += n;
-            }
-        }
-
-        template <class T> inline void
-        read_header(T & h) {
-            VERIFY(sizeof(T)+sizeof(rpc_protocol::rpc_sz_t) <= rpc_protocol::RPC_HEADER_SZ);
-            // first 4 bytes hold length field
-            index_ = sizeof(rpc_protocol::rpc_sz_t);
-            *this >> h;
-            index_ = rpc_protocol::RPC_HEADER_SZ;
-        }
-
-        template <class T> inline T _grab() { T t; *this >> t; return t; }
-};
-
-//
-// Marshalling for plain old data
-//
-
-#define MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _d_) \
-inline marshall & operator<<(marshall & m, _c_ x) { _d_ y = hton((_d_)x); m.write(&y, sizeof(_d_)); return m; } \
-inline unmarshall & operator>>(unmarshall & u, _c_ & x) { _d_ y; u.read(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; }
-
-#define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_)
-
-MARSHALL_RAW_NETWORK_ORDER_AS(bool, uint8_t)
-MARSHALL_RAW_NETWORK_ORDER(uint8_t)
-MARSHALL_RAW_NETWORK_ORDER(int8_t)
-MARSHALL_RAW_NETWORK_ORDER(uint16_t)
-MARSHALL_RAW_NETWORK_ORDER(int16_t)
-MARSHALL_RAW_NETWORK_ORDER(uint32_t)
-MARSHALL_RAW_NETWORK_ORDER(int32_t)
-MARSHALL_RAW_NETWORK_ORDER_AS(size_t, uint32_t)
-MARSHALL_RAW_NETWORK_ORDER(uint64_t)
-MARSHALL_RAW_NETWORK_ORDER(int64_t)
-
-//
-// Marshalling for tuples (used to implement marshalling for structs)
-//
-
-// In order to iterate over the tuple elements, we first need a template
-// parameter pack containing the tuple's indices.  The function templates named
-// *_imp below accept an empty tag struct as their last argument, and use its
-// template arguments to index the tuple.  The operator<< overloads instantiate
-// the appropriate tag struct to make this possible.
-
-template <class... Args, size_t... Indices> inline marshall &
-tuple_marshall_imp(marshall & m, tuple<Args...> & t, std::index_sequence<Indices...>) {
-    // Note that brace initialization is used for the empty structure "pack",
-    // forcing the comma-separated expressions expanded from the parameter pack
-    // to be evaluated in order.  Order matters because the elements must be
-    // serialized consistently!  The empty struct resulting from construction
-    // is discarded.
-    (void)pass{(m << std::get<Indices>(t))...};
-    return m;
-}
-
-template <class... Args> inline marshall &
-operator<<(marshall & m, tuple<Args...> && t) {
-    return tuple_marshall_imp(m, t, std::index_sequence_for<Args...>{});
-}
-
-template <class... Args, size_t... Indices> inline unmarshall &
-tuple_unmarshall_imp(unmarshall & u, tuple<Args & ...> t, std::index_sequence<Indices...>) {
-    (void)pass{(u >> std::get<Indices>(t))...};
-    return u;
-}
-
-template <class... Args> inline unmarshall &
-operator>>(unmarshall & u, tuple<Args & ...> && t) {
-    return tuple_unmarshall_imp(u, t, std::index_sequence_for<Args...>{});
-}
-
-//
-// Marshalling for structs or classes containing a MEMBERS declaration
-//
-
-// Implements struct marshalling via tuple marshalling of members.
-template <class T> inline typename
-enable_if<is_tuple_convertible<T>::value, unmarshall>::type &
-operator>>(unmarshall & u, T & a) { return u >> a._tuple_(); }
-
-template <class T> inline typename
-enable_if<is_tuple_convertible<T>::value, marshall>::type &
-operator<<(marshall & m, const T a) { return m << a._tuple_(); }
-
-//
-// Marshalling for STL containers
-//
-
-// this overload is visible for type A only if A::cbegin and A::cend exist
-template <class A> inline typename
-enable_if<is_const_iterable<A>::value, marshall>::type &
-operator<<(marshall & m, const A & x) {
-    m << (uint32_t)x.size();
-    for (const auto & a : x)
-        m << a;
-    return m;
-}
-
-// visible for type A if A::emplace_back(a) makes sense
-template <class A> inline typename
-enable_if<supports_emplace_back<A>::value, unmarshall>::type &
-operator>>(unmarshall & u, A & x) {
-    uint32_t n = u._grab<uint32_t>();
-    x.clear();
-    while (n--)
-        x.emplace_back(u._grab<typename A::value_type>());
-    return u;
-}
-
-// std::pair<A, B>
-template <class A, class B> inline marshall &
-operator<<(marshall & m, const std::pair<A,B> & d) {
-    return m << d.first << d.second;
-}
-
-template <class A, class B> inline unmarshall &
-operator>>(unmarshall & u, std::pair<A,B> & d) {
-    return u >> d.first >> d.second;
-}
-
-// std::map<A, B>
-template <class A, class B> inline unmarshall &
-operator>>(unmarshall & u, std::map<A,B> & x) {
-    uint32_t n = u._grab<uint32_t>();
-    x.clear();
-    while (n--)
-        x.emplace(u._grab<std::pair<A,B>>());
-    return u;
-}
-
-// std::string
-inline marshall & operator<<(marshall & m, const string & s) {
-    m << (uint32_t)s.size();
-    m.write(s.data(), s.size());
-    return m;
-}
-
-inline unmarshall & operator>>(unmarshall & u, string & s) {
-    uint32_t sz = u._grab<uint32_t>();
-    if (u.ok()) {
-        s.resize(sz);
-        u.read(&s[0], sz);
-    }
-    return u;
-}
-
-//
-// Marshalling for strongly-typed enums
-//
-
-template <class E> typename enable_if<std::is_enum<E>::value, marshall>::type &
-operator<<(marshall & m, E e) {
-    return m << from_enum(e);
-}
-
-template <class E> typename enable_if<std::is_enum<E>::value, unmarshall>::type &
-operator>>(unmarshall & u, E & e) {
-    e = to_enum<E>(u._grab<enum_type_t<E>>());
-    return u;
-}
-
-//
-// Recursive marshalling
-//
-
-inline marshall & operator<<(marshall & m, marshall & n) {
-    return m << n.content();
-}
-
-inline unmarshall & operator>>(unmarshall & u, unmarshall & v) {
-    v = unmarshall(u._grab<string>(), false);
-    return u;
-}
-
-#endif
index 102b8dc..a3ab08a 100644 (file)
@@ -1,8 +1,8 @@
-#include "poll_mgr.h"
+#include "include/rpc/poll_mgr.h"
 #include <errno.h>
 #include <sys/select.h>
-#include "file.h"
-#include "threaded_log.h"
+#include "include/rpc/file.h"
+#include "include/debug.h"
 
 #ifdef __linux__
 #include <sys/epoll.h>
index 8f868f5..e33b25e 100644 (file)
@@ -52,7 +52,7 @@
 // x exited worker threads).
 //
 
-#include "rpc.h"
+#include "include/rpc/rpc.h"
 
 #include <arpa/inet.h>
 #include <netinet/tcp.h>
@@ -106,7 +106,7 @@ shared_ptr<rpcc> rpcc::bind_cached(const string & destination) {
     lock cl = lock(client->bind_m_);
     if (!client->bind_done_) {
         LOG_NONMEMBER << "bind(\"" << destination << "\")";
-        int ret = client->bind(milliseconds(1000));
+        int ret = client->bind(1000ms);
         if (ret < 0) {
             LOG_NONMEMBER << "bind failure! " << destination << " " << ret;
             client.reset();
@@ -149,7 +149,7 @@ int rpcc::call_marshalled(const rpc_protocol::proc_t & proc, milliseconds to, st
 
     caller ca(0, &rep);
     xid_t xid_rep;
-    marshall datagram = req;
+    string datagram;
     {
         lock ml(m_);
 
@@ -164,9 +164,9 @@ int rpcc::call_marshalled(const rpc_protocol::proc_t & proc, milliseconds to, st
         ca.xid = xid_++;
         calls_[ca.xid] = &ca;
 
-        datagram.write_header(rpc_protocol::request_header{
+        datagram = marshall::datagram(rpc_protocol::request_header{
                 ca.xid, proc.id, clt_nonce_, srv_nonce_, xid_rep_window_.front()
-                });
+            }, req);
         xid_rep = xid_rep_window_.front();
     }
 
@@ -272,11 +272,9 @@ void rpcc::get_latest_connection(shared_ptr<connection> & ch) {
 // Runs in poll_mgr's thread as an upcall from the connection object to the
 // rpcc.  Does not call blocking RPC handlers.
 bool rpcc::got_pdu(const shared_ptr<connection> &, const string & b) {
-    unmarshall rep(b, true);
     rpc_protocol::reply_header h;
-    rep.read_header(h);
 
-    if (!rep.ok()) {
+    if (!unmarshall::datagram(b, h)) {
         IF_LEVEL(1) LOG << "unmarshall header failed!!!";
         return true;
     }
@@ -354,21 +352,20 @@ bool rpcs::got_pdu(const shared_ptr<connection> & c, const string & b) {
 }
 
 void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
-    unmarshall req(buf, true);
-
     rpc_protocol::request_header h;
-    req.read_header(h);
-    proc_id_t proc = h.proc;
 
-    if (!req.ok()) {
+    auto req = unmarshall::datagram(buf, h);
+
+    if (!req) {
         IF_LEVEL(1) LOG << "unmarshall header failed";
         return;
     }
 
+    proc_id_t proc = h.proc;
+
     IF_LEVEL(2) LOG << "rpc " << h.xid << " (proc " << std::hex << proc << ", last_rep "
                     << std::dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce;
 
-    marshall rep;
     rpc_protocol::reply_header rh{h.xid,0};
 
     // is client sending to an old instance of server?
@@ -376,8 +373,7 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
         IF_LEVEL(2) LOG << "rpc for an old server instance " << h.srv_nonce
                         << " (current " << nonce_ << ") proc " << std::hex << proc;
         rh.ret = rpc_protocol::oldsrv_failure;
-        rep.write_header(rh);
-        c->send(rep);
+        c->send(marshall::datagram(rh));
         return;
     }
 
@@ -418,17 +414,19 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
 
     switch (check_duplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, stored_reply)) {
         case NEW: // new request
-            rh.ret = (*f)(std::forward<unmarshall>(req), rep);
-            if (rh.ret == rpc_protocol::unmarshall_args_failure) {
-                LOG << "failed to unmarshall the arguments. You are "
-                    << "probably calling RPC 0x" << std::hex << proc << " with the wrong "
-                    << "types of arguments.";
-                VERIFY(0);
-            }
-            VERIFY(rh.ret >= 0);
+            {
+                marshall rep;
+                rh.ret = (*f)(std::forward<unmarshall>(req), rep);
+                if (rh.ret == rpc_protocol::unmarshall_args_failure) {
+                    LOG << "failed to unmarshall the arguments. You are "
+                        << "probably calling RPC 0x" << std::hex << proc << " with the wrong "
+                        << "types of arguments.";
+                    VERIFY(0);
+                }
+                VERIFY(rh.ret >= 0);
 
-            rep.write_header(rh);
-            stored_reply = rep;
+                stored_reply = marshall::datagram(rh, rep);
+            }
 
             IF_LEVEL(2) LOG << "sending and saving reply of size " << stored_reply.size() << " for rpc "
                             << h.xid << ", proc " << std::hex << proc << " ret " << std::dec
@@ -453,8 +451,7 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
         case FORGOTTEN: // very old request and we don't have the response anymore
             IF_LEVEL(2) LOG << "very old request " << h.xid << " from " << h.clt_nonce;
             rh.ret = rpc_protocol::atmostonce_failure;
-            rep.write_header(rh);
-            c->send(rep);
+            c->send(marshall::datagram(rh));
             break;
     }
 }
index 0435ab1..968fc22 100644 (file)
@@ -1,13 +1,13 @@
 // RPC test and pseudo-documentation.
 // generates print statements on failures, but eventually says "rpctest OK"
 
-#include "types.h"
-#include "rpc.h"
+#include "include/types.h"
+#include "include/rpc/rpc.h"
 #include <arpa/inet.h>
 #include <getopt.h>
 #include <unistd.h>
 #include <string.h>
-#include "threaded_log.h"
+#include "include/debug.h"
 
 #define NUM_CL 2
 
@@ -21,19 +21,8 @@ using std::endl;
 using namespace std::chrono;
 using std::vector;
 
-// server-side handlers. they must be methods of some class
-// to simplify rpcs::reg(). a server process can have handlers
-// from multiple classes.
-class srv {
-    public:
-        int handle_22(string & r, const string a, const string b);
-        int handle_fast(int & r, const int a);
-        int handle_slow(int & r, const int a);
-        int handle_bigrep(string & r, const size_t a);
-};
-
 namespace srv_protocol {
-    using status = rpc_protocol::status;
+    enum status : rpc_protocol::status {OK};
     REMOTE_PROCEDURE_BASE(0);
     REMOTE_PROCEDURE(22, _22, (string &, string, string));
     REMOTE_PROCEDURE(23, fast, (int &, int));
@@ -41,6 +30,17 @@ namespace srv_protocol {
     REMOTE_PROCEDURE(25, bigrep, (string &, size_t));
 }
 
+// server-side handlers. they must be methods of some class
+// to simplify rpcs::reg(). a server process can have handlers
+// from multiple classes.
+class srv {
+    public:
+        srv_protocol::status handle_22(string & r, const string a, const string b);
+        srv_protocol::status handle_fast(int & r, const int a);
+        srv_protocol::status handle_slow(int & r, const int a);
+        srv_protocol::status handle_bigrep(string & r, const size_t a);
+};
+
 // a handler. a and b are arguments, r is the result.
 // there can be multiple arguments but only one result.
 // the caller also gets to see the int return value
@@ -48,26 +48,26 @@ namespace srv_protocol {
 // rpcs::reg() decides how to unmarshall by looking
 // at these argument types, so this function definition
 // does what a .x file does in SunRPC.
-int srv::handle_22(string & r, const string a, string b) {
+srv_protocol::status srv::handle_22(string & r, const string a, string b) {
     r = a + b;
-    return 0;
+    return srv_protocol::OK;
 }
 
-int srv::handle_fast(int & r, const int a) {
+srv_protocol::status srv::handle_fast(int & r, const int a) {
     r = a + 1;
-    return 0;
+    return srv_protocol::OK;
 }
 
-int srv::handle_slow(int & r, const int a) {
-    int us = std::uniform_int_distribution<>(0,500)(global->random_generator);
-    std::this_thread::sleep_for(microseconds(us));
+srv_protocol::status srv::handle_slow(int & r, const int a) {
+    auto duration = std::uniform_int_distribution<>(0,500)(global->random_generator) * 1us;
+    std::this_thread::sleep_for(duration);
     r = a + 2;
-    return 0;
+    return srv_protocol::OK;
 }
 
-int srv::handle_bigrep(string & r, const size_t len) {
+srv_protocol::status srv::handle_bigrep(string & r, const size_t len) {
     r = string(len, 'x');
-    return 0;
+    return srv_protocol::OK;
 }
 
 static srv service;
@@ -84,8 +84,7 @@ static void startserver() {
 static void testmarshall() {
     marshall m;
     rpc_protocol::request_header rh{1,2,3,4,5};
-    m.write_header(rh);
-    VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ);
+    VERIFY(marshall::datagram(rh, m).size()==rpc_protocol::RPC_HEADER_SZ);
     int i = 12345;
     unsigned long long l = 1223344455L;
     size_t sz = 101010101;
@@ -97,12 +96,11 @@ static void testmarshall() {
     m << sz;
     m << bin;
 
-    string b = m;
-    VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+sizeof(uint32_t)+s.size()+sizeof(uint32_t)+sizeof(uint32_t)+bin.size());
+    string b = marshall::datagram(rh, m);
+    VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+sizeof(uint32_t)+s.size()+sizeof(uint64_t)+sizeof(uint32_t)+bin.size());
 
-    unmarshall un(b, true);
     rpc_protocol::request_header rh1;
-    un.read_header(rh1);
+    auto un = unmarshall::datagram(b, rh1);
     VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
     int i1;
     unsigned long long l1;
@@ -143,7 +141,7 @@ static void client1(size_t cl) {
 
         int ret = clients[which_cl]->call(which ? srv_protocol::fast : srv_protocol::slow, rep, arg);
         auto end = steady_clock::now();
-        auto diff = duration_cast<milliseconds>(end - start).count();
+        auto diff = (end - start) / 1ms;
         if (ret != 0)
             cout << diff << " ms have elapsed!!!" << endl;
         VERIFY(ret == 0);
@@ -172,7 +170,7 @@ static void client3(void *xx) {
 
     for(int i = 0; i < 4; i++){
         int rep = 0;
-        int ret = c->call_timeout(srv_protocol::slow, milliseconds(300), rep, i);
+        int ret = c->call_timeout(srv_protocol::slow, 300ms, rep, i);
         VERIFY(ret == rpc_protocol::timeout_failure || rep == i+2);
     }
 }
@@ -190,14 +188,14 @@ static void simple_tests(rpcc *c) {
     cout << "   -- string concat RPC .. ok" << endl;
 
     // small request, big reply (perhaps req via UDP, reply via TCP)
-    intret = c->call_timeout(srv_protocol::bigrep, milliseconds(20000), rep, 70000ul);
+    intret = c->call_timeout(srv_protocol::bigrep, 20000ms, rep, 70000ul);
     VERIFY(intret == 0);
     VERIFY(rep.size() == 70000);
     cout << "   -- small request, big reply .. ok" << endl;
 
     // specify a timeout value to an RPC that should succeed (udp)
     int xx = 0;
-    intret = c->call_timeout(srv_protocol::fast, milliseconds(300), xx, 77);
+    intret = c->call_timeout(srv_protocol::fast, 300ms, xx, 77);
     VERIFY(intret == 0 && xx == 78);
     cout << "   -- no spurious timeout .. ok" << endl;
 
@@ -205,7 +203,7 @@ static void simple_tests(rpcc *c) {
     {
         string arg(1000, 'x');
         string rep2;
-        c->call_timeout(srv_protocol::_22, milliseconds(300), rep2, arg, (string)"x");
+        c->call_timeout(srv_protocol::_22, 300ms, rep2, arg, (string)"x");
         VERIFY(rep2.size() == 1001);
         cout << "   -- no spurious timeout .. ok" << endl;
     }
@@ -221,7 +219,7 @@ static void simple_tests(rpcc *c) {
     string non_existent = "127.0.0.1:7661";
     rpcc *c1 = new rpcc(non_existent);
     time_t t0 = time(0);
-    intret = c1->bind(milliseconds(300));
+    intret = c1->bind(300ms);
     time_t t1 = time(0);
     VERIFY(intret < 0 && (t1 - t0) <= 4);
     cout << "   -- rpc timeout .. ok" << endl;
@@ -283,7 +281,7 @@ static void failure_test() {
     delete server;
 
     client1 = new rpcc(*dst);
-    VERIFY (client1->bind(milliseconds(3000)) < 0);
+    VERIFY (client1->bind(3000ms) < 0);
     cout << "   -- create new client and try to bind to failed server .. failed ok" << endl;
 
     delete client1;
@@ -421,5 +419,5 @@ int main(int argc, char *argv[]) {
     }
 
     while (1)
-        std::this_thread::sleep_for(milliseconds(100));
+        std::this_thread::sleep_for(100ms);
 }
index 2c4ab06..1c104d8 100644 (file)
@@ -1,4 +1,4 @@
-#include "thread_pool.h"
+#include "include/rpc/thread_pool.h"
 
 thread_pool::thread_pool(size_t sz) : th_(sz) {
     for (auto & t : th_)
diff --git a/rsm.cc b/rsm.cc
index c651e86..7104a09 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
 // The rule is that a module releases its internal locks before it
 // upcalls, but can keep its locks when calling down.
 
-#include "rsm.h"
-#include "rsm_client.h"
+#include "include/rsm.h"
+#include "include/rsm_client.h"
 #include <unistd.h>
 
 using std::vector;
+using namespace std::chrono;
 
 rsm_state_transfer::~rsm_state_transfer() {}
 
@@ -130,7 +131,7 @@ void rsm::recovery() {
                 commit_change(cfg->view_id(), ml);
             } else {
                 ml.unlock();
-                std::this_thread::sleep_for(milliseconds(3000)); // XXX make another node in cfg primary?
+                std::this_thread::sleep_for(3000ms); // XXX make another node in cfg primary?
                 ml.lock();
             }
         }
@@ -174,7 +175,7 @@ bool rsm::sync_with_backups(lock & rsm_mutex_lock) {
     rsm_mutex_lock.lock();
     // Start accepting synchronization request (statetransferreq) now!
     insync = true;
-    cfg->get_view(vid_insync, backups);
+    backups = cfg->get_view(vid_insync);
     backups.erase(std::find(backups.begin(), backups.end(), cfg->myaddr()));
     LOG << "backups " << backups;
     sync_cond.wait(rsm_mutex_lock);
@@ -208,7 +209,7 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
         rsm_mutex_lock.unlock();
         cl = rpcc::bind_cached(m);
         if (cl) {
-            ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100),
+            ret = cl->call_timeout(rsm_protocol::transferreq, 100ms,
                     r, cfg->myaddr(), last_myvs, vid_insync);
         }
         rsm_mutex_lock.lock();
@@ -218,7 +219,7 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock)
         return false;
     }
     if (stf && last_myvs != r.last) {
-        stf->unmarshal_state(r.state);
+        stf->unmarshall_state(r.state);
     }
     last_myvs = r.last;
     LOG << "transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
@@ -247,7 +248,7 @@ bool rsm::join(const string & m, lock & rsm_mutex_lock) {
     rsm_mutex_lock.unlock();
     auto cl = rpcc::bind_cached(m);
     if (cl)
-        ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log, cfg->myaddr(), last_myvs);
+        ret = cl->call_timeout(rsm_protocol::joinreq, 12000ms, log, cfg->myaddr(), last_myvs);
     rsm_mutex_lock.lock();
 
     if (cl == 0 || ret != rsm_protocol::OK) {
@@ -290,8 +291,8 @@ void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r
     handler *h = procs[procno];
     VERIFY(h);
     marshall rep;
-    auto ret = (rsm_protocol::status)(*h)(unmarshall(req, false), rep);
-    r = marshall(ret, rep.content()).content();
+    auto ret = (rsm_protocol::status)(*h)(unmarshall(req), rep);
+    r = marshall(ret, rep);
 }
 
 static void logHexString(locked_ostream && log, const string & s) {
@@ -322,23 +323,23 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id
         if (primary != myaddr)
             return rsm_client_protocol::NOTPRIMARY;
         LOG << "Assigning a viewstamp";
-        cfg->get_view(vid_commit, m);
+        m = cfg->get_view(vid_commit);
         // assign the RPC the next viewstamp number
         vs = myvs;
         myvs++;
     }
 
-    // send an invoke RPC to all slaves in the current view with a timeout of 1 second
+    // send an invoke RPC to all slaves in the current view with a timeout
     LOG << "Invoking slaves";
-    for (unsigned i  = 0; i < m.size(); i++) {
-        if (m[i] != myaddr) {
+    for (auto & mm : m) {
+        if (mm != myaddr) {
             // if invoke on slave fails, return rsm_client_protocol::BUSY
-            LOG << "Sending invoke to " << m[i];
-            auto cl = rpcc::bind_cached(m[i]);
+            LOG << "Sending invoke to " << mm;
+            auto cl = rpcc::bind_cached(mm);
             if (!cl)
                 return rsm_client_protocol::BUSY;
             int ignored_rval;
-            auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req);
+            auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, 100ms, ignored_rval, procno, vs, req);
             LOG << "Invoke returned " << ret;
             if (ret != rsm_protocol::OK)
                 return rsm_client_protocol::BUSY;
@@ -364,7 +365,6 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id
 rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
     LOG << "invoke procno 0x" << std::hex << proc;
     lock ml(invoke_mutex);
-    vector<string> m;
     string myaddr;
     {
         lock ml2(rsm_mutex);
@@ -377,7 +377,7 @@ rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp
         myaddr = cfg->myaddr();
         if (primary == myaddr)
             return rsm_protocol::ERR;
-        cfg->get_view(vid_commit, m);
+        vector<string> m = cfg->get_view(vid_commit);
         if (std::find(m.begin(), m.end(), myaddr) == m.end())
             return rsm_protocol::ERR;
         // check sequence number
@@ -404,7 +404,7 @@ rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const strin
     if (!insync || vid != vid_insync)
         return rsm_protocol::BUSY;
     if (stf && last != last_myvs)
-        r.state = stf->marshal_state();
+        r.state = stf->marshall_state();
     r.last = last_myvs;
     return rsm_protocol::OK;
 }
@@ -465,9 +465,8 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last
 // primary failure
 //
 rsm_client_protocol::status rsm::client_members(vector<string> & r, int) {
-    vector<string> m;
     lock ml(rsm_mutex);
-    cfg->get_view(vid_commit, m);
+    vector<string> m = cfg->get_view(vid_commit);
     m.push_back(primary);
     r = m;
     LOG << "return " << m << " m " << primary;
@@ -478,9 +477,7 @@ rsm_client_protocol::status rsm::client_members(vector<string> & r, int) {
 // otherwise, the lowest number node of the previous view.
 // caller should hold rsm_mutex
 void rsm::set_primary(unsigned vid) {
-    vector<string> c, p;
-    cfg->get_view(vid, c);
-    cfg->get_view(vid - 1, p);
+    vector<string> c = cfg->get_view(vid), p = cfg->get_view(vid - 1);
     VERIFY (c.size() > 0);
 
     if (isamember(primary,c)) {
@@ -489,9 +486,9 @@ void rsm::set_primary(unsigned vid) {
     }
 
     VERIFY(p.size() > 0);
-    for (unsigned i = 0; i < p.size(); i++) {
-        if (isamember(p[i], c)) {
-            primary = p[i];
+    for (auto & pp : p) {
+        if (isamember(pp, c)) {
+            primary = pp;
             LOG << "primary is " << primary;
             return;
         }
@@ -509,12 +506,10 @@ bool rsm::amiprimary() {
 
 void rsm::net_repair(bool heal, lock & rsm_mutex_lock) {
     VERIFY(rsm_mutex_lock);
-    vector<string> m;
-    cfg->get_view(vid_commit, m);
-    for (unsigned i  = 0; i < m.size(); i++) {
-        if (m[i] != cfg->myaddr()) {
-            LOG << "member " << m[i] << " " << heal;
-            if (auto cl = rpcc::bind_cached(m[i]))
+    for (auto & mm : cfg->get_view(vid_commit)) {
+        if (mm != cfg->myaddr()) {
+            LOG << "member " << mm << " " << heal;
+            if (auto cl = rpcc::bind_cached(mm))
                 cl->set_reachable(heal);
         }
     }
index 4a484ae..b24f056 100644 (file)
@@ -1,7 +1,9 @@
-#include "rsm_client.h"
+#include "include/rsm_client.h"
 #include <arpa/inet.h>
 #include <unistd.h>
 
+using namespace std::chrono;
+
 rsm_client::rsm_client(string dst) : primary(dst) {
     LOG << "create rsm_client";
     lock ml(rsm_client_mutex);
@@ -9,12 +11,7 @@ rsm_client::rsm_client(string dst) : primary(dst) {
     LOG << "done";
 }
 
-void rsm_client::primary_failure(lock &) {
-    primary = known_mems.back();
-    known_mems.pop_back();
-}
-
-rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const string & req) {
+rsm_protocol::status rsm_client::invoke(rpc_protocol::proc_id_t proc, string & rep, const string & req) {
     lock ml(rsm_client_mutex);
     while (1) {
         LOG << "proc " << std::hex << proc << " primary " << primary;
@@ -24,7 +21,7 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const s
         auto cl = rpcc::bind_cached(prim);
         auto ret = rsm_client_protocol::OK;
         if (cl)
-            ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, milliseconds(500), rep, proc, req);
+            ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, 500ms, rep, proc, req);
         ml.lock();
 
         if (!cl)
@@ -35,7 +32,7 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const s
             return rsm_protocol::OK;
         if (ret == rsm_client_protocol::BUSY) {
             LOG << "rsm is busy " << prim;
-            std::this_thread::sleep_for(milliseconds(300));
+            std::this_thread::sleep_for(300ms);
             continue;
         }
         if (ret == rsm_client_protocol::NOTPRIMARY) {
@@ -45,7 +42,8 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const s
         }
 prim_fail:
         LOG << "primary " << prim << " failed ret " << std::dec << ret;
-        primary_failure(ml);
+        primary = known_mems.back();
+        known_mems.pop_back();
         LOG << "retry new primary " << prim;
     }
 }
@@ -57,14 +55,13 @@ bool rsm_client::init_members(lock & rsm_client_mutex_lock) {
     shared_ptr<rpcc> cl;
     {
         rsm_client_mutex_lock.unlock();
-        cl = rpcc::bind_cached(prim);
-        if (cl)
-            ret = cl->call_timeout(rsm_client_protocol::members, milliseconds(100), known_mems, 0);
+        if ((cl = rpcc::bind_cached(prim)))
+            ret = cl->call_timeout(rsm_client_protocol::members, 100ms, known_mems, 0);
         rsm_client_mutex_lock.lock();
     }
-    if (cl == 0 || ret != rsm_protocol::OK)
+    if (ret != rsm_protocol::OK)
         return false;
-    if (known_mems.size() < 1) {
+    if (!known_mems.size()) {
         LOG << "do not know any members!";
         VERIFY(0);
     }
diff --git a/rsm_client.h b/rsm_client.h
deleted file mode 100644 (file)
index 32dde43..0000000
+++ /dev/null
@@ -1,72 +0,0 @@
-#ifndef rsm_client_h
-#define rsm_client_h
-
-#include "types.h"
-#include "rsm_protocol.h"
-
-//
-// rsm client interface.
-//
-// The client stubs package up an rpc, and then call the invoke procedure
-// on the replicated state machine passing the RPC as an argument.  This way
-// the replicated state machine isn't service specific; any server can use it.
-//
-
-class rsm_client {
-    protected:
-        string primary;
-        std::vector<string> known_mems;
-        std::mutex rsm_client_mutex;
-        void primary_failure(lock & rsm_client_mutex_lock);
-        bool init_members(lock & rsm_client_mutex_lock);
-        rsm_protocol::status invoke(rpc_protocol::proc_id_t proc, string & rep, const string & req);
-        template<class R> int call_marshalled(rpc_protocol::proc_t & proc, R & r, const marshall & req);
-    public:
-        rsm_client(string dst);
-
-        template<class P, class R, class ...Args>
-        inline int call(rpc_protocol::proc_checked_t<P> proc, R & r, const Args & ...a1) {
-            static_assert(is_valid_call<P, R, Args...>::value, "RSM method invoked with incorrect argument types");
-            return call_marshalled(proc, r, marshall(a1...));
-        }
-};
-
-inline string hexify(const string & s) {
-    string bytes;
-    for (char ch : s) {
-        bytes.push_back("0123456789abcdef"[(uint8_t)ch >> 4]);
-        bytes.push_back("0123456789abcdef"[(uint8_t)ch & 15]);
-    }
-    return bytes;
-}
-
-template<class R>
-int rsm_client::call_marshalled(rpc_protocol::proc_t & proc, R & r, const marshall & req) {
-    string rep;
-    int intret = invoke(proc.id, rep, req.content());
-    VERIFY( intret == rsm_client_protocol::OK );
-    unmarshall u(rep, false, intret);
-    if (intret < 0) return intret;
-    string res;
-    u >> res;
-    if (!u.okdone()) {
-        LOG << "failed to unmarshall the reply.";
-        LOG << "You probably forgot to set the reply string in "
-            << "rsm::client_invoke, or you may have called RPC "
-            << proc.name << " (0x" << std::hex << proc.id << ") with the wrong return type";
-        LOG << "here's what I got: \"" << hexify(rep) << "\"";
-        VERIFY(0);
-        return rpc_protocol::unmarshall_reply_failure;
-    }
-    if(!unmarshall(res, false, r).okdone()) {
-        LOG << "failed to unmarshall the reply.";
-        LOG << "You are probably calling RPC " << proc.name << " (0x"
-            << std::hex << proc.id << ") with the wrong return type.";
-        LOG << "here's what I got: \"" << hexify(res) << "\"";
-        VERIFY(0);
-        return rpc_protocol::unmarshall_reply_failure;
-    }
-    return intret;
-}
-
-#endif
index 5507f80..42aaebe 100644 (file)
@@ -2,9 +2,9 @@
 // RSM test client
 //
 
-#include "types.h"
-#include "rsm_protocol.h"
-#include "rsmtest_client.h"
+#include "include/types.h"
+#include "include/rsm_protocol.h"
+#include "include/rsmtest_client.h"
 
 int main(int argc, char *argv[]) {
     global = new t4_state('t');
@@ -15,13 +15,11 @@ int main(int argc, char *argv[]) {
 
     rsmtest_client *lc = new rsmtest_client(argv[1]);
     string command(argv[2]);
-    if (command == "partition") {
+    if (command == "partition")
         LOG_NONMEMBER << "net_repair returned " << lc->net_repair(std::stoi(argv[3]));
-    } else if (command == "breakpoint") {
-        int b = std::stoi(argv[3]);
-        LOG_NONMEMBER << "breakpoint " << b << " returned " << lc->breakpoint(b);
-    } else {
+    else if (command == "breakpoint")
+        LOG_NONMEMBER << "breakpoint " << argv[3] << " returned " << lc->breakpoint(std::stoi(argv[3]));
+    else
         LOG_NONMEMBER << "Unknown command " << argv[2];
-    }
     return 0;
 }
index e9c8001..48744d3 100644 (file)
@@ -1,23 +1,21 @@
 // RPC stubs for clients to talk to rsmtest_server
 
-#include "rsmtest_client.h"
+#include "include/rsmtest_client.h"
 #include <arpa/inet.h>
 
 rsmtest_client::rsmtest_client(string dst) {
     if (!(cl = rpcc::bind_cached(dst)))
-        LOG << "rsmtest_client: call bind";
+        LOG << "could not bind to " << dst;
 }
 
 rsm_test_protocol::status rsmtest_client::net_repair(int heal) {
     rsm_test_protocol::status r = rsm_test_protocol::ERR;
-    auto ret = (rsm_test_protocol::status)cl->call(rsm_test_protocol::net_repair, r, heal);
-    VERIFY (ret == rsm_test_protocol::OK);
+    VERIFY (cl->call(rsm_test_protocol::net_repair, r, heal) == rsm_test_protocol::OK);
     return r;
 }
 
 rsm_test_protocol::status rsmtest_client::breakpoint(int b) {
     rsm_test_protocol::status r = rsm_test_protocol::ERR;
-    auto ret = (rsm_test_protocol::status)cl->call(rsm_test_protocol::breakpoint, r, b);
-    VERIFY (ret == rsm_test_protocol::OK);
+    VERIFY (cl->call(rsm_test_protocol::breakpoint, r, b) == rsm_test_protocol::OK);
     return r;
 }
diff --git a/t4.cc b/t4.cc
index 834136b..9b90f5d 100644 (file)
--- a/t4.cc
+++ b/t4.cc
@@ -1,6 +1,6 @@
-#include "t4.h"
+#include "include/t4.h"
 #include <unistd.h>
-#include "rpc/rpc.h"
+#include "include/rpc/rpc.h"
 
 using namespace std::chrono;
 
@@ -9,7 +9,7 @@ t4_state *global;
 t4_state::t4_state(char log_prefix) : log_thread_prefix(log_prefix) {
     uint32_t seed = std::random_device()();
     auto time = system_clock::now().time_since_epoch();
-    auto ticks = duration_cast<nanoseconds>(time).count();
+    auto ticks = time / 1ns;
     seed ^= (uint32_t)ticks;
     auto pid = getpid();
     seed ^= (uint32_t)pid;
@@ -17,7 +17,7 @@ t4_state::t4_state(char log_prefix) : log_thread_prefix(log_prefix) {
     seed ^= (uint32_t)tid;
     random_generator.seed(seed);
     // make sure the clock will read differently next time!
-    std::this_thread::sleep_for(microseconds(1));
+    std::this_thread::sleep_for(1us);
 }
 
 t4_state::~t4_state() {