2 The rpcc class handles client-side RPC. Each rpcc is bound to a single RPC
3 server. The jobs of rpcc include maintaining a connection to server, sending
RPC requests and waiting for responses, retransmissions, and at-most-once delivery.
7 The rpcs class handles the server side of RPC. Each rpcs handles multiple
8 connections from different rpcc objects. The jobs of rpcs include accepting
connections, dispatching requests to registered RPC handlers, and at-most-once delivery.
12 Both rpcc and rpcs use the connection class as an abstraction for the
13 underlying communication channel. To send an RPC request/reply, one calls
14 connection::send() which blocks until data is sent or the connection has
15 failed (thus the caller can free the buffer when send() returns). When a
16 request/reply is received, connection makes a callback into the corresponding
17 rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
20 rpcc uses application threads to send RPC requests and blocks to receive the
21 reply or error. All connections use a single PollMgr object to perform async
22 socket IO. PollMgr creates a single thread to examine the readiness of socket
23 file descriptors and informs the corresponding connection whenever a socket is
24 ready to be read or written. (We use asynchronous socket IO to reduce the
25 number of threads needed to manage these connections; without async IO, at
26 least one thread is needed per connection to read data without blocking other
27 activities.) Each rpcs object creates one thread for listening on the server
28 port and a pool of threads for executing RPC requests. The thread pool allows
29 us to control the number of threads spawned at the server (spawning one thread
30 per request will hurt when the server faces thousands of requests).
32 In order to delete a connection object, we must maintain a reference count.
33 For rpcc, multiple client threads might be invoking the rpcc::call() functions
34 and thus holding multiple references to the underlying connection object. For
35 rpcs, multiple dispatch threads might be holding references to the same
36 connection object. A connection object is deleted only when the underlying
37 connection is dead and the reference count reaches zero.
39 This version of the RPC library explicitly joins exited threads to make sure
40 no outstanding references exist before deleting objects.
42 To delete a rpcc object safely, the users of the library must ensure that
43 there are no outstanding calls on the rpcc object.
45 To delete a rpcs object safely, we do the following in sequence: 1. stop
46 accepting new incoming connections. 2. close existing active connections. 3.
47 delete the dispatch thread pool which involves waiting for current active RPC
48 handlers to finish. It is interesting how a thread pool can be deleted
49 without using thread cancellation. The trick is to inject x "poison pills" for
50 a thread pool of x threads. Upon getting a poison pill instead of a normal
51 task, a worker thread will exit (and thread pool destructor waits to join all
52 x exited worker threads).
58 #include <sys/types.h>
59 #include <arpa/inet.h>
60 #include <netinet/tcp.h>
// Seed the C-library PRNG (random()) from the steady clock's nanosecond
// count XORed with the process id, so concurrently started processes draw
// different client/server nonces.
// NOTE(review): the closing brace is elided from this excerpt.
inline void set_rand_seed() {
    auto now = time_point_cast<nanoseconds>(steady_clock::now());
    srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
// Parse "host:port" (or a bare port) into a sockaddr_in; defined at file end.
static sockaddr_in make_sockaddr(const string &hostandport);
// rpcc constructor: resolve the destination address, choose a random client
// nonce (identifies this client for the server's at-most-once bookkeeping),
// honour the RPC_LOSSY environment variable for loss testing, and prime the
// reply-xid window.
rpcc::rpcc(const string & d, bool retrans) :
    dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
    retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
    // NOTE(review): the constructor body's opening lines are elided in this
    // excerpt (likely "{" plus a branch on retrans_/set_rand_seed()).
    clt_nonce_ = (unsigned int)random();

    // special client nonce 0 means this client does not
    // require at-most-once logic from the server
    // because it uses tcp and never retries a failed connection

    char *loss_env = getenv("RPC_LOSSY");
    lossytest_ = atoi(loss_env);
    // NOTE(review): an "if (loss_env)" guard appears to be elided above; as
    // written this would pass NULL to atoi when RPC_LOSSY is unset — confirm
    // against the full source.

    // xid starts with 1 and latest received reply starts with 0
    xid_rep_window_.push_back(0);

    IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
// IMPORTANT: destruction should happen only when no external threads
// are blocked inside rpcc or will use rpcc in the future
// NOTE(review): the destructor signature and the channel close/teardown code
// are elided in this excerpt; only the trace line and the assertion that no
// calls remain outstanding are visible.
IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));

VERIFY(calls_.size() == 0);
// bind: issue RPC number rpc_const::bind to fetch the server's nonce, which
// later requests echo back so a client cannot keep talking to a restarted
// server instance.
// NOTE(review): the declaration of the reply variable and the success branch
// (recording srv_nonce_ and setting bind_done_) are elided in this excerpt.
int rpcc::bind(milliseconds to) {
    int ret = call_timeout(rpc_const::bind, to, r, 0);

    IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
// Cancel all outstanding calls: force every caller blocked in call1() to
// fail with cancel_failure, then wait until the calls_ table drains.
void rpcc::cancel(void) {
    LOG("force callers to fail");
    for(auto &p : calls_){
        caller *ca = p.second;

        IF_LEVEL(2) LOG("force caller to fail");

        // mark this pending call failed; the elided lines presumably set
        // ca->done and signal the caller's condition variable — confirm
        // against the full source
        ca->intret = rpc_const::cancel_failure;

    // block until every caller has observed the failure and deregistered
    while (calls_.size () > 0){
        destroy_wait_ = true;
        destroy_wait_c_.wait(ml);   // ml: lock acquired in elided code above
// call1: the send/wait/retransmit core shared by every rpcc::call() variant.
// Registers a caller record under a fresh xid, packs the request header
// (xid, proc, client nonce, server nonce, low edge of the reply window),
// then loops: (re)transmit on the current channel and wait on the caller's
// condition variable until the next per-round deadline, giving up when the
// overall timeout `to` expires.
// NOTE(review): many lines are elided in this excerpt (locks, the loop
// header, caller construction, channel refcounting, timeout growth); the
// comments below describe only what the visible lines establish.
int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {

    // refuse normal RPCs before a successful bind(), and refuse to bind twice
    if((proc != rpc_const::bind && !bind_done_) ||
       (proc == rpc_const::bind && bind_done_)){
        IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
        return rpc_const::bind_failure;

        return rpc_const::cancel_failure;   // (elided guard: rpcc cancelled)

    calls_[ca.xid] = &ca;   // register so got_pdu() can find this caller by xid
                            // (ca itself is constructed in elided code)

    // header carries xid_rep_window_.front() so the server can garbage-collect
    // remembered replies the client has acknowledged
    req.pack_header(request_header{ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
    xid_rep = xid_rep_window_.front();

    milliseconds curr_to = rpc::to_min;   // first-round retransmit timeout
    auto finaldeadline = steady_clock::now() + to, nextdeadline = finaldeadline;

    bool transmit = true;
    connection *ch = NULL;

        // lossy-test support: replay a remembered "forgotten" duplicate of an
        // older request before sending the new one (guarded/assigned in
        // elided code)
        if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {

            if (forgot.isvalid())
                ch->send(forgot.buf);

        else IF_LEVEL(1) LOG("not reachable");
        IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc <<
                        " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);

        transmit = false; // only send once on a given channel

        // finaldeadline == ::min marks "final round already scheduled"
        if(finaldeadline == time_point<steady_clock>::min())

        nextdeadline = steady_clock::now() + curr_to;
        if(nextdeadline > finaldeadline) {
            nextdeadline = finaldeadline;
            finaldeadline = time_point<steady_clock>::min();

            IF_LEVEL(2) LOG("wait");
            if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
                IF_LEVEL(2) LOG("timeout");

        IF_LEVEL(2) LOG("reply received");

        if(retrans_ && (!ch || ch->isdead())){
            // since connection is dead, retransmit
            // on the new connection

    // no locking of ca.m since only this thread changes ca.xid
    calls_.erase(ca.xid);
    // may need to update the xid again here, in case the
    // packet times out before it's even sent by the channel.
    // I don't think there's any harm in maybe doing it twice
    update_xid_rep(ca.xid);

        destroy_wait_c_.notify_one();   // (elided guard: if(destroy_wait_))

    // remember a completed lossy-test request so it can be replayed later
    if (ca.done && lossytest_)

        if (!dup_req_.isvalid()) {
            dup_req_.xid = ca.xid;

        if (xid_rep > xid_rep_done_)
            xid_rep_done_ = xid_rep;

    IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
                    " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
                    ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);

    // destruction of req automatically frees its buffer
    return (ca.done? ca.intret : rpc_const::timeout_failure);
// get_refconn: return (via *ch) a reference-counted pointer to the current
// channel, (re)connecting to dst_ when the cached channel is missing or
// dead.
// NOTE(review): return type, locking and the refcount increment are elided
// in this excerpt.
rpcc::get_refconn(connection **ch)

    if(!chan_ || chan_->isdead()){

        chan_ = connect_to_dst(dst_, this, lossytest_);
// PollMgr's thread is being used to
// make this upcall from connection object to rpcc.
// this function must not block.
//
// this function keeps no reference for connection *c
//
// Matches an incoming reply to its pending caller by xid, records the
// result, and wakes the application thread blocked in call1().
rpcc::got_pdu(connection *, const string & b)

    unmarshall rep(b, true);

    rep.unpack_header(h);   // h (reply_header) declared in an elided line

    IF_LEVEL(1) LOG("unmarshall header failed!!!");   // (elided guard on failure)

    update_xid_rep(h.xid);  // fold the reply xid into the acknowledgement window

    if(calls_.find(h.xid) == calls_.end()){
        IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
        // (elided: early return — stale or duplicate reply)

    caller *ca = calls_[h.xid];

    IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret);
// assumes thread holds mutex m
//
// Maintains xid_rep_window_, a sorted list whose front is the highest xid X
// such that replies for every xid <= X have been seen: insert xid in sorted
// position, then pop the front while the next element is contiguous.
rpcc::update_xid_rep(int xid)

    if(xid <= xid_rep_window_.front()){
        // already covered by the window front (elided: early return)

    // insert xid at its ascending-sorted position
    for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){

            xid_rep_window_.insert(it, xid);   // (elided guard: first *it > xid)

    xid_rep_window_.push_back(xid);   // larger than every xid present

    // compact the front of the window over consecutive xids
    auto it = xid_rep_window_.begin();
    for (it++; it != xid_rep_window_.end(); it++){
        while (xid_rep_window_.front() + 1 == *it)
            xid_rep_window_.pop_front();
// rpcs constructor: choose a random server nonce, register the bind handler
// (RPC number rpc_const::bind), create the dispatch thread pool, and start
// the TCP listener (lossy when RPC_LOSSY is set) on port p1.
rpcs::rpcs(in_port_t p1, size_t count)
  : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)

    nonce_ = (unsigned int)random();
    IF_LEVEL(2) LOG("created with nonce " << nonce_);

    reg(rpc_const::bind, &rpcs::rpcbind, this);
    dispatchpool_ = new ThrPool(6, false);   // six dispatch worker threads

    char *loss_env = getenv("RPC_LOSSY");
    listener_ = new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0);
// must delete listener before dispatchpool
// NOTE(review): the destructor signature and the listener/connection
// teardown are elided; deleting the pool joins the workers via the
// poison-pill scheme described in the file header.
delete dispatchpool_;
// got_pdu: server-side connection upcall; packages the raw request bytes
// into a djob_t and queues it on the dispatch thread pool. Runs on the
// PollMgr thread, so it must not block.
rpcs::got_pdu(connection *c, const string & b)

    IF_LEVEL(1) LOG("not reachable");   // (elided guard: if(!reachable_) ...)

    djob_t *j = new djob_t{c, b};

    bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
    if(!succ || !reachable_){
        // (elided: free the job / drop the request)
// reg1: record handler h for procedure number proc; registering the same
// proc twice is a programming error.
// NOTE(review): the line that actually stores the handler (presumably
// procs_[proc] = h;) is elided between the two VERIFYs in this excerpt.
rpcs::reg1(proc_t proc, handler *h)

    VERIFY(procs_.count(proc) == 0);

    VERIFY(procs_.count(proc) >= 1);
// updatestat: per-RPC statistics hook. Every `counting_` invocations it
// dumps per-procedure counts and reply-window occupancy, then resets the
// countdown.
// NOTE(review): the lines that bump counts_[proc] and decrement
// curr_counts_ are elided in this excerpt.
rpcs::updatestat(proc_t proc)

    if(curr_counts_ == 0){

        // dump the per-procedure invocation counters
        for (auto i = counts_.begin(); i != counts_.end(); i++)
            LOG(hex << i->first << ":" << dec << i->second);

        lock rwl(reply_window_m_);

        // measure reply-window occupancy across clients
        size_t totalrep = 0, maxrep = 0;
        for (auto clt : reply_window_) {
            totalrep += clt.second.size();
            if(clt.second.size() > maxrep)
                maxrep = clt.second.size();

        // NOTE(review): the -1 presumably discounts one bookkeeping entry —
        // confirm against the full source
        IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
                        totalrep << " max per client " << maxrep);
        curr_counts_ = counting_;
// dispatch: thread-pool worker entry for one RPC request. Unmarshalls the
// header, rejects requests addressed to an old server instance or to an
// unknown procedure, applies the at-most-once duplicate check, runs the
// registered handler for NEW requests, and sends (and, for at-most-once
// clients, remembers) the reply on the freshest known connection to the
// client.
// NOTE(review): numerous lines (locks, early returns, the switch header,
// send calls, braces) are elided from this excerpt; the comments describe
// only the visible logic.
rpcs::dispatch(djob_t *j)

    connection *c = j->conn;
    unmarshall req(j->buf, true);

    req.unpack_header(h);   // h (request_header) declared in an elided line
    proc_t proc = h.proc;

    IF_LEVEL(1) LOG("unmarshall header failed!!!");   // (elided guard on failure)

    IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
                    dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);

    reply_header rh{h.xid,0};

    // is client sending to an old instance of server?
    if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
        IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
                        " (current " << nonce_ << ") proc " << hex << h.proc);
        rh.ret = rpc_const::oldsrv_failure;
        // (elided: marshall rh and send the failure reply)

    // is RPC proc a registered procedure?

    if(procs_.count(proc) < 1){
        cerr << "unknown proc " << hex << proc << "." << endl;
        // (elided: drop the request)

    rpcs::rpcstate_t stat;

    // have i seen this client before?

    lock rwl(reply_window_m_);
    // if we don't know about this clt_nonce, create a cleanup object
    if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
        VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
        reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
        IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
                        " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));

    // save the latest good connection to the client

    if(conns_.find(h.clt_nonce) == conns_.end()){
        // (elided: take a reference on c)
        conns_[h.clt_nonce] = c;
    } else if(conns_[h.clt_nonce]->compare(c) < 0){
        conns_[h.clt_nonce]->decref();
        conns_[h.clt_nonce] = c;

    stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);

    // this client does not require at most once logic

    case NEW: // new request

        // run the registered handler (f looked up from procs_ in elided code)
        rh.ret = (*f)(req, rep);
        if (rh.ret == rpc_const::unmarshal_args_failure) {
            cerr << "failed to unmarshall the arguments. You are " <<
                    "probably calling RPC 0x" << hex << proc << " with the wrong " <<
                    "types of arguments." << endl;

        IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
                        h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);

        if (h.clt_nonce > 0) {
            // only record replies for clients that require at-most-once logic
            add_reply(h.clt_nonce, h.xid, b1);

        // get the latest connection to the client

        if(c->isdead() && c != conns_[h.clt_nonce]){
            // (elided: swap connection references)
            c = conns_[h.clt_nonce];

    case INPROGRESS: // server is working on this request

    case DONE: // duplicate and we still have the response

    case FORGOTTEN: // very old request and we don't have the response anymore
        IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
        rh.ret = rpc_const::atmostonce_failure;
// rpcs::dispatch calls this when an RPC request arrives.
//
// checks to see if an RPC with xid from clt_nonce has already been received.
// if not, remembers the request in reply_window_.
//
// deletes remembered requests with XIDs <= xid_rep; the client
// says it has received a reply for every RPC up through xid_rep.
// frees the reply_t::buf of each such request.
//
// returns one of:
//   NEW: never seen this xid before.
//   INPROGRESS: seen this xid, and still processing it.
//   DONE: seen this xid, previous reply returned in b.
//   FORGOTTEN: might have seen this xid, but deleted previous reply.
rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
        int xid_rep, string & b)

    lock rwl(reply_window_m_);

    list<reply_t> &l = reply_window_[clt_nonce];

    VERIFY(l.size() > 0);       // the per-client sentinel entry always exists
    VERIFY(xid >= xid_rep);

    // the sentinel's xid records the highest xid_rep processed so far
    int past_xid_rep = l.begin()->xid;

    list<reply_t>::iterator start = l.begin(), it = ++start;

    if (past_xid_rep < xid_rep || past_xid_rep == -1) {
        // scan for deletion candidates
        while (it != l.end() && it->xid < xid_rep)
            // (elided: erase the acknowledged entry)

        l.begin()->xid = xid_rep;   // advance the sentinel

    if (xid < past_xid_rep && past_xid_rep != -1)
        // (elided: return FORGOTTEN — reply already garbage-collected)

    // skip non-deletion candidates
    while (it != l.end() && it->xid < xid)
        // (elided: ++it)

    // if it's in the list it must be right here
    if (it != l.end() && it->xid == xid) {
        if (it->cb_present) {
            // return information about the remembered reply
            // (elided: b = remembered buffer; return DONE / else INPROGRESS)

    // remember that a new request has arrived
    l.insert(it, reply_t(xid));
    // (elided: return NEW)
// rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
// and passes the return value in b.
// add_reply() should remember b.
// free_reply_window() and checkduplicate_and_update are responsible for
// cleaning up the remembered values.
void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
    lock rwl(reply_window_m_);
    // remember the RPC reply value
    list<reply_t> &l = reply_window_[clt_nonce];
    list<reply_t>::iterator it = l.begin();
    // skip to our place in the list
    for (it++; it != l.end() && it->xid < xid; it++);
    // there should already be an entry, so whine if there isn't
    if (it == l.end() || it->xid != xid) {
        cerr << "Could not find reply struct in add_reply" << endl;
        l.insert(it, reply_t(xid, b));
    // (elided: } else { — normal path)
        *it = reply_t(xid, b);   // overwrite the placeholder with the real reply
// free_reply_window: discard every remembered reply for every client
// (used during server teardown); takes the reply-window lock.
void rpcs::free_reply_window(void) {
    lock rwl(reply_window_m_);
    reply_window_.clear();
// rpcbind: the handler registered for rpc_const::bind; hands the server's
// nonce back to the client.
// NOTE(review): the assignment to r and the return statement are elided in
// this excerpt.
int rpcs::rpcbind(unsigned int &r, int) {
    IF_LEVEL(2) LOG("called return nonce " << nonce_);
// overload taking an explicit host and port pair; defined below
static sockaddr_in make_sockaddr(const string &host, const string &port);
// Split "host:port" at the first ':'; a string with no colon is treated as
// a bare port on localhost.
static sockaddr_in make_sockaddr(const string &hostandport) {
    auto colon = hostandport.find(':');
    if (colon == string::npos)
        return make_sockaddr("127.0.0.1", hostandport);

    return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
// Build a sockaddr_in from a host (dotted quad or DNS name) and a numeric
// port string; falls back to gethostbyname() when inet_addr() cannot parse
// the host as an address.
static sockaddr_in make_sockaddr(const string &host, const string &port) {

    bzero(&dst, sizeof(dst));   // dst (sockaddr_in) declared in an elided line
    dst.sin_family = AF_INET;

    struct in_addr a{inet_addr(host.c_str())};

    if(a.s_addr != INADDR_NONE)
        dst.sin_addr.s_addr = a.s_addr;
    // (elided: else branch — resolve via DNS)
        struct hostent *hp = gethostbyname(host.c_str());

        if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
            cerr << "cannot find host name " << host << endl;
            // (elided: abort/exit on resolution failure)

        memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
        dst.sin_addr.s_addr = a.s_addr;

    dst.sin_port = hton((in_port_t)stoi(port));