X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/6b5e09540e9392a7015fae1ad3b01b0973600ff2..f0dcb6b97d6d40f67698d1f71ac26970f1776f82:/rpc/rpc.h diff --git a/rpc/rpc.h b/rpc/rpc.h index 7b65101..211c717 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -37,10 +37,12 @@ class rpcc : private connection_delegate { using proc_id_t = rpc_protocol::proc_id_t; template using proc_t = rpc_protocol::proc_t; + using nonce_t = rpc_protocol::nonce_t; + using xid_t = rpc_protocol::xid_t; // manages per rpc info struct caller { - caller(int _xid, string *_rep) : xid(_xid), rep(_rep) {} + caller(xid_t _xid, string *_rep) : xid(_xid), rep(_rep) {} int xid; string *rep; @@ -50,53 +52,53 @@ class rpcc : private connection_delegate { cond c; }; - void get_refconn(shared_ptr & ch); - void update_xid_rep(int xid); + void get_latest_connection(shared_ptr & ch); + void update_xid_rep(xid_t xid, lock & m_lock); sockaddr_in dst_; - unsigned int clt_nonce_; - unsigned int srv_nonce_; - bool bind_done_; - int xid_; - int lossytest_; - bool retrans_; - bool reachable_; + nonce_t clt_nonce_; + nonce_t srv_nonce_ = 0; + bool bind_done_ = false; + int lossytest_ = 0; + bool reachable_ = true; shared_ptr chan_; mutex m_; // protect insert/delete to calls[] mutex chan_m_; - bool destroy_wait_; + bool destroy_wait_ = false; cond destroy_wait_c_; map calls_; - list xid_rep_window_; + + // xid starts with 1 and latest received reply starts with 0 + xid_t xid_ = 1; + list xid_rep_window_ = {0}; struct request { void clear() { buf.clear(); xid = -1; } bool isvalid() { return xid != -1; } string buf; - int xid = -1; + xid_t xid = -1; }; request dup_req_; - int xid_rep_done_; + int xid_rep_done_ = -1; - int call1(proc_id_t proc, marshall &req, string &rep, milliseconds to); + int call1(proc_id_t proc, milliseconds to, string & rep, marshall & req); template - int call_m(proc_id_t proc, marshall &req, R & r, milliseconds to) { + inline int call_m(proc_id_t proc, milliseconds to, R & r, marshall && req) { string rep; - int intret = call1(proc, req, rep, to); - unmarshall u(rep, true); + int intret = call1(proc, to, rep, req); if (intret < 0) return intret; - u >> r; + unmarshall u(rep, true, r); if (u.okdone() != true) { LOG("rpcc::call_m: failed to unmarshall the reply. You are probably " << "calling RPC 0x" << hex << proc << " with the wrong return type."); VERIFY(0); - return rpc_protocol::unmarshal_reply_failure; + return rpc_protocol::unmarshall_reply_failure; } return intret; } @@ -105,10 +107,10 @@ class rpcc : private connection_delegate { public: - rpcc(const string & d, bool retrans=true); + rpcc(const string & d); ~rpcc(); - unsigned int id() { return clt_nonce_; } + nonce_t id() { return clt_nonce_; } int bind(milliseconds to = rpc::to_max); @@ -117,15 +119,14 @@ class rpcc : private connection_delegate { void cancel(); template - inline int call(proc_t

proc, R & r, const Args&... args) { + inline int call(proc_t

proc, R & r, const Args & ... args) { return call_timeout(proc, rpc::to_max, r, args...); } template - inline int call_timeout(proc_t

proc, milliseconds to, R & r, const Args&... args) { + inline int call_timeout(proc_t

proc, milliseconds to, R & r, const Args & ... args) { static_assert(is_valid_call::value, "RPC called with incorrect argument types"); - marshall m{args...}; - return call_m(proc.id, m, r, to); + return call_m(proc.id, to, r, forward(marshall(args...))); } }; @@ -135,6 +136,8 @@ class rpcs : private connection_delegate { using proc_id_t = rpc_protocol::proc_id_t; template using proc_t = rpc_protocol::proc_t; + using nonce_t = rpc_protocol::nonce_t; + using xid_t = rpc_protocol::xid_t; typedef enum { NEW, // new RPC, not a duplicate @@ -148,75 +151,66 @@ class rpcs : private connection_delegate { // has been sent; in that case buf points to a copy of the reply, // and sz holds the size of the reply. struct reply_t { - reply_t (int _xid) : xid(_xid), cb_present(false) {} - reply_t (int _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {} - int xid; + reply_t (xid_t _xid) : xid(_xid), cb_present(false) {} + reply_t (xid_t _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {} + xid_t xid; bool cb_present; // whether the reply buffer is valid string buf; // the reply buffer }; in_port_t port_; - unsigned int nonce_; + nonce_t nonce_; // provide at most once semantics by maintaining a window of replies // per client that that client hasn't acknowledged receiving yet. // indexed by client nonce. - map> reply_window_; - - void free_reply_window(void); - void add_reply(unsigned int clt_nonce, int xid, const string & b); + map> reply_window_; - rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, - int xid, int rep_xid, string & b); + void add_reply(nonce_t clt_nonce, xid_t xid, const string & b); - void updatestat(proc_id_t proc); + rpcstate_t check_duplicate_and_update(nonce_t clt_nonce, xid_t xid, + xid_t rep_xid, string & b); // latest connection to the client - map> conns_; + map> conns_; - // counting - const size_t counting_; - size_t curr_counts_; - map counts_; - - bool reachable_; + bool reachable_ = true; // map proc # to function map procs_; mutex procs_m_; // protect insert/delete to procs[] - mutex count_m_; // protect modification of counts mutex reply_window_m_; // protect reply window et al mutex conns_m_; // protect conns_ void dispatch(shared_ptr c, const string & buf); - // internal handler registration - void reg1(proc_id_t proc, handler *); - - unique_ptr dispatchpool_; - unique_ptr listener_; + unique_ptr dispatchpool_{new thread_pool(6, false)}; + unique_ptr listener_; // RPC handler for clients binding - rpc_protocol::status rpcbind(unsigned int &r, int a); + rpc_protocol::status rpcbind(nonce_t & r); bool got_pdu(const shared_ptr & c, const string & b); public: - rpcs(in_port_t port, size_t counts=0); + rpcs(in_port_t port); ~rpcs(); void set_reachable(bool r) { reachable_ = r; } - template void reg(proc_t

proc, F f, C *c=nullptr) { + template inline void reg(proc_t

proc, F f, C *c=nullptr) { static_assert(is_valid_registration::value, "RPC handler registered with incorrect argument types"); struct ReturnOnFailure { static inline int unmarshall_args_failure() { - return rpc_protocol::unmarshal_args_failure; + return rpc_protocol::unmarshall_args_failure; } }; - reg1(proc.id, marshalled_func::wrap(f, c)); + lock pl(procs_m_); + VERIFY(procs_.count(proc.id) == 0); + procs_[proc.id] = marshalled_func::wrap(f, c); + VERIFY(procs_.count(proc.id) >= 1); } void start();