More cleanups to marshalling logic.
authorPeter Iannucci <iannucci@mit.edu>
Fri, 20 Sep 2013 05:31:16 +0000 (01:31 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Wed, 25 Sep 2013 01:46:13 +0000 (21:46 -0400)
lock.h
rpc/marshall.h
rpc/rpc.cc
rpc/rpc.h
rpc/rpctest.cc
rpc/thr_pool.cc
rsm.h
rsm_client.cc
rsm_client.h

diff --git a/lock.h b/lock.h
index b23f6cf..1789ec3 100644 (file)
--- a/lock.h
+++ b/lock.h
@@ -10,7 +10,7 @@ using cond = std::condition_variable;
 
 class adopt_lock : public lock {
 public:
-    inline adopt_lock(class mutex &m) : std::unique_lock<std::mutex>(m, std::adopt_lock) {
+    explicit inline adopt_lock(class mutex &m) : std::unique_lock<std::mutex>(m, std::adopt_lock) {
     }
     inline ~adopt_lock() {
         release();
index 448240b..bd45c02 100644 (file)
 #include <inttypes.h>
 #include "lang/verify.h"
 
-struct req_header {
-       req_header(int x=0, int p=0, int c = 0, int s = 0, int xi = 0):
-               xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {}
-       int xid;
-       int proc;
-       unsigned int clt_nonce;
-       unsigned int srv_nonce;
-       int xid_rep;
+struct request_header {
+    request_header(int x=0, int p=0, int c=0, int s=0, int xi=0) :
+        xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {}
+    int xid;
+    int proc;
+    unsigned int clt_nonce;
+    unsigned int srv_nonce;
+    int xid_rep;
+    request_header hton() const {
+        return {
+            htonl(xid), htonl(proc), htonl(clt_nonce), htonl(srv_nonce), htonl(xid_rep)
+        };
+    }
 };
 
 struct reply_header {
-       reply_header(int x=0, int r=0): xid(x), ret(r) {}
-       int xid;
-       int ret;
+    reply_header(int x=0, int r=0): xid(x), ret(r) {}
+    int xid;
+    int ret;
+    reply_header hton() const {
+        return {
+            htonl(xid), htonl(ret)
+        };
+    }
 };
 
 typedef int rpc_sz_t;
 
-//size of initial buffer allocation 
+//size of initial buffer allocation
 #define DEFAULT_RPC_SZ 1024
-#define RPC_HEADER_SZ (std::max(sizeof(req_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
-
-struct pass {
-    template<typename... Args> inline pass(Args&&...) {}
-};
+#define RPC_HEADER_SZ (std::max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
 
 class marshall {
-       private:
-               char *_buf;     // Base of the raw bytes buffer (dynamically readjusted)
-               int _capa;      // Capacity of the buffer
-               int _ind;       // Read/write head position
-
-       public:
-               marshall() {
-                       _buf = (char *) malloc(sizeof(char)*DEFAULT_RPC_SZ);
-                       VERIFY(_buf);
-                       _capa = DEFAULT_RPC_SZ;
-                       _ind = RPC_HEADER_SZ;
-               }
-
-        template <typename... Args> marshall(const Args&... args) : marshall() {
+    private:
+        char *buf_;     // Base of the raw bytes buffer (dynamically readjusted)
+        size_t capacity_;  // Capacity of the buffer
+        size_t index_;     // Read/write head position
+
+        inline void reserve(size_t n) {
+            if((index_+n) > capacity_){
+                capacity_ += std::max(capacity_, n);
+                VERIFY (buf_ != NULL);
+                buf_ = (char *)realloc(buf_, capacity_);
+                VERIFY(buf_);
+            }
+        }
+    public:
+        struct pass { template <typename... Args> inline pass(Args&&...) {} };
+
+        template <typename... Args>
+
+        marshall(const Args&... args) {
+            buf_ = (char *) malloc(sizeof(char)*DEFAULT_RPC_SZ);
+            VERIFY(buf_);
+            capacity_ = DEFAULT_RPC_SZ;
+            index_ = RPC_HEADER_SZ;
             (void)pass{(*this << args)...};
         }
 
-               ~marshall() { 
-                       if (_buf) 
-                               free(_buf); 
-               }
-
-               int size() { return _ind;}
-               char *cstr() { return _buf;}
-
-               void rawbyte(unsigned char);
-               void rawbytes(const char *, int);
-
-               // Return the current content (excluding header) as a string
-               std::string get_content() { 
-                       return std::string(_buf+RPC_HEADER_SZ,_ind-RPC_HEADER_SZ);
-               }
-
-               // Return the current content (excluding header) as a string
-               std::string str() {
-                       return get_content();
-               }
-
-               void pack(int i);
-
-               void pack_req_header(const req_header &h) {
-                       int saved_sz = _ind;
-                       //leave the first 4-byte empty for channel to fill size of pdu
-                       _ind = sizeof(rpc_sz_t); 
-                       pack(h.xid);
-                       pack(h.proc);
-                       pack((int)h.clt_nonce);
-                       pack((int)h.srv_nonce);
-                       pack(h.xid_rep);
-                       _ind = saved_sz;
-               }
-
-               void pack_reply_header(const reply_header &h) {
-                       int saved_sz = _ind;
-                       //leave the first 4-byte empty for channel to fill size of pdu
-                       _ind = sizeof(rpc_sz_t); 
-                       pack(h.xid);
-                       pack(h.ret);
-                       _ind = saved_sz;
-               }
-
-               void take_buf(char **b, int *s) {
-                       *b = _buf;
-                       *s = _ind;
-                       _buf = NULL;
-                       _ind = 0;
-                       return;
-               }
+        ~marshall() {
+            if (buf_)
+                free(buf_);
+        }
+
+        int size() { return index_;}
+        char *cstr() { return buf_;}
+        const char *cstr() const { return buf_;}
+
+        void rawbyte(unsigned char x) {
+            reserve(1);
+            buf_[index_++] = x;
+        }
+
+        void rawbytes(const char *p, int n) {
+            reserve(n);
+            memcpy(buf_+index_, p, n);
+            index_ += n;
+        }
+
+        // Return the current content (excluding header) as a string
+        std::string get_content() {
+            return std::string(buf_+RPC_HEADER_SZ,index_-RPC_HEADER_SZ);
+        }
+
+        // Return the current content (excluding header) as a string
+        std::string str() {
+            return get_content();
+        }
+
+        void pack_req_header(const request_header &h);
+        void pack_reply_header(const reply_header &h);
+
+        void take_buf(char **b, int *s) {
+            *b = buf_;
+            *s = index_;
+            buf_ = NULL;
+            index_ = 0;
+            return;
+        }
 };
 
 marshall& operator<<(marshall &, bool);
@@ -122,22 +126,22 @@ marshall& operator<<(marshall &, const std::string &);
 template <class C> marshall &
 operator<<(marshall &m, std::vector<C> v)
 {
-       m << (unsigned int) v.size();
-       for(unsigned i = 0; i < v.size(); i++)
-               m << v[i];
-       return m;
+    m << (unsigned int) v.size();
+    for(unsigned i = 0; i < v.size(); i++)
+        m << v[i];
+    return m;
 }
 
 template <class A, class B> marshall &
 operator<<(marshall &m, const std::map<A,B> &d) {
-       typename std::map<A,B>::const_iterator i;
+    typename std::map<A,B>::const_iterator i;
 
-       m << (unsigned int) d.size();
+    m << (unsigned int) d.size();
 
-       for (i = d.begin(); i != d.end(); i++) {
-               m << i->first << i->second;
-       }
-       return m;
+    for (i = d.begin(); i != d.end(); i++) {
+        m << i->first << i->second;
+    }
+    return m;
 }
 
 template <class A> marshall &
@@ -154,70 +158,73 @@ operator<<(marshall &m, const std::pair<A,B> &d) {
 }
 
 class unmarshall {
-       private:
-               char *_buf;
-               int _sz;
-               int _ind;
-               bool _ok;
-       public:
-               unmarshall(): _buf(NULL),_sz(0),_ind(0),_ok(false) {}
-               unmarshall(char *b, int sz): _buf(b),_sz(sz),_ind(),_ok(true) {}
-               unmarshall(const std::string &s) : _buf(NULL),_sz(0),_ind(0),_ok(false) 
-               {
-                       //take the content which does not exclude a RPC header from a string
-                       take_content(s);
-               }
-               ~unmarshall() {
-                       if (_buf) free(_buf);
-               }
-
-               //take contents from another unmarshall object
-               void take_in(unmarshall &another);
-
-               //take the content which does not exclude a RPC header from a string
-               void take_content(const std::string &s) {
-                       _sz = s.size()+RPC_HEADER_SZ;
-                       _buf = (char *)realloc(_buf,_sz);
-                       VERIFY(_buf);
-                       _ind = RPC_HEADER_SZ;
-                       memcpy(_buf+_ind, s.data(), s.size());
-                       _ok = true;
-               }
-
-               bool ok() { return _ok; }
-               char *cstr() { return _buf;}
-               bool okdone();
-               unsigned int rawbyte();
-               void rawbytes(std::string &s, unsigned int n);
-
-               int ind() { return _ind;}
-               int size() { return _sz;}
-               void unpack(int *); //non-const ref
-               void take_buf(char **b, int *sz) {
-                       *b = _buf;
-                       *sz = _sz;
-                       _sz = _ind = 0;
-                       _buf = NULL;
-               }
-
-               void unpack_req_header(req_header *h) {
-                       //the first 4-byte is for channel to fill size of pdu
-                       _ind = sizeof(rpc_sz_t); 
-                       unpack(&h->xid);
-                       unpack(&h->proc);
-                       unpack((int *)&h->clt_nonce);
-                       unpack((int *)&h->srv_nonce);
-                       unpack(&h->xid_rep);
-                       _ind = RPC_HEADER_SZ;
-               }
-
-               void unpack_reply_header(reply_header *h) {
-                       //the first 4-byte is for channel to fill size of pdu
-                       _ind = sizeof(rpc_sz_t); 
-                       unpack(&h->xid);
-                       unpack(&h->ret);
-                       _ind = RPC_HEADER_SZ;
-               }
+    private:
+        char *buf_;
+        int sz_;
+        int index_;
+        bool ok_;
+
+        inline bool ensure(size_t n);
+    public:
+        unmarshall(): buf_(NULL),sz_(0),index_(0),ok_(false) {}
+        unmarshall(char *b, int sz): buf_(b),sz_(sz),index_(),ok_(true) {}
+        unmarshall(const std::string &s) : buf_(NULL),sz_(0),index_(0),ok_(false)
+        {
+            //take the content which does not exclude a RPC header from a string
+            take_content(s);
+        }
+        ~unmarshall() {
+            if (buf_) free(buf_);
+        }
+
+        //take contents from another unmarshall object
+        void take_in(unmarshall &another);
+
+        //take the content which does not exclude a RPC header from a string
+        void take_content(const std::string &s) {
+            sz_ = s.size()+RPC_HEADER_SZ;
+            buf_ = (char *)realloc(buf_,sz_);
+            VERIFY(buf_);
+            index_ = RPC_HEADER_SZ;
+            memcpy(buf_+index_, s.data(), s.size());
+            ok_ = true;
+        }
+
+        bool ok() const { return ok_; }
+        char *cstr() { return buf_;}
+        bool okdone() const { return ok_ && index_ == sz_; }
+
+        unsigned int rawbyte();
+        void rawbytes(std::string &s, size_t n);
+
+        int ind() { return index_;}
+        int size() { return sz_;}
+        void unpack(int *); //non-const ref
+        void take_buf(char **b, int *sz) {
+            *b = buf_;
+            *sz = sz_;
+            sz_ = index_ = 0;
+            buf_ = NULL;
+        }
+
+        void unpack_req_header(request_header *h) {
+            //the first 4-byte is for channel to fill size of pdu
+            index_ = sizeof(rpc_sz_t);
+            unpack(&h->xid);
+            unpack(&h->proc);
+            unpack((int *)&h->clt_nonce);
+            unpack((int *)&h->srv_nonce);
+            unpack(&h->xid_rep);
+            index_ = RPC_HEADER_SZ;
+        }
+
+        void unpack_reply_header(reply_header *h) {
+            //the first 4-byte is for channel to fill size of pdu
+            index_ = sizeof(rpc_sz_t);
+            unpack(&h->xid);
+            unpack(&h->ret);
+            index_ = RPC_HEADER_SZ;
+        }
 
         template <class A>
         inline A grab() {
@@ -240,8 +247,8 @@ unmarshall& operator>>(unmarshall &, std::string &);
 template <class C> unmarshall &
 operator>>(unmarshall &u, std::vector<C> &v)
 {
-       unsigned n;
-       u >> n;
+    unsigned n;
+    u >> n;
     v.clear();
     while (n--) {
         C c;
@@ -253,16 +260,16 @@ operator>>(unmarshall &u, std::vector<C> &v)
 
 template <class A, class B> unmarshall &
 operator>>(unmarshall &u, std::map<A,B> &d) {
-       unsigned n;
-       u >> n;
-       d.clear();
+    unsigned n;
+    u >> n;
+    d.clear();
     while (n--) {
         A a;
         B b;
         u >> a >> b;
         d[a] = b;
     }
-       return u;
+    return u;
 }
 
 template <class C> unmarshall &
@@ -283,6 +290,32 @@ operator>>(unmarshall &u, std::pair<A,B> &d) {
     return u >> d.first >> d.second;
 }
 
+typedef std::function<int(unmarshall &, marshall &)> handler;
+
+//
+// Automatic marshalling wrappers for RPC handlers
+//
+
+// PAI 2013/09/19
+// C++11 does neither of these two things for us:
+// 1) Declare variables using a parameter pack expansion, like so
+//      Args ...args;
+// 2) Call a function with a std::tuple of the arguments it expects
+//
+// We implement an 'invoke' function for functions of the RPC handler
+// signature, i.e. int(R & r, const Args...)
+//
+// One thing we need in order to accomplish this is a way to cause the compiler
+// to specialize 'invoke' with a parameter pack containing a list of indices
+// for the elements of the tuple.  This will allow us to call the underlying
+// function with the exploded contents of the tuple.  The empty type
+// tuple_indices<size_t...> accomplishes this.  It will be passed in to
+// 'invoke' as a parameter which will be ignored, but its type will force the
+// compiler to specialize 'invoke' appropriately.
+
+// The following implementation of tuple_indices is redistributed under the MIT
+// License as an insubstantial portion of the LLVM compiler infrastructure.
+
 template <size_t...> struct tuple_indices {};
 template <size_t S, class IntTuple, size_t E> struct make_indices_imp;
 template <size_t S, size_t ...Indices, size_t E> struct make_indices_imp<S, tuple_indices<Indices...>, E> {
@@ -291,10 +324,14 @@ template <size_t S, size_t ...Indices, size_t E> struct make_indices_imp<S, tupl
 template <size_t E, size_t ...Indices> struct make_indices_imp<E, tuple_indices<Indices...>, E> {
     typedef tuple_indices<Indices...> type;
 };
-template <size_t E, size_t S = 0> struct make_tuple_indices {
+template <size_t E, size_t S=0> struct make_tuple_indices {
     typedef typename make_indices_imp<S, tuple_indices<>, E>::type type;
 };
 
+// This class encapsulates the default response to runtime unmarshalling
+// failures.  The templated wrappers below may optionally use a different
+// class.
+
 struct VerifyOnFailure {
     static inline int unmarshall_args_failure() {
         VERIFY(0);
@@ -302,70 +339,91 @@ struct VerifyOnFailure {
     }
 };
 
-typedef std::function<int(unmarshall &, marshall &)> handler;
-
-using std::move;
-using std::get;
-using std::tuple;
-using std::decay;
+// Here's the implementation of 'invoke'.  It could be more general, but this
+// meets our needs.
 
-#include <iostream>
+// One for function pointers...
 
 template <class F, class R, class args_type, size_t ...Indices>
 typename std::enable_if<!std::is_member_function_pointer<F>::value, int>::type
 invoke(F f, void *, R & r, args_type & t, tuple_indices<Indices...>) {
-    return f(r, move(get<Indices>(t))...);
+    return f(r, std::move(std::get<Indices>(t))...);
 }
 
+// And one for pointers to member functions...
+
 template <class F, class C, class R, class args_type, size_t ...Indices>
 typename std::enable_if<std::is_member_function_pointer<F>::value, int>::type
 invoke(F f, C *c, R & r, args_type & t, tuple_indices<Indices...>) {
-    return (c->*f)(r, move(get<Indices>(t))...);
+    return (c->*f)(r, std::move(std::get<Indices>(t))...);
 }
 
-template <class Functor,
-          class Instance,
-          class Signature,
+// The class marshalled_func_imp uses partial template specialization to
+// implement the ::wrap static function.  ::wrap takes a function pointer or a
+// pointer to a member function and returns a handler * object which
+// unmarshalls arguments, verifies successful unmarshalling, calls the supplied
+// function, and marshalls the response.
+
+template <class Functor, class Instance, class Signature,
           class ErrorHandler=VerifyOnFailure> struct marshalled_func_imp;
 
+// Here we specialize on the Signature template parameter to obtain the list of
+// argument types.  Note that we do not assume that the Functor parameter has
+// the same pattern as Signature; this allows us to ignore the distinctions
+// between various types of callable objects at this level of abstraction.
+
 template <class F, class C, class ErrorHandler, class R, class... Args>
 struct marshalled_func_imp<F, C, int(R&, Args...), ErrorHandler> {
-    using result_type = R;
-    using args_type = tuple<typename decay<Args>::type...>;
-    using index_type = typename make_tuple_indices<sizeof...(Args)>::type;
-
-    static inline int call(F f, C *c, unmarshall &u, marshall &m) {
-        args_type t{std::move(std::tuple<Args...>{u.grab<Args>()...})};
-        if (!u.okdone())
-            return ErrorHandler::unmarshall_args_failure();
-        R r;
-        int b = invoke(f, c, r, t, index_type());
-        m << r;
-        return b;
-    }
-
     static inline handler *wrap(F f, C *c=nullptr) {
-        typename decay<F>::type f_ = f;
-        return new handler([f_, c](unmarshall &u, marshall &m) -> int {
-            return call(f_, c, u, m);
+        // This type definition corresponds to an empty struct with
+        // template parameters running from 0 up to (# args) - 1.
+        using Indices = typename make_tuple_indices<sizeof...(Args)>::type;
+        // This type definition represents storage for f's unmarshalled
+        // arguments.  std::decay is (most notably) stripping off const
+        // qualifiers.
+        using ArgsStorage = std::tuple<typename std::decay<Args>::type...>;
+        // Allocate a handler (i.e. std::function) to hold the lambda
+        // which will unmarshall RPCs and call f.
+        return new handler([=](unmarshall &u, marshall &m) -> int {
+            // Unmarshall each argument with the correct type and store the
+            // result in a tuple.
+            ArgsStorage t = {u.grab<typename std::decay<Args>::type>()...};
+            // Verify successful unmarshalling of the entire input stream.
+            if (!u.okdone())
+                return ErrorHandler::unmarshall_args_failure();
+            // Allocate space for the RPC response -- will be passed into the
+            // function as an lvalue reference.
+            R r;
+            // Perform the invocation.  Note that Indices() calls the default
+            // constructor of the empty struct with the special template
+            // parameters.
+            int b = invoke(f, c, r, t, Indices());
+            // Marshall the response.
+            m << r;
+            // Make like a tree.
+            return b;
         });
     }
 };
 
-template <class Functor,
-          class Signature,
-          class ErrorHandler=VerifyOnFailure> struct marshalled_func;
+// More partial template specialization shenanigans to reduce the number of
+// parameters which must be provided explicitly and to support a few common
+// callable types.  C++11 doesn't allow partial function template
+// specialization, so we use classes (structs).
+
+template <class Functor, class ErrorHandler=VerifyOnFailure,
+    class Signature=Functor> struct marshalled_func;
 
 template <class F, class ErrorHandler, class... Args>
-struct marshalled_func<F, int(*)(Args...), ErrorHandler> :
+struct marshalled_func<F, ErrorHandler, int(*)(Args...)> :
     public marshalled_func_imp<F, void, int(Args...), ErrorHandler> {};
 
 template <class F, class ErrorHandler, class C, class... Args>
-struct marshalled_func<F, int(C::*)(Args...), ErrorHandler> :
+struct marshalled_func<F, ErrorHandler, int(C::*)(Args...)> :
     public marshalled_func_imp<F, C, int(Args...), ErrorHandler> {};
 
 template <class F, class ErrorHandler, class Signature>
-struct marshalled_func<F, std::function<Signature>, ErrorHandler> :
+struct marshalled_func<F, ErrorHandler, std::function<Signature>> :
     public marshalled_func_imp<F, void, Signature, ErrorHandler> {};
 
 #endif
index a3ffef1..9d9f1e2 100644 (file)
@@ -189,10 +189,8 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
         ca.xid = xid_++;
         calls_[ca.xid] = &ca;
 
-        req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
-                             xid_rep_window_.front());
-        req.pack_req_header(h);
-                xid_rep = xid_rep_window_.front();
+        req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
+        xid_rep = xid_rep_window_.front();
     }
 
     TO curr_to;
@@ -476,7 +474,7 @@ rpcs::dispatch(djob_t *j)
     unmarshall req(j->buf, j->sz);
     delete j;
 
-    req_header h;
+    request_header h;
     req.unpack_req_header(&h);
     int proc = h.proc;
 
@@ -731,109 +729,53 @@ rpcs::rpcbind(int &r, int a)
     return 0;
 }
 
-void
-marshall::rawbyte(unsigned char x)
-{
-    if(_ind >= _capa){
-        _capa *= 2;
-        VERIFY (_buf != NULL);
-        _buf = (char *)realloc(_buf, _capa);
-        VERIFY(_buf);
-    }
-    _buf[_ind++] = x;
-}
-
-void
-marshall::rawbytes(const char *p, int n)
-{
-    if((_ind+n) > _capa){
-        _capa = _capa > n? 2*_capa:(_capa+n);
-        VERIFY (_buf != NULL);
-        _buf = (char *)realloc(_buf, _capa);
-        VERIFY(_buf);
-    }
-    memcpy(_buf+_ind, p, n);
-    _ind += n;
-}
-
-marshall &
-operator<<(marshall &m, bool x)
-{
-    m.rawbyte(x);
-    return m;
-}
-
 marshall &
-operator<<(marshall &m, unsigned char x)
-{
+operator<<(marshall &m, uint8_t x) {
     m.rawbyte(x);
     return m;
 }
 
 marshall &
-operator<<(marshall &m, char x)
-{
-    m << (unsigned char) x;
-    return m;
-}
-
-
-marshall &
-operator<<(marshall &m, unsigned short x)
-{
-    m.rawbyte((x >> 8) & 0xff);
-    m.rawbyte(x & 0xff);
-    return m;
-}
-
-marshall &
-operator<<(marshall &m, short x)
-{
-    m << (unsigned short) x;
+operator<<(marshall &m, uint16_t x) {
+    x = htons(x);
+    m.rawbytes((char *)&x, 2);
     return m;
 }
 
 marshall &
-operator<<(marshall &m, unsigned int x)
-{
-    // network order is big-endian
-    m.rawbyte((x >> 24) & 0xff);
-    m.rawbyte((x >> 16) & 0xff);
-    m.rawbyte((x >> 8) & 0xff);
-    m.rawbyte(x & 0xff);
+operator<<(marshall &m, uint32_t x) {
+    x = htonl(x);
+    m.rawbytes((char *)&x, 4);
     return m;
 }
 
-marshall &
-operator<<(marshall &m, int x)
-{
-    m << (unsigned int) x;
-    return m;
-}
+marshall & operator<<(marshall &m, int x) { return m << (unsigned int) x; }
+marshall & operator<<(marshall &m, char x) { return m << (uint8_t)x; }
+marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; }
+marshall & operator<<(marshall &m, short x) { return m << (unsigned short) x; }
+marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }
 
 marshall &
-operator<<(marshall &m, const std::string &s)
-{
+operator<<(marshall &m, const std::string &s) {
     m << (unsigned int) s.size();
     m.rawbytes(s.data(), s.size());
     return m;
 }
 
-marshall &
-operator<<(marshall &m, unsigned long long x)
-{
-    m << (unsigned int) (x >> 32);
-    m << (unsigned int) x;
-    return m;
+void marshall::pack_req_header(const request_header &h) {
+    int saved_sz = index_;
+    //leave the first 4-byte empty for channel to fill size of pdu
+    index_ = sizeof(rpc_sz_t);
+    *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep;
+    index_ = saved_sz;
 }
 
-void
-marshall::pack(int x)
-{
-    rawbyte((x >> 24) & 0xff);
-    rawbyte((x >> 16) & 0xff);
-    rawbyte((x >> 8) & 0xff);
-    rawbyte(x & 0xff);
+void marshall::pack_reply_header(const reply_header &h) {
+    int saved_sz = index_;
+    //leave the first 4-byte empty for channel to fill size of pdu
+    index_ = sizeof(rpc_sz_t);
+    *this << h.xid << h.ret;
+    index_ = saved_sz;
 }
 
 void
@@ -849,56 +791,57 @@ unmarshall::unpack(int *x)
 void
 unmarshall::take_in(unmarshall &another)
 {
-    if(_buf)
-        free(_buf);
-    another.take_buf(&_buf, &_sz);
-    _ind = RPC_HEADER_SZ;
-    _ok = _sz >= RPC_HEADER_SZ?true:false;
+    if(buf_)
+        free(buf_);
+    another.take_buf(&buf_, &sz_);
+    index_ = RPC_HEADER_SZ;
+    ok_ = sz_ >= RPC_HEADER_SZ?true:false;
 }
 
-bool
-unmarshall::okdone()
-{
-    if(ok() && _ind == _sz){
-        return true;
-    } else {
-        return false;
-    }
+inline bool
+unmarshall::ensure(size_t n) {
+    if (index_+n > sz_)
+        ok_ = false;
+    return ok_;
 }
 
 unsigned int
 unmarshall::rawbyte()
 {
-    char c = 0;
-    if(_ind >= _sz)
-        _ok = false;
-    else
-        c = _buf[_ind++];
-    return c;
+    if (!ensure(1))
+        return 0;
+    return buf_[index_++];
+}
+
+void
+unmarshall::rawbytes(std::string &ss, size_t n)
+{
+    VERIFY(ensure(n));
+    ss.assign(buf_+index_, n);
+    index_ += n;
 }
 
 unmarshall &
 operator>>(unmarshall &u, bool &x)
 {
-    x = (bool) u.rawbyte() ;
+    x = (bool)u.rawbyte();
     return u;
 }
 
 unmarshall &
 operator>>(unmarshall &u, unsigned char &x)
 {
-    x = (unsigned char) u.rawbyte() ;
+    x = (unsigned char)u.rawbyte();
     return u;
 }
 
 unmarshall &
 operator>>(unmarshall &u, char &x)
 {
-    x = (char) u.rawbyte();
+    x = (char)u.rawbyte();
     return u;
 }
 
-
 unmarshall &
 operator>>(unmarshall &u, unsigned short &x)
 {
@@ -955,19 +898,6 @@ operator>>(unmarshall &u, std::string &s)
     return u;
 }
 
-void
-unmarshall::rawbytes(std::string &ss, unsigned int n)
-{
-    if((_ind+n) > (unsigned)_sz){
-        _ok = false;
-    } else {
-        std::string tmps = std::string(_buf+_ind, n);
-        swap(ss, tmps);
-        VERIFY(ss.size() == n);
-        _ind += n;
-    }
-}
-
 bool operator<(const sockaddr_in &a, const sockaddr_in &b){
     return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
             ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
index 245f277..20a0aa9 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -27,12 +27,6 @@ class rpc_const {
         static const int cancel_failure = -7;
 };
 
-struct ReturnOnFailure {
-    static inline int unmarshall_args_failure() {
-        return rpc_const::unmarshal_args_failure;
-    }
-};
-
 // rpc client endpoint.
 // manages a xid space per destination socket
 // threaded: multiple threads can be sending RPCs,
@@ -76,16 +70,16 @@ class rpcc : public chanmgr {
 
         std::map<int, caller *> calls_;
         std::list<unsigned int> xid_rep_window_;
-                
-                struct request {
-                    request() { clear(); }
-                    void clear() { buf.clear(); xid = -1; }
-                    bool isvalid() { return xid != -1; }
-                    std::string buf;
-                    int xid;
-                };
-                struct request dup_req_;
-                int xid_rep_done_;
+
+        struct request {
+            request() { clear(); }
+            void clear() { buf.clear(); xid = -1; }
+            bool isvalid() { return xid != -1; }
+            std::string buf;
+            int xid;
+        };
+        struct request dup_req_;
+        int xid_rep_done_;
     public:
 
         rpcc(sockaddr_in d, bool retrans=true);
@@ -258,8 +252,14 @@ class rpcs : public chanmgr {
     template<class F, class C=void> void reg(unsigned int proc, F f, C *c=nullptr);
 };
 
+struct ReturnOnFailure {
+    static inline int unmarshall_args_failure() {
+        return rpc_const::unmarshal_args_failure;
+    }
+};
+
 template<class F, class C> void rpcs::reg(unsigned int proc, F f, C *c) {
-    reg1(proc, marshalled_func<F, F, ReturnOnFailure>::wrap(f, c));
+    reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
 }
 
 void make_sockaddr(const char *hostandport, struct sockaddr_in *dst);
index 8aed748..c381745 100644 (file)
@@ -81,7 +81,7 @@ void
 testmarshall()
 {
        marshall m;
-       req_header rh(1,2,3,4,5);
+       request_header rh{1,2,3,4,5};
        m.pack_req_header(rh);
        VERIFY(m.size()==RPC_HEADER_SZ);
        int i = 12345;
@@ -97,7 +97,7 @@ testmarshall()
        VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)));
 
        unmarshall un(b,sz);
-       req_header rh1;
+       request_header rh1;
        un.unpack_req_header(&rh1);
        VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
        int i1;
@@ -203,7 +203,7 @@ simple_tests(rpcc *c)
        int xx = 0;
        intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
        VERIFY(intret == 0 && xx == 78);
-       printf("   -- no suprious timeout .. ok\n");
+       printf("   -- no spurious timeout .. ok\n");
 
        // specify a timeout value to an RPC that should succeed (tcp)
        {
@@ -211,7 +211,7 @@ simple_tests(rpcc *c)
                std::string rep;
                c->call_timeout(22, rpcc::to(3000), rep, arg, (std::string)"x");
                VERIFY(rep.size() == 1001);
-               printf("   -- no suprious timeout .. ok\n");
+               printf("   -- no spurious timeout .. ok\n");
        }
 
        // huge RPC
index 146764f..73f94f4 100644 (file)
@@ -9,7 +9,7 @@ ThrPool::ThrPool(int sz, bool blocking)
 : nthreads_(sz),blockadd_(blocking),jobq_(100*sz) 
 {
        for (int i=0; i<nthreads_; i++)
-        th_.push_back(std::thread(&ThrPool::do_worker, this));
+        th_.emplace_back(&ThrPool::do_worker, this);
 }
 
 // IMPORTANT: this function can be called only when no external thread 
diff --git a/rsm.h b/rsm.h
index bf37a5d..517316a 100644 (file)
--- a/rsm.h
+++ b/rsm.h
@@ -81,7 +81,7 @@ class rsm : public config_view_change {
 };
 
 template<class F, class C> void rsm::reg(int proc, F f, C *c) {
-    reg1(proc, marshalled_func<F, F>::wrap(f, c));
+    reg1(proc, marshalled_func<F>::wrap(f, c));
 }
 
 #endif /* rsm_h */
index 7062cea..2e6d33d 100644 (file)
@@ -28,7 +28,7 @@ void rsm_client::primary_failure() {
     known_mems.pop_back();
 }
 
-rsm_protocol::status rsm_client::invoke(int proc, std::string req, std::string &rep) {
+rsm_protocol::status rsm_client::invoke(int proc, std::string &rep, const std::string &req) {
     int ret = 0;
     lock ml(rsm_client_mutex);
     while (1) {
index 039bc26..b275974 100644 (file)
@@ -24,34 +24,19 @@ class rsm_client {
         bool init_members();
     public:
         rsm_client(std::string dst);
-        rsm_protocol::status invoke(int proc, std::string req, std::string &rep);
+        rsm_protocol::status invoke(int proc, std::string &rep, const std::string &req);
 
-        template<class R, class A1>
-            int call(unsigned int proc, const A1 & a1, R &r);
-
-        template<class R, class A1, class A2>
-            int call(unsigned int proc, const A1 & a1, const A2 & a2, R &r);
-
-        template<class R, class A1, class A2, class A3>
-            int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
-                    R &r);
-
-        template<class R, class A1, class A2, class A3, class A4>
-            int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
-                    const A4 & a4, R &r);
-
-        template<class R, class A1, class A2, class A3, class A4, class A5>
-            int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
-                    const A4 & a4, const A5 & a5, R &r);
+        template<class R, class ...Args>
+            int call(unsigned int proc, R & r, const Args & ...a1);
     private:
-        template<class R> int call_m(unsigned int proc, marshall &req, R &r);
+        template<class R> int call_m(unsigned int proc, R & r, const marshall & req);
 };
 
 template<class R>
-int rsm_client::call_m(unsigned int proc, marshall &req, R &r) {
+int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) {
        std::string rep;
         std::string res;
-       int intret = invoke(proc, req.str(), rep);
+       int intret = invoke(proc, rep, req.cstr());
         VERIFY( intret == rsm_client_protocol::OK );
         unmarshall u(rep);
        u >> intret;
@@ -77,55 +62,9 @@ int rsm_client::call_m(unsigned int proc, marshall &req, R &r) {
        return intret;
 }
 
-template<class R, class A1>
-int rsm_client::call(unsigned int proc, const A1 & a1, R & r) {
-    marshall m;
-    m << a1;
-    return call_m(proc, m, r);
-}
-
-template<class R, class A1, class A2>
-int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, R & r) {
-    marshall m;
-    m << a1;
-    m << a2;
-    return call_m(proc, m, r);
-}
-
-template<class R, class A1, class A2, class A3>
-int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, R & r) {
-    marshall m;
-    std::string rep;
-    std::string res;
-    m << a1;
-    m << a2;
-    m << a3;
-    return call_m(proc, m, r);
-}
-
-template<class R, class A1, class A2, class A3, class A4>
-int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, const A4 & a4, R & r) {
-    marshall m;
-    std::string rep;
-    std::string res;
-    m << a1;
-    m << a2;
-    m << a3;
-    m << a4;
-    return call_m(proc, m, r);
-}
-
-template<class R, class A1, class A2, class A3, class A4, class A5>
-int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, const A4 & a4, const A5 & a5, R & r) {
-    marshall m;
-    std::string rep;
-    std::string res;
-    m << a1;
-    m << a2;
-    m << a3;
-    m << a4;
-    m << a5;
-    return call_m(proc, m, r);
+template<class R, class ...Args>
+int rsm_client::call(unsigned int proc, R & r, const Args & ...a1) {
+    return call_m(proc, r, marshall{a1...});
 }
 
 #endif