-
-// rpc server endpoint.
-class rpcs : public chanmgr {
-
- typedef enum {
- NEW, // new RPC, not a duplicate
- INPROGRESS, // duplicate of an RPC we're still processing
- DONE, // duplicate of an RPC we already replied to (have reply)
- FORGOTTEN, // duplicate of an old RPC whose reply we've forgotten
- } rpcstate_t;
-
- private:
-
- // state about an in-progress or completed RPC, for at-most-once.
- // if cb_present is true, then the RPC is complete and a reply
- // has been sent; in that case buf points to a copy of the reply,
- // and sz holds the size of the reply.
- struct reply_t {
- reply_t (unsigned int _xid) {
- xid = _xid;
- cb_present = false;
- buf = NULL;
- sz = 0;
- }
- reply_t (unsigned int _xid, char *_buf, int _sz) {
- xid = _xid;
- cb_present = true;
- buf = _buf;
- sz = _sz;
- }
- unsigned int xid;
- bool cb_present; // whether the reply buffer is valid
- char *buf; // the reply buffer
- int sz; // the size of reply buffer
- };
-
- int port_;
- unsigned int nonce_;
-
- // provide at most once semantics by maintaining a window of replies
- // per client that that client hasn't acknowledged receiving yet.
- // indexed by client nonce.
- std::map<unsigned int, std::list<reply_t> > reply_window_;
-
- void free_reply_window(void);
- void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz);
-
- rpcstate_t checkduplicate_and_update(unsigned int clt_nonce,
- unsigned int xid, unsigned int rep_xid,
- char **b, int *sz);
-
- void updatestat(unsigned int proc);
-
- // latest connection to the client
- std::map<unsigned int, connection *> conns_;
-
- // counting
- const int counting_;
- int curr_counts_;
- std::map<int, int> counts_;
-
- int lossytest_;
- bool reachable_;
-
- // map proc # to function
- std::map<int, handler *> procs_;
-
- std::mutex procs_m_; // protect insert/delete to procs[]
- std::mutex count_m_; //protect modification of counts
- std::mutex reply_window_m_; // protect reply window et al
- std::mutex conss_m_; // protect conns_
-
-
- protected:
-
- struct djob_t {
- djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {}
- char *buf;
- int sz;
- connection *conn;
- };
- void dispatch(djob_t *);
-
- // internal handler registration
- void reg1(unsigned int proc, handler *);
-
- ThrPool* dispatchpool_;
- tcpsconn* listener_;
-
- public:
- rpcs(unsigned int port, int counts=0);
- ~rpcs();
- inline int port() { return listener_->port(); }
- //RPC handler for clients binding
- int rpcbind(int a, int &r);
-
- void set_reachable(bool r) { reachable_ = r; }
-
- bool got_pdu(connection *c, char *b, int sz);
-
- // register a handler
- template<class S, class A1, class R>
- void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, R & r));
- template<class S, class A1, class A2, class R>
- void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, const A2,
- R & r));
- template<class S, class A1, class A2, class A3, class R>
- void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
- const A3, R & r));
- template<class S, class A1, class A2, class A3, class A4, class R>
- void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
- const A3, const A4, R & r));
- template<class S, class A1, class A2, class A3, class A4, class A5, class R>
- void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
- const A3, const A4, const A5,
- R & r));
- template<class S, class A1, class A2, class A3, class A4, class A5, class A6,
- class R>
- void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
- const A3, const A4, const A5,
- const A6, R & r));
- template<class S, class A1, class A2, class A3, class A4, class A5, class A6,
- class A7, class R>
- void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
- const A3, const A4, const A5,
- const A6, const A7,
- R & r));
-};
-
-template<class S, class A1, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, R & r))
-{
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- R r;
- args >> a1;
- if(!args.okdone())
- return rpc_const::unmarshal_args_failure;
- int b = (sob->*meth)(a1, r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
-
-template<class S, class A1, class A2, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
- R & r))
-{
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, const A2 a2, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- A2 a2;
- R r;
- args >> a1;
- args >> a2;
- if(!args.okdone())
- return rpc_const::unmarshal_args_failure;
- int b = (sob->*meth)(a1, a2, r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
-
-template<class S, class A1, class A2, class A3, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
- const A3 a3, R & r))
-{
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- A2 a2;
- A3 a3;
- R r;
- args >> a1;
- args >> a2;
- args >> a3;
- if(!args.okdone())
- return rpc_const::unmarshal_args_failure;
- int b = (sob->*meth)(a1, a2, a3, r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
-
-template<class S, class A1, class A2, class A3, class A4, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
- const A3 a3, const A4 a4,
- R & r))
-{
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
- const A4 a4, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- A2 a2;
- A3 a3;
- A4 a4;
- R r;
- args >> a1;
- args >> a2;
- args >> a3;
- args >> a4;
- if(!args.okdone())
- return rpc_const::unmarshal_args_failure;
- int b = (sob->*meth)(a1, a2, a3, a4, r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
-
-template<class S, class A1, class A2, class A3, class A4, class A5, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
- const A3 a3, const A4 a4,
- const A5 a5, R & r))
-{
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
- const A5 a5, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
- const A4 a4, const A5 a5, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- A2 a2;
- A3 a3;
- A4 a4;
- A5 a5;
- R r;
- args >> a1;
- args >> a2;
- args >> a3;
- args >> a4;
- args >> a5;
- if(!args.okdone())
- return rpc_const::unmarshal_args_failure;
- int b = (sob->*meth)(a1, a2, a3, a4, a5, r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
-
-template<class S, class A1, class A2, class A3, class A4, class A5, class A6, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
- const A3 a3, const A4 a4,
- const A5 a5, const A6 a6,
- R & r))
-{
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
- const A5 a5, const A6 a6, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
- const A4 a4, const A5 a5, const A6 a6, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- A2 a2;
- A3 a3;
- A4 a4;
- A5 a5;
- A6 a6;
- R r;
- args >> a1;
- args >> a2;
- args >> a3;
- args >> a4;
- args >> a5;
- args >> a6;
- if(!args.okdone())
- return rpc_const::unmarshal_args_failure;
- int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
-
-template<class S, class A1, class A2, class A3, class A4, class A5,
- class A6, class A7, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
- const A3 a3, const A4 a4,
- const A5 a5, const A6 a6,
- const A7 a7, R & r))
-{
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
- const A5 a5, const A6 a6, const A7 a7, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
- const A4 a4, const A5 a5, const A6 a6,
- const A7 a7, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- A2 a2;
- A3 a3;
- A4 a4;
- A5 a5;
- A6 a6;
- A7 a7;
- R r;
- args >> a1;
- args >> a2;
- args >> a3;
- args >> a4;
- args >> a5;
- args >> a6;
- args >> a7;
- if(!args.okdone())
- return rpc_const::unmarshal_args_failure;
- int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, a7, r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
-
-
-void make_sockaddr(const char *hostandport, struct sockaddr_in *dst);
-void make_sockaddr(const char *host, const char *port,
- struct sockaddr_in *dst);
-