#include <cstddef>
#include <inttypes.h>
#include "lang/verify.h"
-#include "lang/algorithm.h"
-
-struct req_header {
- req_header(int x=0, int p=0, int c = 0, int s = 0, int xi = 0):
- xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {}
- int xid;
- int proc;
- unsigned int clt_nonce;
- unsigned int srv_nonce;
- int xid_rep;
+
+struct request_header {
+ request_header(int x=0, int p=0, int c=0, int s=0, int xi=0) :
+ xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {}
+ int xid;
+ int proc;
+ unsigned int clt_nonce;
+ unsigned int srv_nonce;
+ int xid_rep;
+ request_header hton() const {
+ return {
+ htonl(xid), htonl(proc), htonl(clt_nonce), htonl(srv_nonce), htonl(xid_rep)
+ };
+ }
};
struct reply_header {
- reply_header(int x=0, int r=0): xid(x), ret(r) {}
- int xid;
- int ret;
+ reply_header(int x=0, int r=0): xid(x), ret(r) {}
+ int xid;
+ int ret;
+ reply_header hton() const {
+ return {
+ htonl(xid), htonl(ret)
+ };
+ }
};
-typedef uint64_t rpc_checksum_t;
typedef int rpc_sz_t;
-enum {
- //size of initial buffer allocation
- DEFAULT_RPC_SZ = 1024,
-#if RPC_CHECKSUMMING
- //size of rpc_header includes a 4-byte int to be filled by tcpchan and uint64_t checksum
- RPC_HEADER_SZ = static_max<sizeof(req_header), sizeof(reply_header)>::value + sizeof(rpc_sz_t) + sizeof(rpc_checksum_t)
-#else
- RPC_HEADER_SZ = static_max<sizeof(req_header), sizeof(reply_header)>::value + sizeof(rpc_sz_t)
-#endif
-};
+//size of initial buffer allocation
+#define DEFAULT_RPC_SZ 1024
+#define RPC_HEADER_SZ (std::max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
class marshall {
- private:
- char *_buf; // Base of the raw bytes buffer (dynamically readjusted)
- int _capa; // Capacity of the buffer
- int _ind; // Read/write head position
-
- public:
- marshall() {
- _buf = (char *) malloc(sizeof(char)*DEFAULT_RPC_SZ);
- VERIFY(_buf);
- _capa = DEFAULT_RPC_SZ;
- _ind = RPC_HEADER_SZ;
- }
-
- ~marshall() {
- if (_buf)
- free(_buf);
- }
-
- int size() { return _ind;}
- char *cstr() { return _buf;}
-
- void rawbyte(unsigned char);
- void rawbytes(const char *, int);
-
- // Return the current content (excluding header) as a string
- std::string get_content() {
- return std::string(_buf+RPC_HEADER_SZ,_ind-RPC_HEADER_SZ);
- }
-
- // Return the current content (excluding header) as a string
- std::string str() {
- return get_content();
- }
-
- void pack(int i);
-
- void pack_req_header(const req_header &h) {
- int saved_sz = _ind;
- //leave the first 4-byte empty for channel to fill size of pdu
- _ind = sizeof(rpc_sz_t);
-#if RPC_CHECKSUMMING
- _ind += sizeof(rpc_checksum_t);
-#endif
- pack(h.xid);
- pack(h.proc);
- pack((int)h.clt_nonce);
- pack((int)h.srv_nonce);
- pack(h.xid_rep);
- _ind = saved_sz;
- }
-
- void pack_reply_header(const reply_header &h) {
- int saved_sz = _ind;
- //leave the first 4-byte empty for channel to fill size of pdu
- _ind = sizeof(rpc_sz_t);
-#if RPC_CHECKSUMMING
- _ind += sizeof(rpc_checksum_t);
-#endif
- pack(h.xid);
- pack(h.ret);
- _ind = saved_sz;
- }
-
- void take_buf(char **b, int *s) {
- *b = _buf;
- *s = _ind;
- _buf = NULL;
- _ind = 0;
- return;
- }
+ private:
+ char *buf_; // Base of the raw bytes buffer (dynamically readjusted)
+ size_t capacity_; // Capacity of the buffer
+ size_t index_; // Read/write head position
+
+ inline void reserve(size_t n) {
+ if((index_+n) > capacity_){
+ capacity_ += std::max(capacity_, n);
+ VERIFY (buf_ != NULL);
+ buf_ = (char *)realloc(buf_, capacity_);
+ VERIFY(buf_);
+ }
+ }
+ public:
+ struct pass { template <typename... Args> inline pass(Args&&...) {} };
+
+ template <typename... Args>
+
+ marshall(const Args&... args) {
+ buf_ = (char *) malloc(sizeof(char)*DEFAULT_RPC_SZ);
+ VERIFY(buf_);
+ capacity_ = DEFAULT_RPC_SZ;
+ index_ = RPC_HEADER_SZ;
+ (void)pass{(*this << args)...};
+ }
+
+ ~marshall() {
+ if (buf_)
+ free(buf_);
+ }
+
+ int size() { return index_;}
+ char *cstr() { return buf_;}
+ const char *cstr() const { return buf_;}
+
+ void rawbyte(unsigned char x) {
+ reserve(1);
+ buf_[index_++] = x;
+ }
+
+ void rawbytes(const char *p, int n) {
+ reserve(n);
+ memcpy(buf_+index_, p, n);
+ index_ += n;
+ }
+
+ // Return the current content (excluding header) as a string
+ std::string get_content() {
+ return std::string(buf_+RPC_HEADER_SZ,index_-RPC_HEADER_SZ);
+ }
+
+ // Return the current content (excluding header) as a string
+ std::string str() {
+ return get_content();
+ }
+
+ void pack_req_header(const request_header &h);
+ void pack_reply_header(const reply_header &h);
+
+ void take_buf(char **b, int *s) {
+ *b = buf_;
+ *s = index_;
+ buf_ = NULL;
+ index_ = 0;
+ return;
+ }
};
+
marshall& operator<<(marshall &, bool);
marshall& operator<<(marshall &, unsigned int);
marshall& operator<<(marshall &, int);
marshall& operator<<(marshall &, unsigned long long);
marshall& operator<<(marshall &, const std::string &);
+template <class A> marshall &
+operator<<(marshall &m, const A &x) {
+ m << (unsigned int) x.size();
+ for (const auto &a : x)
+ m << a;
+ return m;
+}
+
+template <class A, class B> marshall &
+operator<<(marshall &m, const std::pair<A,B> &d) {
+ m << d.first;
+ m << d.second;
+ return m;
+}
+
class unmarshall {
- private:
- char *_buf;
- int _sz;
- int _ind;
- bool _ok;
- public:
- unmarshall(): _buf(NULL),_sz(0),_ind(0),_ok(false) {}
- unmarshall(char *b, int sz): _buf(b),_sz(sz),_ind(),_ok(true) {}
- unmarshall(const std::string &s) : _buf(NULL),_sz(0),_ind(0),_ok(false)
- {
- //take the content which does not exclude a RPC header from a string
- take_content(s);
- }
- ~unmarshall() {
- if (_buf) free(_buf);
- }
-
- //take contents from another unmarshall object
- void take_in(unmarshall &another);
-
- //take the content which does not exclude a RPC header from a string
- void take_content(const std::string &s) {
- _sz = s.size()+RPC_HEADER_SZ;
- _buf = (char *)realloc(_buf,_sz);
- VERIFY(_buf);
- _ind = RPC_HEADER_SZ;
- memcpy(_buf+_ind, s.data(), s.size());
- _ok = true;
- }
-
- bool ok() { return _ok; }
- char *cstr() { return _buf;}
- bool okdone();
- unsigned int rawbyte();
- void rawbytes(std::string &s, unsigned int n);
-
- int ind() { return _ind;}
- int size() { return _sz;}
- void unpack(int *); //non-const ref
- void take_buf(char **b, int *sz) {
- *b = _buf;
- *sz = _sz;
- _sz = _ind = 0;
- _buf = NULL;
- }
-
- void unpack_req_header(req_header *h) {
- //the first 4-byte is for channel to fill size of pdu
- _ind = sizeof(rpc_sz_t);
-#if RPC_CHECKSUMMING
- _ind += sizeof(rpc_checksum_t);
-#endif
- unpack(&h->xid);
- unpack(&h->proc);
- unpack((int *)&h->clt_nonce);
- unpack((int *)&h->srv_nonce);
- unpack(&h->xid_rep);
- _ind = RPC_HEADER_SZ;
- }
-
- void unpack_reply_header(reply_header *h) {
- //the first 4-byte is for channel to fill size of pdu
- _ind = sizeof(rpc_sz_t);
-#if RPC_CHECKSUMMING
- _ind += sizeof(rpc_checksum_t);
-#endif
- unpack(&h->xid);
- unpack(&h->ret);
- _ind = RPC_HEADER_SZ;
- }
+ private:
+ char *buf_;
+ int sz_;
+ int index_;
+ bool ok_;
+
+ inline bool ensure(size_t n);
+ public:
+ unmarshall(): buf_(NULL),sz_(0),index_(0),ok_(false) {}
+ unmarshall(char *b, int sz): buf_(b),sz_(sz),index_(),ok_(true) {}
+ unmarshall(const std::string &s) : buf_(NULL),sz_(0),index_(0),ok_(false)
+ {
+ //take the content which does not exclude a RPC header from a string
+ take_content(s);
+ }
+ ~unmarshall() {
+ if (buf_) free(buf_);
+ }
+
+ //take contents from another unmarshall object
+ void take_in(unmarshall &another);
+
+ //take the content which does not exclude a RPC header from a string
+ void take_content(const std::string &s) {
+ sz_ = s.size()+RPC_HEADER_SZ;
+ buf_ = (char *)realloc(buf_,sz_);
+ VERIFY(buf_);
+ index_ = RPC_HEADER_SZ;
+ memcpy(buf_+index_, s.data(), s.size());
+ ok_ = true;
+ }
+
+ bool ok() const { return ok_; }
+ char *cstr() { return buf_;}
+ bool okdone() const { return ok_ && index_ == sz_; }
+
+ unsigned int rawbyte();
+ void rawbytes(std::string &s, size_t n);
+
+ int ind() { return index_;}
+ int size() { return sz_;}
+ void unpack(int *); //non-const ref
+ void take_buf(char **b, int *sz) {
+ *b = buf_;
+ *sz = sz_;
+ sz_ = index_ = 0;
+ buf_ = NULL;
+ }
+
+ void unpack_req_header(request_header *h) {
+ //the first 4-byte is for channel to fill size of pdu
+ index_ = sizeof(rpc_sz_t);
+ unpack(&h->xid);
+ unpack(&h->proc);
+ unpack((int *)&h->clt_nonce);
+ unpack((int *)&h->srv_nonce);
+ unpack(&h->xid_rep);
+ index_ = RPC_HEADER_SZ;
+ }
+
+ void unpack_reply_header(reply_header *h) {
+ //the first 4-byte is for channel to fill size of pdu
+ index_ = sizeof(rpc_sz_t);
+ unpack(&h->xid);
+ unpack(&h->ret);
+ index_ = RPC_HEADER_SZ;
+ }
+
+ template <class A>
+ inline A grab() {
+ A a;
+ *this >> a;
+ return a;
+ }
};
-unmarshall& operator>>(unmarshall &, bool &);
-unmarshall& operator>>(unmarshall &, unsigned char &);
-unmarshall& operator>>(unmarshall &, char &);
-unmarshall& operator>>(unmarshall &, unsigned short &);
-unmarshall& operator>>(unmarshall &, short &);
-unmarshall& operator>>(unmarshall &, unsigned int &);
-unmarshall& operator>>(unmarshall &, int &);
-unmarshall& operator>>(unmarshall &, unsigned long long &);
-unmarshall& operator>>(unmarshall &, std::string &);
-
-template <class C> marshall &
-operator<<(marshall &m, std::vector<C> v)
-{
- m << (unsigned int) v.size();
- for(unsigned i = 0; i < v.size(); i++)
- m << v[i];
- return m;
+template <class A> unmarshall & operator>>(unmarshall &u, A &x) {
+ unsigned n = u.grab<unsigned>();
+ x.clear();
+ while (n--)
+ x.emplace_back(u.grab<typename A::value_type>());
+ return u;
}
-template <class C> unmarshall &
-operator>>(unmarshall &u, std::vector<C> &v)
-{
- unsigned n;
- u >> n;
- for(unsigned i = 0; i < n; i++){
- C z;
- u >> z;
- v.push_back(z);
- }
- return u;
+template <class A, class B> unmarshall &
+operator>>(unmarshall &u, std::map<A,B> &x) {
+ unsigned n = u.grab<unsigned>();
+ x.clear();
+ while (n--)
+ x.emplace(u.grab<std::pair<A,B>>());
+ return u;
}
-template <class A, class B> marshall &
-operator<<(marshall &m, const std::map<A,B> &d) {
- typename std::map<A,B>::const_iterator i;
+template <class A, class B> unmarshall &
+operator>>(unmarshall &u, std::pair<A,B> &d) {
+ return u >> d.first >> d.second;
+}
- m << (unsigned int) d.size();
+typedef std::function<int(unmarshall &, marshall &)> handler;
+
+//
+// Automatic marshalling wrappers for RPC handlers
+//
+
+// PAI 2013/09/19
+// C++11 does neither of these two things for us:
+// 1) Declare variables using a parameter pack expansion, like so
+// Args ...args;
+// 2) Call a function with a std::tuple of the arguments it expects
+//
+// We implement an 'invoke' function for functions of the RPC handler
+// signature, i.e. int(R & r, const Args...)
+//
+// One thing we need in order to accomplish this is a way to cause the compiler
+// to specialize 'invoke' with a parameter pack containing a list of indices
+// for the elements of the tuple. This will allow us to call the underlying
+// function with the exploded contents of the tuple. The empty type
+// tuple_indices<size_t...> accomplishes this. It will be passed in to
+// 'invoke' as a parameter which will be ignored, but its type will force the
+// compiler to specialize 'invoke' appropriately.
+
+// The following implementation of tuple_indices is redistributed under the MIT
+// License as an insubstantial portion of the LLVM compiler infrastructure.
+
+template <size_t...> struct tuple_indices {};
+template <size_t S, class IntTuple, size_t E> struct make_indices_imp;
+template <size_t S, size_t ...Indices, size_t E> struct make_indices_imp<S, tuple_indices<Indices...>, E> {
+ typedef typename make_indices_imp<S+1, tuple_indices<Indices..., S>, E>::type type;
+};
+template <size_t E, size_t ...Indices> struct make_indices_imp<E, tuple_indices<Indices...>, E> {
+ typedef tuple_indices<Indices...> type;
+};
+template <size_t E, size_t S=0> struct make_tuple_indices {
+ typedef typename make_indices_imp<S, tuple_indices<>, E>::type type;
+};
- for (i = d.begin(); i != d.end(); i++) {
- m << i->first << i->second;
- }
- return m;
+// This class encapsulates the default response to runtime unmarshalling
+// failures. The templated wrappers below may optionally use a different
+// class.
+
+struct VerifyOnFailure {
+ static inline int unmarshall_args_failure() {
+ VERIFY(0);
+ return 0;
+ }
+};
+
+// Here's the implementation of 'invoke'. It could be more general, but this
+// meets our needs.
+
+// One for function pointers...
+
+template <class F, class R, class args_type, size_t ...Indices>
+typename std::enable_if<!std::is_member_function_pointer<F>::value, int>::type
+invoke(F f, void *, R & r, args_type & t, tuple_indices<Indices...>) {
+ return f(r, std::move(std::get<Indices>(t))...);
}
-template <class A, class B> unmarshall &
-operator>>(unmarshall &u, std::map<A,B> &d) {
- unsigned int n;
- u >> n;
-
- d.clear();
-
- for (unsigned int lcv = 0; lcv < n; lcv++) {
- A a;
- B b;
- u >> a >> b;
- d[a] = b;
- }
- return u;
+// And one for pointers to member functions...
+
+template <class F, class C, class R, class args_type, size_t ...Indices>
+typename std::enable_if<std::is_member_function_pointer<F>::value, int>::type
+invoke(F f, C *c, R & r, args_type & t, tuple_indices<Indices...>) {
+ return (c->*f)(r, std::move(std::get<Indices>(t))...);
}
+// The class marshalled_func_imp uses partial template specialization to
+// implement the ::wrap static function. ::wrap takes a function pointer or a
+// pointer to a member function and returns a handler * object which
+// unmarshalls arguments, verifies successful unmarshalling, calls the supplied
+// function, and marshalls the response.
+
+template <class Functor, class Instance, class Signature,
+ class ErrorHandler=VerifyOnFailure> struct marshalled_func_imp;
+
+// Here we specialize on the Signature template parameter to obtain the list of
+// argument types. Note that we do not assume that the Functor parameter has
+// the same pattern as Signature; this allows us to ignore the distinctions
+// between various types of callable objects at this level of abstraction.
+
+template <class F, class C, class ErrorHandler, class R, class... Args>
+struct marshalled_func_imp<F, C, int(R&, Args...), ErrorHandler> {
+ static inline handler *wrap(F f, C *c=nullptr) {
+ // This type definition corresponds to an empty struct with
+ // template parameters running from 0 up to (# args) - 1.
+ using Indices = typename make_tuple_indices<sizeof...(Args)>::type;
+ // This type definition represents storage for f's unmarshalled
+ // arguments. std::decay is (most notably) stripping off const
+ // qualifiers.
+ using ArgsStorage = std::tuple<typename std::decay<Args>::type...>;
+ // Allocate a handler (i.e. std::function) to hold the lambda
+ // which will unmarshall RPCs and call f.
+ return new handler([=](unmarshall &u, marshall &m) -> int {
+ // Unmarshall each argument with the correct type and store the
+ // result in a tuple.
+ ArgsStorage t = {u.grab<typename std::decay<Args>::type>()...};
+ // Verify successful unmarshalling of the entire input stream.
+ if (!u.okdone())
+ return ErrorHandler::unmarshall_args_failure();
+ // Allocate space for the RPC response -- will be passed into the
+ // function as an lvalue reference.
+ R r;
+ // Perform the invocation. Note that Indices() calls the default
+ // constructor of the empty struct with the special template
+ // parameters.
+ int b = invoke(f, c, r, t, Indices());
+ // Marshall the response.
+ m << r;
+ // Make like a tree.
+ return b;
+ });
+ }
+};
+
+// More partial template specialization shenanigans to reduce the number of
+// parameters which must be provided explicitly and to support a few common
+// callable types. C++11 doesn't allow partial function template
+// specialization, so we use classes (structs).
+
+template <class Functor, class ErrorHandler=VerifyOnFailure,
+ class Signature=Functor> struct marshalled_func;
+
+template <class F, class ErrorHandler, class... Args>
+struct marshalled_func<F, ErrorHandler, int(*)(Args...)> :
+ public marshalled_func_imp<F, void, int(Args...), ErrorHandler> {};
+
+template <class F, class ErrorHandler, class C, class... Args>
+struct marshalled_func<F, ErrorHandler, int(C::*)(Args...)> :
+ public marshalled_func_imp<F, C, int(Args...), ErrorHandler> {};
+
+template <class F, class ErrorHandler, class Signature>
+struct marshalled_func<F, ErrorHandler, std::function<Signature>> :
+ public marshalled_func_imp<F, void, Signature, ErrorHandler> {};
+
#endif