X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/d54215aea2a7321ab0f2dc7b0042fea2b7ff5df5..4e881433f37417ccbda89c09ffdf936855d462d4:/rpc/rpc.cc

diff --git a/rpc/rpc.cc b/rpc/rpc.cc
index 964102f..0c3a97d 100644
--- a/rpc/rpc.cc
+++ b/rpc/rpc.cc
@@ -1,56 +1,56 @@
-/*
- The rpcc class handles client-side RPC. Each rpcc is bound to a single RPC
- server. The jobs of rpcc include maintaining a connection to server, sending
- RPC requests and waiting for responses, retransmissions, at-most-once delivery
- etc.
-
- The rpcs class handles the server side of RPC. Each rpcs handles multiple
- connections from different rpcc objects. The jobs of rpcs include accepting
- connections, dispatching requests to registered RPC handlers, at-most-once
- delivery etc.
-
- Both rpcc and rpcs use the connection class as an abstraction for the
- underlying communication channel. To send an RPC request/reply, one calls
- connection::send() which blocks until data is sent or the connection has
- failed (thus the caller can free the buffer when send() returns). When a
- request/reply is received, connection makes a callback into the corresponding
- rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
-
- Thread organization:
- rpcc uses application threads to send RPC requests and blocks to receive the
- reply or error. All connections use a single PollMgr object to perform async
- socket IO. PollMgr creates a single thread to examine the readiness of socket
- file descriptors and informs the corresponding connection whenever a socket is
- ready to be read or written. (We use asynchronous socket IO to reduce the
- number of threads needed to manage these connections; without async IO, at
- least one thread is needed per connection to read data without blocking other
- activities.) Each rpcs object creates one thread for listening on the server
- port and a pool of threads for executing RPC requests. The thread pool allows
- us to control the number of threads spawned at the server (spawning one thread
- per request will hurt when the server faces thousands of requests).
-
- In order to delete a connection object, we must maintain a reference count.
- For rpcc, multiple client threads might be invoking the rpcc::call() functions
- and thus holding multiple references to the underlying connection object. For
- rpcs, multiple dispatch threads might be holding references to the same
- connection object. A connection object is deleted only when the underlying
- connection is dead and the reference count reaches zero.
-
- This version of the RPC library explicitly joins exited threads to make sure
- no outstanding references exist before deleting objects.
-
- To delete a rpcc object safely, the users of the library must ensure that
- there are no outstanding calls on the rpcc object.
-
- To delete a rpcs object safely, we do the following in sequence: 1. stop
- accepting new incoming connections. 2. close existing active connections. 3.
- delete the dispatch thread pool which involves waiting for current active RPC
- handlers to finish. It is interesting how a thread pool can be deleted
- without using thread cancellation. The trick is to inject x "poison pills" for
- a thread pool of x threads. Upon getting a poison pill instead of a normal
- task, a worker thread will exit (and thread pool destructor waits to join all
- x exited worker threads).
- */
+//
+// The rpcc class handles client-side RPC. Each rpcc is bound to a single RPC
+// server. The jobs of rpcc include maintaining a connection to server, sending
+// RPC requests and waiting for responses, retransmissions, at-most-once delivery
+// etc.
+//
+// The rpcs class handles the server side of RPC. Each rpcs handles multiple
+// connections from different rpcc objects. The jobs of rpcs include accepting
+// connections, dispatching requests to registered RPC handlers, at-most-once
+// delivery etc.
+//
+// Both rpcc and rpcs use the connection class as an abstraction for the
+// underlying communication channel. To send an RPC request/reply, one calls
+// connection::send() which blocks until data is sent or the connection has
+// failed (thus the caller can free the buffer when send() returns). When a
+// request/reply is received, connection makes a callback into the corresponding
+// rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
+//
+// Thread organization:
+// rpcc uses application threads to send RPC requests and blocks to receive the
+// reply or error. All connections use a single PollMgr object to perform async
+// socket IO. PollMgr creates a single thread to examine the readiness of socket
+// file descriptors and informs the corresponding connection whenever a socket is
+// ready to be read or written. (We use asynchronous socket IO to reduce the
+// number of threads needed to manage these connections; without async IO, at
+// least one thread is needed per connection to read data without blocking other
+// activities.) Each rpcs object creates one thread for listening on the server
+// port and a pool of threads for executing RPC requests. The thread pool allows
+// us to control the number of threads spawned at the server (spawning one thread
+// per request will hurt when the server faces thousands of requests).
+//
+// In order to delete a connection object, we must maintain a reference count.
+// For rpcc, multiple client threads might be invoking the rpcc::call() functions
+// and thus holding multiple references to the underlying connection object. For
+// rpcs, multiple dispatch threads might be holding references to the same
+// connection object. A connection object is deleted only when the underlying
+// connection is dead and the reference count reaches zero.
+//
+// This version of the RPC library explicitly joins exited threads to make sure
+// no outstanding references exist before deleting objects.
+//
+// To delete a rpcc object safely, the users of the library must ensure that
+// there are no outstanding calls on the rpcc object.
+//
+// To delete a rpcs object safely, we do the following in sequence: 1. stop
+// accepting new incoming connections. 2. close existing active connections. 3.
+// delete the dispatch thread pool which involves waiting for current active RPC
+// handlers to finish. It is interesting how a thread pool can be deleted
+// without using thread cancellation. The trick is to inject x "poison pills" for
+// a thread pool of x threads. Upon getting a poison pill instead of a normal
+// task, a worker thread will exit (and thread pool destructor waits to join all
+// x exited worker threads).
+//

 #include "rpc.h"

@@ -58,6 +58,7 @@
 #include
 #include
 #include
+#include

 inline void set_rand_seed() {
     auto now = time_point_cast(steady_clock::now());
@@ -72,7 +73,7 @@ rpcc::rpcc(const string & d, bool retrans) :
 {
     if (retrans) {
         set_rand_seed();
-        clt_nonce_ = (unsigned int)random();
+        clt_nonce_ = (nonce_t)random();
     } else {
         // special client nonce 0 means this client does not
         // require at-most-once logic from the server
@@ -94,15 +95,14 @@ rpcc::rpcc(const string & d, bool retrans) :
 // are blocked inside rpcc or will use rpcc in the future
 rpcc::~rpcc() {
     cancel();
-    IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
-    if (chan_)
-        chan_->closeconn();
+    IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " chan " << (chan_?(int)chan_->fd:-1));
+    chan_.reset();
     VERIFY(calls_.size() == 0);
 }

 int rpcc::bind(milliseconds to) {
-    unsigned int r;
-    int ret = call_timeout(rpc_protocol::bind, to, r, 0);
+    nonce_t r;
+    int ret = call_timeout(rpc_protocol::bind, to, r);
     if (ret == 0) {
         lock ml(m_);
         bind_done_ = true;
@@ -140,7 +140,7 @@ void rpcc::cancel(void) {

 int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) {

     caller ca(0, &rep);
-    int xid_rep;
+    xid_t xid_rep;
     {
         lock ml(m_);
@@ -325,11 +325,11 @@ compress:
     }
 }

-rpcs::rpcs(in_port_t p1, size_t count)
-  : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
+rpcs::rpcs(in_port_t p1)
+  : port_(p1), reachable_ (true)
 {
     set_rand_seed();
-    nonce_ = (unsigned int)random();
+    nonce_ = (nonce_t)random();
     IF_LEVEL(2) LOG("created with nonce " << nonce_);

     reg(rpc_protocol::bind, &rpcs::rpcbind, this);
@@ -338,7 +338,7 @@ rpcs::rpcs(in_port_t p1, size_t count)

 void rpcs::start() {
     char *loss_env = getenv("RPC_LOSSY");
-    listener_ = unique_ptr<tcpsconn>(new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0));
+    listener_.reset(new connection_listener(this, port_, loss_env ? atoi(loss_env) : 0));
 }

 rpcs::~rpcs() {
@@ -354,7 +354,7 @@ bool rpcs::got_pdu(const shared_ptr<connection> & c, const string & b) {
         return true;
     }

-    return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
+    return dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, c, b));
 }

 void rpcs::reg1(proc_id_t proc, handler *h) {
@@ -364,29 +364,6 @@ void rpcs::reg1(proc_id_t proc, handler *h) {
     VERIFY(procs_.count(proc) >= 1);
 }

-void rpcs::updatestat(proc_id_t proc) {
-    lock cl(count_m_);
-    counts_[proc]++;
-    curr_counts_--;
-    if (curr_counts_ == 0) {
-        LOG("RPC STATS: ");
-        for (auto i = counts_.begin(); i != counts_.end(); i++)
-            LOG(hex << i->first << ":" << dec << i->second);
-
-        lock rwl(reply_window_m_);
-
-        size_t totalrep = 0, maxrep = 0;
-        for (auto clt : reply_window_) {
-            totalrep += clt.second.size();
-            if (clt.second.size() > maxrep)
-                maxrep = clt.second.size();
-        }
-        IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
-                        totalrep << " max per client " << maxrep);
-        curr_counts_ = counting_;
-    }
-}
-
 void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
     unmarshall req(buf, true);
@@ -440,7 +417,7 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
             VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
             reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
             IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
-                            " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
+                            " chan " << c->fd << ", total clients " << (reply_window_.size()-1));
         }
     }
@@ -449,7 +426,7 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
         lock rwl(conns_m_);
         if (conns_.find(h.clt_nonce) == conns_.end())
             conns_[h.clt_nonce] = c;
-        else if (conns_[h.clt_nonce]->create_time() < c->create_time())
+        else if (conns_[h.clt_nonce]->create_time < c->create_time)
             conns_[h.clt_nonce] = c;
     }
@@ -461,9 +438,6 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {

     switch (stat) {
         case NEW: // new request
-            if (counting_)
-                updatestat(proc);
-
             rh.ret = (*f)(req, rep);
             if (rh.ret == rpc_protocol::unmarshal_args_failure) {
                 LOG("failed to unmarshall the arguments. You are " <<
@@ -522,8 +496,8 @@ void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
 // DONE: seen this xid, previous reply returned in b.
 // FORGOTTEN: might have seen this xid, but deleted previous reply.
 rpcs::rpcstate_t
-rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
-        int xid_rep, string & b)
+rpcs::checkduplicate_and_update(nonce_t clt_nonce, xid_t xid,
+        xid_t xid_rep, string & b)
 {
     lock rwl(reply_window_m_);
@@ -532,7 +506,7 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
     VERIFY(l.size() > 0);
     VERIFY(xid >= xid_rep);

-    int past_xid_rep = l.begin()->xid;
+    xid_t past_xid_rep = l.begin()->xid;

     list<reply_t>::iterator start = l.begin(), it = ++start;
@@ -571,7 +545,7 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
 // add_reply() should remember b.
 // free_reply_window() and checkduplicate_and_update are responsible for
 // cleaning up the remembered values.
-void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
+void rpcs::add_reply(nonce_t clt_nonce, xid_t xid, const string & b) {
     lock rwl(reply_window_m_);
     // remember the RPC reply value
     list<reply_t> &l = reply_window_[clt_nonce];
@@ -592,7 +566,7 @@ void rpcs::free_reply_window(void) {
     reply_window_.clear();
 }

-int rpcs::rpcbind(unsigned int &r, int) {
+int rpcs::rpcbind(nonce_t &r) {
     IF_LEVEL(2) LOG("called return nonce " << nonce_);
     r = nonce_;
     return 0;
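
The file header comment above describes shutting down the dispatch thread pool by injecting one "poison pill" per worker instead of cancelling threads. The following is a minimal, self-contained sketch of that scheme, assuming a plain std::function job queue; the class and member names (pill_pool, add_job, the empty-function pill) are illustrative and are not the interface of the pool behind dispatchpool_ above.

#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class pill_pool {
  public:
    explicit pill_pool(size_t n) {
        for (size_t i = 0; i < n; i++)
            workers_.emplace_back([this] { work(); });
    }

    // Enqueue a normal job; an empty std::function is reserved as the pill.
    void add_job(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> l(m_);
            jobs_.push(std::move(job));
        }
        cv_.notify_one();
    }

    // Shutdown without thread cancellation: inject one pill per worker,
    // then join every worker so no references to the pool remain.
    ~pill_pool() {
        for (size_t i = 0; i < workers_.size(); i++)
            add_job(std::function<void()>());   // poison pill
        for (auto &t : workers_)
            t.join();
    }

  private:
    void work() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> l(m_);
                cv_.wait(l, [this] { return !jobs_.empty(); });
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            if (!job)
                return;     // pill received: exit so the destructor can join us
            job();          // normal task, e.g. dispatching one RPC request
        }
    }

    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> jobs_;
    std::vector<std::thread> workers_;
};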
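For reference, the at-most-once bookkeeping that checkduplicate_and_update() and add_reply() perform on reply_window_ can be summarized as below. This is a simplified sketch of the sliding-window idea (trim entries the client has acknowledged via xid_rep, then classify the request as NEW, INPROGRESS, DONE, or FORGOTTEN), not the exact libt4 implementation; the type aliases and helper names are assumptions, and locking of reply_window_m_ is omitted.

#include <list>
#include <map>
#include <string>

typedef unsigned int nonce_t;   // assumed aliases; the real ones live in rpc.h
typedef int xid_t;

enum rpcstate_t { NEW, INPROGRESS, DONE, FORGOTTEN };

struct reply_t {
    explicit reply_t(xid_t x) : xid(x), cb_present(false) {}
    xid_t xid;
    bool cb_present;            // has the handler finished and stored a reply?
    std::string buf;            // the remembered reply
};

struct client_window {
    xid_t trimmed_below = 1;    // replies with xid < this have been discarded
    std::list<reply_t> replies; // kept in ascending xid order
};

static std::map<nonce_t, client_window> reply_window;

// xid_rep is the client's promise that it has received every reply with
// xid <= xid_rep, so those entries can be freed and never asked for again.
rpcstate_t check_and_update(nonce_t clt_nonce, xid_t xid, xid_t xid_rep, std::string &b) {
    client_window &w = reply_window[clt_nonce];

    while (!w.replies.empty() && w.replies.front().xid <= xid_rep)
        w.replies.pop_front();
    if (xid_rep + 1 > w.trimmed_below)
        w.trimmed_below = xid_rep + 1;

    if (xid < w.trimmed_below)
        return FORGOTTEN;       // any reply for this xid is already gone

    for (reply_t &r : w.replies) {
        if (r.xid == xid) {
            if (!r.cb_present)
                return INPROGRESS;  // handler still running; drop the retransmission
            b = r.buf;
            return DONE;            // answer from the cache, do not re-execute
        }
    }

    // First time this xid is seen: reserve a slot, then run the handler.
    w.replies.push_back(reply_t(xid));
    w.replies.sort([](const reply_t &x, const reply_t &y) { return x.xid < y.xid; });
    return NEW;
}

// Called after the handler returns so later retransmissions hit the DONE case.
void remember_reply(nonce_t clt_nonce, xid_t xid, const std::string &b) {
    for (reply_t &r : reply_window[clt_nonce].replies) {
        if (r.xid == xid) {
            r.buf = b;
            r.cb_present = true;
        }
    }
}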