The rpcc class handles client-side RPC. Each rpcc is bound to a
single RPC server. The jobs of rpcc include maintaining a connection to
the server, sending RPC requests and waiting for responses, retransmissions,
at-most-once delivery, etc.
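As an illustration only (this usage is not part of rpc.cc itself), a client
might drive an rpcc roughly as follows; bind() and the call() templates are
declared in rpc.h, and the proc number 22, the port 3772, and the int
argument/reply types are hypothetical:

sockaddr_in dst;
make_sockaddr("127.0.0.1:3772", &dst);   // helper defined at the bottom of this file
rpcc *cl = new rpcc(dst);
if (cl->bind() == 0) {                   // fetches the server nonce via rpc_const::bind
    int r;
    int ret = cl->call(22, 7, r);        // assumed shape: call(proc, argument, reply by reference)
    // ret is either the handler's return value or an rpc_const error such as timeout_failure
}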
The rpcs class handles the server side of RPC. Each rpcs handles multiple
connections from different rpcc objects. The jobs of rpcs include accepting
connections, dispatching requests to registered RPC handlers, at-most-once
delivery, etc.
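Again purely as an illustration (not from this file), a server registers
handlers with reg(), mirroring the reg(rpc_const::bind, &rpcs::rpcbind, this)
call and the rpcs::rpcbind handler that appear later in this file; the
demo_srv class, proc number 22, and port 3772 are hypothetical:

class demo_srv {
public:
    // same shape as rpcs::rpcbind below: reply by reference, then the argument
    int doubleit(int &r, int a) { r = 2 * a; return 0; }
};

demo_srv srv;
rpcs server(3772);                          // second ctor argument (stat counting) assumed to default in rpc.h
server.reg(22, &demo_srv::doubleit, &srv);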
Both rpcc and rpcs use the connection class as an abstraction for the
underlying communication channel. To send an RPC request/reply, one calls
connection::send() which blocks until data is sent or the connection has failed
(thus the caller can free the buffer when send() returns). When a
request/reply is received, connection makes a callback into the corresponding
rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
rpcc uses application threads to send RPC requests and blocks to receive the
reply or error. All connections use a single PollMgr object to perform async
socket IO. PollMgr creates a single thread to examine the readiness of socket
file descriptors and informs the corresponding connection whenever a socket is
ready to be read or written. (We use asynchronous socket IO to reduce the
number of threads needed to manage these connections; without async IO, at
least one thread is needed per connection to read data without blocking other
activities.) Each rpcs object creates one thread for listening on the server
port and a pool of threads for executing RPC requests. The thread pool allows
us to control the number of threads spawned at the server (spawning one thread
per request would hurt when the server faces thousands of requests).
In order to delete a connection object, we must maintain a reference count.
For rpcc, multiple client threads might be invoking the rpcc::call() functions
and thus holding multiple references to the underlying connection object. For
rpcs, multiple dispatch threads might be holding references to the same
connection object. A connection object is deleted only when the underlying
connection is dead and the reference count reaches zero.
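The pattern, sketched here for illustration only: incref() is assumed to exist
alongside the decref() call visible in rpcs::dispatch, and the claim that the
final decref() of a dead connection triggers deletion is an assumption about
connection's implementation:

connection *ch = chan_;              // e.g., the channel cached by rpcc
ch->incref();                        // pin the object while this thread uses it
ch->send(req.cstr(), req.size());
ch->decref();                        // once the connection is dead and the count
                                     // reaches zero, the object can be deleted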
This version of the RPC library explicitly joins exited threads to make sure
no outstanding references exist before deleting objects.
To delete an rpcc object safely, the users of the library must ensure that
there are no outstanding calls on the rpcc object.
To delete an rpcs object safely, we do the following in sequence: 1. stop
accepting new incoming connections; 2. close existing active connections;
3. delete the dispatch thread pool, which involves waiting for currently
active RPC handlers to finish. Note that the thread pool can be deleted
without using thread cancellation: the trick is to inject x "poison pills"
for a thread pool of x threads. Upon getting a poison pill instead of a normal
task, a worker thread exits (and the thread pool destructor waits to join all
x exited worker threads), as the sketch below illustrates.
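A minimal, self-contained sketch of that poison-pill shutdown; poolsketch is
illustrative and is not the library's ThrPool class:

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class poolsketch {
public:
    explicit poolsketch(int n) {
        for (int i = 0; i < n; i++)
            th_.emplace_back(&poolsketch::loop, this);
    }
    ~poolsketch() {
        // pending jobs drain first, then each worker swallows exactly one pill
        for (size_t i = 0; i < th_.size(); i++)
            push(std::function<void()>());      // an empty job acts as a poison pill
        for (auto &t : th_)
            t.join();                           // join all exited workers
    }
    void push(std::function<void()> j) {
        { std::lock_guard<std::mutex> l(m_); q_.push(std::move(j)); }
        cv_.notify_one();
    }
private:
    void loop() {
        for (;;) {
            std::unique_lock<std::mutex> l(m_);
            cv_.wait(l, [this] { return !q_.empty(); });
            std::function<void()> j = std::move(q_.front());
            q_.pop();
            l.unlock();
            if (!j) return;                     // poison pill: exit instead of running a task
            j();                                // normal task
        }
    }
    std::vector<std::thread> th_;
    std::queue<std::function<void()>> q_;
    std::mutex m_;
    std::condition_variable cv_;
};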
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include "lang/verify.h"

const rpcc::TO rpcc::to_max = { 120000 };
const rpcc::TO rpcc::to_min = { 1000 };
rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
: xid(xxid), un(xun), done(false)

rpcc::caller::~caller()

auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
srandom((int)now.time_since_epoch().count()^((int)getpid()));
rpcc::rpcc(sockaddr_in d, bool retrans) :
dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_(false), xid_rep_done_(-1)
clt_nonce_ = random();
// special client nonce 0 means this client does not
// require at-most-once logic from the server
// because it uses tcp and never retries a failed connection
char *loss_env = getenv("RPC_LOSSY");
if(loss_env != NULL){
lossytest_ = atoi(loss_env);
// xid starts with 1 and latest received reply starts with 0
xid_rep_window_.push_back(0);
jsl_log(JSL_DBG_2, "rpcc::rpcc clt_nonce is %d lossy %d\n",
clt_nonce_, lossytest_);
// IMPORTANT: destruction should happen only when no external threads
// are blocked inside rpcc or will use rpcc in the future
jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
clt_nonce_, chan_?chan_->channo():-1);
VERIFY(calls_.size() == 0);

int ret = call_timeout(rpc_const::bind, to, r, 0);
jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
inet_ntoa(dst_.sin_addr), ret);
// Cancel all outstanding calls
printf("rpcc::cancel: force callers to fail\n");
std::map<int,caller*>::iterator iter;
for(iter = calls_.begin(); iter != calls_.end(); iter++){
caller *ca = iter->second;
jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
ca->intret = rpc_const::cancel_failure;
while (calls_.size() > 0){
destroy_wait_ = true;
destroy_wait_c_.wait(ml);
printf("rpcc::cancel: done\n");
rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
if((proc != rpc_const::bind && !bind_done_) ||
(proc == rpc_const::bind && bind_done_)){
jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
return rpc_const::bind_failure;
return rpc_const::cancel_failure;
calls_[ca.xid] = &ca;
req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
xid_rep = xid_rep_window_.front();
std::chrono::time_point<std::chrono::steady_clock> finaldeadline =
std::chrono::steady_clock::now() +
std::chrono::milliseconds(to.to),
curr_to.to = to_min.to;
bool transmit = true;
connection *ch = NULL;
if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
if (forgot.isvalid())
ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
ch->send(req.cstr(), req.size());
else jsl_log(JSL_DBG_1, "not reachable\n");
"rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n",
clt_nonce_, proc, ca.xid, clt_nonce_);
transmit = false; // only send once on a given channel
if(finaldeadline == std::chrono::time_point<std::chrono::steady_clock>::min())
nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to);
if(nextdeadline > finaldeadline) {
nextdeadline = finaldeadline;
finaldeadline = std::chrono::time_point<std::chrono::steady_clock>::min();
jsl_log(JSL_DBG_2, "rpcc::call1: wait\n");
if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){
jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
if(retrans_ && (!ch || ch->isdead())){
// since connection is dead, retransmit
// on the new connection
// no locking of ca.m since only this thread changes ca.xid
calls_.erase(ca.xid);
// may need to update the xid again here, in case the
// packet times out before it's even sent by the channel.
// I don't think there's any harm in maybe doing it twice
update_xid_rep(ca.xid);
destroy_wait_c_.notify_one();
if (ca.done && lossytest_)
if (!dup_req_.isvalid()) {
dup_req_.buf.assign(req.cstr(), req.size());
dup_req_.xid = ca.xid;
if (xid_rep > xid_rep_done_)
xid_rep_done_ = xid_rep;
"rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n",
clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
ntohs(dst_.sin_port), ca.done, ca.intret);
// destruction of req automatically frees its buffer
return (ca.done? ca.intret : rpc_const::timeout_failure);
rpcc::get_refconn(connection **ch)
if(!chan_ || chan_->isdead()){
chan_ = connect_to_dst(dst_, this, lossytest_);
// PollMgr's thread is being used to
// make this upcall from connection object to rpcc.
// this function must not block.
// this function keeps no reference for connection *c
rpcc::got_pdu(connection *c, char *b, size_t sz)
unmarshall rep(b, sz);
rep.unpack_reply_header(&h);
jsl_log(JSL_DBG_1, "rpcc::got_pdu unmarshall header failed!!!\n");
update_xid_rep(h.xid);
if(calls_.find(h.xid) == calls_.end()){
jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
caller *ca = calls_[h.xid];
ca->un->take_in(rep);
jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
// assumes thread holds mutex m
rpcc::update_xid_rep(unsigned int xid)
std::list<unsigned int>::iterator it;
if(xid <= xid_rep_window_.front()){
for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
xid_rep_window_.insert(it, xid);
xid_rep_window_.push_back(xid);
it = xid_rep_window_.begin();
for (it++; it != xid_rep_window_.end(); it++){
while (xid_rep_window_.front() + 1 == *it)
xid_rep_window_.pop_front();
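// Illustrative example (not in the original source): with the window starting
// at [0], receiving the reply for xid 2 first gives [0, 2] (xid 1 is still
// outstanding, so the front cannot advance); when the reply for xid 1 arrives
// the window compresses to [2], i.e. xid_rep_window_.front() now reports that
// all replies up to and including xid 2 have been received.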
rpcs::rpcs(unsigned int p1, int count)
: port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_(true)
jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
char *loss_env = getenv("RPC_LOSSY");
if(loss_env != NULL){
lossytest_ = atoi(loss_env);
reg(rpc_const::bind, &rpcs::rpcbind, this);
dispatchpool_ = new ThrPool(6, false);
listener_ = new tcpsconn(this, port_, lossytest_);

// must delete listener before dispatchpool
delete dispatchpool_;
rpcs::got_pdu(connection *c, char *b, size_t sz)
jsl_log(JSL_DBG_1, "rpcs::got_pdu: not reachable\n");
djob_t *j = new djob_t(c, b, sz);
bool succ = dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, j));
if(!succ || !reachable_){
rpcs::reg1(unsigned int proc, handler *h)
VERIFY(procs_.count(proc) == 0);
VERIFY(procs_.count(proc) >= 1);

rpcs::updatestat(unsigned int proc)
if(curr_counts_ == 0){
std::map<int, int>::iterator i;
printf("RPC STATS: ");
for (i = counts_.begin(); i != counts_.end(); i++){
printf("%x:%d ", i->first, i->second);
lock rwl(reply_window_m_);
std::map<unsigned int,std::list<reply_t> >::iterator clt;
unsigned int totalrep = 0, maxrep = 0;
for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
totalrep += clt->second.size();
if(clt->second.size() > maxrep)
maxrep = clt->second.size();
jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n",
(int) reply_window_.size()-1, totalrep, maxrep);
curr_counts_ = counting_;
rpcs::dispatch(djob_t *j)
connection *c = j->conn;
unmarshall req(j->buf, j->sz);
req.unpack_req_header(&h);
jsl_log(JSL_DBG_1, "rpcs::dispatch unmarshall header failed!!!\n");
"rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
reply_header rh(h.xid, 0);

// is client sending to an old instance of server?
if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
"rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
h.srv_nonce, nonce_, h.proc);
rh.ret = rpc_const::oldsrv_failure;
rep.pack_reply_header(rh);
c->send(rep.cstr(), rep.size());

// is RPC proc a registered procedure?
if(procs_.count(proc) < 1){
fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",

rpcs::rpcstate_t stat;

// have i seen this client before?
lock rwl(reply_window_m_);
// if we don't know about this clt_nonce, create a cleanup object
if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
VERIFY(reply_window_[h.clt_nonce].size() == 0); // create
reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
"rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);

// save the latest good connection to the client
if(conns_.find(h.clt_nonce) == conns_.end()){
conns_[h.clt_nonce] = c;
} else if(conns_[h.clt_nonce]->compare(c) < 0){
conns_[h.clt_nonce]->decref();
conns_[h.clt_nonce] = c;

stat = checkduplicate_and_update(h.clt_nonce, h.xid,
h.xid_rep, &b1, &sz1);
// this client does not require at most once logic

case NEW: // new request
rh.ret = (*f)(req, rep);
if (rh.ret == rpc_const::unmarshal_args_failure) {
fprintf(stderr, "rpcs::dispatch: failed to"
" unmarshall the arguments. You are"
" probably calling RPC 0x%x with wrong"
" types of arguments.\n", proc);
rep.pack_reply_header(rh);
rep.take_buf(&b1, &sz1);
"rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
sz1, h.xid, proc, rh.ret, h.clt_nonce);
// only record replies for clients that require at-most-once logic
add_reply(h.clt_nonce, h.xid, b1, sz1);
// get the latest connection to the client
if(c->isdead() && c != conns_[h.clt_nonce]){
c = conns_[h.clt_nonce];
if(h.clt_nonce == 0){
// reply is not added to at-most-once window, free it
case INPROGRESS: // server is working on this request
case DONE: // duplicate and we still have the response
case FORGOTTEN: // very old request and we don't have the response anymore
jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
rh.ret = rpc_const::atmostonce_failure;
rep.pack_reply_header(rh);
c->send(rep.cstr(), rep.size());
// rpcs::dispatch calls this when an RPC request arrives.
//
// checks to see if an RPC with xid from clt_nonce has already been received.
// if not, remembers the request in reply_window_.
//
// deletes remembered requests with XIDs <= xid_rep; the client
// says it has received a reply for every RPC up through xid_rep.
// frees the reply_t::buf of each such request.
//
// returns one of:
// NEW: never seen this xid before.
// INPROGRESS: seen this xid, and still processing it.
// DONE: seen this xid, previous reply returned in *b and *sz.
// FORGOTTEN: might have seen this xid, but deleted previous reply.
rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
unsigned int xid_rep, char **b, int *sz)
lock rwl(reply_window_m_);
std::list<reply_t> &l = reply_window_[clt_nonce];
VERIFY(l.size() > 0);
VERIFY(xid >= xid_rep);
unsigned int past_xid_rep = l.begin()->xid;
std::list<reply_t>::iterator start = l.begin(), it;
if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) {
// scan for deletion candidates
for (; it != l.end() && it->xid < xid_rep; it++) {
l.begin()->xid = xid_rep;
if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1)
// skip non-deletion candidates
while (it != l.end() && it->xid < xid)
// if it's in the list it must be right here
if (it != l.end() && it->xid == xid) {
if (it->cb_present) {
// return information about the remembered reply
// remember that a new request has arrived
l.insert(it, reply_t(xid));
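// Illustrative walk-through based on the code above (not in the original
// source): suppose a client's window is [ -1 (marker), 3, 5 ] and a request
// with xid 6, xid_rep 4 arrives. The remembered reply for xid 3 is freed and
// removed, the marker advances to 4, xid 5 stays, and a reply_t for xid 6 is
// inserted, leaving [ 4 (marker), 5, 6 ] with NEW returned for xid 6.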
// rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
// and passes the return value in b and sz.
// add_reply() should remember b and sz.
// free_reply_window() and checkduplicate_and_update() are responsible for
// freeing these buffers.
rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
lock rwl(reply_window_m_);
// remember the RPC reply value
std::list<reply_t> &l = reply_window_[clt_nonce];
std::list<reply_t>::iterator it = l.begin();
// skip to our place in the list
for (it++; it != l.end() && it->xid < xid; it++);
// there should already be an entry, so whine if there isn't
if (it == l.end() || it->xid != xid) {
fprintf(stderr, "Could not find reply struct in add_reply\n");
l.insert(it, reply_t(xid, b, sz));
*it = reply_t(xid, b, sz);
rpcs::free_reply_window(void)
std::map<unsigned int,std::list<reply_t> >::iterator clt;
std::list<reply_t>::iterator it;
lock rwl(reply_window_m_);
for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
for (it = clt->second.begin(); it != clt->second.end(); it++){
reply_window_.clear();

rpcs::rpcbind(int &r, int a)
jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
operator<<(marshall &m, uint8_t x) {

operator<<(marshall &m, uint16_t x) {
m.rawbytes((char *)&x, 2);

operator<<(marshall &m, uint32_t x) {
m.rawbytes((char *)&x, 4);

marshall & operator<<(marshall &m, int x) { return m << (unsigned int) x; }
marshall & operator<<(marshall &m, char x) { return m << (uint8_t)x; }
marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; }
marshall & operator<<(marshall &m, short x) { return m << (unsigned short) x; }
marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }

operator<<(marshall &m, const std::string &s) {
m << (unsigned int) s.size();
m.rawbytes(s.data(), s.size());
void marshall::pack_req_header(const request_header &h) {
int saved_sz = index_;
// leave the first sizeof(rpc_sz_t) bytes empty for the channel to fill in the PDU size
index_ = sizeof(rpc_sz_t);
*this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep;

void marshall::pack_reply_header(const reply_header &h) {
int saved_sz = index_;
// leave the first sizeof(rpc_sz_t) bytes empty for the channel to fill in the PDU size
index_ = sizeof(rpc_sz_t);
*this << h.xid << h.ret;
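// Illustrative note (not in the original source): the resulting request PDU
// on the wire is laid out as
//   | pdu size (rpc_sz_t, filled by the channel) | xid | proc | clt_nonce | srv_nonce | xid_rep | marshalled arguments... |
// and a reply PDU as
//   | pdu size | xid | ret | marshalled results... |
// with the integer fields written by the marshall operators above.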
unmarshall::unpack(int *x)
(*x) = (rawbyte() & 0xff) << 24;
(*x) |= (rawbyte() & 0xff) << 16;
(*x) |= (rawbyte() & 0xff) << 8;
(*x) |= rawbyte() & 0xff;

// take the contents from another unmarshall object
unmarshall::take_in(unmarshall &another)
another.take_buf(&buf_, &sz_);
index_ = RPC_HEADER_SZ;
ok_ = (sz_ >= RPC_HEADER_SZ);
unmarshall::ensure(size_t n) {

unmarshall::rawbyte()
return buf_[index_++];

unmarshall::rawbytes(std::string &ss, size_t n)
ss.assign(buf_+index_, n);
operator>>(unmarshall &u, bool &x)
x = (bool)u.rawbyte();

operator>>(unmarshall &u, unsigned char &x)
x = (unsigned char)u.rawbyte();

operator>>(unmarshall &u, char &x)
x = (char)u.rawbyte();

operator>>(unmarshall &u, unsigned short &x)
x = (u.rawbyte() & 0xff) << 8;
x |= u.rawbyte() & 0xff;

operator>>(unmarshall &u, short &x)
x = (u.rawbyte() & 0xff) << 8;
x |= u.rawbyte() & 0xff;

operator>>(unmarshall &u, unsigned int &x)
x = (u.rawbyte() & 0xff) << 24;
x |= (u.rawbyte() & 0xff) << 16;
x |= (u.rawbyte() & 0xff) << 8;
x |= u.rawbyte() & 0xff;

operator>>(unmarshall &u, int &x)
x = (u.rawbyte() & 0xff) << 24;
x |= (u.rawbyte() & 0xff) << 16;
x |= (u.rawbyte() & 0xff) << 8;
x |= u.rawbyte() & 0xff;

operator>>(unmarshall &u, unsigned long long &x)
x = l | ((unsigned long long) h << 32);

operator>>(unmarshall &u, std::string &s)
bool operator<(const sockaddr_in &a, const sockaddr_in &b){
return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
((a.sin_port < b.sin_port))));
/*---------------auxiliary functions--------------*/

make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
const char *localhost = "127.0.0.1";
const char *port = index(hostandport, ':');
memcpy(host, localhost, strlen(localhost)+1);
memcpy(host, hostandport, port-hostandport);
host[port-hostandport] = '\0';
make_sockaddr(host, port, dst);
make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
bzero(dst, sizeof(*dst));
dst->sin_family = AF_INET;
if(a != INADDR_NONE){
dst->sin_addr.s_addr = a;
struct hostent *hp = gethostbyname(host);
if(hp == 0 || hp->h_length != 4){
fprintf(stderr, "cannot find host name %s\n", host);
dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
dst->sin_port = htons(atoi(port));