From ab9eee5d7f1fbe7a3fe6229d4a78136efb14371b Mon Sep 17 00:00:00 2001 From: Peter Iannucci Date: Fri, 18 Jul 2014 13:48:07 -0400 Subject: [PATCH] So many changes. Broken. --- Makefile | 55 ++++-- Makefile.osx | 32 +--- config.cc | 90 +++++----- threaded_log.cc => debug.cc | 18 +- config.h => include/config.h | 10 +- threaded_log.h => include/debug.h | 7 +- endian.h => include/endian.h | 1 + lock_client.h => include/lock_client.h | 65 +++---- lock_protocol.h => include/lock_protocol.h | 4 +- include/lock_server.h | 41 +++++ include/log.h | 38 ++++ maybe.h => include/maybe.h | 0 paxos.h => include/paxos.h | 42 ++++- paxos_protocol.h => include/paxos_protocol.h | 7 +- {rpc => include/rpc}/connection.h | 12 +- {rpc => include/rpc}/fifo.h | 10 +- {rpc => include/rpc}/file.h | 30 ++-- include/rpc/marshall.h | 187 ++++++++++++++++++++ {rpc => include/rpc}/marshall_wrap.h | 4 +- {rpc => include/rpc}/poll_mgr.h | 2 +- {rpc => include/rpc}/rpc.h | 22 +-- {rpc => include/rpc}/rpc_protocol.h | 5 +- {rpc => include/rpc}/thread_pool.h | 2 +- rsm.h => include/rsm.h | 12 +- include/rsm_client.h | 47 +++++ rsm_protocol.h => include/rsm_protocol.h | 4 +- rsmtest_client.h => include/rsmtest_client.h | 4 +- t4.h => include/t4.h | 4 +- types.h => include/types.h | 33 +++- lock_client.cc | 14 +- lock_server.cc | 32 ++-- lock_server.h | 46 ----- lock_smain.cc | 6 +- lock_tester.cc | 6 +- log.cc | 92 ++-------- log.h | 27 --- marshall.py | 132 ++++++++++++++ paxos.cc | 41 +++-- rpc/connection.cc | 19 +- rpc/marshall.h | 244 -------------------------- rpc/poll_mgr.cc | 6 +- rpc/rpc.cc | 53 +++--- rpc/rpctest.cc | 76 ++++---- rpc/thread_pool.cc | 2 +- rsm.cc | 61 +++---- rsm_client.cc | 27 ++- rsm_client.h | 72 -------- rsm_tester.cc | 16 +- rsmtest_client.cc | 10 +- t4.cc | 8 +- 50 files changed, 904 insertions(+), 874 deletions(-) rename threaded_log.cc => debug.cc (71%) rename config.h => include/config.h (83%) rename threaded_log.h => include/debug.h (88%) rename endian.h => include/endian.h (98%) rename lock_client.h => include/lock_client.h (63%) rename lock_protocol.h => include/lock_protocol.h (93%) create mode 100644 include/lock_server.h create mode 100644 include/log.h rename maybe.h => include/maybe.h (100%) rename paxos.h => include/paxos.h (69%) rename paxos_protocol.h => include/paxos_protocol.h (83%) rename {rpc => include/rpc}/connection.h (92%) rename {rpc => include/rpc}/fifo.h (74%) rename {rpc => include/rpc}/file.h (57%) create mode 100644 include/rpc/marshall.h rename {rpc => include/rpc}/marshall_wrap.h (97%) rename {rpc => include/rpc}/poll_mgr.h (97%) rename {rpc => include/rpc}/rpc.h (93%) rename {rpc => include/rpc}/rpc_protocol.h (92%) rename {rpc => include/rpc}/thread_pool.h (92%) rename rsm.h => include/rsm.h (93%) create mode 100644 include/rsm_client.h rename rsm_protocol.h => include/rsm_protocol.h (95%) rename rsmtest_client.h => include/rsmtest_client.h (87%) rename t4.h => include/t4.h (91%) rename types.h => include/types.h (78%) delete mode 100644 lock_server.h delete mode 100644 log.h create mode 100755 marshall.py delete mode 100644 rpc/marshall.h delete mode 100644 rsm_client.h diff --git a/Makefile b/Makefile index 5dd06c5..e2c8d5e 100644 --- a/Makefile +++ b/Makefile @@ -1,31 +1,50 @@ -CXXFLAGS ?= -g -MMD -Werror -I. -std=c++1y -LDFLAGS ?= -CXX ?= g++ -CC ?= g++ -EXTRA_TARGETS ?= +CLANGXX ?= clang++ +GXX ?= g++ +USE_CLANG ?= 1 +CC = $(CXX) -all: lock_server lock_tester rsm_tester rpc/rpctest $(EXTRA_TARGETS) +OPTFLAGS = -O3 +CXXFLAGS = -ggdb3 -MMD -I. -std=c++1y $(STDLIB) $(PEDANTRY) $(OPTFLAGS) +LDFLAGS = -std=c++1y $(STDLIB) $(OPTFLAGS) -rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/poll_mgr.o rpc/thread_pool.o - rm -f $@ - ar cq $@ $^ - ranlib rpc/librpc.a +POSTBUILD ?= -rpc/rpctest: rpc/rpctest.o threaded_log.o rpc/librpc.a t4.o +PRODUCTS = lock_server lock_tester rsm_tester rpc/rpctest -lock_tester : lock_tester.o lock_client.o threaded_log.o rsm_client.o rpc/librpc.a t4.o +ifeq "$(USE_CLANG)" "1" -lock_server : lock_smain.o threaded_log.o rsm.o paxos.o config.o log.o lock_server.o rpc/librpc.a t4.o +PEDANTRY = -Werror -Weverything -Wall -Wextra -pedantic-errors -pedantic \ + -Wno-c++98-compat-pedantic -Wno-padded -Weffc++ \ + -Wno-non-virtual-dtor -Wno-weak-vtables +STDLIB = -stdlib=libc++ +CXX = $(CLANGXX) -rsm_tester: rsm_tester.o rsmtest_client.o threaded_log.o rpc/librpc.a t4.o +else -%.o: %.cc - $(CXX) $(CXXFLAGS) -c $< -o $@ +PEDANTRY = -pedantic -Wall -Wextra -fno-default-inline -Werror +STDLIB = -pthread +CXX = $(GXX) + +endif + +all: $(PRODUCTS) $(POSTBUILD) + +LIBRPC_OBJECTS = rpc/rpc.o rpc/connection.o rpc/poll_mgr.o rpc/thread_pool.o + +rpc/librpc.a: $(foreach x,$(LIBRPC_OBJECTS),rpc/librpc.a($(x))) + +rpc/rpctest: rpc/rpctest.o debug.o rpc/librpc.a t4.o + +lock_tester : lock_tester.o lock_client.o debug.o rsm_client.o rpc/librpc.a t4.o + +lock_server : lock_smain.o debug.o rsm.o paxos.o config.o log.o lock_server.o rpc/librpc.a t4.o + +rsm_tester: rsm_tester.o rsmtest_client.o debug.o rpc/librpc.a t4.o -include *.d -include rpc/*.d clean_files=rpc/rpctest rpc/*.o *.d rpc/*.d rpc/librpc.a *.o config *.log lock_server lock_tester rsm_tester .PHONY: clean $(EXTRA_TARGETS) -clean: - rm -rf $(clean_files) +clean: + -rm -rf $(clean_files) diff --git a/Makefile.osx b/Makefile.osx index 92638c7..fd7d6ac 100644 --- a/Makefile.osx +++ b/Makefile.osx @@ -1,31 +1,9 @@ -USE_CLANG = 1 +CLANGXX = clang++-mp-3.4 +GXX = g++-mp-4.8 -PEDANTRY = -STDLIB = -OPTFLAGS = -O3 #-fno-omit-frame-pointer -fsanitize=address ,thread,undefined -fsanitize-memory-track-origins -CXXFLAGS = -std=c++1y -ggdb3 -MMD -I. $(STDLIB) $(PEDANTRY) $(OPTFLAGS) -LDFLAGS = -std=c++1y $(STDLIB) $(OPTFLAGS) - -ifeq "$(USE_CLANG)" "1" - -PEDANTRY += \ - -Weverything -pedantic-errors -Werror -Wno-c++98-compat-pedantic \ - -Wno-padded -pedantic -Wall -Wextra -Weffc++ -STDLIB += -stdlib=libc++ -CXX = clang++-mp-3.4 - -else - -PEDANTRY += -pedantic -Wall -Wextra -fno-default-inline -Werror -STDLIB += -pthread -CXX = g++-mp-4.8 - -endif - -CC := $(CXX) -EXTRA_TARGETS = signatures +POSTBUILD = signatures socketfilterfw=/usr/libexec/ApplicationFirewall/socketfilterfw signatures : lock_server lock_tester rpc/rpctest - echo $^ | sudo xargs -n 1 $(socketfilterfw) -s || true - echo $^ | sudo xargs -n 1 $(socketfilterfw) --unblockapp || true + echo $(realpath $^) | sudo xargs -n 1 $(socketfilterfw) -s || true + echo $(realpath $^) | sudo xargs -n 1 $(socketfilterfw) --unblockapp || true diff --git a/config.cc b/config.cc index 8a7bf65..b2d2e5c 100644 --- a/config.cc +++ b/config.cc @@ -1,38 +1,35 @@ -#include "config.h" +#include "include/config.h" using std::vector; - -// The config module maintains views. As a node joins or leaves a -// view, the next view will be the same as previous view, except with -// the new node added or removed. The first view contains only node -// 1. If node 2 joins after the first node (it will download the views -// from node 1), it will learn about view 1 with the first node as the -// only member. It will then invoke Paxos to create the next view. -// It will tell Paxos to ask the nodes in view 1 to agree on the value -// {1, 2}. If Paxos returns success, then it moves to view 2 with -// {1,2} as the members. When node 3 joins, the config module runs -// Paxos with the nodes in view 2 and the proposed value to be -// {1,2,3}. And so on. When a node discovers that some node of the -// current view is not responding, it kicks off Paxos to propose a new -// value (the current view minus the node that isn't responding). The -// config module uses Paxos to create a total order of views, and it -// is ensured that the majority of the previous view agrees to the -// next view. The Paxos log contains all the values (i.e., views) -// agreed on. +using namespace std::chrono; + +// The config module maintains views. As a node joins or leaves a view, the +// next view will be the same as previous view, except with the new node added +// or removed. The first view contains only node 1. If node 2 joins after the +// first node (it will download the views from node 1), it will learn about +// view 1 with the first node as the only member. It will then invoke Paxos to +// create the next view. It will tell Paxos to ask the nodes in view 1 to +// agree on the value {1, 2}. If Paxos returns success, then it moves to view +// 2 with {1,2} as the members. When node 3 joins, the config module runs Paxos +// with the nodes in view 2 and the proposed value to be {1,2,3}. And so on. +// When a node discovers that some node of the current view is not responding, +// it kicks off Paxos to propose a new value (the current view minus the node +// that isn't responding). The config module uses Paxos to create a total order +// of views, and it is ensured that the majority of the previous view agrees to +// the next view. The Paxos log contains all the values (i.e., views) agreed +// on. // -// The RSM module informs config to add nodes. The config module -// runs a heartbeater thread that checks in with nodes. If a node -// doesn't respond, the config module will invoke Paxos's proposer to -// remove the node. Higher layers will learn about this change when a -// Paxos acceptor accepts the new proposed value through -// paxos_commit(). +// The RSM module informs config to add nodes. The config module runs a +// heartbeater thread that checks in with nodes. If a node doesn't respond, +// the config module will invoke Paxos's proposer to remove the node. Higher +// layers will learn about this change when a Paxos acceptor accepts the new +// proposed value through paxos_commit(). // -// To be able to bring other nodes up to date to the latest formed -// view, each node will have a complete history of all view numbers -// and their values that it knows about. At any time a node can reboot -// and when it re-joins, it may be many views behind; by remembering -// all views, the other nodes can bring this re-joined node up to -// date. +// To be able to bring other nodes up to date to the latest formed view, each +// node will have a complete history of all view numbers and their values that +// it knows about. At any time a node can reboot and when it re-joins, it may +// be many views behind; by remembering all views, the other nodes can bring +// this re-joined node up to date. config_view_change::~config_view_change() {} @@ -52,23 +49,23 @@ void config::restore(const string & s) { reconstruct(cfg_mutex_lock); } -void config::get_view(unsigned instance, vector & m) { +vector config::get_view(unsigned instance) { lock cfg_mutex_lock(cfg_mutex); - get_view(instance, m, cfg_mutex_lock); + return get_view(instance, cfg_mutex_lock); } -void config::get_view(unsigned instance, vector & m, lock & cfg_mutex_lock) { +vector config::get_view(unsigned instance, lock & cfg_mutex_lock) { VERIFY(cfg_mutex_lock); string value = paxos.value(instance); LOG << "get_view(" << instance << "): returns " << value; - m = explode(value); + return explode(value); } void config::reconstruct(lock & cfg_mutex_lock) { VERIFY(cfg_mutex_lock); my_view_id = paxos.instance(); if (my_view_id > 0) { - get_view(my_view_id, mems, cfg_mutex_lock); + mems = get_view(my_view_id, cfg_mutex_lock); LOG << "view " << my_view_id << " " << mems; } } @@ -99,9 +96,7 @@ void config::paxos_commit(unsigned instance, const string & value) { bool config::ismember(const string & m, unsigned vid) { lock cfg_mutex_lock(cfg_mutex); - vector v; - get_view(vid, v, cfg_mutex_lock); - return isamember(m, v); + return isamember(m, get_view(vid, cfg_mutex_lock)); } bool config::add(const string & new_m, unsigned vid) { @@ -146,13 +141,12 @@ void config::heartbeater() { lock cfg_mutex_lock(cfg_mutex); while (1) { - auto next_timeout = steady_clock::now() + milliseconds(300); + auto next_timeout = steady_clock::now() + 300ms; LOG << "go to sleep"; config_cond.wait_until(cfg_mutex_lock, next_timeout); unsigned vid = my_view_id; - vector cmems; - get_view(vid, cmems, cfg_mutex_lock); + vector cmems = get_view(vid, cfg_mutex_lock); LOG << "current membership " << cmems; if (!isamember(me, cmems)) { @@ -180,10 +174,10 @@ void config::heartbeater() { } } -paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) { +paxos_protocol::status config::heartbeat(paxos_protocol::view_t & r, string m, paxos_protocol::view_t vid) { lock cfg_mutex_lock(cfg_mutex); - r = (int) my_view_id; - LOG<< "heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id; + r = my_view_id; + LOG << "heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id; if (vid == my_view_id) return paxos_protocol::OK; else if (paxos.isrunning()) { @@ -195,13 +189,13 @@ paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) { config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) { VERIFY(cfg_mutex_lock); - unsigned vid = my_view_id; + paxos_protocol::view_t vid = my_view_id, r; LOG << "heartbeat to " << m << " (" << vid << ")"; cfg_mutex_lock.unlock(); - int r = 0, ret = rpc_protocol::bind_failure; + int ret = rpc_protocol::bind_failure; if (auto cl = rpcc::bind_cached(m)) - ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid); + ret = cl->call_timeout(paxos_protocol::heartbeat, 100ms, r, me, vid); cfg_mutex_lock.lock(); heartbeat_t res = OK; diff --git a/threaded_log.cc b/debug.cc similarity index 71% rename from threaded_log.cc rename to debug.cc index 3218e33..784a054 100644 --- a/threaded_log.cc +++ b/debug.cc @@ -1,6 +1,6 @@ -#include "t4.h" -#include "types.h" -#include "threaded_log.h" +#include "include/t4.h" +#include "include/types.h" +#include "include/debug.h" using namespace std::chrono; @@ -9,8 +9,7 @@ locked_ostream && _log_prefix(locked_ostream && f, const string & file, const st int tid = global->thread_name_map[thread]; if (tid==0) tid = global->thread_name_map[thread] = ++global->next_thread_num; - auto utime = duration_cast( - system_clock::now().time_since_epoch()).count() % 1000000000; + auto utime = (system_clock::now().time_since_epoch() / 1us) % 1000000000; f << std::setfill('0') << std::dec << std::left << std::setw(9) << utime << " "; f << std::setfill(' ') << global->log_thread_prefix << std::left << std::setw(2) << tid; f << " " << std::setw(20) << file << " " << std::setw(18) << func; @@ -32,3 +31,12 @@ int _log_debug_level() { lock _log_lock() { return lock(global->log_mutex); } + +string hex_string(const string & s) { + string bytes; + for (char ch : s) { + bytes.push_back("0123456789abcdef"[(uint8_t)ch >> 4]); + bytes.push_back("0123456789abcdef"[(uint8_t)ch & 15]); + } + return bytes; +} diff --git a/config.h b/include/config.h similarity index 83% rename from config.h rename to include/config.h index 8d0c836..ce8990d 100644 --- a/config.h +++ b/include/config.h @@ -1,8 +1,8 @@ #ifndef config_h #define config_h -#include "types.h" -#include "paxos.h" +#include "include/types.h" +#include "include/paxos.h" class config_view_change { public: @@ -20,8 +20,8 @@ class config : public paxos_change { std::vector mems; std::mutex cfg_mutex; cond config_cond; - paxos_protocol::status heartbeat(int & r, string m, unsigned instance); - void get_view(unsigned instance, std::vector & m, lock & cfg_mutex_lock); + paxos_protocol::status heartbeat(paxos_protocol::view_t & r, string m, unsigned instance); + std::vector get_view(unsigned instance, lock & cfg_mutex_lock); bool remove(const string &, lock & cfg_mutex_lock); void reconstruct(lock & cfg_mutex_lock); typedef enum { @@ -35,7 +35,7 @@ class config : public paxos_change { unsigned view_id() { return my_view_id; } const string & myaddr() const { return me; } string dump() { return paxos.dump(); } - void get_view(unsigned instance, std::vector & m); + std::vector get_view(unsigned instance); void restore(const string & s); bool add(const string &, unsigned view_id); bool ismember(const string & m, unsigned view_id); diff --git a/threaded_log.h b/include/debug.h similarity index 88% rename from threaded_log.h rename to include/debug.h index 9a83bfb..f42be6b 100644 --- a/threaded_log.h +++ b/include/debug.h @@ -1,5 +1,5 @@ -#ifndef threaded_log_h -#define threaded_log_h +#ifndef debug_h +#define debug_h #include #include @@ -20,7 +20,10 @@ lock _log_lock(); #define LOG_NONMEMBER _log_prefix(locked_ostream{std::cerr, _log_lock()}, __FILE__, __func__) #define LOG _log_member(LOG_NONMEMBER, (const void *)this) +#define DEBUG_LOG LOG #define IF_LEVEL(level) if(_log_debug_level() >= abs(level)) +string hex_string(const string & s); + #endif diff --git a/endian.h b/include/endian.h similarity index 98% rename from endian.h rename to include/endian.h index 7c78bbc..450eb5d 100644 --- a/endian.h +++ b/include/endian.h @@ -2,6 +2,7 @@ #define endian_h #include +#include constexpr union { uint32_t i; uint8_t is_little_endian; } endianness{1}; diff --git a/lock_client.h b/include/lock_client.h similarity index 63% rename from lock_client.h rename to include/lock_client.h index 9e449f4..64848b5 100644 --- a/lock_client.h +++ b/include/lock_client.h @@ -5,59 +5,50 @@ #ifdef __cplusplus -#include "types.h" -#include "lock_protocol.h" -#include "rpc/fifo.h" -#include "rsm_client.h" -#include "maybe.h" - -class lock_release_user { - public: - virtual void dorelease(lock_protocol::lockid_t) = 0; - virtual ~lock_release_user() {} -}; - -class lock_state { -public: - enum { - none = 0, - retrying, - free, - locked, - acquiring, - releasing - } state = none; - std::thread::id held_by; - std::list wanted_by; - std::mutex m; - std::map c; - lock_protocol::xid_t xid; - void wait(lock & mutex_lock); - void signal(); - void signal(thread::id who); -}; - -typedef std::map lock_map; +#include "include/types.h" +#include "include/lock_protocol.h" +#include "include/rpc/fifo.h" +#include "include/rsm_client.h" +#include "include/maybe.h" // Clients that caches locks. The server can revoke locks using // lock_revoke_server. class lock_client { private: + class lock_state { + public: + enum { + none = 0, + retrying, + free, + locked, + acquiring, + releasing + } state = none; + std::thread::id held_by; + std::list wanted_by; + std::mutex m; + std::map c; + lock_protocol::xid_t xid; + void wait(lock & mutex_lock); + void signal(); + void signal(thread::id who); + }; + unique_ptr rlsrpc; thread releaser_thread; unique_ptr rsmc; - lock_release_user *lu; in_port_t rlock_port; string hostname; string id; std::mutex xid_mutex; - lock_protocol::xid_t next_xid; + lock_protocol::xid_t next_xid=0; fifo> release_fifo; std::mutex lock_table_lock; - lock_map lock_table; + std::map lock_table; lock_state & get_lock_state(lock_protocol::lockid_t lid); public: - lock_client(string xdst, lock_release_user *l = 0); + lock_client(string xdst); ~lock_client(); lock_protocol::status acquire(lock_protocol::lockid_t); lock_protocol::status release(lock_protocol::lockid_t); diff --git a/lock_protocol.h b/include/lock_protocol.h similarity index 93% rename from lock_protocol.h rename to include/lock_protocol.h index bd691a0..dd924cb 100644 --- a/lock_protocol.h +++ b/include/lock_protocol.h @@ -1,8 +1,8 @@ #ifndef lock_protocol_h #define lock_protocol_h -#include "types.h" -#include "rpc/rpc.h" +#include "include/types.h" +#include "include/rpc/rpc.h" typedef string callback_t; diff --git a/include/lock_server.h b/include/lock_server.h new file mode 100644 index 0000000..74b271a --- /dev/null +++ b/include/lock_server.h @@ -0,0 +1,41 @@ +#ifndef lock_server_h +#define lock_server_h + +#include "include/types.h" +#include "include/lock_protocol.h" +#include "include/rsm.h" +#include "include/rpc/fifo.h" + +class lock_server : private rsm_state_transfer { + private: + using holder_t=std::pair; + + struct lock_state { + inline lock_state() {} + lock_state(const lock_state & other); + bool held=false; + holder_t held_by; + std::list wanted_by; + std::map old_requests; + std::mutex m; + + MEMBERS(held, held_by, wanted_by) + }; + + int nacquire; + std::mutex lock_table_lock; + std::map lock_table; + lock_state & get_lock_state(lock_protocol::lockid_t lid); + fifo retry_fifo, revoke_fifo; + rsm *rsm_; + string marshall_state(); + void unmarshall_state(const string & state); + void revoker NORETURN (); + void retryer NORETURN (); + public: + lock_server(rsm & r); + lock_protocol::status acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t); + lock_protocol::status release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t); +}; + +#endif diff --git a/include/log.h b/include/log.h new file mode 100644 index 0000000..cc3603b --- /dev/null +++ b/include/log.h @@ -0,0 +1,38 @@ +#ifndef log_h +#define log_h + +#include "include/types.h" +#include "include/paxos_protocol.h" +#include "include/rpc/marshall.h" + +class log { + private: + string name; + std::map> handlers; + + public: + log(string _me); + string read(); + void write(string s); + void replay(); + + struct label : public string { label(const char * s) : string(s) {} }; +#define LABEL(_x_) inline operator log::label () const { return _x_; } + + // XXX should be an atomic operation + template + void append(const T & t) { + std::ofstream(name, std::ios::app) << marshall{label(t), t}; + } + + template + void handler(std::function h) { + handlers[label(T())] = [h, this] (unmarshall & from) { + auto entry = from.get(); + h(entry); + DEBUG_LOG << entry; + }; + } +}; + +#endif diff --git a/maybe.h b/include/maybe.h similarity index 100% rename from maybe.h rename to include/maybe.h diff --git a/paxos.h b/include/paxos.h similarity index 69% rename from paxos.h rename to include/paxos.h index 20e249c..9b54c05 100644 --- a/paxos.h +++ b/include/paxos.h @@ -1,10 +1,10 @@ #ifndef paxos_h #define paxos_h -#include "types.h" -#include "rpc/rpc.h" -#include "paxos_protocol.h" -#include "log.h" +#include "include/types.h" +#include "include/rpc/rpc.h" +#include "include/paxos_protocol.h" +#include "include/log.h" using prepareres = paxos_protocol::prepareres; using node_t = paxos_protocol::node_t; @@ -43,8 +43,7 @@ class proposer_acceptor { unsigned instance_h = 0; // number of the highest instance we have decided std::map values; // vals of each instance - friend class log; - class log l = {this, me}; + class log l = {me}; void commit(unsigned instance, const value_t & v, lock & acceptor_mutex_lock); @@ -59,12 +58,39 @@ class proposer_acceptor { void breakpoint1(); void breakpoint2(); + // Log a committed paxos instance + struct log_instance { + unsigned number; + string value; + MEMBERS(number, value) + LABEL("done") + }; + + // Log the highest proposal number that the local paxos acceptor has + // ever seen; called from paxos when responding to preparereq with + // accept + struct log_proposal { + prop_t promise; + MEMBERS(promise) + LABEL("propseen") + }; + + // Log the highest proposal (proposal number and value) that the local + // paxos acceptor accept has ever accepted; called from paxos when + // responding to acceptreq with true + struct log_accept { + prop_t number; + string value; + MEMBERS(number, value) + LABEL("accepted") + }; + public: proposer_acceptor(paxos_change *delegate, bool _first, const node_t & _me, const value_t & _value); unsigned instance() { return instance_h; } const value_t & value(unsigned instance) { return values[instance]; } - string dump() { return l.dump(); } - void restore(const string & s) { l.restore(s); l.logread(); } + string dump() { return l.read(); } + void restore(const string & s) { l.write(s); l.replay(); } rpcs *get_rpcs() { return &pxs; } bool run(unsigned instance, const nodes_t & cnodes, const value_t & v); diff --git a/paxos_protocol.h b/include/paxos_protocol.h similarity index 83% rename from paxos_protocol.h rename to include/paxos_protocol.h index 3e1fbcd..f3c50e2 100644 --- a/paxos_protocol.h +++ b/include/paxos_protocol.h @@ -1,8 +1,8 @@ #ifndef paxos_protocol_h #define paxos_protocol_h -#include "types.h" -#include "rpc/rpc.h" +#include "include/types.h" +#include "include/rpc/rpc.h" struct prop_t { unsigned n; @@ -23,13 +23,14 @@ namespace paxos_protocol { }; using node_t = string; using nodes_t = std::vector; + using view_t = unsigned; using value_t = string; REMOTE_PROCEDURE_BASE(0x11000); REMOTE_PROCEDURE(1, preparereq, (prepareres &, node_t, unsigned, prop_t)); REMOTE_PROCEDURE(2, acceptreq, (bool &, node_t, unsigned, prop_t, value_t)); REMOTE_PROCEDURE(3, decidereq, (int &, node_t, unsigned, value_t)); - REMOTE_PROCEDURE(4, heartbeat, (int &, string, unsigned)); + REMOTE_PROCEDURE(4, heartbeat, (view_t &, string, view_t)); } #endif diff --git a/rpc/connection.h b/include/rpc/connection.h similarity index 92% rename from rpc/connection.h rename to include/rpc/connection.h index 03e92da..a6a7ffe 100644 --- a/rpc/connection.h +++ b/include/rpc/connection.h @@ -1,13 +1,13 @@ #ifndef connection_h #define connection_h -#include "types.h" +#include "include/types.h" #include #include -#include "t4.h" -#include "poll_mgr.h" -#include "file.h" -#include "threaded_log.h" +#include "include/t4.h" +#include "include/rpc/poll_mgr.h" +#include "include/rpc/file.h" +#include "include/debug.h" class connection; @@ -32,7 +32,7 @@ class connection : private aio_callback, public std::enable_shared_from_this to_dst(const sockaddr_in & dst, connection_delegate *mgr, int lossy=0); const time_point create_time = steady_clock::now(); - const file_t fd; + socket_t fd; private: void write_cb(int s); diff --git a/rpc/fifo.h b/include/rpc/fifo.h similarity index 74% rename from rpc/fifo.h rename to include/rpc/fifo.h index 9e4933a..dd3d8c1 100644 --- a/rpc/fifo.h +++ b/include/rpc/fifo.h @@ -1,12 +1,12 @@ #ifndef fifo_h #define fifo_h -#include "types.h" +#include "include/types.h" template class fifo { public: - fifo(size_t limit=0) : max_(limit) {} + fifo(size_t max_size=0) : max_(max_size) {} bool enq(T e, bool blocking=true) { lock ml(m_); @@ -34,9 +34,9 @@ class fifo { private: std::list q_; std::mutex m_; - cond non_empty_c_; // q went non-empty - cond has_space_c_; // q is not longer overfull - size_t max_; // maximum capacity of the queue, block enq threads if exceeds this limit + cond non_empty_c_; + cond has_space_c_; + size_t max_; }; #endif diff --git a/rpc/file.h b/include/rpc/file.h similarity index 57% rename from rpc/file.h rename to include/rpc/file.h index 9ea313f..fd6d580 100644 --- a/rpc/file.h +++ b/include/rpc/file.h @@ -3,10 +3,12 @@ #include #include -#include "types.h" +#include "include/types.h" #include class file_t { + protected: + static const int nofile = -1; private: int fd_; @@ -20,37 +22,41 @@ class file_t { operator int & () { return flags_; } }; public: - inline file_t(int fd=-1) : fd_(fd) {} + inline file_t(int fd=nofile) : fd_(fd) {} inline file_t(const file_t &) = delete; - inline file_t(file_t && other) : fd_(-1) { std::swap(fd_, other.fd_); } - inline ~file_t() { if (fd_ != -1) ::close(fd_); } + inline file_t(file_t && other) : fd_(nofile) { std::swap(fd_, other.fd_); } + inline ~file_t() { close(); } static inline void pipe(file_t *ends) { int fds[2]; + ends[0].close(); + ends[1].close(); VERIFY(::pipe(fds) == 0); ends[0].fd_ = fds[0]; ends[1].fd_ = fds[1]; } - inline operator int() const { if (fd_ == -1) throw "no fd"; return fd_; } + inline operator int() const { if (fd_ == nofile) throw "no fd"; return fd_; } inline flags_t flags() const { return {*this}; } inline void close() { - ::close(fd_); - fd_ = -1; + if (fd_ != nofile) + ::close(fd_); + fd_ = nofile; } - template - inline ssize_t read(T & t) const { return ::read(fd_, &t, sizeof(T)); } + template inline typename std::enable_if::value, ssize_t>::type + read(T & t) const { return ::read(fd_, &t, sizeof(T)); } inline ssize_t read(void * t, size_t n) const { return ::read(fd_, t, n); } - template - inline ssize_t write(const T & t) const { return ::write(fd_, &t, sizeof(T)); } + template inline typename std::enable_if::value, ssize_t>::type + write(const T & t) const { return ::write(fd_, &t, sizeof(T)); } inline ssize_t write(const void * t, size_t n) const { return ::write(fd_, t, n); } }; class socket_t : public file_t { public: - socket_t(int fd=-1) : file_t(fd) {} + socket_t(int fd=nofile) : file_t(fd) {} template int setsockopt(int level, int option, T && value) { return ::setsockopt(*this, level, option, &value, sizeof(T)); } + inline int shutdown(int how) { return ::shutdown(*this, how); } }; #endif diff --git a/include/rpc/marshall.h b/include/rpc/marshall.h new file mode 100644 index 0000000..8b31791 --- /dev/null +++ b/include/rpc/marshall.h @@ -0,0 +1,187 @@ +#ifndef marshall_h +#define marshall_h + +#include "include/types.h" +#include "include/rpc/rpc_protocol.h" + +class marshall : public string { + public: + template + marshall(const Args & ... args) { + UNPACK_STATEMENT(*this << args); + } + + template + static inline string datagram(const T & h, const marshall & payload="") { + using namespace rpc_protocol; + marshall m{rpc_sz_t(payload.size() + RPC_HEADER_SZ - sizeof(rpc_sz_t)), (T)h}; + VERIFY(m.size() <= RPC_HEADER_SZ); // Datagram header too large + m.resize(RPC_HEADER_SZ); + return m + payload; + } +}; + +class unmarshall : string { + private: + string::const_iterator next = cbegin(); + + public: + template + inline unmarshall(const string & s, Args && ...args) : string(s) { + UNPACK_STATEMENT(*this >> args); + } + + template + static inline unmarshall datagram(const string & datagram, H & h, Args && ... args) { + rpc_protocol::rpc_sz_t s; + VERIFY(unmarshall(datagram, s, h)); // Datagram header too large + return unmarshall(datagram.substr(rpc_protocol::RPC_HEADER_SZ), args...); + } + + template inline S get() { S s; *this >> s; return s; } + inline bool ok() const { return next <= end(); } + inline bool okdone() const { return next == end(); } + inline operator bool() const { return ok(); } + inline void read(void * t, size_t n) { + auto from = next; + next += string::const_iterator::difference_type(n); + if (next <= end()) + std::copy(from, next, (char *)t); + } +}; + +// +// Marshalling for plain old data +// + +#ifndef MARSHALL_RAW_NETWORK_ORDER_AS +#define MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _d_) \ +inline marshall & operator<<(marshall & m, _c_ x) { _d_ y = hton((_d_)x); m.append((const char *)&y, sizeof(_d_)); return m; } \ +inline unmarshall & operator>>(unmarshall & u, _c_ & x) { _d_ y; u.read(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; } +#endif + +#define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_) + +MARSHALL_RAW_NETWORK_ORDER_AS(bool, uint8_t) +MARSHALL_RAW_NETWORK_ORDER(uint8_t) +MARSHALL_RAW_NETWORK_ORDER(int8_t) +MARSHALL_RAW_NETWORK_ORDER(uint16_t) +MARSHALL_RAW_NETWORK_ORDER(int16_t) +MARSHALL_RAW_NETWORK_ORDER(uint32_t) +MARSHALL_RAW_NETWORK_ORDER(int32_t) +MARSHALL_RAW_NETWORK_ORDER_AS(size_t, uint64_t) +MARSHALL_RAW_NETWORK_ORDER(uint64_t) +MARSHALL_RAW_NETWORK_ORDER(int64_t) + +// +// Marshalling for tuples (used to implement marshalling for structs) +// + +// In order to iterate over the tuple elements, we first need a template +// parameter pack containing the tuple's indices. The function templates named +// *_imp below accept an empty tag struct as their last argument, and use its +// template arguments to index the tuple. The operator<< overloads instantiate +// the appropriate tag struct to make this possible. + +template inline marshall & +tuple_marshall_imp(marshall & m, tuple & t, std::index_sequence) { + UNPACK_STATEMENT(m << std::get(t)); + return m; +} + +template inline marshall & +operator<<(marshall & m, tuple && t) { + return tuple_marshall_imp(m, t, std::index_sequence_for{}); +} + +template inline unmarshall & +tuple_unmarshall_imp(unmarshall & u, tuple t, std::index_sequence) { + UNPACK_STATEMENT(u >> std::get(t)); + return u; +} + +template inline unmarshall & +operator>>(unmarshall & u, tuple && t) { + return tuple_unmarshall_imp(u, t, std::index_sequence_for{}); +} + +// +// Marshalling for structs or classes containing a MEMBERS declaration +// + +// Implements struct marshalling via tuple marshalling of members. +template inline typename +enable_if::value, unmarshall>::type & +operator>>(unmarshall & u, A & a) { return u >> _tuple_(a); } + +template inline typename +enable_if::value, marshall>::type & +operator<<(marshall & m, const T a) { return m << _tuple_(a); } + +// +// Marshalling for STL containers +// + +// this overload is visible for type A only if A::cbegin and A::cend exist +template inline typename +enable_if::value, marshall>::type & +operator<<(marshall & m, const A & x) { + m << (uint32_t)x.size(); + for (const auto & a : x) + m << a; + return m; +} + +// visible for type A if A::emplace_back(a) makes sense +template inline typename +enable_if::value, unmarshall>::type & +operator>>(unmarshall & u, A & x) { + uint32_t n = u.get(); + x.clear(); + while (n--) + x.emplace_back(u.get()); + return u; +} + +// std::map +template inline unmarshall & +operator>>(unmarshall & u, std::map & x) { + uint32_t n = u.get(); + x.clear(); + while (n--) + x.emplace(u.get>()); + return u; +} + +// std::string +inline marshall & operator<<(marshall & m, const string & s) { + m << (uint32_t)s.size(); + m.append(s); + return m; +} + +inline unmarshall & operator>>(unmarshall & u, string & s) { + uint32_t sz; + if (u >> sz) { + s.resize(sz); + u.read(&s[0], sz); + } + return u; +} + +// +// Marshalling for strongly-typed enums +// + +template typename enable_if::value, marshall>::type & +operator<<(marshall & m, E e) { + return m << typename std::underlying_type::type(e); +} + +template typename enable_if::value, unmarshall>::type & +operator>>(unmarshall & u, E & e) { + e = E(u.get::type>()); + return u; +} + +#endif diff --git a/rpc/marshall_wrap.h b/include/rpc/marshall_wrap.h similarity index 97% rename from rpc/marshall_wrap.h rename to include/rpc/marshall_wrap.h index 04f5268..497c882 100644 --- a/rpc/marshall_wrap.h +++ b/include/rpc/marshall_wrap.h @@ -1,7 +1,7 @@ #ifndef marshall_wrap_h #define marshall_wrap_h -#include "marshall.h" +#include "include/rpc/marshall.h" typedef std::function handler; @@ -73,7 +73,7 @@ struct marshalled_func_imp { return new handler([=](unmarshall && u, marshall & m) -> RV { // Unmarshall each argument with the correct type and store the // result in a tuple. - ArgsStorage t{u._grab::type>()...}; + ArgsStorage t{u.get::type>()...}; // Verify successful unmarshalling of the entire input stream. if (!u.okdone()) return (RV)ErrorHandler::unmarshall_args_failure(); diff --git a/rpc/poll_mgr.h b/include/rpc/poll_mgr.h similarity index 97% rename from rpc/poll_mgr.h rename to include/rpc/poll_mgr.h index 6fe66c0..55ffefb 100644 --- a/rpc/poll_mgr.h +++ b/include/rpc/poll_mgr.h @@ -1,7 +1,7 @@ #ifndef poll_mgr_h #define poll_mgr_h -#include "types.h" +#include "include/types.h" #define MAX_POLL_FDS 128 diff --git a/rpc/rpc.h b/include/rpc/rpc.h similarity index 93% rename from rpc/rpc.h rename to include/rpc/rpc.h index 9464e57..b4fc68f 100644 --- a/rpc/rpc.h +++ b/include/rpc/rpc.h @@ -1,20 +1,20 @@ #ifndef rpc_h #define rpc_h -#include "types.h" +#include "include/types.h" -#include "rpc_protocol.h" -#include "thread_pool.h" -#include "marshall.h" -#include "marshall_wrap.h" -#include "connection.h" -#include "threaded_log.h" +#include "include/rpc/rpc_protocol.h" +#include "include/rpc/thread_pool.h" +#include "include/rpc/marshall.h" +#include "include/rpc/marshall_wrap.h" +#include "include/rpc/connection.h" +#include "include/debug.h" using std::chrono::milliseconds; namespace rpc { - static constexpr milliseconds to_max{12000}; - static constexpr milliseconds to_min{100}; + static constexpr auto to_max = milliseconds(12000); + static constexpr auto to_min = milliseconds(100); } template @@ -126,7 +126,9 @@ class rpcc : private connection_delegate { string rep; int intret = call_marshalled(proc, to, rep, marshall(args...)); if (intret >= 0) { - VERIFY(unmarshall(rep, true, r).okdone()); // guaranteed by static type checking + rpc_protocol::reply_header rh; + // guaranteed by static type checking? + VERIFY(unmarshall::datagram(rep, rh, r).okdone()); } return intret; } diff --git a/rpc/rpc_protocol.h b/include/rpc/rpc_protocol.h similarity index 92% rename from rpc/rpc_protocol.h rename to include/rpc/rpc_protocol.h index 4f03937..a717beb 100644 --- a/rpc/rpc_protocol.h +++ b/include/rpc/rpc_protocol.h @@ -1,13 +1,13 @@ #ifndef rpc_protocol_h #define rpc_protocol_h -#include "types.h" +#include "include/types.h" namespace rpc_protocol { using proc_id_t = uint32_t; using status = int32_t; - using rpc_sz_t = uint32_t; + using rpc_sz_t = uint64_t; using nonce_t = uint32_t; using xid_t = int32_t; @@ -51,7 +51,6 @@ namespace rpc_protocol { union header_t { request_header req; reply_header rep; }; const size_t RPC_HEADER_SZ = sizeof(header_t) + sizeof(rpc_sz_t); - const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation const size_t MAX_PDU = 10<<20; // maximum PDF is 10M #define REMOTE_PROCEDURE_BASE(_base_) \ diff --git a/rpc/thread_pool.h b/include/rpc/thread_pool.h similarity index 92% rename from rpc/thread_pool.h rename to include/rpc/thread_pool.h index 98ec655..0acb486 100644 --- a/rpc/thread_pool.h +++ b/include/rpc/thread_pool.h @@ -3,7 +3,7 @@ #include #include -#include "fifo.h" +#include "include/rpc/fifo.h" typedef std::function job_t; diff --git a/rsm.h b/include/rsm.h similarity index 93% rename from rsm.h rename to include/rsm.h index 487a856..68f1729 100644 --- a/rsm.h +++ b/include/rsm.h @@ -3,16 +3,16 @@ #ifndef rsm_h #define rsm_h -#include "types.h" -#include "rsm_protocol.h" -#include "rpc/rpc.h" +#include "include/types.h" +#include "include/rsm_protocol.h" +#include "include/rpc/rpc.h" #include -#include "config.h" +#include "include/config.h" class rsm_state_transfer { public: - virtual string marshal_state() = 0; - virtual void unmarshal_state(const string &) = 0; + virtual string marshall_state() = 0; + virtual void unmarshall_state(const string &) = 0; virtual ~rsm_state_transfer(); }; diff --git a/include/rsm_client.h b/include/rsm_client.h new file mode 100644 index 0000000..407775b --- /dev/null +++ b/include/rsm_client.h @@ -0,0 +1,47 @@ +#ifndef rsm_client_h +#define rsm_client_h + +#include "include/types.h" +#include "include/rsm_protocol.h" + +// +// rsm client interface. +// +// The client stubs package up an rpc, and then call the invoke procedure +// on the replicated state machine passing the RPC as an argument. This way +// the replicated state machine isn't service specific; any server can use it. +// + +class rsm_client { + protected: + string primary; + std::vector known_mems; + std::mutex rsm_client_mutex; + bool init_members(lock & rsm_client_mutex_lock); + rsm_protocol::status invoke(rpc_protocol::proc_id_t proc, string & rep, const string & req); + template int call_marshalled(rpc_protocol::proc_t & proc, R & r, const marshall & req); + public: + rsm_client(string dst); + + template + inline int call(rpc_protocol::proc_checked_t

proc, R & r, const Args & ...a1) { + static_assert(is_valid_call::value, "RSM method invoked with incorrect argument types"); + return call_marshalled(proc, r, marshall(a1...)); + } +}; + +template +int rsm_client::call_marshalled(rpc_protocol::proc_t & proc, R & r, const marshall & req) { + string rep, res; + int intret = invoke(proc.id, rep, req); + VERIFY( intret == rsm_client_protocol::OK ); + auto u = unmarshall(rep, intret); + if (intret < 0) return intret; + if ((u >> res).okdone() && unmarshall(res, r).okdone()) + return intret; + LOG << "failed to unmarshall reply \"" << hex_string(rep) << "\" for proceduce " + << proc.name << " (0x" << std::hex << proc.id << ")."; + return rpc_protocol::unmarshall_reply_failure; +} + +#endif diff --git a/rsm_protocol.h b/include/rsm_protocol.h similarity index 95% rename from rsm_protocol.h rename to include/rsm_protocol.h index 5a5b7dd..7743f7f 100644 --- a/rsm_protocol.h +++ b/include/rsm_protocol.h @@ -1,8 +1,8 @@ #ifndef rsm_protocol_h #define rsm_protocol_h -#include "types.h" -#include "rpc/rpc.h" +#include "include/types.h" +#include "include/rpc/rpc.h" namespace rsm_client_protocol { enum status : rpc_protocol::status {OK, ERR, NOTPRIMARY, BUSY}; diff --git a/rsmtest_client.h b/include/rsmtest_client.h similarity index 87% rename from rsmtest_client.h rename to include/rsmtest_client.h index e7add37..cb611e7 100644 --- a/rsmtest_client.h +++ b/include/rsmtest_client.h @@ -3,8 +3,8 @@ #ifndef rsmtest_client_h #define rsmtest_client_h -#include "types.h" -#include "rsm_protocol.h" +#include "include/types.h" +#include "include/rsm_protocol.h" // Client interface to the rsmtest server class rsmtest_client { diff --git a/t4.h b/include/t4.h similarity index 91% rename from t4.h rename to include/t4.h index 19eaddc..7222c0d 100644 --- a/t4.h +++ b/include/t4.h @@ -1,8 +1,8 @@ #ifndef t4_h #define t4_h -#include "types.h" -#include "rpc/poll_mgr.h" +#include "include/types.h" +#include "include/rpc/poll_mgr.h" struct t4_state { std::mutex log_mutex; diff --git a/types.h b/include/types.h similarity index 78% rename from types.h rename to include/types.h index 53f496d..be6cdd5 100644 --- a/types.h +++ b/include/types.h @@ -54,13 +54,6 @@ template struct supports_emplace_back().emplace_back(std::declval()), void()) > : true_type {}; -template using enum_type_t = typename enable_if< - std::is_enum::value, typename std::underlying_type::type>::type; - -template constexpr inline enum_type_t from_enum(E e) noexcept { return (enum_type_t)e; } -template constexpr inline E to_enum(enum_type_t value) noexcept { return (E)value; } - - template struct is_tuple_convertible : false_type {}; template struct is_tuple_convertible inline auto _tuple_(T & t) { return t._tuple_(); } + +// specialized tuple adapter for std::pair + +template struct is_tuple_convertible> : true_type {}; +template inline auto _tuple_(std::pair & t) { return std::tie(t.first, t.second); } +template inline auto _tuple_(const std::pair & t) { return std::tie(t.first, t.second); } + // struct ordering and comparison operations; requires the use of MEMBERS. // usage: // @@ -138,7 +139,9 @@ LEXICOGRAPHIC_OPERATOR(_c_, ==) LEXICOGRAPHIC_OPERATOR(_c_, !=) // are fair game. struct pass { template inline pass(Args && ...) {} }; -#include "endian.h" +#define UNPACK_STATEMENT(_x_) (void)pass{(_x_)...} + +#include "include/endian.h" #ifndef __has_attribute #define __has_attribute(x) 0 @@ -150,4 +153,18 @@ struct pass { template inline pass(Args && ...) {} }; #define NORETURN #endif +template inline void +tuple_ostream_imp(std::ostream & m, tuple & t, std::index_sequence) { + UNPACK_STATEMENT(m << std::get(t)); +} + +template inline std::ostream & +operator<<(std::ostream & m, tuple && t) { + tuple_ostream_imp(m, t, std::index_sequence_for{}); + return m; +} + +template inline typename std::enable_if::value, std::ostream &>::type +operator<<(std::ostream & os, const T & t) { return os << _tuple_(t); } + #endif diff --git a/lock_client.cc b/lock_client.cc index ca21d9d..4b26a91 100644 --- a/lock_client.cc +++ b/lock_client.cc @@ -1,31 +1,31 @@ // RPC stubs for clients to talk to lock_server, and cache the locks. -#include "lock_client.h" +#include "include/lock_client.h" #include -void lock_state::wait(lock & mutex_lock) { +void lock_client::lock_state::wait(lock & mutex_lock) { auto self = std::this_thread::get_id(); c[self].wait(mutex_lock); c.erase(self); } -void lock_state::signal() { +void lock_client::lock_state::signal() { // signal anyone if (c.begin() != c.end()) c.begin()->second.notify_one(); } -void lock_state::signal(thread::id who) { +void lock_client::lock_state::signal(thread::id who) { if (c.count(who)) c[who].notify_one(); } -lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) { +lock_client::lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) { lock sl(lock_table_lock); return lock_table[lid]; // creates the lock if it doesn't already exist } -lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) { +lock_client::lock_client(string xdst) { rlock_port = std::uniform_int_distribution(1024,32000+1024)(global->random_generator); id = "127.0.0.1:" + std::to_string(rlock_port); rlsrpc = std::make_unique(rlock_port); @@ -54,8 +54,6 @@ void lock_client::releaser() { sl.unlock(); int r; rsmc->call(lock_protocol::release, r, lid, id, st.xid); - if (lu) - lu->dorelease(lid); sl.lock(); } st.state = lock_state::none; diff --git a/lock_server.cc b/lock_server.cc index 86e5ad2..1241409 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -1,27 +1,17 @@ // the caching lock server implementation -#include "lock_server.h" +#include "include/lock_server.h" #include #include -lock_state::lock_state(): - held(false) -{ +lock_server::lock_state::lock_state(const lock_state & other) { + held = other.held; + held_by = other.held_by; + wanted_by = other.wanted_by; + old_requests = other.old_requests; } -lock_state::lock_state(const lock_state & other) { - *this = other; -} - -lock_state & lock_state::operator=(const lock_state & o) { - held = o.held; - held_by = o.held_by; - wanted_by = o.wanted_by; - old_requests = o.old_requests; - return *this; -} - -lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) { +lock_server::lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) { lock sl(lock_table_lock); // this will create the lock if it doesn't already exist return lock_table[lid]; @@ -151,12 +141,12 @@ lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, c return lock_protocol::OK; } -string lock_server::marshal_state() { +string lock_server::marshall_state() { lock sl(lock_table_lock); - return marshall(nacquire, lock_table).content(); + return marshall(nacquire, lock_table); } -void lock_server::unmarshal_state(const string & state) { +void lock_server::unmarshall_state(const string & state) { lock sl(lock_table_lock); - unmarshall(state, false, nacquire, lock_table); + unmarshall(state, nacquire, lock_table); } diff --git a/lock_server.h b/lock_server.h deleted file mode 100644 index d182876..0000000 --- a/lock_server.h +++ /dev/null @@ -1,46 +0,0 @@ -#ifndef lock_server_h -#define lock_server_h - -#include "types.h" -#include "lock_protocol.h" -#include "rsm.h" -#include "rpc/fifo.h" - -typedef std::pair holder_t; - -class lock_state { -public: - lock_state(); - lock_state(const lock_state & other); - bool held; - holder_t held_by; - std::list wanted_by; - std::map old_requests; - std::mutex m; - lock_state & operator=(const lock_state &); - - MEMBERS(held, held_by, wanted_by) -}; - -typedef std::map lock_map; - -class lock_server : private rsm_state_transfer { - private: - int nacquire; - std::mutex lock_table_lock; - lock_map lock_table; - lock_state & get_lock_state(lock_protocol::lockid_t lid); - fifo retry_fifo; - fifo revoke_fifo; - rsm *rsm_; - string marshal_state(); - void unmarshal_state(const string & state); - void revoker NORETURN (); - void retryer NORETURN (); - public: - lock_server(rsm & r); - lock_protocol::status acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t); - lock_protocol::status release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t); -}; - -#endif diff --git a/lock_smain.cc b/lock_smain.cc index 64f7985..52e6c4f 100644 --- a/lock_smain.cc +++ b/lock_smain.cc @@ -1,4 +1,6 @@ -#include "lock_server.h" +#include "include/lock_server.h" + +using namespace std::chrono; int main(int argc, char *argv[]) { global = new t4_state('s'); @@ -16,5 +18,5 @@ int main(int argc, char *argv[]) { rsm.start(); while(1) - std::this_thread::sleep_for(milliseconds(1000)); + std::this_thread::sleep_for(1000ms); } diff --git a/lock_tester.cc b/lock_tester.cc index fbdeeb4..63c0722 100644 --- a/lock_tester.cc +++ b/lock_tester.cc @@ -2,10 +2,12 @@ // Lock server tester // -#include "lock_client.h" +#include "include/lock_client.h" #include #include +using namespace std::chrono; + // must be >= 2 const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc. static lock_client *lc[nt]; @@ -65,7 +67,7 @@ static void test2(int i) { lc[i]->acquire(a); LOG_NONMEMBER << "test2: client " << i << " acquire done"; check_grant(a); - std::this_thread::sleep_for(milliseconds(100)); + std::this_thread::sleep_for(100ms); LOG_NONMEMBER << "test2: client " << i << " release"; check_release(a); lc[i]->release(a); diff --git a/log.cc b/log.cc index decb827..36f10a3 100644 --- a/log.cc +++ b/log.cc @@ -1,89 +1,31 @@ -#include "log.h" -#include "paxos.h" +#include "include/log.h" +#include "include/paxos.h" -// Paxos must maintain some durable state (i.e., that survives power -// failures) to run Paxos correct. This module implements a log with -// all durable state to run Paxos. Since the values chosen correspond -// to views, the log contains all views since the beginning of time. +// Maintains durable state (i.e. surviving power failures) needed for correct +// operation of Paxos as a log. -log::log(proposer_acceptor *_acc, string _me) : pxs (_acc) { - name = "paxos-" + _me + ".log"; - logread(); -} +log::log(string _me) : name("paxos-" + _me + ".log") {} -void log::logread(void) { - std::ifstream from(name); +void log::replay() { + auto from = unmarshall(read()); string type; - unsigned instance; - LOG << "logread"; + DEBUG_LOG << "Replaying paxos log from disk"; while (from >> type) { - if (type == "done") { - string v; - from >> instance; - from.get(); - getline(from, v); - pxs->values[instance] = v; - pxs->instance_h = instance; - LOG << "logread: instance: " << instance << " w. v = " - << pxs->values[instance]; - pxs->accepted_value.clear(); - pxs->promise.n = 0; - pxs->accepted.n = 0; - } else if (type == "propseen") { - from >> pxs->promise.n >> pxs->promise.m; - LOG << "logread: high update: " << pxs->promise.n << "(" << pxs->promise.m << ")"; - } else if (type == "accepted") { - string v; - from >> pxs->accepted.n >> pxs->accepted.m; - from.get(); - getline(from, v); - pxs->accepted_value = v; - LOG << "logread: prop update " << pxs->accepted.n << "(" << pxs->accepted.m << ") with v = " << pxs->accepted_value; + if (handlers.count(type)) { + handlers[type](from); } else { - LOG << "logread: unknown log record"; + DEBUG_LOG << "unknown log record"; VERIFY(0); } - } - from.close(); -} - -string log::dump() { - std::ifstream from(name); - string res; - string v; - while (std::getline(from, v)) - res += v + "\n"; - from.close(); - return res; -} - -void log::restore(string s) { - LOG << "restore: " << s; - std::ofstream f(name, std::ios::trunc); - f << s; - f.close(); -} - -// XXX should be an atomic operation -void log::loginstance(unsigned instance, string v) { - std::ofstream f(name, std::ios::app); - f << "done " << instance << " " << v << "\n"; - f.close(); + } } -// an acceptor should call logprop(promise) when it -// receives a prepare to which it responds prepare_ok(). -void log::logprop(prop_t promise) { - std::ofstream f(name, std::ios::app); - f << "propseen " << promise.n << " " << promise.m << "\n"; - f.close(); +string log::read() { + return (std::stringstream() << std::ifstream(name).rdbuf()).str(); } -// an acceptor should call logaccept(accepted, accepted_value) when it -// receives an accept RPC to which it replies accept_ok(). -void log::logaccept(prop_t n, string v) { - std::ofstream f(name, std::ios::app); - f << "accepted " << n.n << " " << n.m << " " << v << "\n"; - f.close(); +void log::write(string s) { + DEBUG_LOG << s; + std::ofstream(name, std::ios::trunc) << s; } diff --git a/log.h b/log.h deleted file mode 100644 index 201cb80..0000000 --- a/log.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef log_h -#define log_h - -#include "types.h" -#include "paxos_protocol.h" - -class proposer_acceptor; - -class log { - private: - string name; - proposer_acceptor *pxs; - public: - log (proposer_acceptor*, string _me); - string dump(); - void restore(string s); - void logread(void); - // Log a committed paxos instance - void loginstance(unsigned instance, string v); - // Log the highest proposal number that the local paxos acceptor has ever seen - void logprop(prop_t n_h); - // Log the proposal (proposal number and value) that the local paxos acceptor - // accept has ever accepted - void logaccept(prop_t n_a, string v); -}; - -#endif diff --git a/marshall.py b/marshall.py new file mode 100755 index 0000000..c4936fa --- /dev/null +++ b/marshall.py @@ -0,0 +1,132 @@ +import os, numpy as np +from clang.cindex import * +import subprocess + +libt4_path = '/Users/peteriannucci/invirt/third/libt4/' +OPTFLAGS = ['-O3'] +STDLIB = ['-stdlib=libc++'] +PEDANTRY = ['-Werror', '-Weverything', '-Wall', '-Wextra', '-pedantic-errors', '-pedantic', + '-Wno-c++98-compat-pedantic', '-Wno-padded', '-Weffc++', + '-Wno-non-virtual-dtor', '-Wno-weak-vtables'] +CXXFLAGS = ['-x', 'c++', '-I', libt4_path, '-std=c++1y'] + STDLIB + PEDANTRY + OPTFLAGS +CXXFLAGS += ['-DMARSHALL_RAW_NETWORK_ORDER_AS(a,b)=static void macro_marshall_##a##_as_##b(a first, b second);'] + +target = 'paxos.cc' + +libdir = subprocess.Popen(['llvm-config-mp-3.4', '--libdir'], stdout=subprocess.PIPE).communicate()[0].strip() +Config.set_library_path(libdir) +index = Index.create() +tu = index.parse(os.path.join(libt4_path, target), args=CXXFLAGS) + +def dive_all(cursor, kinds, pred=None, depth_limit=-1): + if cursor.kind in kinds and (pred is None or pred(cursor)): + yield cursor + if depth_limit != 0: + for c in cursor.get_children(): + for d in dive_all(c, kinds, pred, depth_limit-1): + yield d + +def get_label(cursor): + try: + cursor = dive_all(cursor, (CursorKind.CONVERSION_FUNCTION,), lambda c: c.displayname == 'operator label()', 1).next() + cursor = dive_all(cursor, (CursorKind.STRING_LITERAL,)).next() + return eval(cursor.get_tokens().next().spelling) + except: + return None + +def get_members(cursor): + try: + adapter = dive_all(cursor, (CursorKind.CXX_METHOD,), lambda c: c.displayname == '_tuple_()', 1).next() + fields = {x.displayname:x for x in dive_all(cursor, (CursorKind.FIELD_DECL,))} + return [fields[m.displayname] for m in dive_all(adapter, (CursorKind.MEMBER_REF_EXPR,))] + except: + return None + +class Marshallable: + def __init__(self, m, u): + self.marshall = m + self.unmarshall = u + +def MarshallRawNetworkOrderAs(what, as_what): + as_what = np.dtype(as_what).newbyteorder('B') + def m(x): + return np.array([x], as_what).tostring() + def u(s): + return np.fromstring(s[:as_what.itemsize], as_what).astype(what)[0], s[as_what.itemsize:] + mu = Marshallable(m, u) + mu.__repr__ = lambda : '' % np.dtype(what).name + return mu + +def RecordClass(displayname, members): + member_names = [f.displayname for f in members] + member_types = [f.type.get_canonical().spelling for f in members] + member_types_nice = [f.type.spelling for f in members] + member_marshallers = [types[t] for t in member_types] + class Record(object): + def __init__(self, *args, **kwargs): + if args: + assert len(args) == len(member_names) + self.__dict__.update(dict(zip(member_names, args))) + else: + assert set(kwargs.keys()) == set(member_names) + self.__dict__.update(kwargs) + __init__.__doc__ = 'Required arguments:\n' + '\n'.join('%s (%s)' % (n,t) for t,n in zip(member_types_nice, member_names)) + @staticmethod + def marshall(self): + return ''.join(marshaller.marshall(getattr(self, name)) for name, marshaller in zip(member_names, member_marshallers)) + @staticmethod + def unmarshall(s): + member_values = [] + for marshaller in member_marshallers: + member_value, s = marshaller.unmarshall(s) + member_values.append(member_value) + return Record(**dict(zip(member_names, member_values))), s + def __repr__(self): + return '<%s ' % displayname + ' '.join('%s=%s' % (k, repr(getattr(self, k))) for k in member_names) + '>' + Record.__name__ = displayname + return Record + +class MarshallString: + def __init__(self): + pass + def marshall(self, s): + return types['unsigned int'].marshall(len(s)) + s + def unmarshall(self, s): + l, s = types['unsigned int'].unmarshall(s) + return s[:l], s[l:] + +types = {} +labels = {} +types['std::__1::basic_string, std::__1::allocator >'] = MarshallString() + +numpy_type = {'bool': np.bool, 'char': np.int8, 'signed char': np.int8, 'unsigned char': np.uint8, 'short': np.short, 'unsigned short': np.ushort, + 'int': np.intc, 'unsigned int': np.uintc, 'long': np.int_, 'unsigned long': np.uint, 'long long': np.longlong, 'unsigned long long': np.ulonglong} + +def processDeclaration(cursor): + if cursor.kind == CursorKind.ENUM_DECL: + enum_type = cursor.enum_type.get_canonical().spelling + if enum_type != '': + processDeclaration(cursor.enum_type.get_canonical().get_declaration()) + assert enum_type in types, enum_type + types[cursor.type.get_canonical().spelling] = types[enum_type] + elif cursor.kind == CursorKind.FUNCTION_DECL: + if cursor.displayname.startswith('macro_marshall_'): + what, as_what = [x.type.get_canonical().spelling for x in cursor.get_arguments()] + types[what] = MarshallRawNetworkOrderAs(numpy_type[what], numpy_type[as_what]) + else: + members = get_members(cursor) + label = get_label(cursor) + if members is not None: + for f in members: + field_type = f.type.get_canonical().spelling + processDeclaration(f.type.get_canonical().get_declaration()) + assert field_type in types, field_type + types[cursor.type.get_canonical().spelling] = RecordClass(cursor.displayname, members) + if label is not None: + labels[cursor.type.get_canonical().spelling] = label + +for cursor in dive_all(tu.cursor, (CursorKind.FUNCTION_DECL,)): + processDeclaration(cursor) + +for cursor in dive_all(tu.cursor, set([CursorKind.STRUCT_DECL, CursorKind.CLASS_DECL, CursorKind.ENUM_DECL])): + processDeclaration(cursor) diff --git a/paxos.cc b/paxos.cc index c7f2d1d..108dfad 100644 --- a/paxos.cc +++ b/paxos.cc @@ -1,6 +1,7 @@ -#include "paxos.h" +#include "include/paxos.h" using namespace std::placeholders; +using namespace std::chrono; paxos_change::~paxos_change() {} @@ -28,12 +29,24 @@ proposer_acceptor::proposer_acceptor(paxos_change *_delegate, bool _first, const node_t & _me, const value_t & _value) : delegate(_delegate), me (_me) { - // at this point, the log has already been replayed - if (instance_h == 0 && _first) { - values[1] = _value; - l.loginstance(1, _value); - instance_h = 1; - } + l.handler([this] (auto entry) { + instance_h = entry.number; + values[entry.number] = entry.value; + accepted = promise = {0, me}; + accepted_value.clear(); + }); + l.handler([this] (auto entry) { + promise = entry.promise; + }); + l.handler([this] (auto entry) { + accepted = entry.number; + accepted_value = entry.value; + }); + + if (instance_h == 0 && _first) + l.append(log_instance{1, _value}); + + l.replay(); pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this); pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this); @@ -96,7 +109,7 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts, auto cl = rpcc::bind_cached(i); if (!cl) continue; - int status = cl->call_timeout(paxos_protocol::preparereq, milliseconds(100), + int status = cl->call_timeout(paxos_protocol::preparereq, 100ms, res, me, instance, proposal); if (status == paxos_protocol::OK) { LOG << "preparereq response type=" << res.type << " n_a=(" << res.n_a.n @@ -124,7 +137,7 @@ void proposer_acceptor::accept(unsigned instance, nodes_t & accepts, bool accept = false; for (auto i : nodes) { if (auto cl = rpcc::bind_cached(i)) { - int status = cl->call_timeout(paxos_protocol::acceptreq, milliseconds(100), + int status = cl->call_timeout(paxos_protocol::acceptreq, 100ms, accept, me, instance, proposal, v); if (status == paxos_protocol::OK && accept) accepts.push_back(i); @@ -136,7 +149,7 @@ void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const int res = 0; for (auto i : accepts) if (auto cl = rpcc::bind_cached(i)) - cl->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v); + cl->call_timeout(paxos_protocol::decidereq, 100ms, res, me, instance, v); } paxos_protocol::status @@ -149,7 +162,7 @@ proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, } else if (n > promise) { LOG << "looks good to me"; promise = n; - l.logprop(promise); + l.append(log_proposal{n}); r = prepareres{prepareres::accept, accepted, accepted_value}; } else { LOG << "I totally rejected this request. Ha."; @@ -169,7 +182,7 @@ proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t if (n >= promise) { accepted = n; accepted_value = v; - l.logaccept(accepted, accepted_value); + l.append(log_accept{n, v}); r = true; } return paxos_protocol::OK; @@ -197,9 +210,9 @@ void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & LOG << "instance=" << instance << " has v=" << value; if (instance > instance_h) { LOG << "highestacceptedinstance = " << instance; - values[instance] = value; - l.loginstance(instance, value); + l.append(log_instance{instance, value}); instance_h = instance; + values[instance] = value; accepted = promise = {0, me}; accepted_value.clear(); if (delegate) { diff --git a/rpc/connection.cc b/rpc/connection.cc index 8966cc2..7a6371a 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -1,10 +1,10 @@ -#include "connection.h" -#include "rpc_protocol.h" +#include "include/rpc/connection.h" +#include "include/rpc/rpc_protocol.h" #include #include #include #include -#include "marshall.h" +#include "include/rpc/marshall.h" connection_delegate::~connection_delegate() {} @@ -24,13 +24,12 @@ connection::~connection() { if (dead_) return; dead_ = true; - shutdown(fd,SHUT_RDWR); + fd.shutdown(SHUT_RDWR); } // after block_remove_fd, select will never wait on fd and no callbacks // will be active global->shared_mgr.block_remove_fd(fd); - VERIFY(dead_); - VERIFY(wpdu_.status == unused); + VERIFY(dead_ && wpdu_.status == unused); } shared_ptr connection::to_dst(const sockaddr_in & dst, connection_delegate * delegate, int lossy) { @@ -58,7 +57,7 @@ bool connection::send(const string & b) { if (std::bernoulli_distribution(lossy_*.01)(global->random_generator)) { IF_LEVEL(1) LOG << "send LOSSY TEST shutdown fd " << fd; - shutdown(fd,SHUT_RDWR); + fd.shutdown(SHUT_RDWR); } if (!writepdu()) { @@ -103,7 +102,7 @@ bool connection::writepdu() { if (wpdu_.cursor == wpdu_.buf.size()) return true; - ssize_t n = write(fd, &wpdu_.buf[wpdu_.cursor], (wpdu_.buf.size()-wpdu_.cursor)); + ssize_t n = fd.write(&wpdu_.buf[wpdu_.cursor], (wpdu_.buf.size()-wpdu_.cursor)); if (n < 0) { if (errno != EAGAIN) { IF_LEVEL(1) LOG << "writepdu fd " << fd << " failure errno=" << errno; @@ -189,8 +188,8 @@ bool connection::readpdu() { connection_listener::connection_listener(connection_delegate * delegate, in_port_t port, int lossytest) : tcp_(socket(AF_INET, SOCK_STREAM, 0)), delegate_(delegate), lossy_(lossytest) { - tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1); - tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1); + tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, int{1}); + tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, int{1}); tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000}); tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000}); diff --git a/rpc/marshall.h b/rpc/marshall.h deleted file mode 100644 index bf3cbc7..0000000 --- a/rpc/marshall.h +++ /dev/null @@ -1,244 +0,0 @@ -#ifndef marshall_h -#define marshall_h - -#include "types.h" -#include "rpc_protocol.h" - -class marshall { - private: - string buf_ = string(rpc_protocol::DEFAULT_RPC_SZ, 0); - size_t index_ = rpc_protocol::RPC_HEADER_SZ; - - public: - template - marshall(const Args & ... args) { - (void)pass{(*this << args)...}; - } - - void write(const void *p, size_t n) { - if (index_+n > buf_.size()) - buf_.resize(index_+n); - std::copy((char *)p, (char *)p+n, &buf_[index_]); - index_ += n; - } - - // with header - inline operator string() const { return buf_.substr(0,index_); } - // without header - inline string content() const { return buf_.substr(rpc_protocol::RPC_HEADER_SZ,index_-rpc_protocol::RPC_HEADER_SZ); } - - // letting S be a defaulted template parameter forces the compiler to - // delay looking up operator<<(marshall &, rpc_sz_t) until we define it - // (i.e. we define an operator for marshalling uint32_t) - template inline void - write_header(const T & h) { - VERIFY(sizeof(T)+sizeof(S) <= rpc_protocol::RPC_HEADER_SZ); - size_t saved_sz = index_; - index_ = 0; - *this << (S)(saved_sz - sizeof(S)) << (T)h; - index_ = saved_sz; - } -}; - -class unmarshall { - private: - string buf_; - size_t index_ = rpc_protocol::RPC_HEADER_SZ; - bool ok_ = false; - - public: - template - unmarshall(const string & s, bool has_header, Args && ... args) - : buf_(s) { - if (!has_header) - buf_.insert(0, rpc_protocol::RPC_HEADER_SZ, 0); - ok_ = (buf_.size() >= rpc_protocol::RPC_HEADER_SZ); - (void)pass{(*this >> args)...}; - } - - inline bool ok() const { return ok_; } - inline bool okdone() const { return ok_ && index_ == buf_.size(); } - - void read(void * t, size_t n) { - if (index_+n > buf_.size()) - ok_ = false; - if (ok_) { - std::copy(&buf_[index_], &buf_[index_+n], (char *)t); - index_ += n; - } - } - - template inline void - read_header(T & h) { - VERIFY(sizeof(T)+sizeof(rpc_protocol::rpc_sz_t) <= rpc_protocol::RPC_HEADER_SZ); - // first 4 bytes hold length field - index_ = sizeof(rpc_protocol::rpc_sz_t); - *this >> h; - index_ = rpc_protocol::RPC_HEADER_SZ; - } - - template inline T _grab() { T t; *this >> t; return t; } -}; - -// -// Marshalling for plain old data -// - -#define MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _d_) \ -inline marshall & operator<<(marshall & m, _c_ x) { _d_ y = hton((_d_)x); m.write(&y, sizeof(_d_)); return m; } \ -inline unmarshall & operator>>(unmarshall & u, _c_ & x) { _d_ y; u.read(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; } - -#define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_) - -MARSHALL_RAW_NETWORK_ORDER_AS(bool, uint8_t) -MARSHALL_RAW_NETWORK_ORDER(uint8_t) -MARSHALL_RAW_NETWORK_ORDER(int8_t) -MARSHALL_RAW_NETWORK_ORDER(uint16_t) -MARSHALL_RAW_NETWORK_ORDER(int16_t) -MARSHALL_RAW_NETWORK_ORDER(uint32_t) -MARSHALL_RAW_NETWORK_ORDER(int32_t) -MARSHALL_RAW_NETWORK_ORDER_AS(size_t, uint32_t) -MARSHALL_RAW_NETWORK_ORDER(uint64_t) -MARSHALL_RAW_NETWORK_ORDER(int64_t) - -// -// Marshalling for tuples (used to implement marshalling for structs) -// - -// In order to iterate over the tuple elements, we first need a template -// parameter pack containing the tuple's indices. The function templates named -// *_imp below accept an empty tag struct as their last argument, and use its -// template arguments to index the tuple. The operator<< overloads instantiate -// the appropriate tag struct to make this possible. - -template inline marshall & -tuple_marshall_imp(marshall & m, tuple & t, std::index_sequence) { - // Note that brace initialization is used for the empty structure "pack", - // forcing the comma-separated expressions expanded from the parameter pack - // to be evaluated in order. Order matters because the elements must be - // serialized consistently! The empty struct resulting from construction - // is discarded. - (void)pass{(m << std::get(t))...}; - return m; -} - -template inline marshall & -operator<<(marshall & m, tuple && t) { - return tuple_marshall_imp(m, t, std::index_sequence_for{}); -} - -template inline unmarshall & -tuple_unmarshall_imp(unmarshall & u, tuple t, std::index_sequence) { - (void)pass{(u >> std::get(t))...}; - return u; -} - -template inline unmarshall & -operator>>(unmarshall & u, tuple && t) { - return tuple_unmarshall_imp(u, t, std::index_sequence_for{}); -} - -// -// Marshalling for structs or classes containing a MEMBERS declaration -// - -// Implements struct marshalling via tuple marshalling of members. -template inline typename -enable_if::value, unmarshall>::type & -operator>>(unmarshall & u, T & a) { return u >> a._tuple_(); } - -template inline typename -enable_if::value, marshall>::type & -operator<<(marshall & m, const T a) { return m << a._tuple_(); } - -// -// Marshalling for STL containers -// - -// this overload is visible for type A only if A::cbegin and A::cend exist -template inline typename -enable_if::value, marshall>::type & -operator<<(marshall & m, const A & x) { - m << (uint32_t)x.size(); - for (const auto & a : x) - m << a; - return m; -} - -// visible for type A if A::emplace_back(a) makes sense -template inline typename -enable_if::value, unmarshall>::type & -operator>>(unmarshall & u, A & x) { - uint32_t n = u._grab(); - x.clear(); - while (n--) - x.emplace_back(u._grab()); - return u; -} - -// std::pair -template inline marshall & -operator<<(marshall & m, const std::pair & d) { - return m << d.first << d.second; -} - -template inline unmarshall & -operator>>(unmarshall & u, std::pair & d) { - return u >> d.first >> d.second; -} - -// std::map -template inline unmarshall & -operator>>(unmarshall & u, std::map & x) { - uint32_t n = u._grab(); - x.clear(); - while (n--) - x.emplace(u._grab>()); - return u; -} - -// std::string -inline marshall & operator<<(marshall & m, const string & s) { - m << (uint32_t)s.size(); - m.write(s.data(), s.size()); - return m; -} - -inline unmarshall & operator>>(unmarshall & u, string & s) { - uint32_t sz = u._grab(); - if (u.ok()) { - s.resize(sz); - u.read(&s[0], sz); - } - return u; -} - -// -// Marshalling for strongly-typed enums -// - -template typename enable_if::value, marshall>::type & -operator<<(marshall & m, E e) { - return m << from_enum(e); -} - -template typename enable_if::value, unmarshall>::type & -operator>>(unmarshall & u, E & e) { - e = to_enum(u._grab>()); - return u; -} - -// -// Recursive marshalling -// - -inline marshall & operator<<(marshall & m, marshall & n) { - return m << n.content(); -} - -inline unmarshall & operator>>(unmarshall & u, unmarshall & v) { - v = unmarshall(u._grab(), false); - return u; -} - -#endif diff --git a/rpc/poll_mgr.cc b/rpc/poll_mgr.cc index 102b8dc..a3ab08a 100644 --- a/rpc/poll_mgr.cc +++ b/rpc/poll_mgr.cc @@ -1,8 +1,8 @@ -#include "poll_mgr.h" +#include "include/rpc/poll_mgr.h" #include #include -#include "file.h" -#include "threaded_log.h" +#include "include/rpc/file.h" +#include "include/debug.h" #ifdef __linux__ #include diff --git a/rpc/rpc.cc b/rpc/rpc.cc index 8f868f5..e33b25e 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -52,7 +52,7 @@ // x exited worker threads). // -#include "rpc.h" +#include "include/rpc/rpc.h" #include #include @@ -106,7 +106,7 @@ shared_ptr rpcc::bind_cached(const string & destination) { lock cl = lock(client->bind_m_); if (!client->bind_done_) { LOG_NONMEMBER << "bind(\"" << destination << "\")"; - int ret = client->bind(milliseconds(1000)); + int ret = client->bind(1000ms); if (ret < 0) { LOG_NONMEMBER << "bind failure! " << destination << " " << ret; client.reset(); @@ -149,7 +149,7 @@ int rpcc::call_marshalled(const rpc_protocol::proc_t & proc, milliseconds to, st caller ca(0, &rep); xid_t xid_rep; - marshall datagram = req; + string datagram; { lock ml(m_); @@ -164,9 +164,9 @@ int rpcc::call_marshalled(const rpc_protocol::proc_t & proc, milliseconds to, st ca.xid = xid_++; calls_[ca.xid] = &ca; - datagram.write_header(rpc_protocol::request_header{ + datagram = marshall::datagram(rpc_protocol::request_header{ ca.xid, proc.id, clt_nonce_, srv_nonce_, xid_rep_window_.front() - }); + }, req); xid_rep = xid_rep_window_.front(); } @@ -272,11 +272,9 @@ void rpcc::get_latest_connection(shared_ptr & ch) { // Runs in poll_mgr's thread as an upcall from the connection object to the // rpcc. Does not call blocking RPC handlers. bool rpcc::got_pdu(const shared_ptr &, const string & b) { - unmarshall rep(b, true); rpc_protocol::reply_header h; - rep.read_header(h); - if (!rep.ok()) { + if (!unmarshall::datagram(b, h)) { IF_LEVEL(1) LOG << "unmarshall header failed!!!"; return true; } @@ -354,21 +352,20 @@ bool rpcs::got_pdu(const shared_ptr & c, const string & b) { } void rpcs::dispatch(shared_ptr c, const string & buf) { - unmarshall req(buf, true); - rpc_protocol::request_header h; - req.read_header(h); - proc_id_t proc = h.proc; - if (!req.ok()) { + auto req = unmarshall::datagram(buf, h); + + if (!req) { IF_LEVEL(1) LOG << "unmarshall header failed"; return; } + proc_id_t proc = h.proc; + IF_LEVEL(2) LOG << "rpc " << h.xid << " (proc " << std::hex << proc << ", last_rep " << std::dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce; - marshall rep; rpc_protocol::reply_header rh{h.xid,0}; // is client sending to an old instance of server? @@ -376,8 +373,7 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { IF_LEVEL(2) LOG << "rpc for an old server instance " << h.srv_nonce << " (current " << nonce_ << ") proc " << std::hex << proc; rh.ret = rpc_protocol::oldsrv_failure; - rep.write_header(rh); - c->send(rep); + c->send(marshall::datagram(rh)); return; } @@ -418,17 +414,19 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { switch (check_duplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, stored_reply)) { case NEW: // new request - rh.ret = (*f)(std::forward(req), rep); - if (rh.ret == rpc_protocol::unmarshall_args_failure) { - LOG << "failed to unmarshall the arguments. You are " - << "probably calling RPC 0x" << std::hex << proc << " with the wrong " - << "types of arguments."; - VERIFY(0); - } - VERIFY(rh.ret >= 0); + { + marshall rep; + rh.ret = (*f)(std::forward(req), rep); + if (rh.ret == rpc_protocol::unmarshall_args_failure) { + LOG << "failed to unmarshall the arguments. You are " + << "probably calling RPC 0x" << std::hex << proc << " with the wrong " + << "types of arguments."; + VERIFY(0); + } + VERIFY(rh.ret >= 0); - rep.write_header(rh); - stored_reply = rep; + stored_reply = marshall::datagram(rh, rep); + } IF_LEVEL(2) LOG << "sending and saving reply of size " << stored_reply.size() << " for rpc " << h.xid << ", proc " << std::hex << proc << " ret " << std::dec @@ -453,8 +451,7 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { case FORGOTTEN: // very old request and we don't have the response anymore IF_LEVEL(2) LOG << "very old request " << h.xid << " from " << h.clt_nonce; rh.ret = rpc_protocol::atmostonce_failure; - rep.write_header(rh); - c->send(rep); + c->send(marshall::datagram(rh)); break; } } diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index 0435ab1..968fc22 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -1,13 +1,13 @@ // RPC test and pseudo-documentation. // generates print statements on failures, but eventually says "rpctest OK" -#include "types.h" -#include "rpc.h" +#include "include/types.h" +#include "include/rpc/rpc.h" #include #include #include #include -#include "threaded_log.h" +#include "include/debug.h" #define NUM_CL 2 @@ -21,19 +21,8 @@ using std::endl; using namespace std::chrono; using std::vector; -// server-side handlers. they must be methods of some class -// to simplify rpcs::reg(). a server process can have handlers -// from multiple classes. -class srv { - public: - int handle_22(string & r, const string a, const string b); - int handle_fast(int & r, const int a); - int handle_slow(int & r, const int a); - int handle_bigrep(string & r, const size_t a); -}; - namespace srv_protocol { - using status = rpc_protocol::status; + enum status : rpc_protocol::status {OK}; REMOTE_PROCEDURE_BASE(0); REMOTE_PROCEDURE(22, _22, (string &, string, string)); REMOTE_PROCEDURE(23, fast, (int &, int)); @@ -41,6 +30,17 @@ namespace srv_protocol { REMOTE_PROCEDURE(25, bigrep, (string &, size_t)); } +// server-side handlers. they must be methods of some class +// to simplify rpcs::reg(). a server process can have handlers +// from multiple classes. +class srv { + public: + srv_protocol::status handle_22(string & r, const string a, const string b); + srv_protocol::status handle_fast(int & r, const int a); + srv_protocol::status handle_slow(int & r, const int a); + srv_protocol::status handle_bigrep(string & r, const size_t a); +}; + // a handler. a and b are arguments, r is the result. // there can be multiple arguments but only one result. // the caller also gets to see the int return value @@ -48,26 +48,26 @@ namespace srv_protocol { // rpcs::reg() decides how to unmarshall by looking // at these argument types, so this function definition // does what a .x file does in SunRPC. -int srv::handle_22(string & r, const string a, string b) { +srv_protocol::status srv::handle_22(string & r, const string a, string b) { r = a + b; - return 0; + return srv_protocol::OK; } -int srv::handle_fast(int & r, const int a) { +srv_protocol::status srv::handle_fast(int & r, const int a) { r = a + 1; - return 0; + return srv_protocol::OK; } -int srv::handle_slow(int & r, const int a) { - int us = std::uniform_int_distribution<>(0,500)(global->random_generator); - std::this_thread::sleep_for(microseconds(us)); +srv_protocol::status srv::handle_slow(int & r, const int a) { + auto duration = std::uniform_int_distribution<>(0,500)(global->random_generator) * 1us; + std::this_thread::sleep_for(duration); r = a + 2; - return 0; + return srv_protocol::OK; } -int srv::handle_bigrep(string & r, const size_t len) { +srv_protocol::status srv::handle_bigrep(string & r, const size_t len) { r = string(len, 'x'); - return 0; + return srv_protocol::OK; } static srv service; @@ -84,8 +84,7 @@ static void startserver() { static void testmarshall() { marshall m; rpc_protocol::request_header rh{1,2,3,4,5}; - m.write_header(rh); - VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ); + VERIFY(marshall::datagram(rh, m).size()==rpc_protocol::RPC_HEADER_SZ); int i = 12345; unsigned long long l = 1223344455L; size_t sz = 101010101; @@ -97,12 +96,11 @@ static void testmarshall() { m << sz; m << bin; - string b = m; - VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+sizeof(uint32_t)+s.size()+sizeof(uint32_t)+sizeof(uint32_t)+bin.size()); + string b = marshall::datagram(rh, m); + VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+sizeof(uint32_t)+s.size()+sizeof(uint64_t)+sizeof(uint32_t)+bin.size()); - unmarshall un(b, true); rpc_protocol::request_header rh1; - un.read_header(rh1); + auto un = unmarshall::datagram(b, rh1); VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0); int i1; unsigned long long l1; @@ -143,7 +141,7 @@ static void client1(size_t cl) { int ret = clients[which_cl]->call(which ? srv_protocol::fast : srv_protocol::slow, rep, arg); auto end = steady_clock::now(); - auto diff = duration_cast(end - start).count(); + auto diff = (end - start) / 1ms; if (ret != 0) cout << diff << " ms have elapsed!!!" << endl; VERIFY(ret == 0); @@ -172,7 +170,7 @@ static void client3(void *xx) { for(int i = 0; i < 4; i++){ int rep = 0; - int ret = c->call_timeout(srv_protocol::slow, milliseconds(300), rep, i); + int ret = c->call_timeout(srv_protocol::slow, 300ms, rep, i); VERIFY(ret == rpc_protocol::timeout_failure || rep == i+2); } } @@ -190,14 +188,14 @@ static void simple_tests(rpcc *c) { cout << " -- string concat RPC .. ok" << endl; // small request, big reply (perhaps req via UDP, reply via TCP) - intret = c->call_timeout(srv_protocol::bigrep, milliseconds(20000), rep, 70000ul); + intret = c->call_timeout(srv_protocol::bigrep, 20000ms, rep, 70000ul); VERIFY(intret == 0); VERIFY(rep.size() == 70000); cout << " -- small request, big reply .. ok" << endl; // specify a timeout value to an RPC that should succeed (udp) int xx = 0; - intret = c->call_timeout(srv_protocol::fast, milliseconds(300), xx, 77); + intret = c->call_timeout(srv_protocol::fast, 300ms, xx, 77); VERIFY(intret == 0 && xx == 78); cout << " -- no spurious timeout .. ok" << endl; @@ -205,7 +203,7 @@ static void simple_tests(rpcc *c) { { string arg(1000, 'x'); string rep2; - c->call_timeout(srv_protocol::_22, milliseconds(300), rep2, arg, (string)"x"); + c->call_timeout(srv_protocol::_22, 300ms, rep2, arg, (string)"x"); VERIFY(rep2.size() == 1001); cout << " -- no spurious timeout .. ok" << endl; } @@ -221,7 +219,7 @@ static void simple_tests(rpcc *c) { string non_existent = "127.0.0.1:7661"; rpcc *c1 = new rpcc(non_existent); time_t t0 = time(0); - intret = c1->bind(milliseconds(300)); + intret = c1->bind(300ms); time_t t1 = time(0); VERIFY(intret < 0 && (t1 - t0) <= 4); cout << " -- rpc timeout .. ok" << endl; @@ -283,7 +281,7 @@ static void failure_test() { delete server; client1 = new rpcc(*dst); - VERIFY (client1->bind(milliseconds(3000)) < 0); + VERIFY (client1->bind(3000ms) < 0); cout << " -- create new client and try to bind to failed server .. failed ok" << endl; delete client1; @@ -421,5 +419,5 @@ int main(int argc, char *argv[]) { } while (1) - std::this_thread::sleep_for(milliseconds(100)); + std::this_thread::sleep_for(100ms); } diff --git a/rpc/thread_pool.cc b/rpc/thread_pool.cc index 2c4ab06..1c104d8 100644 --- a/rpc/thread_pool.cc +++ b/rpc/thread_pool.cc @@ -1,4 +1,4 @@ -#include "thread_pool.h" +#include "include/rpc/thread_pool.h" thread_pool::thread_pool(size_t sz) : th_(sz) { for (auto & t : th_) diff --git a/rsm.cc b/rsm.cc index c651e86..7104a09 100644 --- a/rsm.cc +++ b/rsm.cc @@ -78,11 +78,12 @@ // The rule is that a module releases its internal locks before it // upcalls, but can keep its locks when calling down. -#include "rsm.h" -#include "rsm_client.h" +#include "include/rsm.h" +#include "include/rsm_client.h" #include using std::vector; +using namespace std::chrono; rsm_state_transfer::~rsm_state_transfer() {} @@ -130,7 +131,7 @@ void rsm::recovery() { commit_change(cfg->view_id(), ml); } else { ml.unlock(); - std::this_thread::sleep_for(milliseconds(3000)); // XXX make another node in cfg primary? + std::this_thread::sleep_for(3000ms); // XXX make another node in cfg primary? ml.lock(); } } @@ -174,7 +175,7 @@ bool rsm::sync_with_backups(lock & rsm_mutex_lock) { rsm_mutex_lock.lock(); // Start accepting synchronization request (statetransferreq) now! insync = true; - cfg->get_view(vid_insync, backups); + backups = cfg->get_view(vid_insync); backups.erase(std::find(backups.begin(), backups.end(), cfg->myaddr())); LOG << "backups " << backups; sync_cond.wait(rsm_mutex_lock); @@ -208,7 +209,7 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock) rsm_mutex_lock.unlock(); cl = rpcc::bind_cached(m); if (cl) { - ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100), + ret = cl->call_timeout(rsm_protocol::transferreq, 100ms, r, cfg->myaddr(), last_myvs, vid_insync); } rsm_mutex_lock.lock(); @@ -218,7 +219,7 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock) return false; } if (stf && last_myvs != r.last) { - stf->unmarshal_state(r.state); + stf->unmarshall_state(r.state); } last_myvs = r.last; LOG << "transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")"; @@ -247,7 +248,7 @@ bool rsm::join(const string & m, lock & rsm_mutex_lock) { rsm_mutex_lock.unlock(); auto cl = rpcc::bind_cached(m); if (cl) - ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log, cfg->myaddr(), last_myvs); + ret = cl->call_timeout(rsm_protocol::joinreq, 12000ms, log, cfg->myaddr(), last_myvs); rsm_mutex_lock.lock(); if (cl == 0 || ret != rsm_protocol::OK) { @@ -290,8 +291,8 @@ void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r handler *h = procs[procno]; VERIFY(h); marshall rep; - auto ret = (rsm_protocol::status)(*h)(unmarshall(req, false), rep); - r = marshall(ret, rep.content()).content(); + auto ret = (rsm_protocol::status)(*h)(unmarshall(req), rep); + r = marshall(ret, rep); } static void logHexString(locked_ostream && log, const string & s) { @@ -322,23 +323,23 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id if (primary != myaddr) return rsm_client_protocol::NOTPRIMARY; LOG << "Assigning a viewstamp"; - cfg->get_view(vid_commit, m); + m = cfg->get_view(vid_commit); // assign the RPC the next viewstamp number vs = myvs; myvs++; } - // send an invoke RPC to all slaves in the current view with a timeout of 1 second + // send an invoke RPC to all slaves in the current view with a timeout LOG << "Invoking slaves"; - for (unsigned i = 0; i < m.size(); i++) { - if (m[i] != myaddr) { + for (auto & mm : m) { + if (mm != myaddr) { // if invoke on slave fails, return rsm_client_protocol::BUSY - LOG << "Sending invoke to " << m[i]; - auto cl = rpcc::bind_cached(m[i]); + LOG << "Sending invoke to " << mm; + auto cl = rpcc::bind_cached(mm); if (!cl) return rsm_client_protocol::BUSY; int ignored_rval; - auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req); + auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, 100ms, ignored_rval, procno, vs, req); LOG << "Invoke returned " << ret; if (ret != rsm_protocol::OK) return rsm_client_protocol::BUSY; @@ -364,7 +365,6 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) { LOG << "invoke procno 0x" << std::hex << proc; lock ml(invoke_mutex); - vector m; string myaddr; { lock ml2(rsm_mutex); @@ -377,7 +377,7 @@ rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp myaddr = cfg->myaddr(); if (primary == myaddr) return rsm_protocol::ERR; - cfg->get_view(vid_commit, m); + vector m = cfg->get_view(vid_commit); if (std::find(m.begin(), m.end(), myaddr) == m.end()) return rsm_protocol::ERR; // check sequence number @@ -404,7 +404,7 @@ rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const strin if (!insync || vid != vid_insync) return rsm_protocol::BUSY; if (stf && last != last_myvs) - r.state = stf->marshal_state(); + r.state = stf->marshall_state(); r.last = last_myvs; return rsm_protocol::OK; } @@ -465,9 +465,8 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last // primary failure // rsm_client_protocol::status rsm::client_members(vector & r, int) { - vector m; lock ml(rsm_mutex); - cfg->get_view(vid_commit, m); + vector m = cfg->get_view(vid_commit); m.push_back(primary); r = m; LOG << "return " << m << " m " << primary; @@ -478,9 +477,7 @@ rsm_client_protocol::status rsm::client_members(vector & r, int) { // otherwise, the lowest number node of the previous view. // caller should hold rsm_mutex void rsm::set_primary(unsigned vid) { - vector c, p; - cfg->get_view(vid, c); - cfg->get_view(vid - 1, p); + vector c = cfg->get_view(vid), p = cfg->get_view(vid - 1); VERIFY (c.size() > 0); if (isamember(primary,c)) { @@ -489,9 +486,9 @@ void rsm::set_primary(unsigned vid) { } VERIFY(p.size() > 0); - for (unsigned i = 0; i < p.size(); i++) { - if (isamember(p[i], c)) { - primary = p[i]; + for (auto & pp : p) { + if (isamember(pp, c)) { + primary = pp; LOG << "primary is " << primary; return; } @@ -509,12 +506,10 @@ bool rsm::amiprimary() { void rsm::net_repair(bool heal, lock & rsm_mutex_lock) { VERIFY(rsm_mutex_lock); - vector m; - cfg->get_view(vid_commit, m); - for (unsigned i = 0; i < m.size(); i++) { - if (m[i] != cfg->myaddr()) { - LOG << "member " << m[i] << " " << heal; - if (auto cl = rpcc::bind_cached(m[i])) + for (auto & mm : cfg->get_view(vid_commit)) { + if (mm != cfg->myaddr()) { + LOG << "member " << mm << " " << heal; + if (auto cl = rpcc::bind_cached(mm)) cl->set_reachable(heal); } } diff --git a/rsm_client.cc b/rsm_client.cc index 4a484ae..b24f056 100644 --- a/rsm_client.cc +++ b/rsm_client.cc @@ -1,7 +1,9 @@ -#include "rsm_client.h" +#include "include/rsm_client.h" #include #include +using namespace std::chrono; + rsm_client::rsm_client(string dst) : primary(dst) { LOG << "create rsm_client"; lock ml(rsm_client_mutex); @@ -9,12 +11,7 @@ rsm_client::rsm_client(string dst) : primary(dst) { LOG << "done"; } -void rsm_client::primary_failure(lock &) { - primary = known_mems.back(); - known_mems.pop_back(); -} - -rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const string & req) { +rsm_protocol::status rsm_client::invoke(rpc_protocol::proc_id_t proc, string & rep, const string & req) { lock ml(rsm_client_mutex); while (1) { LOG << "proc " << std::hex << proc << " primary " << primary; @@ -24,7 +21,7 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const s auto cl = rpcc::bind_cached(prim); auto ret = rsm_client_protocol::OK; if (cl) - ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, milliseconds(500), rep, proc, req); + ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, 500ms, rep, proc, req); ml.lock(); if (!cl) @@ -35,7 +32,7 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const s return rsm_protocol::OK; if (ret == rsm_client_protocol::BUSY) { LOG << "rsm is busy " << prim; - std::this_thread::sleep_for(milliseconds(300)); + std::this_thread::sleep_for(300ms); continue; } if (ret == rsm_client_protocol::NOTPRIMARY) { @@ -45,7 +42,8 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const s } prim_fail: LOG << "primary " << prim << " failed ret " << std::dec << ret; - primary_failure(ml); + primary = known_mems.back(); + known_mems.pop_back(); LOG << "retry new primary " << prim; } } @@ -57,14 +55,13 @@ bool rsm_client::init_members(lock & rsm_client_mutex_lock) { shared_ptr cl; { rsm_client_mutex_lock.unlock(); - cl = rpcc::bind_cached(prim); - if (cl) - ret = cl->call_timeout(rsm_client_protocol::members, milliseconds(100), known_mems, 0); + if ((cl = rpcc::bind_cached(prim))) + ret = cl->call_timeout(rsm_client_protocol::members, 100ms, known_mems, 0); rsm_client_mutex_lock.lock(); } - if (cl == 0 || ret != rsm_protocol::OK) + if (ret != rsm_protocol::OK) return false; - if (known_mems.size() < 1) { + if (!known_mems.size()) { LOG << "do not know any members!"; VERIFY(0); } diff --git a/rsm_client.h b/rsm_client.h deleted file mode 100644 index 32dde43..0000000 --- a/rsm_client.h +++ /dev/null @@ -1,72 +0,0 @@ -#ifndef rsm_client_h -#define rsm_client_h - -#include "types.h" -#include "rsm_protocol.h" - -// -// rsm client interface. -// -// The client stubs package up an rpc, and then call the invoke procedure -// on the replicated state machine passing the RPC as an argument. This way -// the replicated state machine isn't service specific; any server can use it. -// - -class rsm_client { - protected: - string primary; - std::vector known_mems; - std::mutex rsm_client_mutex; - void primary_failure(lock & rsm_client_mutex_lock); - bool init_members(lock & rsm_client_mutex_lock); - rsm_protocol::status invoke(rpc_protocol::proc_id_t proc, string & rep, const string & req); - template int call_marshalled(rpc_protocol::proc_t & proc, R & r, const marshall & req); - public: - rsm_client(string dst); - - template - inline int call(rpc_protocol::proc_checked_t

proc, R & r, const Args & ...a1) { - static_assert(is_valid_call::value, "RSM method invoked with incorrect argument types"); - return call_marshalled(proc, r, marshall(a1...)); - } -}; - -inline string hexify(const string & s) { - string bytes; - for (char ch : s) { - bytes.push_back("0123456789abcdef"[(uint8_t)ch >> 4]); - bytes.push_back("0123456789abcdef"[(uint8_t)ch & 15]); - } - return bytes; -} - -template -int rsm_client::call_marshalled(rpc_protocol::proc_t & proc, R & r, const marshall & req) { - string rep; - int intret = invoke(proc.id, rep, req.content()); - VERIFY( intret == rsm_client_protocol::OK ); - unmarshall u(rep, false, intret); - if (intret < 0) return intret; - string res; - u >> res; - if (!u.okdone()) { - LOG << "failed to unmarshall the reply."; - LOG << "You probably forgot to set the reply string in " - << "rsm::client_invoke, or you may have called RPC " - << proc.name << " (0x" << std::hex << proc.id << ") with the wrong return type"; - LOG << "here's what I got: \"" << hexify(rep) << "\""; - VERIFY(0); - return rpc_protocol::unmarshall_reply_failure; - } - if(!unmarshall(res, false, r).okdone()) { - LOG << "failed to unmarshall the reply."; - LOG << "You are probably calling RPC " << proc.name << " (0x" - << std::hex << proc.id << ") with the wrong return type."; - LOG << "here's what I got: \"" << hexify(res) << "\""; - VERIFY(0); - return rpc_protocol::unmarshall_reply_failure; - } - return intret; -} - -#endif diff --git a/rsm_tester.cc b/rsm_tester.cc index 5507f80..42aaebe 100644 --- a/rsm_tester.cc +++ b/rsm_tester.cc @@ -2,9 +2,9 @@ // RSM test client // -#include "types.h" -#include "rsm_protocol.h" -#include "rsmtest_client.h" +#include "include/types.h" +#include "include/rsm_protocol.h" +#include "include/rsmtest_client.h" int main(int argc, char *argv[]) { global = new t4_state('t'); @@ -15,13 +15,11 @@ int main(int argc, char *argv[]) { rsmtest_client *lc = new rsmtest_client(argv[1]); string command(argv[2]); - if (command == "partition") { + if (command == "partition") LOG_NONMEMBER << "net_repair returned " << lc->net_repair(std::stoi(argv[3])); - } else if (command == "breakpoint") { - int b = std::stoi(argv[3]); - LOG_NONMEMBER << "breakpoint " << b << " returned " << lc->breakpoint(b); - } else { + else if (command == "breakpoint") + LOG_NONMEMBER << "breakpoint " << argv[3] << " returned " << lc->breakpoint(std::stoi(argv[3])); + else LOG_NONMEMBER << "Unknown command " << argv[2]; - } return 0; } diff --git a/rsmtest_client.cc b/rsmtest_client.cc index e9c8001..48744d3 100644 --- a/rsmtest_client.cc +++ b/rsmtest_client.cc @@ -1,23 +1,21 @@ // RPC stubs for clients to talk to rsmtest_server -#include "rsmtest_client.h" +#include "include/rsmtest_client.h" #include rsmtest_client::rsmtest_client(string dst) { if (!(cl = rpcc::bind_cached(dst))) - LOG << "rsmtest_client: call bind"; + LOG << "could not bind to " << dst; } rsm_test_protocol::status rsmtest_client::net_repair(int heal) { rsm_test_protocol::status r = rsm_test_protocol::ERR; - auto ret = (rsm_test_protocol::status)cl->call(rsm_test_protocol::net_repair, r, heal); - VERIFY (ret == rsm_test_protocol::OK); + VERIFY (cl->call(rsm_test_protocol::net_repair, r, heal) == rsm_test_protocol::OK); return r; } rsm_test_protocol::status rsmtest_client::breakpoint(int b) { rsm_test_protocol::status r = rsm_test_protocol::ERR; - auto ret = (rsm_test_protocol::status)cl->call(rsm_test_protocol::breakpoint, r, b); - VERIFY (ret == rsm_test_protocol::OK); + VERIFY (cl->call(rsm_test_protocol::breakpoint, r, b) == rsm_test_protocol::OK); return r; } diff --git a/t4.cc b/t4.cc index 834136b..9b90f5d 100644 --- a/t4.cc +++ b/t4.cc @@ -1,6 +1,6 @@ -#include "t4.h" +#include "include/t4.h" #include -#include "rpc/rpc.h" +#include "include/rpc/rpc.h" using namespace std::chrono; @@ -9,7 +9,7 @@ t4_state *global; t4_state::t4_state(char log_prefix) : log_thread_prefix(log_prefix) { uint32_t seed = std::random_device()(); auto time = system_clock::now().time_since_epoch(); - auto ticks = duration_cast(time).count(); + auto ticks = time / 1ns; seed ^= (uint32_t)ticks; auto pid = getpid(); seed ^= (uint32_t)pid; @@ -17,7 +17,7 @@ t4_state::t4_state(char log_prefix) : log_thread_prefix(log_prefix) { seed ^= (uint32_t)tid; random_generator.seed(seed); // make sure the clock will read differently next time! - std::this_thread::sleep_for(microseconds(1)); + std::this_thread::sleep_for(1us); } t4_state::~t4_state() { -- 1.7.9.5