X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/ba03b19875aa2e3586e49b10904563cdd3b91de0..f0dcb6b97d6d40f67698d1f71ac26970f1776f82:/rpc/rpc.h diff --git a/rpc/rpc.h b/rpc/rpc.h index 2b32e28..211c717 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -5,32 +5,44 @@ #include #include +#include "rpc_protocol.h" #include "thr_pool.h" #include "marshall.h" +#include "marshall_wrap.h" #include "connection.h" -class rpc_const { - public: - static const unsigned int bind = 1; // handler number reserved for bind - static const int timeout_failure = -1; - static const int unmarshal_args_failure = -2; - static const int unmarshal_reply_failure = -3; - static const int atmostonce_failure = -4; - static const int oldsrv_failure = -5; - static const int bind_failure = -6; - static const int cancel_failure = -7; -}; +namespace rpc { + static constexpr milliseconds to_max{12000}; + static constexpr milliseconds to_min{100}; +} + +template struct is_valid_call : false_type {}; + +template +struct is_valid_call : true_type {}; + +template struct is_valid_registration : false_type {}; + +template +struct is_valid_registration::type...), S(R &, Args...)> : true_type {}; + +template +struct is_valid_registration : is_valid_registration {}; // rpc client endpoint. // manages a xid space per destination socket // threaded: multiple threads can be sending RPCs, -class rpcc : public chanmgr { - +class rpcc : private connection_delegate { private: + using proc_id_t = rpc_protocol::proc_id_t; + template + using proc_t = rpc_protocol::proc_t; + using nonce_t = rpc_protocol::nonce_t; + using xid_t = rpc_protocol::xid_t; - //manages per rpc info + // manages per rpc info struct caller { - caller(int _xid, string *_rep) : xid(_xid), rep(_rep) {} + caller(xid_t _xid, string *_rep) : xid(_xid), rep(_rep) {} int xid; string *rep; @@ -40,200 +52,168 @@ class rpcc : public chanmgr { cond c; }; - void get_refconn(connection **ch); - void update_xid_rep(int xid); + void get_latest_connection(shared_ptr & ch); + void update_xid_rep(xid_t xid, lock & m_lock); sockaddr_in dst_; - unsigned int clt_nonce_; - unsigned int srv_nonce_; - bool bind_done_; - int xid_; - int lossytest_; - bool retrans_; - bool reachable_; + nonce_t clt_nonce_; + nonce_t srv_nonce_ = 0; + bool bind_done_ = false; + int lossytest_ = 0; + bool reachable_ = true; - connection *chan_; + shared_ptr chan_; mutex m_; // protect insert/delete to calls[] mutex chan_m_; - bool destroy_wait_; + bool destroy_wait_ = false; cond destroy_wait_c_; map calls_; - list xid_rep_window_; + + // xid starts with 1 and latest received reply starts with 0 + xid_t xid_ = 1; + list xid_rep_window_ = {0}; struct request { - request() { clear(); } void clear() { buf.clear(); xid = -1; } bool isvalid() { return xid != -1; } string buf; - int xid; + xid_t xid = -1; }; - struct request dup_req_; - int xid_rep_done_; + request dup_req_; + int xid_rep_done_ = -1; + + int call1(proc_id_t proc, milliseconds to, string & rep, marshall & req); + + template + inline int call_m(proc_id_t proc, milliseconds to, R & r, marshall && req) { + string rep; + int intret = call1(proc, to, rep, req); + if (intret < 0) return intret; + unmarshall u(rep, true, r); + if (u.okdone() != true) { + LOG("rpcc::call_m: failed to unmarshall the reply. You are probably " << + "calling RPC 0x" << hex << proc << " with the wrong return type."); + VERIFY(0); + return rpc_protocol::unmarshall_reply_failure; + } + return intret; + } + + bool got_pdu(const shared_ptr & c, const string & b); + public: - rpcc(const string & d, bool retrans=true); + rpcc(const string & d); ~rpcc(); - struct TO { - int to; - }; - static const TO to_max; - static const TO to_min; - static TO to(int x) { TO t; t.to = x; return t;} - - unsigned int id() { return clt_nonce_; } + nonce_t id() { return clt_nonce_; } - int bind(TO to = to_max); + int bind(milliseconds to = rpc::to_max); void set_reachable(bool r) { reachable_ = r; } void cancel(); - int islossy() { return lossytest_ > 0; } - - int call1(proc_t proc, marshall &req, string &rep, TO to); + template + inline int call(proc_t

proc, R & r, const Args & ... args) { + return call_timeout(proc, rpc::to_max, r, args...); + } - bool got_pdu(connection *c, const string & b); - - template - int call_m(proc_t proc, marshall &req, R & r, TO to); - - template - inline int call(proc_t proc, R & r, const Args&... args); - - template - inline int call_timeout(proc_t proc, TO to, R & r, const Args&... args); + template + inline int call_timeout(proc_t

proc, milliseconds to, R & r, const Args & ... args) { + static_assert(is_valid_call::value, "RPC called with incorrect argument types"); + return call_m(proc.id, to, r, forward(marshall(args...))); + } }; -template int -rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) -{ - string rep; - int intret = call1(proc, req, rep, to); - unmarshall u(rep, true); - if (intret < 0) return intret; - u >> r; - if (u.okdone() != true) { - cerr << "rpcc::call_m: failed to unmarshall the reply. You are probably " << - "calling RPC 0x" << hex << proc << " with the wrong return type." << endl; - VERIFY(0); - return rpc_const::unmarshal_reply_failure; - } - return intret; -} - -template inline int -rpcc::call(proc_t proc, R & r, const Args&... args) -{ - return call_timeout(proc, rpcc::to_max, r, args...); -} - -template inline int -rpcc::call_timeout(proc_t proc, const rpcc::TO to, R & r, const Args&... args) -{ - marshall m{args...}; - return call_m(proc, m, r, to); -} - // rpc server endpoint. -class rpcs : public chanmgr { - - typedef enum { - NEW, // new RPC, not a duplicate - INPROGRESS, // duplicate of an RPC we're still processing - DONE, // duplicate of an RPC we already replied to (have reply) - FORGOTTEN, // duplicate of an old RPC whose reply we've forgotten - } rpcstate_t; - +class rpcs : private connection_delegate { private: + using proc_id_t = rpc_protocol::proc_id_t; + template + using proc_t = rpc_protocol::proc_t; + using nonce_t = rpc_protocol::nonce_t; + using xid_t = rpc_protocol::xid_t; + + typedef enum { + NEW, // new RPC, not a duplicate + INPROGRESS, // duplicate of an RPC we're still processing + DONE, // duplicate of an RPC we already replied to (have reply) + FORGOTTEN, // duplicate of an old RPC whose reply we've forgotten + } rpcstate_t; // state about an in-progress or completed RPC, for at-most-once. // if cb_present is true, then the RPC is complete and a reply // has been sent; in that case buf points to a copy of the reply, // and sz holds the size of the reply. - struct reply_t { - reply_t (int _xid) : xid(_xid), cb_present(false) {} - reply_t (int _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {} - int xid; - bool cb_present; // whether the reply buffer is valid - string buf; // the reply buffer - }; - - in_port_t port_; - unsigned int nonce_; - - // provide at most once semantics by maintaining a window of replies - // per client that that client hasn't acknowledged receiving yet. - // indexed by client nonce. - map > reply_window_; - - void free_reply_window(void); - void add_reply(unsigned int clt_nonce, int xid, const string & b); + struct reply_t { + reply_t (xid_t _xid) : xid(_xid), cb_present(false) {} + reply_t (xid_t _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {} + xid_t xid; + bool cb_present; // whether the reply buffer is valid + string buf; // the reply buffer + }; - rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, - int xid, int rep_xid, string & b); + in_port_t port_; + nonce_t nonce_; - void updatestat(proc_t proc); + // provide at most once semantics by maintaining a window of replies + // per client that that client hasn't acknowledged receiving yet. + // indexed by client nonce. + map> reply_window_; - // latest connection to the client - map conns_; + void add_reply(nonce_t clt_nonce, xid_t xid, const string & b); - // counting - const size_t counting_; - size_t curr_counts_; - map counts_; + rpcstate_t check_duplicate_and_update(nonce_t clt_nonce, xid_t xid, + xid_t rep_xid, string & b); - bool reachable_; + // latest connection to the client + map> conns_; - // map proc # to function - map procs_; + bool reachable_ = true; - mutex procs_m_; // protect insert/delete to procs[] - mutex count_m_; //protect modification of counts - mutex reply_window_m_; // protect reply window et al - mutex conss_m_; // protect conns_ + // map proc # to function + map procs_; + mutex procs_m_; // protect insert/delete to procs[] + mutex reply_window_m_; // protect reply window et al + mutex conns_m_; // protect conns_ - protected: + void dispatch(shared_ptr c, const string & buf); - struct djob_t { - connection *conn; - string buf; - }; - void dispatch(djob_t *); + unique_ptr dispatchpool_{new thread_pool(6, false)}; + unique_ptr listener_; - // internal handler registration - void reg1(proc_t proc, handler *); + // RPC handler for clients binding + rpc_protocol::status rpcbind(nonce_t & r); - ThrPool* dispatchpool_; - tcpsconn *listener_; + bool got_pdu(const shared_ptr & c, const string & b); public: - rpcs(in_port_t port, size_t counts=0); - ~rpcs(); - inline in_port_t port() { return listener_->port(); } - //RPC handler for clients binding - int rpcbind(unsigned int &r, int a); - - void set_reachable(bool r) { reachable_ = r; } - bool got_pdu(connection *c, const string & b); + rpcs(in_port_t port); + ~rpcs(); - template void reg(proc_t proc, F f, C *c=nullptr); -}; + void set_reachable(bool r) { reachable_ = r; } -struct ReturnOnFailure { - static inline int unmarshall_args_failure() { - return rpc_const::unmarshal_args_failure; - } + template inline void reg(proc_t

proc, F f, C *c=nullptr) { + static_assert(is_valid_registration::value, "RPC handler registered with incorrect argument types"); + struct ReturnOnFailure { + static inline int unmarshall_args_failure() { + return rpc_protocol::unmarshall_args_failure; + } + }; + lock pl(procs_m_); + VERIFY(procs_.count(proc.id) == 0); + procs_[proc.id] = marshalled_func::wrap(f, c); + VERIFY(procs_.count(proc.id) >= 1); + } + + void start(); }; -template void rpcs::reg(proc_t proc, F f, C *c) { - reg1(proc, marshalled_func::wrap(f, c)); -} - #endif