From: Peter Iannucci Date: Fri, 20 Sep 2013 05:31:16 +0000 (-0400) Subject: More cleanups to marshalling logic. X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/b86bce0900f88c530f23dd602a8f2ef9ce008f8a More cleanups to marshalling logic. --- diff --git a/lock.h b/lock.h index b23f6cf..1789ec3 100644 --- a/lock.h +++ b/lock.h @@ -10,7 +10,7 @@ using cond = std::condition_variable; class adopt_lock : public lock { public: - inline adopt_lock(class mutex &m) : std::unique_lock(m, std::adopt_lock) { + explicit inline adopt_lock(class mutex &m) : std::unique_lock(m, std::adopt_lock) { } inline ~adopt_lock() { release(); diff --git a/rpc/marshall.h b/rpc/marshall.h index 448240b..bd45c02 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -12,101 +12,105 @@ #include #include "lang/verify.h" -struct req_header { - req_header(int x=0, int p=0, int c = 0, int s = 0, int xi = 0): - xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {} - int xid; - int proc; - unsigned int clt_nonce; - unsigned int srv_nonce; - int xid_rep; +struct request_header { + request_header(int x=0, int p=0, int c=0, int s=0, int xi=0) : + xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {} + int xid; + int proc; + unsigned int clt_nonce; + unsigned int srv_nonce; + int xid_rep; + request_header hton() const { + return { + htonl(xid), htonl(proc), htonl(clt_nonce), htonl(srv_nonce), htonl(xid_rep) + }; + } }; struct reply_header { - reply_header(int x=0, int r=0): xid(x), ret(r) {} - int xid; - int ret; + reply_header(int x=0, int r=0): xid(x), ret(r) {} + int xid; + int ret; + reply_header hton() const { + return { + htonl(xid), htonl(ret) + }; + } }; typedef int rpc_sz_t; -//size of initial buffer allocation +//size of initial buffer allocation #define DEFAULT_RPC_SZ 1024 -#define RPC_HEADER_SZ (std::max(sizeof(req_header), sizeof(reply_header)) + sizeof(rpc_sz_t)) - -struct pass { - template inline pass(Args&&...) {} -}; +#define RPC_HEADER_SZ (std::max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t)) class marshall { - private: - char *_buf; // Base of the raw bytes buffer (dynamically readjusted) - int _capa; // Capacity of the buffer - int _ind; // Read/write head position - - public: - marshall() { - _buf = (char *) malloc(sizeof(char)*DEFAULT_RPC_SZ); - VERIFY(_buf); - _capa = DEFAULT_RPC_SZ; - _ind = RPC_HEADER_SZ; - } - - template marshall(const Args&... args) : marshall() { + private: + char *buf_; // Base of the raw bytes buffer (dynamically readjusted) + size_t capacity_; // Capacity of the buffer + size_t index_; // Read/write head position + + inline void reserve(size_t n) { + if((index_+n) > capacity_){ + capacity_ += std::max(capacity_, n); + VERIFY (buf_ != NULL); + buf_ = (char *)realloc(buf_, capacity_); + VERIFY(buf_); + } + } + public: + struct pass { template inline pass(Args&&...) {} }; + + template + + marshall(const Args&... args) { + buf_ = (char *) malloc(sizeof(char)*DEFAULT_RPC_SZ); + VERIFY(buf_); + capacity_ = DEFAULT_RPC_SZ; + index_ = RPC_HEADER_SZ; (void)pass{(*this << args)...}; } - ~marshall() { - if (_buf) - free(_buf); - } - - int size() { return _ind;} - char *cstr() { return _buf;} - - void rawbyte(unsigned char); - void rawbytes(const char *, int); - - // Return the current content (excluding header) as a string - std::string get_content() { - return std::string(_buf+RPC_HEADER_SZ,_ind-RPC_HEADER_SZ); - } - - // Return the current content (excluding header) as a string - std::string str() { - return get_content(); - } - - void pack(int i); - - void pack_req_header(const req_header &h) { - int saved_sz = _ind; - //leave the first 4-byte empty for channel to fill size of pdu - _ind = sizeof(rpc_sz_t); - pack(h.xid); - pack(h.proc); - pack((int)h.clt_nonce); - pack((int)h.srv_nonce); - pack(h.xid_rep); - _ind = saved_sz; - } - - void pack_reply_header(const reply_header &h) { - int saved_sz = _ind; - //leave the first 4-byte empty for channel to fill size of pdu - _ind = sizeof(rpc_sz_t); - pack(h.xid); - pack(h.ret); - _ind = saved_sz; - } - - void take_buf(char **b, int *s) { - *b = _buf; - *s = _ind; - _buf = NULL; - _ind = 0; - return; - } + ~marshall() { + if (buf_) + free(buf_); + } + + int size() { return index_;} + char *cstr() { return buf_;} + const char *cstr() const { return buf_;} + + void rawbyte(unsigned char x) { + reserve(1); + buf_[index_++] = x; + } + + void rawbytes(const char *p, int n) { + reserve(n); + memcpy(buf_+index_, p, n); + index_ += n; + } + + // Return the current content (excluding header) as a string + std::string get_content() { + return std::string(buf_+RPC_HEADER_SZ,index_-RPC_HEADER_SZ); + } + + // Return the current content (excluding header) as a string + std::string str() { + return get_content(); + } + + void pack_req_header(const request_header &h); + void pack_reply_header(const reply_header &h); + + void take_buf(char **b, int *s) { + *b = buf_; + *s = index_; + buf_ = NULL; + index_ = 0; + return; + } }; marshall& operator<<(marshall &, bool); @@ -122,22 +126,22 @@ marshall& operator<<(marshall &, const std::string &); template marshall & operator<<(marshall &m, std::vector v) { - m << (unsigned int) v.size(); - for(unsigned i = 0; i < v.size(); i++) - m << v[i]; - return m; + m << (unsigned int) v.size(); + for(unsigned i = 0; i < v.size(); i++) + m << v[i]; + return m; } template marshall & operator<<(marshall &m, const std::map &d) { - typename std::map::const_iterator i; + typename std::map::const_iterator i; - m << (unsigned int) d.size(); + m << (unsigned int) d.size(); - for (i = d.begin(); i != d.end(); i++) { - m << i->first << i->second; - } - return m; + for (i = d.begin(); i != d.end(); i++) { + m << i->first << i->second; + } + return m; } template marshall & @@ -154,70 +158,73 @@ operator<<(marshall &m, const std::pair &d) { } class unmarshall { - private: - char *_buf; - int _sz; - int _ind; - bool _ok; - public: - unmarshall(): _buf(NULL),_sz(0),_ind(0),_ok(false) {} - unmarshall(char *b, int sz): _buf(b),_sz(sz),_ind(),_ok(true) {} - unmarshall(const std::string &s) : _buf(NULL),_sz(0),_ind(0),_ok(false) - { - //take the content which does not exclude a RPC header from a string - take_content(s); - } - ~unmarshall() { - if (_buf) free(_buf); - } - - //take contents from another unmarshall object - void take_in(unmarshall &another); - - //take the content which does not exclude a RPC header from a string - void take_content(const std::string &s) { - _sz = s.size()+RPC_HEADER_SZ; - _buf = (char *)realloc(_buf,_sz); - VERIFY(_buf); - _ind = RPC_HEADER_SZ; - memcpy(_buf+_ind, s.data(), s.size()); - _ok = true; - } - - bool ok() { return _ok; } - char *cstr() { return _buf;} - bool okdone(); - unsigned int rawbyte(); - void rawbytes(std::string &s, unsigned int n); - - int ind() { return _ind;} - int size() { return _sz;} - void unpack(int *); //non-const ref - void take_buf(char **b, int *sz) { - *b = _buf; - *sz = _sz; - _sz = _ind = 0; - _buf = NULL; - } - - void unpack_req_header(req_header *h) { - //the first 4-byte is for channel to fill size of pdu - _ind = sizeof(rpc_sz_t); - unpack(&h->xid); - unpack(&h->proc); - unpack((int *)&h->clt_nonce); - unpack((int *)&h->srv_nonce); - unpack(&h->xid_rep); - _ind = RPC_HEADER_SZ; - } - - void unpack_reply_header(reply_header *h) { - //the first 4-byte is for channel to fill size of pdu - _ind = sizeof(rpc_sz_t); - unpack(&h->xid); - unpack(&h->ret); - _ind = RPC_HEADER_SZ; - } + private: + char *buf_; + int sz_; + int index_; + bool ok_; + + inline bool ensure(size_t n); + public: + unmarshall(): buf_(NULL),sz_(0),index_(0),ok_(false) {} + unmarshall(char *b, int sz): buf_(b),sz_(sz),index_(),ok_(true) {} + unmarshall(const std::string &s) : buf_(NULL),sz_(0),index_(0),ok_(false) + { + //take the content which does not exclude a RPC header from a string + take_content(s); + } + ~unmarshall() { + if (buf_) free(buf_); + } + + //take contents from another unmarshall object + void take_in(unmarshall &another); + + //take the content which does not exclude a RPC header from a string + void take_content(const std::string &s) { + sz_ = s.size()+RPC_HEADER_SZ; + buf_ = (char *)realloc(buf_,sz_); + VERIFY(buf_); + index_ = RPC_HEADER_SZ; + memcpy(buf_+index_, s.data(), s.size()); + ok_ = true; + } + + bool ok() const { return ok_; } + char *cstr() { return buf_;} + bool okdone() const { return ok_ && index_ == sz_; } + + unsigned int rawbyte(); + void rawbytes(std::string &s, size_t n); + + int ind() { return index_;} + int size() { return sz_;} + void unpack(int *); //non-const ref + void take_buf(char **b, int *sz) { + *b = buf_; + *sz = sz_; + sz_ = index_ = 0; + buf_ = NULL; + } + + void unpack_req_header(request_header *h) { + //the first 4-byte is for channel to fill size of pdu + index_ = sizeof(rpc_sz_t); + unpack(&h->xid); + unpack(&h->proc); + unpack((int *)&h->clt_nonce); + unpack((int *)&h->srv_nonce); + unpack(&h->xid_rep); + index_ = RPC_HEADER_SZ; + } + + void unpack_reply_header(reply_header *h) { + //the first 4-byte is for channel to fill size of pdu + index_ = sizeof(rpc_sz_t); + unpack(&h->xid); + unpack(&h->ret); + index_ = RPC_HEADER_SZ; + } template inline A grab() { @@ -240,8 +247,8 @@ unmarshall& operator>>(unmarshall &, std::string &); template unmarshall & operator>>(unmarshall &u, std::vector &v) { - unsigned n; - u >> n; + unsigned n; + u >> n; v.clear(); while (n--) { C c; @@ -253,16 +260,16 @@ operator>>(unmarshall &u, std::vector &v) template unmarshall & operator>>(unmarshall &u, std::map &d) { - unsigned n; - u >> n; - d.clear(); + unsigned n; + u >> n; + d.clear(); while (n--) { A a; B b; u >> a >> b; d[a] = b; } - return u; + return u; } template unmarshall & @@ -283,6 +290,32 @@ operator>>(unmarshall &u, std::pair &d) { return u >> d.first >> d.second; } +typedef std::function handler; + +// +// Automatic marshalling wrappers for RPC handlers +// + +// PAI 2013/09/19 +// C++11 does neither of these two things for us: +// 1) Declare variables using a parameter pack expansion, like so +// Args ...args; +// 2) Call a function with a std::tuple of the arguments it expects +// +// We implement an 'invoke' function for functions of the RPC handler +// signature, i.e. int(R & r, const Args...) +// +// One thing we need in order to accomplish this is a way to cause the compiler +// to specialize 'invoke' with a parameter pack containing a list of indices +// for the elements of the tuple. This will allow us to call the underlying +// function with the exploded contents of the tuple. The empty type +// tuple_indices accomplishes this. It will be passed in to +// 'invoke' as a parameter which will be ignored, but its type will force the +// compiler to specialize 'invoke' appropriately. + +// The following implementation of tuple_indices is redistributed under the MIT +// License as an insubstantial portion of the LLVM compiler infrastructure. + template struct tuple_indices {}; template struct make_indices_imp; template struct make_indices_imp, E> { @@ -291,10 +324,14 @@ template struct make_indices_imp struct make_indices_imp, E> { typedef tuple_indices type; }; -template struct make_tuple_indices { +template struct make_tuple_indices { typedef typename make_indices_imp, E>::type type; }; +// This class encapsulates the default response to runtime unmarshalling +// failures. The templated wrappers below may optionally use a different +// class. + struct VerifyOnFailure { static inline int unmarshall_args_failure() { VERIFY(0); @@ -302,70 +339,91 @@ struct VerifyOnFailure { } }; -typedef std::function handler; - -using std::move; -using std::get; -using std::tuple; -using std::decay; +// Here's the implementation of 'invoke'. It could be more general, but this +// meets our needs. -#include +// One for function pointers... template typename std::enable_if::value, int>::type invoke(F f, void *, R & r, args_type & t, tuple_indices) { - return f(r, move(get(t))...); + return f(r, std::move(std::get(t))...); } +// And one for pointers to member functions... + template typename std::enable_if::value, int>::type invoke(F f, C *c, R & r, args_type & t, tuple_indices) { - return (c->*f)(r, move(get(t))...); + return (c->*f)(r, std::move(std::get(t))...); } -template struct marshalled_func_imp; +// Here we specialize on the Signature template parameter to obtain the list of +// argument types. Note that we do not assume that the Functor parameter has +// the same pattern as Signature; this allows us to ignore the distinctions +// between various types of callable objects at this level of abstraction. + template struct marshalled_func_imp { - using result_type = R; - using args_type = tuple::type...>; - using index_type = typename make_tuple_indices::type; - - static inline int call(F f, C *c, unmarshall &u, marshall &m) { - args_type t{std::move(std::tuple{u.grab()...})}; - if (!u.okdone()) - return ErrorHandler::unmarshall_args_failure(); - R r; - int b = invoke(f, c, r, t, index_type()); - m << r; - return b; - } - static inline handler *wrap(F f, C *c=nullptr) { - typename decay::type f_ = f; - return new handler([f_, c](unmarshall &u, marshall &m) -> int { - return call(f_, c, u, m); + // This type definition corresponds to an empty struct with + // template parameters running from 0 up to (# args) - 1. + using Indices = typename make_tuple_indices::type; + // This type definition represents storage for f's unmarshalled + // arguments. std::decay is (most notably) stripping off const + // qualifiers. + using ArgsStorage = std::tuple::type...>; + // Allocate a handler (i.e. std::function) to hold the lambda + // which will unmarshall RPCs and call f. + return new handler([=](unmarshall &u, marshall &m) -> int { + // Unmarshall each argument with the correct type and store the + // result in a tuple. + ArgsStorage t = {u.grab::type>()...}; + // Verify successful unmarshalling of the entire input stream. + if (!u.okdone()) + return ErrorHandler::unmarshall_args_failure(); + // Allocate space for the RPC response -- will be passed into the + // function as an lvalue reference. + R r; + // Perform the invocation. Note that Indices() calls the default + // constructor of the empty struct with the special template + // parameters. + int b = invoke(f, c, r, t, Indices()); + // Marshall the response. + m << r; + // Make like a tree. + return b; }); } }; -template struct marshalled_func; +// More partial template specialization shenanigans to reduce the number of +// parameters which must be provided explicitly and to support a few common +// callable types. C++11 doesn't allow partial function template +// specialization, so we use classes (structs). + +template struct marshalled_func; template -struct marshalled_func : +struct marshalled_func : public marshalled_func_imp {}; template -struct marshalled_func : +struct marshalled_func : public marshalled_func_imp {}; template -struct marshalled_func, ErrorHandler> : +struct marshalled_func> : public marshalled_func_imp {}; #endif diff --git a/rpc/rpc.cc b/rpc/rpc.cc index a3ffef1..9d9f1e2 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -189,10 +189,8 @@ rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep, ca.xid = xid_++; calls_[ca.xid] = &ca; - req_header h(ca.xid, proc, clt_nonce_, srv_nonce_, - xid_rep_window_.front()); - req.pack_req_header(h); - xid_rep = xid_rep_window_.front(); + req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()}); + xid_rep = xid_rep_window_.front(); } TO curr_to; @@ -476,7 +474,7 @@ rpcs::dispatch(djob_t *j) unmarshall req(j->buf, j->sz); delete j; - req_header h; + request_header h; req.unpack_req_header(&h); int proc = h.proc; @@ -731,109 +729,53 @@ rpcs::rpcbind(int &r, int a) return 0; } -void -marshall::rawbyte(unsigned char x) -{ - if(_ind >= _capa){ - _capa *= 2; - VERIFY (_buf != NULL); - _buf = (char *)realloc(_buf, _capa); - VERIFY(_buf); - } - _buf[_ind++] = x; -} - -void -marshall::rawbytes(const char *p, int n) -{ - if((_ind+n) > _capa){ - _capa = _capa > n? 2*_capa:(_capa+n); - VERIFY (_buf != NULL); - _buf = (char *)realloc(_buf, _capa); - VERIFY(_buf); - } - memcpy(_buf+_ind, p, n); - _ind += n; -} - -marshall & -operator<<(marshall &m, bool x) -{ - m.rawbyte(x); - return m; -} - marshall & -operator<<(marshall &m, unsigned char x) -{ +operator<<(marshall &m, uint8_t x) { m.rawbyte(x); return m; } marshall & -operator<<(marshall &m, char x) -{ - m << (unsigned char) x; - return m; -} - - -marshall & -operator<<(marshall &m, unsigned short x) -{ - m.rawbyte((x >> 8) & 0xff); - m.rawbyte(x & 0xff); - return m; -} - -marshall & -operator<<(marshall &m, short x) -{ - m << (unsigned short) x; +operator<<(marshall &m, uint16_t x) { + x = htons(x); + m.rawbytes((char *)&x, 2); return m; } marshall & -operator<<(marshall &m, unsigned int x) -{ - // network order is big-endian - m.rawbyte((x >> 24) & 0xff); - m.rawbyte((x >> 16) & 0xff); - m.rawbyte((x >> 8) & 0xff); - m.rawbyte(x & 0xff); +operator<<(marshall &m, uint32_t x) { + x = htonl(x); + m.rawbytes((char *)&x, 4); return m; } -marshall & -operator<<(marshall &m, int x) -{ - m << (unsigned int) x; - return m; -} +marshall & operator<<(marshall &m, int x) { return m << (unsigned int) x; } +marshall & operator<<(marshall &m, char x) { return m << (uint8_t)x; } +marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; } +marshall & operator<<(marshall &m, short x) { return m << (unsigned short) x; } +marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; } marshall & -operator<<(marshall &m, const std::string &s) -{ +operator<<(marshall &m, const std::string &s) { m << (unsigned int) s.size(); m.rawbytes(s.data(), s.size()); return m; } -marshall & -operator<<(marshall &m, unsigned long long x) -{ - m << (unsigned int) (x >> 32); - m << (unsigned int) x; - return m; +void marshall::pack_req_header(const request_header &h) { + int saved_sz = index_; + //leave the first 4-byte empty for channel to fill size of pdu + index_ = sizeof(rpc_sz_t); + *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep; + index_ = saved_sz; } -void -marshall::pack(int x) -{ - rawbyte((x >> 24) & 0xff); - rawbyte((x >> 16) & 0xff); - rawbyte((x >> 8) & 0xff); - rawbyte(x & 0xff); +void marshall::pack_reply_header(const reply_header &h) { + int saved_sz = index_; + //leave the first 4-byte empty for channel to fill size of pdu + index_ = sizeof(rpc_sz_t); + *this << h.xid << h.ret; + index_ = saved_sz; } void @@ -849,56 +791,57 @@ unmarshall::unpack(int *x) void unmarshall::take_in(unmarshall &another) { - if(_buf) - free(_buf); - another.take_buf(&_buf, &_sz); - _ind = RPC_HEADER_SZ; - _ok = _sz >= RPC_HEADER_SZ?true:false; + if(buf_) + free(buf_); + another.take_buf(&buf_, &sz_); + index_ = RPC_HEADER_SZ; + ok_ = sz_ >= RPC_HEADER_SZ?true:false; } -bool -unmarshall::okdone() -{ - if(ok() && _ind == _sz){ - return true; - } else { - return false; - } +inline bool +unmarshall::ensure(size_t n) { + if (index_+n > sz_) + ok_ = false; + return ok_; } unsigned int unmarshall::rawbyte() { - char c = 0; - if(_ind >= _sz) - _ok = false; - else - c = _buf[_ind++]; - return c; + if (!ensure(1)) + return 0; + return buf_[index_++]; +} + +void +unmarshall::rawbytes(std::string &ss, size_t n) +{ + VERIFY(ensure(n)); + ss.assign(buf_+index_, n); + index_ += n; } unmarshall & operator>>(unmarshall &u, bool &x) { - x = (bool) u.rawbyte() ; + x = (bool)u.rawbyte(); return u; } unmarshall & operator>>(unmarshall &u, unsigned char &x) { - x = (unsigned char) u.rawbyte() ; + x = (unsigned char)u.rawbyte(); return u; } unmarshall & operator>>(unmarshall &u, char &x) { - x = (char) u.rawbyte(); + x = (char)u.rawbyte(); return u; } - unmarshall & operator>>(unmarshall &u, unsigned short &x) { @@ -955,19 +898,6 @@ operator>>(unmarshall &u, std::string &s) return u; } -void -unmarshall::rawbytes(std::string &ss, unsigned int n) -{ - if((_ind+n) > (unsigned)_sz){ - _ok = false; - } else { - std::string tmps = std::string(_buf+_ind, n); - swap(ss, tmps); - VERIFY(ss.size() == n); - _ind += n; - } -} - bool operator<(const sockaddr_in &a, const sockaddr_in &b){ return ((a.sin_addr.s_addr < b.sin_addr.s_addr) || ((a.sin_addr.s_addr == b.sin_addr.s_addr) && diff --git a/rpc/rpc.h b/rpc/rpc.h index 245f277..20a0aa9 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -27,12 +27,6 @@ class rpc_const { static const int cancel_failure = -7; }; -struct ReturnOnFailure { - static inline int unmarshall_args_failure() { - return rpc_const::unmarshal_args_failure; - } -}; - // rpc client endpoint. // manages a xid space per destination socket // threaded: multiple threads can be sending RPCs, @@ -76,16 +70,16 @@ class rpcc : public chanmgr { std::map calls_; std::list xid_rep_window_; - - struct request { - request() { clear(); } - void clear() { buf.clear(); xid = -1; } - bool isvalid() { return xid != -1; } - std::string buf; - int xid; - }; - struct request dup_req_; - int xid_rep_done_; + + struct request { + request() { clear(); } + void clear() { buf.clear(); xid = -1; } + bool isvalid() { return xid != -1; } + std::string buf; + int xid; + }; + struct request dup_req_; + int xid_rep_done_; public: rpcc(sockaddr_in d, bool retrans=true); @@ -258,8 +252,14 @@ class rpcs : public chanmgr { template void reg(unsigned int proc, F f, C *c=nullptr); }; +struct ReturnOnFailure { + static inline int unmarshall_args_failure() { + return rpc_const::unmarshal_args_failure; + } +}; + template void rpcs::reg(unsigned int proc, F f, C *c) { - reg1(proc, marshalled_func::wrap(f, c)); + reg1(proc, marshalled_func::wrap(f, c)); } void make_sockaddr(const char *hostandport, struct sockaddr_in *dst); diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index 8aed748..c381745 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -81,7 +81,7 @@ void testmarshall() { marshall m; - req_header rh(1,2,3,4,5); + request_header rh{1,2,3,4,5}; m.pack_req_header(rh); VERIFY(m.size()==RPC_HEADER_SZ); int i = 12345; @@ -97,7 +97,7 @@ testmarshall() VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int))); unmarshall un(b,sz); - req_header rh1; + request_header rh1; un.unpack_req_header(&rh1); VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0); int i1; @@ -203,7 +203,7 @@ simple_tests(rpcc *c) int xx = 0; intret = c->call_timeout(23, rpcc::to(3000), xx, 77); VERIFY(intret == 0 && xx == 78); - printf(" -- no suprious timeout .. ok\n"); + printf(" -- no spurious timeout .. ok\n"); // specify a timeout value to an RPC that should succeed (tcp) { @@ -211,7 +211,7 @@ simple_tests(rpcc *c) std::string rep; c->call_timeout(22, rpcc::to(3000), rep, arg, (std::string)"x"); VERIFY(rep.size() == 1001); - printf(" -- no suprious timeout .. ok\n"); + printf(" -- no spurious timeout .. ok\n"); } // huge RPC diff --git a/rpc/thr_pool.cc b/rpc/thr_pool.cc index 146764f..73f94f4 100644 --- a/rpc/thr_pool.cc +++ b/rpc/thr_pool.cc @@ -9,7 +9,7 @@ ThrPool::ThrPool(int sz, bool blocking) : nthreads_(sz),blockadd_(blocking),jobq_(100*sz) { for (int i=0; i void rsm::reg(int proc, F f, C *c) { - reg1(proc, marshalled_func::wrap(f, c)); + reg1(proc, marshalled_func::wrap(f, c)); } #endif /* rsm_h */ diff --git a/rsm_client.cc b/rsm_client.cc index 7062cea..2e6d33d 100644 --- a/rsm_client.cc +++ b/rsm_client.cc @@ -28,7 +28,7 @@ void rsm_client::primary_failure() { known_mems.pop_back(); } -rsm_protocol::status rsm_client::invoke(int proc, std::string req, std::string &rep) { +rsm_protocol::status rsm_client::invoke(int proc, std::string &rep, const std::string &req) { int ret = 0; lock ml(rsm_client_mutex); while (1) { diff --git a/rsm_client.h b/rsm_client.h index 039bc26..b275974 100644 --- a/rsm_client.h +++ b/rsm_client.h @@ -24,34 +24,19 @@ class rsm_client { bool init_members(); public: rsm_client(std::string dst); - rsm_protocol::status invoke(int proc, std::string req, std::string &rep); + rsm_protocol::status invoke(int proc, std::string &rep, const std::string &req); - template - int call(unsigned int proc, const A1 & a1, R &r); - - template - int call(unsigned int proc, const A1 & a1, const A2 & a2, R &r); - - template - int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, - R &r); - - template - int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, - const A4 & a4, R &r); - - template - int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, - const A4 & a4, const A5 & a5, R &r); + template + int call(unsigned int proc, R & r, const Args & ...a1); private: - template int call_m(unsigned int proc, marshall &req, R &r); + template int call_m(unsigned int proc, R & r, const marshall & req); }; template -int rsm_client::call_m(unsigned int proc, marshall &req, R &r) { +int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) { std::string rep; std::string res; - int intret = invoke(proc, req.str(), rep); + int intret = invoke(proc, rep, req.cstr()); VERIFY( intret == rsm_client_protocol::OK ); unmarshall u(rep); u >> intret; @@ -77,55 +62,9 @@ int rsm_client::call_m(unsigned int proc, marshall &req, R &r) { return intret; } -template -int rsm_client::call(unsigned int proc, const A1 & a1, R & r) { - marshall m; - m << a1; - return call_m(proc, m, r); -} - -template -int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, R & r) { - marshall m; - m << a1; - m << a2; - return call_m(proc, m, r); -} - -template -int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, R & r) { - marshall m; - std::string rep; - std::string res; - m << a1; - m << a2; - m << a3; - return call_m(proc, m, r); -} - -template -int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, const A4 & a4, R & r) { - marshall m; - std::string rep; - std::string res; - m << a1; - m << a2; - m << a3; - m << a4; - return call_m(proc, m, r); -} - -template -int rsm_client::call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, const A4 & a4, const A5 & a5, R & r) { - marshall m; - std::string rep; - std::string res; - m << a1; - m << a2; - m << a3; - m << a4; - m << a5; - return call_m(proc, m, r); +template +int rsm_client::call(unsigned int proc, R & r, const Args & ...a1) { + return call_m(proc, r, marshall{a1...}); } #endif