2 The rpcc class handles client-side RPC. Each rpcc is bound to a
3 single RPC server. The jobs of rpcc include maintaining a connection to
4 server, sending RPC requests and waiting for responses, retransmissions,
5 at-most-once delivery etc.
7 The rpcs class handles the server side of RPC. Each rpcs handles multiple
8 connections from different rpcc objects. The jobs of rpcs include accepting
9 connections, dispatching requests to registered RPC handlers, at-most-once delivery etc.
12 Both rpcc and rpcs use the connection class as an abstraction for the
13 underlying communication channel. To send an RPC request/reply, one calls
14 connection::send() which blocks until data is sent or the connection has failed
15 (thus the caller can free the buffer when send() returns). When a
16 request/reply is received, connection makes a callback into the corresponding
17 rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
20 rpcc uses application threads to send RPC requests and blocks to receive the
21 reply or error. All connections use a single PollMgr object to perform async
22 socket IO. PollMgr creates a single thread to examine the readiness of socket
23 file descriptors and informs the corresponding connection whenever a socket is
24 ready to be read or written. (We use asynchronous socket IO to reduce the
25 number of threads needed to manage these connections; without async IO, at
26 least one thread is needed per connection to read data without blocking other
27 activities.) Each rpcs object creates one thread for listening on the server
28 port and a pool of threads for executing RPC requests. The
29 thread pool allows us to control the number of threads spawned at the server
30 (spawning one thread per request will hurt when the server faces thousands of
33 In order to delete a connection object, we must maintain a reference count.
35 For rpcc, multiple client threads might be invoking the rpcc::call() functions and thus
36 holding multiple references to the underlying connection object. For rpcs,
37 multiple dispatch threads might be holding references to the same connection
38 object. A connection object is deleted only when the underlying connection is
39 dead and the reference count reaches zero.
41 This version of the RPC library explicitly joins exited threads to make sure
42 no outstanding references exist before deleting objects.
44 To delete a rpcc object safely, the users of the library must ensure that
45 there are no outstanding calls on the rpcc object.
47 To delete a rpcs object safely, we do the following in sequence: 1. stop
48 accepting new incoming connections. 2. close existing active connections.
49 3. delete the dispatch thread pool which involves waiting for current active
50 RPC handlers to finish. It is interesting how a thread pool can be deleted
51 without using thread cancellation. The trick is to inject x "poison pills" for
52 a thread pool of x threads. Upon getting a poison pill instead of a normal
53 task, a worker thread will exit (and thread pool destructor waits to join all
54 x exited worker threads).
60 #include <sys/types.h>
61 #include <arpa/inet.h>
62 #include <netinet/tcp.h>
// Retransmission timeout bounds in milliseconds: retries back off from
// to_min up to to_max (see rpcc::call1's deadline loop).
68 const rpcc::TO rpcc::to_max = { 120000 };
69 const rpcc::TO rpcc::to_min = { 1000 };
// Seed the legacy random() generator from the steady clock XOR'd with the
// pid, so that concurrently started processes draw distinct nonces.
// NOTE(review): the closing brace is not visible in this excerpt.
71 inline void set_rand_seed() {
72 auto now = time_point_cast<nanoseconds>(steady_clock::now());
73 srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
// rpcc constructor: parse the "host:port" destination, draw a random
// client nonce, honor the RPC_LOSSY environment variable for loss
// testing, and seed the xid reply window with 0.
// NOTE(review): several body lines are missing from this excerpt.
76 rpcc::rpcc(const string & d, bool retrans) :
77 dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
78 retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
82 clt_nonce_ = (unsigned int)random();
84 // special client nonce 0 means this client does not
85 // require at-most-once logic from the server
86 // because it uses tcp and never retries a failed connection
90 char *loss_env = getenv("RPC_LOSSY");
92 lossytest_ = atoi(loss_env);
95 // xid starts with 1 and latest received reply starts with 0
96 xid_rep_window_.push_back(0);
98 jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
99 clt_nonce_, lossytest_);
102 // IMPORTANT: destruction should happen only when no external threads
103 // are blocked inside rpcc or will use rpcc in the future
// ~rpcc fragment: logs the client nonce and channel number, and asserts
// that no RPC calls remain outstanding at destruction time.
// NOTE(review): the destructor signature and channel-teardown lines are
// not visible in this excerpt.
106 jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
107 clt_nonce_, chan_?chan_->channo():-1);
112 VERIFY(calls_.size() == 0);
// rpcc::bind fragment: issues the bind RPC (proc rpc_const::bind) to
// obtain the server's nonce; logs the destination address and error code
// if the call fails. NOTE(review): the enclosing function signature and
// success path are missing from this excerpt.
119 int ret = call_timeout(rpc_const::bind, to, r, 0);
125 jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
126 inet_ntoa(dst_.sin_addr), ret);
131 // Cancel all outstanding calls
// rpcc::cancel fragment: forces every pending caller to fail with
// cancel_failure, then blocks on destroy_wait_c_ until the calls_ map
// drains. NOTE(review): the per-caller locking/notification lines are
// missing from this excerpt.
136 LOG("rpcc::cancel: force callers to fail");
137 for(auto &p : calls_){
138 caller *ca = p.second;
140 jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
144 ca->intret = rpc_const::cancel_failure;
// Wait for all in-flight callers to observe the failure and deregister.
149 while (calls_.size () > 0){
150 destroy_wait_ = true;
151 destroy_wait_c_.wait(ml);
153 LOG("rpcc::cancel: done");
// call1: core client-side RPC send/retransmit/wait loop.
// Sends the marshalled request, waits on the caller's condition variable
// with exponential backoff up to the caller-supplied timeout, and returns
// the handler's value, timeout_failure, bind_failure, or cancel_failure.
// NOTE(review): many body lines (locking, retransmit loop braces) are
// missing from this excerpt.
157 rpcc::call1(proc_t proc, marshall &req, unmarshall &rep,
// Reject calls made before bind() succeeds, and a second bind().
166 if((proc != rpc_const::bind && !bind_done_) ||
167 (proc == rpc_const::bind && bind_done_)){
168 jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
169 return rpc_const::bind_failure;
173 return rpc_const::cancel_failure;
// Register this caller under a fresh xid and stamp the request header
// with the oldest un-acked reply xid (the front of the window).
177 calls_[ca.xid] = &ca;
179 req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
180 xid_rep = xid_rep_window_.front();
184 auto finaldeadline = steady_clock::now() + milliseconds(to.to),
185 nextdeadline = finaldeadline;
187 curr_to.to = to_min.to;
189 bool transmit = true;
190 connection *ch = NULL;
// Lossy-test path: piggyback a remembered duplicate request to exercise
// the server's at-most-once handling.
200 if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
205 if (forgot.isvalid())
206 ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
207 ch->send(req.cstr(), req.size());
209 else jsl_log(JSL_DBG_1, "not reachable\n");
211 "rpcc::call1 %u just sent req proc %x xid %d clt_nonce %d\n",
212 clt_nonce_, proc, ca.xid, clt_nonce_);
214 transmit = false; // only send once on a given channel
// Deadline bookkeeping: sentinel min() marks "final deadline consumed".
217 if(finaldeadline == time_point<steady_clock>::min())
220 nextdeadline = steady_clock::now() + milliseconds(curr_to.to);
221 if(nextdeadline > finaldeadline) {
222 nextdeadline = finaldeadline;
223 finaldeadline = time_point<steady_clock>::min();
// Block until reply arrives or the current backoff deadline passes.
229 jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
230 if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
231 jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
236 jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
// If retransmission is enabled and the channel died, a new channel is
// obtained so the next loop iteration resends.
241 if(retrans_ && (!ch || ch->isdead())){
242 // since connection is dead, retransmit
243 // on the new connection
250 // no locking of ca.m since only this thread changes ca.xid
252 calls_.erase(ca.xid);
253 // may need to update the xid again here, in case the
254 // packet times out before it's even sent by the channel.
255 // I don't think there's any harm in maybe doing it twice
256 update_xid_rep(ca.xid);
259 destroy_wait_c_.notify_one();
// Lossy-test bookkeeping: remember this request as a future duplicate.
263 if (ca.done && lossytest_)
266 if (!dup_req_.isvalid()) {
267 dup_req_.buf.assign(req.cstr(), req.size());
268 dup_req_.xid = ca.xid;
270 if (xid_rep > xid_rep_done_)
271 xid_rep_done_ = xid_rep;
277 "rpcc::call1 %u call done for req proc %x xid %d %s:%d done? %d ret %d \n",
278 clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
279 ntohs(dst_.sin_port), ca.done, ca.intret);
284 // destruction of req automatically frees its buffer
285 return (ca.done? ca.intret : rpc_const::timeout_failure);
// get_refconn fragment: lazily (re)connects to dst_ when no channel
// exists or the current one is dead; the caller receives the connection
// through *ch. NOTE(review): locking and reference-count lines are
// missing from this excerpt.
289 rpcc::get_refconn(connection **ch)
292 if(!chan_ || chan_->isdead()){
295 chan_ = connect_to_dst(dst_, this, lossytest_);
306 // PollMgr's thread is being used to
307 // make this upcall from connection object to rpcc.
308 // this function must not block.
310 // this function keeps no reference for connection *c
// got_pdu: upcall invoked on PollMgr's thread when a reply PDU arrives.
// Unmarshalls the reply header, advances the xid reply window, and hands
// the payload to the matching pending caller (if any) via take_in().
// NOTE(review): the caller-notification lines are missing from this
// excerpt.
312 rpcc::got_pdu(connection *, char *b, size_t sz)
314 unmarshall rep(b, sz);
316 rep.unpack_reply_header(&h);
319 jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
325 update_xid_rep(h.xid);
// A reply with no pending caller is a late duplicate; it is only logged.
327 if(calls_.find(h.xid) == calls_.end()){
328 jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
331 caller *ca = calls_[h.xid];
335 ca->un->take_in(rep);
338 jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
347 // assumes thread holds mutex m
// update_xid_rep: insert xid into the sorted reply window, then collapse
// the front of the window while consecutive xids are present, so the
// front always holds the highest xid below which every reply is in.
349 rpcc::update_xid_rep(int xid)
// Already covered by the window front — nothing to record.
351 if(xid <= xid_rep_window_.front()){
// Insert in sorted position (insertion point lines missing here).
355 for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
357 xid_rep_window_.insert(it, xid);
361 xid_rep_window_.push_back(xid);
// Compact: pop the front while the next element is exactly front+1.
364 auto it = xid_rep_window_.begin();
365 for (it++; it != xid_rep_window_.end(); it++){
366 while (xid_rep_window_.front() + 1 == *it)
367 xid_rep_window_.pop_front();
// rpcs constructor: pick a random server nonce, honor RPC_LOSSY,
// register the built-in bind handler, create a 6-thread dispatch pool,
// and start the TCP listener on the given port.
// NOTE(review): some body lines are missing from this excerpt.
371 rpcs::rpcs(unsigned int p1, size_t count)
372 : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
375 nonce_ = (unsigned int)random();
376 jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
378 char *loss_env = getenv("RPC_LOSSY");
379 if(loss_env != NULL){
380 lossytest_ = atoi(loss_env);
383 reg(rpc_const::bind, &rpcs::rpcbind, this);
384 dispatchpool_ = new ThrPool(6,false);
386 listener_ = new tcpsconn(this, port_, lossytest_);
391 // must delete listener before dispatchpool
// ~rpcs fragment: tears down the dispatch pool, which waits for active
// RPC handlers to finish (see the file header's poison-pill description).
// NOTE(review): the destructor signature and listener deletion are not
// visible in this excerpt.
393 delete dispatchpool_;
// got_pdu (server side): wraps the incoming PDU in a djob_t and queues a
// dispatch job on the thread pool; drops work when marked unreachable.
// NOTE(review): the log tag "rpcss" below is a typo for "rpcs" in a
// runtime string — left unchanged in this documentation-only pass.
// NOTE(review): the job-cleanup path on failure is missing from this
// excerpt.
398 rpcs::got_pdu(connection *c, char *b, size_t sz)
401 jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
405 djob_t *j = new djob_t(c, b, sz);
407 bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
408 if(!succ || !reachable_){
// reg1: register handler h for procedure number proc. Asserts the proc
// was not previously registered, and (after the insertion line, missing
// from this excerpt) that it is now present.
416 rpcs::reg1(proc_t proc, handler *h)
419 VERIFY(procs_.count(proc) == 0);
421 VERIFY(procs_.count(proc) >= 1);
// updatestat: periodically (every counting_ dispatches) dumps per-proc
// call counts and reply-window occupancy statistics, then resets the
// countdown.
425 rpcs::updatestat(proc_t proc)
430 if(curr_counts_ == 0){
432 for (auto i = counts_.begin(); i != counts_.end(); i++)
433 LOG(hex << i->first << ":" << dec << i->second);
435 lock rwl(reply_window_m_);
437 size_t totalrep = 0, maxrep = 0;
// NOTE(review): "auto clt" copies each (nonce, list) pair per iteration;
// "const auto &" would avoid the copy (not changed in this doc pass).
438 for (auto clt : reply_window_) {
439 totalrep += clt.second.size();
440 if(clt.second.size() > maxrep)
441 maxrep = clt.second.size();
// -1 excludes the server's own bookkeeping entry from the client count.
443 jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %lu max per client %lu\n",
444 (int) reply_window_.size()-1, totalrep, maxrep);
445 curr_counts_ = counting_;
// dispatch: thread-pool worker for one incoming request. Unmarshalls the
// header, rejects requests aimed at an old server instance, performs
// at-most-once duplicate detection, invokes the registered handler for
// NEW requests, and sends (or re-sends) the reply.
// NOTE(review): many lines (switch statement, braces, frees) are missing
// from this excerpt.
450 rpcs::dispatch(djob_t *j)
452 connection *c = j->conn;
453 unmarshall req(j->buf, j->sz);
457 req.unpack_req_header(&h);
458 proc_t proc = h.proc;
461 jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
467 "rpcs::dispatch: rpc %d (proc %x, last_rep %d) from clt %u for srv instance %u \n",
468 h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
471 reply_header rh(h.xid,0);
473 // is client sending to an old instance of server?
474 if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
476 "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
477 h.srv_nonce, nonce_, h.proc);
478 rh.ret = rpc_const::oldsrv_failure;
479 rep.pack_reply_header(rh);
480 c->send(rep.cstr(),rep.size());
485 // is RPC proc a registered procedure?
488 if(procs_.count(proc) < 1){
489 fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
499 rpcs::rpcstate_t stat;
504 // have i seen this client before?
506 lock rwl(reply_window_m_);
507 // if we don't know about this clt_nonce, create a cleanup object
508 if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
509 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
510 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
512 "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
513 h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
517 // save the latest good connection to the client
520 if(conns_.find(h.clt_nonce) == conns_.end()){
522 conns_[h.clt_nonce] = c;
523 } else if(conns_[h.clt_nonce]->compare(c) < 0){
524 conns_[h.clt_nonce]->decref();
526 conns_[h.clt_nonce] = c;
530 stat = checkduplicate_and_update(h.clt_nonce, h.xid,
531 h.xid_rep, &b1, &sz1);
533 // this client does not require at most once logic
538 case NEW: // new request
// Invoke the registered handler; it fills rep and returns the RPC's
// application-level return code.
543 rh.ret = (*f)(req, rep);
544 if (rh.ret == rpc_const::unmarshal_args_failure) {
545 fprintf(stderr, "rpcs::dispatch: failed to"
546 " unmarshall the arguments. You are"
547 " probably calling RPC 0x%x with wrong"
548 " types of arguments.\n", proc);
553 rep.pack_reply_header(rh);
554 rep.take_buf(&b1,&sz1);
557 "rpcs::dispatch: sending and saving reply of size %lu for rpc %d, proc %x ret %d, clt %u\n",
558 sz1, h.xid, proc, rh.ret, h.clt_nonce);
561 // only record replies for clients that require at-most-once logic
562 add_reply(h.clt_nonce, h.xid, b1, sz1);
565 // get the latest connection to the client
568 if(c->isdead() && c != conns_[h.clt_nonce]){
570 c = conns_[h.clt_nonce];
576 if(h.clt_nonce == 0){
577 // reply is not added to at-most-once window, free it
581 case INPROGRESS: // server is working on this request
583 case DONE: // duplicate and we still have the response
586 case FORGOTTEN: // very old request and we don't have the response anymore
587 jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %d from %u\n",
589 rh.ret = rpc_const::atmostonce_failure;
590 rep.pack_reply_header(rh);
591 c->send(rep.cstr(),rep.size());
597 // rpcs::dispatch calls this when an RPC request arrives.
599 // checks to see if an RPC with xid from clt_nonce has already been received.
600 // if not, remembers the request in reply_window_.
602 // deletes remembered requests with XIDs <= xid_rep; the client
603 // says it has received a reply for every RPC up through xid_rep.
604 // frees the reply_t::buf of each such request.
607 // NEW: never seen this xid before.
608 // INPROGRESS: seen this xid, and still processing it.
609 // DONE: seen this xid, previous reply returned in *b and *sz.
610 // FORGOTTEN: might have seen this xid, but deleted previous reply.
// NOTE(review): several lines (deletion body, early returns) are missing
// from this excerpt; the sentinel entry at the list head stores the
// highest xid already trimmed from the window (-1 means none yet).
612 rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
613 int xid_rep, char **b, size_t *sz)
615 lock rwl(reply_window_m_);
617 list<reply_t> &l = reply_window_[clt_nonce];
619 VERIFY(l.size() > 0);
620 VERIFY(xid >= xid_rep);
622 int past_xid_rep = l.begin()->xid;
623 list<reply_t>::iterator start = l.begin(), it = ++start;
626 if (past_xid_rep < xid_rep || past_xid_rep == -1) {
627 // scan for deletion candidates
628 for (; it != l.end() && it->xid < xid_rep; it++) {
633 l.begin()->xid = xid_rep;
// An xid older than anything we remember has been forgotten.
636 if (xid < past_xid_rep && past_xid_rep != -1)
639 // skip non-deletion candidates
640 while (it != l.end() && it->xid < xid)
643 // if it's in the list it must be right here
644 if (it != l.end() && it->xid == xid) {
645 if (it->cb_present) {
646 // return information about the remembered reply
654 // remember that a new request has arrived
655 l.insert(it, reply_t(xid));
660 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
661 // and passes the return value in b and sz.
662 // add_reply() should remember b and sz.
663 // free_reply_window() and checkduplicate_and_update is responsible for
// NOTE(review): body lines are missing from this excerpt; the fprintf
// message below also lacks a trailing newline (runtime string, left
// unchanged in this documentation-only pass).
666 rpcs::add_reply(unsigned int clt_nonce, int xid,
669 lock rwl(reply_window_m_);
670 // remember the RPC reply value
671 list<reply_t> &l = reply_window_[clt_nonce];
672 list<reply_t>::iterator it = l.begin();
673 // skip to our place in the list
// it++ first: the list head is the sentinel entry, not a real reply.
674 for (it++; it != l.end() && it->xid < xid; it++);
675 // there should already be an entry, so whine if there isn't
676 if (it == l.end() || it->xid != xid) {
677 fprintf(stderr, "Could not find reply struct in add_reply");
678 l.insert(it, reply_t(xid, b, sz));
680 *it = reply_t(xid, b, sz);
// free_reply_window: release every remembered reply buffer and clear the
// per-client windows. NOTE(review): the free() line inside the inner
// loop is missing from this excerpt; "auto clt"/"auto it" copy the pair
// and reply_t per iteration, but the buffer pointers inside still refer
// to the same heap allocations.
684 void rpcs::free_reply_window(void) {
685 lock rwl(reply_window_m_);
686 for (auto clt : reply_window_) {
687 for (auto it : clt.second){
693 reply_window_.clear();
// rpcbind: built-in handler for rpc_const::bind — hands the server nonce
// back to the client. NOTE(review): the assignment to r and the return
// statement are missing from this excerpt.
696 int rpcs::rpcbind(unsigned int &r, int) {
697 jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
// Fragments of the primitive marshall insertion operators: unsigned
// integers are appended as raw host-order bytes (2 bytes for uint16_t,
// 4 for uint32_t). NOTE(review): return statements and the uint8_t body
// are missing from this excerpt.
703 operator<<(marshall &m, uint8_t x) {
709 operator<<(marshall &m, uint16_t x) {
711 m.rawbytes((char *)&x, 2);
716 operator<<(marshall &m, uint32_t x) {
718 m.rawbytes((char *)&x, 4);
722 marshall & operator<<(marshall &m, int32_t x) { return m << (uint32_t) x; }
723 marshall & operator<<(marshall &m, int8_t x) { return m << (uint8_t)x; }
724 marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; }
725 marshall & operator<<(marshall &m, int16_t x) { return m << (uint16_t)x; }
726 marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }
// String insertion fragment: length-prefixed encoding — a 32-bit size
// followed by the raw bytes. NOTE(review): the signature's return type
// and the return statement are missing from this excerpt.
729 operator<<(marshall &m, const string &s) {
730 m << (unsigned int) s.size();
731 m.rawbytes(s.data(), s.size());
// pack_req_header: overwrite the request header fields at the front of
// the buffer (after the size slot), preserving and later restoring the
// current write index. NOTE(review): the index_ restore line is missing
// from this excerpt.
735 void marshall::pack_req_header(const request_header &h) {
736 size_t saved_sz = index_;
737 //leave the first 4-byte empty for channel to fill size of pdu
738 index_ = sizeof(rpc_sz_t);
739 *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep;
// pack_reply_header: same scheme as pack_req_header but for the two-field
// reply header (xid, ret). NOTE(review): the index_ restore line is
// missing from this excerpt.
743 void marshall::pack_reply_header(const reply_header &h) {
744 size_t saved_sz = index_;
745 //leave the first 4-byte empty for channel to fill size of pdu
746 index_ = sizeof(rpc_sz_t);
747 *this << h.xid << h.ret;
751 // take the contents from another unmarshall object
// take_in: steal the buffer from another unmarshall, position the read
// index just past the RPC header, and mark the object ok only if the
// buffer is at least header-sized.
753 unmarshall::take_in(unmarshall &another)
757 another.take_buf(&buf_, &sz_);
758 index_ = RPC_HEADER_SZ;
759 ok_ = sz_ >= RPC_HEADER_SZ?true:false;
// ensure fragment: presumably verifies that n more bytes are available
// before a read — TODO confirm; the body is missing from this excerpt.
763 unmarshall::ensure(size_t n) {
// rawbyte fragment: consume and return one byte, advancing the read
// index. NOTE(review): the bounds-check lines are missing from this
// excerpt.
770 unmarshall::rawbyte()
774 return (uint8_t)buf_[index_++];
// rawbytes(string&, n) fragment: copy the next n bytes into ss.
// NOTE(review): the bounds check and index advance are missing from this
// excerpt.
778 unmarshall::rawbytes(string &ss, size_t n)
781 ss.assign(buf_+index_, n);
// rawbytes<T> fragment: memcpy sizeof(T) host-order bytes into t.
// NOTE(review): the bounds check and index advance are missing from this
// excerpt.
787 unmarshall::rawbytes(T &t)
789 const size_t n = sizeof(T);
791 memcpy(&t, buf_+index_, n);
796 unmarshall & operator>>(unmarshall &u, bool &x) { x = (bool)u.rawbyte(); return u; }
797 unmarshall & operator>>(unmarshall &u, uint8_t &x) { x = u.rawbyte(); return u; }
798 unmarshall & operator>>(unmarshall &u, int8_t &x) { x = (int8_t)u.rawbyte(); return u; }
799 unmarshall & operator>>(unmarshall &u, uint16_t &x) { u.rawbytes<uint16_t>(x); return u; }
800 unmarshall & operator>>(unmarshall &u, int16_t &x) { u.rawbytes<int16_t>(x); return u; }
801 unmarshall & operator>>(unmarshall &u, uint32_t &x) { u.rawbytes<uint32_t>(x); return u; }
802 unmarshall & operator>>(unmarshall &u, int32_t &x) { u.rawbytes<int32_t>(x); return u; }
803 unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes<uint32_t>(xx); x = xx; return u; }
804 unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes<uint64_t>(x); return u; }
805 unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes<int64_t>(x); return u; }
// String extraction fragment: read the 32-bit length prefix written by
// operator<<(marshall&, const string&). NOTE(review): the lines that
// read the string payload are missing from this excerpt.
806 unmarshall & operator>>(unmarshall &u, string &s) {
807 unsigned sz = u.grab<unsigned>();
// Strict weak ordering for sockaddr_in: compare by IPv4 address first,
// then by port, so sockaddr_in can key ordered containers.
// NOTE(review): the closing brace is not visible in this excerpt.
813 bool operator<(const sockaddr_in &a, const sockaddr_in &b){
814 return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
815 ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
816 ((a.sin_port < b.sin_port))));
819 /*---------------auxiliary function--------------*/
// make_sockaddr("host:port"): split on ':' and delegate to the two-arg
// overload; a bare port (no colon) defaults the host to loopback.
// NOTE(review): the closing brace is not visible in this excerpt.
820 sockaddr_in make_sockaddr(const string &hostandport) {
821 auto colon = hostandport.find(':');
822 if (colon == string::npos)
823 return make_sockaddr("127.0.0.1", hostandport);
825 return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
// make_sockaddr(host, port): build an AF_INET sockaddr. Tries the host
// string as a dotted-quad via inet_addr first; otherwise falls back to a
// gethostbyname lookup (NOTE(review): gethostbyname is not thread-safe
// per POSIX — confirm callers serialize resolution, and note stoi will
// throw on a non-numeric port). NOTE(review): the dst declaration, the
// else branch, and the return are missing from this excerpt; hton()
// appears to be a project helper.
828 sockaddr_in make_sockaddr(const string &host, const string &port) {
830 bzero(&dst, sizeof(dst));
831 dst.sin_family = AF_INET;
833 struct in_addr a{inet_addr(host.c_str())};
835 if(a.s_addr != INADDR_NONE)
836 dst.sin_addr.s_addr = a.s_addr;
838 struct hostent *hp = gethostbyname(host.c_str());
840 if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
841 fprintf(stderr, "cannot find host name %s\n", host.c_str());
844 memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
845 dst.sin_addr.s_addr = a.s_addr;
847 dst.sin_port = hton((uint16_t)stoi(port));