2 The rpcc class handles client-side RPC. Each rpcc is bound to a
3 single RPC server. The jobs of rpcc include maintaining a connection to
4 the server, sending RPC requests and waiting for responses, retransmission,
5 and at-most-once delivery.
7 The rpcs class handles the server side of RPC. Each rpcs handles multiple
8 connections from different rpcc objects. The jobs of rpcs include accepting
9 connections, dispatching requests to registered RPC handlers, and at-most-once delivery.
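
A typical use of the library, to make this division of labor concrete. This is
only a sketch: the proc number and handler below are made up, and the exact
reg()/call() signatures, argument order, and default arguments are declared in
rpc.h (the call() argument order shown follows the call_timeout() call in
rpcc::bind() below):

    // server side: register a handler for proc 22 and start listening
    class adder {
      public:
        int add(int &ret, int a, int b) { ret = a + b; return 0; }
    };
    adder a;
    rpcs server(9000);
    server.reg(22, &adder::add, &a);

    // client side: bind to the server once, then issue calls
    rpcc client("127.0.0.1:9000");
    if (client.bind() == 0) {
        int sum;
        client.call(22, sum, 4, 5);    // on success, sum == 9
    }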
12 Both rpcc and rpcs use the connection class as an abstraction for the
13 underlying communication channel. To send an RPC request/reply, one calls
14 connection::send() which blocks until data is sent or the connection has failed
15 (thus the caller can free the buffer when send() returns). When a
16 request/reply is received, connection makes a callback into the corresponding
17 rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
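
Concretely, rpcc and rpcs implement a small callback interface that connection
invokes once it has read one complete PDU. The sketch below is illustrative
only; the real base class is declared in connection.h and its name and details
may differ:

    class pdu_handler {               // hypothetical name for the upcall interface
      public:
        // called with a buffer holding one complete request/reply PDU
        virtual bool got_pdu(connection *c, char *buf, size_t sz) = 0;
        virtual ~pdu_handler() {}
    };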
20 rpcc sends RPC requests on the calling application thread and blocks that thread until
21 the reply or an error arrives. All connections use a single PollMgr object to perform async
22 socket IO. PollMgr creates a single thread to examine the readiness of socket
23 file descriptors and informs the corresponding connection whenever a socket is
24 ready to be read or written. (We use asynchronous socket IO to reduce the
25 number of threads needed to manage these connections; without async IO, at
26 least one thread is needed per connection to read data without blocking other
27 activities.) Each rpcs object creates one thread for listening on the server
28 port and a pool of threads for executing RPC requests. The
29 thread pool allows us to control the number of threads spawned at the server
30 (spawning one thread per request will hurt when the server faces thousands of requests).
33 In order to delete a connection object, we must maintain a reference count.
35 For rpcc, multiple client threads might be invoking the rpcc::call() functions and thus
36 holding multiple references to the underlying connection object. For rpcs,
37 multiple dispatch threads might be holding references to the same connection
38 object. A connection object is deleted only when the underlying connection is
39 dead and the reference count reaches zero.
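
The pattern is an ordinary manual reference count. A sketch of the idea (the
real incref()/decref() live in the connection code; the member names and the
dead_ flag used here are illustrative):

    void connection::incref() {
        lock ml(ref_m_);
        refno_++;
    }

    void connection::decref() {
        bool last;
        {
            lock ml(ref_m_);
            refno_--;
            last = (refno_ == 0 && dead_);   // dead_: illustrative flag set when the
        }                                    // underlying socket has failed
        if (last)
            delete this;                     // nobody can reach this connection anymore
    }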
41 This version of the RPC library explicitly joins exited threads to make sure
42 no outstanding references exist before deleting objects.
44 To delete an rpcc object safely, users of the library must ensure that
45 there are no outstanding calls on the rpcc object.
47 To delete an rpcs object safely, we do the following in sequence: 1. stop
48 accepting new incoming connections. 2. close existing active connections.
49 3. delete the dispatch thread pool, which involves waiting for currently active
50 RPC handlers to finish. It is interesting how a thread pool can be deleted
51 without using thread cancellation. The trick is to inject x "poison pills" for
52 a thread pool of x threads. Upon getting a poison pill instead of a normal
53 task, a worker thread will exit (and the thread pool destructor waits to join all
54 x exited worker threads).
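
A sketch of the poison-pill shutdown (the real code is in the thread pool
implementation; the member names and the use of an empty job as the pill are
illustrative assumptions):

    ThrPool::~ThrPool() {
        for (size_t i = 0; i < nthreads_; i++)
            jobq_.enq(job_t());              // one poison pill per worker thread
        for (auto &t : th_)
            t.join();                        // each worker exits after taking a pill
    }

    void ThrPool::do_worker() {
        while (1) {
            job_t j = jobq_.deq();           // blocks until a job is available
            if (!j.f)                        // empty job == poison pill
                break;
            j.f();                           // run a normal task (e.g. rpcs::dispatch)
        }
    }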
60 #include <sys/types.h>
61 #include <arpa/inet.h>
62 #include <netinet/tcp.h>
66 const rpcc::TO rpcc::to_max = { 120000 };
67 const rpcc::TO rpcc::to_min = { 1000 };
69 inline void set_rand_seed() {
70 auto now = time_point_cast<nanoseconds>(steady_clock::now());
71 srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
74 rpcc::rpcc(const string & d, bool retrans) :
75 dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
76 retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
80 clt_nonce_ = (unsigned int)random();
82 // special client nonce 0 means this client does not
83 // require at-most-once logic from the server
84 // because it uses tcp and never retries a failed connection
88 char *loss_env = getenv("RPC_LOSSY");
90 lossytest_ = atoi(loss_env);
93 // xid starts with 1 and latest received reply starts with 0
94 xid_rep_window_.push_back(0);
96 IF_LEVEL(2) LOG("rpcc::rpcc clt_nonce is " << clt_nonce_ << " lossy " << lossytest_);
99 // IMPORTANT: destruction should happen only when no external threads
100 // are blocked inside rpcc or will use rpcc in the future
102 IF_LEVEL(2) LOG("rpcc::~rpcc delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
107 VERIFY(calls_.size() == 0);
110 int rpcc::bind(TO to) {
112 int ret = call_timeout(rpc_const::bind, to, r, 0);
118 IF_LEVEL(2) LOG("rpcc::bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
123 // Cancel all outstanding calls
124 void rpcc::cancel(void) {
126 LOG("rpcc::cancel: force callers to fail");
127 for(auto &p : calls_){
128 caller *ca = p.second;
130 IF_LEVEL(2) LOG("rpcc::cancel: force caller to fail");
134 ca->intret = rpc_const::cancel_failure;
139 while (calls_.size () > 0){
140 destroy_wait_ = true;
141 destroy_wait_c_.wait(ml);
143 LOG("rpcc::cancel: done");
146 int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) {
153 if((proc != rpc_const::bind && !bind_done_) ||
154 (proc == rpc_const::bind && bind_done_)){
155 IF_LEVEL(1) LOG("rpcc::call1 rpcc has not been bound to dst or binding twice");
156 return rpc_const::bind_failure;
160 return rpc_const::cancel_failure;
164 calls_[ca.xid] = &ca;
166 req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
167 xid_rep = xid_rep_window_.front();
171 auto finaldeadline = steady_clock::now() + milliseconds(to.to),
172 nextdeadline = finaldeadline;
174 curr_to.to = to_min.to;
176 bool transmit = true;
177 connection *ch = NULL;
187 if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
192 if (forgot.isvalid())
193 ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
194 ch->send(req.cstr(), req.size());
196 else IF_LEVEL(1) LOG("not reachable");
197 IF_LEVEL(2) LOG("rpcc::call1 " << clt_nonce_ << " just sent req proc " << hex << proc <<
198 " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
200 transmit = false; // only send once on a given channel
203 if(finaldeadline == time_point<steady_clock>::min())
206 nextdeadline = steady_clock::now() + milliseconds(curr_to.to);
207 if(nextdeadline > finaldeadline) {
208 nextdeadline = finaldeadline;
209 finaldeadline = time_point<steady_clock>::min();
215 IF_LEVEL(2) LOG("rpcc::call1: wait");
216 if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
217 IF_LEVEL(2) LOG("rpcc::call1: timeout");
222 IF_LEVEL(2) LOG("rpcc::call1: reply received");
227 if(retrans_ && (!ch || ch->isdead())){
228 // since connection is dead, retransmit
229 // on the new connection
236 // no locking of ca.m since only this thread changes ca.xid
238 calls_.erase(ca.xid);
239 // may need to update the xid again here, in case the
240 // packet times out before it's even sent by the channel.
241 // I don't think there's any harm in maybe doing it twice
242 update_xid_rep(ca.xid);
245 destroy_wait_c_.notify_one();
249 if (ca.done && lossytest_)
252 if (!dup_req_.isvalid()) {
253 dup_req_.buf.assign(req.cstr(), req.size());
254 dup_req_.xid = ca.xid;
256 if (xid_rep > xid_rep_done_)
257 xid_rep_done_ = xid_rep;
262 IF_LEVEL(2) LOG("rpcc::call1 " << clt_nonce_ << " call done for req proc " << hex << proc <<
263 " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
264 ntohs(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
269 // destruction of req automatically frees its buffer
270 return (ca.done? ca.intret : rpc_const::timeout_failure);
274 rpcc::get_refconn(connection **ch)
277 if(!chan_ || chan_->isdead()){
280 chan_ = connect_to_dst(dst_, this, lossytest_);
291 // PollMgr's thread is used to make this upcall
292 // from the connection object to rpcc, so
293 // this function must not block.
295 // this function keeps no reference to the connection *c
297 rpcc::got_pdu(connection *, char *b, size_t sz)
299 unmarshall rep(b, sz);
301 rep.unpack_reply_header(&h);
304 IF_LEVEL(1) LOG("rpcc::got_pdu: unmarshall header failed!!!");
310 update_xid_rep(h.xid);
312 if(calls_.find(h.xid) == calls_.end()){
313 IF_LEVEL(2) LOG("rpcc::got_pdu xid " << h.xid << " no pending request");
316 caller *ca = calls_[h.xid];
320 ca->un->take_in(rep);
323 IF_LEVEL(2) LOG("rpcc::got_pdu: RPC reply error for xid " << h.xid << " intret " << ca->intret);
331 // assumes thread holds mutex m
333 rpcc::update_xid_rep(int xid)
335 if(xid <= xid_rep_window_.front()){
339 for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
341 xid_rep_window_.insert(it, xid);
345 xid_rep_window_.push_back(xid);
348 auto it = xid_rep_window_.begin();
349 for (it++; it != xid_rep_window_.end(); it++){
350 while (xid_rep_window_.front() + 1 == *it)
351 xid_rep_window_.pop_front();
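
// To illustrate update_xid_rep() above: starting from the initial window {0},
// receiving replies for xids 2, 1, 4, 3 (in that order) evolves
// xid_rep_window_ as follows:
//
//   reply 2 arrives: {0, 2}             front() stays 0 (reply 1 still missing)
//   reply 1 arrives: {0, 1, 2} -> {2}   front compacts; all replies through 2 seen
//   reply 4 arrives: {2, 4}             front() stays 2 (reply 3 still missing)
//   reply 3 arrives: {2, 3, 4} -> {4}   front compacts again
//
// The front of the window is what call1() places in request_header::xid_rep,
// telling the server which replies it may safely forget.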
355 rpcs::rpcs(unsigned int p1, size_t count)
356 : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
359 nonce_ = (unsigned int)random();
360 IF_LEVEL(2) LOG("rpcs::rpcs created with nonce " << nonce_);
362 char *loss_env = getenv("RPC_LOSSY");
363 if(loss_env != NULL){
364 lossytest_ = atoi(loss_env);
367 reg(rpc_const::bind, &rpcs::rpcbind, this);
368 dispatchpool_ = new ThrPool(6,false);
370 listener_ = new tcpsconn(this, port_, lossytest_);
375 // must delete listener before dispatchpool
377 delete dispatchpool_;
382 rpcs::got_pdu(connection *c, char *b, size_t sz)
385 IF_LEVEL(1) LOG("rpcs::got_pdu: not reachable");
389 djob_t *j = new djob_t(c, b, sz);
391 bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
392 if(!succ || !reachable_){
400 rpcs::reg1(proc_t proc, handler *h)
403 VERIFY(procs_.count(proc) == 0);
405 VERIFY(procs_.count(proc) >= 1);
409 rpcs::updatestat(proc_t proc)
414 if(curr_counts_ == 0){
416 for (auto i = counts_.begin(); i != counts_.end(); i++)
417 LOG(hex << i->first << ":" << dec << i->second);
419 lock rwl(reply_window_m_);
421 size_t totalrep = 0, maxrep = 0;
422 for (auto clt : reply_window_) {
423 totalrep += clt.second.size();
424 if(clt.second.size() > maxrep)
425 maxrep = clt.second.size();
427 IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
428 totalrep << " max per client " << maxrep);
429 curr_counts_ = counting_;
434 rpcs::dispatch(djob_t *j)
436 connection *c = j->conn;
437 unmarshall req(j->buf, j->sz);
441 req.unpack_req_header(&h);
442 proc_t proc = h.proc;
445 IF_LEVEL(1) LOG("rpcs::dispatch: unmarshall header failed!!!");
450 IF_LEVEL(2) LOG("rpcs::dispatch: rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
451 dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
454 reply_header rh(h.xid,0);
456 // is client sending to an old instance of server?
457 if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
458 IF_LEVEL(2) LOG("rpcs::dispatch: rpc for an old server instance " << h.srv_nonce <<
459 " (current " << nonce_ << ") proc " << hex << h.proc);
460 rh.ret = rpc_const::oldsrv_failure;
461 rep.pack_reply_header(rh);
462 c->send(rep.cstr(),rep.size());
467 // is RPC proc a registered procedure?
470 if(procs_.count(proc) < 1){
471 cerr << "rpcs::dispatch: unknown proc " << hex << proc << "." << endl;
480 rpcs::rpcstate_t stat;
485 // have I seen this client before?
487 lock rwl(reply_window_m_);
488 // if we don't know about this clt_nonce yet, set up its reply window
489 if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
490 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
491 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
492 IF_LEVEL(2) LOG("rpcs::dispatch: new client " << h.clt_nonce << " xid " << h.xid <<
493 " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
497 // save the latest good connection to the client
500 if(conns_.find(h.clt_nonce) == conns_.end()){
502 conns_[h.clt_nonce] = c;
503 } else if(conns_[h.clt_nonce]->compare(c) < 0){
504 conns_[h.clt_nonce]->decref();
506 conns_[h.clt_nonce] = c;
510 stat = checkduplicate_and_update(h.clt_nonce, h.xid,
511 h.xid_rep, &b1, &sz1);
513 // this client does not require at-most-once logic
518 case NEW: // new request
523 rh.ret = (*f)(req, rep);
524 if (rh.ret == rpc_const::unmarshal_args_failure) {
525 cerr << "rpcs::dispatch: failed to unmarshall the arguments. You are " <<
526 "probably calling RPC 0x" << hex << proc << " with the wrong " <<
527 "types of arguments." << endl;
532 rep.pack_reply_header(rh);
533 rep.take_buf(&b1,&sz1);
535 IF_LEVEL(2) LOG("rpcs::dispatch: sending and saving reply of size " << sz1 << " for rpc " <<
536 h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
539 // only record replies for clients that require at-most-once logic
540 add_reply(h.clt_nonce, h.xid, b1, sz1);
543 // get the latest connection to the client
546 if(c->isdead() && c != conns_[h.clt_nonce]){
548 c = conns_[h.clt_nonce];
554 if(h.clt_nonce == 0){
555 // reply is not added to at-most-once window, free it
559 case INPROGRESS: // server is working on this request
561 case DONE: // duplicate and we still have the response
564 case FORGOTTEN: // very old request and we don't have the response anymore
565 IF_LEVEL(2) LOG("rpcs::dispatch: very old request " << h.xid << " from " << h.clt_nonce);
566 rh.ret = rpc_const::atmostonce_failure;
567 rep.pack_reply_header(rh);
568 c->send(rep.cstr(),rep.size());
574 // rpcs::dispatch calls this when an RPC request arrives.
576 // checks to see if an RPC with xid from clt_nonce has already been received.
577 // if not, remembers the request in reply_window_.
579 // deletes remembered requests with XIDs <= xid_rep; the client
580 // says it has received a reply for every RPC up through xid_rep.
581 // frees the reply_t::buf of each such request.
583 // the return value is one of:
584 // NEW: never seen this xid before.
585 // INPROGRESS: seen this xid, and still processing it.
586 // DONE: seen this xid, previous reply returned in *b and *sz.
587 // FORGOTTEN: might have seen this xid, but deleted previous reply.
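
// A worked example (ignoring the sentinel entry at the head of each client's
// list): suppose client 42's window currently holds xids {2 (done),
// 5 (in progress), 6 (done)} and a request arrives with xid 7, xid_rep 4.
// Entry 2 is trimmed and its saved reply freed; xid 7 is new, so it is inserted
// and NEW is returned, telling dispatch() to run the handler. A retransmission
// of xid 6 would instead return DONE together with the saved reply, xid 5 would
// return INPROGRESS (nothing is resent), and anything below the trimmed
// boundary (e.g. a very late retransmission of xid 2 or 3) would return FORGOTTEN.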
589 rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
590 int xid_rep, char **b, size_t *sz)
592 lock rwl(reply_window_m_);
594 list<reply_t> &l = reply_window_[clt_nonce];
596 VERIFY(l.size() > 0);
597 VERIFY(xid >= xid_rep);
599 int past_xid_rep = l.begin()->xid;
601 list<reply_t>::iterator start = l.begin(), it = ++start;
603 if (past_xid_rep < xid_rep || past_xid_rep == -1) {
604 // scan for deletion candidates
605 for (; it != l.end() && it->xid < xid_rep; it++) {
610 l.begin()->xid = xid_rep;
613 if (xid < past_xid_rep && past_xid_rep != -1)
616 // skip non-deletion candidates
617 while (it != l.end() && it->xid < xid)
620 // if it's in the list it must be right here
621 if (it != l.end() && it->xid == xid) {
622 if (it->cb_present) {
623 // return information about the remembered reply
631 // remember that a new request has arrived
632 l.insert(it, reply_t(xid));
637 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
638 // and passes the return value in b and sz.
639 // add_reply() should remember b and sz.
640 // free_reply_window() and checkduplicate_and_update() are responsible for freeing these buffers.
643 rpcs::add_reply(unsigned int clt_nonce, int xid,
646 lock rwl(reply_window_m_);
647 // remember the RPC reply value
648 list<reply_t> &l = reply_window_[clt_nonce];
649 list<reply_t>::iterator it = l.begin();
650 // skip to our place in the list
651 for (it++; it != l.end() && it->xid < xid; it++);
652 // there should already be an entry, so whine if there isn't
653 if (it == l.end() || it->xid != xid) {
654 cerr << "Could not find reply struct in add_reply" << endl;
655 l.insert(it, reply_t(xid, b, sz));
657 *it = reply_t(xid, b, sz);
661 void rpcs::free_reply_window(void) {
662 lock rwl(reply_window_m_);
663 for (auto clt : reply_window_) {
664 for (auto it : clt.second){
670 reply_window_.clear();
673 int rpcs::rpcbind(unsigned int &r, int) {
674 IF_LEVEL(2) LOG("rpcs::rpcbind called return nonce " << nonce_);
680 operator<<(marshall &m, uint8_t x) {
686 operator<<(marshall &m, uint16_t x) {
688 m.rawbytes((char *)&x, 2);
693 operator<<(marshall &m, uint32_t x) {
695 m.rawbytes((char *)&x, 4);
699 marshall & operator<<(marshall &m, int32_t x) { return m << (uint32_t) x; }
700 marshall & operator<<(marshall &m, int8_t x) { return m << (uint8_t)x; }
701 marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; }
702 marshall & operator<<(marshall &m, int16_t x) { return m << (uint16_t)x; }
703 marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }
706 operator<<(marshall &m, const string &s) {
707 m << (unsigned int) s.size();
708 m.rawbytes(s.data(), s.size());
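
// The pack_*_header() functions below imply the following on-the-wire layout
// for every PDU (a sketch; the exact header types and sizes are defined in rpc.h):
//
//   +---------------+------------------------------+--------------------------------+
//   | rpc_sz_t size | request_header/reply_header  | marshalled arguments / results |
//   +---------------+------------------------------+--------------------------------+
//
// The leading size field is left blank here and filled in by the connection
// layer when the PDU is actually written to the socket.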
712 void marshall::pack_req_header(const request_header &h) {
713 size_t saved_sz = index_;
714 // leave the first 4 bytes empty for the channel to fill in the size of the PDU
715 index_ = sizeof(rpc_sz_t);
716 *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep;
720 void marshall::pack_reply_header(const reply_header &h) {
721 size_t saved_sz = index_;
722 // leave the first 4 bytes empty for the channel to fill in the size of the PDU
723 index_ = sizeof(rpc_sz_t);
724 *this << h.xid << h.ret;
728 // take the contents from another unmarshall object
730 unmarshall::take_in(unmarshall &another)
734 another.take_buf(&buf_, &sz_);
735 index_ = RPC_HEADER_SZ;
736 ok_ = sz_ >= RPC_HEADER_SZ;
740 unmarshall::ensure(size_t n) {
747 unmarshall::rawbyte()
751 return (uint8_t)buf_[index_++];
755 unmarshall::rawbytes(string &ss, size_t n)
758 ss.assign(buf_+index_, n);
764 unmarshall::rawbytes(T &t)
766 const size_t n = sizeof(T);
768 memcpy(&t, buf_+index_, n);
773 unmarshall & operator>>(unmarshall &u, bool &x) { x = (bool)u.rawbyte(); return u; }
774 unmarshall & operator>>(unmarshall &u, uint8_t &x) { x = u.rawbyte(); return u; }
775 unmarshall & operator>>(unmarshall &u, int8_t &x) { x = (int8_t)u.rawbyte(); return u; }
776 unmarshall & operator>>(unmarshall &u, uint16_t &x) { u.rawbytes<uint16_t>(x); return u; }
777 unmarshall & operator>>(unmarshall &u, int16_t &x) { u.rawbytes<int16_t>(x); return u; }
778 unmarshall & operator>>(unmarshall &u, uint32_t &x) { u.rawbytes<uint32_t>(x); return u; }
779 unmarshall & operator>>(unmarshall &u, int32_t &x) { u.rawbytes<int32_t>(x); return u; }
780 unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes<uint32_t>(xx); x = xx; return u; }
781 unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes<uint64_t>(x); return u; }
782 unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes<int64_t>(x); return u; }
783 unmarshall & operator>>(unmarshall &u, string &s) {
784 unsigned sz = u.grab<unsigned>();
790 bool operator<(const sockaddr_in &a, const sockaddr_in &b){
791 return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
792 ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
793 ((a.sin_port < b.sin_port))));
796 /*---------------auxiliary functions--------------*/
797 sockaddr_in make_sockaddr(const string &hostandport) {
798 auto colon = hostandport.find(':');
799 if (colon == string::npos)
800 return make_sockaddr("127.0.0.1", hostandport);
802 return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
805 sockaddr_in make_sockaddr(const string &host, const string &port) {
807 bzero(&dst, sizeof(dst));
808 dst.sin_family = AF_INET;
810 struct in_addr a{inet_addr(host.c_str())};
812 if(a.s_addr != INADDR_NONE)
813 dst.sin_addr.s_addr = a.s_addr;
815 struct hostent *hp = gethostbyname(host.c_str());
817 if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
818 cerr << "cannot find host name " << host << endl;
821 memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
822 dst.sin_addr.s_addr = a.s_addr;
824 dst.sin_port = hton((uint16_t)stoi(port));