Cosmetic improvements.
authorPeter Iannucci <iannucci@mit.edu>
Sat, 16 Nov 2013 17:56:29 +0000 (12:56 -0500)
committerPeter Iannucci <iannucci@mit.edu>
Sat, 16 Nov 2013 17:56:29 +0000 (12:56 -0500)
26 files changed:
Makefile.osx
config.cc
config.h
lock_client.cc
lock_client.h
lock_server.cc
lock_server.h
lock_smain.cc
paxos.cc
paxos.h
rpc/connection.cc
rpc/connection.h
rpc/marshall.h
rpc/marshall_wrap.h
rpc/rpc.cc
rpc/rpc.h
rpc/rpc_protocol.h
rpc/rpctest.cc
rpc/thr_pool.cc
rpc/thr_pool.h
rsm.cc
rsm.h
rsm_client.cc
rsm_client.h
threaded_log.h
types.h

index 80d85a3..74218df 100644 (file)
@@ -2,7 +2,7 @@ PEDANTRY = -Weverything -pedantic-errors -Werror -Wno-c++98-compat \
                   -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes \
                   -Wmissing-declarations -Wno-weak-vtables -Wno-global-constructors \
                   -Wno-exit-time-destructors -pedantic -Wall -Wextra -Weffc++
-OPTFLAGS = -O0 -fno-omit-frame-pointer #-fsanitize=address ,thread,undefined -fsanitize-memory-track-origins 
+OPTFLAGS = -O3 -fno-omit-frame-pointer #-fsanitize=address ,thread,undefined -fsanitize-memory-track-origins 
 STDLIB = -stdlib=libc++ 
 #STDLIB = 
 CXX = clang++-mp-3.4
index 2b40078..7df1bbc 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -33,7 +33,7 @@
 // all views, the other nodes can bring this re-joined node up to
 // date.
 
-config::config(const string &_first, const string &_me, config_view_change *_vc)
+config::config(const string & _first, const string & _me, config_view_change *_vc)
     : my_view_id(0), first(_first), me(_me), vc(_vc),
       paxos(this, me == _first, me, me)
 {
@@ -43,32 +43,25 @@ config::config(const string &_first, const string &_me, config_view_change *_vc)
     thread(&config::heartbeater, this).detach();
 }
 
-void config::restore(const string &s) {
+void config::restore(const string & s) {
     lock cfg_mutex_lock(cfg_mutex);
     paxos.restore(s);
     reconstruct(cfg_mutex_lock);
 }
 
-void config::get_view(unsigned instance, vector<string> &m) {
+void config::get_view(unsigned instance, vector<string> & m) {
     lock cfg_mutex_lock(cfg_mutex);
     get_view(instance, m, cfg_mutex_lock);
 }
 
-void config::get_view(unsigned instance, vector<string> &m, lock &) {
+void config::get_view(unsigned instance, vector<string> & m, lock & cfg_mutex_lock) {
+    VERIFY(cfg_mutex_lock);
     string value = paxos.value(instance);
     LOG("get_view(" << instance << "): returns " << value);
-    m = members(value);
-}
-
-vector<string> config::members(const string &value) const {
-    return explode(value);
+    m = explode(value);
 }
 
-string config::value(const vector<string> &members) const {
-    return implode(members);
-}
-
-void config::reconstruct(lock &cfg_mutex_lock) {
+void config::reconstruct(lock & cfg_mutex_lock) {
     VERIFY(cfg_mutex_lock);
     my_view_id = paxos.instance();
     if (my_view_id > 0) {
@@ -78,10 +71,10 @@ void config::reconstruct(lock &cfg_mutex_lock) {
 }
 
 // Called by Paxos's acceptor.
-void config::paxos_commit(unsigned instance, const string &value) {
+void config::paxos_commit(unsigned instance, const string & value) {
     lock cfg_mutex_lock(cfg_mutex);
 
-    vector<string> newmem = members(value);
+    vector<string> newmem = explode(value);
     LOG("instance " << instance << ": " << newmem);
 
     for (auto mem : mems) {
@@ -101,14 +94,14 @@ void config::paxos_commit(unsigned instance, const string &value) {
     }
 }
 
-bool config::ismember(const string &m, unsigned vid) {
+bool config::ismember(const string & m, unsigned vid) {
     lock cfg_mutex_lock(cfg_mutex);
     vector<string> v;
     get_view(vid, v, cfg_mutex_lock);
     return isamember(m, v);
 }
 
-bool config::add(const string &new_m, unsigned vid) {
+bool config::add(const string & new_m, unsigned vid) {
     lock cfg_mutex_lock(cfg_mutex);
     LOG("adding " << new_m << " to " << vid);
     if (vid != my_view_id) {
@@ -118,18 +111,19 @@ bool config::add(const string &new_m, unsigned vid) {
     LOG("calling down to paxos layer");
     vector<string> m(mems), cmems(mems);
     m.push_back(new_m);
-    LOG("old mems " << cmems << " " << value(cmems));
-    LOG("new mems " << m << " " << value(m));
+    LOG("old mems " << cmems << " " << implode(cmems));
+    LOG("new mems " << m << " " << implode(m));
     unsigned nextvid = my_view_id + 1;
     cfg_mutex_lock.unlock();
-    bool r = paxos.run(nextvid, cmems, value(m));
+    bool r = paxos.run(nextvid, cmems, implode(m));
     cfg_mutex_lock.lock();
     LOG("paxos proposer returned " << (r ? "success" : "failure"));
     return r;
 }
 
 // caller should hold cfg_mutex
-bool config::remove(const string &m, lock &cfg_mutex_lock) {
+bool config::remove(const string & m, lock & cfg_mutex_lock) {
+    VERIFY(cfg_mutex_lock);
     LOG("my_view_id " << my_view_id << " remove? " << m);
     vector<string> n;
     for (auto mem : mems) {
@@ -139,7 +133,7 @@ bool config::remove(const string &m, lock &cfg_mutex_lock) {
     vector<string> cmems = mems;
     unsigned nextvid = my_view_id + 1;
     cfg_mutex_lock.unlock();
-    bool r = paxos.run(nextvid, cmems, value(n));
+    bool r = paxos.run(nextvid, cmems, implode(n));
     cfg_mutex_lock.lock();
     LOG("proposer returned " << (r ? "success" : "failure"));
     return r;
@@ -183,7 +177,7 @@ void config::heartbeater() {
     }
 }
 
-paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) {
+paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) {
     lock cfg_mutex_lock(cfg_mutex);
     r = (int) my_view_id;
     LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id);
@@ -196,7 +190,8 @@ paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) {
     return paxos_protocol::ERR;
 }
 
-config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
+config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) {
+    VERIFY(cfg_mutex_lock);
     unsigned vid = my_view_id;
     LOG("heartbeat to " << m << " (" << vid << ")");
     handle h(m);
index 7124a6e..895de1b 100644 (file)
--- a/config.h
+++ b/config.h
@@ -20,29 +20,27 @@ class config : public paxos_change {
         vector<string> mems;
         mutex cfg_mutex;
         cond config_cond;
-        paxos_protocol::status heartbeat(int &r, string m, unsigned instance);
-        string value(const vector<string> &mems) const;
-        vector<string> members(const string &v) const;
-        void get_view(unsigned instance, vector<string> &m, lock &cfg_mutex_lock);
-        bool remove(const string &, lock &cfg_mutex_lock);
-        void reconstruct(lock &cfg_mutex_lock);
+        paxos_protocol::status heartbeat(int & r, string m, unsigned instance);
+        void get_view(unsigned instance, vector<string> & m, lock & cfg_mutex_lock);
+        bool remove(const string &, lock & cfg_mutex_lock);
+        void reconstruct(lock & cfg_mutex_lock);
         typedef enum {
             OK, // response and same view #
             VIEWERR, // response but different view #
             FAILURE, // no response
         } heartbeat_t;
-        heartbeat_t doheartbeat(const string &m, lock &cfg_mutex_lock);
+        heartbeat_t doheartbeat(const string & m, lock & cfg_mutex_lock);
     public:
-        config(const string &_first, const string &_me, config_view_change *_vc);
+        config(const string & _first, const string & _me, config_view_change *_vc);
         unsigned view_id() { return my_view_id; }
-        const string &myaddr() const { return me; }
+        const string & myaddr() const { return me; }
         string dump() { return paxos.dump(); }
-        void get_view(unsigned instance, vector<string> &m);
-        void restore(const string &s);
+        void get_view(unsigned instance, vector<string> & m);
+        void restore(const string & s);
         bool add(const string &, unsigned view_id);
-        bool ismember(const string &m, unsigned view_id);
+        bool ismember(const string & m, unsigned view_id);
         void heartbeater NORETURN ();
-        void paxos_commit(unsigned instance, const string &v);
+        void paxos_commit(unsigned instance, const string & v);
         rpcs *get_rpcs() { return paxos.get_rpcs(); }
         void breakpoint(int b) { paxos.breakpoint(b); }
 };
index 3c5aa89..beca1cc 100644 (file)
@@ -52,7 +52,7 @@ void lock_client::releaser() {
         release_fifo.deq(&lid);
         LOG("Releaser: " << lid);
 
-        lock_state &st = get_lock_state(lid);
+        lock_state & st = get_lock_state(lid);
         lock sl(st.m);
         VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
         st.state = lock_state::releasing;
@@ -79,7 +79,7 @@ int lock_client::stat(lock_protocol::lockid_t lid) {
 }
 
 lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
-    lock_state &st = get_lock_state(lid);
+    lock_state & st = get_lock_state(lid);
     lock sl(st.m);
     auto self = this_thread::get_id();
 
@@ -144,7 +144,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
 }
 
 lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
-    lock_state &st = get_lock_state(lid);
+    lock_state & st = get_lock_state(lid);
     lock sl(st.m);
     auto self = this_thread::get_id();
     VERIFY(st.state == lock_state::locked && st.held_by == self);
@@ -167,7 +167,7 @@ lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
 
 rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
     LOG("Revoke handler " << lid << " " << xid);
-    lock_state &st = get_lock_state(lid);
+    lock_state & st = get_lock_state(lid);
     lock sl(st.m);
 
     if (st.state == lock_state::releasing || st.state == lock_state::none)
@@ -189,7 +189,7 @@ rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_
 }
 
 rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) {
-    lock_state &st = get_lock_state(lid);
+    lock_state & st = get_lock_state(lid);
     lock sl(st.m);
     VERIFY(st.state == lock_state::acquiring);
     st.state = lock_state::retrying;
index 73dfffe..654cf4f 100644 (file)
@@ -55,7 +55,7 @@ class lock_client {
         fifo<lock_protocol::lockid_t> release_fifo;
         mutex lock_table_lock;
         lock_map lock_table;
-        lock_state &get_lock_state(lock_protocol::lockid_t lid);
+        lock_state & get_lock_state(lock_protocol::lockid_t lid);
     public:
         static in_port_t last_port;
         lock_client(string xdst, lock_release_user *l = 0);
index 0c3a6e9..90ad5b2 100644 (file)
@@ -10,11 +10,11 @@ lock_state::lock_state():
 {
 }
 
-lock_state::lock_state(const lock_state &other) {
+lock_state::lock_state(const lock_state & other) {
     *this = other;
 }
 
-lock_state& lock_state::operator=(const lock_state& o) {
+lock_state & lock_state::operator=(const lock_state & o) {
     held = o.held;
     held_by = o.held_by;
     wanted_by = o.wanted_by;
@@ -28,10 +28,14 @@ lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) {
     return lock_table[lid];
 }
 
-lock_server::lock_server(rsm *r) : rsm_ (r) {
+lock_server::lock_server(rsm & r) : rsm_ (&r) {
     thread(&lock_server::revoker, this).detach();
     thread(&lock_server::retryer, this).detach();
-    rsm_->set_state_transfer(this);
+    r.set_state_transfer(this);
+
+    r.reg(lock_protocol::acquire, &lock_server::acquire, this);
+    r.reg(lock_protocol::release, &lock_server::release, this);
+    r.reg(lock_protocol::stat, &lock_server::stat, this);
 }
 
 void lock_server::revoker () {
@@ -42,7 +46,7 @@ void lock_server::revoker () {
         if (rsm_ && !rsm_->amiprimary())
             continue;
 
-        lock_state &st = get_lock_state(lid);
+        lock_state & st = get_lock_state(lid);
         holder_t held_by;
         {
             lock sl(st.m);
@@ -70,7 +74,7 @@ void lock_server::retryer() {
             continue;
 
         LOG("Sending retry for " << lid);
-        lock_state &st = get_lock_state(lid);
+        lock_state & st = get_lock_state(lid);
         holder_t front;
         {
             lock sl(st.m);
@@ -95,7 +99,7 @@ void lock_server::retryer() {
 lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
     LOG("lid=" << lid << " client=" << id << "," << xid);
     holder_t h = holder_t(id, xid);
-    lock_state &st = get_lock_state(lid);
+    lock_state & st = get_lock_state(lid);
     lock sl(st.m);
 
     // deal with duplicated requests
@@ -152,7 +156,7 @@ lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, c
 
 lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
     LOG("lid=" << lid << " client=" << id << "," << xid);
-    lock_state &st = get_lock_state(lid);
+    lock_state & st = get_lock_state(lid);
     lock sl(st.m);
     if (st.held && st.held_by == holder_t(id, xid)) {
         st.held = false;
@@ -165,18 +169,15 @@ lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, c
 
 string lock_server::marshal_state() {
     lock sl(lock_table_lock);
-    marshall rep;
-    rep << nacquire << lock_table;
-    return rep.content();
+    return marshall(nacquire, lock_table).content();
 }
 
 void lock_server::unmarshal_state(const string & state) {
     lock sl(lock_table_lock);
-    unmarshall rep(state, false);
-    rep >> nacquire >> lock_table;
+    unmarshall(state, false, nacquire, lock_table);
 }
 
-lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid, const callback_t &) {
+lock_protocol::status lock_server::stat(int & r, lock_protocol::lockid_t lid, const callback_t &) {
     LOG("stat request for " << lid);
     VERIFY(0);
     r = nacquire;
index d3ec580..1f30f87 100644 (file)
@@ -11,13 +11,13 @@ typedef pair<callback_t, lock_protocol::xid_t> holder_t;
 class lock_state {
 public:
     lock_state();
-    lock_state(const lock_state &other);
+    lock_state(const lock_state & other);
     bool held;
     holder_t held_by;
     list<holder_t> wanted_by;
     map<callback_t, lock_protocol::xid_t> old_requests;
     mutex m;
-    lock_state& operator=(const lock_state&);
+    lock_state & operator=(const lock_state &);
 
     MEMBERS(held, held_by, wanted_by)
 };
@@ -26,21 +26,21 @@ MARSHALLABLE_STRUCT(lock_state)
 
 typedef map<lock_protocol::lockid_t, lock_state> lock_map;
 
-class lock_server : public rsm_state_transfer {
+class lock_server : private rsm_state_transfer {
     private:
         int nacquire;
         mutex lock_table_lock;
         lock_map lock_table;
-        lock_state &get_lock_state(lock_protocol::lockid_t lid);
+        lock_state & get_lock_state(lock_protocol::lockid_t lid);
         fifo<lock_protocol::lockid_t> retry_fifo;
         fifo<lock_protocol::lockid_t> revoke_fifo;
         rsm *rsm_;
-    public:
-        lock_server(rsm *r = 0);
-        void revoker NORETURN ();
-        void retryer NORETURN ();
         string marshal_state();
         void unmarshal_state(const string & state);
+        void revoker NORETURN ();
+        void retryer NORETURN ();
+    public:
+        lock_server(rsm & r);
         lock_protocol::status acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
         lock_protocol::status release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
         lock_protocol::status stat(int &, lock_protocol::lockid_t, const callback_t & id);
index 0a3c209..2c9828b 100644 (file)
@@ -18,12 +18,7 @@ int main(int argc, char *argv[]) {
     }
 
     rsm rsm(argv[1], argv[2]);
-    lock_server ls(&rsm);
-    rsm.set_state_transfer(&ls);
-
-    rsm.reg(lock_protocol::acquire, &lock_server::acquire, &ls);
-    rsm.reg(lock_protocol::release, &lock_server::release, &ls);
-    rsm.reg(lock_protocol::stat, &lock_server::stat, &ls);
+    lock_server ls(rsm);
 
     rsm.start();
 
index a88a7a5..dad5ecf 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -6,7 +6,7 @@ bool isamember(const node_t & m, const nodes_t & nodes) {
 }
 
 // check if l2 contains a majority of the elements of l1
-bool majority(const nodes_t &l1, const nodes_t &l2) {
+bool majority(const nodes_t & l1, const nodes_t & l2) {
     auto overlap = (size_t)count_if(l1.begin(), l1.end(), std::bind(isamember, _1, l2));
     return overlap >= (l1.size() >> 1) + 1;
 }
diff --git a/paxos.h b/paxos.h
index db18e6c..f1123e6 100644 (file)
--- a/paxos.h
+++ b/paxos.h
@@ -66,7 +66,7 @@ class proposer_acceptor {
         unsigned instance() { return instance_h; }
         const value_t & value(unsigned instance) { return values[instance]; }
         string dump() { return l.dump(); }
-        void restore(const string &s) { l.restore(s); l.logread(); }
+        void restore(const string & s) { l.restore(s); l.logread(); }
         rpcs *get_rpcs() { return &pxs; }
 
         bool run(unsigned instance, const nodes_t & cnodes, const value_t & v);
index b29e136..4e49305 100644 (file)
@@ -31,7 +31,7 @@ connection::~connection() {
     VERIFY(!wpdu_.buf.size());
 }
 
-shared_ptr<connection> connection::to_dst(const sockaddr_in &dst, connection_delegate * delegate, int lossy) {
+shared_ptr<connection> connection::to_dst(const sockaddr_in & dst, connection_delegate * delegate, int lossy) {
     socket_t s = socket(AF_INET, SOCK_STREAM, 0);
     s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
index 8f7d494..3133299 100644 (file)
@@ -28,7 +28,7 @@ class connection : private aio_callback, public enable_shared_from_this<connecti
 
         bool send(const string & b);
 
-        static shared_ptr<connection> to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy=0);
+        static shared_ptr<connection> to_dst(const sockaddr_in & dst, connection_delegate *mgr, int lossy=0);
 
         const time_point<steady_clock> create_time = steady_clock::now();
         const file_t fd;
index 6e0c94a..7b716e2 100644 (file)
@@ -4,9 +4,6 @@
 #include "types.h"
 #include "rpc_protocol.h"
 
-class marshall;
-class unmarshall;
-
 //
 // Marshall and unmarshall objects
 //
@@ -18,7 +15,7 @@ class marshall {
 
     public:
         template <typename... Args>
-        marshall(const Args&... args) {
+        marshall(const Args & ... args) {
             (void)pass{(*this << args)...};
         }
 
@@ -35,7 +32,7 @@ class marshall {
         inline string content() const { return buf_.substr(rpc_protocol::RPC_HEADER_SZ,index_-rpc_protocol::RPC_HEADER_SZ); }
 
         // letting S be a defaulted template parameter forces the compiler to
-        // delay looking up operator<<(marshall&, rpc_sz_t) until we define it
+        // delay looking up operator<<(marshall &, rpc_sz_t) until we define it
         // (i.e. we define an operator for marshalling uint32_t)
         template <class T, class S=rpc_protocol::rpc_sz_t> inline void
         pack_header(const T & h) {
@@ -54,11 +51,13 @@ class unmarshall {
         bool ok_ = false;
 
     public:
-        unmarshall(const string &s, bool has_header)
+        template <typename... Args>
+        unmarshall(const string & s, bool has_header, Args && ... args)
             : buf_(s),index_(rpc_protocol::RPC_HEADER_SZ) {
             if (!has_header)
                 buf_.insert(0, rpc_protocol::RPC_HEADER_SZ, 0);
             ok_ = (buf_.size() >= rpc_protocol::RPC_HEADER_SZ);
+            (void)pass{(*this >> args)...};
         }
 
         bool ok() const { return ok_; }
@@ -89,8 +88,8 @@ class unmarshall {
 //
 
 #define MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _d_) \
-inline marshall & operator<<(marshall &m, _c_ x) { _d_ y = hton((_d_)x); m.rawbytes(&y, sizeof(_d_)); return m; } \
-inline unmarshall & operator>>(unmarshall &u, _c_ &x) { _d_ y; u.rawbytes(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; }
+inline marshall & operator<<(marshall & m, _c_ x) { _d_ y = hton((_d_)x); m.rawbytes(&y, sizeof(_d_)); return m; } \
+inline unmarshall & operator>>(unmarshall & u, _c_ & x) { _d_ y; u.rawbytes(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; }
 
 #define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_)
 
@@ -133,13 +132,13 @@ operator<<(marshall & m, tuple<Args...> && t) {
 }
 
 template <class... Args, size_t... Indices> inline unmarshall &
-tuple_unmarshall_imp(unmarshall & u, tuple<Args &...> t, tuple_indices<Indices...>) {
+tuple_unmarshall_imp(unmarshall & u, tuple<Args & ...> t, tuple_indices<Indices...>) {
     (void)pass{(u >> get<Indices>(t))...};
     return u;
 }
 
 template <class... Args> unmarshall &
-operator>>(unmarshall & u, tuple<Args &...> && t) {
+operator>>(unmarshall & u, tuple<Args & ...> && t) {
     using Indices = typename make_tuple_indices<sizeof...(Args)>::type;
     return tuple_unmarshall_imp(u, t, Indices());
 }
@@ -150,8 +149,8 @@ operator>>(unmarshall & u, tuple<Args &...> && t) {
 
 // Implements struct marshalling via tuple marshalling of members.
 #define MARSHALLABLE_STRUCT(_c_) \
-inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_(); } \
-inline marshall & operator<<(marshall &m, const _c_ a) { return m << a._tuple_(); }
+inline unmarshall & operator>>(unmarshall & u, _c_ & a) { return u >> a._tuple_(); } \
+inline marshall & operator<<(marshall & m, const _c_ a) { return m << a._tuple_(); }
 
 // our first two marshallable structs...
 MARSHALLABLE_STRUCT(rpc_protocol::request_header)
@@ -164,9 +163,9 @@ MARSHALLABLE_STRUCT(rpc_protocol::reply_header)
 // this overload is visible for type A only if A::cbegin and A::cend exist
 template <class A> inline typename
 enable_if<is_const_iterable<A>::value, marshall>::type &
-operator<<(marshall &m, const A &x) {
+operator<<(marshall & m, const A & x) {
     m << (unsigned int)x.size();
-    for (const auto &a : x)
+    for (const auto & a : x)
         m << a;
     return m;
 }
@@ -174,7 +173,7 @@ operator<<(marshall &m, const A &x) {
 // visible for type A if A::emplace_back(a) makes sense
 template <class A> inline typename
 enable_if<supports_emplace_back<A>::value, unmarshall>::type &
-operator>>(unmarshall &u, A &x) {
+operator>>(unmarshall & u, A & x) {
     unsigned n = u._grab<unsigned>();
     x.clear();
     while (n--)
@@ -184,18 +183,18 @@ operator>>(unmarshall &u, A &x) {
 
 // std::pair<A, B>
 template <class A, class B> inline marshall &
-operator<<(marshall &m, const pair<A,B> &d) {
+operator<<(marshall & m, const pair<A,B> & d) {
     return m << d.first << d.second;
 }
 
 template <class A, class B> inline unmarshall &
-operator>>(unmarshall &u, pair<A,B> &d) {
+operator>>(unmarshall & u, pair<A,B> & d) {
     return u >> d.first >> d.second;
 }
 
 // std::map<A, B>
 template <class A, class B> inline unmarshall &
-operator>>(unmarshall &u, map<A,B> &x) {
+operator>>(unmarshall & u, map<A,B> & x) {
     unsigned n = u._grab<unsigned>();
     x.clear();
     while (n--)
@@ -204,13 +203,13 @@ operator>>(unmarshall &u, map<A,B> &x) {
 }
 
 // std::string
-inline marshall & operator<<(marshall &m, const string &s) {
+inline marshall & operator<<(marshall & m, const string & s) {
     m << (uint32_t)s.size();
     m.rawbytes(s.data(), s.size());
     return m;
 }
 
-inline unmarshall & operator>>(unmarshall &u, string &s) {
+inline unmarshall & operator>>(unmarshall & u, string & s) {
     uint32_t sz = u._grab<uint32_t>();
     if (u.ok()) {
         s.resize(sz);
@@ -224,12 +223,12 @@ inline unmarshall & operator>>(unmarshall &u, string &s) {
 //
 
 template <class E> typename enable_if<is_enum<E>::value, marshall>::type &
-operator<<(marshall &m, E e) {
+operator<<(marshall & m, E e) {
     return m << from_enum(e);
 }
 
 template <class E> typename enable_if<is_enum<E>::value, unmarshall>::type &
-operator>>(unmarshall &u, E &e) {
+operator>>(unmarshall & u, E & e) {
     e = to_enum<E>(u._grab<enum_type_t<E>>());
     return u;
 }
@@ -238,11 +237,11 @@ operator>>(unmarshall &u, E &e) {
 // Recursive marshalling
 //
 
-inline marshall & operator<<(marshall &m, marshall &n) {
+inline marshall & operator<<(marshall & m, marshall & n) {
     return m << n.content();
 }
 
-inline unmarshall & operator>>(unmarshall &u, unmarshall &v) {
+inline unmarshall & operator>>(unmarshall & u, unmarshall & v) {
     v = unmarshall(u._grab<string>(), false);
     return u;
 }
index 8e10a75..fa55f17 100644 (file)
@@ -3,7 +3,7 @@
 
 #include "marshall.h"
 
-typedef function<int(unmarshall &, marshall &)> handler;
+typedef function<rpc_protocol::status(unmarshall &&, marshall &)> handler;
 
 //
 // Automatic marshalling wrappers for RPC handlers
@@ -71,7 +71,7 @@ template <class Functor, class Instance, class Signature,
 // between various types of callable objects at this level of abstraction.
 
 template <class F, class C, class ErrorHandler, class R, class RV, class... Args>
-struct marshalled_func_imp<F, C, RV(R&, Args...), ErrorHandler> {
+struct marshalled_func_imp<F, C, RV(R &, Args...), ErrorHandler> {
     static inline handler *wrap(F f, C *c=nullptr) {
         // This type definition corresponds to an empty struct with
         // template parameters running from 0 up to (# args) - 1.
@@ -82,7 +82,7 @@ struct marshalled_func_imp<F, C, RV(R&, Args...), ErrorHandler> {
         using ArgsStorage = tuple<typename decay<Args>::type...>;
         // Allocate a handler (i.e. function) to hold the lambda
         // which will unmarshall RPCs and call f.
-        return new handler([=](unmarshall &u, marshall &m) -> RV {
+        return new handler([=](unmarshall && u, marshall & m) -> RV {
             // Unmarshall each argument with the correct type and store the
             // result in a tuple.
             ArgsStorage t{u._grab<typename decay<Args>::type>()...};
index 0c3a97d..7937785 100644 (file)
@@ -65,29 +65,17 @@ inline void set_rand_seed() {
     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
 }
 
-static sockaddr_in make_sockaddr(const string &hostandport);
+static sockaddr_in make_sockaddr(const string & hostandport);
 
-rpcc::rpcc(const string & d, bool retrans) :
-    dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
-    retrans_(retrans), reachable_(true), chan_(), destroy_wait_ (false), xid_rep_done_(-1)
+rpcc::rpcc(const string & d) : dst_(make_sockaddr(d))
 {
-    if (retrans) {
-        set_rand_seed();
-        clt_nonce_ = (nonce_t)random();
-    } else {
-        // special client nonce 0 means this client does not
-        // require at-most-once logic from the server
-        // because it uses tcp and never retries a failed connection
-        clt_nonce_ = 0;
-    }
+    set_rand_seed();
+    clt_nonce_ = (nonce_t)random();
 
     char *loss_env = getenv("RPC_LOSSY");
     if (loss_env)
         lossytest_ = atoi(loss_env);
 
-    // xid starts with 1 and latest received reply starts with 0
-    xid_rep_window_.push_back(0);
-
     IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
 }
 
@@ -102,7 +90,7 @@ rpcc::~rpcc() {
 
 int rpcc::bind(milliseconds to) {
     nonce_t r;
-    int ret = call_timeout(rpc_protocol::bind, to, r);
+    rpc_protocol::status ret = call_timeout(rpc_protocol::bind, to, r);
     if (ret == 0) {
         lock ml(m_);
         bind_done_ = true;
@@ -118,7 +106,7 @@ void rpcc::cancel(void) {
     lock ml(m_);
     if (calls_.size()) {
         LOG("force callers to fail");
-        for (auto &p : calls_) {
+        for (auto & p : calls_) {
             caller *ca = p.second;
 
             IF_LEVEL(2) LOG("force caller to fail");
@@ -137,7 +125,7 @@ void rpcc::cancel(void) {
     }
 }
 
-int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) {
+int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) {
 
     caller ca(0, &rep);
     xid_t xid_rep;
@@ -169,7 +157,7 @@ int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) {
 
     while (1) {
         if (transmit) {
-            get_refconn(ch);
+            get_latest_connection(ch);
             if (ch) {
                 if (reachable_) {
                     request forgot;
@@ -212,11 +200,9 @@ int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) {
         if (nextdeadline >= finaldeadline)
             break;
 
-        if (retrans_ && (!ch || ch->isdead())) {
-            // since connection is dead, retransmit
-            // on the new connection
+        // retransmit on new connection if connection is dead
+        if (!ch || ch->isdead())
             transmit = true;
-        }
     }
 
     {
@@ -226,7 +212,7 @@ int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) {
         // may need to update the xid again here, in case the
         // packet times out before it's even sent by the channel.
         // I don't think there's any harm in maybe doing it twice
-        update_xid_rep(ca.xid);
+        update_xid_rep(ca.xid, ml);
 
         if (destroy_wait_)
             destroy_wait_c_.notify_one();
@@ -253,7 +239,7 @@ int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) {
     return (ca.done? ca.intret : rpc_protocol::timeout_failure);
 }
 
-void rpcc::get_refconn(shared_ptr<connection> & ch) {
+void rpcc::get_latest_connection(shared_ptr<connection> & ch) {
     lock ml(chan_m_);
     if (!chan_ || chan_->isdead())
         chan_ = connection::to_dst(dst_, this, lossytest_);
@@ -281,7 +267,7 @@ rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
 
     lock ml(m_);
 
-    update_xid_rep(h.xid);
+    update_xid_rep(h.xid, ml);
 
     if (calls_.find(h.xid) == calls_.end()) {
         IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
@@ -302,10 +288,8 @@ rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
     return true;
 }
 
-// assumes thread holds mutex m
-void
-rpcc::update_xid_rep(int xid)
-{
+void rpcc::update_xid_rep(xid_t xid, lock & m_lock) {
+    VERIFY(m_lock);
     if (xid <= xid_rep_window_.front())
         return;
 
@@ -325,15 +309,13 @@ compress:
     }
 }
 
-rpcs::rpcs(in_port_t p1)
-  : port_(p1), reachable_ (true)
+rpcs::rpcs(in_port_t p1) : port_(p1)
 {
     set_rand_seed();
     nonce_ = (nonce_t)random();
     IF_LEVEL(2) LOG("created with nonce " << nonce_);
 
     reg(rpc_protocol::bind, &rpcs::rpcbind, this);
-    dispatchpool_ = unique_ptr<thread_pool>(new thread_pool(6, false));
 }
 
 void rpcs::start() {
@@ -345,7 +327,6 @@ rpcs::~rpcs() {
     // must delete listener before dispatchpool
     listener_ = nullptr;
     dispatchpool_ = nullptr;
-    free_reply_window();
 }
 
 bool rpcs::got_pdu(const shared_ptr<connection> & c, const string & b) {
@@ -357,13 +338,6 @@ bool rpcs::got_pdu(const shared_ptr<connection> & c, const string & b) {
     return dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, c, b));
 }
 
-void rpcs::reg1(proc_id_t proc, handler *h) {
-    lock pl(procs_m_);
-    VERIFY(procs_.count(proc) == 0);
-    procs_[proc] = h;
-    VERIFY(procs_.count(proc) >= 1);
-}
-
 void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
     unmarshall req(buf, true);
 
@@ -405,41 +379,33 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
         f = procs_[proc];
     }
 
-    rpcs::rpcstate_t stat;
-    string b1;
-
-    if (h.clt_nonce) {
-        // have i seen this client before?
-        {
-            lock rwl(reply_window_m_);
-            // if we don't know about this clt_nonce, create a cleanup object
-            if (reply_window_.find(h.clt_nonce) == reply_window_.end()) {
-                VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
-                reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
-                IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
-                                " chan " << c->fd << ", total clients " << (reply_window_.size()-1));
-            }
-        }
-
-        // save the latest good connection to the client
-        {
-            lock rwl(conns_m_);
-            if (conns_.find(h.clt_nonce) == conns_.end())
-                conns_[h.clt_nonce] = c;
-            else if (conns_[h.clt_nonce]->create_time < c->create_time)
-                conns_[h.clt_nonce] = c;
+    // have i seen this client before?
+    {
+        lock rwl(reply_window_m_);
+        // if we don't know about this clt_nonce, create a cleanup object
+        if (reply_window_.find(h.clt_nonce) == reply_window_.end()) {
+            VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
+            reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
+            IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
+                            " chan " << c->fd << ", total clients " << (reply_window_.size()-1));
         }
+    }
 
-        stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
-    } else {
-        // this client does not require at most once logic
-        stat = NEW;
+    // save the latest good connection to the client
+    {
+        lock rwl(conns_m_);
+        if (conns_.find(h.clt_nonce) == conns_.end())
+            conns_[h.clt_nonce] = c;
+        else if (conns_[h.clt_nonce]->create_time < c->create_time)
+            conns_[h.clt_nonce] = c;
     }
 
-    switch (stat) {
+    string b1;
+
+    switch (check_duplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1)) {
         case NEW: // new request
-            rh.ret = (*f)(req, rep);
-            if (rh.ret == rpc_protocol::unmarshal_args_failure) {
+            rh.ret = (*f)(forward<unmarshall>(req), rep);
+            if (rh.ret == rpc_protocol::unmarshall_args_failure) {
                 LOG("failed to unmarshall the arguments. You are " <<
                     "probably calling RPC 0x" << hex << proc << " with the wrong " <<
                     "types of arguments.");
@@ -453,10 +419,7 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
             IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
                             h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
 
-            if (h.clt_nonce > 0) {
-                // only record replies for clients that require at-most-once logic
-                add_reply(h.clt_nonce, h.xid, b1);
-            }
+            add_reply(h.clt_nonce, h.xid, b1);
 
             // get the latest connection to the client
             {
@@ -496,12 +459,12 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
 //   DONE: seen this xid, previous reply returned in b.
 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
 rpcs::rpcstate_t
-rpcs::checkduplicate_and_update(nonce_t clt_nonce, xid_t xid,
+rpcs::check_duplicate_and_update(nonce_t clt_nonce, xid_t xid,
         xid_t xid_rep, string & b)
 {
     lock rwl(reply_window_m_);
 
-    list<reply_t> &l = reply_window_[clt_nonce];
+    list<reply_t> & l = reply_window_[clt_nonce];
 
     VERIFY(l.size() > 0);
     VERIFY(xid >= xid_rep);
@@ -543,12 +506,10 @@ rpcs::checkduplicate_and_update(nonce_t clt_nonce, xid_t xid,
 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
 // and passes the return value in b.
 // add_reply() should remember b.
-// free_reply_window() and checkduplicate_and_update are responsible for
-// cleaning up the remembered values.
 void rpcs::add_reply(nonce_t clt_nonce, xid_t xid, const string & b) {
     lock rwl(reply_window_m_);
     // remember the RPC reply value
-    list<reply_t> &l = reply_window_[clt_nonce];
+    list<reply_t> & l = reply_window_[clt_nonce];
     list<reply_t>::iterator it = l.begin();
     // skip to our place in the list
     for (it++; it != l.end() && it->xid < xid; it++);
@@ -561,18 +522,13 @@ void rpcs::add_reply(nonce_t clt_nonce, xid_t xid, const string & b) {
     }
 }
 
-void rpcs::free_reply_window(void) {
-    lock rwl(reply_window_m_);
-    reply_window_.clear();
-}
-
-int rpcs::rpcbind(nonce_t &r) {
+rpc_protocol::status rpcs::rpcbind(nonce_t & r) {
     IF_LEVEL(2) LOG("called return nonce " << nonce_);
     r = nonce_;
     return 0;
 }
 
-static sockaddr_in make_sockaddr(const string &hostandport) {
+static sockaddr_in make_sockaddr(const string & hostandport) {
     string host = "127.0.0.1";
     string port = hostandport;
     auto colon = hostandport.find(':');
index 3ae7737..211c717 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -52,29 +52,30 @@ class rpcc : private connection_delegate {
             cond c;
         };
 
-        void get_refconn(shared_ptr<connection> & ch);
-        void update_xid_rep(xid_t xid);
+        void get_latest_connection(shared_ptr<connection> & ch);
+        void update_xid_rep(xid_t xid, lock & m_lock);
 
 
         sockaddr_in dst_;
         nonce_t clt_nonce_;
-        nonce_t srv_nonce_;
-        bool bind_done_;
-        xid_t xid_;
-        int lossytest_;
-        bool retrans_;
-        bool reachable_;
+        nonce_t srv_nonce_ = 0;
+        bool bind_done_ = false;
+        int lossytest_ = 0;
+        bool reachable_ = true;
 
         shared_ptr<connection> chan_;
 
         mutex m_; // protect insert/delete to calls[]
         mutex chan_m_;
 
-        bool destroy_wait_;
+        bool destroy_wait_ = false;
         cond destroy_wait_c_;
 
         map<int, caller *> calls_;
-        list<xid_t> xid_rep_window_;
+
+        // xid starts with 1 and latest received reply starts with 0
+        xid_t xid_ = 1;
+        list<xid_t> xid_rep_window_ = {0};
 
         struct request {
             void clear() { buf.clear(); xid = -1; }
@@ -83,22 +84,21 @@ class rpcc : private connection_delegate {
             xid_t xid = -1;
         };
         request dup_req_;
-        int xid_rep_done_;
+        int xid_rep_done_ = -1;
 
-        int call1(proc_id_t proc, marshall &req, string &rep, milliseconds to);
+        int call1(proc_id_t proc, milliseconds to, string & rep, marshall & req);
 
         template<class R>
-        int call_m(proc_id_t proc, marshall &req, R & r, milliseconds to) {
+        inline int call_m(proc_id_t proc, milliseconds to, R & r, marshall && req) {
             string rep;
-            int intret = call1(proc, req, rep, to);
-            unmarshall u(rep, true);
+            int intret = call1(proc, to, rep, req);
             if (intret < 0) return intret;
-            u >> r;
+            unmarshall u(rep, true, r);
             if (u.okdone() != true) {
                 LOG("rpcc::call_m: failed to unmarshall the reply.  You are probably " <<
                     "calling RPC 0x" << hex << proc << " with the wrong return type.");
                 VERIFY(0);
-                return rpc_protocol::unmarshal_reply_failure;
+                return rpc_protocol::unmarshall_reply_failure;
             }
             return intret;
         }
@@ -107,7 +107,7 @@ class rpcc : private connection_delegate {
 
     public:
 
-        rpcc(const string & d, bool retrans=true);
+        rpcc(const string & d);
         ~rpcc();
 
         nonce_t id() { return clt_nonce_; }
@@ -119,15 +119,14 @@ class rpcc : private connection_delegate {
         void cancel();
 
         template<class P, class R, typename ...Args>
-        inline int call(proc_t<P> proc, R & r, const Args&... args) {
+        inline int call(proc_t<P> proc, R & r, const Args & ... args) {
             return call_timeout(proc, rpc::to_max, r, args...);
         }
 
         template<class P, class R, typename ...Args>
-        inline int call_timeout(proc_t<P> proc, milliseconds to, R & r, const Args&... args) {
+        inline int call_timeout(proc_t<P> proc, milliseconds to, R & r, const Args & ... args) {
             static_assert(is_valid_call<P, R, Args...>::value, "RPC called with incorrect argument types");
-            marshall m{args...};
-            return call_m(proc.id, m, r, to);
+            return call_m(proc.id, to, r, forward<marshall>(marshall(args...)));
         }
 };
 
@@ -167,16 +166,15 @@ class rpcs : private connection_delegate {
         // indexed by client nonce.
         map<nonce_t, list<reply_t>> reply_window_;
 
-        void free_reply_window(void);
         void add_reply(nonce_t clt_nonce, xid_t xid, const string & b);
 
-        rpcstate_t checkduplicate_and_update(nonce_t clt_nonce, xid_t xid,
+        rpcstate_t check_duplicate_and_update(nonce_t clt_nonce, xid_t xid,
                 xid_t rep_xid, string & b);
 
         // latest connection to the client
         map<nonce_t, shared_ptr<connection>> conns_;
 
-        bool reachable_;
+        bool reachable_ = true;
 
         // map proc # to function
         map<proc_id_t, handler *> procs_;
@@ -187,14 +185,11 @@ class rpcs : private connection_delegate {
 
         void dispatch(shared_ptr<connection> c, const string & buf);
 
-        // internal handler registration
-        void reg1(proc_id_t proc, handler *);
-
-        unique_ptr<thread_pool> dispatchpool_;
+        unique_ptr<thread_pool> dispatchpool_{new thread_pool(6, false)};
         unique_ptr<connection_listener> listener_;
 
         // RPC handler for clients binding
-        rpc_protocol::status rpcbind(nonce_t &r);
+        rpc_protocol::status rpcbind(nonce_t & r);
 
         bool got_pdu(const shared_ptr<connection> & c, const string & b);
 
@@ -209,10 +204,13 @@ class rpcs : private connection_delegate {
             static_assert(is_valid_registration<P, F>::value, "RPC handler registered with incorrect argument types");
             struct ReturnOnFailure {
                 static inline int unmarshall_args_failure() {
-                    return rpc_protocol::unmarshal_args_failure;
+                    return rpc_protocol::unmarshall_args_failure;
                 }
             };
-            reg1(proc.id, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
+            lock pl(procs_m_);
+            VERIFY(procs_.count(proc.id) == 0);
+            procs_[proc.id] = marshalled_func<F, ReturnOnFailure>::wrap(f, c);
+            VERIFY(procs_.count(proc.id) >= 1);
         }
 
         void start();
index 4a3ff32..65b7523 100644 (file)
@@ -13,8 +13,8 @@ namespace rpc_protocol {
 
     enum : status {
         timeout_failure = -1,
-        unmarshal_args_failure = -2,
-        unmarshal_reply_failure = -3,
+        unmarshall_args_failure = -2,
+        unmarshall_reply_failure = -3,
         atmostonce_failure = -4,
         oldsrv_failure = -5,
         bind_failure = -6,
index 4dd2af2..d4f53f4 100644 (file)
@@ -23,9 +23,9 @@ static in_port_t port;
 class srv {
     public:
         int handle_22(string & r, const string a, const string b);
-        int handle_fast(int &r, const int a);
-        int handle_slow(int &r, const int a);
-        int handle_bigrep(string &r, const size_t a);
+        int handle_fast(int & r, const int a);
+        int handle_slow(int & r, const int a);
+        int handle_bigrep(string & r, const size_t a);
 };
 
 namespace srv_protocol {
@@ -44,23 +44,23 @@ namespace srv_protocol {
 // rpcs::reg() decides how to unmarshall by looking
 // at these argument types, so this function definition
 // does what a .x file does in SunRPC.
-int srv::handle_22(string &r, const string a, string b) {
+int srv::handle_22(string & r, const string a, string b) {
     r = a + b;
     return 0;
 }
 
-int srv::handle_fast(int &r, const int a) {
+int srv::handle_fast(int & r, const int a) {
     r = a + 1;
     return 0;
 }
 
-int srv::handle_slow(int &r, const int a) {
+int srv::handle_slow(int & r, const int a) {
     usleep(random() % 500);
     r = a + 2;
     return 0;
 }
 
-int srv::handle_bigrep(string &r, const size_t len) {
+int srv::handle_bigrep(string & r, const size_t len) {
     r = string(len, 'x');
     return 0;
 }
index 4988dab..fc7be3d 100644 (file)
@@ -18,7 +18,7 @@ thread_pool::~thread_pool() {
         th_[i].join();
 }
 
-bool thread_pool::addJob(const job_t &j) {
+bool thread_pool::addJob(const job_t & j) {
     return jobq_.enq(j,blockadd_);
 }
 
index 28c5236..df11f20 100644 (file)
@@ -11,7 +11,7 @@ class thread_pool {
         thread_pool(size_t sz, bool blocking=true);
         ~thread_pool();
 
-        bool addJob(const job_t &j);
+        bool addJob(const job_t & j);
 
     private:
         size_t nthreads_;
diff --git a/rsm.cc b/rsm.cc
index 956f45d..c766145 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -83,9 +83,7 @@
 #include "rsm_client.h"
 #include <unistd.h>
 
-rsm::rsm(const string & _first, const string & _me) :
-    stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
-    partitioned (false), dopartition(false), break1(false), break2(false)
+rsm::rsm(const string & _first, const string & _me) : primary(_first)
 {
     cfg = unique_ptr<config>(new config(_first, _me, this));
 
@@ -103,7 +101,7 @@ rsm::rsm(const string & _first, const string & _me) :
     rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
 
     // tester must be on different port, otherwise it may partition itself
-    testsvr = unique_ptr<rpcs>(new rpcs((in_port_t)stoi(_me) + 1));
+    testsvr.reset(new rpcs((in_port_t)stoi(_me) + 1));
     testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
     testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
 }
@@ -115,11 +113,6 @@ void rsm::start() {
     thread(&rsm::recovery, this).detach();
 }
 
-void rsm::reg1(rpc_protocol::proc_id_t proc, handler *h) {
-    lock ml(rsm_mutex);
-    procs[proc] = h;
-}
-
 // The recovery thread runs this function
 void rsm::recovery() {
     bool r = true;
@@ -279,7 +272,7 @@ void rsm::commit_change(unsigned vid) {
     lock ml(rsm_mutex);
     commit_change(vid, ml);
     if (cfg->ismember(cfg->myaddr(), vid_commit))
-        breakpoint2();
+        breakpoint(2);
 }
 
 void rsm::commit_change(unsigned vid, lock &) {
@@ -293,7 +286,7 @@ void rsm::commit_change(unsigned vid, lock &) {
     recovery_cond.notify_one();
     sync_cond.notify_one();
     if (cfg->ismember(cfg->myaddr(), vid_commit))
-        breakpoint2();
+        breakpoint(2);
 }
 
 
@@ -301,10 +294,9 @@ void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r
     LOG("execute");
     handler *h = procs[procno];
     VERIFY(h);
-    unmarshall args(req, false);
     marshall rep;
-    auto ret = (rsm_protocol::status)(*h)(args, rep);
-    r = marshall{ret, rep.content()}.content();
+    auto ret = (rsm_protocol::status)(*h)(unmarshall(req, false), rep);
+    r = marshall(ret, rep.content()).content();
 }
 
 //
@@ -350,7 +342,7 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id
             LOG("Invoke returned " << ret);
             if (ret != rsm_protocol::OK)
                 return rsm_client_protocol::BUSY;
-            breakpoint1();
+            breakpoint(1);
             lock rsm_mutex_lock(rsm_mutex);
             partition1(rsm_mutex_lock);
         }
@@ -398,14 +390,14 @@ rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp
     string r;
     execute(proc, req, r);
     last_myvs = vs;
-    breakpoint1();
+    breakpoint(1);
     return rsm_protocol::OK;
 }
 
 //
 // RPC handler: Send back the local node's state to the caller
 //
-rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, const string & src,
+rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const string & src,
         viewstamp last, unsigned vid) {
     lock ml(rsm_mutex);
     LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
@@ -473,7 +465,7 @@ rsm_protocol::status rsm::joinreq(string & log, const string & m, viewstamp last
 // RPC handler: Responds with the list of known nodes for fall-back on a
 // primary failure
 //
-rsm_client_protocol::status rsm::client_members(vector<string> &r, int) {
+rsm_client_protocol::status rsm::client_members(vector<string> & r, int) {
     vector<string> m;
     lock ml(rsm_mutex);
     cfg->get_view(vid_commit, m);
@@ -516,7 +508,8 @@ bool rsm::amiprimary() {
 
 // Test RPCs -- simulate partitions and failures
 
-void rsm::net_repair(bool heal, lock &/*rsm_mutex_lock*/) {
+void rsm::net_repair(bool heal, lock & rsm_mutex_lock) {
+    VERIFY(rsm_mutex_lock);
     vector<string> m;
     cfg->get_view(vid_commit, m);
     for (unsigned i  = 0; i < m.size(); i++) {
@@ -529,7 +522,7 @@ void rsm::net_repair(bool heal, lock &/*rsm_mutex_lock*/) {
     rsmrpc->set_reachable(heal);
 }
 
-rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r, int heal) {
+rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status & r, int heal) {
     lock ml(rsm_mutex);
     LOG("heal " << heal << " (dopartition " <<
             dopartition << ", partitioned " << partitioned << ")");
@@ -543,16 +536,9 @@ rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r,
 
 // simulate failure at breakpoint 1 and 2
 
-void rsm::breakpoint1() {
-    if (break1) {
-        LOG("Dying at breakpoint 1 in rsm!");
-        exit(1);
-    }
-}
-
-void rsm::breakpoint2() {
-    if (break2) {
-        LOG("Dying at breakpoint 2 in rsm!");
+void rsm::breakpoint(int b) {
+    if (breakpoints[b-1]) {
+        LOG("Dying at breakpoint " << b << " in rsm!");
         exit(1);
     }
 }
@@ -565,12 +551,12 @@ void rsm::partition1(lock & rsm_mutex_lock) {
     }
 }
 
-rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status &r, int b) {
+rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status & r, int b) {
     r = rsm_test_protocol::OK;
     lock ml(rsm_mutex);
     LOG("breakpoint " << b);
-    if (b == 1) break1 = true;
-    else if (b == 2) break2 = true;
+    if (b == 1) breakpoints[1-1] = true;
+    else if (b == 2) breakpoints[2-1] = true;
     else if (b == 3 || b == 4) cfg->breakpoint(b);
     else r = rsm_test_protocol::ERR;
     return r;
diff --git a/rsm.h b/rsm.h
index 14dc011..ca03473 100644 (file)
--- a/rsm.h
+++ b/rsm.h
@@ -17,8 +17,6 @@ class rsm_state_transfer {
 };
 
 class rsm : public config_view_change {
-    private:
-        void reg1(rpc_protocol::proc_id_t proc, handler *);
     protected:
         map<rpc_protocol::proc_id_t, handler *> procs;
         unique_ptr<config> cfg;
@@ -29,27 +27,26 @@ class rsm : public config_view_change {
         viewstamp last_myvs{0, 0};   // Viewstamp of the last executed request
         viewstamp myvs{0, 1};
         string primary;
-        bool insync;
-        bool inviewchange;
-        unsigned vid_commit;  // Latest view id that is known to rsm layer
+        bool insync = false;
+        bool inviewchange = true;
+        unsigned vid_commit = 0;  // Latest view id that is known to rsm layer
         unsigned vid_insync;  // The view id that this node is synchronizing for
         vector<string> backups;   // A list of unsynchronized backups
 
         // For testing purposes
         unique_ptr<rpcs> testsvr;
-        bool partitioned;
-        bool dopartition;
-        bool break1;
-        bool break2;
+        bool partitioned = false;
+        bool dopartition = false;
+        bool breakpoints[2] = {};
 
-        rsm_client_protocol::status client_members(vector<string> &r, int i);
+        rsm_client_protocol::status client_members(vector<string> & r, int i);
         rsm_protocol::status invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & mreq);
-        rsm_protocol::status transferreq(rsm_protocol::transferres &r, const string & src,
+        rsm_protocol::status transferreq(rsm_protocol::transferres & r, const string & src,
                 viewstamp last, unsigned vid);
         rsm_protocol::status transferdonereq(int &, const string & m, unsigned vid);
         rsm_protocol::status joinreq(string & log, const string & src, viewstamp last);
-        rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status &r, int heal);
-        rsm_test_protocol::status breakpointreq(rsm_test_protocol::status &r, int b);
+        rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status & r, int heal);
+        rsm_test_protocol::status breakpointreq(rsm_test_protocol::status & r, int b);
 
         mutex rsm_mutex, invoke_mutex;
         cond recovery_cond, sync_cond;
@@ -63,24 +60,24 @@ class rsm : public config_view_change {
         bool sync_with_backups(lock & rsm_mutex_lock);
         bool sync_with_primary(lock & rsm_mutex_lock);
         void net_repair(bool heal, lock & rsm_mutex_lock);
-        void breakpoint1();
-        void breakpoint2();
+        void breakpoint(int b);
         void partition1(lock & rsm_mutex_lock);
         void commit_change(unsigned vid, lock & rsm_mutex_lock);
+        void recovery NORETURN ();
     public:
         rsm (const string & _first, const string & _me);
 
         bool amiprimary();
         void set_state_transfer(rsm_state_transfer *_stf) { stf = _stf; }
-        void recovery NORETURN ();
         void commit_change(unsigned vid);
 
         template<class P, class F, class C=void> void reg(rpc_protocol::proc_t<P> proc, F f, C *c=nullptr) {
             static_assert(is_valid_registration<P, F>::value, "RSM handler registered with incorrect argument types");
-            reg1(proc.id, marshalled_func<F>::wrap(f, c));
+            lock ml(rsm_mutex);
+            procs[proc.id] = marshalled_func<F>::wrap(f, c);
         }
 
         void start();
 };
 
-#endif /* rsm_h */
+#endif
index 9e86915..598a1ed 100644 (file)
@@ -15,7 +15,7 @@ void rsm_client::primary_failure(lock &) {
     known_mems.pop_back();
 }
 
-rsm_protocol::status rsm_client::invoke(unsigned int proc, string &rep, const string &req) {
+rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const string & req) {
     lock ml(rsm_client_mutex);
     while (1) {
         LOG("proc " << hex << proc << " primary " << primary);
index be66fec..06ca5a6 100644 (file)
@@ -19,7 +19,7 @@ class rsm_client {
         mutex rsm_client_mutex;
         void primary_failure(lock & rsm_client_mutex_lock);
         bool init_members(lock & rsm_client_mutex_lock);
-        rsm_protocol::status invoke(unsigned int proc, string &rep, const string &req);
+        rsm_protocol::status invoke(unsigned int proc, string & rep, const string & req);
         template<class R> int call_m(unsigned int proc, R & r, const marshall & req);
     public:
         rsm_client(string dst);
@@ -27,7 +27,7 @@ class rsm_client {
         template<class P, class R, class ...Args>
         int call(rpc_protocol::proc_t<P> proc, R & r, const Args & ...a1) {
             static_assert(is_valid_call<P, R, Args...>::value, "RSM method invoked with incorrect argument types");
-            return call_m(proc.id, r, marshall{a1...});
+            return call_m(proc.id, r, marshall(a1...));
         }
 };
 
@@ -43,12 +43,11 @@ inline string hexify(const string & s) {
 template<class R>
 int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) {
     string rep;
-    string res;
     int intret = invoke(proc, rep, req.content());
     VERIFY( intret == rsm_client_protocol::OK );
-    unmarshall u(rep, false);
-    u >> intret;
+    unmarshall u(rep, false, intret);
     if (intret < 0) return intret;
+    string res;
     u >> res;
     if (!u.okdone()) {
         LOG("failed to unmarshall the reply.");
@@ -57,17 +56,15 @@ int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) {
             "0x" << hex << proc << " with the wrong return type");
         LOG("here's what I got: \"" << hexify(rep) << "\"");
         VERIFY(0);
-        return rpc_protocol::unmarshal_reply_failure;
+        return rpc_protocol::unmarshall_reply_failure;
     }
-    unmarshall u1(res, false);
-    u1 >> r;
-    if(!u1.okdone()) {
+    if(!unmarshall(res, false, r).okdone()) {
         LOG("failed to unmarshall the reply.");
         LOG("You are probably calling RPC 0x" << hex << proc <<
             " with the wrong return type.");
         LOG("here's what I got: \"" << hexify(res) << "\"");
         VERIFY(0);
-        return rpc_protocol::unmarshal_reply_failure;
+        return rpc_protocol::unmarshall_reply_failure;
     }
     return intret;
 }
index 6630a86..903b0fa 100644 (file)
@@ -13,14 +13,14 @@ extern char log_thread_prefix;
 namespace std {
     // Sticking this in std:: makes it possible for ostream_iterator to use it.
     template <class A, class B>
-    ostream & operator<<(ostream &o, const pair<A,B> &d) {
+    ostream & operator<<(ostream & o, const pair<A,B> & d) {
         return o << "<" << d.first << "," << d.second << ">";
     }
 }
 
 template <class A>
 typename enable_if<is_const_iterable<A>::value && !is_same<A,string>::value, ostream>::type &
-operator<<(ostream &o, const A &a) {
+operator<<(ostream & o, const A & a) {
     return o << "[" << implode(a, ", ") << "]";
 }
 
diff --git a/types.h b/types.h
index ede859f..888cd68 100644 (file)
--- a/types.h
+++ b/types.h
@@ -122,13 +122,13 @@ using std::vector;
 template <class A, typename I=void> struct is_const_iterable : false_type {};
 
 template<class A> struct is_const_iterable<A,
-    decltype(declval<A&>().cbegin(), declval<A&>().cend(), void())
+    decltype(declval<A &>().cbegin(), declval<A &>().cend(), void())
 > : true_type {};
 
 template <class A, typename I=void> struct supports_emplace_back : false_type {};
 
 template<class A> struct supports_emplace_back<A,
-    decltype(declval<A&>().emplace_back(declval<typename A::value_type>()), void())
+    decltype(declval<A &>().emplace_back(declval<typename A::value_type>()), void())
 > : true_type {};
 
 template<typename E>
@@ -151,7 +151,7 @@ implode(const C & v, string delim=" ") {
     return oss.str();
 }
 
-inline vector<string> explode(const string &s, string delim=" ") {
+inline vector<string> explode(const string & s, string delim=" ") {
     vector<string> out;
     size_t start = 0, end = 0;
     while ((end = s.find(delim, start)) != string::npos) {
@@ -185,7 +185,7 @@ inline auto _tuple_() const -> decltype(tie(__VA_ARGS__)) { return tie(__VA_ARGS
 // LEXICOGRAPHIC_COMPARISON(foo)
 
 #define LEXICOGRAPHIC_OPERATOR(_c_, _op_) \
-inline bool operator _op_(const _c_ &b) const { return _tuple_() _op_ b._tuple_(); }
+inline bool operator _op_(const _c_ & b) const { return _tuple_() _op_ b._tuple_(); }
 
 #define LEXICOGRAPHIC_COMPARISON(_c_) \
 LEXICOGRAPHIC_OPERATOR(_c_, <) LEXICOGRAPHIC_OPERATOR(_c_, <=) \
@@ -212,7 +212,7 @@ template <size_t E, size_t S=0> struct make_tuple_indices {
 // Template parameter pack expansion is not allowed in certain contexts, but
 // brace initializers (for instance, calls to constructors of empty structs)
 // are fair game.  
-struct pass { template <typename... Args> inline pass(Args&&...) {} };
+struct pass { template <typename... Args> inline pass(Args && ...) {} };
 
 #include "endian.h"