X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/4e881433f37417ccbda89c09ffdf936855d462d4..f0dcb6b97d6d40f67698d1f71ac26970f1776f82:/rpc/rpc.h?ds=sidebyside diff --git a/rpc/rpc.h b/rpc/rpc.h index 3ae7737..211c717 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -52,29 +52,30 @@ class rpcc : private connection_delegate { cond c; }; - void get_refconn(shared_ptr & ch); - void update_xid_rep(xid_t xid); + void get_latest_connection(shared_ptr & ch); + void update_xid_rep(xid_t xid, lock & m_lock); sockaddr_in dst_; nonce_t clt_nonce_; - nonce_t srv_nonce_; - bool bind_done_; - xid_t xid_; - int lossytest_; - bool retrans_; - bool reachable_; + nonce_t srv_nonce_ = 0; + bool bind_done_ = false; + int lossytest_ = 0; + bool reachable_ = true; shared_ptr chan_; mutex m_; // protect insert/delete to calls[] mutex chan_m_; - bool destroy_wait_; + bool destroy_wait_ = false; cond destroy_wait_c_; map calls_; - list xid_rep_window_; + + // xid starts with 1 and latest received reply starts with 0 + xid_t xid_ = 1; + list xid_rep_window_ = {0}; struct request { void clear() { buf.clear(); xid = -1; } @@ -83,22 +84,21 @@ class rpcc : private connection_delegate { xid_t xid = -1; }; request dup_req_; - int xid_rep_done_; + int xid_rep_done_ = -1; - int call1(proc_id_t proc, marshall &req, string &rep, milliseconds to); + int call1(proc_id_t proc, milliseconds to, string & rep, marshall & req); template - int call_m(proc_id_t proc, marshall &req, R & r, milliseconds to) { + inline int call_m(proc_id_t proc, milliseconds to, R & r, marshall && req) { string rep; - int intret = call1(proc, req, rep, to); - unmarshall u(rep, true); + int intret = call1(proc, to, rep, req); if (intret < 0) return intret; - u >> r; + unmarshall u(rep, true, r); if (u.okdone() != true) { LOG("rpcc::call_m: failed to unmarshall the reply. You are probably " << "calling RPC 0x" << hex << proc << " with the wrong return type."); VERIFY(0); - return rpc_protocol::unmarshal_reply_failure; + return rpc_protocol::unmarshall_reply_failure; } return intret; } @@ -107,7 +107,7 @@ class rpcc : private connection_delegate { public: - rpcc(const string & d, bool retrans=true); + rpcc(const string & d); ~rpcc(); nonce_t id() { return clt_nonce_; } @@ -119,15 +119,14 @@ class rpcc : private connection_delegate { void cancel(); template - inline int call(proc_t

proc, R & r, const Args&... args) { + inline int call(proc_t

proc, R & r, const Args & ... args) { return call_timeout(proc, rpc::to_max, r, args...); } template - inline int call_timeout(proc_t

proc, milliseconds to, R & r, const Args&... args) { + inline int call_timeout(proc_t

proc, milliseconds to, R & r, const Args & ... args) { static_assert(is_valid_call::value, "RPC called with incorrect argument types"); - marshall m{args...}; - return call_m(proc.id, m, r, to); + return call_m(proc.id, to, r, forward(marshall(args...))); } }; @@ -167,16 +166,15 @@ class rpcs : private connection_delegate { // indexed by client nonce. map> reply_window_; - void free_reply_window(void); void add_reply(nonce_t clt_nonce, xid_t xid, const string & b); - rpcstate_t checkduplicate_and_update(nonce_t clt_nonce, xid_t xid, + rpcstate_t check_duplicate_and_update(nonce_t clt_nonce, xid_t xid, xid_t rep_xid, string & b); // latest connection to the client map> conns_; - bool reachable_; + bool reachable_ = true; // map proc # to function map procs_; @@ -187,14 +185,11 @@ class rpcs : private connection_delegate { void dispatch(shared_ptr c, const string & buf); - // internal handler registration - void reg1(proc_id_t proc, handler *); - - unique_ptr dispatchpool_; + unique_ptr dispatchpool_{new thread_pool(6, false)}; unique_ptr listener_; // RPC handler for clients binding - rpc_protocol::status rpcbind(nonce_t &r); + rpc_protocol::status rpcbind(nonce_t & r); bool got_pdu(const shared_ptr & c, const string & b); @@ -209,10 +204,13 @@ class rpcs : private connection_delegate { static_assert(is_valid_registration::value, "RPC handler registered with incorrect argument types"); struct ReturnOnFailure { static inline int unmarshall_args_failure() { - return rpc_protocol::unmarshal_args_failure; + return rpc_protocol::unmarshall_args_failure; } }; - reg1(proc.id, marshalled_func::wrap(f, c)); + lock pl(procs_m_); + VERIFY(procs_.count(proc.id) == 0); + procs_[proc.id] = marshalled_func::wrap(f, c); + VERIFY(procs_.count(proc.id) >= 1); } void start();