Cosmetic improvements.
[invirt/third/libt4.git] / rpc / rpc.h
index 723121c..211c717 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
 #ifndef rpc_h
 #define rpc_h
 
+#include "types.h"
 #include <sys/socket.h>
 #include <netinet/in.h>
-#include <list>
-#include <map>
-#include <stdio.h>
 
+#include "rpc_protocol.h"
 #include "thr_pool.h"
 #include "marshall.h"
+#include "marshall_wrap.h"
 #include "connection.h"
 
-#ifdef DMALLOC
-#include "dmalloc.h"
-#endif
+namespace rpc {
+    static constexpr milliseconds to_max{12000};
+    static constexpr milliseconds to_min{100};
+}
 
-class rpc_const {
-       public:
-               static const unsigned int bind = 1;   // handler number reserved for bind
-               static const int timeout_failure = -1;
-               static const int unmarshal_args_failure = -2;
-               static const int unmarshal_reply_failure = -3;
-               static const int atmostonce_failure = -4;
-               static const int oldsrv_failure = -5;
-               static const int bind_failure = -6;
-               static const int cancel_failure = -7;
-};
+template<class P, class R, class ...Args> struct is_valid_call : false_type {};
+
+template<class S, class R, class ...Args>
+struct is_valid_call<S(R &, Args...), R, Args...> : true_type {};
+
+template<class P, class F> struct is_valid_registration : false_type {};
+
+template<class S, class R, class ...Args>
+struct is_valid_registration<S(R &, typename decay<Args>::type...), S(R &, Args...)> : true_type {};
+
+template<class P, class C, class S, class R, class ...Args>
+struct is_valid_registration<P, S(C::*)(R &, Args...)> : is_valid_registration<P, S(R &, Args...)> {};
 
 // rpc client endpoint.
 // manages a xid space per destination socket
 // threaded: multiple threads can be sending RPCs,
-class rpcc : public chanmgr {
-
-       private:
-
-               //manages per rpc info
-               struct caller {
-                       caller(unsigned int xxid, unmarshall *un);
-                       ~caller();
-
-                       unsigned int xid;
-                       unmarshall *un;
-                       int intret;
-                       bool done;
-            std::mutex m;
-            std::condition_variable c;
-               };
-
-               void get_refconn(connection **ch);
-               void update_xid_rep(unsigned int xid);
-
-
-               sockaddr_in dst_;
-               unsigned int clt_nonce_;
-               unsigned int srv_nonce_;
-               bool bind_done_;
-               unsigned int xid_;
-               int lossytest_;
-               bool retrans_;
-               bool reachable_;
-
-               connection *chan_;
-
-        std::mutex m_; // protect insert/delete to calls[]
-               std::mutex chan_m_;
-
-               bool destroy_wait_;
-        std::condition_variable destroy_wait_c_;
-
-               std::map<int, caller *> calls_;
-               std::list<unsigned int> xid_rep_window_;
-                
-                struct request {
-                    request() { clear(); }
-                    void clear() { buf.clear(); xid = -1; }
-                    bool isvalid() { return xid != -1; }
-                    std::string buf;
-                    int xid;
-                };
-                struct request dup_req_;
-                int xid_rep_done_;
-       public:
-
-               rpcc(sockaddr_in d, bool retrans=true);
-               ~rpcc();
-
-               struct TO {
-                       int to;
-               };
-               static const TO to_max;
-               static const TO to_min;
-               static TO to(int x) { TO t; t.to = x; return t;}
-
-               unsigned int id() { return clt_nonce_; }
-
-               int bind(TO to = to_max);
-
-               void set_reachable(bool r) { reachable_ = r; }
-
-               void cancel();
-                
-                int islossy() { return lossytest_ > 0; }
-
-               int call1(unsigned int proc, 
-                               marshall &req, unmarshall &rep, TO to);
-
-               bool got_pdu(connection *c, char *b, int sz);
-
-
-               template<class R>
-                       int call_m(unsigned int proc, marshall &req, R & r, TO to);
-
-               template<class R>
-                       int call(unsigned int proc, R & r, TO to = to_max); 
-               template<class R, class A1>
-                       int call(unsigned int proc, const A1 & a1, R & r, TO to = to_max); 
-               template<class R, class A1, class A2>
-                       int call(unsigned int proc, const A1 & a1, const A2 & a2, R & r, 
-                                       TO to = to_max); 
-               template<class R, class A1, class A2, class A3>
-                       int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, 
-                                       R & r, TO to = to_max); 
-               template<class R, class A1, class A2, class A3, class A4>
-                       int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, 
-                                       const A4 & a4, R & r, TO to = to_max);
-               template<class R, class A1, class A2, class A3, class A4, class A5>
-                       int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, 
-                                       const A4 & a4, const A5 & a5, R & r, TO to = to_max); 
-               template<class R, class A1, class A2, class A3, class A4, class A5,
-                       class A6>
-                               int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, 
-                                               const A4 & a4, const A5 & a5, const A6 & a6,
-                                               R & r, TO to = to_max); 
-               template<class R, class A1, class A2, class A3, class A4, class A5, 
-                       class A6, class A7>
-                               int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, 
-                                               const A4 & a4, const A5 & a5, const A6 &a6, const A7 &a7,
-                                               R & r, TO to = to_max); 
-
-};
-
-template<class R> int 
-rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to) 
-{
-       unmarshall u;
-       int intret = call1(proc, req, u, to);
-       if (intret < 0) return intret;
-       u >> r;
-       if(u.okdone() != true) {
-                fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply."
-                       "You are probably calling RPC 0x%x with wrong return "
-                       "type.\n", proc);
+class rpcc : private connection_delegate {
+    private:
+        using proc_id_t = rpc_protocol::proc_id_t;
+        template <class S>
+        using proc_t = rpc_protocol::proc_t<S>;
+        using nonce_t = rpc_protocol::nonce_t;
+        using xid_t = rpc_protocol::xid_t;
+
+        // manages per rpc info
+        struct caller {
+            caller(xid_t _xid, string *_rep) : xid(_xid), rep(_rep) {}
+
+            int xid;
+            string *rep;
+            int intret;
+            bool done = false;
+            mutex m;
+            cond c;
+        };
+
+        void get_latest_connection(shared_ptr<connection> & ch);
+        void update_xid_rep(xid_t xid, lock & m_lock);
+
+
+        sockaddr_in dst_;
+        nonce_t clt_nonce_;
+        nonce_t srv_nonce_ = 0;
+        bool bind_done_ = false;
+        int lossytest_ = 0;
+        bool reachable_ = true;
+
+        shared_ptr<connection> chan_;
+
+        mutex m_; // protect insert/delete to calls[]
+        mutex chan_m_;
+
+        bool destroy_wait_ = false;
+        cond destroy_wait_c_;
+
+        map<int, caller *> calls_;
+
+        // xid starts with 1 and latest received reply starts with 0
+        xid_t xid_ = 1;
+        list<xid_t> xid_rep_window_ = {0};
+
+        struct request {
+            void clear() { buf.clear(); xid = -1; }
+            bool isvalid() { return xid != -1; }
+            string buf;
+            xid_t xid = -1;
+        };
+        request dup_req_;
+        int xid_rep_done_ = -1;
+
+        int call1(proc_id_t proc, milliseconds to, string & rep, marshall & req);
+
+        template<class R>
+        inline int call_m(proc_id_t proc, milliseconds to, R & r, marshall && req) {
+            string rep;
+            int intret = call1(proc, to, rep, req);
+            if (intret < 0) return intret;
+            unmarshall u(rep, true, r);
+            if (u.okdone() != true) {
+                LOG("rpcc::call_m: failed to unmarshall the reply.  You are probably " <<
+                    "calling RPC 0x" << hex << proc << " with the wrong return type.");
                 VERIFY(0);
-               return rpc_const::unmarshal_reply_failure;
+                return rpc_protocol::unmarshall_reply_failure;
+            }
+            return intret;
         }
-       return intret;
-}
 
-template<class R> int
-rpcc::call(unsigned int proc, R & r, TO to) 
-{
-       marshall m;
-       return call_m(proc, m, r, to);
-}
+        bool got_pdu(const shared_ptr<connection> & c, const string & b);
 
-template<class R, class A1> int
-rpcc::call(unsigned int proc, const A1 & a1, R & r, TO to) 
-{
-       marshall m;
-       m << a1;
-       return call_m(proc, m, r, to);
-}
+    public:
 
-template<class R, class A1, class A2> int
-rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
-               R & r, TO to) 
-{
-       marshall m;
-       m << a1;
-       m << a2;
-       return call_m(proc, m, r, to);
-}
-
-template<class R, class A1, class A2, class A3> int
-rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
-               const A3 & a3, R & r, TO to) 
-{
-       marshall m;
-       m << a1;
-       m << a2;
-       m << a3;
-       return call_m(proc, m, r, to);
-}
+        rpcc(const string & d);
+        ~rpcc();
 
-template<class R, class A1, class A2, class A3, class A4> int
-rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
-               const A3 & a3, const A4 & a4, R & r, TO to) 
-{
-       marshall m;
-       m << a1;
-       m << a2;
-       m << a3;
-       m << a4;
-       return call_m(proc, m, r, to);
-}
+        nonce_t id() { return clt_nonce_; }
 
-template<class R, class A1, class A2, class A3, class A4, class A5> int
-rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
-               const A3 & a3, const A4 & a4, const A5 & a5, R & r, TO to) 
-{
-       marshall m;
-       m << a1;
-       m << a2;
-       m << a3;
-       m << a4;
-       m << a5;
-       return call_m(proc, m, r, to);
-}
+        int bind(milliseconds to = rpc::to_max);
 
-template<class R, class A1, class A2, class A3, class A4, class A5,
-       class A6> int
-rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
-               const A3 & a3, const A4 & a4, const A5 & a5, 
-               const A6 & a6, R & r, TO to) 
-{
-       marshall m;
-       m << a1;
-       m << a2;
-       m << a3;
-       m << a4;
-       m << a5;
-       m << a6;
-       return call_m(proc, m, r, to);
-}
+        void set_reachable(bool r) { reachable_ = r; }
 
-template<class R, class A1, class A2, class A3, class A4, class A5,
-       class A6, class A7> int
-rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
-               const A3 & a3, const A4 & a4, const A5 & a5, 
-               const A6 & a6, const A7 & a7,
-               R & r, TO to) 
-{
-       marshall m;
-       m << a1;
-       m << a2;
-       m << a3;
-       m << a4;
-       m << a5;
-       m << a6;
-       m << a7;
-       return call_m(proc, m, r, to);
-}
+        void cancel();
 
-bool operator<(const sockaddr_in &a, const sockaddr_in &b);
+        template<class P, class R, typename ...Args>
+        inline int call(proc_t<P> proc, R & r, const Args & ... args) {
+            return call_timeout(proc, rpc::to_max, r, args...);
+        }
 
-class handler {
-       public:
-               handler() { }
-               virtual ~handler() { }
-               virtual int fn(unmarshall &, marshall &) = 0;
+        template<class P, class R, typename ...Args>
+        inline int call_timeout(proc_t<P> proc, milliseconds to, R & r, const Args & ... args) {
+            static_assert(is_valid_call<P, R, Args...>::value, "RPC called with incorrect argument types");
+            return call_m(proc.id, to, r, forward<marshall>(marshall(args...)));
+        }
 };
 
-
 // rpc server endpoint.
-class rpcs : public chanmgr {
-
-       typedef enum {
-               NEW,  // new RPC, not a duplicate
-               INPROGRESS, // duplicate of an RPC we're still processing
-               DONE, // duplicate of an RPC we already replied to (have reply)
-               FORGOTTEN,  // duplicate of an old RPC whose reply we've forgotten
-       } rpcstate_t;
-
-       private:
+class rpcs : private connection_delegate {
+    private:
+        using proc_id_t = rpc_protocol::proc_id_t;
+        template <class S>
+        using proc_t = rpc_protocol::proc_t<S>;
+        using nonce_t = rpc_protocol::nonce_t;
+        using xid_t = rpc_protocol::xid_t;
+
+        typedef enum {
+            NEW,  // new RPC, not a duplicate
+            INPROGRESS, // duplicate of an RPC we're still processing
+            DONE, // duplicate of an RPC we already replied to (have reply)
+            FORGOTTEN,  // duplicate of an old RPC whose reply we've forgotten
+        } rpcstate_t;
 
         // state about an in-progress or completed RPC, for at-most-once.
         // if cb_present is true, then the RPC is complete and a reply
         // has been sent; in that case buf points to a copy of the reply,
         // and sz holds the size of the reply.
-       struct reply_t {
-               reply_t (unsigned int _xid) {
-                       xid = _xid;
-                       cb_present = false;
-                       buf = NULL;
-                       sz = 0;
-               }
-               reply_t (unsigned int _xid, char *_buf, int _sz) {
-                       xid = _xid;
-                       cb_present = true;
-                       buf = _buf;
-                       sz = _sz;
-               }
-               unsigned int xid;
-               bool cb_present; // whether the reply buffer is valid
-               char *buf;      // the reply buffer
-               int sz;         // the size of reply buffer
-       };
-
-       int port_;
-       unsigned int nonce_;
-
-       // provide at most once semantics by maintaining a window of replies
-       // per client that that client hasn't acknowledged receiving yet.
+        struct reply_t {
+            reply_t (xid_t _xid) : xid(_xid), cb_present(false) {}
+            reply_t (xid_t _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {}
+            xid_t xid;
+            bool cb_present; // whether the reply buffer is valid
+            string buf;      // the reply buffer
+        };
+
+        in_port_t port_;
+        nonce_t nonce_;
+
+        // provide at most once semantics by maintaining a window of replies
+        // per client that that client hasn't acknowledged receiving yet.
         // indexed by client nonce.
-       std::map<unsigned int, std::list<reply_t> > reply_window_;
-
-       void free_reply_window(void);
-       void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz);
-
-       rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
-                       unsigned int xid, unsigned int rep_xid,
-                       char **b, int *sz);
-
-       void updatestat(unsigned int proc);
-
-       // latest connection to the client
-       std::map<unsigned int, connection *> conns_;
-
-       // counting
-       const int counting_;
-       int curr_counts_;
-       std::map<int, int> counts_;
-
-       int lossytest_; 
-       bool reachable_;
-
-       // map proc # to function
-       std::map<int, handler *> procs_;
-
-       std::mutex procs_m_; // protect insert/delete to procs[]
-       std::mutex count_m_;  //protect modification of counts
-       std::mutex reply_window_m_; // protect reply window et al
-       std::mutex conss_m_; // protect conns_
-
-
-       protected:
-
-       struct djob_t {
-               djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {}
-               char *buf;
-               int sz;
-               connection *conn;
-       };
-       void dispatch(djob_t *);
-
-       // internal handler registration
-       void reg1(unsigned int proc, handler *);
-
-       ThrPool* dispatchpool_;
-       tcpsconn* listener_;
-
-       public:
-       rpcs(unsigned int port, int counts=0);
-       ~rpcs();
-        inline int port() { return listener_->port(); }
-       //RPC handler for clients binding
-       int rpcbind(int a, int &r);
-
-       void set_reachable(bool r) { reachable_ = r; }
-
-       bool got_pdu(connection *c, char *b, int sz);
-
-       // register a handler
-       template<class S, class A1, class R>
-               void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, R & r));
-       template<class S, class A1, class A2, class R>
-               void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, const A2, 
-                                       R & r));
-       template<class S, class A1, class A2, class A3, class R>
-               void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, 
-                                       const A3, R & r));
-       template<class S, class A1, class A2, class A3, class A4, class R>
-               void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, 
-                                       const A3, const A4, R & r));
-       template<class S, class A1, class A2, class A3, class A4, class A5, class R>
-               void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, 
-                                       const A3, const A4, const A5, 
-                                       R & r));
-       template<class S, class A1, class A2, class A3, class A4, class A5, class A6,
-               class R>
-                       void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, 
-                                               const A3, const A4, const A5, 
-                                               const A6, R & r));
-       template<class S, class A1, class A2, class A3, class A4, class A5, class A6,
-               class A7, class R>
-                       void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, 
-                                               const A3, const A4, const A5, 
-                                               const A6, const A7,
-                                               R & r));
-};
+        map<nonce_t, list<reply_t>> reply_window_;
 
-template<class S, class A1, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, R & r))
-{
-       class h1 : public handler {
-               private:
-                       S * sob;
-                       int (S::*meth)(const A1 a1, R & r);
-               public:
-                       h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r))
-                               : sob(xsob), meth(xmeth) { }
-                       int fn(unmarshall &args, marshall &ret) {
-                               A1 a1;
-                               R r;
-                               args >> a1;
-                               if(!args.okdone())
-                                       return rpc_const::unmarshal_args_failure;
-                               int b = (sob->*meth)(a1, r);
-                               ret << r;
-                               return b;
-                       }
-       };
-       reg1(proc, new h1(sob, meth));
-}
+        void add_reply(nonce_t clt_nonce, xid_t xid, const string & b);
 
-template<class S, class A1, class A2, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
-                       R & r))
-{
-       class h1 : public handler {
-               private:
-                       S * sob;
-                       int (S::*meth)(const A1 a1, const A2 a2, R & r);
-               public:
-                       h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r))
-                               : sob(xsob), meth(xmeth) { }
-                       int fn(unmarshall &args, marshall &ret) {
-                               A1 a1;
-                               A2 a2;
-                               R r;
-                               args >> a1;
-                               args >> a2;
-                               if(!args.okdone())
-                                       return rpc_const::unmarshal_args_failure;
-                               int b = (sob->*meth)(a1, a2, r);
-                               ret << r;
-                               return b;
-                       }
-       };
-       reg1(proc, new h1(sob, meth));
-}
+        rpcstate_t check_duplicate_and_update(nonce_t clt_nonce, xid_t xid,
+                xid_t rep_xid, string & b);
 
-template<class S, class A1, class A2, class A3, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
-                       const A3 a3, R & r))
-{
-       class h1 : public handler {
-               private:
-                       S * sob;
-                       int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r);
-               public:
-                       h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r))
-                               : sob(xsob), meth(xmeth) { }
-                       int fn(unmarshall &args, marshall &ret) {
-                               A1 a1;
-                               A2 a2;
-                               A3 a3;
-                               R r;
-                               args >> a1;
-                               args >> a2;
-                               args >> a3;
-                               if(!args.okdone())
-                                       return rpc_const::unmarshal_args_failure;
-                               int b = (sob->*meth)(a1, a2, a3, r);
-                               ret << r;
-                               return b;
-                       }
-       };
-       reg1(proc, new h1(sob, meth));
-}
+        // latest connection to the client
+        map<nonce_t, shared_ptr<connection>> conns_;
 
-template<class S, class A1, class A2, class A3, class A4, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
-                       const A3 a3, const A4 a4, 
-                       R & r))
-{
-       class h1 : public handler {
-               private:
-                       S * sob;
-                       int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r);
-               public:
-                       h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, 
-                                               const A4 a4, R & r))
-                               : sob(xsob), meth(xmeth)  { }
-                       int fn(unmarshall &args, marshall &ret) {
-                               A1 a1;
-                               A2 a2;
-                               A3 a3;
-                               A4 a4;
-                               R r;
-                               args >> a1;
-                               args >> a2;
-                               args >> a3;
-                               args >> a4;
-                               if(!args.okdone())
-                                       return rpc_const::unmarshal_args_failure;
-                               int b = (sob->*meth)(a1, a2, a3, a4, r);
-                               ret << r;
-                               return b;
-                       }
-       };
-       reg1(proc, new h1(sob, meth));
-}
+        bool reachable_ = true;
 
-template<class S, class A1, class A2, class A3, class A4, class A5, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
-                       const A3 a3, const A4 a4, 
-                       const A5 a5, R & r))
-{
-       class h1 : public handler {
-               private:
-                       S * sob;
-                       int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, 
-                                       const A5 a5, R & r);
-               public:
-                       h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, 
-                                               const A4 a4, const A5 a5, R & r))
-                               : sob(xsob), meth(xmeth) { }
-                       int fn(unmarshall &args, marshall &ret) {
-                               A1 a1;
-                               A2 a2;
-                               A3 a3;
-                               A4 a4;
-                               A5 a5;
-                               R r;
-                               args >> a1;
-                               args >> a2;
-                               args >> a3;
-                               args >> a4;
-                               args >> a5;
-                               if(!args.okdone())
-                                       return rpc_const::unmarshal_args_failure;
-                               int b = (sob->*meth)(a1, a2, a3, a4, a5, r);
-                               ret << r;
-                               return b;
-                       }
-       };
-       reg1(proc, new h1(sob, meth));
-}
+        // map proc # to function
+        map<proc_id_t, handler *> procs_;
 
-template<class S, class A1, class A2, class A3, class A4, class A5, class A6, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
-                       const A3 a3, const A4 a4, 
-                       const A5 a5, const A6 a6, 
-                       R & r))
-{
-       class h1 : public handler {
-               private:
-                       S * sob;
-                       int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, 
-                                       const A5 a5, const A6 a6, R & r);
-               public:
-                       h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, 
-                                               const A4 a4, const A5 a5, const A6 a6, R & r))
-                               : sob(xsob), meth(xmeth) { }
-                       int fn(unmarshall &args, marshall &ret) {
-                               A1 a1;
-                               A2 a2;
-                               A3 a3;
-                               A4 a4;
-                               A5 a5;
-                               A6 a6;
-                               R r;
-                               args >> a1;
-                               args >> a2;
-                               args >> a3;
-                               args >> a4;
-                               args >> a5;
-                               args >> a6;
-                               if(!args.okdone())
-                                       return rpc_const::unmarshal_args_failure;
-                               int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, r);
-                               ret << r;
-                               return b;
-                       }
-       };
-       reg1(proc, new h1(sob, meth));
-}
+        mutex procs_m_; // protect insert/delete to procs[]
+        mutex reply_window_m_; // protect reply window et al
+        mutex conns_m_; // protect conns_
 
-template<class S, class A1, class A2, class A3, class A4, class A5, 
-       class A6, class A7, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
-                       const A3 a3, const A4 a4, 
-                       const A5 a5, const A6 a6,
-                       const A7 a7, R & r))
-{
-       class h1 : public handler {
-               private:
-                       S * sob;
-                       int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, 
-                                       const A5 a5, const A6 a6, const A7 a7, R & r);
-               public:
-                       h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, 
-                                               const A4 a4, const A5 a5, const A6 a6,
-                                               const A7 a7, R & r))
-                               : sob(xsob), meth(xmeth) { }
-                       int fn(unmarshall &args, marshall &ret) {
-                               A1 a1;
-                               A2 a2;
-                               A3 a3;
-                               A4 a4;
-                               A5 a5;
-                               A6 a6;
-                               A7 a7;
-                               R r;
-                               args >> a1;
-                               args >> a2;
-                               args >> a3;
-                               args >> a4;
-                               args >> a5;
-                               args >> a6;
-                               args >> a7;
-                               if(!args.okdone())
-                                       return rpc_const::unmarshal_args_failure;
-                               int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, a7, r);
-                               ret << r;
-                               return b;
-                       }
-       };
-       reg1(proc, new h1(sob, meth));
-}
+        void dispatch(shared_ptr<connection> c, const string & buf);
+
+        unique_ptr<thread_pool> dispatchpool_{new thread_pool(6, false)};
+        unique_ptr<connection_listener> listener_;
+
+        // RPC handler for clients binding
+        rpc_protocol::status rpcbind(nonce_t & r);
+
+        bool got_pdu(const shared_ptr<connection> & c, const string & b);
 
+    public:
 
-void make_sockaddr(const char *hostandport, struct sockaddr_in *dst);
-void make_sockaddr(const char *host, const char *port,
-               struct sockaddr_in *dst);
+        rpcs(in_port_t port);
+        ~rpcs();
+
+        void set_reachable(bool r) { reachable_ = r; }
+
+        template<class P, class F, class C=void> inline void reg(proc_t<P> proc, F f, C *c=nullptr) {
+            static_assert(is_valid_registration<P, F>::value, "RPC handler registered with incorrect argument types");
+            struct ReturnOnFailure {
+                static inline int unmarshall_args_failure() {
+                    return rpc_protocol::unmarshall_args_failure;
+                }
+            };
+            lock pl(procs_m_);
+            VERIFY(procs_.count(proc.id) == 0);
+            procs_[proc.id] = marshalled_func<F, ReturnOnFailure>::wrap(f, c);
+            VERIFY(procs_.count(proc.id) >= 1);
+        }
+
+        void start();
+};
 
 #endif