X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/2546a41ad36fdc9ef6471cb35a1d56930ae1b527..b2609562b3d4fc548afcc0a3dfe4ff5fd4ae3d36:/rpc/rpc.h?ds=sidebyside diff --git a/rpc/rpc.h b/rpc/rpc.h index d81a5dd..02c7c62 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -1,20 +1,19 @@ #ifndef rpc_h #define rpc_h +#include "types.h" #include #include -#include -#include -#include #include "thr_pool.h" #include "marshall.h" +#include "marshall_wrap.h" #include "connection.h" -#include "lock.h" -using std::string; -using std::map; -using std::list; +namespace rpc { + static constexpr milliseconds to_max{12000}; + static constexpr milliseconds to_min{100}; +} class rpc_const { public: @@ -32,23 +31,21 @@ class rpc_const { // manages a xid space per destination socket // threaded: multiple threads can be sending RPCs, class rpcc : public chanmgr { - private: //manages per rpc info struct caller { - caller(int xxid, unmarshall *un); - ~caller(); + caller(int _xid, string *_rep) : xid(_xid), rep(_rep) {} int xid; - unmarshall *un; + string *rep; int intret; - bool done; + bool done = false; mutex m; cond c; }; - void get_refconn(connection **ch); + void get_refconn(shared_ptr & ch); void update_xid_rep(int xid); @@ -61,7 +58,7 @@ class rpcc : public chanmgr { bool retrans_; bool reachable_; - connection *chan_; + shared_ptr chan_; mutex m_; // protect insert/delete to calls[] mutex chan_m_; @@ -73,62 +70,51 @@ class rpcc : public chanmgr { list xid_rep_window_; struct request { - request() { clear(); } void clear() { buf.clear(); xid = -1; } bool isvalid() { return xid != -1; } string buf; - int xid; + int xid = -1; }; - struct request dup_req_; + request dup_req_; int xid_rep_done_; + + int call1(proc_t proc, marshall &req, string &rep, milliseconds to); + + template + int call_m(proc_t proc, marshall &req, R & r, milliseconds to); public: rpcc(const string & d, bool retrans=true); ~rpcc(); - struct TO { - int to; - }; - static const TO to_max; - static const TO to_min; - static TO to(int x) { TO t; t.to = x; return t;} - unsigned int id() { return clt_nonce_; } - int bind(TO to = to_max); + int bind(milliseconds to = rpc::to_max); void set_reachable(bool r) { reachable_ = r; } void cancel(); - int islossy() { return lossytest_ > 0; } - - int call1(proc_t proc, - marshall &req, unmarshall &rep, TO to); - - bool got_pdu(connection *c, char *b, size_t sz); - - template - int call_m(proc_t proc, marshall &req, R & r, TO to); + bool got_pdu(const shared_ptr & c, const string & b); template inline int call(proc_t proc, R & r, const Args&... args); template - inline int call_timeout(proc_t proc, TO to, R & r, const Args&... args); + inline int call_timeout(proc_t proc, milliseconds to, R & r, const Args&... args); }; template int -rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) +rpcc::call_m(proc_t proc, marshall &req, R & r, milliseconds to) { - unmarshall u; - int intret = call1(proc, req, u, to); + string rep; + int intret = call1(proc, req, rep, to); + unmarshall u(rep, true); if (intret < 0) return intret; u >> r; if (u.okdone() != true) { - fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply." - "You are probably calling RPC 0x%x with wrong return " - "type.\n", proc); + cerr << "rpcc::call_m: failed to unmarshall the reply. You are probably " << + "calling RPC 0x" << hex << proc << " with the wrong return type." << endl; VERIFY(0); return rpc_const::unmarshal_reply_failure; } @@ -138,18 +124,16 @@ rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) template inline int rpcc::call(proc_t proc, R & r, const Args&... args) { - return call_timeout(proc, rpcc::to_max, r, args...); + return call_timeout(proc, rpc::to_max, r, args...); } template inline int -rpcc::call_timeout(proc_t proc, const rpcc::TO to, R & r, const Args&... args) +rpcc::call_timeout(proc_t proc, const milliseconds to, R & r, const Args&... args) { marshall m{args...}; return call_m(proc, m, r, to); } -bool operator<(const sockaddr_in &a, const sockaddr_in &b); - // rpc server endpoint. class rpcs : public chanmgr { @@ -167,25 +151,14 @@ class rpcs : public chanmgr { // has been sent; in that case buf points to a copy of the reply, // and sz holds the size of the reply. struct reply_t { - reply_t (int _xid) { - xid = _xid; - cb_present = false; - buf = NULL; - sz = 0; - } - reply_t (int _xid, char *_buf, size_t _sz) { - xid = _xid; - cb_present = true; - buf = _buf; - sz = _sz; - } + reply_t (int _xid) : xid(_xid), cb_present(false) {} + reply_t (int _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {} int xid; bool cb_present; // whether the reply buffer is valid - char *buf; // the reply buffer - size_t sz; // the size of reply buffer + string buf; // the reply buffer }; - unsigned int port_; + in_port_t port_; unsigned int nonce_; // provide at most once semantics by maintaining a window of replies @@ -194,23 +167,21 @@ class rpcs : public chanmgr { map > reply_window_; void free_reply_window(void); - void add_reply(unsigned int clt_nonce, int xid, char *b, size_t sz); + void add_reply(unsigned int clt_nonce, int xid, const string & b); rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, - int xid, int rep_xid, - char **b, size_t *sz); + int xid, int rep_xid, string & b); void updatestat(proc_t proc); // latest connection to the client - map conns_; + map> conns_; // counting const size_t counting_; size_t curr_counts_; map counts_; - int lossytest_; bool reachable_; // map proc # to function @@ -219,50 +190,41 @@ class rpcs : public chanmgr { mutex procs_m_; // protect insert/delete to procs[] mutex count_m_; //protect modification of counts mutex reply_window_m_; // protect reply window et al - mutex conss_m_; // protect conns_ + mutex conns_m_; // protect conns_ protected: - struct djob_t { - djob_t (connection *c, char *b, size_t bsz):buf(b),sz(bsz),conn(c) {} - char *buf; - size_t sz; - connection *conn; - }; - void dispatch(djob_t *); + void dispatch(shared_ptr c, const string & buf); // internal handler registration void reg1(proc_t proc, handler *); - ThrPool* dispatchpool_; - tcpsconn* listener_; + unique_ptr dispatchpool_; + unique_ptr listener_; public: - rpcs(unsigned int port, size_t counts=0); + rpcs(in_port_t port, size_t counts=0); ~rpcs(); - inline unsigned int port() { return listener_->port(); } + inline in_port_t port() { return listener_->port(); } //RPC handler for clients binding int rpcbind(unsigned int &r, int a); void set_reachable(bool r) { reachable_ = r; } - bool got_pdu(connection *c, char *b, size_t sz); + bool got_pdu(const shared_ptr & c, const string & b); - template void reg(proc_t proc, F f, C *c=nullptr); -}; + struct ReturnOnFailure { + static inline int unmarshall_args_failure() { + return rpc_const::unmarshal_args_failure; + } + }; -struct ReturnOnFailure { - static inline int unmarshall_args_failure() { - return rpc_const::unmarshal_args_failure; + template void reg(proc_t proc, F f, C *c=nullptr) { + reg1(proc, marshalled_func::wrap(f, c)); } -}; -template void rpcs::reg(proc_t proc, F f, C *c) { - reg1(proc, marshalled_func::wrap(f, c)); -} - -sockaddr_in make_sockaddr(const string &hostandport); -sockaddr_in make_sockaddr(const string &host, const string &port); + void start(); +}; #endif