From f0dcb6b97d6d40f67698d1f71ac26970f1776f82 Mon Sep 17 00:00:00 2001 From: Peter Iannucci Date: Sat, 16 Nov 2013 12:56:29 -0500 Subject: [PATCH] Cosmetic improvements. --- Makefile.osx | 2 +- config.cc | 45 ++++++++---------- config.h | 24 +++++----- lock_client.cc | 10 ++-- lock_client.h | 2 +- lock_server.cc | 29 +++++------ lock_server.h | 16 +++---- lock_smain.cc | 7 +-- paxos.cc | 2 +- paxos.h | 2 +- rpc/connection.cc | 2 +- rpc/connection.h | 2 +- rpc/marshall.h | 47 +++++++++--------- rpc/marshall_wrap.h | 6 +-- rpc/rpc.cc | 132 +++++++++++++++++---------------------------------- rpc/rpc.h | 62 ++++++++++++------------ rpc/rpc_protocol.h | 4 +- rpc/rpctest.cc | 14 +++--- rpc/thr_pool.cc | 2 +- rpc/thr_pool.h | 2 +- rsm.cc | 52 ++++++++------------ rsm.h | 33 ++++++------- rsm_client.cc | 2 +- rsm_client.h | 17 +++---- threaded_log.h | 4 +- types.h | 10 ++-- 26 files changed, 226 insertions(+), 304 deletions(-) diff --git a/Makefile.osx b/Makefile.osx index 80d85a3..74218df 100644 --- a/Makefile.osx +++ b/Makefile.osx @@ -2,7 +2,7 @@ PEDANTRY = -Weverything -pedantic-errors -Werror -Wno-c++98-compat \ -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes \ -Wmissing-declarations -Wno-weak-vtables -Wno-global-constructors \ -Wno-exit-time-destructors -pedantic -Wall -Wextra -Weffc++ -OPTFLAGS = -O0 -fno-omit-frame-pointer #-fsanitize=address ,thread,undefined -fsanitize-memory-track-origins +OPTFLAGS = -O3 -fno-omit-frame-pointer #-fsanitize=address ,thread,undefined -fsanitize-memory-track-origins STDLIB = -stdlib=libc++ #STDLIB = CXX = clang++-mp-3.4 diff --git a/config.cc b/config.cc index 2b40078..7df1bbc 100644 --- a/config.cc +++ b/config.cc @@ -33,7 +33,7 @@ // all views, the other nodes can bring this re-joined node up to // date. -config::config(const string &_first, const string &_me, config_view_change *_vc) +config::config(const string & _first, const string & _me, config_view_change *_vc) : my_view_id(0), first(_first), me(_me), vc(_vc), paxos(this, me == _first, me, me) { @@ -43,32 +43,25 @@ config::config(const string &_first, const string &_me, config_view_change *_vc) thread(&config::heartbeater, this).detach(); } -void config::restore(const string &s) { +void config::restore(const string & s) { lock cfg_mutex_lock(cfg_mutex); paxos.restore(s); reconstruct(cfg_mutex_lock); } -void config::get_view(unsigned instance, vector &m) { +void config::get_view(unsigned instance, vector & m) { lock cfg_mutex_lock(cfg_mutex); get_view(instance, m, cfg_mutex_lock); } -void config::get_view(unsigned instance, vector &m, lock &) { +void config::get_view(unsigned instance, vector & m, lock & cfg_mutex_lock) { + VERIFY(cfg_mutex_lock); string value = paxos.value(instance); LOG("get_view(" << instance << "): returns " << value); - m = members(value); -} - -vector config::members(const string &value) const { - return explode(value); + m = explode(value); } -string config::value(const vector &members) const { - return implode(members); -} - -void config::reconstruct(lock &cfg_mutex_lock) { +void config::reconstruct(lock & cfg_mutex_lock) { VERIFY(cfg_mutex_lock); my_view_id = paxos.instance(); if (my_view_id > 0) { @@ -78,10 +71,10 @@ void config::reconstruct(lock &cfg_mutex_lock) { } // Called by Paxos's acceptor. -void config::paxos_commit(unsigned instance, const string &value) { +void config::paxos_commit(unsigned instance, const string & value) { lock cfg_mutex_lock(cfg_mutex); - vector newmem = members(value); + vector newmem = explode(value); LOG("instance " << instance << ": " << newmem); for (auto mem : mems) { @@ -101,14 +94,14 @@ void config::paxos_commit(unsigned instance, const string &value) { } } -bool config::ismember(const string &m, unsigned vid) { +bool config::ismember(const string & m, unsigned vid) { lock cfg_mutex_lock(cfg_mutex); vector v; get_view(vid, v, cfg_mutex_lock); return isamember(m, v); } -bool config::add(const string &new_m, unsigned vid) { +bool config::add(const string & new_m, unsigned vid) { lock cfg_mutex_lock(cfg_mutex); LOG("adding " << new_m << " to " << vid); if (vid != my_view_id) { @@ -118,18 +111,19 @@ bool config::add(const string &new_m, unsigned vid) { LOG("calling down to paxos layer"); vector m(mems), cmems(mems); m.push_back(new_m); - LOG("old mems " << cmems << " " << value(cmems)); - LOG("new mems " << m << " " << value(m)); + LOG("old mems " << cmems << " " << implode(cmems)); + LOG("new mems " << m << " " << implode(m)); unsigned nextvid = my_view_id + 1; cfg_mutex_lock.unlock(); - bool r = paxos.run(nextvid, cmems, value(m)); + bool r = paxos.run(nextvid, cmems, implode(m)); cfg_mutex_lock.lock(); LOG("paxos proposer returned " << (r ? "success" : "failure")); return r; } // caller should hold cfg_mutex -bool config::remove(const string &m, lock &cfg_mutex_lock) { +bool config::remove(const string & m, lock & cfg_mutex_lock) { + VERIFY(cfg_mutex_lock); LOG("my_view_id " << my_view_id << " remove? " << m); vector n; for (auto mem : mems) { @@ -139,7 +133,7 @@ bool config::remove(const string &m, lock &cfg_mutex_lock) { vector cmems = mems; unsigned nextvid = my_view_id + 1; cfg_mutex_lock.unlock(); - bool r = paxos.run(nextvid, cmems, value(n)); + bool r = paxos.run(nextvid, cmems, implode(n)); cfg_mutex_lock.lock(); LOG("proposer returned " << (r ? "success" : "failure")); return r; @@ -183,7 +177,7 @@ void config::heartbeater() { } } -paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) { +paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) { lock cfg_mutex_lock(cfg_mutex); r = (int) my_view_id; LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id); @@ -196,7 +190,8 @@ paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) { return paxos_protocol::ERR; } -config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) { +config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) { + VERIFY(cfg_mutex_lock); unsigned vid = my_view_id; LOG("heartbeat to " << m << " (" << vid << ")"); handle h(m); diff --git a/config.h b/config.h index 7124a6e..895de1b 100644 --- a/config.h +++ b/config.h @@ -20,29 +20,27 @@ class config : public paxos_change { vector mems; mutex cfg_mutex; cond config_cond; - paxos_protocol::status heartbeat(int &r, string m, unsigned instance); - string value(const vector &mems) const; - vector members(const string &v) const; - void get_view(unsigned instance, vector &m, lock &cfg_mutex_lock); - bool remove(const string &, lock &cfg_mutex_lock); - void reconstruct(lock &cfg_mutex_lock); + paxos_protocol::status heartbeat(int & r, string m, unsigned instance); + void get_view(unsigned instance, vector & m, lock & cfg_mutex_lock); + bool remove(const string &, lock & cfg_mutex_lock); + void reconstruct(lock & cfg_mutex_lock); typedef enum { OK, // response and same view # VIEWERR, // response but different view # FAILURE, // no response } heartbeat_t; - heartbeat_t doheartbeat(const string &m, lock &cfg_mutex_lock); + heartbeat_t doheartbeat(const string & m, lock & cfg_mutex_lock); public: - config(const string &_first, const string &_me, config_view_change *_vc); + config(const string & _first, const string & _me, config_view_change *_vc); unsigned view_id() { return my_view_id; } - const string &myaddr() const { return me; } + const string & myaddr() const { return me; } string dump() { return paxos.dump(); } - void get_view(unsigned instance, vector &m); - void restore(const string &s); + void get_view(unsigned instance, vector & m); + void restore(const string & s); bool add(const string &, unsigned view_id); - bool ismember(const string &m, unsigned view_id); + bool ismember(const string & m, unsigned view_id); void heartbeater NORETURN (); - void paxos_commit(unsigned instance, const string &v); + void paxos_commit(unsigned instance, const string & v); rpcs *get_rpcs() { return paxos.get_rpcs(); } void breakpoint(int b) { paxos.breakpoint(b); } }; diff --git a/lock_client.cc b/lock_client.cc index 3c5aa89..beca1cc 100644 --- a/lock_client.cc +++ b/lock_client.cc @@ -52,7 +52,7 @@ void lock_client::releaser() { release_fifo.deq(&lid); LOG("Releaser: " << lid); - lock_state &st = get_lock_state(lid); + lock_state & st = get_lock_state(lid); lock sl(st.m); VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id()); st.state = lock_state::releasing; @@ -79,7 +79,7 @@ int lock_client::stat(lock_protocol::lockid_t lid) { } lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { - lock_state &st = get_lock_state(lid); + lock_state & st = get_lock_state(lid); lock sl(st.m); auto self = this_thread::get_id(); @@ -144,7 +144,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { } lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) { - lock_state &st = get_lock_state(lid); + lock_state & st = get_lock_state(lid); lock sl(st.m); auto self = this_thread::get_id(); VERIFY(st.state == lock_state::locked && st.held_by == self); @@ -167,7 +167,7 @@ lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) { rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) { LOG("Revoke handler " << lid << " " << xid); - lock_state &st = get_lock_state(lid); + lock_state & st = get_lock_state(lid); lock sl(st.m); if (st.state == lock_state::releasing || st.state == lock_state::none) @@ -189,7 +189,7 @@ rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_ } rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) { - lock_state &st = get_lock_state(lid); + lock_state & st = get_lock_state(lid); lock sl(st.m); VERIFY(st.state == lock_state::acquiring); st.state = lock_state::retrying; diff --git a/lock_client.h b/lock_client.h index 73dfffe..654cf4f 100644 --- a/lock_client.h +++ b/lock_client.h @@ -55,7 +55,7 @@ class lock_client { fifo release_fifo; mutex lock_table_lock; lock_map lock_table; - lock_state &get_lock_state(lock_protocol::lockid_t lid); + lock_state & get_lock_state(lock_protocol::lockid_t lid); public: static in_port_t last_port; lock_client(string xdst, lock_release_user *l = 0); diff --git a/lock_server.cc b/lock_server.cc index 0c3a6e9..90ad5b2 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -10,11 +10,11 @@ lock_state::lock_state(): { } -lock_state::lock_state(const lock_state &other) { +lock_state::lock_state(const lock_state & other) { *this = other; } -lock_state& lock_state::operator=(const lock_state& o) { +lock_state & lock_state::operator=(const lock_state & o) { held = o.held; held_by = o.held_by; wanted_by = o.wanted_by; @@ -28,10 +28,14 @@ lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) { return lock_table[lid]; } -lock_server::lock_server(rsm *r) : rsm_ (r) { +lock_server::lock_server(rsm & r) : rsm_ (&r) { thread(&lock_server::revoker, this).detach(); thread(&lock_server::retryer, this).detach(); - rsm_->set_state_transfer(this); + r.set_state_transfer(this); + + r.reg(lock_protocol::acquire, &lock_server::acquire, this); + r.reg(lock_protocol::release, &lock_server::release, this); + r.reg(lock_protocol::stat, &lock_server::stat, this); } void lock_server::revoker () { @@ -42,7 +46,7 @@ void lock_server::revoker () { if (rsm_ && !rsm_->amiprimary()) continue; - lock_state &st = get_lock_state(lid); + lock_state & st = get_lock_state(lid); holder_t held_by; { lock sl(st.m); @@ -70,7 +74,7 @@ void lock_server::retryer() { continue; LOG("Sending retry for " << lid); - lock_state &st = get_lock_state(lid); + lock_state & st = get_lock_state(lid); holder_t front; { lock sl(st.m); @@ -95,7 +99,7 @@ void lock_server::retryer() { lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) { LOG("lid=" << lid << " client=" << id << "," << xid); holder_t h = holder_t(id, xid); - lock_state &st = get_lock_state(lid); + lock_state & st = get_lock_state(lid); lock sl(st.m); // deal with duplicated requests @@ -152,7 +156,7 @@ lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, c lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) { LOG("lid=" << lid << " client=" << id << "," << xid); - lock_state &st = get_lock_state(lid); + lock_state & st = get_lock_state(lid); lock sl(st.m); if (st.held && st.held_by == holder_t(id, xid)) { st.held = false; @@ -165,18 +169,15 @@ lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, c string lock_server::marshal_state() { lock sl(lock_table_lock); - marshall rep; - rep << nacquire << lock_table; - return rep.content(); + return marshall(nacquire, lock_table).content(); } void lock_server::unmarshal_state(const string & state) { lock sl(lock_table_lock); - unmarshall rep(state, false); - rep >> nacquire >> lock_table; + unmarshall(state, false, nacquire, lock_table); } -lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid, const callback_t &) { +lock_protocol::status lock_server::stat(int & r, lock_protocol::lockid_t lid, const callback_t &) { LOG("stat request for " << lid); VERIFY(0); r = nacquire; diff --git a/lock_server.h b/lock_server.h index d3ec580..1f30f87 100644 --- a/lock_server.h +++ b/lock_server.h @@ -11,13 +11,13 @@ typedef pair holder_t; class lock_state { public: lock_state(); - lock_state(const lock_state &other); + lock_state(const lock_state & other); bool held; holder_t held_by; list wanted_by; map old_requests; mutex m; - lock_state& operator=(const lock_state&); + lock_state & operator=(const lock_state &); MEMBERS(held, held_by, wanted_by) }; @@ -26,21 +26,21 @@ MARSHALLABLE_STRUCT(lock_state) typedef map lock_map; -class lock_server : public rsm_state_transfer { +class lock_server : private rsm_state_transfer { private: int nacquire; mutex lock_table_lock; lock_map lock_table; - lock_state &get_lock_state(lock_protocol::lockid_t lid); + lock_state & get_lock_state(lock_protocol::lockid_t lid); fifo retry_fifo; fifo revoke_fifo; rsm *rsm_; - public: - lock_server(rsm *r = 0); - void revoker NORETURN (); - void retryer NORETURN (); string marshal_state(); void unmarshal_state(const string & state); + void revoker NORETURN (); + void retryer NORETURN (); + public: + lock_server(rsm & r); lock_protocol::status acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t); lock_protocol::status release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t); lock_protocol::status stat(int &, lock_protocol::lockid_t, const callback_t & id); diff --git a/lock_smain.cc b/lock_smain.cc index 0a3c209..2c9828b 100644 --- a/lock_smain.cc +++ b/lock_smain.cc @@ -18,12 +18,7 @@ int main(int argc, char *argv[]) { } rsm rsm(argv[1], argv[2]); - lock_server ls(&rsm); - rsm.set_state_transfer(&ls); - - rsm.reg(lock_protocol::acquire, &lock_server::acquire, &ls); - rsm.reg(lock_protocol::release, &lock_server::release, &ls); - rsm.reg(lock_protocol::stat, &lock_server::stat, &ls); + lock_server ls(rsm); rsm.start(); diff --git a/paxos.cc b/paxos.cc index a88a7a5..dad5ecf 100644 --- a/paxos.cc +++ b/paxos.cc @@ -6,7 +6,7 @@ bool isamember(const node_t & m, const nodes_t & nodes) { } // check if l2 contains a majority of the elements of l1 -bool majority(const nodes_t &l1, const nodes_t &l2) { +bool majority(const nodes_t & l1, const nodes_t & l2) { auto overlap = (size_t)count_if(l1.begin(), l1.end(), std::bind(isamember, _1, l2)); return overlap >= (l1.size() >> 1) + 1; } diff --git a/paxos.h b/paxos.h index db18e6c..f1123e6 100644 --- a/paxos.h +++ b/paxos.h @@ -66,7 +66,7 @@ class proposer_acceptor { unsigned instance() { return instance_h; } const value_t & value(unsigned instance) { return values[instance]; } string dump() { return l.dump(); } - void restore(const string &s) { l.restore(s); l.logread(); } + void restore(const string & s) { l.restore(s); l.logread(); } rpcs *get_rpcs() { return &pxs; } bool run(unsigned instance, const nodes_t & cnodes, const value_t & v); diff --git a/rpc/connection.cc b/rpc/connection.cc index b29e136..4e49305 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -31,7 +31,7 @@ connection::~connection() { VERIFY(!wpdu_.buf.size()); } -shared_ptr connection::to_dst(const sockaddr_in &dst, connection_delegate * delegate, int lossy) { +shared_ptr connection::to_dst(const sockaddr_in & dst, connection_delegate * delegate, int lossy) { socket_t s = socket(AF_INET, SOCK_STREAM, 0); s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1); if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { diff --git a/rpc/connection.h b/rpc/connection.h index 8f7d494..3133299 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -28,7 +28,7 @@ class connection : private aio_callback, public enable_shared_from_this to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy=0); + static shared_ptr to_dst(const sockaddr_in & dst, connection_delegate *mgr, int lossy=0); const time_point create_time = steady_clock::now(); const file_t fd; diff --git a/rpc/marshall.h b/rpc/marshall.h index 6e0c94a..7b716e2 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -4,9 +4,6 @@ #include "types.h" #include "rpc_protocol.h" -class marshall; -class unmarshall; - // // Marshall and unmarshall objects // @@ -18,7 +15,7 @@ class marshall { public: template - marshall(const Args&... args) { + marshall(const Args & ... args) { (void)pass{(*this << args)...}; } @@ -35,7 +32,7 @@ class marshall { inline string content() const { return buf_.substr(rpc_protocol::RPC_HEADER_SZ,index_-rpc_protocol::RPC_HEADER_SZ); } // letting S be a defaulted template parameter forces the compiler to - // delay looking up operator<<(marshall&, rpc_sz_t) until we define it + // delay looking up operator<<(marshall &, rpc_sz_t) until we define it // (i.e. we define an operator for marshalling uint32_t) template inline void pack_header(const T & h) { @@ -54,11 +51,13 @@ class unmarshall { bool ok_ = false; public: - unmarshall(const string &s, bool has_header) + template + unmarshall(const string & s, bool has_header, Args && ... args) : buf_(s),index_(rpc_protocol::RPC_HEADER_SZ) { if (!has_header) buf_.insert(0, rpc_protocol::RPC_HEADER_SZ, 0); ok_ = (buf_.size() >= rpc_protocol::RPC_HEADER_SZ); + (void)pass{(*this >> args)...}; } bool ok() const { return ok_; } @@ -89,8 +88,8 @@ class unmarshall { // #define MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _d_) \ -inline marshall & operator<<(marshall &m, _c_ x) { _d_ y = hton((_d_)x); m.rawbytes(&y, sizeof(_d_)); return m; } \ -inline unmarshall & operator>>(unmarshall &u, _c_ &x) { _d_ y; u.rawbytes(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; } +inline marshall & operator<<(marshall & m, _c_ x) { _d_ y = hton((_d_)x); m.rawbytes(&y, sizeof(_d_)); return m; } \ +inline unmarshall & operator>>(unmarshall & u, _c_ & x) { _d_ y; u.rawbytes(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; } #define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_) @@ -133,13 +132,13 @@ operator<<(marshall & m, tuple && t) { } template inline unmarshall & -tuple_unmarshall_imp(unmarshall & u, tuple t, tuple_indices) { +tuple_unmarshall_imp(unmarshall & u, tuple t, tuple_indices) { (void)pass{(u >> get(t))...}; return u; } template unmarshall & -operator>>(unmarshall & u, tuple && t) { +operator>>(unmarshall & u, tuple && t) { using Indices = typename make_tuple_indices::type; return tuple_unmarshall_imp(u, t, Indices()); } @@ -150,8 +149,8 @@ operator>>(unmarshall & u, tuple && t) { // Implements struct marshalling via tuple marshalling of members. #define MARSHALLABLE_STRUCT(_c_) \ -inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_(); } \ -inline marshall & operator<<(marshall &m, const _c_ a) { return m << a._tuple_(); } +inline unmarshall & operator>>(unmarshall & u, _c_ & a) { return u >> a._tuple_(); } \ +inline marshall & operator<<(marshall & m, const _c_ a) { return m << a._tuple_(); } // our first two marshallable structs... MARSHALLABLE_STRUCT(rpc_protocol::request_header) @@ -164,9 +163,9 @@ MARSHALLABLE_STRUCT(rpc_protocol::reply_header) // this overload is visible for type A only if A::cbegin and A::cend exist template inline typename enable_if::value, marshall>::type & -operator<<(marshall &m, const A &x) { +operator<<(marshall & m, const A & x) { m << (unsigned int)x.size(); - for (const auto &a : x) + for (const auto & a : x) m << a; return m; } @@ -174,7 +173,7 @@ operator<<(marshall &m, const A &x) { // visible for type A if A::emplace_back(a) makes sense template inline typename enable_if::value, unmarshall>::type & -operator>>(unmarshall &u, A &x) { +operator>>(unmarshall & u, A & x) { unsigned n = u._grab(); x.clear(); while (n--) @@ -184,18 +183,18 @@ operator>>(unmarshall &u, A &x) { // std::pair template inline marshall & -operator<<(marshall &m, const pair &d) { +operator<<(marshall & m, const pair & d) { return m << d.first << d.second; } template inline unmarshall & -operator>>(unmarshall &u, pair &d) { +operator>>(unmarshall & u, pair & d) { return u >> d.first >> d.second; } // std::map template inline unmarshall & -operator>>(unmarshall &u, map &x) { +operator>>(unmarshall & u, map & x) { unsigned n = u._grab(); x.clear(); while (n--) @@ -204,13 +203,13 @@ operator>>(unmarshall &u, map &x) { } // std::string -inline marshall & operator<<(marshall &m, const string &s) { +inline marshall & operator<<(marshall & m, const string & s) { m << (uint32_t)s.size(); m.rawbytes(s.data(), s.size()); return m; } -inline unmarshall & operator>>(unmarshall &u, string &s) { +inline unmarshall & operator>>(unmarshall & u, string & s) { uint32_t sz = u._grab(); if (u.ok()) { s.resize(sz); @@ -224,12 +223,12 @@ inline unmarshall & operator>>(unmarshall &u, string &s) { // template typename enable_if::value, marshall>::type & -operator<<(marshall &m, E e) { +operator<<(marshall & m, E e) { return m << from_enum(e); } template typename enable_if::value, unmarshall>::type & -operator>>(unmarshall &u, E &e) { +operator>>(unmarshall & u, E & e) { e = to_enum(u._grab>()); return u; } @@ -238,11 +237,11 @@ operator>>(unmarshall &u, E &e) { // Recursive marshalling // -inline marshall & operator<<(marshall &m, marshall &n) { +inline marshall & operator<<(marshall & m, marshall & n) { return m << n.content(); } -inline unmarshall & operator>>(unmarshall &u, unmarshall &v) { +inline unmarshall & operator>>(unmarshall & u, unmarshall & v) { v = unmarshall(u._grab(), false); return u; } diff --git a/rpc/marshall_wrap.h b/rpc/marshall_wrap.h index 8e10a75..fa55f17 100644 --- a/rpc/marshall_wrap.h +++ b/rpc/marshall_wrap.h @@ -3,7 +3,7 @@ #include "marshall.h" -typedef function handler; +typedef function handler; // // Automatic marshalling wrappers for RPC handlers @@ -71,7 +71,7 @@ template -struct marshalled_func_imp { +struct marshalled_func_imp { static inline handler *wrap(F f, C *c=nullptr) { // This type definition corresponds to an empty struct with // template parameters running from 0 up to (# args) - 1. @@ -82,7 +82,7 @@ struct marshalled_func_imp { using ArgsStorage = tuple::type...>; // Allocate a handler (i.e. function) to hold the lambda // which will unmarshall RPCs and call f. - return new handler([=](unmarshall &u, marshall &m) -> RV { + return new handler([=](unmarshall && u, marshall & m) -> RV { // Unmarshall each argument with the correct type and store the // result in a tuple. ArgsStorage t{u._grab::type>()...}; diff --git a/rpc/rpc.cc b/rpc/rpc.cc index 0c3a97d..7937785 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -65,29 +65,17 @@ inline void set_rand_seed() { srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid()); } -static sockaddr_in make_sockaddr(const string &hostandport); +static sockaddr_in make_sockaddr(const string & hostandport); -rpcc::rpcc(const string & d, bool retrans) : - dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0), - retrans_(retrans), reachable_(true), chan_(), destroy_wait_ (false), xid_rep_done_(-1) +rpcc::rpcc(const string & d) : dst_(make_sockaddr(d)) { - if (retrans) { - set_rand_seed(); - clt_nonce_ = (nonce_t)random(); - } else { - // special client nonce 0 means this client does not - // require at-most-once logic from the server - // because it uses tcp and never retries a failed connection - clt_nonce_ = 0; - } + set_rand_seed(); + clt_nonce_ = (nonce_t)random(); char *loss_env = getenv("RPC_LOSSY"); if (loss_env) lossytest_ = atoi(loss_env); - // xid starts with 1 and latest received reply starts with 0 - xid_rep_window_.push_back(0); - IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_); } @@ -102,7 +90,7 @@ rpcc::~rpcc() { int rpcc::bind(milliseconds to) { nonce_t r; - int ret = call_timeout(rpc_protocol::bind, to, r); + rpc_protocol::status ret = call_timeout(rpc_protocol::bind, to, r); if (ret == 0) { lock ml(m_); bind_done_ = true; @@ -118,7 +106,7 @@ void rpcc::cancel(void) { lock ml(m_); if (calls_.size()) { LOG("force callers to fail"); - for (auto &p : calls_) { + for (auto & p : calls_) { caller *ca = p.second; IF_LEVEL(2) LOG("force caller to fail"); @@ -137,7 +125,7 @@ void rpcc::cancel(void) { } } -int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) { +int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { caller ca(0, &rep); xid_t xid_rep; @@ -169,7 +157,7 @@ int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) { while (1) { if (transmit) { - get_refconn(ch); + get_latest_connection(ch); if (ch) { if (reachable_) { request forgot; @@ -212,11 +200,9 @@ int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) { if (nextdeadline >= finaldeadline) break; - if (retrans_ && (!ch || ch->isdead())) { - // since connection is dead, retransmit - // on the new connection + // retransmit on new connection if connection is dead + if (!ch || ch->isdead()) transmit = true; - } } { @@ -226,7 +212,7 @@ int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) { // may need to update the xid again here, in case the // packet times out before it's even sent by the channel. // I don't think there's any harm in maybe doing it twice - update_xid_rep(ca.xid); + update_xid_rep(ca.xid, ml); if (destroy_wait_) destroy_wait_c_.notify_one(); @@ -253,7 +239,7 @@ int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) { return (ca.done? ca.intret : rpc_protocol::timeout_failure); } -void rpcc::get_refconn(shared_ptr & ch) { +void rpcc::get_latest_connection(shared_ptr & ch) { lock ml(chan_m_); if (!chan_ || chan_->isdead()) chan_ = connection::to_dst(dst_, this, lossytest_); @@ -281,7 +267,7 @@ rpcc::got_pdu(const shared_ptr &, const string & b) lock ml(m_); - update_xid_rep(h.xid); + update_xid_rep(h.xid, ml); if (calls_.find(h.xid) == calls_.end()) { IF_LEVEL(2) LOG("xid " << h.xid << " no pending request"); @@ -302,10 +288,8 @@ rpcc::got_pdu(const shared_ptr &, const string & b) return true; } -// assumes thread holds mutex m -void -rpcc::update_xid_rep(int xid) -{ +void rpcc::update_xid_rep(xid_t xid, lock & m_lock) { + VERIFY(m_lock); if (xid <= xid_rep_window_.front()) return; @@ -325,15 +309,13 @@ compress: } } -rpcs::rpcs(in_port_t p1) - : port_(p1), reachable_ (true) +rpcs::rpcs(in_port_t p1) : port_(p1) { set_rand_seed(); nonce_ = (nonce_t)random(); IF_LEVEL(2) LOG("created with nonce " << nonce_); reg(rpc_protocol::bind, &rpcs::rpcbind, this); - dispatchpool_ = unique_ptr(new thread_pool(6, false)); } void rpcs::start() { @@ -345,7 +327,6 @@ rpcs::~rpcs() { // must delete listener before dispatchpool listener_ = nullptr; dispatchpool_ = nullptr; - free_reply_window(); } bool rpcs::got_pdu(const shared_ptr & c, const string & b) { @@ -357,13 +338,6 @@ bool rpcs::got_pdu(const shared_ptr & c, const string & b) { return dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, c, b)); } -void rpcs::reg1(proc_id_t proc, handler *h) { - lock pl(procs_m_); - VERIFY(procs_.count(proc) == 0); - procs_[proc] = h; - VERIFY(procs_.count(proc) >= 1); -} - void rpcs::dispatch(shared_ptr c, const string & buf) { unmarshall req(buf, true); @@ -405,41 +379,33 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { f = procs_[proc]; } - rpcs::rpcstate_t stat; - string b1; - - if (h.clt_nonce) { - // have i seen this client before? - { - lock rwl(reply_window_m_); - // if we don't know about this clt_nonce, create a cleanup object - if (reply_window_.find(h.clt_nonce) == reply_window_.end()) { - VERIFY (reply_window_[h.clt_nonce].size() == 0); // create - reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid - IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid << - " chan " << c->fd << ", total clients " << (reply_window_.size()-1)); - } - } - - // save the latest good connection to the client - { - lock rwl(conns_m_); - if (conns_.find(h.clt_nonce) == conns_.end()) - conns_[h.clt_nonce] = c; - else if (conns_[h.clt_nonce]->create_time < c->create_time) - conns_[h.clt_nonce] = c; + // have i seen this client before? + { + lock rwl(reply_window_m_); + // if we don't know about this clt_nonce, create a cleanup object + if (reply_window_.find(h.clt_nonce) == reply_window_.end()) { + VERIFY (reply_window_[h.clt_nonce].size() == 0); // create + reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid + IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid << + " chan " << c->fd << ", total clients " << (reply_window_.size()-1)); } + } - stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1); - } else { - // this client does not require at most once logic - stat = NEW; + // save the latest good connection to the client + { + lock rwl(conns_m_); + if (conns_.find(h.clt_nonce) == conns_.end()) + conns_[h.clt_nonce] = c; + else if (conns_[h.clt_nonce]->create_time < c->create_time) + conns_[h.clt_nonce] = c; } - switch (stat) { + string b1; + + switch (check_duplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1)) { case NEW: // new request - rh.ret = (*f)(req, rep); - if (rh.ret == rpc_protocol::unmarshal_args_failure) { + rh.ret = (*f)(forward(req), rep); + if (rh.ret == rpc_protocol::unmarshall_args_failure) { LOG("failed to unmarshall the arguments. You are " << "probably calling RPC 0x" << hex << proc << " with the wrong " << "types of arguments."); @@ -453,10 +419,7 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " << h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce); - if (h.clt_nonce > 0) { - // only record replies for clients that require at-most-once logic - add_reply(h.clt_nonce, h.xid, b1); - } + add_reply(h.clt_nonce, h.xid, b1); // get the latest connection to the client { @@ -496,12 +459,12 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { // DONE: seen this xid, previous reply returned in b. // FORGOTTEN: might have seen this xid, but deleted previous reply. rpcs::rpcstate_t -rpcs::checkduplicate_and_update(nonce_t clt_nonce, xid_t xid, +rpcs::check_duplicate_and_update(nonce_t clt_nonce, xid_t xid, xid_t xid_rep, string & b) { lock rwl(reply_window_m_); - list &l = reply_window_[clt_nonce]; + list & l = reply_window_[clt_nonce]; VERIFY(l.size() > 0); VERIFY(xid >= xid_rep); @@ -543,12 +506,10 @@ rpcs::checkduplicate_and_update(nonce_t clt_nonce, xid_t xid, // rpcs::dispatch calls add_reply when it is sending a reply to an RPC, // and passes the return value in b. // add_reply() should remember b. -// free_reply_window() and checkduplicate_and_update are responsible for -// cleaning up the remembered values. void rpcs::add_reply(nonce_t clt_nonce, xid_t xid, const string & b) { lock rwl(reply_window_m_); // remember the RPC reply value - list &l = reply_window_[clt_nonce]; + list & l = reply_window_[clt_nonce]; list::iterator it = l.begin(); // skip to our place in the list for (it++; it != l.end() && it->xid < xid; it++); @@ -561,18 +522,13 @@ void rpcs::add_reply(nonce_t clt_nonce, xid_t xid, const string & b) { } } -void rpcs::free_reply_window(void) { - lock rwl(reply_window_m_); - reply_window_.clear(); -} - -int rpcs::rpcbind(nonce_t &r) { +rpc_protocol::status rpcs::rpcbind(nonce_t & r) { IF_LEVEL(2) LOG("called return nonce " << nonce_); r = nonce_; return 0; } -static sockaddr_in make_sockaddr(const string &hostandport) { +static sockaddr_in make_sockaddr(const string & hostandport) { string host = "127.0.0.1"; string port = hostandport; auto colon = hostandport.find(':'); diff --git a/rpc/rpc.h b/rpc/rpc.h index 3ae7737..211c717 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -52,29 +52,30 @@ class rpcc : private connection_delegate { cond c; }; - void get_refconn(shared_ptr & ch); - void update_xid_rep(xid_t xid); + void get_latest_connection(shared_ptr & ch); + void update_xid_rep(xid_t xid, lock & m_lock); sockaddr_in dst_; nonce_t clt_nonce_; - nonce_t srv_nonce_; - bool bind_done_; - xid_t xid_; - int lossytest_; - bool retrans_; - bool reachable_; + nonce_t srv_nonce_ = 0; + bool bind_done_ = false; + int lossytest_ = 0; + bool reachable_ = true; shared_ptr chan_; mutex m_; // protect insert/delete to calls[] mutex chan_m_; - bool destroy_wait_; + bool destroy_wait_ = false; cond destroy_wait_c_; map calls_; - list xid_rep_window_; + + // xid starts with 1 and latest received reply starts with 0 + xid_t xid_ = 1; + list xid_rep_window_ = {0}; struct request { void clear() { buf.clear(); xid = -1; } @@ -83,22 +84,21 @@ class rpcc : private connection_delegate { xid_t xid = -1; }; request dup_req_; - int xid_rep_done_; + int xid_rep_done_ = -1; - int call1(proc_id_t proc, marshall &req, string &rep, milliseconds to); + int call1(proc_id_t proc, milliseconds to, string & rep, marshall & req); template - int call_m(proc_id_t proc, marshall &req, R & r, milliseconds to) { + inline int call_m(proc_id_t proc, milliseconds to, R & r, marshall && req) { string rep; - int intret = call1(proc, req, rep, to); - unmarshall u(rep, true); + int intret = call1(proc, to, rep, req); if (intret < 0) return intret; - u >> r; + unmarshall u(rep, true, r); if (u.okdone() != true) { LOG("rpcc::call_m: failed to unmarshall the reply. You are probably " << "calling RPC 0x" << hex << proc << " with the wrong return type."); VERIFY(0); - return rpc_protocol::unmarshal_reply_failure; + return rpc_protocol::unmarshall_reply_failure; } return intret; } @@ -107,7 +107,7 @@ class rpcc : private connection_delegate { public: - rpcc(const string & d, bool retrans=true); + rpcc(const string & d); ~rpcc(); nonce_t id() { return clt_nonce_; } @@ -119,15 +119,14 @@ class rpcc : private connection_delegate { void cancel(); template - inline int call(proc_t

proc, R & r, const Args&... args) { + inline int call(proc_t

proc, R & r, const Args & ... args) { return call_timeout(proc, rpc::to_max, r, args...); } template - inline int call_timeout(proc_t

proc, milliseconds to, R & r, const Args&... args) { + inline int call_timeout(proc_t

proc, milliseconds to, R & r, const Args & ... args) { static_assert(is_valid_call::value, "RPC called with incorrect argument types"); - marshall m{args...}; - return call_m(proc.id, m, r, to); + return call_m(proc.id, to, r, forward(marshall(args...))); } }; @@ -167,16 +166,15 @@ class rpcs : private connection_delegate { // indexed by client nonce. map> reply_window_; - void free_reply_window(void); void add_reply(nonce_t clt_nonce, xid_t xid, const string & b); - rpcstate_t checkduplicate_and_update(nonce_t clt_nonce, xid_t xid, + rpcstate_t check_duplicate_and_update(nonce_t clt_nonce, xid_t xid, xid_t rep_xid, string & b); // latest connection to the client map> conns_; - bool reachable_; + bool reachable_ = true; // map proc # to function map procs_; @@ -187,14 +185,11 @@ class rpcs : private connection_delegate { void dispatch(shared_ptr c, const string & buf); - // internal handler registration - void reg1(proc_id_t proc, handler *); - - unique_ptr dispatchpool_; + unique_ptr dispatchpool_{new thread_pool(6, false)}; unique_ptr listener_; // RPC handler for clients binding - rpc_protocol::status rpcbind(nonce_t &r); + rpc_protocol::status rpcbind(nonce_t & r); bool got_pdu(const shared_ptr & c, const string & b); @@ -209,10 +204,13 @@ class rpcs : private connection_delegate { static_assert(is_valid_registration::value, "RPC handler registered with incorrect argument types"); struct ReturnOnFailure { static inline int unmarshall_args_failure() { - return rpc_protocol::unmarshal_args_failure; + return rpc_protocol::unmarshall_args_failure; } }; - reg1(proc.id, marshalled_func::wrap(f, c)); + lock pl(procs_m_); + VERIFY(procs_.count(proc.id) == 0); + procs_[proc.id] = marshalled_func::wrap(f, c); + VERIFY(procs_.count(proc.id) >= 1); } void start(); diff --git a/rpc/rpc_protocol.h b/rpc/rpc_protocol.h index 4a3ff32..65b7523 100644 --- a/rpc/rpc_protocol.h +++ b/rpc/rpc_protocol.h @@ -13,8 +13,8 @@ namespace rpc_protocol { enum : status { timeout_failure = -1, - unmarshal_args_failure = -2, - unmarshal_reply_failure = -3, + unmarshall_args_failure = -2, + unmarshall_reply_failure = -3, atmostonce_failure = -4, oldsrv_failure = -5, bind_failure = -6, diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index 4dd2af2..d4f53f4 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -23,9 +23,9 @@ static in_port_t port; class srv { public: int handle_22(string & r, const string a, const string b); - int handle_fast(int &r, const int a); - int handle_slow(int &r, const int a); - int handle_bigrep(string &r, const size_t a); + int handle_fast(int & r, const int a); + int handle_slow(int & r, const int a); + int handle_bigrep(string & r, const size_t a); }; namespace srv_protocol { @@ -44,23 +44,23 @@ namespace srv_protocol { // rpcs::reg() decides how to unmarshall by looking // at these argument types, so this function definition // does what a .x file does in SunRPC. -int srv::handle_22(string &r, const string a, string b) { +int srv::handle_22(string & r, const string a, string b) { r = a + b; return 0; } -int srv::handle_fast(int &r, const int a) { +int srv::handle_fast(int & r, const int a) { r = a + 1; return 0; } -int srv::handle_slow(int &r, const int a) { +int srv::handle_slow(int & r, const int a) { usleep(random() % 500); r = a + 2; return 0; } -int srv::handle_bigrep(string &r, const size_t len) { +int srv::handle_bigrep(string & r, const size_t len) { r = string(len, 'x'); return 0; } diff --git a/rpc/thr_pool.cc b/rpc/thr_pool.cc index 4988dab..fc7be3d 100644 --- a/rpc/thr_pool.cc +++ b/rpc/thr_pool.cc @@ -18,7 +18,7 @@ thread_pool::~thread_pool() { th_[i].join(); } -bool thread_pool::addJob(const job_t &j) { +bool thread_pool::addJob(const job_t & j) { return jobq_.enq(j,blockadd_); } diff --git a/rpc/thr_pool.h b/rpc/thr_pool.h index 28c5236..df11f20 100644 --- a/rpc/thr_pool.h +++ b/rpc/thr_pool.h @@ -11,7 +11,7 @@ class thread_pool { thread_pool(size_t sz, bool blocking=true); ~thread_pool(); - bool addJob(const job_t &j); + bool addJob(const job_t & j); private: size_t nthreads_; diff --git a/rsm.cc b/rsm.cc index 956f45d..c766145 100644 --- a/rsm.cc +++ b/rsm.cc @@ -83,9 +83,7 @@ #include "rsm_client.h" #include -rsm::rsm(const string & _first, const string & _me) : - stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0), - partitioned (false), dopartition(false), break1(false), break2(false) +rsm::rsm(const string & _first, const string & _me) : primary(_first) { cfg = unique_ptr(new config(_first, _me, this)); @@ -103,7 +101,7 @@ rsm::rsm(const string & _first, const string & _me) : rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this); // tester must be on different port, otherwise it may partition itself - testsvr = unique_ptr(new rpcs((in_port_t)stoi(_me) + 1)); + testsvr.reset(new rpcs((in_port_t)stoi(_me) + 1)); testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this); testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this); } @@ -115,11 +113,6 @@ void rsm::start() { thread(&rsm::recovery, this).detach(); } -void rsm::reg1(rpc_protocol::proc_id_t proc, handler *h) { - lock ml(rsm_mutex); - procs[proc] = h; -} - // The recovery thread runs this function void rsm::recovery() { bool r = true; @@ -279,7 +272,7 @@ void rsm::commit_change(unsigned vid) { lock ml(rsm_mutex); commit_change(vid, ml); if (cfg->ismember(cfg->myaddr(), vid_commit)) - breakpoint2(); + breakpoint(2); } void rsm::commit_change(unsigned vid, lock &) { @@ -293,7 +286,7 @@ void rsm::commit_change(unsigned vid, lock &) { recovery_cond.notify_one(); sync_cond.notify_one(); if (cfg->ismember(cfg->myaddr(), vid_commit)) - breakpoint2(); + breakpoint(2); } @@ -301,10 +294,9 @@ void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r LOG("execute"); handler *h = procs[procno]; VERIFY(h); - unmarshall args(req, false); marshall rep; - auto ret = (rsm_protocol::status)(*h)(args, rep); - r = marshall{ret, rep.content()}.content(); + auto ret = (rsm_protocol::status)(*h)(unmarshall(req, false), rep); + r = marshall(ret, rep.content()).content(); } // @@ -350,7 +342,7 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id LOG("Invoke returned " << ret); if (ret != rsm_protocol::OK) return rsm_client_protocol::BUSY; - breakpoint1(); + breakpoint(1); lock rsm_mutex_lock(rsm_mutex); partition1(rsm_mutex_lock); } @@ -398,14 +390,14 @@ rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp string r; execute(proc, req, r); last_myvs = vs; - breakpoint1(); + breakpoint(1); return rsm_protocol::OK; } // // RPC handler: Send back the local node's state to the caller // -rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, const string & src, +rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const string & src, viewstamp last, unsigned vid) { lock ml(rsm_mutex); LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" << @@ -473,7 +465,7 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last // RPC handler: Responds with the list of known nodes for fall-back on a // primary failure // -rsm_client_protocol::status rsm::client_members(vector &r, int) { +rsm_client_protocol::status rsm::client_members(vector & r, int) { vector m; lock ml(rsm_mutex); cfg->get_view(vid_commit, m); @@ -516,7 +508,8 @@ bool rsm::amiprimary() { // Test RPCs -- simulate partitions and failures -void rsm::net_repair(bool heal, lock &/*rsm_mutex_lock*/) { +void rsm::net_repair(bool heal, lock & rsm_mutex_lock) { + VERIFY(rsm_mutex_lock); vector m; cfg->get_view(vid_commit, m); for (unsigned i = 0; i < m.size(); i++) { @@ -529,7 +522,7 @@ void rsm::net_repair(bool heal, lock &/*rsm_mutex_lock*/) { rsmrpc->set_reachable(heal); } -rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r, int heal) { +rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status & r, int heal) { lock ml(rsm_mutex); LOG("heal " << heal << " (dopartition " << dopartition << ", partitioned " << partitioned << ")"); @@ -543,16 +536,9 @@ rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r, // simulate failure at breakpoint 1 and 2 -void rsm::breakpoint1() { - if (break1) { - LOG("Dying at breakpoint 1 in rsm!"); - exit(1); - } -} - -void rsm::breakpoint2() { - if (break2) { - LOG("Dying at breakpoint 2 in rsm!"); +void rsm::breakpoint(int b) { + if (breakpoints[b-1]) { + LOG("Dying at breakpoint " << b << " in rsm!"); exit(1); } } @@ -565,12 +551,12 @@ void rsm::partition1(lock & rsm_mutex_lock) { } } -rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status &r, int b) { +rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status & r, int b) { r = rsm_test_protocol::OK; lock ml(rsm_mutex); LOG("breakpoint " << b); - if (b == 1) break1 = true; - else if (b == 2) break2 = true; + if (b == 1) breakpoints[1-1] = true; + else if (b == 2) breakpoints[2-1] = true; else if (b == 3 || b == 4) cfg->breakpoint(b); else r = rsm_test_protocol::ERR; return r; diff --git a/rsm.h b/rsm.h index 14dc011..ca03473 100644 --- a/rsm.h +++ b/rsm.h @@ -17,8 +17,6 @@ class rsm_state_transfer { }; class rsm : public config_view_change { - private: - void reg1(rpc_protocol::proc_id_t proc, handler *); protected: map procs; unique_ptr cfg; @@ -29,27 +27,26 @@ class rsm : public config_view_change { viewstamp last_myvs{0, 0}; // Viewstamp of the last executed request viewstamp myvs{0, 1}; string primary; - bool insync; - bool inviewchange; - unsigned vid_commit; // Latest view id that is known to rsm layer + bool insync = false; + bool inviewchange = true; + unsigned vid_commit = 0; // Latest view id that is known to rsm layer unsigned vid_insync; // The view id that this node is synchronizing for vector backups; // A list of unsynchronized backups // For testing purposes unique_ptr testsvr; - bool partitioned; - bool dopartition; - bool break1; - bool break2; + bool partitioned = false; + bool dopartition = false; + bool breakpoints[2] = {}; - rsm_client_protocol::status client_members(vector &r, int i); + rsm_client_protocol::status client_members(vector & r, int i); rsm_protocol::status invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & mreq); - rsm_protocol::status transferreq(rsm_protocol::transferres &r, const string & src, + rsm_protocol::status transferreq(rsm_protocol::transferres & r, const string & src, viewstamp last, unsigned vid); rsm_protocol::status transferdonereq(int &, const string & m, unsigned vid); rsm_protocol::status joinreq(string & log, const string & src, viewstamp last); - rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status &r, int heal); - rsm_test_protocol::status breakpointreq(rsm_test_protocol::status &r, int b); + rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status & r, int heal); + rsm_test_protocol::status breakpointreq(rsm_test_protocol::status & r, int b); mutex rsm_mutex, invoke_mutex; cond recovery_cond, sync_cond; @@ -63,24 +60,24 @@ class rsm : public config_view_change { bool sync_with_backups(lock & rsm_mutex_lock); bool sync_with_primary(lock & rsm_mutex_lock); void net_repair(bool heal, lock & rsm_mutex_lock); - void breakpoint1(); - void breakpoint2(); + void breakpoint(int b); void partition1(lock & rsm_mutex_lock); void commit_change(unsigned vid, lock & rsm_mutex_lock); + void recovery NORETURN (); public: rsm (const string & _first, const string & _me); bool amiprimary(); void set_state_transfer(rsm_state_transfer *_stf) { stf = _stf; } - void recovery NORETURN (); void commit_change(unsigned vid); template void reg(rpc_protocol::proc_t

proc, F f, C *c=nullptr) { static_assert(is_valid_registration::value, "RSM handler registered with incorrect argument types"); - reg1(proc.id, marshalled_func::wrap(f, c)); + lock ml(rsm_mutex); + procs[proc.id] = marshalled_func::wrap(f, c); } void start(); }; -#endif /* rsm_h */ +#endif diff --git a/rsm_client.cc b/rsm_client.cc index 9e86915..598a1ed 100644 --- a/rsm_client.cc +++ b/rsm_client.cc @@ -15,7 +15,7 @@ void rsm_client::primary_failure(lock &) { known_mems.pop_back(); } -rsm_protocol::status rsm_client::invoke(unsigned int proc, string &rep, const string &req) { +rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const string & req) { lock ml(rsm_client_mutex); while (1) { LOG("proc " << hex << proc << " primary " << primary); diff --git a/rsm_client.h b/rsm_client.h index be66fec..06ca5a6 100644 --- a/rsm_client.h +++ b/rsm_client.h @@ -19,7 +19,7 @@ class rsm_client { mutex rsm_client_mutex; void primary_failure(lock & rsm_client_mutex_lock); bool init_members(lock & rsm_client_mutex_lock); - rsm_protocol::status invoke(unsigned int proc, string &rep, const string &req); + rsm_protocol::status invoke(unsigned int proc, string & rep, const string & req); template int call_m(unsigned int proc, R & r, const marshall & req); public: rsm_client(string dst); @@ -27,7 +27,7 @@ class rsm_client { template int call(rpc_protocol::proc_t

proc, R & r, const Args & ...a1) { static_assert(is_valid_call::value, "RSM method invoked with incorrect argument types"); - return call_m(proc.id, r, marshall{a1...}); + return call_m(proc.id, r, marshall(a1...)); } }; @@ -43,12 +43,11 @@ inline string hexify(const string & s) { template int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) { string rep; - string res; int intret = invoke(proc, rep, req.content()); VERIFY( intret == rsm_client_protocol::OK ); - unmarshall u(rep, false); - u >> intret; + unmarshall u(rep, false, intret); if (intret < 0) return intret; + string res; u >> res; if (!u.okdone()) { LOG("failed to unmarshall the reply."); @@ -57,17 +56,15 @@ int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) { "0x" << hex << proc << " with the wrong return type"); LOG("here's what I got: \"" << hexify(rep) << "\""); VERIFY(0); - return rpc_protocol::unmarshal_reply_failure; + return rpc_protocol::unmarshall_reply_failure; } - unmarshall u1(res, false); - u1 >> r; - if(!u1.okdone()) { + if(!unmarshall(res, false, r).okdone()) { LOG("failed to unmarshall the reply."); LOG("You are probably calling RPC 0x" << hex << proc << " with the wrong return type."); LOG("here's what I got: \"" << hexify(res) << "\""); VERIFY(0); - return rpc_protocol::unmarshal_reply_failure; + return rpc_protocol::unmarshall_reply_failure; } return intret; } diff --git a/threaded_log.h b/threaded_log.h index 6630a86..903b0fa 100644 --- a/threaded_log.h +++ b/threaded_log.h @@ -13,14 +13,14 @@ extern char log_thread_prefix; namespace std { // Sticking this in std:: makes it possible for ostream_iterator to use it. template - ostream & operator<<(ostream &o, const pair &d) { + ostream & operator<<(ostream & o, const pair & d) { return o << "<" << d.first << "," << d.second << ">"; } } template typename enable_if::value && !is_same::value, ostream>::type & -operator<<(ostream &o, const A &a) { +operator<<(ostream & o, const A & a) { return o << "[" << implode(a, ", ") << "]"; } diff --git a/types.h b/types.h index ede859f..888cd68 100644 --- a/types.h +++ b/types.h @@ -122,13 +122,13 @@ using std::vector; template struct is_const_iterable : false_type {}; template struct is_const_iterable().cbegin(), declval().cend(), void()) + decltype(declval().cbegin(), declval().cend(), void()) > : true_type {}; template struct supports_emplace_back : false_type {}; template struct supports_emplace_back().emplace_back(declval()), void()) + decltype(declval().emplace_back(declval()), void()) > : true_type {}; template @@ -151,7 +151,7 @@ implode(const C & v, string delim=" ") { return oss.str(); } -inline vector explode(const string &s, string delim=" ") { +inline vector explode(const string & s, string delim=" ") { vector out; size_t start = 0, end = 0; while ((end = s.find(delim, start)) != string::npos) { @@ -185,7 +185,7 @@ inline auto _tuple_() const -> decltype(tie(__VA_ARGS__)) { return tie(__VA_ARGS // LEXICOGRAPHIC_COMPARISON(foo) #define LEXICOGRAPHIC_OPERATOR(_c_, _op_) \ -inline bool operator _op_(const _c_ &b) const { return _tuple_() _op_ b._tuple_(); } +inline bool operator _op_(const _c_ & b) const { return _tuple_() _op_ b._tuple_(); } #define LEXICOGRAPHIC_COMPARISON(_c_) \ LEXICOGRAPHIC_OPERATOR(_c_, <) LEXICOGRAPHIC_OPERATOR(_c_, <=) \ @@ -212,7 +212,7 @@ template struct make_tuple_indices { // Template parameter pack expansion is not allowed in certain contexts, but // brace initializers (for instance, calls to constructors of empty structs) // are fair game. -struct pass { template inline pass(Args&&...) {} }; +struct pass { template inline pass(Args && ...) {} }; #include "endian.h" -- 1.7.9.5