2 The rpcc class handles client-side RPC. Each rpcc is bound to a
3 single RPC server. The jobs of rpcc include maintaining a connection to
4 server, sending RPC requests and waiting for responses, retransmissions,
5 at-most-once delivery etc.
7 The rpcs class handles the server side of RPC. Each rpcs handles multiple
8 connections from different rpcc objects. The jobs of rpcs include accepting
9 connections, dispatching requests to registered RPC handlers, at-most-once delivery etc.
12 Both rpcc and rpcs use the connection class as an abstraction for the
13 underlying communication channel. To send an RPC request/reply, one calls
14 connection::send() which blocks until data is sent or the connection has failed
15 (thus the caller can free the buffer when send() returns). When a
16 request/reply is received, connection makes a callback into the corresponding
17 rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
20 rpcc uses application threads to send RPC requests and blocks to receive the
21 reply or error. All connections use a single PollMgr object to perform async
22 socket IO. PollMgr creates a single thread to examine the readiness of socket
23 file descriptors and informs the corresponding connection whenever a socket is
24 ready to be read or written. (We use asynchronous socket IO to reduce the
25 number of threads needed to manage these connections; without async IO, at
26 least one thread is needed per connection to read data without blocking other
27 activities.) Each rpcs object creates one thread for listening on the server
28 port and a pool of threads for executing RPC requests. The
29 thread pool allows us to control the number of threads spawned at the server
30 (spawning one thread per request will hurt when the server faces thousands of requests).
33 In order to delete a connection object, we must maintain a reference count. For rpcc,
35 multiple client threads might be invoking the rpcc::call() functions and thus
36 holding multiple references to the underlying connection object. For rpcs,
37 multiple dispatch threads might be holding references to the same connection
38 object. A connection object is deleted only when the underlying connection is
39 dead and the reference count reaches zero.
41 This version of the RPC library explicitly joins exited threads to make sure
42 no outstanding references exist before deleting objects.
44 To delete a rpcc object safely, the users of the library must ensure that
45 there are no outstanding calls on the rpcc object.
47 To delete a rpcs object safely, we do the following in sequence: 1. stop
48 accepting new incoming connections. 2. close existing active connections.
49 3. delete the dispatch thread pool which involves waiting for current active
50 RPC handlers to finish. It is interesting how a thread pool can be deleted
51 without using thread cancellation. The trick is to inject x "poison pills" for
52 a thread pool of x threads. Upon getting a poison pill instead of a normal
53 task, a worker thread will exit (and thread pool destructor waits to join all
54 x exited worker threads).
60 #include <sys/types.h>
61 #include <arpa/inet.h>
62 #include <netinet/tcp.h>
66 const rpcc::TO rpcc::to_max = { 120000 };
67 const rpcc::TO rpcc::to_min = { 1000 };
// Seed the libc PRNG from a nanosecond-resolution timestamp XOR'd with the
// pid, so concurrently started processes draw distinct random nonces.
69 inline void set_rand_seed() {
70 auto now = time_point_cast<nanoseconds>(steady_clock::now());
71 srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
74 static sockaddr_in make_sockaddr(const string &hostandport);
// Construct a client bound to destination "d" ("host:port" — parsed by
// make_sockaddr).  retrans selects whether failed sends are retried; the
// connection channel (chan_) starts NULL and is created lazily.
76 rpcc::rpcc(const string & d, bool retrans) :
77 dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
78 retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
// Random client nonce identifies this client instance to the server.
82 clt_nonce_ = (unsigned int)random();
84 // special client nonce 0 means this client does not
85 // require at-most-once logic from the server
86 // because it uses tcp and never retries a failed connection
// RPC_LOSSY env var turns on artificial packet loss for testing.
90 char *loss_env = getenv("RPC_LOSSY");
92 lossytest_ = atoi(loss_env);
94 // xid starts with 1 and latest received reply starts with 0
95 xid_rep_window_.push_back(0);
97 IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
// Destructor (fragment). Verifies there are no outstanding calls — the
// caller is responsible for ensuring no thread is still inside this rpcc.
100 // IMPORTANT: destruction should happen only when no external threads
101 // are blocked inside rpcc or will use rpcc in the future
103 IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
108 VERIFY(calls_.size() == 0);
// Perform the initial bind RPC to learn the server's nonce; logs on failure.
// Returns the RPC status from call_timeout.
111 int rpcc::bind(TO to) {
113 int ret = call_timeout(rpc_const::bind, to, r, 0);
119 IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
124 // Cancel all outstanding calls
// Forces every pending caller to complete with cancel_failure, then blocks
// (on destroy_wait_c_) until calls_ drains, so the rpcc can be torn down.
125 void rpcc::cancel(void) {
127 LOG("force callers to fail");
128 for(auto &p : calls_){
129 caller *ca = p.second;
131 IF_LEVEL(2) LOG("force caller to fail");
135 ca->intret = rpc_const::cancel_failure;
// Wait until every forced caller has removed itself from calls_.
140 while (calls_.size () > 0){
141 destroy_wait_ = true;
142 destroy_wait_c_.wait(ml);
// Core client call path: marshal a request header, (re)transmit on the
// current channel, and wait — with exponential-style timeout growth from
// to_min up to the caller's deadline — for the matching reply.
// Returns the handler's intret on success, timeout_failure on deadline.
147 int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
// Only the bind RPC may run before binding; binding twice is an error.
154 if((proc != rpc_const::bind && !bind_done_) ||
155 (proc == rpc_const::bind && bind_done_)){
156 IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
157 return rpc_const::bind_failure;
161 return rpc_const::cancel_failure;
// Register this caller so got_pdu() can find it by xid.
165 calls_[ca.xid] = &ca;
167 req.pack_header(request_header{ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
168 xid_rep = xid_rep_window_.front();
172 auto finaldeadline = steady_clock::now() + milliseconds(to.to),
173 nextdeadline = finaldeadline;
175 curr_to.to = to_min.to;
177 bool transmit = true;
178 connection *ch = NULL;
// Under lossy testing, a previously "forgotten" duplicate request may be
// resent here once the server has acked past its xid — TODO confirm, the
// surrounding lines are elided in this listing.
188 if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
193 if (forgot.isvalid())
194 ch->send(forgot.buf);
197 else IF_LEVEL(1) LOG("not reachable");
198 IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc <<
199 " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
201 transmit = false; // only send once on a given channel
// min() is used as a sentinel meaning "final deadline already reached".
204 if(finaldeadline == time_point<steady_clock>::min())
207 nextdeadline = steady_clock::now() + milliseconds(curr_to.to);
208 if(nextdeadline > finaldeadline) {
209 nextdeadline = finaldeadline;
210 finaldeadline = time_point<steady_clock>::min();
216 IF_LEVEL(2) LOG("wait");
217 if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
218 IF_LEVEL(2) LOG("timeout");
223 IF_LEVEL(2) LOG("reply received");
// If the channel died, allow retransmission on a fresh connection.
228 if(retrans_ && (!ch || ch->isdead())){
229 // since connection is dead, retransmit
230 // on the new connection
237 // no locking of ca.m since only this thread changes ca.xid
239 calls_.erase(ca.xid);
240 // may need to update the xid again here, in case the
241 // packet times out before it's even sent by the channel.
242 // I don't think there's any harm in maybe doing it twice
243 update_xid_rep(ca.xid);
// Wake a cancel()/destructor waiting for calls_ to drain.
246 destroy_wait_c_.notify_one();
250 if (ca.done && lossytest_)
253 if (!dup_req_.isvalid()) {
255 dup_req_.xid = ca.xid;
257 if (xid_rep > xid_rep_done_)
258 xid_rep_done_ = xid_rep;
263 IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
264 " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
265 ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
270 // destruction of req automatically frees its buffer
271 return (ca.done? ca.intret : rpc_const::timeout_failure);
// Hand out a reference to the current connection, (re)connecting to dst_
// first if the channel is missing or dead.
275 rpcc::get_refconn(connection **ch)
278 if(!chan_ || chan_->isdead()){
281 chan_ = connect_to_dst(dst_, this, lossytest_);
292 // PollMgr's thread is being used to
293 // make this upcall from connection object to rpcc.
294 // this function must not block.
296 // this function keeps no reference for connection *c
// Demultiplex an incoming reply: unmarshall the header, advance the
// xid-reply window, and hand the payload to the caller waiting on that xid
// (dropping replies with no pending request).
298 rpcc::got_pdu(connection *, const string & b)
300 unmarshall rep(b, true);
302 rep.unpack_header(h);
305 IF_LEVEL(1) LOG("unmarshall header failed!!!");
311 update_xid_rep(h.xid);
313 if(calls_.find(h.xid) == calls_.end()){
314 IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
317 caller *ca = calls_[h.xid];
324 IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret);
332 // assumes thread holds mutex m
// Record that a reply for xid has been received: insert xid in sorted order
// into xid_rep_window_, then compact the front of the window while entries
// are consecutive, so front() is the highest xid below which all replies
// have arrived.
334 rpcc::update_xid_rep(int xid)
// Already covered by the window front — nothing to record.
336 if(xid <= xid_rep_window_.front()){
340 for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
342 xid_rep_window_.insert(it, xid);
346 xid_rep_window_.push_back(xid);
// Compact: pop the front while the next entry is exactly front+1.
349 auto it = xid_rep_window_.begin();
350 for (it++; it != xid_rep_window_.end(); it++){
351 while (xid_rep_window_.front() + 1 == *it)
352 xid_rep_window_.pop_front();
// Construct a server on port p1. count controls periodic statistics
// reporting (see updatestat). Registers the built-in bind handler, creates
// the dispatch thread pool, and starts the TCP listener (optionally lossy
// via the RPC_LOSSY env var).
356 rpcs::rpcs(in_port_t p1, size_t count)
357 : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
// Server nonce lets clients detect a restarted server instance.
360 nonce_ = (unsigned int)random();
361 IF_LEVEL(2) LOG("created with nonce " << nonce_);
363 reg(rpc_const::bind, &rpcs::rpcbind, this);
364 dispatchpool_ = new ThrPool(6, false);
366 char *loss_env = getenv("RPC_LOSSY");
367 listener_ = new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0);
// Destructor (fragment): tear-down order matters — stop accepting
// connections before draining the dispatch pool.
372 // must delete listener before dispatchpool
374 delete dispatchpool_;
// Upcall from a connection when a full request PDU arrives: package the
// buffer into a djob_t and queue it on the dispatch thread pool. Must not
// block (runs on PollMgr's thread). Drops the job if unreachable or the
// pool refuses it.
379 rpcs::got_pdu(connection *c, const string & b)
382 IF_LEVEL(1) LOG("not reachable");
386 djob_t *j = new djob_t{c, b};
388 bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
389 if(!succ || !reachable_){
// Register handler h for procedure number proc. A procedure may be
// registered only once (VERIFY before/after the insert enforces this).
397 rpcs::reg1(proc_t proc, handler *h)
400 VERIFY(procs_.count(proc) == 0);
402 VERIFY(procs_.count(proc) >= 1);
// Bump the per-procedure call counter; every `counting_` calls, dump the
// counts and summarize the at-most-once reply window (total and max
// remembered replies per client), then reset the countdown.
406 rpcs::updatestat(proc_t proc)
411 if(curr_counts_ == 0){
413 for (auto i = counts_.begin(); i != counts_.end(); i++)
414 LOG(hex << i->first << ":" << dec << i->second);
416 lock rwl(reply_window_m_);
418 size_t totalrep = 0, maxrep = 0;
// NOTE(review): `auto clt` copies each (nonce, reply-list) pair per
// iteration; `const auto &` would avoid the copy.
419 for (auto clt : reply_window_) {
420 totalrep += clt.second.size();
421 if(clt.second.size() > maxrep)
422 maxrep = clt.second.size();
// size()-1 excludes the sentinel entry each client's window starts with.
424 IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
425 totalrep << " max per client " << maxrep);
426 curr_counts_ = counting_;
// Worker-thread entry: unmarshall one request, run at-most-once duplicate
// detection, invoke the registered handler for NEW requests, and send the
// (possibly remembered) reply back on the best-known connection to the
// client.
431 rpcs::dispatch(djob_t *j)
433 connection *c = j->conn;
434 unmarshall req(j->buf, true);
438 req.unpack_header(h);
439 proc_t proc = h.proc;
442 IF_LEVEL(1) LOG("unmarshall header failed!!!");
447 IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
448 dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
451 reply_header rh{h.xid,0};
453 // is client sending to an old instance of server?
454 if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
455 IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
456 " (current " << nonce_ << ") proc " << hex << h.proc);
457 rh.ret = rpc_const::oldsrv_failure;
464 // is RPC proc a registered procedure?
467 if(procs_.count(proc) < 1){
468 cerr << "unknown proc " << hex << proc << "." << endl;
477 rpcs::rpcstate_t stat;
481 // have i seen this client before?
483 lock rwl(reply_window_m_);
484 // if we don't know about this clt_nonce, create a cleanup object
485 if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
486 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
487 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
488 IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
489 " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
493 // save the latest good connection to the client
496 if(conns_.find(h.clt_nonce) == conns_.end()){
498 conns_[h.clt_nonce] = c;
499 } else if(conns_[h.clt_nonce]->compare(c) < 0){
500 conns_[h.clt_nonce]->decref();
502 conns_[h.clt_nonce] = c;
506 stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
508 // this client does not require at most once logic
513 case NEW: // new request
// Invoke the registered handler; rh.ret carries its status back.
518 rh.ret = (*f)(req, rep);
519 if (rh.ret == rpc_const::unmarshal_args_failure) {
520 cerr << "failed to unmarshall the arguments. You are " <<
521 "probably calling RPC 0x" << hex << proc << " with the wrong " <<
522 "types of arguments." << endl;
530 IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
531 h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
533 if (h.clt_nonce > 0) {
534 // only record replies for clients that require at-most-once logic
535 add_reply(h.clt_nonce, h.xid, b1);
538 // get the latest connection to the client
541 if(c->isdead() && c != conns_[h.clt_nonce]){
543 c = conns_[h.clt_nonce];
550 case INPROGRESS: // server is working on this request
552 case DONE: // duplicate and we still have the response
555 case FORGOTTEN: // very old request and we don't have the response anymore
556 IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
557 rh.ret = rpc_const::atmostonce_failure;
565 // rpcs::dispatch calls this when an RPC request arrives.
567 // checks to see if an RPC with xid from clt_nonce has already been received.
568 // if not, remembers the request in reply_window_.
570 // deletes remembered requests with XIDs <= xid_rep; the client
571 // says it has received a reply for every RPC up through xid_rep.
572 // frees the reply_t::buf of each such request.
575 // NEW: never seen this xid before.
576 // INPROGRESS: seen this xid, and still processing it.
577 // DONE: seen this xid, previous reply returned in b.
578 // FORGOTTEN: might have seen this xid, but deleted previous reply.
580 rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
581 int xid_rep, string & b)
583 lock rwl(reply_window_m_);
585 list<reply_t> &l = reply_window_[clt_nonce];
// The list always holds at least the sentinel created on first contact.
587 VERIFY(l.size() > 0);
588 VERIFY(xid >= xid_rep);
// The sentinel's xid records the highest xid_rep seen so far (-1 = none).
590 int past_xid_rep = l.begin()->xid;
592 list<reply_t>::iterator start = l.begin(), it = ++start;
594 if (past_xid_rep < xid_rep || past_xid_rep == -1) {
595 // scan for deletion candidates
596 while (it != l.end() && it->xid < xid_rep)
599 l.begin()->xid = xid_rep;
// xid older than anything still remembered → FORGOTTEN.
602 if (xid < past_xid_rep && past_xid_rep != -1)
605 // skip non-deletion candidates
606 while (it != l.end() && it->xid < xid)
609 // if it's in the list it must be right here
610 if (it != l.end() && it->xid == xid) {
611 if (it->cb_present) {
612 // return information about the remembered reply
618 // remember that a new request has arrived
619 l.insert(it, reply_t(xid));
624 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
625 // and passes the return value in b.
626 // add_reply() should remember b.
627 // free_reply_window() and checkduplicate_and_update are responsible for
628 // cleaning up the remembered values.
629 void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
630 lock rwl(reply_window_m_);
631 // remember the RPC reply value
632 list<reply_t> &l = reply_window_[clt_nonce];
633 list<reply_t>::iterator it = l.begin();
634 // skip to our place in the list
// it++ before the loop skips the sentinel entry at the front of the list.
635 for (it++; it != l.end() && it->xid < xid; it++);
636 // there should already be an entry, so whine if there isn't
637 if (it == l.end() || it->xid != xid) {
638 cerr << "Could not find reply struct in add_reply" << endl;
// Recover by inserting the missing entry anyway.
639 l.insert(it, reply_t(xid, b));
641 *it = reply_t(xid, b);
// Discard all remembered replies for all clients (under the window lock).
645 void rpcs::free_reply_window(void) {
646 lock rwl(reply_window_m_);
647 reply_window_.clear();
// Built-in bind handler (fragment): returns this server instance's nonce
// to the client.
650 int rpcs::rpcbind(unsigned int &r, int) {
651 IF_LEVEL(2) LOG("called return nonce " << nonce_);
656 static sockaddr_in make_sockaddr(const string &host, const string &port);
// Parse "host:port" into a sockaddr_in; a bare port (no colon) binds to
// loopback 127.0.0.1.
658 static sockaddr_in make_sockaddr(const string &hostandport) {
659 auto colon = hostandport.find(':');
660 if (colon == string::npos)
661 return make_sockaddr("127.0.0.1", hostandport);
663 return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
// Build a sockaddr_in from a host (dotted quad, else resolved via
// gethostbyname) and a decimal port string.
666 static sockaddr_in make_sockaddr(const string &host, const string &port) {
668 bzero(&dst, sizeof(dst));
669 dst.sin_family = AF_INET;
// First try the host as a numeric IPv4 address.
671 struct in_addr a{inet_addr(host.c_str())};
673 if(a.s_addr != INADDR_NONE)
674 dst.sin_addr.s_addr = a.s_addr;
// Fall back to DNS; only IPv4 (4-byte AF_INET) results are accepted.
// NOTE(review): gethostbyname is not thread-safe; getaddrinfo would be
// the modern replacement.
676 struct hostent *hp = gethostbyname(host.c_str());
678 if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
679 cerr << "cannot find host name " << host << endl;
682 memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
683 dst.sin_addr.s_addr = a.s_addr;
685 dst.sin_port = hton((in_port_t)stoi(port));