The rpcc class handles client-side RPC. Each rpcc is bound to a
single RPC server. The jobs of rpcc include maintaining a connection to
the server, sending RPC requests and waiting for responses, retransmissions,
at-most-once delivery, etc.
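As a quick orientation, typical client-side use looks roughly like the sketch
below. The exact call()/call_timeout() overloads live in rpc.h; the sketch
assumes they take the result reference before the arguments, as the
call_timeout() invocation in rpcc::bind() below does, and the procedure
number 22 and port 3772 are made up for illustration.

    sockaddr_in dst;
    make_sockaddr("127.0.0.1:3772", &dst);
    rpcc *cl = new rpcc(dst);
    if (cl->bind() == 0) {                 // 0 on success (assumption)
        std::string ret;
        int r = cl->call(22, ret, std::string("hello"));
        // r carries the handler's return value on success,
        // or an rpc_const error such as rpc_const::timeout_failure
    }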
The rpcs class handles the server side of RPC. Each rpcs handles multiple
connections from different rpcc objects. The jobs of rpcs include accepting
connections, dispatching requests to registered RPC handlers, at-most-once
delivery, etc.
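The server side, again as a rough sketch: handlers are registered much like
the rpcbind registration in the rpcs constructor below, and the handler
signature here (reply reference first, then the argument) mirrors
rpcs::rpcbind(); the class fs, procedure number 22 and port 3772 are made up.

    class fs {
    public:
        int get(std::string &ret, const std::string &name);
    };

    fs f;
    rpcs server(3772);
    server.reg(22, &fs::get, &f);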
Both rpcc and rpcs use the connection class as an abstraction for the
underlying communication channel. To send an RPC request/reply, one calls
connection::send(), which blocks until the data is sent or the connection has
failed (thus the caller can free the buffer when send() returns). When a
request/reply is received, connection makes a callback into the corresponding
rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
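In sketch form, the part of the connection interface used in this file looks
like this (see connection.h for the real declarations; return types are
inferred from call sites here and incref() is presumed as the counterpart of
the decref() used in rpcs::dispatch(), so none of this is authoritative):

    class connection {
    public:
        bool send(char *buf, size_t sz); // blocks until sent or the connection fails
        bool isdead();                   // has the underlying socket failed?
        int channo();                    // small integer id, used in log messages
        int compare(connection *other);  // orders connections; used to keep the latest one
        void incref();                   // see the note on reference counting below
        void decref();
    };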
rpcc uses application threads to send RPC requests and blocks to receive the
reply or error. All connections use a single PollMgr object to perform async
socket IO. PollMgr creates a single thread to examine the readiness of socket
file descriptors and informs the corresponding connection whenever a socket is
ready to be read or written. (We use asynchronous socket IO to reduce the
number of threads needed to manage these connections; without async IO, at
least one thread is needed per connection to read data without blocking other
activities.) Each rpcs object creates one thread for listening on the server
port and a pool of threads for executing RPC requests. The
thread pool allows us to control the number of threads spawned at the server
(spawning one thread per request will hurt when the server faces thousands of
requests).
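Schematically, the PollMgr thread runs a readiness loop of the following
shape (this illustrates the pattern only, not PollMgr's actual implementation;
pfds stands for whatever descriptors the connections have registered):

    for (;;) {
        poll(pfds.data(), pfds.size(), -1);   // block until some socket is ready
        for (auto &p : pfds) {
            if (p.revents & POLLIN)
                ;   // tell the owning connection it can read without blocking
            if (p.revents & POLLOUT)
                ;   // tell the owning connection it can flush buffered writes
        }
    }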
In order to delete a connection object, we must maintain a reference count.
For rpcc, multiple client threads might be invoking the rpcc::call() functions
and thus holding multiple references to the underlying connection object. For
rpcs, multiple dispatch threads might be holding references to the same
connection object. A connection object is deleted only when the underlying
connection is dead and the reference count reaches zero.
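The pattern, in sketch form (the real bookkeeping lives in the connection
class; the member names refno_, dead_ and ref_m_ below are illustrative):

    void connection::incref() { lock l(ref_m_); refno_++; }
    void connection::decref() {
        bool last = false;
        {
            lock l(ref_m_);
            refno_--;
            last = (refno_ == 0 && dead_);
        }
        if (last)
            delete this;   // dead and no one holds a reference any more
    }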
This version of the RPC library explicitly joins exited threads to make sure
no outstanding references exist before deleting objects.
To delete an rpcc object safely, the users of the library must ensure that
there are no outstanding calls on the rpcc object.
To delete an rpcs object safely, we do the following in sequence: 1. stop
accepting new incoming connections; 2. close existing active connections;
3. delete the dispatch thread pool, which involves waiting for currently
active RPC handlers to finish. It is interesting how a thread pool can be
deleted without using thread cancellation. The trick is to inject x "poison
pills" into a thread pool of x threads. Upon getting a poison pill instead of
a normal task, a worker thread will exit (and the thread pool destructor waits
to join all x exited worker threads).
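In sketch form (names here are illustrative; the real code is in the thread
pool implementation):

    // shutting down a pool of n workers that consume jobs from a blocking queue
    for (size_t i = 0; i < n; i++)
        jobq.enqueue(POISON_PILL);   // a worker that dequeues a pill exits its loop
    for (auto &w : workers)
        w.join();                    // by the time join() returns, every worker has exited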
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>

#include "lang/verify.h"

const rpcc::TO rpcc::to_max = { 120000 };
const rpcc::TO rpcc::to_min = { 1000 };
rpcc::caller::caller(int xxid, unmarshall *xun)
    : xid(xxid), un(xun), done(false)

rpcc::caller::~caller()

    auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
    srandom((uint32_t)now.time_since_epoch().count() ^ (uint32_t)getpid());
rpcc::rpcc(sockaddr_in d, bool retrans) :
    dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
    retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_(false), xid_rep_done_(-1)

    clt_nonce_ = (unsigned int)random();

    // special client nonce 0 means this client does not
    // require at-most-once logic from the server
    // because it uses TCP and never retries a failed connection

    char *loss_env = getenv("RPC_LOSSY");
    if(loss_env != NULL){
        lossytest_ = atoi(loss_env);

    // xid starts with 1 and latest received reply starts with 0
    xid_rep_window_.push_back(0);

    jsl_log(JSL_DBG_2, "rpcc::rpcc clt_nonce is %d lossy %d\n",
            clt_nonce_, lossytest_);
// IMPORTANT: destruction should happen only when no external threads
// are blocked inside rpcc or will use rpcc in the future

    jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
            clt_nonce_, chan_ ? chan_->channo() : -1);

    VERIFY(calls_.size() == 0);

    int ret = call_timeout(rpc_const::bind, to, r, 0);

        jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
                inet_ntoa(dst_.sin_addr), ret);
// Cancel all outstanding calls

    tprintf("rpcc::cancel: force callers to fail");
    for(auto &p : calls_){
        caller *ca = p.second;

        jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");

        ca->intret = rpc_const::cancel_failure;

    while (calls_.size() > 0){
        destroy_wait_ = true;
        destroy_wait_c_.wait(ml);

    tprintf("rpcc::cancel: done");
rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,

    if((proc != rpc_const::bind && !bind_done_) ||
       (proc == rpc_const::bind && bind_done_)){
        jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
        return rpc_const::bind_failure;

        return rpc_const::cancel_failure;

    calls_[ca.xid] = &ca;

    req.pack_req_header({ca.xid, (int)proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
    xid_rep = xid_rep_window_.front();

    std::chrono::time_point<std::chrono::steady_clock> finaldeadline =
        std::chrono::steady_clock::now() +
        std::chrono::milliseconds(to.to),

    curr_to.to = to_min.to;
    bool transmit = true;
    connection *ch = NULL;

    if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {

    if (forgot.isvalid())
        ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
    ch->send(req.cstr(), req.size());

    else jsl_log(JSL_DBG_1, "not reachable\n");

        "rpcc::call1 %u just sent req proc %x xid %d clt_nonce %d\n",
        clt_nonce_, proc, ca.xid, clt_nonce_);

    transmit = false; // only send once on a given channel

    if(finaldeadline == std::chrono::time_point<std::chrono::steady_clock>::min())

    nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to);
    if(nextdeadline > finaldeadline) {
        nextdeadline = finaldeadline;
        finaldeadline = std::chrono::time_point<std::chrono::steady_clock>::min();
    jsl_log(JSL_DBG_2, "rpcc::call1: wait\n");
    if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){
        jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");

    jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");

    if(retrans_ && (!ch || ch->isdead())){
        // since connection is dead, retransmit
        // on the new connection

    // no locking of ca.m since only this thread changes ca.xid

    calls_.erase(ca.xid);
    // may need to update the xid again here, in case the
    // packet times out before it's even sent by the channel.
    // I don't think there's any harm in maybe doing it twice
    update_xid_rep(ca.xid);
    destroy_wait_c_.notify_one();

    if (ca.done && lossytest_)

    if (!dup_req_.isvalid()) {
        dup_req_.buf.assign(req.cstr(), req.size());
        dup_req_.xid = ca.xid;

    if (xid_rep > xid_rep_done_)
        xid_rep_done_ = xid_rep;

        "rpcc::call1 %u call done for req proc %x xid %d %s:%d done? %d ret %d \n",
        clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
        ntohs(dst_.sin_port), ca.done, ca.intret);

    // destruction of req automatically frees its buffer
    return (ca.done ? ca.intret : rpc_const::timeout_failure);
rpcc::get_refconn(connection **ch)

    if(!chan_ || chan_->isdead()){

        chan_ = connect_to_dst(dst_, this, lossytest_);

// PollMgr's thread is being used to
// make this upcall from connection object to rpcc.
// this function must not block.
//
// this function keeps no reference for connection *c
rpcc::got_pdu(connection *, char *b, size_t sz)

    unmarshall rep(b, sz);

    rep.unpack_reply_header(&h);

        jsl_log(JSL_DBG_1, "rpcc::got_pdu unmarshall header failed!!!\n");

    update_xid_rep(h.xid);

    if(calls_.find(h.xid) == calls_.end()){
        jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);

    caller *ca = calls_[h.xid];

        ca->un->take_in(rep);

        jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
// assumes thread holds mutex m

rpcc::update_xid_rep(int xid)

    if(xid <= xid_rep_window_.front()){

    for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){

            xid_rep_window_.insert(it, xid);

    xid_rep_window_.push_back(xid);

    auto it = xid_rep_window_.begin();
    for (it++; it != xid_rep_window_.end(); it++){
        while (xid_rep_window_.front() + 1 == *it)
            xid_rep_window_.pop_front();
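// Example of how the window evolves when replies arrive out of order: it
// starts as [0]; if the reply for xid 2 arrives first the window becomes
// [0,2]; when the reply for xid 1 then arrives it is inserted to give
// [0,1,2] and the leading run is compacted, leaving [2]. The front of the
// window is thus the largest xid such that replies for all xids up to and
// including it have been received, and that is what call1() sends to the
// server as xid_rep.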
rpcs::rpcs(unsigned int p1, size_t count)
  : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_(true)

    nonce_ = (unsigned int)random();
    jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);

    char *loss_env = getenv("RPC_LOSSY");
    if(loss_env != NULL){
        lossytest_ = atoi(loss_env);

    reg(rpc_const::bind, &rpcs::rpcbind, this);
    dispatchpool_ = new ThrPool(6, false);

    listener_ = new tcpsconn(this, port_, lossytest_);

// must delete listener before dispatchpool

    delete dispatchpool_;
rpcs::got_pdu(connection *c, char *b, size_t sz)

        jsl_log(JSL_DBG_1, "rpcs::got_pdu: not reachable\n");

    djob_t *j = new djob_t(c, b, sz);

    bool succ = dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, j));
    if(!succ || !reachable_){
rpcs::reg1(unsigned int proc, handler *h)

    VERIFY(procs_.count(proc) == 0);

    VERIFY(procs_.count(proc) >= 1);

rpcs::updatestat(unsigned int proc)

    if(curr_counts_ == 0){
        tprintf("RPC STATS: ");
        for (auto i = counts_.begin(); i != counts_.end(); i++)
            tprintf("%x:%lu ", i->first, i->second);

        lock rwl(reply_window_m_);
        std::map<unsigned int, std::list<reply_t> >::iterator clt;

        size_t totalrep = 0, maxrep = 0;
        for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
            totalrep += clt->second.size();
            if(clt->second.size() > maxrep)
                maxrep = clt->second.size();

        jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %lu max per client %lu\n",
                (int)reply_window_.size() - 1, totalrep, maxrep);
        curr_counts_ = counting_;
rpcs::dispatch(djob_t *j)

    connection *c = j->conn;
    unmarshall req(j->buf, j->sz);

    req.unpack_req_header(&h);
    unsigned int proc = (unsigned int)h.proc;

        jsl_log(JSL_DBG_1, "rpcs::dispatch unmarshall header failed!!!\n");

        "rpcs::dispatch: rpc %d (proc %x, last_rep %d) from clt %u for srv instance %u \n",
        h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);

    reply_header rh(h.xid, 0);

    // is client sending to an old instance of server?
    if(h.srv_nonce != 0 && h.srv_nonce != nonce_){

            "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
            h.srv_nonce, nonce_, h.proc);
        rh.ret = rpc_const::oldsrv_failure;
        rep.pack_reply_header(rh);
        c->send(rep.cstr(), rep.size());

    // is RPC proc a registered procedure?

    if(procs_.count(proc) < 1){
        fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",

    rpcs::rpcstate_t stat;
    // have i seen this client before?

        lock rwl(reply_window_m_);
        // if we don't know about this clt_nonce, create a cleanup object
        if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
            VERIFY(reply_window_[h.clt_nonce].size() == 0); // create
            reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid

                "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
                h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size() - 1);

    // save the latest good connection to the client

        if(conns_.find(h.clt_nonce) == conns_.end()){

            conns_[h.clt_nonce] = c;
        } else if(conns_[h.clt_nonce]->compare(c) < 0){
            conns_[h.clt_nonce]->decref();

            conns_[h.clt_nonce] = c;

    stat = checkduplicate_and_update(h.clt_nonce, h.xid,
                                     h.xid_rep, &b1, &sz1);
    // this client does not require at-most-once logic

    case NEW: // new request

        rh.ret = (*f)(req, rep);
        if (rh.ret == rpc_const::unmarshal_args_failure) {
            fprintf(stderr, "rpcs::dispatch: failed to"
                    " unmarshall the arguments. You are"
                    " probably calling RPC 0x%x with wrong"
                    " types of arguments.\n", proc);

        rep.pack_reply_header(rh);
        rep.take_buf(&b1, &sz1);

            "rpcs::dispatch: sending and saving reply of size %lu for rpc %d, proc %x ret %d, clt %u\n",
            sz1, h.xid, proc, rh.ret, h.clt_nonce);

            // only record replies for clients that require at-most-once logic
            add_reply(h.clt_nonce, h.xid, b1, sz1);

            // get the latest connection to the client

            if(c->isdead() && c != conns_[h.clt_nonce]){

                c = conns_[h.clt_nonce];

        if(h.clt_nonce == 0){
            // reply is not added to at-most-once window, free it

    case INPROGRESS: // server is working on this request

    case DONE: // duplicate and we still have the response

    case FORGOTTEN: // very old request and we don't have the response anymore
        jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %d from %u\n",

        rh.ret = rpc_const::atmostonce_failure;
        rep.pack_reply_header(rh);
        c->send(rep.cstr(), rep.size());
// rpcs::dispatch calls this when an RPC request arrives.
//
// checks to see if an RPC with xid from clt_nonce has already been received.
// if not, remembers the request in reply_window_.
//
// deletes remembered requests with XIDs <= xid_rep; the client
// says it has received a reply for every RPC up through xid_rep.
// frees the reply_t::buf of each such request.
//
// returns one of:
// NEW: never seen this xid before.
// INPROGRESS: seen this xid, and still processing it.
// DONE: seen this xid, previous reply returned in *b and *sz.
// FORGOTTEN: might have seen this xid, but deleted previous reply.
rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
        int xid_rep, char **b, size_t *sz)

    lock rwl(reply_window_m_);

    std::list<reply_t> &l = reply_window_[clt_nonce];

    VERIFY(l.size() > 0);
    VERIFY(xid >= xid_rep);

    int past_xid_rep = l.begin()->xid;

    std::list<reply_t>::iterator start = l.begin(), it;

    if (past_xid_rep < xid_rep || past_xid_rep == -1) {
        // scan for deletion candidates
        for (; it != l.end() && it->xid < xid_rep; it++) {

        l.begin()->xid = xid_rep;

    if (xid < past_xid_rep && past_xid_rep != -1)

    // skip non-deletion candidates
    while (it != l.end() && it->xid < xid)

    // if it's in the list it must be right here
    if (it != l.end() && it->xid == xid) {
        if (it->cb_present) {
            // return information about the remembered reply

    // remember that a new request has arrived
    l.insert(it, reply_t(xid));
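// Worked example of the states for a single client: dispatch() seeds the
// window with reply_t(-1), so it starts as [-1]. When xid 1 first arrives it
// is NEW and a reply_t(1) with cb_present == false is inserted; a
// retransmission of xid 1 that arrives while the handler is still running
// sees cb_present == false and is reported INPROGRESS; once add_reply() has
// stored the reply buffer, a later duplicate is DONE and dispatch() resends
// the saved reply; and once the client advertises xid_rep >= 1 the entry is
// trimmed, after which any yet-later duplicate of xid 1 is FORGOTTEN.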
// rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
// and passes the return value in b and sz.
// add_reply() should remember b and sz.
// free_reply_window() and checkduplicate_and_update() are responsible for
// freeing these buffers.
rpcs::add_reply(unsigned int clt_nonce, int xid,

    lock rwl(reply_window_m_);
    // remember the RPC reply value
    std::list<reply_t> &l = reply_window_[clt_nonce];
    std::list<reply_t>::iterator it = l.begin();
    // skip to our place in the list
    for (it++; it != l.end() && it->xid < xid; it++);
    // there should already be an entry, so whine if there isn't
    if (it == l.end() || it->xid != xid) {
        fprintf(stderr, "Could not find reply struct in add_reply\n");
        l.insert(it, reply_t(xid, b, sz));

        *it = reply_t(xid, b, sz);
rpcs::free_reply_window(void)

    lock rwl(reply_window_m_);
    for (auto clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
        for (auto it = clt->second.begin(); it != clt->second.end(); it++){

    reply_window_.clear();

rpcs::rpcbind(unsigned int &r, int)

    jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
operator<<(marshall &m, uint8_t x) {

operator<<(marshall &m, uint16_t x) {

    m.rawbytes((char *)&x, 2);

operator<<(marshall &m, uint32_t x) {

    m.rawbytes((char *)&x, 4);

marshall & operator<<(marshall &m, int32_t x) { return m << (uint32_t)x; }
marshall & operator<<(marshall &m, int8_t x) { return m << (uint8_t)x; }
marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; }
marshall & operator<<(marshall &m, int16_t x) { return m << (uint16_t)x; }
marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x >> 32) << (uint32_t)x; }

operator<<(marshall &m, const std::string &s) {
    m << (unsigned int)s.size();
    m.rawbytes(s.data(), s.size());
void marshall::pack_req_header(const request_header &h) {
    size_t saved_sz = index_;
    // leave the first 4 bytes empty for the channel to fill in the size of the pdu
    index_ = sizeof(rpc_sz_t);
    *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep;

void marshall::pack_reply_header(const reply_header &h) {
    size_t saved_sz = index_;
    // leave the first 4 bytes empty for the channel to fill in the size of the pdu
    index_ = sizeof(rpc_sz_t);
    *this << h.xid << h.ret;
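// Resulting wire layout: each header field above is marshalled as a 4-byte
// integer after the leading rpc_sz_t length word that the connection layer
// fills in:
//   request header:  [size][xid][proc][clt_nonce][srv_nonce][xid_rep]
//   reply header:    [size][xid][ret]
// The marshalled arguments / return value start at offset RPC_HEADER_SZ (the
// headers are written into space reserved at the front, as the saved index_
// above suggests), which is why unmarshall::take_in() below starts reading
// at that offset.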
// take the contents from another unmarshall object

unmarshall::take_in(unmarshall &another)

    another.take_buf(&buf_, &sz_);
    index_ = RPC_HEADER_SZ;
    ok_ = (sz_ >= RPC_HEADER_SZ);

unmarshall::ensure(size_t n) {

unmarshall::rawbyte()

    return (uint8_t)buf_[index_++];

unmarshall::rawbytes(std::string &ss, size_t n)

    ss.assign(buf_ + index_, n);

unmarshall::rawbytes(T &t)

    const size_t n = sizeof(T);

    memcpy(&t, buf_ + index_, n);
unmarshall & operator>>(unmarshall &u, bool &x) { x = (bool)u.rawbyte(); return u; }
unmarshall & operator>>(unmarshall &u, uint8_t &x) { x = u.rawbyte(); return u; }
unmarshall & operator>>(unmarshall &u, int8_t &x) { x = (int8_t)u.rawbyte(); return u; }
unmarshall & operator>>(unmarshall &u, uint16_t &x) { u.rawbytes<uint16_t>(x); return u; }
unmarshall & operator>>(unmarshall &u, int16_t &x) { u.rawbytes<int16_t>(x); return u; }
unmarshall & operator>>(unmarshall &u, uint32_t &x) { u.rawbytes<uint32_t>(x); return u; }
unmarshall & operator>>(unmarshall &u, int32_t &x) { u.rawbytes<int32_t>(x); return u; }
unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes<uint32_t>(xx); x = xx; return u; }
unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes<uint64_t>(x); return u; }
unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes<int64_t>(x); return u; }
unmarshall & operator>>(unmarshall &u, std::string &s) {
    unsigned sz = u.grab<unsigned>();
bool operator<(const sockaddr_in &a, const sockaddr_in &b){
    return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
            ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
             ((a.sin_port < b.sin_port))));

/*---------------auxiliary functions--------------*/
make_sockaddr(const std::string &hostandport, struct sockaddr_in *dst) {
    auto colon = hostandport.find(':');
    if (colon == std::string::npos)
        make_sockaddr("127.0.0.1", hostandport, dst);
    else
        make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon + 1), dst);
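// For example, make_sockaddr("10.0.0.5:3772", &dst) fills in that host and
// port, while make_sockaddr("3772", &dst) (no colon) is treated as just a
// port on 127.0.0.1; the host and port values here are arbitrary.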
make_sockaddr(const std::string &host, const std::string &port, struct sockaddr_in *dst) {
    bzero(dst, sizeof(*dst));
    dst->sin_family = AF_INET;

    struct in_addr a{inet_addr(host.c_str())};

    if(a.s_addr != INADDR_NONE)
        dst->sin_addr.s_addr = a.s_addr;

        struct hostent *hp = gethostbyname(host.c_str());

        if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
            fprintf(stderr, "cannot find host name %s\n", host.c_str());

        memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
        dst->sin_addr.s_addr = a.s_addr;

    dst->sin_port = hton((uint16_t)std::stoi(port));