X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/6623ac357055b95ce4fc0cbe9c5dc15524a9f20c..4a160f880ce46153acb23b137f30fd588df5fb9d:/rpc/rpc.h diff --git a/rpc/rpc.h b/rpc/rpc.h index 4f9a231..84c12f3 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -30,10 +30,10 @@ class rpc_const { // rpc client endpoint. // manages a xid space per destination socket // threaded: multiple threads can be sending RPCs, -class rpcc : public connection_delegate { +class rpcc : private connection_delegate { private: - //manages per rpc info + // manages per rpc info struct caller { caller(int _xid, string *_rep) : xid(_xid), rep(_rep) {} @@ -81,7 +81,23 @@ class rpcc : public connection_delegate { int call1(proc_t proc, marshall &req, string &rep, milliseconds to); template - int call_m(proc_t proc, marshall &req, R & r, milliseconds to); + int call_m(proc_t proc, marshall &req, R & r, milliseconds to) { + string rep; + int intret = call1(proc, req, rep, to); + unmarshall u(rep, true); + if (intret < 0) return intret; + u >> r; + if (u.okdone() != true) { + cerr << "rpcc::call_m: failed to unmarshall the reply. You are probably " << + "calling RPC 0x" << hex << proc << " with the wrong return type." << endl; + VERIFY(0); + return rpc_const::unmarshal_reply_failure; + } + return intret; + } + + bool got_pdu(const shared_ptr & c, const string & b); + public: rpcc(const string & d, bool retrans=true); @@ -95,136 +111,105 @@ class rpcc : public connection_delegate { void cancel(); - bool got_pdu(const shared_ptr & c, const string & b); - template - inline int call(proc_t proc, R & r, const Args&... args); + inline int call(proc_t proc, R & r, const Args&... args) { + return call_timeout(proc, rpc::to_max, r, args...); + } template - inline int call_timeout(proc_t proc, milliseconds to, R & r, const Args&... args); + inline int call_timeout(proc_t proc, milliseconds to, R & r, const Args&... args) { + marshall m{args...}; + return call_m(proc, m, r, to); + } }; -template int -rpcc::call_m(proc_t proc, marshall &req, R & r, milliseconds to) -{ - string rep; - int intret = call1(proc, req, rep, to); - unmarshall u(rep, true); - if (intret < 0) return intret; - u >> r; - if (u.okdone() != true) { - cerr << "rpcc::call_m: failed to unmarshall the reply. You are probably " << - "calling RPC 0x" << hex << proc << " with the wrong return type." << endl; - VERIFY(0); - return rpc_const::unmarshal_reply_failure; - } - return intret; -} - -template inline int -rpcc::call(proc_t proc, R & r, const Args&... args) -{ - return call_timeout(proc, rpc::to_max, r, args...); -} - -template inline int -rpcc::call_timeout(proc_t proc, const milliseconds to, R & r, const Args&... args) -{ - marshall m{args...}; - return call_m(proc, m, r, to); -} - // rpc server endpoint. -class rpcs : public connection_delegate { - - typedef enum { - NEW, // new RPC, not a duplicate - INPROGRESS, // duplicate of an RPC we're still processing - DONE, // duplicate of an RPC we already replied to (have reply) - FORGOTTEN, // duplicate of an old RPC whose reply we've forgotten - } rpcstate_t; - +class rpcs : private connection_delegate { private: + typedef enum { + NEW, // new RPC, not a duplicate + INPROGRESS, // duplicate of an RPC we're still processing + DONE, // duplicate of an RPC we already replied to (have reply) + FORGOTTEN, // duplicate of an old RPC whose reply we've forgotten + } rpcstate_t; + // state about an in-progress or completed RPC, for at-most-once. // if cb_present is true, then the RPC is complete and a reply // has been sent; in that case buf points to a copy of the reply, // and sz holds the size of the reply. - struct reply_t { - reply_t (int _xid) : xid(_xid), cb_present(false) {} - reply_t (int _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {} - int xid; - bool cb_present; // whether the reply buffer is valid - string buf; // the reply buffer - }; - - in_port_t port_; - unsigned int nonce_; - - // provide at most once semantics by maintaining a window of replies - // per client that that client hasn't acknowledged receiving yet. + struct reply_t { + reply_t (int _xid) : xid(_xid), cb_present(false) {} + reply_t (int _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {} + int xid; + bool cb_present; // whether the reply buffer is valid + string buf; // the reply buffer + }; + + in_port_t port_; + unsigned int nonce_; + + // provide at most once semantics by maintaining a window of replies + // per client that that client hasn't acknowledged receiving yet. // indexed by client nonce. - map > reply_window_; + map> reply_window_; - void free_reply_window(void); - void add_reply(unsigned int clt_nonce, int xid, const string & b); + void free_reply_window(void); + void add_reply(unsigned int clt_nonce, int xid, const string & b); - rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, - int xid, int rep_xid, string & b); + rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, + int xid, int rep_xid, string & b); - void updatestat(proc_t proc); + void updatestat(proc_t proc); - // latest connection to the client - map> conns_; + // latest connection to the client + map> conns_; - // counting - const size_t counting_; - size_t curr_counts_; - map counts_; + // counting + const size_t counting_; + size_t curr_counts_; + map counts_; - bool reachable_; + bool reachable_; - // map proc # to function - map procs_; + // map proc # to function + map procs_; - mutex procs_m_; // protect insert/delete to procs[] - mutex count_m_; //protect modification of counts - mutex reply_window_m_; // protect reply window et al - mutex conns_m_; // protect conns_ + mutex procs_m_; // protect insert/delete to procs[] + mutex count_m_; // protect modification of counts + mutex reply_window_m_; // protect reply window et al + mutex conns_m_; // protect conns_ + void dispatch(shared_ptr c, const string & buf); - protected: + // internal handler registration + void reg1(proc_t proc, handler *); - void dispatch(shared_ptr c, const string & buf); + unique_ptr dispatchpool_; + unique_ptr listener_; - // internal handler registration - void reg1(proc_t proc, handler *); + // RPC handler for clients binding + int rpcbind(unsigned int &r, int a); - unique_ptr dispatchpool_; - unique_ptr listener_; + bool got_pdu(const shared_ptr & c, const string & b); public: - rpcs(in_port_t port, size_t counts=0); - ~rpcs(); - inline in_port_t port() { return listener_->port(); } - //RPC handler for clients binding - int rpcbind(unsigned int &r, int a); - void set_reachable(bool r) { reachable_ = r; } + rpcs(in_port_t port, size_t counts=0); + ~rpcs(); - bool got_pdu(const shared_ptr & c, const string & b); + void set_reachable(bool r) { reachable_ = r; } - struct ReturnOnFailure { - static inline int unmarshall_args_failure() { - return rpc_const::unmarshal_args_failure; + template void reg(proc_t proc, F f, C *c=nullptr) { + struct ReturnOnFailure { + static inline int unmarshall_args_failure() { + return rpc_const::unmarshal_args_failure; + } + }; + reg1(proc, marshalled_func::wrap(f, c)); } - }; - - template void reg(proc_t proc, F f, C *c=nullptr) { - reg1(proc, marshalled_func::wrap(f, c)); - } - void start(); + void start(); }; #endif