2 The rpcc class handles client-side RPC. Each rpcc is bound to a
3 single RPC server. The jobs of rpcc include maintaining a connection to the
4 server, sending RPC requests and waiting for responses, retransmissions,
5 at-most-once delivery, etc.
7 The rpcs class handles the server side of RPC. Each rpcs handles multiple
8 connections from different rpcc objects. The jobs of rpcs include accepting
9 connections, dispatching requests to registered RPC handlers, at-most-once
12 Both rpcc and rpcs use the connection class as an abstraction for the
13 underlying communication channel. To send an RPC request/reply, one calls
14 connection::send(), which blocks until the data is sent or the connection has failed
15 (thus the caller can free the buffer when send() returns). When a
16 request/reply is received, the connection makes a callback into the corresponding
17 rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
20 rpcc uses application threads to send RPC requests and blocks to receive the
21 reply or error. All connections use a single PollMgr object to perform async
22 socket IO. PollMgr creates a single thread to examine the readiness of socket
23 file descriptors and informs the corresponding connection whenever a socket is
24 ready to be read or written. (We use asynchronous socket IO to reduce the
25 number of threads needed to manage these connections; without async IO, at
26 least one thread is needed per connection to read data without blocking other
27 activities.) Each rpcs object creates one thread for listening on the server
28 port and a pool of threads for executing RPC requests. The
29 thread pool allows us to control the number of threads spawned at the server
30 (spawning one thread per request will hurt when the server faces thousands of
33 In order to delete a connection object, we must maintain a reference count.
35 multiple client threads might be invoking the rpcc::call() functions and thus
36 holding multiple references to the underlying connection object. For rpcs,
37 multiple dispatch threads might be holding references to the same connection
38 object. A connection object is deleted only when the underlying connection is
39 dead and the reference count reaches zero.
41 This version of the RPC library explicitly joins exited threads to make sure
42 no outstanding references exist before deleting objects.
44 To delete an rpcc object safely, the users of the library must ensure that
45 there are no outstanding calls on the rpcc object.
47 To delete an rpcs object safely, we do the following in sequence: 1. stop
48 accepting new incoming connections. 2. close existing active connections.
49 3. delete the dispatch thread pool, which involves waiting for currently active
50 RPC handlers to finish. Note that the thread pool can be deleted this way
51 without using thread cancellation: the trick is to inject x "poison pills" into
52 a thread pool of x threads. Upon getting a poison pill instead of a normal
53 task, a worker thread exits (and the thread pool destructor waits to join all
54 x exited worker threads).
59 #include <sys/types.h>
60 #include <arpa/inet.h>
61 #include <netinet/tcp.h>
67 #include "lang/verify.h"
// Retransmission timeout bounds for rpcc::call1(), in milliseconds: each call
// starts with curr_to = to_min and converts it via std::chrono::milliseconds.
// to_max is presumably the ceiling for the retransmission backoff
// (NOTE(review): the code that reads to_max is not visible here — confirm).
69 const rpcc::TO rpcc::to_max = { 120000 };
70 const rpcc::TO rpcc::to_min = { 1000 };
72 rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
73 : xid(xxid), un(xun), done(false)
77 rpcc::caller::~caller()
84 auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
85 srandom((int)now.time_since_epoch().count()^((int)getpid()));
88 rpcc::rpcc(sockaddr_in d, bool retrans) :
89 dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
90 retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
94 clt_nonce_ = random();
96 // special client nonce 0 means this client does not
97 // require at-most-once logic from the server
98 // because it uses tcp and never retries a failed connection
102 char *loss_env = getenv("RPC_LOSSY");
103 if(loss_env != NULL){
104 lossytest_ = atoi(loss_env);
107 // xid starts with 1 and latest received reply starts with 0
108 xid_rep_window_.push_back(0);
110 jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
111 clt_nonce_, lossytest_);
114 // IMPORTANT: destruction should happen only when no external threads
115 // are blocked inside rpcc or will use rpcc in the future
118 jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
119 clt_nonce_, chan_?chan_->channo():-1);
124 VERIFY(calls_.size() == 0);
131 int ret = call(rpc_const::bind, 0, r, to);
137 jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
138 inet_ntoa(dst_.sin_addr), ret);
143 // Cancel all outstanding calls
148 printf("rpcc::cancel: force callers to fail\n");
149 std::map<int,caller*>::iterator iter;
150 for(iter = calls_.begin(); iter != calls_.end(); iter++){
151 caller *ca = iter->second;
153 jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
157 ca->intret = rpc_const::cancel_failure;
162 while (calls_.size () > 0){
163 destroy_wait_ = true;
164 destroy_wait_c_.wait(ml);
166 printf("rpcc::cancel: done\n");
170 rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
179 if((proc != rpc_const::bind && !bind_done_) ||
180 (proc == rpc_const::bind && bind_done_)){
181 jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
182 return rpc_const::bind_failure;
186 return rpc_const::cancel_failure;
190 calls_[ca.xid] = &ca;
192 req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
193 xid_rep_window_.front());
194 req.pack_req_header(h);
195 xid_rep = xid_rep_window_.front();
199 std::chrono::time_point<std::chrono::steady_clock> finaldeadline =
200 std::chrono::steady_clock::now() +
201 std::chrono::milliseconds(to.to),
204 curr_to.to = to_min.to;
206 bool transmit = true;
207 connection *ch = NULL;
217 if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
222 if (forgot.isvalid())
223 ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
224 ch->send(req.cstr(), req.size());
226 else jsl_log(JSL_DBG_1, "not reachable\n");
228 "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n",
229 clt_nonce_, proc, ca.xid, clt_nonce_);
231 transmit = false; // only send once on a given channel
234 if(finaldeadline == std::chrono::time_point<std::chrono::steady_clock>::min())
237 nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to);
238 if(nextdeadline > finaldeadline) {
239 nextdeadline = finaldeadline;
240 finaldeadline = std::chrono::time_point<std::chrono::steady_clock>::min();
246 jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
247 if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){
248 jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
253 jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
258 if(retrans_ && (!ch || ch->isdead())){
259 // since connection is dead, retransmit
260 // on the new connection
267 // no locking of ca.m since only this thread changes ca.xid
269 calls_.erase(ca.xid);
270 // may need to update the xid again here, in case the
271 // packet times out before it's even sent by the channel.
272 // I don't think there's any harm in maybe doing it twice
273 update_xid_rep(ca.xid);
276 destroy_wait_c_.notify_one();
280 if (ca.done && lossytest_)
283 if (!dup_req_.isvalid()) {
284 dup_req_.buf.assign(req.cstr(), req.size());
285 dup_req_.xid = ca.xid;
287 if (xid_rep > xid_rep_done_)
288 xid_rep_done_ = xid_rep;
294 "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n",
295 clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
296 ntohs(dst_.sin_port), ca.done, ca.intret);
301 // destruction of req automatically frees its buffer
302 return (ca.done? ca.intret : rpc_const::timeout_failure);
306 rpcc::get_refconn(connection **ch)
309 if(!chan_ || chan_->isdead()){
312 chan_ = connect_to_dst(dst_, this, lossytest_);
323 // PollMgr's thread is being used to
324 // make this upcall from the connection object to rpcc.
325 // This function must not block.
327 // this function keeps no reference for connection *c
329 rpcc::got_pdu(connection *c, char *b, int sz)
331 unmarshall rep(b, sz);
333 rep.unpack_reply_header(&h);
336 jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
342 update_xid_rep(h.xid);
344 if(calls_.find(h.xid) == calls_.end()){
345 jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
348 caller *ca = calls_[h.xid];
352 ca->un->take_in(rep);
355 jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
364 // assumes thread holds mutex m
366 rpcc::update_xid_rep(unsigned int xid)
368 std::list<unsigned int>::iterator it;
370 if(xid <= xid_rep_window_.front()){
374 for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
376 xid_rep_window_.insert(it, xid);
380 xid_rep_window_.push_back(xid);
383 it = xid_rep_window_.begin();
384 for (it++; it != xid_rep_window_.end(); it++){
385 while (xid_rep_window_.front() + 1 == *it)
386 xid_rep_window_.pop_front();
391 rpcs::rpcs(unsigned int p1, int count)
392 : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
396 jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
398 char *loss_env = getenv("RPC_LOSSY");
399 if(loss_env != NULL){
400 lossytest_ = atoi(loss_env);
403 reg(rpc_const::bind, this, &rpcs::rpcbind);
404 dispatchpool_ = new ThrPool(6,false);
406 listener_ = new tcpsconn(this, port_, lossytest_);
411 // must delete listener before dispatchpool
413 delete dispatchpool_;
418 rpcs::got_pdu(connection *c, char *b, int sz)
421 jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
425 djob_t *j = new djob_t(c, b, sz);
427 bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j);
428 if(!succ || !reachable_){
436 rpcs::reg1(unsigned int proc, handler *h)
439 VERIFY(procs_.count(proc) == 0);
441 VERIFY(procs_.count(proc) >= 1);
445 rpcs::updatestat(unsigned int proc)
450 if(curr_counts_ == 0){
451 std::map<int, int>::iterator i;
452 printf("RPC STATS: ");
453 for (i = counts_.begin(); i != counts_.end(); i++){
454 printf("%x:%d ", i->first, i->second);
458 lock rwl(reply_window_m_);
459 std::map<unsigned int,std::list<reply_t> >::iterator clt;
461 unsigned int totalrep = 0, maxrep = 0;
462 for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
463 totalrep += clt->second.size();
464 if(clt->second.size() > maxrep)
465 maxrep = clt->second.size();
467 jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n",
468 (int) reply_window_.size()-1, totalrep, maxrep);
469 curr_counts_ = counting_;
474 rpcs::dispatch(djob_t *j)
476 connection *c = j->conn;
477 unmarshall req(j->buf, j->sz);
481 req.unpack_req_header(&h);
485 jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
491 "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
492 h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
495 reply_header rh(h.xid,0);
497 // is client sending to an old instance of server?
498 if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
500 "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
501 h.srv_nonce, nonce_, h.proc);
502 rh.ret = rpc_const::oldsrv_failure;
503 rep.pack_reply_header(rh);
504 c->send(rep.cstr(),rep.size());
509 // is RPC proc a registered procedure?
512 if(procs_.count(proc) < 1){
513 fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
523 rpcs::rpcstate_t stat;
528 // have i seen this client before?
530 lock rwl(reply_window_m_);
531 // if we don't know about this clt_nonce, create a cleanup object
532 if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
533 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
534 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
536 "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
537 h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
541 // save the latest good connection to the client
544 if(conns_.find(h.clt_nonce) == conns_.end()){
546 conns_[h.clt_nonce] = c;
547 } else if(conns_[h.clt_nonce]->compare(c) < 0){
548 conns_[h.clt_nonce]->decref();
550 conns_[h.clt_nonce] = c;
554 stat = checkduplicate_and_update(h.clt_nonce, h.xid,
555 h.xid_rep, &b1, &sz1);
557 // this client does not require at most once logic
562 case NEW: // new request
567 rh.ret = f->fn(req, rep);
568 if (rh.ret == rpc_const::unmarshal_args_failure) {
569 fprintf(stderr, "rpcs::dispatch: failed to"
570 " unmarshall the arguments. You are"
571 " probably calling RPC 0x%x with wrong"
572 " types of arguments.\n", proc);
577 rep.pack_reply_header(rh);
578 rep.take_buf(&b1,&sz1);
581 "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
582 sz1, h.xid, proc, rh.ret, h.clt_nonce);
585 // only record replies for clients that require at-most-once logic
586 add_reply(h.clt_nonce, h.xid, b1, sz1);
589 // get the latest connection to the client
592 if(c->isdead() && c != conns_[h.clt_nonce]){
594 c = conns_[h.clt_nonce];
600 if(h.clt_nonce == 0){
601 // reply is not added to at-most-once window, free it
605 case INPROGRESS: // server is working on this request
607 case DONE: // duplicate and we still have the response
610 case FORGOTTEN: // very old request and we don't have the response anymore
611 jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
613 rh.ret = rpc_const::atmostonce_failure;
614 rep.pack_reply_header(rh);
615 c->send(rep.cstr(),rep.size());
621 // rpcs::dispatch calls this when an RPC request arrives.
623 // checks to see if an RPC with xid from clt_nonce has already been received.
624 // if not, remembers the request in reply_window_.
626 // deletes remembered requests with XIDs <= xid_rep; the client
627 // says it has received a reply for every RPC up through xid_rep.
628 // frees the reply_t::buf of each such request.
631 // NEW: never seen this xid before.
632 // INPROGRESS: seen this xid, and still processing it.
633 // DONE: seen this xid, previous reply returned in *b and *sz.
634 // FORGOTTEN: might have seen this xid, but deleted previous reply.
636 rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
637 unsigned int xid_rep, char **b, int *sz)
639 lock rwl(reply_window_m_);
641 std::list<reply_t> &l = reply_window_[clt_nonce];
643 VERIFY(l.size() > 0);
644 VERIFY(xid >= xid_rep);
646 unsigned int past_xid_rep = l.begin()->xid;
648 std::list<reply_t>::iterator start = l.begin(), it;
651 if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) {
652 // scan for deletion candidates
653 for (; it != l.end() && it->xid < xid_rep; it++) {
658 l.begin()->xid = xid_rep;
661 if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1)
664 // skip non-deletion candidates
665 while (it != l.end() && it->xid < xid)
668 // if it's in the list it must be right here
669 if (it != l.end() && it->xid == xid) {
670 if (it->cb_present) {
671 // return information about the remembered reply
679 // remember that a new request has arrived
680 l.insert(it, reply_t(xid));
685 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
686 // and passes the return value in b and sz.
687 // add_reply() should remember b and sz.
688 // free_reply_window() and checkduplicate_and_update are responsible for
691 rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
694 lock rwl(reply_window_m_);
695 // remember the RPC reply value
696 std::list<reply_t> &l = reply_window_[clt_nonce];
697 std::list<reply_t>::iterator it = l.begin();
698 // skip to our place in the list
699 for (it++; it != l.end() && it->xid < xid; it++);
700 // there should already be an entry, so whine if there isn't
701 if (it == l.end() || it->xid != xid) {
702 fprintf(stderr, "Could not find reply struct in add_reply");
703 l.insert(it, reply_t(xid, b, sz));
705 *it = reply_t(xid, b, sz);
710 rpcs::free_reply_window(void)
712 std::map<unsigned int,std::list<reply_t> >::iterator clt;
713 std::list<reply_t>::iterator it;
715 lock rwl(reply_window_m_);
716 for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
717 for (it = clt->second.begin(); it != clt->second.end(); it++){
723 reply_window_.clear();
728 rpcs::rpcbind(int a, int &r)
730 jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
736 marshall::rawbyte(unsigned char x)
740 VERIFY (_buf != NULL);
741 _buf = (char *)realloc(_buf, _capa);
748 marshall::rawbytes(const char *p, int n)
750 if((_ind+n) > _capa){
751 _capa = _capa > n? 2*_capa:(_capa+n);
752 VERIFY (_buf != NULL);
753 _buf = (char *)realloc(_buf, _capa);
756 memcpy(_buf+_ind, p, n);
761 operator<<(marshall &m, bool x)
768 operator<<(marshall &m, unsigned char x)
775 operator<<(marshall &m, char x)
777 m << (unsigned char) x;
783 operator<<(marshall &m, unsigned short x)
785 m.rawbyte((x >> 8) & 0xff);
791 operator<<(marshall &m, short x)
793 m << (unsigned short) x;
798 operator<<(marshall &m, unsigned int x)
800 // network order is big-endian
801 m.rawbyte((x >> 24) & 0xff);
802 m.rawbyte((x >> 16) & 0xff);
803 m.rawbyte((x >> 8) & 0xff);
809 operator<<(marshall &m, int x)
811 m << (unsigned int) x;
816 operator<<(marshall &m, const std::string &s)
818 m << (unsigned int) s.size();
819 m.rawbytes(s.data(), s.size());
824 operator<<(marshall &m, unsigned long long x)
826 m << (unsigned int) (x >> 32);
827 m << (unsigned int) x;
832 marshall::pack(int x)
834 rawbyte((x >> 24) & 0xff);
835 rawbyte((x >> 16) & 0xff);
836 rawbyte((x >> 8) & 0xff);
841 unmarshall::unpack(int *x)
843 (*x) = (rawbyte() & 0xff) << 24;
844 (*x) |= (rawbyte() & 0xff) << 16;
845 (*x) |= (rawbyte() & 0xff) << 8;
846 (*x) |= rawbyte() & 0xff;
849 // take the contents from another unmarshall object
851 unmarshall::take_in(unmarshall &another)
855 another.take_buf(&_buf, &_sz);
856 _ind = RPC_HEADER_SZ;
857 _ok = _sz >= RPC_HEADER_SZ?true:false;
863 if(ok() && _ind == _sz){
871 unmarshall::rawbyte()
882 operator>>(unmarshall &u, bool &x)
884 x = (bool) u.rawbyte() ;
889 operator>>(unmarshall &u, unsigned char &x)
891 x = (unsigned char) u.rawbyte() ;
896 operator>>(unmarshall &u, char &x)
898 x = (char) u.rawbyte();
904 operator>>(unmarshall &u, unsigned short &x)
906 x = (u.rawbyte() & 0xff) << 8;
907 x |= u.rawbyte() & 0xff;
912 operator>>(unmarshall &u, short &x)
914 x = (u.rawbyte() & 0xff) << 8;
915 x |= u.rawbyte() & 0xff;
920 operator>>(unmarshall &u, unsigned int &x)
922 x = (u.rawbyte() & 0xff) << 24;
923 x |= (u.rawbyte() & 0xff) << 16;
924 x |= (u.rawbyte() & 0xff) << 8;
925 x |= u.rawbyte() & 0xff;
930 operator>>(unmarshall &u, int &x)
932 x = (u.rawbyte() & 0xff) << 24;
933 x |= (u.rawbyte() & 0xff) << 16;
934 x |= (u.rawbyte() & 0xff) << 8;
935 x |= u.rawbyte() & 0xff;
940 operator>>(unmarshall &u, unsigned long long &x)
945 x = l | ((unsigned long long) h << 32);
950 operator>>(unmarshall &u, std::string &s)
960 unmarshall::rawbytes(std::string &ss, unsigned int n)
962 if((_ind+n) > (unsigned)_sz){
965 std::string tmps = std::string(_buf+_ind, n);
967 VERIFY(ss.size() == n);
972 bool operator<(const sockaddr_in &a, const sockaddr_in &b){
973 return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
974 ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
975 ((a.sin_port < b.sin_port))));
978 /*---------------auxiliary functions--------------*/
980 make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
983 const char *localhost = "127.0.0.1";
984 const char *port = index(hostandport, ':');
986 memcpy(host, localhost, strlen(localhost)+1);
989 memcpy(host, hostandport, port-hostandport);
990 host[port-hostandport] = '\0';
994 make_sockaddr(host, port, dst);
999 make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
1003 bzero(dst, sizeof(*dst));
1004 dst->sin_family = AF_INET;
1006 a = inet_addr(host);
1007 if(a != INADDR_NONE){
1008 dst->sin_addr.s_addr = a;
1010 struct hostent *hp = gethostbyname(host);
1011 if(hp == 0 || hp->h_length != 4){
1012 fprintf(stderr, "cannot find host name %s\n", host);
1015 dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
1017 dst->sin_port = htons(atoi(port));