-CXXFLAGS ?= -g -MMD -Werror -I. -std=c++1y
-LDFLAGS ?=
-CXX ?= g++
-CC ?= g++
-EXTRA_TARGETS ?=
+CLANGXX ?= clang++
+GXX ?= g++
+USE_CLANG ?= 1
+CC = $(CXX)
-all: lock_server lock_tester rsm_tester rpc/rpctest $(EXTRA_TARGETS)
+OPTFLAGS = -O3
+CXXFLAGS = -ggdb3 -MMD -I. -std=c++1y $(STDLIB) $(PEDANTRY) $(OPTFLAGS)
+LDFLAGS = -std=c++1y $(STDLIB) $(OPTFLAGS)
-rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/poll_mgr.o rpc/thread_pool.o
- rm -f $@
- ar cq $@ $^
- ranlib rpc/librpc.a
+POSTBUILD ?=
-rpc/rpctest: rpc/rpctest.o threaded_log.o rpc/librpc.a t4.o
+PRODUCTS = lock_server lock_tester rsm_tester rpc/rpctest
-lock_tester : lock_tester.o lock_client.o threaded_log.o rsm_client.o rpc/librpc.a t4.o
+ifeq "$(USE_CLANG)" "1"
-lock_server : lock_smain.o threaded_log.o rsm.o paxos.o config.o log.o lock_server.o rpc/librpc.a t4.o
+PEDANTRY = -Werror -Weverything -Wall -Wextra -pedantic-errors -pedantic \
+ -Wno-c++98-compat-pedantic -Wno-padded -Weffc++ \
+ -Wno-non-virtual-dtor -Wno-weak-vtables
+STDLIB = -stdlib=libc++
+CXX = $(CLANGXX)
-rsm_tester: rsm_tester.o rsmtest_client.o threaded_log.o rpc/librpc.a t4.o
+else
-%.o: %.cc
- $(CXX) $(CXXFLAGS) -c $< -o $@
+PEDANTRY = -pedantic -Wall -Wextra -fno-default-inline -Werror
+STDLIB = -pthread
+CXX = $(GXX)
+
+endif
+
+all: $(PRODUCTS) $(POSTBUILD)
+
+LIBRPC_OBJECTS = rpc/rpc.o rpc/connection.o rpc/poll_mgr.o rpc/thread_pool.o
+
+rpc/librpc.a: $(foreach x,$(LIBRPC_OBJECTS),rpc/librpc.a($(x)))
+
+rpc/rpctest: rpc/rpctest.o debug.o rpc/librpc.a t4.o
+
+lock_tester : lock_tester.o lock_client.o debug.o rsm_client.o rpc/librpc.a t4.o
+
+lock_server : lock_smain.o debug.o rsm.o paxos.o config.o log.o lock_server.o rpc/librpc.a t4.o
+
+rsm_tester: rsm_tester.o rsmtest_client.o debug.o rpc/librpc.a t4.o
-include *.d
-include rpc/*.d
clean_files=rpc/rpctest rpc/*.o *.d rpc/*.d rpc/librpc.a *.o config *.log lock_server lock_tester rsm_tester
.PHONY: clean $(EXTRA_TARGETS)
-clean:
- rm -rf $(clean_files)
+clean:
+ -rm -rf $(clean_files)
-USE_CLANG = 1
+CLANGXX = clang++-mp-3.4
+GXX = g++-mp-4.8
-PEDANTRY =
-STDLIB =
-OPTFLAGS = -O3 #-fno-omit-frame-pointer -fsanitize=address ,thread,undefined -fsanitize-memory-track-origins
-CXXFLAGS = -std=c++1y -ggdb3 -MMD -I. $(STDLIB) $(PEDANTRY) $(OPTFLAGS)
-LDFLAGS = -std=c++1y $(STDLIB) $(OPTFLAGS)
-
-ifeq "$(USE_CLANG)" "1"
-
-PEDANTRY += \
- -Weverything -pedantic-errors -Werror -Wno-c++98-compat-pedantic \
- -Wno-padded -pedantic -Wall -Wextra -Weffc++
-STDLIB += -stdlib=libc++
-CXX = clang++-mp-3.4
-
-else
-
-PEDANTRY += -pedantic -Wall -Wextra -fno-default-inline -Werror
-STDLIB += -pthread
-CXX = g++-mp-4.8
-
-endif
-
-CC := $(CXX)
-EXTRA_TARGETS = signatures
+POSTBUILD = signatures
socketfilterfw=/usr/libexec/ApplicationFirewall/socketfilterfw
signatures : lock_server lock_tester rpc/rpctest
- echo $^ | sudo xargs -n 1 $(socketfilterfw) -s || true
- echo $^ | sudo xargs -n 1 $(socketfilterfw) --unblockapp || true
+ echo $(realpath $^) | sudo xargs -n 1 $(socketfilterfw) -s || true
+ echo $(realpath $^) | sudo xargs -n 1 $(socketfilterfw) --unblockapp || true
-#include "config.h"
+#include "include/config.h"
using std::vector;
-
-// The config module maintains views. As a node joins or leaves a
-// view, the next view will be the same as previous view, except with
-// the new node added or removed. The first view contains only node
-// 1. If node 2 joins after the first node (it will download the views
-// from node 1), it will learn about view 1 with the first node as the
-// only member. It will then invoke Paxos to create the next view.
-// It will tell Paxos to ask the nodes in view 1 to agree on the value
-// {1, 2}. If Paxos returns success, then it moves to view 2 with
-// {1,2} as the members. When node 3 joins, the config module runs
-// Paxos with the nodes in view 2 and the proposed value to be
-// {1,2,3}. And so on. When a node discovers that some node of the
-// current view is not responding, it kicks off Paxos to propose a new
-// value (the current view minus the node that isn't responding). The
-// config module uses Paxos to create a total order of views, and it
-// is ensured that the majority of the previous view agrees to the
-// next view. The Paxos log contains all the values (i.e., views)
-// agreed on.
+using namespace std::chrono;
+
+// The config module maintains views. As a node joins or leaves a view, the
+// next view will be the same as previous view, except with the new node added
+// or removed. The first view contains only node 1. If node 2 joins after the
+// first node (it will download the views from node 1), it will learn about
+// view 1 with the first node as the only member. It will then invoke Paxos to
+// create the next view. It will tell Paxos to ask the nodes in view 1 to
+// agree on the value {1, 2}. If Paxos returns success, then it moves to view
+// 2 with {1,2} as the members. When node 3 joins, the config module runs Paxos
+// with the nodes in view 2 and the proposed value to be {1,2,3}. And so on.
+// When a node discovers that some node of the current view is not responding,
+// it kicks off Paxos to propose a new value (the current view minus the node
+// that isn't responding). The config module uses Paxos to create a total order
+// of views, and it is ensured that the majority of the previous view agrees to
+// the next view. The Paxos log contains all the values (i.e., views) agreed
+// on.
//
-// The RSM module informs config to add nodes. The config module
-// runs a heartbeater thread that checks in with nodes. If a node
-// doesn't respond, the config module will invoke Paxos's proposer to
-// remove the node. Higher layers will learn about this change when a
-// Paxos acceptor accepts the new proposed value through
-// paxos_commit().
+// The RSM module informs config to add nodes. The config module runs a
+// heartbeater thread that checks in with nodes. If a node doesn't respond,
+// the config module will invoke Paxos's proposer to remove the node. Higher
+// layers will learn about this change when a Paxos acceptor accepts the new
+// proposed value through paxos_commit().
//
-// To be able to bring other nodes up to date to the latest formed
-// view, each node will have a complete history of all view numbers
-// and their values that it knows about. At any time a node can reboot
-// and when it re-joins, it may be many views behind; by remembering
-// all views, the other nodes can bring this re-joined node up to
-// date.
+// To be able to bring other nodes up to date to the latest formed view, each
+// node will have a complete history of all view numbers and their values that
+// it knows about. At any time a node can reboot and when it re-joins, it may
+// be many views behind; by remembering all views, the other nodes can bring
+// this re-joined node up to date.
config_view_change::~config_view_change() {}
reconstruct(cfg_mutex_lock);
}
-void config::get_view(unsigned instance, vector<string> & m) {
+vector<string> config::get_view(unsigned instance) {
lock cfg_mutex_lock(cfg_mutex);
- get_view(instance, m, cfg_mutex_lock);
+ return get_view(instance, cfg_mutex_lock);
}
-void config::get_view(unsigned instance, vector<string> & m, lock & cfg_mutex_lock) {
+vector<string> config::get_view(unsigned instance, lock & cfg_mutex_lock) {
VERIFY(cfg_mutex_lock);
string value = paxos.value(instance);
LOG << "get_view(" << instance << "): returns " << value;
- m = explode(value);
+ return explode(value);
}
void config::reconstruct(lock & cfg_mutex_lock) {
VERIFY(cfg_mutex_lock);
my_view_id = paxos.instance();
if (my_view_id > 0) {
- get_view(my_view_id, mems, cfg_mutex_lock);
+ mems = get_view(my_view_id, cfg_mutex_lock);
LOG << "view " << my_view_id << " " << mems;
}
}
bool config::ismember(const string & m, unsigned vid) {
lock cfg_mutex_lock(cfg_mutex);
- vector<string> v;
- get_view(vid, v, cfg_mutex_lock);
- return isamember(m, v);
+ return isamember(m, get_view(vid, cfg_mutex_lock));
}
bool config::add(const string & new_m, unsigned vid) {
lock cfg_mutex_lock(cfg_mutex);
while (1) {
- auto next_timeout = steady_clock::now() + milliseconds(300);
+ auto next_timeout = steady_clock::now() + 300ms;
LOG << "go to sleep";
config_cond.wait_until(cfg_mutex_lock, next_timeout);
unsigned vid = my_view_id;
- vector<string> cmems;
- get_view(vid, cmems, cfg_mutex_lock);
+ vector<string> cmems = get_view(vid, cfg_mutex_lock);
LOG << "current membership " << cmems;
if (!isamember(me, cmems)) {
}
}
-paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) {
+paxos_protocol::status config::heartbeat(paxos_protocol::view_t & r, string m, paxos_protocol::view_t vid) {
lock cfg_mutex_lock(cfg_mutex);
- r = (int) my_view_id;
- LOG<< "heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id;
+ r = my_view_id;
+ LOG << "heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id;
if (vid == my_view_id)
return paxos_protocol::OK;
else if (paxos.isrunning()) {
config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) {
VERIFY(cfg_mutex_lock);
- unsigned vid = my_view_id;
+ paxos_protocol::view_t vid = my_view_id, r;
LOG << "heartbeat to " << m << " (" << vid << ")";
cfg_mutex_lock.unlock();
- int r = 0, ret = rpc_protocol::bind_failure;
+ int ret = rpc_protocol::bind_failure;
if (auto cl = rpcc::bind_cached(m))
- ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
+ ret = cl->call_timeout(paxos_protocol::heartbeat, 100ms, r, me, vid);
cfg_mutex_lock.lock();
heartbeat_t res = OK;
-#include "t4.h"
-#include "types.h"
-#include "threaded_log.h"
+#include "include/t4.h"
+#include "include/types.h"
+#include "include/debug.h"
using namespace std::chrono;
int tid = global->thread_name_map[thread];
if (tid==0)
tid = global->thread_name_map[thread] = ++global->next_thread_num;
- auto utime = duration_cast<microseconds>(
- system_clock::now().time_since_epoch()).count() % 1000000000;
+ auto utime = (system_clock::now().time_since_epoch() / 1us) % 1000000000;
f << std::setfill('0') << std::dec << std::left << std::setw(9) << utime << " ";
f << std::setfill(' ') << global->log_thread_prefix << std::left << std::setw(2) << tid;
f << " " << std::setw(20) << file << " " << std::setw(18) << func;
lock _log_lock() {
return lock(global->log_mutex);
}
+
+// Returns the lowercase hexadecimal encoding of `s`: two digits per input
+// byte, high nibble first (e.g. "\x01\xff" -> "01ff").
+string hex_string(const string & s) {
+ static const char digits[] = "0123456789abcdef";
+ string out;
+ for (auto it = s.begin(); it != s.end(); ++it) {
+ const unsigned char byte = static_cast<unsigned char>(*it);
+ out += digits[byte >> 4];
+ out += digits[byte & 0x0f];
+ }
+ return out;
+}
#ifndef config_h
#define config_h
-#include "types.h"
-#include "paxos.h"
+#include "include/types.h"
+#include "include/paxos.h"
class config_view_change {
public:
std::vector<string> mems;
std::mutex cfg_mutex;
cond config_cond;
- paxos_protocol::status heartbeat(int & r, string m, unsigned instance);
- void get_view(unsigned instance, std::vector<string> & m, lock & cfg_mutex_lock);
+ paxos_protocol::status heartbeat(paxos_protocol::view_t & r, string m, unsigned instance);
+ std::vector<string> get_view(unsigned instance, lock & cfg_mutex_lock);
bool remove(const string &, lock & cfg_mutex_lock);
void reconstruct(lock & cfg_mutex_lock);
typedef enum {
unsigned view_id() { return my_view_id; }
const string & myaddr() const { return me; }
string dump() { return paxos.dump(); }
- void get_view(unsigned instance, std::vector<string> & m);
+ std::vector<string> get_view(unsigned instance);
void restore(const string & s);
bool add(const string &, unsigned view_id);
bool ismember(const string & m, unsigned view_id);
-#ifndef threaded_log_h
-#define threaded_log_h
+#ifndef debug_h
+#define debug_h
#include <string>
#include <ostream>
#define LOG_NONMEMBER _log_prefix(locked_ostream{std::cerr, _log_lock()}, __FILE__, __func__)
#define LOG _log_member(LOG_NONMEMBER, (const void *)this)
+#define DEBUG_LOG LOG
#define IF_LEVEL(level) if(_log_debug_level() >= abs(level))
+string hex_string(const string & s);
+
#endif
#define endian_h
#include <cinttypes>
+#include <arpa/inet.h>
constexpr union { uint32_t i; uint8_t is_little_endian; } endianness{1};
#ifdef __cplusplus
-#include "types.h"
-#include "lock_protocol.h"
-#include "rpc/fifo.h"
-#include "rsm_client.h"
-#include "maybe.h"
-
-class lock_release_user {
- public:
- virtual void dorelease(lock_protocol::lockid_t) = 0;
- virtual ~lock_release_user() {}
-};
-
-class lock_state {
-public:
- enum {
- none = 0,
- retrying,
- free,
- locked,
- acquiring,
- releasing
- } state = none;
- std::thread::id held_by;
- std::list<thread::id> wanted_by;
- std::mutex m;
- std::map<thread::id, cond> c;
- lock_protocol::xid_t xid;
- void wait(lock & mutex_lock);
- void signal();
- void signal(thread::id who);
-};
-
-typedef std::map<lock_protocol::lockid_t, lock_state> lock_map;
+#include "include/types.h"
+#include "include/lock_protocol.h"
+#include "include/rpc/fifo.h"
+#include "include/rsm_client.h"
+#include "include/maybe.h"
// Clients that caches locks. The server can revoke locks using
// lock_revoke_server.
class lock_client {
private:
+ class lock_state {
+ public:
+ enum {
+ none = 0,
+ retrying,
+ free,
+ locked,
+ acquiring,
+ releasing
+ } state = none;
+ std::thread::id held_by;
+ std::list<thread::id> wanted_by;
+ std::mutex m;
+ std::map<thread::id, cond> c;
+ lock_protocol::xid_t xid;
+ void wait(lock & mutex_lock);
+ void signal();
+ void signal(thread::id who);
+ };
+
unique_ptr<rpcs> rlsrpc;
thread releaser_thread;
unique_ptr<rsm_client> rsmc;
- lock_release_user *lu;
in_port_t rlock_port;
string hostname;
string id;
std::mutex xid_mutex;
- lock_protocol::xid_t next_xid;
+ lock_protocol::xid_t next_xid=0;
fifo<maybe<lock_protocol::lockid_t>> release_fifo;
std::mutex lock_table_lock;
- lock_map lock_table;
+ std::map<lock_protocol::lockid_t, lock_state> lock_table;
lock_state & get_lock_state(lock_protocol::lockid_t lid);
public:
- lock_client(string xdst, lock_release_user *l = 0);
+ lock_client(string xdst);
~lock_client();
lock_protocol::status acquire(lock_protocol::lockid_t);
lock_protocol::status release(lock_protocol::lockid_t);
#ifndef lock_protocol_h
#define lock_protocol_h
-#include "types.h"
-#include "rpc/rpc.h"
+#include "include/types.h"
+#include "include/rpc/rpc.h"
typedef string callback_t;
--- /dev/null
+#ifndef lock_server_h
+#define lock_server_h
+
+#include "include/types.h"
+#include "include/lock_protocol.h"
+#include "include/rsm.h"
+#include "include/rpc/fifo.h"
+
+// Server side of the lock service. It privately implements
+// rsm_state_transfer (marshall_state / unmarshall_state) and is constructed
+// with the rsm instance it runs under.
+class lock_server : private rsm_state_transfer {
+ private:
+ // (client id, request xid) pair identifying a holder or a waiter.
+ using holder_t=std::pair<callback_t, lock_protocol::xid_t>;
+
+ // Per-lock state kept by the server.
+ struct lock_state {
+ inline lock_state() {}
+ // Declared explicitly because the std::mutex member makes the implicit
+ // copy constructor deleted; the definition is not in this header.
+ lock_state(const lock_state & other);
+ bool held=false;
+ holder_t held_by;
+ std::list<holder_t> wanted_by;
+ // NOTE(review): presumably the last xid seen per client, used to detect
+ // duplicate requests -- confirm against lock_server.cc.
+ std::map<callback_t, lock_protocol::xid_t> old_requests;
+ std::mutex m;
+
+ // Members that take part in MEMBERS-based (un)marshalling. NOTE(review):
+ // old_requests is not listed, so it is not carried by that marshalling;
+ // confirm this is intended for state transfer.
+ MEMBERS(held, held_by, wanted_by)
+ };
+
+ // NOTE(review): no default initializer here; presumably set by the
+ // constructor.
+ int nacquire;
+ // Presumably guards lock_table.
+ std::mutex lock_table_lock;
+ std::map<lock_protocol::lockid_t, lock_state> lock_table;
+ lock_state & get_lock_state(lock_protocol::lockid_t lid);
+ // Lock-id queues; presumably consumed by retryer() and revoker().
+ fifo<lock_protocol::lockid_t> retry_fifo, revoke_fifo;
+ // Non-owning pointer (presumably to the rsm passed to the constructor).
+ rsm *rsm_;
+ // rsm_state_transfer implementation.
+ string marshall_state();
+ void unmarshall_state(const string & state);
+ // Background loops declared NORETURN.
+ void revoker NORETURN ();
+ void retryer NORETURN ();
+ public:
+ lock_server(rsm & r);
+ lock_protocol::status acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
+ lock_protocol::status release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
+};
+
+#endif
--- /dev/null
+#ifndef log_h
+#define log_h
+
+#include "include/types.h"
+#include "include/paxos_protocol.h"
+#include "include/rpc/marshall.h"
+
+// Append-only log of labelled, marshalled entries stored in the file `name`.
+// NOTE: the class name clashes with ::log from <cmath>; users refer to it as
+// `class log`.
+class log {
+ private:
+ // Path of the file that append() writes to.
+ string name;
+ // Entry label -> callback that decodes and applies one entry of that
+ // type; populated by handler<T>() (presumably consumed by replay()).
+ std::map<string, std::function<void(unmarshall &)>> handlers;
+
+ public:
+ // Not explicit: copy-list-initialization such as `class log l = {me};`
+ // relies on that.
+ log(string _me);
+ // NOTE(review): presumably returns / replaces the whole log contents
+ // (paxos uses them for dump() and restore()) -- confirm in log.cc.
+ string read();
+ void write(string s);
+ void replay();
+
+ // Entry type tag; implicitly constructible from a string literal so that
+ // the LABEL macro can return one.
+ struct label : public string { label(const char * s) : string(s) {} };
+ // Used inside an entry struct: LABEL("x") defines a conversion to
+ // log::label that names the entry's type for append() and handler().
+#define LABEL(_x_) inline operator log::label () const { return _x_; }
+
+ // XXX should be an atomic operation
+ // Appends the marshalled pair (label(t), t) to the log file. Open/write
+ // failures are not checked here.
+ template <class T>
+ void append(const T & t) {
+ std::ofstream(name, std::ios::app) << marshall{label(t), t};
+ }
+
+ // Registers `h` for entries whose label equals that of a
+ // default-constructed T. The stored callback decodes a T, passes it to h,
+ // then logs it. `this` is captured because DEBUG_LOG expands to LOG,
+ // which refers to `this`.
+ template <class T>
+ void handler(std::function<void(T)> h) {
+ handlers[label(T())] = [h, this] (unmarshall & from) {
+ auto entry = from.get<T>();
+ h(entry);
+ DEBUG_LOG << entry;
+ };
+ }
+};
+
+#endif
#ifndef paxos_h
#define paxos_h
-#include "types.h"
-#include "rpc/rpc.h"
-#include "paxos_protocol.h"
-#include "log.h"
+#include "include/types.h"
+#include "include/rpc/rpc.h"
+#include "include/paxos_protocol.h"
+#include "include/log.h"
using prepareres = paxos_protocol::prepareres;
using node_t = paxos_protocol::node_t;
unsigned instance_h = 0; // number of the highest instance we have decided
std::map<unsigned,value_t> values; // vals of each instance
- friend class log;
- class log l = {this, me};
+ class log l = {me};
void commit(unsigned instance, const value_t & v, lock & acceptor_mutex_lock);
void breakpoint1();
void breakpoint2();
+ // Log a committed paxos instance
+ struct log_instance {
+ unsigned number;
+ string value;
+ MEMBERS(number, value)
+ LABEL("done")
+ };
+
+ // Log the highest proposal number that the local paxos acceptor has
+ // ever seen; called from paxos when responding to preparereq with
+ // accept
+ struct log_proposal {
+ prop_t promise;
+ MEMBERS(promise)
+ LABEL("propseen")
+ };
+
+ // Log the highest proposal (proposal number and value) that the local
+ // paxos acceptor accept has ever accepted; called from paxos when
+ // responding to acceptreq with true
+ struct log_accept {
+ prop_t number;
+ string value;
+ MEMBERS(number, value)
+ LABEL("accepted")
+ };
+
public:
proposer_acceptor(paxos_change *delegate, bool _first, const node_t & _me, const value_t & _value);
unsigned instance() { return instance_h; }
const value_t & value(unsigned instance) { return values[instance]; }
- string dump() { return l.dump(); }
- void restore(const string & s) { l.restore(s); l.logread(); }
+ string dump() { return l.read(); }
+ void restore(const string & s) { l.write(s); l.replay(); }
rpcs *get_rpcs() { return &pxs; }
bool run(unsigned instance, const nodes_t & cnodes, const value_t & v);
#ifndef paxos_protocol_h
#define paxos_protocol_h
-#include "types.h"
-#include "rpc/rpc.h"
+#include "include/types.h"
+#include "include/rpc/rpc.h"
struct prop_t {
unsigned n;
};
using node_t = string;
using nodes_t = std::vector<node_t>;
+ using view_t = unsigned;
using value_t = string;
REMOTE_PROCEDURE_BASE(0x11000);
REMOTE_PROCEDURE(1, preparereq, (prepareres &, node_t, unsigned, prop_t));
REMOTE_PROCEDURE(2, acceptreq, (bool &, node_t, unsigned, prop_t, value_t));
REMOTE_PROCEDURE(3, decidereq, (int &, node_t, unsigned, value_t));
- REMOTE_PROCEDURE(4, heartbeat, (int &, string, unsigned));
+ REMOTE_PROCEDURE(4, heartbeat, (view_t &, string, view_t));
}
#endif
#ifndef connection_h
#define connection_h
-#include "types.h"
+#include "include/types.h"
#include <arpa/inet.h>
#include <netinet/in.h>
-#include "t4.h"
-#include "poll_mgr.h"
-#include "file.h"
-#include "threaded_log.h"
+#include "include/t4.h"
+#include "include/rpc/poll_mgr.h"
+#include "include/rpc/file.h"
+#include "include/debug.h"
class connection;
static shared_ptr<connection> to_dst(const sockaddr_in & dst, connection_delegate *mgr, int lossy=0);
const time_point create_time = steady_clock::now();
- const file_t fd;
+ socket_t fd;
private:
void write_cb(int s);
#ifndef fifo_h
#define fifo_h
-#include "types.h"
+#include "include/types.h"
template<class T>
class fifo {
public:
- fifo(size_t limit=0) : max_(limit) {}
+ fifo(size_t max_size=0) : max_(max_size) {}
bool enq(T e, bool blocking=true) {
lock ml(m_);
private:
std::list<T> q_;
std::mutex m_;
- cond non_empty_c_; // q went non-empty
- cond has_space_c_; // q is not longer overfull
- size_t max_; // maximum capacity of the queue, block enq threads if exceeds this limit
+ cond non_empty_c_;
+ cond has_space_c_;
+ size_t max_;
};
#endif
#include <fcntl.h>
#include <unistd.h>
-#include "types.h"
+#include "include/types.h"
#include <sys/socket.h>
class file_t {
+ protected:
+ static const int nofile = -1;
private:
int fd_;
operator int & () { return flags_; }
};
public:
- inline file_t(int fd=-1) : fd_(fd) {}
+ inline file_t(int fd=nofile) : fd_(fd) {}
inline file_t(const file_t &) = delete;
- inline file_t(file_t && other) : fd_(-1) { std::swap(fd_, other.fd_); }
- inline ~file_t() { if (fd_ != -1) ::close(fd_); }
+ inline file_t(file_t && other) : fd_(nofile) { std::swap(fd_, other.fd_); }
+ inline ~file_t() { close(); }
static inline void pipe(file_t *ends) {
int fds[2];
+ ends[0].close();
+ ends[1].close();
VERIFY(::pipe(fds) == 0);
ends[0].fd_ = fds[0];
ends[1].fd_ = fds[1];
}
- inline operator int() const { if (fd_ == -1) throw "no fd"; return fd_; }
+ inline operator int() const { if (fd_ == nofile) throw "no fd"; return fd_; }
inline flags_t flags() const { return {*this}; }
inline void close() {
- ::close(fd_);
- fd_ = -1;
+ if (fd_ != nofile)
+ ::close(fd_);
+ fd_ = nofile;
}
- template <class T>
- inline ssize_t read(T & t) const { return ::read(fd_, &t, sizeof(T)); }
+ template <class T> inline typename std::enable_if<std::is_pod<T>::value, ssize_t>::type
+ read(T & t) const { return ::read(fd_, &t, sizeof(T)); }
inline ssize_t read(void * t, size_t n) const { return ::read(fd_, t, n); }
- template <class T>
- inline ssize_t write(const T & t) const { return ::write(fd_, &t, sizeof(T)); }
+ template <class T> inline typename std::enable_if<std::is_pod<T>::value, ssize_t>::type
+ write(const T & t) const { return ::write(fd_, &t, sizeof(T)); }
inline ssize_t write(const void * t, size_t n) const { return ::write(fd_, t, n); }
};
class socket_t : public file_t {
public:
- socket_t(int fd=-1) : file_t(fd) {}
+ socket_t(int fd=nofile) : file_t(fd) {}
template <class T>
int setsockopt(int level, int option, T && value) {
return ::setsockopt(*this, level, option, &value, sizeof(T));
}
+ inline int shutdown(int how) { return ::shutdown(*this, how); }
};
#endif
--- /dev/null
+#ifndef marshall_h
+#define marshall_h
+
+#include "include/types.h"
+#include "include/rpc/rpc_protocol.h"
+
+// A marshall is a byte string built by streaming values into it with the
+// operator<< overloads defined below.
+class marshall : public string {
+ public:
+ // Marshalls each argument in order; with no arguments this acts as the
+ // default constructor. NOTE(review): not explicit, so any single value
+ // converts implicitly into a marshall *of that value*.
+ template <typename... Args>
+ marshall(const Args & ... args) {
+ UNPACK_STATEMENT(*this << args);
+ }
+
+ // Frames `payload` as an RPC datagram: a header of exactly RPC_HEADER_SZ
+ // bytes (the rpc_sz_t length of everything after that field, then `h`,
+ // zero-padded), followed by the payload bytes.
+ //
+ // The default payload is an empty marshall. A `""` default would go
+ // through the converting constructor above and marshall the literal as a
+ // value (`*this << ""` binds to the bool overload via pointer-to-bool
+ // conversion), silently producing a non-empty payload.
+ template <class T>
+ static inline string datagram(const T & h, const marshall & payload=marshall()) {
+ using namespace rpc_protocol;
+ marshall m{rpc_sz_t(payload.size() + RPC_HEADER_SZ - sizeof(rpc_sz_t)), (T)h};
+ VERIFY(m.size() <= RPC_HEADER_SZ); // Datagram header too large
+ m.resize(RPC_HEADER_SZ);
+ return m + payload;
+ }
+};
+
+// An unmarshall is a read cursor over a private copy of a byte string;
+// values are extracted with the operator>> overloads below. A read past the
+// end puts it into a sticky failed state reported by ok()/operator bool.
+class unmarshall : string {
+ private:
+ // Offset of the next unread byte. An offset (unlike a string iterator)
+ // stays valid when the object is copied or moved, and it never requires
+ // forming an iterator beyond end().
+ // Invariant: next <= size() while ok; next == size() + 1 after a failure.
+ size_t next = 0;
+
+ public:
+ // Wraps a copy of `s` and immediately extracts each of `args` in order.
+ template <typename... Args>
+ inline unmarshall(const string & s, Args && ...args) : string(s) {
+ UNPACK_STATEMENT(*this >> args);
+ }
+
+ // Splits an RPC datagram: reads the rpc_sz_t length and header `h` from
+ // the fixed-size header, then returns a cursor over the payload (bytes
+ // from RPC_HEADER_SZ on) with `args` already extracted from it.
+ template <class H, typename... Args>
+ static inline unmarshall datagram(const string & datagram, H & h, Args && ... args) {
+ rpc_protocol::rpc_sz_t s;
+ VERIFY(unmarshall(datagram, s, h)); // Datagram header too large
+ return unmarshall(datagram.substr(rpc_protocol::RPC_HEADER_SZ), args...);
+ }
+
+ // Extracts one S. The value is value-initialized first, so a failed read
+ // yields a deterministic value instead of indeterminate memory; callers
+ // still check ok().
+ template <class S> inline S get() { S s{}; *this >> s; return s; }
+ // True while no read has run past the end.
+ inline bool ok() const { return next <= size(); }
+ // True when every byte has been consumed and no read failed.
+ inline bool okdone() const { return next == size(); }
+ inline operator bool() const { return ok(); }
+ // Copies the next n bytes into t. If fewer than n bytes remain (or an
+ // earlier read already failed), nothing is copied and the cursor enters
+ // the failed state.
+ inline void read(void * t, size_t n) {
+ if (!ok() || n > size() - next) {
+ next = size() + 1;
+ return;
+ }
+ std::copy(data() + next, data() + next + n, static_cast<char *>(t));
+ next += n;
+ }
+};
+
+//
+// Marshalling for plain old data
+//
+
+// MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _d_) defines operator<< / operator>>
+// for type _c_, sent on the wire as _d_ in network byte order (hton/ntoh).
+// The read buffer is value-initialized, so a short read produces 0 rather
+// than reading uninitialized memory; callers detect the failure via
+// unmarshall::ok().
+#ifndef MARSHALL_RAW_NETWORK_ORDER_AS
+#define MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _d_) \
+inline marshall & operator<<(marshall & m, _c_ x) { _d_ y = hton((_d_)x); m.append((const char *)&y, sizeof(_d_)); return m; } \
+inline unmarshall & operator>>(unmarshall & u, _c_ & x) { _d_ y{}; u.read(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; }
+#endif
+
+#define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_)
+
+MARSHALL_RAW_NETWORK_ORDER_AS(bool, uint8_t)
+MARSHALL_RAW_NETWORK_ORDER(uint8_t)
+MARSHALL_RAW_NETWORK_ORDER(int8_t)
+MARSHALL_RAW_NETWORK_ORDER(uint16_t)
+MARSHALL_RAW_NETWORK_ORDER(int16_t)
+MARSHALL_RAW_NETWORK_ORDER(uint32_t)
+MARSHALL_RAW_NETWORK_ORDER(int32_t)
+// NOTE(review): where size_t and uint64_t are the same type (e.g. glibc on
+// x86-64), the next two lines define identical overloads twice and will not
+// compile; they are distinct types on the macOS toolchain this targets.
+MARSHALL_RAW_NETWORK_ORDER_AS(size_t, uint64_t)
+MARSHALL_RAW_NETWORK_ORDER(uint64_t)
+MARSHALL_RAW_NETWORK_ORDER(int64_t)
+
+//
+// Marshalling for tuples (used to implement marshalling for structs)
+//
+
+// In order to iterate over the tuple elements, we first need a template
+// parameter pack containing the tuple's indices. The function templates named
+// *_imp below accept an empty tag struct as their last argument, and use its
+// template arguments to index the tuple. The operator<< overloads instantiate
+// the appropriate tag struct to make this possible.
+
+// Streams every element of `elems` into `m`, in index order.
+template <class... Ts, size_t... I>
+inline marshall & tuple_marshall_imp(marshall & m, tuple<Ts...> & elems, std::index_sequence<I...>) {
+ UNPACK_STATEMENT(m << std::get<I>(elems));
+ return m;
+}
+
+// Marshalls a tuple (typically the std::tie produced by _tuple_()).
+template <class... Ts>
+inline marshall & operator<<(marshall & m, tuple<Ts...> && elems) {
+ using indices_t = std::index_sequence_for<Ts...>;
+ return tuple_marshall_imp(m, elems, indices_t());
+}
+
+// Extracts every element referenced by `refs` from `u`, in index order.
+template <class... Ts, size_t... I>
+inline unmarshall & tuple_unmarshall_imp(unmarshall & u, tuple<Ts & ...> refs, std::index_sequence<I...>) {
+ UNPACK_STATEMENT(u >> std::get<I>(refs));
+ return u;
+}
+
+// Unmarshalls into a tuple of references (typically a std::tie of members).
+template <class... Ts>
+inline unmarshall & operator>>(unmarshall & u, tuple<Ts & ...> && refs) {
+ using indices_t = std::index_sequence_for<Ts...>;
+ return tuple_unmarshall_imp(u, refs, indices_t());
+}
+
+//
+// Marshalling for structs or classes containing a MEMBERS declaration
+//
+
+// Implements struct marshalling via tuple marshalling of members.
+// Any type with a MEMBERS(...) declaration (is_tuple_convertible) is
+// unmarshalled as the tuple of references to its listed members.
+template <class A> inline typename
+enable_if<is_tuple_convertible<A>::value, unmarshall>::type &
+operator>>(unmarshall & u, A & a) { return u >> _tuple_(a); }
+
+// ...and marshalled as the tuple of its listed members. Taken by const
+// reference: the members are only read, so copying the whole struct (as a
+// by-value parameter did) is unnecessary.
+template <class T> inline typename
+enable_if<is_tuple_convertible<T>::value, marshall>::type &
+operator<<(marshall & m, const T & a) { return m << _tuple_(a); }
+
+//
+// Marshalling for STL containers
+//
+
+// this overload is visible for type A only if A::cbegin and A::cend exist
+// this overload is visible for type A only if A::cbegin and A::cend exist
+// Wire format: uint32_t element count, then each element. NOTE(review): the
+// count is truncated to 32 bits.
+template <class A> inline typename
+enable_if<is_const_iterable<A>::value, marshall>::type &
+operator<<(marshall & m, const A & x) {
+ m << (uint32_t)x.size();
+ for (const auto & a : x)
+ m << a;
+ return m;
+}
+
+// visible for type A if A::emplace_back(a) makes sense
+// Reads the uint32_t count, then up to that many elements. Stops at the first
+// failed read, so a corrupt or hostile count cannot keep the loop appending
+// garbage elements long after the input is exhausted.
+template <class A> inline typename
+enable_if<supports_emplace_back<A>::value, unmarshall>::type &
+operator>>(unmarshall & u, A & x) {
+ uint32_t n = u.get<uint32_t>();
+ x.clear();
+ while (n-- > 0 && u)
+ x.emplace_back(u.get<typename A::value_type>());
+ return u;
+}
+
+// std::map<A, B>: same format as other containers, entries as pairs.
+// Also stops at the first failed read.
+template <class A, class B> inline unmarshall &
+operator>>(unmarshall & u, std::map<A,B> & x) {
+ uint32_t n = u.get<uint32_t>();
+ x.clear();
+ while (n-- > 0 && u)
+ x.emplace(u.get<std::pair<A,B>>());
+ return u;
+}
+
+// std::string
+// Wire format: uint32_t length followed by the raw bytes.
+inline marshall & operator<<(marshall & m, const string & s) {
+ m << (uint32_t)s.size();
+ m.append(s);
+ return m;
+}
+
+// Reads a length-prefixed string. The bytes are copied in bounded chunks, so
+// an untrusted length prefix (up to 4 GiB) cannot force a huge allocation
+// before the input is known to actually contain that many bytes. On a short
+// read `u` enters the failed state and `s` holds whatever was read so far.
+inline unmarshall & operator>>(unmarshall & u, string & s) {
+ uint32_t sz;
+ if (u >> sz) {
+ s.clear();
+ char chunk[4096];
+ while (sz > 0) {
+ const uint32_t k = sz < sizeof(chunk) ? sz : uint32_t(sizeof(chunk));
+ u.read(chunk, k);
+ if (!u)
+ break;
+ s.append(chunk, k);
+ sz -= k;
+ }
+ }
+ return u;
+}
+
+//
+// Marshalling for strongly-typed enums
+//
+
+// Enums travel as their underlying integer type.
+template <class E> typename enable_if<std::is_enum<E>::value, marshall>::type &
+operator<<(marshall & m, E e) {
+ using raw_t = typename std::underlying_type<E>::type;
+ const raw_t raw = static_cast<raw_t>(e);
+ return m << raw;
+}
+
+template <class E> typename enable_if<std::is_enum<E>::value, unmarshall>::type &
+operator>>(unmarshall & u, E & e) {
+ using raw_t = typename std::underlying_type<E>::type;
+ const raw_t raw = u.get<raw_t>();
+ e = static_cast<E>(raw);
+ return u;
+}
+
+#endif
#ifndef marshall_wrap_h
#define marshall_wrap_h
-#include "marshall.h"
+#include "include/rpc/marshall.h"
typedef std::function<rpc_protocol::status(unmarshall &&, marshall &)> handler;
return new handler([=](unmarshall && u, marshall & m) -> RV {
// Unmarshall each argument with the correct type and store the
// result in a tuple.
- ArgsStorage t{u._grab<typename std::decay<Args>::type>()...};
+ ArgsStorage t{u.get<typename std::decay<Args>::type>()...};
// Verify successful unmarshalling of the entire input stream.
if (!u.okdone())
return (RV)ErrorHandler::unmarshall_args_failure();
#ifndef poll_mgr_h
#define poll_mgr_h
-#include "types.h"
+#include "include/types.h"
#define MAX_POLL_FDS 128
#ifndef rpc_h
#define rpc_h
-#include "types.h"
+#include "include/types.h"
-#include "rpc_protocol.h"
-#include "thread_pool.h"
-#include "marshall.h"
-#include "marshall_wrap.h"
-#include "connection.h"
-#include "threaded_log.h"
+#include "include/rpc/rpc_protocol.h"
+#include "include/rpc/thread_pool.h"
+#include "include/rpc/marshall.h"
+#include "include/rpc/marshall_wrap.h"
+#include "include/rpc/connection.h"
+#include "include/debug.h"
using std::chrono::milliseconds;
namespace rpc {
- static constexpr milliseconds to_max{12000};
- static constexpr milliseconds to_min{100};
+ static constexpr auto to_max = milliseconds(12000);
+ static constexpr auto to_min = milliseconds(100);
}
template<class P, class R, class ...Args>
string rep;
int intret = call_marshalled(proc, to, rep, marshall(args...));
if (intret >= 0) {
- VERIFY(unmarshall(rep, true, r).okdone()); // guaranteed by static type checking
+ rpc_protocol::reply_header rh;
+ // guaranteed by static type checking?
+ VERIFY(unmarshall::datagram(rep, rh, r).okdone());
}
return intret;
}
#ifndef rpc_protocol_h
#define rpc_protocol_h
-#include "types.h"
+#include "include/types.h"
namespace rpc_protocol {
using proc_id_t = uint32_t;
using status = int32_t;
- using rpc_sz_t = uint32_t;
+ using rpc_sz_t = uint64_t;
using nonce_t = uint32_t;
using xid_t = int32_t;
union header_t { request_header req; reply_header rep; };
const size_t RPC_HEADER_SZ = sizeof(header_t) + sizeof(rpc_sz_t);
- const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation
const size_t MAX_PDU = 10<<20; // maximum PDF is 10M
#define REMOTE_PROCEDURE_BASE(_base_) \
#include <functional>
#include <vector>
-#include "fifo.h"
+#include "include/rpc/fifo.h"
typedef std::function<void()> job_t;
#ifndef rsm_h
#define rsm_h
-#include "types.h"
-#include "rsm_protocol.h"
-#include "rpc/rpc.h"
+#include "include/types.h"
+#include "include/rsm_protocol.h"
+#include "include/rpc/rpc.h"
#include <arpa/inet.h>
-#include "config.h"
+#include "include/config.h"
class rsm_state_transfer {
public:
- virtual string marshal_state() = 0;
- virtual void unmarshal_state(const string &) = 0;
+ virtual string marshall_state() = 0;
+ virtual void unmarshall_state(const string &) = 0;
virtual ~rsm_state_transfer();
};
--- /dev/null
+#ifndef rsm_client_h
+#define rsm_client_h
+
+#include "include/types.h"
+#include "include/rsm_protocol.h"
+
+//
+// rsm client interface.
+//
+// The client stubs package up an rpc, and then call the invoke procedure
+// on the replicated state machine passing the RPC as an argument. This way
+// the replicated state machine isn't service specific; any server can use it.
+//
+
+// Client stub for the replicated state machine: remembers a primary and the
+// known membership, and forwards marshalled calls through invoke().
+class rsm_client {
+ protected:
+ // Presumably the address of the replica currently believed to be primary.
+ string primary;
+ std::vector<string> known_mems;
+ // Presumably guards primary and known_mems.
+ std::mutex rsm_client_mutex;
+ bool init_members(lock & rsm_client_mutex_lock);
+ rsm_protocol::status invoke(rpc_protocol::proc_id_t proc, string & rep, const string & req);
+ template<class R> int call_marshalled(rpc_protocol::proc_t & proc, R & r, const marshall & req);
+ public:
+ rsm_client(string dst);
+
+ // Type-checked entry point: the static_assert rejects argument/reply
+ // types that do not match procedure P; the arguments are then marshalled
+ // and forwarded to call_marshalled().
+ template<class P, class R, class ...Args>
+ inline int call(rpc_protocol::proc_checked_t<P> proc, R & r, const Args & ...a1) {
+ static_assert(is_valid_call<P, R, Args...>::value, "RSM method invoked with incorrect argument types");
+ return call_marshalled(proc, r, marshall(a1...));
+ }
+};
+
+// Sends `req` (the already-marshalled arguments of `proc`) via invoke() and
+// decodes the reply into `r`.
+//
+// The reply `rep` holds the procedure's int return value followed by the
+// procedure's marshalled result as a length-prefixed string. Returns that
+// return value (negative values are passed through without decoding `r`),
+// or rpc_protocol::unmarshall_reply_failure if the reply cannot be decoded.
+// VERIFY requires invoke() itself to return rsm_client_protocol::OK.
+template<class R>
+int rsm_client::call_marshalled(rpc_protocol::proc_t & proc, R & r, const marshall & req) {
+ string rep, res;
+ int intret = invoke(proc.id, rep, req);
+ VERIFY( intret == rsm_client_protocol::OK );
+ // Direct-initialized, so the cursor object is never copied or moved.
+ unmarshall u(rep, intret);
+ // Only trust intret once it has actually been read from the reply;
+ // otherwise it would be returned as an arbitrary status.
+ if (u) {
+ if (intret < 0)
+ return intret;
+ if ((u >> res).okdone() && unmarshall(res, r).okdone())
+ return intret;
+ }
+ LOG << "failed to unmarshall reply \"" << hex_string(rep) << "\" for procedure "
+ << proc.name << " (0x" << std::hex << proc.id << ").";
+ return rpc_protocol::unmarshall_reply_failure;
+}
+
+#endif
#ifndef rsm_protocol_h
#define rsm_protocol_h
-#include "types.h"
-#include "rpc/rpc.h"
+#include "include/types.h"
+#include "include/rpc/rpc.h"
namespace rsm_client_protocol {
enum status : rpc_protocol::status {OK, ERR, NOTPRIMARY, BUSY};
#ifndef rsmtest_client_h
#define rsmtest_client_h
-#include "types.h"
-#include "rsm_protocol.h"
+#include "include/types.h"
+#include "include/rsm_protocol.h"
// Client interface to the rsmtest server
class rsmtest_client {
#ifndef t4_h
#define t4_h
-#include "types.h"
-#include "rpc/poll_mgr.h"
+#include "include/types.h"
+#include "include/rpc/poll_mgr.h"
struct t4_state {
std::mutex log_mutex;
decltype(std::declval<A &>().emplace_back(std::declval<typename A::value_type>()), void())
> : true_type {};
-template<typename E> using enum_type_t = typename enable_if<
- std::is_enum<E>::value, typename std::underlying_type<E>::type>::type;
-
-template<typename E> constexpr inline enum_type_t<E> from_enum(E e) noexcept { return (enum_type_t<E>)e; }
-template<typename E> constexpr inline E to_enum(enum_type_t<E> value) noexcept { return (E)value; }
-
-
template <class A, typename I=void> struct is_tuple_convertible : false_type {};
template<class A> struct is_tuple_convertible<A,
inline auto _tuple_() { return std::tie(__VA_ARGS__); } \
inline auto _tuple_() const { return std::tie(__VA_ARGS__); }
+template <class T> inline auto _tuple_(T & t) { return t._tuple_(); }
+
+// specialized tuple adapter for std::pair
+
+template <class A, class B> struct is_tuple_convertible<std::pair<A, B>> : true_type {};
+template <class A, class B> inline auto _tuple_(std::pair<A, B> & t) { return std::tie(t.first, t.second); }
+template <class A, class B> inline auto _tuple_(const std::pair<A, B> & t) { return std::tie(t.first, t.second); }
+
// struct ordering and comparison operations; requires the use of MEMBERS.
// usage:
//
// are fair game.
struct pass { template <typename... Args> inline pass(Args && ...) {} };
-#include "endian.h"
+#define UNPACK_STATEMENT(_x_) (void)pass{(_x_)...}
+
+#include "include/endian.h"
#ifndef __has_attribute
#define __has_attribute(x) 0
#define NORETURN
#endif
+template <class... Args, size_t... Indices> inline void
+tuple_ostream_imp(std::ostream & m, tuple<Args...> & t, std::index_sequence<Indices...>) {
+ UNPACK_STATEMENT(m << std::get<Indices>(t));
+}
+
+template <class... Args> inline std::ostream &
+operator<<(std::ostream & m, tuple<Args...> && t) {
+ tuple_ostream_imp(m, t, std::index_sequence_for<Args...>{});
+ return m;
+}
+
+template <class T> inline typename std::enable_if<is_tuple_convertible<T>::value, std::ostream &>::type
+operator<<(std::ostream & os, const T & t) { return os << _tuple_(t); }
+
#endif
// RPC stubs for clients to talk to lock_server, and cache the locks.
-#include "lock_client.h"
+#include "include/lock_client.h"
#include <arpa/inet.h>
-void lock_state::wait(lock & mutex_lock) {
+void lock_client::lock_state::wait(lock & mutex_lock) {
auto self = std::this_thread::get_id();
c[self].wait(mutex_lock);
c.erase(self);
}
-void lock_state::signal() {
+void lock_client::lock_state::signal() {
// signal anyone
if (c.begin() != c.end())
c.begin()->second.notify_one();
}
-void lock_state::signal(thread::id who) {
+void lock_client::lock_state::signal(thread::id who) {
if (c.count(who))
c[who].notify_one();
}
-lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
+lock_client::lock_state & lock_client::get_lock_state(lock_protocol::lockid_t lid) {
lock sl(lock_table_lock);
return lock_table[lid]; // creates the lock if it doesn't already exist
}
-lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xid(0) {
+lock_client::lock_client(string xdst) {
rlock_port = std::uniform_int_distribution<in_port_t>(1024,32000+1024)(global->random_generator);
id = "127.0.0.1:" + std::to_string(rlock_port);
rlsrpc = std::make_unique<rpcs>(rlock_port);
sl.unlock();
int r;
rsmc->call(lock_protocol::release, r, lid, id, st.xid);
- if (lu)
- lu->dorelease(lid);
sl.lock();
}
st.state = lock_state::none;
// the caching lock server implementation
-#include "lock_server.h"
+#include "include/lock_server.h"
#include <unistd.h>
#include <arpa/inet.h>
-lock_state::lock_state():
- held(false)
-{
+lock_server::lock_state::lock_state(const lock_state & other) {
+ held = other.held;
+ held_by = other.held_by;
+ wanted_by = other.wanted_by;
+ old_requests = other.old_requests;
}
-lock_state::lock_state(const lock_state & other) {
- *this = other;
-}
-
-lock_state & lock_state::operator=(const lock_state & o) {
- held = o.held;
- held_by = o.held_by;
- wanted_by = o.wanted_by;
- old_requests = o.old_requests;
- return *this;
-}
-
-lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
+lock_server::lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
lock sl(lock_table_lock);
// this will create the lock if it doesn't already exist
return lock_table[lid];
return lock_protocol::OK;
}
-string lock_server::marshal_state() {
+string lock_server::marshall_state() {
lock sl(lock_table_lock);
- return marshall(nacquire, lock_table).content();
+ return marshall(nacquire, lock_table);
}
-void lock_server::unmarshal_state(const string & state) {
+void lock_server::unmarshall_state(const string & state) {
lock sl(lock_table_lock);
- unmarshall(state, false, nacquire, lock_table);
+ unmarshall(state, nacquire, lock_table);
}
+++ /dev/null
-#ifndef lock_server_h
-#define lock_server_h
-
-#include "types.h"
-#include "lock_protocol.h"
-#include "rsm.h"
-#include "rpc/fifo.h"
-
-typedef std::pair<callback_t, lock_protocol::xid_t> holder_t;
-
-class lock_state {
-public:
- lock_state();
- lock_state(const lock_state & other);
- bool held;
- holder_t held_by;
- std::list<holder_t> wanted_by;
- std::map<callback_t, lock_protocol::xid_t> old_requests;
- std::mutex m;
- lock_state & operator=(const lock_state &);
-
- MEMBERS(held, held_by, wanted_by)
-};
-
-typedef std::map<lock_protocol::lockid_t, lock_state> lock_map;
-
-class lock_server : private rsm_state_transfer {
- private:
- int nacquire;
- std::mutex lock_table_lock;
- lock_map lock_table;
- lock_state & get_lock_state(lock_protocol::lockid_t lid);
- fifo<lock_protocol::lockid_t> retry_fifo;
- fifo<lock_protocol::lockid_t> revoke_fifo;
- rsm *rsm_;
- string marshal_state();
- void unmarshal_state(const string & state);
- void revoker NORETURN ();
- void retryer NORETURN ();
- public:
- lock_server(rsm & r);
- lock_protocol::status acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
- lock_protocol::status release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
-};
-
-#endif
-#include "lock_server.h"
+#include "include/lock_server.h"
+
+using namespace std::chrono;
int main(int argc, char *argv[]) {
global = new t4_state('s');
rsm.start();
while(1)
- std::this_thread::sleep_for(milliseconds(1000));
+ std::this_thread::sleep_for(1000ms);
}
// Lock server tester
//
-#include "lock_client.h"
+#include "include/lock_client.h"
#include <arpa/inet.h>
#include <unistd.h>
+using namespace std::chrono;
+
// must be >= 2
const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc.
static lock_client *lc[nt];
lc[i]->acquire(a);
LOG_NONMEMBER << "test2: client " << i << " acquire done";
check_grant(a);
- std::this_thread::sleep_for(milliseconds(100));
+ std::this_thread::sleep_for(100ms);
LOG_NONMEMBER << "test2: client " << i << " release";
check_release(a);
lc[i]->release(a);
-#include "log.h"
-#include "paxos.h"
+#include "include/log.h"
+#include "include/paxos.h"
-// Paxos must maintain some durable state (i.e., that survives power
-// failures) to run Paxos correct. This module implements a log with
-// all durable state to run Paxos. Since the values chosen correspond
-// to views, the log contains all views since the beginning of time.
+// Maintains durable state (i.e. surviving power failures) needed for correct
+// operation of Paxos as a log.
-log::log(proposer_acceptor *_acc, string _me) : pxs (_acc) {
- name = "paxos-" + _me + ".log";
- logread();
-}
+log::log(string _me) : name("paxos-" + _me + ".log") {}
-void log::logread(void) {
- std::ifstream from(name);
+void log::replay() {
+ auto from = unmarshall(read());
string type;
- unsigned instance;
- LOG << "logread";
+ DEBUG_LOG << "Replaying paxos log from disk";
while (from >> type) {
- if (type == "done") {
- string v;
- from >> instance;
- from.get();
- getline(from, v);
- pxs->values[instance] = v;
- pxs->instance_h = instance;
- LOG << "logread: instance: " << instance << " w. v = "
- << pxs->values[instance];
- pxs->accepted_value.clear();
- pxs->promise.n = 0;
- pxs->accepted.n = 0;
- } else if (type == "propseen") {
- from >> pxs->promise.n >> pxs->promise.m;
- LOG << "logread: high update: " << pxs->promise.n << "(" << pxs->promise.m << ")";
- } else if (type == "accepted") {
- string v;
- from >> pxs->accepted.n >> pxs->accepted.m;
- from.get();
- getline(from, v);
- pxs->accepted_value = v;
- LOG << "logread: prop update " << pxs->accepted.n << "(" << pxs->accepted.m << ") with v = " << pxs->accepted_value;
+ if (handlers.count(type)) {
+ handlers[type](from);
} else {
- LOG << "logread: unknown log record";
+ DEBUG_LOG << "unknown log record";
VERIFY(0);
}
- }
- from.close();
-}
-
-string log::dump() {
- std::ifstream from(name);
- string res;
- string v;
- while (std::getline(from, v))
- res += v + "\n";
- from.close();
- return res;
-}
-
-void log::restore(string s) {
- LOG << "restore: " << s;
- std::ofstream f(name, std::ios::trunc);
- f << s;
- f.close();
-}
-
-// XXX should be an atomic operation
-void log::loginstance(unsigned instance, string v) {
- std::ofstream f(name, std::ios::app);
- f << "done " << instance << " " << v << "\n";
- f.close();
+ }
}
-// an acceptor should call logprop(promise) when it
-// receives a prepare to which it responds prepare_ok().
-void log::logprop(prop_t promise) {
- std::ofstream f(name, std::ios::app);
- f << "propseen " << promise.n << " " << promise.m << "\n";
- f.close();
+string log::read() {
+ return (std::stringstream() << std::ifstream(name).rdbuf()).str();
}
-// an acceptor should call logaccept(accepted, accepted_value) when it
-// receives an accept RPC to which it replies accept_ok().
-void log::logaccept(prop_t n, string v) {
- std::ofstream f(name, std::ios::app);
- f << "accepted " << n.n << " " << n.m << " " << v << "\n";
- f.close();
+void log::write(string s) {
+ DEBUG_LOG << s;
+ std::ofstream(name, std::ios::trunc) << s;
}
+++ /dev/null
-#ifndef log_h
-#define log_h
-
-#include "types.h"
-#include "paxos_protocol.h"
-
-class proposer_acceptor;
-
-class log {
- private:
- string name;
- proposer_acceptor *pxs;
- public:
- log (proposer_acceptor*, string _me);
- string dump();
- void restore(string s);
- void logread(void);
- // Log a committed paxos instance
- void loginstance(unsigned instance, string v);
- // Log the highest proposal number that the local paxos acceptor has ever seen
- void logprop(prop_t n_h);
- // Log the proposal (proposal number and value) that the local paxos acceptor
- // accept has ever accepted
- void logaccept(prop_t n_a, string v);
-};
-
-#endif
--- /dev/null
+import os, numpy as np
+from clang.cindex import *
+import subprocess
+
+libt4_path = '/Users/peteriannucci/invirt/third/libt4/'
+OPTFLAGS = ['-O3']
+STDLIB = ['-stdlib=libc++']
+PEDANTRY = ['-Werror', '-Weverything', '-Wall', '-Wextra', '-pedantic-errors', '-pedantic',
+ '-Wno-c++98-compat-pedantic', '-Wno-padded', '-Weffc++',
+ '-Wno-non-virtual-dtor', '-Wno-weak-vtables']
+CXXFLAGS = ['-x', 'c++', '-I', libt4_path, '-std=c++1y'] + STDLIB + PEDANTRY + OPTFLAGS
+CXXFLAGS += ['-DMARSHALL_RAW_NETWORK_ORDER_AS(a,b)=static void macro_marshall_##a##_as_##b(a first, b second);']
+
+target = 'paxos.cc'
+
+libdir = subprocess.Popen(['llvm-config-mp-3.4', '--libdir'], stdout=subprocess.PIPE).communicate()[0].strip()
+Config.set_library_path(libdir)
+index = Index.create()
+tu = index.parse(os.path.join(libt4_path, target), args=CXXFLAGS)
+
+def dive_all(cursor, kinds, pred=None, depth_limit=-1):
+ if cursor.kind in kinds and (pred is None or pred(cursor)):
+ yield cursor
+ if depth_limit != 0:
+ for c in cursor.get_children():
+ for d in dive_all(c, kinds, pred, depth_limit-1):
+ yield d
+
+def get_label(cursor):
+ try:
+ cursor = dive_all(cursor, (CursorKind.CONVERSION_FUNCTION,), lambda c: c.displayname == 'operator label()', 1).next()
+ cursor = dive_all(cursor, (CursorKind.STRING_LITERAL,)).next()
+ return eval(cursor.get_tokens().next().spelling)
+ except:
+ return None
+
+def get_members(cursor):
+ try:
+ adapter = dive_all(cursor, (CursorKind.CXX_METHOD,), lambda c: c.displayname == '_tuple_()', 1).next()
+ fields = {x.displayname:x for x in dive_all(cursor, (CursorKind.FIELD_DECL,))}
+ return [fields[m.displayname] for m in dive_all(adapter, (CursorKind.MEMBER_REF_EXPR,))]
+ except:
+ return None
+
+class Marshallable:
+ def __init__(self, m, u):
+ self.marshall = m
+ self.unmarshall = u
+
+def MarshallRawNetworkOrderAs(what, as_what):
+ as_what = np.dtype(as_what).newbyteorder('B')
+ def m(x):
+ return np.array([x], as_what).tostring()
+ def u(s):
+ return np.fromstring(s[:as_what.itemsize], as_what).astype(what)[0], s[as_what.itemsize:]
+ mu = Marshallable(m, u)
+ mu.__repr__ = lambda : '<Marshallable(%s)>' % np.dtype(what).name
+ return mu
+
+def RecordClass(displayname, members):
+ member_names = [f.displayname for f in members]
+ member_types = [f.type.get_canonical().spelling for f in members]
+ member_types_nice = [f.type.spelling for f in members]
+ member_marshallers = [types[t] for t in member_types]
+ class Record(object):
+ def __init__(self, *args, **kwargs):
+ if args:
+ assert len(args) == len(member_names)
+ self.__dict__.update(dict(zip(member_names, args)))
+ else:
+ assert set(kwargs.keys()) == set(member_names)
+ self.__dict__.update(kwargs)
+ __init__.__doc__ = 'Required arguments:\n' + '\n'.join('%s (%s)' % (n,t) for t,n in zip(member_types_nice, member_names))
+ @staticmethod
+ def marshall(self):
+ return ''.join(marshaller.marshall(getattr(self, name)) for name, marshaller in zip(member_names, member_marshallers))
+ @staticmethod
+ def unmarshall(s):
+ member_values = []
+ for marshaller in member_marshallers:
+ member_value, s = marshaller.unmarshall(s)
+ member_values.append(member_value)
+ return Record(**dict(zip(member_names, member_values))), s
+ def __repr__(self):
+ return '<%s ' % displayname + ' '.join('%s=%s' % (k, repr(getattr(self, k))) for k in member_names) + '>'
+ Record.__name__ = displayname
+ return Record
+
+class MarshallString:
+ def __init__(self):
+ pass
+ def marshall(self, s):
+ return types['unsigned int'].marshall(len(s)) + s
+ def unmarshall(self, s):
+ l, s = types['unsigned int'].unmarshall(s)
+ return s[:l], s[l:]
+
+types = {}
+labels = {}
+types['std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >'] = MarshallString()
+
+numpy_type = {'bool': np.bool, 'char': np.int8, 'signed char': np.int8, 'unsigned char': np.uint8, 'short': np.short, 'unsigned short': np.ushort,
+ 'int': np.intc, 'unsigned int': np.uintc, 'long': np.int_, 'unsigned long': np.uint, 'long long': np.longlong, 'unsigned long long': np.ulonglong}
+
+def processDeclaration(cursor):
+ if cursor.kind == CursorKind.ENUM_DECL:
+ enum_type = cursor.enum_type.get_canonical().spelling
+ if enum_type != '<dependent type>':
+ processDeclaration(cursor.enum_type.get_canonical().get_declaration())
+ assert enum_type in types, enum_type
+ types[cursor.type.get_canonical().spelling] = types[enum_type]
+ elif cursor.kind == CursorKind.FUNCTION_DECL:
+ if cursor.displayname.startswith('macro_marshall_'):
+ what, as_what = [x.type.get_canonical().spelling for x in cursor.get_arguments()]
+ types[what] = MarshallRawNetworkOrderAs(numpy_type[what], numpy_type[as_what])
+ else:
+ members = get_members(cursor)
+ label = get_label(cursor)
+ if members is not None:
+ for f in members:
+ field_type = f.type.get_canonical().spelling
+ processDeclaration(f.type.get_canonical().get_declaration())
+ assert field_type in types, field_type
+ types[cursor.type.get_canonical().spelling] = RecordClass(cursor.displayname, members)
+ if label is not None:
+ labels[cursor.type.get_canonical().spelling] = label
+
+for cursor in dive_all(tu.cursor, (CursorKind.FUNCTION_DECL,)):
+ processDeclaration(cursor)
+
+for cursor in dive_all(tu.cursor, set([CursorKind.STRUCT_DECL, CursorKind.CLASS_DECL, CursorKind.ENUM_DECL])):
+ processDeclaration(cursor)
-#include "paxos.h"
+#include "include/paxos.h"
using namespace std::placeholders;
+using namespace std::chrono;
paxos_change::~paxos_change() {}
bool _first, const node_t & _me, const value_t & _value)
: delegate(_delegate), me (_me)
{
- // at this point, the log has already been replayed
- if (instance_h == 0 && _first) {
- values[1] = _value;
- l.loginstance(1, _value);
- instance_h = 1;
- }
+ l.handler<log_instance>([this] (auto entry) {
+ instance_h = entry.number;
+ values[entry.number] = entry.value;
+ accepted = promise = {0, me};
+ accepted_value.clear();
+ });
+ l.handler<log_proposal>([this] (auto entry) {
+ promise = entry.promise;
+ });
+ l.handler<log_accept>([this] (auto entry) {
+ accepted = entry.number;
+ accepted_value = entry.value;
+ });
+
+ if (instance_h == 0 && _first)
+ l.append(log_instance{1, _value});
+
+ l.replay();
pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
auto cl = rpcc::bind_cached(i);
if (!cl)
continue;
- int status = cl->call_timeout(paxos_protocol::preparereq, milliseconds(100),
+ int status = cl->call_timeout(paxos_protocol::preparereq, 100ms,
res, me, instance, proposal);
if (status == paxos_protocol::OK) {
LOG << "preparereq response type=" << res.type << " n_a=(" << res.n_a.n
bool accept = false;
for (auto i : nodes) {
if (auto cl = rpcc::bind_cached(i)) {
- int status = cl->call_timeout(paxos_protocol::acceptreq, milliseconds(100),
+ int status = cl->call_timeout(paxos_protocol::acceptreq, 100ms,
accept, me, instance, proposal, v);
if (status == paxos_protocol::OK && accept)
accepts.push_back(i);
int res = 0;
for (auto i : accepts)
if (auto cl = rpcc::bind_cached(i))
- cl->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
+ cl->call_timeout(paxos_protocol::decidereq, 100ms, res, me, instance, v);
}
paxos_protocol::status
} else if (n > promise) {
LOG << "looks good to me";
promise = n;
- l.logprop(promise);
+ l.append(log_proposal{n});
r = prepareres{prepareres::accept, accepted, accepted_value};
} else {
LOG << "I totally rejected this request. Ha.";
if (n >= promise) {
accepted = n;
accepted_value = v;
- l.logaccept(accepted, accepted_value);
+ l.append(log_accept{n, v});
r = true;
}
return paxos_protocol::OK;
LOG << "instance=" << instance << " has v=" << value;
if (instance > instance_h) {
LOG << "highestacceptedinstance = " << instance;
- values[instance] = value;
- l.loginstance(instance, value);
+ l.append(log_instance{instance, value});
instance_h = instance;
+ values[instance] = value;
accepted = promise = {0, me};
accepted_value.clear();
if (delegate) {
-#include "connection.h"
-#include "rpc_protocol.h"
+#include "include/rpc/connection.h"
+#include "include/rpc/rpc_protocol.h"
#include <cerrno>
#include <csignal>
#include <netinet/tcp.h>
#include <unistd.h>
-#include "marshall.h"
+#include "include/rpc/marshall.h"
connection_delegate::~connection_delegate() {}
if (dead_)
return;
dead_ = true;
- shutdown(fd,SHUT_RDWR);
+ fd.shutdown(SHUT_RDWR);
}
// after block_remove_fd, select will never wait on fd and no callbacks
// will be active
global->shared_mgr.block_remove_fd(fd);
- VERIFY(dead_);
- VERIFY(wpdu_.status == unused);
+ VERIFY(dead_ && wpdu_.status == unused);
}
shared_ptr<connection> connection::to_dst(const sockaddr_in & dst, connection_delegate * delegate, int lossy) {
if (std::bernoulli_distribution(lossy_*.01)(global->random_generator)) {
IF_LEVEL(1) LOG << "send LOSSY TEST shutdown fd " << fd;
- shutdown(fd,SHUT_RDWR);
+ fd.shutdown(SHUT_RDWR);
}
if (!writepdu()) {
if (wpdu_.cursor == wpdu_.buf.size())
return true;
- ssize_t n = write(fd, &wpdu_.buf[wpdu_.cursor], (wpdu_.buf.size()-wpdu_.cursor));
+ ssize_t n = fd.write(&wpdu_.buf[wpdu_.cursor], (wpdu_.buf.size()-wpdu_.cursor));
if (n < 0) {
if (errno != EAGAIN) {
IF_LEVEL(1) LOG << "writepdu fd " << fd << " failure errno=" << errno;
connection_listener::connection_listener(connection_delegate * delegate, in_port_t port, int lossytest)
: tcp_(socket(AF_INET, SOCK_STREAM, 0)), delegate_(delegate), lossy_(lossytest)
{
- tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
- tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
+ tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, int{1});
+ tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, int{1});
tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000});
tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000});
+++ /dev/null
-#ifndef marshall_h
-#define marshall_h
-
-#include "types.h"
-#include "rpc_protocol.h"
-
-class marshall {
- private:
- string buf_ = string(rpc_protocol::DEFAULT_RPC_SZ, 0);
- size_t index_ = rpc_protocol::RPC_HEADER_SZ;
-
- public:
- template <typename... Args>
- marshall(const Args & ... args) {
- (void)pass{(*this << args)...};
- }
-
- void write(const void *p, size_t n) {
- if (index_+n > buf_.size())
- buf_.resize(index_+n);
- std::copy((char *)p, (char *)p+n, &buf_[index_]);
- index_ += n;
- }
-
- // with header
- inline operator string() const { return buf_.substr(0,index_); }
- // without header
- inline string content() const { return buf_.substr(rpc_protocol::RPC_HEADER_SZ,index_-rpc_protocol::RPC_HEADER_SZ); }
-
- // letting S be a defaulted template parameter forces the compiler to
- // delay looking up operator<<(marshall &, rpc_sz_t) until we define it
- // (i.e. we define an operator for marshalling uint32_t)
- template <class T, class S=rpc_protocol::rpc_sz_t> inline void
- write_header(const T & h) {
- VERIFY(sizeof(T)+sizeof(S) <= rpc_protocol::RPC_HEADER_SZ);
- size_t saved_sz = index_;
- index_ = 0;
- *this << (S)(saved_sz - sizeof(S)) << (T)h;
- index_ = saved_sz;
- }
-};
-
-class unmarshall {
- private:
- string buf_;
- size_t index_ = rpc_protocol::RPC_HEADER_SZ;
- bool ok_ = false;
-
- public:
- template <typename... Args>
- unmarshall(const string & s, bool has_header, Args && ... args)
- : buf_(s) {
- if (!has_header)
- buf_.insert(0, rpc_protocol::RPC_HEADER_SZ, 0);
- ok_ = (buf_.size() >= rpc_protocol::RPC_HEADER_SZ);
- (void)pass{(*this >> args)...};
- }
-
- inline bool ok() const { return ok_; }
- inline bool okdone() const { return ok_ && index_ == buf_.size(); }
-
- void read(void * t, size_t n) {
- if (index_+n > buf_.size())
- ok_ = false;
- if (ok_) {
- std::copy(&buf_[index_], &buf_[index_+n], (char *)t);
- index_ += n;
- }
- }
-
- template <class T> inline void
- read_header(T & h) {
- VERIFY(sizeof(T)+sizeof(rpc_protocol::rpc_sz_t) <= rpc_protocol::RPC_HEADER_SZ);
- // first 4 bytes hold length field
- index_ = sizeof(rpc_protocol::rpc_sz_t);
- *this >> h;
- index_ = rpc_protocol::RPC_HEADER_SZ;
- }
-
- template <class T> inline T _grab() { T t; *this >> t; return t; }
-};
-
-//
-// Marshalling for plain old data
-//
-
-#define MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _d_) \
-inline marshall & operator<<(marshall & m, _c_ x) { _d_ y = hton((_d_)x); m.write(&y, sizeof(_d_)); return m; } \
-inline unmarshall & operator>>(unmarshall & u, _c_ & x) { _d_ y; u.read(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; }
-
-#define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_)
-
-MARSHALL_RAW_NETWORK_ORDER_AS(bool, uint8_t)
-MARSHALL_RAW_NETWORK_ORDER(uint8_t)
-MARSHALL_RAW_NETWORK_ORDER(int8_t)
-MARSHALL_RAW_NETWORK_ORDER(uint16_t)
-MARSHALL_RAW_NETWORK_ORDER(int16_t)
-MARSHALL_RAW_NETWORK_ORDER(uint32_t)
-MARSHALL_RAW_NETWORK_ORDER(int32_t)
-MARSHALL_RAW_NETWORK_ORDER_AS(size_t, uint32_t)
-MARSHALL_RAW_NETWORK_ORDER(uint64_t)
-MARSHALL_RAW_NETWORK_ORDER(int64_t)
-
-//
-// Marshalling for tuples (used to implement marshalling for structs)
-//
-
-// In order to iterate over the tuple elements, we first need a template
-// parameter pack containing the tuple's indices. The function templates named
-// *_imp below accept an empty tag struct as their last argument, and use its
-// template arguments to index the tuple. The operator<< overloads instantiate
-// the appropriate tag struct to make this possible.
-
-template <class... Args, size_t... Indices> inline marshall &
-tuple_marshall_imp(marshall & m, tuple<Args...> & t, std::index_sequence<Indices...>) {
- // Note that brace initialization is used for the empty structure "pack",
- // forcing the comma-separated expressions expanded from the parameter pack
- // to be evaluated in order. Order matters because the elements must be
- // serialized consistently! The empty struct resulting from construction
- // is discarded.
- (void)pass{(m << std::get<Indices>(t))...};
- return m;
-}
-
-template <class... Args> inline marshall &
-operator<<(marshall & m, tuple<Args...> && t) {
- return tuple_marshall_imp(m, t, std::index_sequence_for<Args...>{});
-}
-
-template <class... Args, size_t... Indices> inline unmarshall &
-tuple_unmarshall_imp(unmarshall & u, tuple<Args & ...> t, std::index_sequence<Indices...>) {
- (void)pass{(u >> std::get<Indices>(t))...};
- return u;
-}
-
-template <class... Args> inline unmarshall &
-operator>>(unmarshall & u, tuple<Args & ...> && t) {
- return tuple_unmarshall_imp(u, t, std::index_sequence_for<Args...>{});
-}
-
-//
-// Marshalling for structs or classes containing a MEMBERS declaration
-//
-
-// Implements struct marshalling via tuple marshalling of members.
-template <class T> inline typename
-enable_if<is_tuple_convertible<T>::value, unmarshall>::type &
-operator>>(unmarshall & u, T & a) { return u >> a._tuple_(); }
-
-template <class T> inline typename
-enable_if<is_tuple_convertible<T>::value, marshall>::type &
-operator<<(marshall & m, const T a) { return m << a._tuple_(); }
-
-//
-// Marshalling for STL containers
-//
-
-// this overload is visible for type A only if A::cbegin and A::cend exist
-template <class A> inline typename
-enable_if<is_const_iterable<A>::value, marshall>::type &
-operator<<(marshall & m, const A & x) {
- m << (uint32_t)x.size();
- for (const auto & a : x)
- m << a;
- return m;
-}
-
-// visible for type A if A::emplace_back(a) makes sense
-template <class A> inline typename
-enable_if<supports_emplace_back<A>::value, unmarshall>::type &
-operator>>(unmarshall & u, A & x) {
- uint32_t n = u._grab<uint32_t>();
- x.clear();
- while (n--)
- x.emplace_back(u._grab<typename A::value_type>());
- return u;
-}
-
-// std::pair<A, B>
-template <class A, class B> inline marshall &
-operator<<(marshall & m, const std::pair<A,B> & d) {
- return m << d.first << d.second;
-}
-
-template <class A, class B> inline unmarshall &
-operator>>(unmarshall & u, std::pair<A,B> & d) {
- return u >> d.first >> d.second;
-}
-
-// std::map<A, B>
-template <class A, class B> inline unmarshall &
-operator>>(unmarshall & u, std::map<A,B> & x) {
- uint32_t n = u._grab<uint32_t>();
- x.clear();
- while (n--)
- x.emplace(u._grab<std::pair<A,B>>());
- return u;
-}
-
-// std::string
-inline marshall & operator<<(marshall & m, const string & s) {
- m << (uint32_t)s.size();
- m.write(s.data(), s.size());
- return m;
-}
-
-inline unmarshall & operator>>(unmarshall & u, string & s) {
- uint32_t sz = u._grab<uint32_t>();
- if (u.ok()) {
- s.resize(sz);
- u.read(&s[0], sz);
- }
- return u;
-}
-
-//
-// Marshalling for strongly-typed enums
-//
-
-template <class E> typename enable_if<std::is_enum<E>::value, marshall>::type &
-operator<<(marshall & m, E e) {
- return m << from_enum(e);
-}
-
-template <class E> typename enable_if<std::is_enum<E>::value, unmarshall>::type &
-operator>>(unmarshall & u, E & e) {
- e = to_enum<E>(u._grab<enum_type_t<E>>());
- return u;
-}
-
-//
-// Recursive marshalling
-//
-
-inline marshall & operator<<(marshall & m, marshall & n) {
- return m << n.content();
-}
-
-inline unmarshall & operator>>(unmarshall & u, unmarshall & v) {
- v = unmarshall(u._grab<string>(), false);
- return u;
-}
-
-#endif
-#include "poll_mgr.h"
+#include "include/rpc/poll_mgr.h"
#include <errno.h>
#include <sys/select.h>
-#include "file.h"
-#include "threaded_log.h"
+#include "include/rpc/file.h"
+#include "include/debug.h"
#ifdef __linux__
#include <sys/epoll.h>
// x exited worker threads).
//
-#include "rpc.h"
+#include "include/rpc/rpc.h"
#include <arpa/inet.h>
#include <netinet/tcp.h>
lock cl = lock(client->bind_m_);
if (!client->bind_done_) {
LOG_NONMEMBER << "bind(\"" << destination << "\")";
- int ret = client->bind(milliseconds(1000));
+ int ret = client->bind(1000ms);
if (ret < 0) {
LOG_NONMEMBER << "bind failure! " << destination << " " << ret;
client.reset();
caller ca(0, &rep);
xid_t xid_rep;
- marshall datagram = req;
+ string datagram;
{
lock ml(m_);
ca.xid = xid_++;
calls_[ca.xid] = &ca;
- datagram.write_header(rpc_protocol::request_header{
+ datagram = marshall::datagram(rpc_protocol::request_header{
ca.xid, proc.id, clt_nonce_, srv_nonce_, xid_rep_window_.front()
- });
+ }, req);
xid_rep = xid_rep_window_.front();
}
// Runs in poll_mgr's thread as an upcall from the connection object to the
// rpcc. Does not call blocking RPC handlers.
bool rpcc::got_pdu(const shared_ptr<connection> &, const string & b) {
- unmarshall rep(b, true);
rpc_protocol::reply_header h;
- rep.read_header(h);
- if (!rep.ok()) {
+ if (!unmarshall::datagram(b, h)) {
IF_LEVEL(1) LOG << "unmarshall header failed!!!";
return true;
}
}
void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
- unmarshall req(buf, true);
-
rpc_protocol::request_header h;
- req.read_header(h);
- proc_id_t proc = h.proc;
- if (!req.ok()) {
+ auto req = unmarshall::datagram(buf, h);
+
+ if (!req) {
IF_LEVEL(1) LOG << "unmarshall header failed";
return;
}
+ proc_id_t proc = h.proc;
+
IF_LEVEL(2) LOG << "rpc " << h.xid << " (proc " << std::hex << proc << ", last_rep "
<< std::dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce;
- marshall rep;
rpc_protocol::reply_header rh{h.xid,0};
// is client sending to an old instance of server?
IF_LEVEL(2) LOG << "rpc for an old server instance " << h.srv_nonce
<< " (current " << nonce_ << ") proc " << std::hex << proc;
rh.ret = rpc_protocol::oldsrv_failure;
- rep.write_header(rh);
- c->send(rep);
+ c->send(marshall::datagram(rh));
return;
}
switch (check_duplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, stored_reply)) {
case NEW: // new request
- rh.ret = (*f)(std::forward<unmarshall>(req), rep);
- if (rh.ret == rpc_protocol::unmarshall_args_failure) {
- LOG << "failed to unmarshall the arguments. You are "
- << "probably calling RPC 0x" << std::hex << proc << " with the wrong "
- << "types of arguments.";
- VERIFY(0);
- }
- VERIFY(rh.ret >= 0);
+ {
+ marshall rep;
+ rh.ret = (*f)(std::forward<unmarshall>(req), rep);
+ if (rh.ret == rpc_protocol::unmarshall_args_failure) {
+ LOG << "failed to unmarshall the arguments. You are "
+ << "probably calling RPC 0x" << std::hex << proc << " with the wrong "
+ << "types of arguments.";
+ VERIFY(0);
+ }
+ VERIFY(rh.ret >= 0);
- rep.write_header(rh);
- stored_reply = rep;
+ stored_reply = marshall::datagram(rh, rep);
+ }
IF_LEVEL(2) LOG << "sending and saving reply of size " << stored_reply.size() << " for rpc "
<< h.xid << ", proc " << std::hex << proc << " ret " << std::dec
case FORGOTTEN: // very old request and we don't have the response anymore
IF_LEVEL(2) LOG << "very old request " << h.xid << " from " << h.clt_nonce;
rh.ret = rpc_protocol::atmostonce_failure;
- rep.write_header(rh);
- c->send(rep);
+ c->send(marshall::datagram(rh));
break;
}
}
// RPC test and pseudo-documentation.
// generates print statements on failures, but eventually says "rpctest OK"
-#include "types.h"
-#include "rpc.h"
+#include "include/types.h"
+#include "include/rpc/rpc.h"
#include <arpa/inet.h>
#include <getopt.h>
#include <unistd.h>
#include <string.h>
-#include "threaded_log.h"
+#include "include/debug.h"
#define NUM_CL 2
using namespace std::chrono;
using std::vector;
-// server-side handlers. they must be methods of some class
-// to simplify rpcs::reg(). a server process can have handlers
-// from multiple classes.
-class srv {
- public:
- int handle_22(string & r, const string a, const string b);
- int handle_fast(int & r, const int a);
- int handle_slow(int & r, const int a);
- int handle_bigrep(string & r, const size_t a);
-};
-
namespace srv_protocol {
- using status = rpc_protocol::status;
+ enum status : rpc_protocol::status {OK};
REMOTE_PROCEDURE_BASE(0);
REMOTE_PROCEDURE(22, _22, (string &, string, string));
REMOTE_PROCEDURE(23, fast, (int &, int));
REMOTE_PROCEDURE(25, bigrep, (string &, size_t));
}
+// server-side handlers. they must be methods of some class
+// to simplify rpcs::reg(). a server process can have handlers
+// from multiple classes.
+class srv {
+ public:
+ srv_protocol::status handle_22(string & r, const string a, const string b);
+ srv_protocol::status handle_fast(int & r, const int a);
+ srv_protocol::status handle_slow(int & r, const int a);
+ srv_protocol::status handle_bigrep(string & r, const size_t a);
+};
+
// a handler. a and b are arguments, r is the result.
// there can be multiple arguments but only one result.
// the caller also gets to see the int return value
// rpcs::reg() decides how to unmarshall by looking
// at these argument types, so this function definition
// does what a .x file does in SunRPC.
-int srv::handle_22(string & r, const string a, string b) {
+srv_protocol::status srv::handle_22(string & r, const string a, string b) {
r = a + b;
- return 0;
+ return srv_protocol::OK;
}
-int srv::handle_fast(int & r, const int a) {
+srv_protocol::status srv::handle_fast(int & r, const int a) {
r = a + 1;
- return 0;
+ return srv_protocol::OK;
}
-int srv::handle_slow(int & r, const int a) {
- int us = std::uniform_int_distribution<>(0,500)(global->random_generator);
- std::this_thread::sleep_for(microseconds(us));
+srv_protocol::status srv::handle_slow(int & r, const int a) {
+ auto duration = std::uniform_int_distribution<>(0,500)(global->random_generator) * 1us;
+ std::this_thread::sleep_for(duration);
r = a + 2;
- return 0;
+ return srv_protocol::OK;
}
-int srv::handle_bigrep(string & r, const size_t len) {
+srv_protocol::status srv::handle_bigrep(string & r, const size_t len) {
r = string(len, 'x');
- return 0;
+ return srv_protocol::OK;
}
static srv service;
static void testmarshall() {
marshall m;
rpc_protocol::request_header rh{1,2,3,4,5};
- m.write_header(rh);
- VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ);
+ VERIFY(marshall::datagram(rh, m).size()==rpc_protocol::RPC_HEADER_SZ);
int i = 12345;
unsigned long long l = 1223344455L;
size_t sz = 101010101;
m << sz;
m << bin;
- string b = m;
- VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+sizeof(uint32_t)+s.size()+sizeof(uint32_t)+sizeof(uint32_t)+bin.size());
+ string b = marshall::datagram(rh, m);
+ VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+sizeof(uint32_t)+s.size()+sizeof(uint64_t)+sizeof(uint32_t)+bin.size());
- unmarshall un(b, true);
rpc_protocol::request_header rh1;
- un.read_header(rh1);
+ auto un = unmarshall::datagram(b, rh1);
VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
int i1;
unsigned long long l1;
int ret = clients[which_cl]->call(which ? srv_protocol::fast : srv_protocol::slow, rep, arg);
auto end = steady_clock::now();
- auto diff = duration_cast<milliseconds>(end - start).count();
+ auto diff = (end - start) / 1ms;
if (ret != 0)
cout << diff << " ms have elapsed!!!" << endl;
VERIFY(ret == 0);
for(int i = 0; i < 4; i++){
int rep = 0;
- int ret = c->call_timeout(srv_protocol::slow, milliseconds(300), rep, i);
+ int ret = c->call_timeout(srv_protocol::slow, 300ms, rep, i);
VERIFY(ret == rpc_protocol::timeout_failure || rep == i+2);
}
}
cout << " -- string concat RPC .. ok" << endl;
// small request, big reply (perhaps req via UDP, reply via TCP)
- intret = c->call_timeout(srv_protocol::bigrep, milliseconds(20000), rep, 70000ul);
+ intret = c->call_timeout(srv_protocol::bigrep, 20000ms, rep, 70000ul);
VERIFY(intret == 0);
VERIFY(rep.size() == 70000);
cout << " -- small request, big reply .. ok" << endl;
// specify a timeout value to an RPC that should succeed (udp)
int xx = 0;
- intret = c->call_timeout(srv_protocol::fast, milliseconds(300), xx, 77);
+ intret = c->call_timeout(srv_protocol::fast, 300ms, xx, 77);
VERIFY(intret == 0 && xx == 78);
cout << " -- no spurious timeout .. ok" << endl;
{
string arg(1000, 'x');
string rep2;
- c->call_timeout(srv_protocol::_22, milliseconds(300), rep2, arg, (string)"x");
+ c->call_timeout(srv_protocol::_22, 300ms, rep2, arg, (string)"x");
VERIFY(rep2.size() == 1001);
cout << " -- no spurious timeout .. ok" << endl;
}
string non_existent = "127.0.0.1:7661";
rpcc *c1 = new rpcc(non_existent);
time_t t0 = time(0);
- intret = c1->bind(milliseconds(300));
+ intret = c1->bind(300ms);
time_t t1 = time(0);
VERIFY(intret < 0 && (t1 - t0) <= 4);
cout << " -- rpc timeout .. ok" << endl;
delete server;
client1 = new rpcc(*dst);
- VERIFY (client1->bind(milliseconds(3000)) < 0);
+ VERIFY (client1->bind(3000ms) < 0);
cout << " -- create new client and try to bind to failed server .. failed ok" << endl;
delete client1;
}
while (1)
- std::this_thread::sleep_for(milliseconds(100));
+ std::this_thread::sleep_for(100ms);
}
-#include "thread_pool.h"
+#include "include/rpc/thread_pool.h"
thread_pool::thread_pool(size_t sz) : th_(sz) {
for (auto & t : th_)
// The rule is that a module releases its internal locks before it
// upcalls, but can keep its locks when calling down.
-#include "rsm.h"
-#include "rsm_client.h"
+#include "include/rsm.h"
+#include "include/rsm_client.h"
#include <unistd.h>
using std::vector;
+using namespace std::chrono;
rsm_state_transfer::~rsm_state_transfer() {}
commit_change(cfg->view_id(), ml);
} else {
ml.unlock();
- std::this_thread::sleep_for(milliseconds(3000)); // XXX make another node in cfg primary?
+ std::this_thread::sleep_for(3000ms); // XXX make another node in cfg primary?
ml.lock();
}
}
rsm_mutex_lock.lock();
// Start accepting synchronization request (statetransferreq) now!
insync = true;
- cfg->get_view(vid_insync, backups);
+ backups = cfg->get_view(vid_insync);
backups.erase(std::find(backups.begin(), backups.end(), cfg->myaddr()));
LOG << "backups " << backups;
sync_cond.wait(rsm_mutex_lock);
rsm_mutex_lock.unlock();
cl = rpcc::bind_cached(m);
if (cl) {
- ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100),
+ ret = cl->call_timeout(rsm_protocol::transferreq, 100ms,
r, cfg->myaddr(), last_myvs, vid_insync);
}
rsm_mutex_lock.lock();
return false;
}
if (stf && last_myvs != r.last) {
- stf->unmarshal_state(r.state);
+ stf->unmarshall_state(r.state);
}
last_myvs = r.last;
LOG << "transfer from " << m << " success, vs(" << last_myvs.vid << "," << last_myvs.seqno << ")";
rsm_mutex_lock.unlock();
auto cl = rpcc::bind_cached(m);
if (cl)
- ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log, cfg->myaddr(), last_myvs);
+ ret = cl->call_timeout(rsm_protocol::joinreq, 12000ms, log, cfg->myaddr(), last_myvs);
rsm_mutex_lock.lock();
if (cl == 0 || ret != rsm_protocol::OK) {
handler *h = procs[procno];
VERIFY(h);
marshall rep;
- auto ret = (rsm_protocol::status)(*h)(unmarshall(req, false), rep);
- r = marshall(ret, rep.content()).content();
+ auto ret = (rsm_protocol::status)(*h)(unmarshall(req), rep);
+ r = marshall(ret, rep);
}
static void logHexString(locked_ostream && log, const string & s) {
if (primary != myaddr)
return rsm_client_protocol::NOTPRIMARY;
LOG << "Assigning a viewstamp";
- cfg->get_view(vid_commit, m);
+ m = cfg->get_view(vid_commit);
// assign the RPC the next viewstamp number
vs = myvs;
myvs++;
}
- // send an invoke RPC to all slaves in the current view with a timeout of 1 second
+ // send an invoke RPC to all slaves in the current view with a timeout
LOG << "Invoking slaves";
- for (unsigned i = 0; i < m.size(); i++) {
- if (m[i] != myaddr) {
+ for (auto & mm : m) {
+ if (mm != myaddr) {
// if invoke on slave fails, return rsm_client_protocol::BUSY
- LOG << "Sending invoke to " << m[i];
- auto cl = rpcc::bind_cached(m[i]);
+ LOG << "Sending invoke to " << mm;
+ auto cl = rpcc::bind_cached(mm);
if (!cl)
return rsm_client_protocol::BUSY;
int ignored_rval;
- auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req);
+ auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, 100ms, ignored_rval, procno, vs, req);
LOG << "Invoke returned " << ret;
if (ret != rsm_protocol::OK)
return rsm_client_protocol::BUSY;
rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
LOG << "invoke procno 0x" << std::hex << proc;
lock ml(invoke_mutex);
- vector<string> m;
string myaddr;
{
lock ml2(rsm_mutex);
myaddr = cfg->myaddr();
if (primary == myaddr)
return rsm_protocol::ERR;
- cfg->get_view(vid_commit, m);
+ vector<string> m = cfg->get_view(vid_commit);
if (std::find(m.begin(), m.end(), myaddr) == m.end())
return rsm_protocol::ERR;
// check sequence number
if (!insync || vid != vid_insync)
return rsm_protocol::BUSY;
if (stf && last != last_myvs)
- r.state = stf->marshal_state();
+ r.state = stf->marshall_state();
r.last = last_myvs;
return rsm_protocol::OK;
}
// primary failure
//
rsm_client_protocol::status rsm::client_members(vector<string> & r, int) {
- vector<string> m;
lock ml(rsm_mutex);
- cfg->get_view(vid_commit, m);
+ vector<string> m = cfg->get_view(vid_commit);
m.push_back(primary);
r = m;
LOG << "return " << m << " m " << primary;
// otherwise, the lowest number node of the previous view.
// caller should hold rsm_mutex
void rsm::set_primary(unsigned vid) {
- vector<string> c, p;
- cfg->get_view(vid, c);
- cfg->get_view(vid - 1, p);
+ vector<string> c = cfg->get_view(vid), p = cfg->get_view(vid - 1);
VERIFY (c.size() > 0);
if (isamember(primary,c)) {
}
VERIFY(p.size() > 0);
- for (unsigned i = 0; i < p.size(); i++) {
- if (isamember(p[i], c)) {
- primary = p[i];
+ for (auto & pp : p) {
+ if (isamember(pp, c)) {
+ primary = pp;
LOG << "primary is " << primary;
return;
}
void rsm::net_repair(bool heal, lock & rsm_mutex_lock) {
VERIFY(rsm_mutex_lock);
- vector<string> m;
- cfg->get_view(vid_commit, m);
- for (unsigned i = 0; i < m.size(); i++) {
- if (m[i] != cfg->myaddr()) {
- LOG << "member " << m[i] << " " << heal;
- if (auto cl = rpcc::bind_cached(m[i]))
+ for (auto & mm : cfg->get_view(vid_commit)) {
+ if (mm != cfg->myaddr()) {
+ LOG << "member " << mm << " " << heal;
+ if (auto cl = rpcc::bind_cached(mm))
cl->set_reachable(heal);
}
}
-#include "rsm_client.h"
+#include "include/rsm_client.h"
#include <arpa/inet.h>
#include <unistd.h>
+using namespace std::chrono;
+
rsm_client::rsm_client(string dst) : primary(dst) {
LOG << "create rsm_client";
lock ml(rsm_client_mutex);
LOG << "done";
}
-void rsm_client::primary_failure(lock &) {
- primary = known_mems.back();
- known_mems.pop_back();
-}
-
-rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const string & req) {
+rsm_protocol::status rsm_client::invoke(rpc_protocol::proc_id_t proc, string & rep, const string & req) {
lock ml(rsm_client_mutex);
while (1) {
LOG << "proc " << std::hex << proc << " primary " << primary;
auto cl = rpcc::bind_cached(prim);
auto ret = rsm_client_protocol::OK;
if (cl)
- ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, milliseconds(500), rep, proc, req);
+ ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, 500ms, rep, proc, req);
ml.lock();
if (!cl)
return rsm_protocol::OK;
if (ret == rsm_client_protocol::BUSY) {
LOG << "rsm is busy " << prim;
- std::this_thread::sleep_for(milliseconds(300));
+ std::this_thread::sleep_for(300ms);
continue;
}
if (ret == rsm_client_protocol::NOTPRIMARY) {
}
prim_fail:
LOG << "primary " << prim << " failed ret " << std::dec << ret;
- primary_failure(ml);
+ primary = known_mems.back();
+ known_mems.pop_back();
LOG << "retry new primary " << prim;
}
}
shared_ptr<rpcc> cl;
{
rsm_client_mutex_lock.unlock();
- cl = rpcc::bind_cached(prim);
- if (cl)
- ret = cl->call_timeout(rsm_client_protocol::members, milliseconds(100), known_mems, 0);
+ if ((cl = rpcc::bind_cached(prim)))
+ ret = cl->call_timeout(rsm_client_protocol::members, 100ms, known_mems, 0);
rsm_client_mutex_lock.lock();
}
- if (cl == 0 || ret != rsm_protocol::OK)
+ if (ret != rsm_protocol::OK)
return false;
- if (known_mems.size() < 1) {
+ if (!known_mems.size()) {
LOG << "do not know any members!";
VERIFY(0);
}
+++ /dev/null
-#ifndef rsm_client_h
-#define rsm_client_h
-
-#include "types.h"
-#include "rsm_protocol.h"
-
-//
-// rsm client interface.
-//
-// The client stubs package up an rpc, and then call the invoke procedure
-// on the replicated state machine passing the RPC as an argument. This way
-// the replicated state machine isn't service specific; any server can use it.
-//
-
-class rsm_client {
- protected:
- string primary;
- std::vector<string> known_mems;
- std::mutex rsm_client_mutex;
- void primary_failure(lock & rsm_client_mutex_lock);
- bool init_members(lock & rsm_client_mutex_lock);
- rsm_protocol::status invoke(rpc_protocol::proc_id_t proc, string & rep, const string & req);
- template<class R> int call_marshalled(rpc_protocol::proc_t & proc, R & r, const marshall & req);
- public:
- rsm_client(string dst);
-
- template<class P, class R, class ...Args>
- inline int call(rpc_protocol::proc_checked_t<P> proc, R & r, const Args & ...a1) {
- static_assert(is_valid_call<P, R, Args...>::value, "RSM method invoked with incorrect argument types");
- return call_marshalled(proc, r, marshall(a1...));
- }
-};
-
-inline string hexify(const string & s) {
- string bytes;
- for (char ch : s) {
- bytes.push_back("0123456789abcdef"[(uint8_t)ch >> 4]);
- bytes.push_back("0123456789abcdef"[(uint8_t)ch & 15]);
- }
- return bytes;
-}
-
-template<class R>
-int rsm_client::call_marshalled(rpc_protocol::proc_t & proc, R & r, const marshall & req) {
- string rep;
- int intret = invoke(proc.id, rep, req.content());
- VERIFY( intret == rsm_client_protocol::OK );
- unmarshall u(rep, false, intret);
- if (intret < 0) return intret;
- string res;
- u >> res;
- if (!u.okdone()) {
- LOG << "failed to unmarshall the reply.";
- LOG << "You probably forgot to set the reply string in "
- << "rsm::client_invoke, or you may have called RPC "
- << proc.name << " (0x" << std::hex << proc.id << ") with the wrong return type";
- LOG << "here's what I got: \"" << hexify(rep) << "\"";
- VERIFY(0);
- return rpc_protocol::unmarshall_reply_failure;
- }
- if(!unmarshall(res, false, r).okdone()) {
- LOG << "failed to unmarshall the reply.";
- LOG << "You are probably calling RPC " << proc.name << " (0x"
- << std::hex << proc.id << ") with the wrong return type.";
- LOG << "here's what I got: \"" << hexify(res) << "\"";
- VERIFY(0);
- return rpc_protocol::unmarshall_reply_failure;
- }
- return intret;
-}
-
-#endif
// RSM test client
//
-#include "types.h"
-#include "rsm_protocol.h"
-#include "rsmtest_client.h"
+#include "include/types.h"
+#include "include/rsm_protocol.h"
+#include "include/rsmtest_client.h"
int main(int argc, char *argv[]) {
global = new t4_state('t');
rsmtest_client *lc = new rsmtest_client(argv[1]);
string command(argv[2]);
- if (command == "partition") {
+ if (command == "partition")
LOG_NONMEMBER << "net_repair returned " << lc->net_repair(std::stoi(argv[3]));
- } else if (command == "breakpoint") {
- int b = std::stoi(argv[3]);
- LOG_NONMEMBER << "breakpoint " << b << " returned " << lc->breakpoint(b);
- } else {
+ else if (command == "breakpoint")
+ LOG_NONMEMBER << "breakpoint " << argv[3] << " returned " << lc->breakpoint(std::stoi(argv[3]));
+ else
LOG_NONMEMBER << "Unknown command " << argv[2];
- }
return 0;
}
// RPC stubs for clients to talk to rsmtest_server
-#include "rsmtest_client.h"
+#include "include/rsmtest_client.h"
#include <arpa/inet.h>
rsmtest_client::rsmtest_client(string dst) {
if (!(cl = rpcc::bind_cached(dst)))
- LOG << "rsmtest_client: call bind";
+ LOG << "could not bind to " << dst;
}
rsm_test_protocol::status rsmtest_client::net_repair(int heal) {
rsm_test_protocol::status r = rsm_test_protocol::ERR;
- auto ret = (rsm_test_protocol::status)cl->call(rsm_test_protocol::net_repair, r, heal);
- VERIFY (ret == rsm_test_protocol::OK);
+ VERIFY (cl->call(rsm_test_protocol::net_repair, r, heal) == rsm_test_protocol::OK);
return r;
}
rsm_test_protocol::status rsmtest_client::breakpoint(int b) {
rsm_test_protocol::status r = rsm_test_protocol::ERR;
- auto ret = (rsm_test_protocol::status)cl->call(rsm_test_protocol::breakpoint, r, b);
- VERIFY (ret == rsm_test_protocol::OK);
+ VERIFY (cl->call(rsm_test_protocol::breakpoint, r, b) == rsm_test_protocol::OK);
return r;
}
-#include "t4.h"
+#include "include/t4.h"
#include <unistd.h>
-#include "rpc/rpc.h"
+#include "include/rpc/rpc.h"
using namespace std::chrono;
t4_state::t4_state(char log_prefix) : log_thread_prefix(log_prefix) {
uint32_t seed = std::random_device()();
auto time = system_clock::now().time_since_epoch();
- auto ticks = duration_cast<nanoseconds>(time).count();
+ auto ticks = time / 1ns;
seed ^= (uint32_t)ticks;
auto pid = getpid();
seed ^= (uint32_t)pid;
seed ^= (uint32_t)tid;
random_generator.seed(seed);
// make sure the clock will read differently next time!
- std::this_thread::sleep_for(microseconds(1));
+ std::this_thread::sleep_for(1us);
}
t4_state::~t4_state() {