2 The rpcc class handles client-side RPC. Each rpcc is bound to a
3 single RPC server. The jobs of rpcc include maintaining a connection to
4 server, sending RPC requests and waiting for responses, retransmissions,
5 at-most-once delivery etc.
7 The rpcs class handles the server side of RPC. Each rpcs handles multiple
8 connections from different rpcc objects. The jobs of rpcs include accepting
9 connections, dispatching requests to registered RPC handlers, at-most-once delivery, etc.
12 Both rpcc and rpcs use the connection class as an abstraction for the
13 underlying communication channel. To send an RPC request/reply, one calls
14 connection::send() which blocks until data is sent or the connection has failed
15 (thus the caller can free the buffer when send() returns). When a
16 request/reply is received, connection makes a callback into the corresponding
17 rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
20 rpcc uses application threads to send RPC requests and blocks to receive the
21 reply or error. All connections use a single PollMgr object to perform async
22 socket IO. PollMgr creates a single thread to examine the readiness of socket
23 file descriptors and informs the corresponding connection whenever a socket is
24 ready to be read or written. (We use asynchronous socket IO to reduce the
25 number of threads needed to manage these connections; without async IO, at
26 least one thread is needed per connection to read data without blocking other
27 activities.) Each rpcs object creates one thread for listening on the server
28 port and a pool of threads for executing RPC requests. The
29 thread pool allows us to control the number of threads spawned at the server
30 (spawning one thread per request will hurt when the server faces thousands of
33 In order to delete a connection object, we must maintain a reference count.
35 For rpcc, multiple client threads might be invoking the rpcc::call() functions and thus
36 holding multiple references to the underlying connection object. For rpcs,
37 multiple dispatch threads might be holding references to the same connection
38 object. A connection object is deleted only when the underlying connection is
39 dead and the reference count reaches zero.
41 This version of the RPC library explicitly joins exited threads to make sure
42 no outstanding references exist before deleting objects.
44 To delete a rpcc object safely, the users of the library must ensure that
45 there are no outstanding calls on the rpcc object.
47 To delete a rpcs object safely, we do the following in sequence: 1. stop
48 accepting new incoming connections. 2. close existing active connections.
49 3. delete the dispatch thread pool which involves waiting for current active
50 RPC handlers to finish. It is interesting how a thread pool can be deleted
51 without using thread cancellation. The trick is to inject x "poison pills" for
52 a thread pool of x threads. Upon getting a poison pill instead of a normal
53 task, a worker thread will exit (and thread pool destructor waits to join all
54 x exited worker threads).
59 #include <sys/types.h>
60 #include <arpa/inet.h>
61 #include <netinet/tcp.h>
67 #include "lang/verify.h"
69 const rpcc::TO rpcc::to_max = { 120000 };
70 const rpcc::TO rpcc::to_min = { 1000 };
72 rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
73 : xid(xxid), un(xun), done(false)
77 rpcc::caller::~caller()
84 auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
85 srandom((int)now.time_since_epoch().count()^((int)getpid()));
88 rpcc::rpcc(sockaddr_in d, bool retrans) :
89 dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
90 retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
94 clt_nonce_ = random();
96 // special client nonce 0 means this client does not
97 // require at-most-once logic from the server
98 // because it uses tcp and never retries a failed connection
102 char *loss_env = getenv("RPC_LOSSY");
103 if(loss_env != NULL){
104 lossytest_ = atoi(loss_env);
107 // xid starts with 1 and latest received reply starts with 0
108 xid_rep_window_.push_back(0);
110 jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
111 clt_nonce_, lossytest_);
114 // IMPORTANT: destruction should happen only when no external threads
115 // are blocked inside rpcc or will use rpcc in the future
118 jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
119 clt_nonce_, chan_?chan_->channo():-1);
124 VERIFY(calls_.size() == 0);
131 int ret = call_timeout(rpc_const::bind, to, r, 0);
137 jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
138 inet_ntoa(dst_.sin_addr), ret);
143 // Cancel all outstanding calls
148 printf("rpcc::cancel: force callers to fail\n");
149 std::map<int,caller*>::iterator iter;
150 for(iter = calls_.begin(); iter != calls_.end(); iter++){
151 caller *ca = iter->second;
153 jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
157 ca->intret = rpc_const::cancel_failure;
162 while (calls_.size () > 0){
163 destroy_wait_ = true;
164 destroy_wait_c_.wait(ml);
166 printf("rpcc::cancel: done\n");
170 rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
179 if((proc != rpc_const::bind && !bind_done_) ||
180 (proc == rpc_const::bind && bind_done_)){
181 jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
182 return rpc_const::bind_failure;
186 return rpc_const::cancel_failure;
190 calls_[ca.xid] = &ca;
192 req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
193 xid_rep_window_.front());
194 req.pack_req_header(h);
195 xid_rep = xid_rep_window_.front();
199 std::chrono::time_point<std::chrono::steady_clock> finaldeadline =
200 std::chrono::steady_clock::now() +
201 std::chrono::milliseconds(to.to),
204 curr_to.to = to_min.to;
206 bool transmit = true;
207 connection *ch = NULL;
217 if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
222 if (forgot.isvalid())
223 ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
224 ch->send(req.cstr(), req.size());
226 else jsl_log(JSL_DBG_1, "not reachable\n");
228 "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n",
229 clt_nonce_, proc, ca.xid, clt_nonce_);
231 transmit = false; // only send once on a given channel
234 if(finaldeadline == std::chrono::time_point<std::chrono::steady_clock>::min())
237 nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to);
238 if(nextdeadline > finaldeadline) {
239 nextdeadline = finaldeadline;
240 finaldeadline = std::chrono::time_point<std::chrono::steady_clock>::min();
246 jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
247 if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){
248 jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
253 jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
258 if(retrans_ && (!ch || ch->isdead())){
259 // since connection is dead, retransmit
260 // on the new connection
267 // no locking of ca.m since only this thread changes ca.xid
269 calls_.erase(ca.xid);
270 // may need to update the xid again here, in case the
271 // packet times out before it's even sent by the channel.
272 // I don't think there's any harm in maybe doing it twice
273 update_xid_rep(ca.xid);
276 destroy_wait_c_.notify_one();
280 if (ca.done && lossytest_)
283 if (!dup_req_.isvalid()) {
284 dup_req_.buf.assign(req.cstr(), req.size());
285 dup_req_.xid = ca.xid;
287 if (xid_rep > xid_rep_done_)
288 xid_rep_done_ = xid_rep;
294 "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n",
295 clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
296 ntohs(dst_.sin_port), ca.done, ca.intret);
301 // destruction of req automatically frees its buffer
302 return (ca.done? ca.intret : rpc_const::timeout_failure);
306 rpcc::get_refconn(connection **ch)
309 if(!chan_ || chan_->isdead()){
312 chan_ = connect_to_dst(dst_, this, lossytest_);
323 // PollMgr's thread is being used to
324 // make this upcall from connection object to rpcc.
325 // this function must not block.
327 // this function keeps no reference for connection *c
329 rpcc::got_pdu(connection *c, char *b, int sz)
331 unmarshall rep(b, sz);
333 rep.unpack_reply_header(&h);
336 jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
342 update_xid_rep(h.xid);
344 if(calls_.find(h.xid) == calls_.end()){
345 jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
348 caller *ca = calls_[h.xid];
352 ca->un->take_in(rep);
355 jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
364 // assumes thread holds mutex m
366 rpcc::update_xid_rep(unsigned int xid)
368 std::list<unsigned int>::iterator it;
370 if(xid <= xid_rep_window_.front()){
374 for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
376 xid_rep_window_.insert(it, xid);
380 xid_rep_window_.push_back(xid);
383 it = xid_rep_window_.begin();
384 for (it++; it != xid_rep_window_.end(); it++){
385 while (xid_rep_window_.front() + 1 == *it)
386 xid_rep_window_.pop_front();
390 rpcs::rpcs(unsigned int p1, int count)
391 : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
395 jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
397 char *loss_env = getenv("RPC_LOSSY");
398 if(loss_env != NULL){
399 lossytest_ = atoi(loss_env);
402 reg(rpc_const::bind, &rpcs::rpcbind, this);
403 dispatchpool_ = new ThrPool(6,false);
405 listener_ = new tcpsconn(this, port_, lossytest_);
410 // must delete listener before dispatchpool
412 delete dispatchpool_;
417 rpcs::got_pdu(connection *c, char *b, int sz)
420 jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
424 djob_t *j = new djob_t(c, b, sz);
426 bool succ = dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, j));
427 if(!succ || !reachable_){
435 rpcs::reg1(unsigned int proc, handler *h)
438 VERIFY(procs_.count(proc) == 0);
440 VERIFY(procs_.count(proc) >= 1);
444 rpcs::updatestat(unsigned int proc)
449 if(curr_counts_ == 0){
450 std::map<int, int>::iterator i;
451 printf("RPC STATS: ");
452 for (i = counts_.begin(); i != counts_.end(); i++){
453 printf("%x:%d ", i->first, i->second);
457 lock rwl(reply_window_m_);
458 std::map<unsigned int,std::list<reply_t> >::iterator clt;
460 unsigned int totalrep = 0, maxrep = 0;
461 for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
462 totalrep += clt->second.size();
463 if(clt->second.size() > maxrep)
464 maxrep = clt->second.size();
466 jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n",
467 (int) reply_window_.size()-1, totalrep, maxrep);
468 curr_counts_ = counting_;
473 rpcs::dispatch(djob_t *j)
475 connection *c = j->conn;
476 unmarshall req(j->buf, j->sz);
480 req.unpack_req_header(&h);
484 jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
490 "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
491 h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
494 reply_header rh(h.xid,0);
496 // is client sending to an old instance of server?
497 if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
499 "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
500 h.srv_nonce, nonce_, h.proc);
501 rh.ret = rpc_const::oldsrv_failure;
502 rep.pack_reply_header(rh);
503 c->send(rep.cstr(),rep.size());
508 // is RPC proc a registered procedure?
511 if(procs_.count(proc) < 1){
512 fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
522 rpcs::rpcstate_t stat;
527 // have i seen this client before?
529 lock rwl(reply_window_m_);
530 // if we don't know about this clt_nonce, create a cleanup object
531 if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
532 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
533 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
535 "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
536 h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
540 // save the latest good connection to the client
543 if(conns_.find(h.clt_nonce) == conns_.end()){
545 conns_[h.clt_nonce] = c;
546 } else if(conns_[h.clt_nonce]->compare(c) < 0){
547 conns_[h.clt_nonce]->decref();
549 conns_[h.clt_nonce] = c;
553 stat = checkduplicate_and_update(h.clt_nonce, h.xid,
554 h.xid_rep, &b1, &sz1);
556 // this client does not require at most once logic
561 case NEW: // new request
566 rh.ret = (*f)(req, rep);
567 if (rh.ret == rpc_const::unmarshal_args_failure) {
568 fprintf(stderr, "rpcs::dispatch: failed to"
569 " unmarshall the arguments. You are"
570 " probably calling RPC 0x%x with wrong"
571 " types of arguments.\n", proc);
576 rep.pack_reply_header(rh);
577 rep.take_buf(&b1,&sz1);
580 "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
581 sz1, h.xid, proc, rh.ret, h.clt_nonce);
584 // only record replies for clients that require at-most-once logic
585 add_reply(h.clt_nonce, h.xid, b1, sz1);
588 // get the latest connection to the client
591 if(c->isdead() && c != conns_[h.clt_nonce]){
593 c = conns_[h.clt_nonce];
599 if(h.clt_nonce == 0){
600 // reply is not added to at-most-once window, free it
604 case INPROGRESS: // server is working on this request
606 case DONE: // duplicate and we still have the response
609 case FORGOTTEN: // very old request and we don't have the response anymore
610 jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
612 rh.ret = rpc_const::atmostonce_failure;
613 rep.pack_reply_header(rh);
614 c->send(rep.cstr(),rep.size());
620 // rpcs::dispatch calls this when an RPC request arrives.
622 // checks to see if an RPC with xid from clt_nonce has already been received.
623 // if not, remembers the request in reply_window_.
625 // deletes remembered requests with XIDs <= xid_rep; the client
626 // says it has received a reply for every RPC up through xid_rep.
627 // frees the reply_t::buf of each such request.
630 // NEW: never seen this xid before.
631 // INPROGRESS: seen this xid, and still processing it.
632 // DONE: seen this xid, previous reply returned in *b and *sz.
633 // FORGOTTEN: might have seen this xid, but deleted previous reply.
635 rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
636 unsigned int xid_rep, char **b, int *sz)
638 lock rwl(reply_window_m_);
640 std::list<reply_t> &l = reply_window_[clt_nonce];
642 VERIFY(l.size() > 0);
643 VERIFY(xid >= xid_rep);
645 unsigned int past_xid_rep = l.begin()->xid;
647 std::list<reply_t>::iterator start = l.begin(), it;
650 if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) {
651 // scan for deletion candidates
652 for (; it != l.end() && it->xid < xid_rep; it++) {
657 l.begin()->xid = xid_rep;
660 if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1)
663 // skip non-deletion candidates
664 while (it != l.end() && it->xid < xid)
667 // if it's in the list it must be right here
668 if (it != l.end() && it->xid == xid) {
669 if (it->cb_present) {
670 // return information about the remembered reply
678 // remember that a new request has arrived
679 l.insert(it, reply_t(xid));
684 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
685 // and passes the return value in b and sz.
686 // add_reply() should remember b and sz.
687 // free_reply_window() and checkduplicate_and_update is responsible for
690 rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
693 lock rwl(reply_window_m_);
694 // remember the RPC reply value
695 std::list<reply_t> &l = reply_window_[clt_nonce];
696 std::list<reply_t>::iterator it = l.begin();
697 // skip to our place in the list
698 for (it++; it != l.end() && it->xid < xid; it++);
699 // there should already be an entry, so whine if there isn't
700 if (it == l.end() || it->xid != xid) {
701 fprintf(stderr, "Could not find reply struct in add_reply");
702 l.insert(it, reply_t(xid, b, sz));
704 *it = reply_t(xid, b, sz);
709 rpcs::free_reply_window(void)
711 std::map<unsigned int,std::list<reply_t> >::iterator clt;
712 std::list<reply_t>::iterator it;
714 lock rwl(reply_window_m_);
715 for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
716 for (it = clt->second.begin(); it != clt->second.end(); it++){
722 reply_window_.clear();
727 rpcs::rpcbind(int &r, int a)
729 jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
735 marshall::rawbyte(unsigned char x)
739 VERIFY (_buf != NULL);
740 _buf = (char *)realloc(_buf, _capa);
747 marshall::rawbytes(const char *p, int n)
749 if((_ind+n) > _capa){
750 _capa = _capa > n? 2*_capa:(_capa+n);
751 VERIFY (_buf != NULL);
752 _buf = (char *)realloc(_buf, _capa);
755 memcpy(_buf+_ind, p, n);
760 operator<<(marshall &m, bool x)
767 operator<<(marshall &m, unsigned char x)
774 operator<<(marshall &m, char x)
776 m << (unsigned char) x;
782 operator<<(marshall &m, unsigned short x)
784 m.rawbyte((x >> 8) & 0xff);
790 operator<<(marshall &m, short x)
792 m << (unsigned short) x;
797 operator<<(marshall &m, unsigned int x)
799 // network order is big-endian
800 m.rawbyte((x >> 24) & 0xff);
801 m.rawbyte((x >> 16) & 0xff);
802 m.rawbyte((x >> 8) & 0xff);
808 operator<<(marshall &m, int x)
810 m << (unsigned int) x;
815 operator<<(marshall &m, const std::string &s)
817 m << (unsigned int) s.size();
818 m.rawbytes(s.data(), s.size());
823 operator<<(marshall &m, unsigned long long x)
825 m << (unsigned int) (x >> 32);
826 m << (unsigned int) x;
831 marshall::pack(int x)
833 rawbyte((x >> 24) & 0xff);
834 rawbyte((x >> 16) & 0xff);
835 rawbyte((x >> 8) & 0xff);
840 unmarshall::unpack(int *x)
842 (*x) = (rawbyte() & 0xff) << 24;
843 (*x) |= (rawbyte() & 0xff) << 16;
844 (*x) |= (rawbyte() & 0xff) << 8;
845 (*x) |= rawbyte() & 0xff;
848 // take the contents from another unmarshall object
850 unmarshall::take_in(unmarshall &another)
854 another.take_buf(&_buf, &_sz);
855 _ind = RPC_HEADER_SZ;
856 _ok = _sz >= RPC_HEADER_SZ?true:false;
862 if(ok() && _ind == _sz){
870 unmarshall::rawbyte()
881 operator>>(unmarshall &u, bool &x)
883 x = (bool) u.rawbyte() ;
888 operator>>(unmarshall &u, unsigned char &x)
890 x = (unsigned char) u.rawbyte() ;
895 operator>>(unmarshall &u, char &x)
897 x = (char) u.rawbyte();
903 operator>>(unmarshall &u, unsigned short &x)
905 x = (u.rawbyte() & 0xff) << 8;
906 x |= u.rawbyte() & 0xff;
911 operator>>(unmarshall &u, short &x)
913 x = (u.rawbyte() & 0xff) << 8;
914 x |= u.rawbyte() & 0xff;
919 operator>>(unmarshall &u, unsigned int &x)
921 x = (u.rawbyte() & 0xff) << 24;
922 x |= (u.rawbyte() & 0xff) << 16;
923 x |= (u.rawbyte() & 0xff) << 8;
924 x |= u.rawbyte() & 0xff;
929 operator>>(unmarshall &u, int &x)
931 x = (u.rawbyte() & 0xff) << 24;
932 x |= (u.rawbyte() & 0xff) << 16;
933 x |= (u.rawbyte() & 0xff) << 8;
934 x |= u.rawbyte() & 0xff;
939 operator>>(unmarshall &u, unsigned long long &x)
944 x = l | ((unsigned long long) h << 32);
949 operator>>(unmarshall &u, std::string &s)
959 unmarshall::rawbytes(std::string &ss, unsigned int n)
961 if((_ind+n) > (unsigned)_sz){
964 std::string tmps = std::string(_buf+_ind, n);
966 VERIFY(ss.size() == n);
971 bool operator<(const sockaddr_in &a, const sockaddr_in &b){
972 return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
973 ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
974 ((a.sin_port < b.sin_port))));
977 /*---------------auxilary function--------------*/
979 make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
982 const char *localhost = "127.0.0.1";
983 const char *port = index(hostandport, ':');
985 memcpy(host, localhost, strlen(localhost)+1);
988 memcpy(host, hostandport, port-hostandport);
989 host[port-hostandport] = '\0';
993 make_sockaddr(host, port, dst);
998 make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
1002 bzero(dst, sizeof(*dst));
1003 dst->sin_family = AF_INET;
1005 a = inet_addr(host);
1006 if(a != INADDR_NONE){
1007 dst->sin_addr.s_addr = a;
1009 struct hostent *hp = gethostbyname(host);
1010 if(hp == 0 || hp->h_length != 4){
1011 fprintf(stderr, "cannot find host name %s\n", host);
1014 dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
1016 dst->sin_port = htons(atoi(port));