X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/4b9798f44ae94deabf87dd534337b55259272950..2546a41ad36fdc9ef6471cb35a1d56930ae1b527:/rpc/rpc.h diff --git a/rpc/rpc.h b/rpc/rpc.h index 245f277..d81a5dd 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -10,10 +10,11 @@ #include "thr_pool.h" #include "marshall.h" #include "connection.h" +#include "lock.h" -#ifdef DMALLOC -#include "dmalloc.h" -#endif +using std::string; +using std::map; +using std::list; class rpc_const { public: @@ -27,12 +28,6 @@ class rpc_const { static const int cancel_failure = -7; }; -struct ReturnOnFailure { - static inline int unmarshall_args_failure() { - return rpc_const::unmarshal_args_failure; - } -}; - // rpc client endpoint. // manages a xid space per destination socket // threaded: multiple threads can be sending RPCs, @@ -42,53 +37,53 @@ class rpcc : public chanmgr { //manages per rpc info struct caller { - caller(unsigned int xxid, unmarshall *un); + caller(int xxid, unmarshall *un); ~caller(); - unsigned int xid; + int xid; unmarshall *un; int intret; bool done; - std::mutex m; - std::condition_variable c; + mutex m; + cond c; }; void get_refconn(connection **ch); - void update_xid_rep(unsigned int xid); + void update_xid_rep(int xid); sockaddr_in dst_; unsigned int clt_nonce_; unsigned int srv_nonce_; bool bind_done_; - unsigned int xid_; + int xid_; int lossytest_; bool retrans_; bool reachable_; connection *chan_; - std::mutex m_; // protect insert/delete to calls[] - std::mutex chan_m_; + mutex m_; // protect insert/delete to calls[] + mutex chan_m_; bool destroy_wait_; - std::condition_variable destroy_wait_c_; - - std::map calls_; - std::list xid_rep_window_; - - struct request { - request() { clear(); } - void clear() { buf.clear(); xid = -1; } - bool isvalid() { return xid != -1; } - std::string buf; - int xid; - }; - struct request dup_req_; - int xid_rep_done_; + cond destroy_wait_c_; + + map calls_; + list xid_rep_window_; + + struct request { + request() { clear(); } + void clear() { buf.clear(); xid = -1; } + bool isvalid() { return xid != -1; } + string buf; + int xid; + }; + struct request dup_req_; + int xid_rep_done_; public: - rpcc(sockaddr_in d, bool retrans=true); + rpcc(const string & d, bool retrans=true); ~rpcc(); struct TO { @@ -105,27 +100,26 @@ class rpcc : public chanmgr { void set_reachable(bool r) { reachable_ = r; } void cancel(); - - int islossy() { return lossytest_ > 0; } - int call1(unsigned int proc, - marshall &req, unmarshall &rep, TO to); + int islossy() { return lossytest_ > 0; } - bool got_pdu(connection *c, char *b, int sz); + int call1(proc_t proc, + marshall &req, unmarshall &rep, TO to); + bool got_pdu(connection *c, char *b, size_t sz); template - int call_m(unsigned int proc, marshall &req, R & r, TO to); + int call_m(proc_t proc, marshall &req, R & r, TO to); template - inline int call(unsigned int proc, R & r, const Args&... args); + inline int call(proc_t proc, R & r, const Args&... args); template - inline int call_timeout(unsigned int proc, TO to, R & r, const Args&... args); + inline int call_timeout(proc_t proc, TO to, R & r, const Args&... args); }; template int -rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to) +rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) { unmarshall u; int intret = call1(proc, req, u, to); @@ -142,13 +136,13 @@ rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to) } template inline int -rpcc::call(unsigned int proc, R & r, const Args&... args) +rpcc::call(proc_t proc, R & r, const Args&... args) { return call_timeout(proc, rpcc::to_max, r, args...); } template inline int -rpcc::call_timeout(unsigned int proc, const rpcc::TO to, R & r, const Args&... args) +rpcc::call_timeout(proc_t proc, const rpcc::TO to, R & r, const Args&... args) { marshall m{args...}; return call_m(proc, m, r, to); @@ -173,97 +167,102 @@ class rpcs : public chanmgr { // has been sent; in that case buf points to a copy of the reply, // and sz holds the size of the reply. struct reply_t { - reply_t (unsigned int _xid) { + reply_t (int _xid) { xid = _xid; cb_present = false; buf = NULL; sz = 0; } - reply_t (unsigned int _xid, char *_buf, int _sz) { + reply_t (int _xid, char *_buf, size_t _sz) { xid = _xid; cb_present = true; buf = _buf; sz = _sz; } - unsigned int xid; + int xid; bool cb_present; // whether the reply buffer is valid char *buf; // the reply buffer - int sz; // the size of reply buffer + size_t sz; // the size of reply buffer }; - int port_; + unsigned int port_; unsigned int nonce_; // provide at most once semantics by maintaining a window of replies // per client that that client hasn't acknowledged receiving yet. // indexed by client nonce. - std::map > reply_window_; + map > reply_window_; void free_reply_window(void); - void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz); + void add_reply(unsigned int clt_nonce, int xid, char *b, size_t sz); rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, - unsigned int xid, unsigned int rep_xid, - char **b, int *sz); + int xid, int rep_xid, + char **b, size_t *sz); - void updatestat(unsigned int proc); + void updatestat(proc_t proc); // latest connection to the client - std::map conns_; + map conns_; // counting - const int counting_; - int curr_counts_; - std::map counts_; + const size_t counting_; + size_t curr_counts_; + map counts_; int lossytest_; bool reachable_; // map proc # to function - std::map procs_; + map procs_; - std::mutex procs_m_; // protect insert/delete to procs[] - std::mutex count_m_; //protect modification of counts - std::mutex reply_window_m_; // protect reply window et al - std::mutex conss_m_; // protect conns_ + mutex procs_m_; // protect insert/delete to procs[] + mutex count_m_; //protect modification of counts + mutex reply_window_m_; // protect reply window et al + mutex conss_m_; // protect conns_ protected: struct djob_t { - djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {} + djob_t (connection *c, char *b, size_t bsz):buf(b),sz(bsz),conn(c) {} char *buf; - int sz; + size_t sz; connection *conn; }; void dispatch(djob_t *); // internal handler registration - void reg1(unsigned int proc, handler *); + void reg1(proc_t proc, handler *); ThrPool* dispatchpool_; tcpsconn* listener_; public: - rpcs(unsigned int port, int counts=0); + rpcs(unsigned int port, size_t counts=0); ~rpcs(); - inline int port() { return listener_->port(); } + inline unsigned int port() { return listener_->port(); } //RPC handler for clients binding - int rpcbind(int &r, int a); + int rpcbind(unsigned int &r, int a); void set_reachable(bool r) { reachable_ = r; } - bool got_pdu(connection *c, char *b, int sz); + bool got_pdu(connection *c, char *b, size_t sz); + + template void reg(proc_t proc, F f, C *c=nullptr); +}; - template void reg(unsigned int proc, F f, C *c=nullptr); +struct ReturnOnFailure { + static inline int unmarshall_args_failure() { + return rpc_const::unmarshal_args_failure; + } }; -template void rpcs::reg(unsigned int proc, F f, C *c) { - reg1(proc, marshalled_func::wrap(f, c)); +template void rpcs::reg(proc_t proc, F f, C *c) { + reg1(proc, marshalled_func::wrap(f, c)); } -void make_sockaddr(const char *hostandport, struct sockaddr_in *dst); -void make_sockaddr(const char *host, const char *port, - struct sockaddr_in *dst); +sockaddr_in make_sockaddr(const string &hostandport); +sockaddr_in make_sockaddr(const string &host, const string &port); #endif