The rpcc class handles client-side RPC. Each rpcc is bound to a
single RPC server. The jobs of rpcc include maintaining a connection to
the server, sending RPC requests and waiting for responses, retransmissions,
at-most-once delivery, etc.
The rpcs class handles the server side of RPC. Each rpcs handles multiple
connections from different rpcc objects. The jobs of rpcs include accepting
connections, dispatching requests to registered RPC handlers, at-most-once
delivery, etc.
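
For orientation, here is a minimal usage sketch. It is illustrative only: the
proc number 22, the demo_srv/echo names, and the exact rpcc::call()/rpcs::reg()
overloads are assumptions inferred from how bind() and rpcbind() are used later
in this file, not a verbatim API reference.

// Hypothetical client/server pairing (names and proc number invented).
class demo_srv {
public:
    int echo(std::string &ret, std::string s) { ret = s; return 0; }
};

void demo() {
    demo_srv impl;
    rpcs server(3772);                        // listen on TCP port 3772
    server.reg(22, &demo_srv::echo, &impl);   // register handler for proc 22

    rpcc client("127.0.0.1:3772");
    if (client.bind() == 0) {                 // exchange nonces with the server
        std::string r;
        client.call(22, r, std::string("hello"));  // r == "hello" on success
    }
}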
Both rpcc and rpcs use the connection class as an abstraction for the
underlying communication channel. To send an RPC request/reply, one calls
connection::send(), which blocks until the data is sent or the connection has
failed (thus the caller can free the buffer when send() returns). When a
request/reply is received, connection makes a callback into the corresponding
rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
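
The callback half of that contract can be pictured as a small interface. The
sketch below is illustrative: the actual base-class name and the got_pdu()
return type are not shown in this excerpt, only the argument shape used by
rpcc::got_pdu() and rpcs::got_pdu() below.

// Illustrative upcall interface between a connection and its owner.
class pdu_handler {
public:
    // Called from PollMgr's thread with one complete request/reply PDU;
    // must not block and must not keep a reference to the connection.
    virtual bool got_pdu(connection *c, char *buf, size_t sz) = 0;
    virtual ~pdu_handler() {}
};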
rpcc uses application threads to send RPC requests and blocks to receive the
reply or error. All connections use a single PollMgr object to perform async
socket IO. PollMgr creates a single thread to examine the readiness of socket
file descriptors and informs the corresponding connection whenever a socket is
ready to be read or written. (We use asynchronous socket IO to reduce the
number of threads needed to manage these connections; without async IO, at
least one thread is needed per connection to read data without blocking other
activities.) Each rpcs object creates one thread for listening on the server
port and a pool of threads for executing RPC requests. The thread pool allows
us to control the number of threads spawned at the server (spawning one thread
per request would hurt when the server faces thousands of requests).
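
Conceptually, the readiness loop is just poll(2) over every connection's file
descriptor. The stripped-down sketch below shows only the shape of that loop;
it is not the actual PollMgr code (the real class dispatches to per-fd
callbacks and supports adding/removing fds).

#include <poll.h>
#include <vector>

// Conceptual single-threaded readiness loop (illustrative, not PollMgr itself).
void poll_loop(std::vector<struct pollfd> &fds)   // one pollfd per connection
{
    for (;;) {
        int n = poll(fds.data(), fds.size(), 1000 /* ms timeout */);
        if (n <= 0)
            continue;
        for (auto &p : fds) {
            if (p.revents & POLLIN)  { /* notify owning connection: readable */ }
            if (p.revents & POLLOUT) { /* notify owning connection: writable */ }
        }
    }
}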
In order to delete a connection object, we must maintain a reference count.
For rpcc, multiple client threads might be invoking the rpcc::call() functions
and thus holding multiple references to the underlying connection object. For
rpcs, multiple dispatch threads might be holding references to the same
connection object. A connection object is deleted only when the underlying
connection is dead and the reference count reaches zero.
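
The pattern each thread follows is incref() before use and decref() afterwards
(dispatch() below calls decref() when it replaces a stale connection). A
hypothetical RAII helper makes the discipline explicit; it is a sketch, not
part of the library:

// Hypothetical RAII guard around connection reference counting.
// Assumes connection::incref()/decref() exist, with decref() deleting the
// object once the connection is dead and the count reaches zero.
struct conn_ref {
    connection *c;
    explicit conn_ref(connection *conn) : c(conn) { if (c) c->incref(); }
    ~conn_ref() { if (c) c->decref(); }
    conn_ref(const conn_ref &) = delete;
    conn_ref &operator=(const conn_ref &) = delete;
};

// usage: keep the connection alive for the duration of one send
// { conn_ref guard(ch); ch->send(req.cstr(), req.size()); }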
This version of the RPC library explicitly joins exited threads to make sure
no outstanding references exist before deleting objects.
To delete an rpcc object safely, the users of the library must ensure that
there are no outstanding calls on the rpcc object.
To delete an rpcs object safely, we do the following in sequence:
  1. Stop accepting new incoming connections.
  2. Close existing active connections.
  3. Delete the dispatch thread pool, which involves waiting for currently
     active RPC handlers to finish.
It is interesting that a thread pool can be deleted without using thread
cancellation. The trick is to inject x "poison pills" into a thread pool of x
threads. Upon getting a poison pill instead of a normal task, a worker thread
exits (and the thread pool destructor waits to join all x exited worker
threads).
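
A compact sketch of that shutdown idiom, assuming a worker pool fed from a
hypothetical blocking_queue of std::function jobs (this is not the actual
ThrPool implementation):

#include <functional>
#include <thread>
#include <vector>

// Sketch only: blocking_queue is a hypothetical thread-safe queue whose pop()
// blocks until a job is available.
void shutdown_pool(std::vector<std::thread> &workers,
                   blocking_queue<std::function<void()>> &jobs)
{
    for (size_t i = 0; i < workers.size(); i++)
        jobs.push(std::function<void()>());   // one empty "poison pill" per worker
    for (auto &w : workers)
        w.join();                             // every worker has exited by now
}

// Each worker runs:
//   for (;;) { auto job = jobs.pop(); if (!job) break; job(); }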
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>

#include "threaded_log.h"
#include "lang/verify.h"

const rpcc::TO rpcc::to_max = { 120000 };
const rpcc::TO rpcc::to_min = { 1000 };
rpcc::caller::caller(int xxid, unmarshall *xun)
    : xid(xxid), un(xun), done(false)

rpcc::caller::~caller()

auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
rpcc::rpcc(const string & d, bool retrans) :
    dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
    retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_(false), xid_rep_done_(-1)

    clt_nonce_ = (unsigned int)random();

    // special client nonce 0 means this client does not
    // require at-most-once logic from the server
    // because it uses tcp and never retries a failed connection

    char *loss_env = getenv("RPC_LOSSY");
    if(loss_env != NULL){
        lossytest_ = atoi(loss_env);

    // xid starts with 1 and latest received reply starts with 0
    xid_rep_window_.push_back(0);

    jsl_log(JSL_DBG_2, "rpcc::rpcc clt_nonce is %d lossy %d\n",
            clt_nonce_, lossytest_);
// IMPORTANT: destruction should happen only when no external threads
// are blocked inside rpcc or will use rpcc in the future

    jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
            clt_nonce_, chan_?chan_->channo():-1);

    VERIFY(calls_.size() == 0);

    int ret = call_timeout(rpc_const::bind, to, r, 0);

        jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
                inet_ntoa(dst_.sin_addr), ret);
// Cancel all outstanding calls

    LOG("rpcc::cancel: force callers to fail");
    for(auto &p : calls_){
        caller *ca = p.second;
        jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
        ca->intret = rpc_const::cancel_failure;

    while (calls_.size() > 0){
        destroy_wait_ = true;
        destroy_wait_c_.wait(ml);

    LOG("rpcc::cancel: done");
rpcc::call1(proc_t proc, marshall &req, unmarshall &rep,

    if((proc != rpc_const::bind && !bind_done_) ||
            (proc == rpc_const::bind && bind_done_)){
        jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
        return rpc_const::bind_failure;

        return rpc_const::cancel_failure;

    calls_[ca.xid] = &ca;

    req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
    xid_rep = xid_rep_window_.front();

    std::chrono::time_point<std::chrono::steady_clock> finaldeadline =
        std::chrono::steady_clock::now() +
        std::chrono::milliseconds(to.to),
        nextdeadline;

    curr_to.to = to_min.to;

    bool transmit = true;
    connection *ch = NULL;

            if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {

                if (forgot.isvalid())
                    ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
                ch->send(req.cstr(), req.size());
            else jsl_log(JSL_DBG_1, "not reachable\n");
                    "rpcc::call1 %u just sent req proc %x xid %d clt_nonce %d\n",
                    clt_nonce_, proc, ca.xid, clt_nonce_);
            transmit = false; // only send once on a given channel

        if(finaldeadline == std::chrono::time_point<std::chrono::steady_clock>::min())

        nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to);
        if(nextdeadline > finaldeadline) {
            nextdeadline = finaldeadline;
            finaldeadline = std::chrono::time_point<std::chrono::steady_clock>::min();

            jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
            if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){
                jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");

        jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");

        if(retrans_ && (!ch || ch->isdead())){
            // since connection is dead, retransmit
            // on the new connection

    // no locking of ca.m since only this thread changes ca.xid
    calls_.erase(ca.xid);
    // may need to update the xid again here, in case the
    // packet times out before it's even sent by the channel.
    // I don't think there's any harm in maybe doing it twice
    update_xid_rep(ca.xid);

        destroy_wait_c_.notify_one();

    if (ca.done && lossytest_)

        if (!dup_req_.isvalid()) {
            dup_req_.buf.assign(req.cstr(), req.size());
            dup_req_.xid = ca.xid;
        if (xid_rep > xid_rep_done_)
            xid_rep_done_ = xid_rep;

            "rpcc::call1 %u call done for req proc %x xid %d %s:%d done? %d ret %d \n",
            clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
            ntohs(dst_.sin_port), ca.done, ca.intret);

    // destruction of req automatically frees its buffer
    return (ca.done? ca.intret : rpc_const::timeout_failure);
rpcc::get_refconn(connection **ch)

    if(!chan_ || chan_->isdead()){
        chan_ = connect_to_dst(dst_, this, lossytest_);

// PollMgr's thread is being used to
// make this upcall from connection object to rpcc.
// this function must not block.
// this function keeps no reference for connection *c
rpcc::got_pdu(connection *, char *b, size_t sz)

    unmarshall rep(b, sz);
    rep.unpack_reply_header(&h);
        jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");

    update_xid_rep(h.xid);

    if(calls_.find(h.xid) == calls_.end()){
        jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);

    caller *ca = calls_[h.xid];
        ca->un->take_in(rep);
        jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
// assumes thread holds mutex m
rpcc::update_xid_rep(int xid)

    if(xid <= xid_rep_window_.front()){

    for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
            xid_rep_window_.insert(it, xid);
    xid_rep_window_.push_back(xid);

    auto it = xid_rep_window_.begin();
    for (it++; it != xid_rep_window_.end(); it++){
        while (xid_rep_window_.front() + 1 == *it)
            xid_rep_window_.pop_front();
rpcs::rpcs(unsigned int p1, size_t count)
  : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_(true)

    nonce_ = (unsigned int)random();
    jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);

    char *loss_env = getenv("RPC_LOSSY");
    if(loss_env != NULL){
        lossytest_ = atoi(loss_env);

    reg(rpc_const::bind, &rpcs::rpcbind, this);
    dispatchpool_ = new ThrPool(6,false);

    listener_ = new tcpsconn(this, port_, lossytest_);

    // must delete listener before dispatchpool
    delete dispatchpool_;

rpcs::got_pdu(connection *c, char *b, size_t sz)

        jsl_log(JSL_DBG_1, "rpcs::got_pdu: not reachable\n");

    djob_t *j = new djob_t(c, b, sz);
    bool succ = dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, j));
    if(!succ || !reachable_){
rpcs::reg1(proc_t proc, handler *h)

    VERIFY(procs_.count(proc) == 0);
    VERIFY(procs_.count(proc) >= 1);

rpcs::updatestat(proc_t proc)

    if(curr_counts_ == 0){
        for (auto i = counts_.begin(); i != counts_.end(); i++)
            LOG(std::hex << i->first << ":" << std::dec << i->second);

        lock rwl(reply_window_m_);
        map<unsigned int,list<reply_t> >::iterator clt;

        size_t totalrep = 0, maxrep = 0;
        for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
            totalrep += clt->second.size();
            if(clt->second.size() > maxrep)
                maxrep = clt->second.size();

        jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %lu max per client %lu\n",
                (int) reply_window_.size()-1, totalrep, maxrep);
        curr_counts_ = counting_;
rpcs::dispatch(djob_t *j)

    connection *c = j->conn;
    unmarshall req(j->buf, j->sz);

    req.unpack_req_header(&h);
    proc_t proc = h.proc;

        jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");

            "rpcs::dispatch: rpc %d (proc %x, last_rep %d) from clt %u for srv instance %u \n",
            h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);

    reply_header rh(h.xid,0);

    // is client sending to an old instance of server?
    if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
                "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
                h.srv_nonce, nonce_, h.proc);
        rh.ret = rpc_const::oldsrv_failure;
        rep.pack_reply_header(rh);
        c->send(rep.cstr(),rep.size());

    // is RPC proc a registered procedure?
        if(procs_.count(proc) < 1){
            fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",

    rpcs::rpcstate_t stat;

        // have i seen this client before?
            lock rwl(reply_window_m_);
            // if we don't know about this clt_nonce, create a cleanup object
            if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
                VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
                reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
                        "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
                        h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);

        // save the latest good connection to the client
            if(conns_.find(h.clt_nonce) == conns_.end()){
                conns_[h.clt_nonce] = c;
            } else if(conns_[h.clt_nonce]->compare(c) < 0){
                conns_[h.clt_nonce]->decref();
                conns_[h.clt_nonce] = c;

        stat = checkduplicate_and_update(h.clt_nonce, h.xid,
                h.xid_rep, &b1, &sz1);

        // this client does not require at-most-once logic

    case NEW: // new request

        rh.ret = (*f)(req, rep);
        if (rh.ret == rpc_const::unmarshal_args_failure) {
            fprintf(stderr, "rpcs::dispatch: failed to"
                    " unmarshall the arguments. You are"
                    " probably calling RPC 0x%x with wrong"
                    " types of arguments.\n", proc);

        rep.pack_reply_header(rh);
        rep.take_buf(&b1,&sz1);

                "rpcs::dispatch: sending and saving reply of size %lu for rpc %d, proc %x ret %d, clt %u\n",
                sz1, h.xid, proc, rh.ret, h.clt_nonce);

            // only record replies for clients that require at-most-once logic
            add_reply(h.clt_nonce, h.xid, b1, sz1);

            // get the latest connection to the client
                if(c->isdead() && c != conns_[h.clt_nonce]){
                    c = conns_[h.clt_nonce];

        if(h.clt_nonce == 0){
            // reply is not added to at-most-once window, free it

    case INPROGRESS: // server is working on this request

    case DONE: // duplicate and we still have the response

    case FORGOTTEN: // very old request and we don't have the response anymore
        jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %d from %u\n",
        rh.ret = rpc_const::atmostonce_failure;
        rep.pack_reply_header(rh);
        c->send(rep.cstr(),rep.size());
// rpcs::dispatch calls this when an RPC request arrives.
//
// checks to see if an RPC with xid from clt_nonce has already been received.
// if not, remembers the request in reply_window_.
//
// deletes remembered requests with XIDs <= xid_rep; the client
// says it has received a reply for every RPC up through xid_rep.
// frees the reply_t::buf of each such request.
//
// returns one of:
//   NEW: never seen this xid before.
//   INPROGRESS: seen this xid, and still processing it.
//   DONE: seen this xid, previous reply returned in *b and *sz.
//   FORGOTTEN: might have seen this xid, but deleted previous reply.
rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
        int xid_rep, char **b, size_t *sz)

    lock rwl(reply_window_m_);

    list<reply_t> &l = reply_window_[clt_nonce];

    VERIFY(l.size() > 0);
    VERIFY(xid >= xid_rep);

    int past_xid_rep = l.begin()->xid;

    list<reply_t>::iterator start = l.begin(), it;

    if (past_xid_rep < xid_rep || past_xid_rep == -1) {
        // scan for deletion candidates
        for (; it != l.end() && it->xid < xid_rep; it++) {
        l.begin()->xid = xid_rep;

    if (xid < past_xid_rep && past_xid_rep != -1)

    // skip non-deletion candidates
    while (it != l.end() && it->xid < xid)

    // if it's in the list it must be right here
    if (it != l.end() && it->xid == xid) {
        if (it->cb_present) {
            // return information about the remembered reply

    // remember that a new request has arrived
    l.insert(it, reply_t(xid));
// rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
// and passes the return value in b and sz.
// add_reply() should remember b and sz.
// free_reply_window() and checkduplicate_and_update() are responsible for
// eventually freeing b.
rpcs::add_reply(unsigned int clt_nonce, int xid,

    lock rwl(reply_window_m_);
    // remember the RPC reply value
    list<reply_t> &l = reply_window_[clt_nonce];
    list<reply_t>::iterator it = l.begin();
    // skip to our place in the list
    for (it++; it != l.end() && it->xid < xid; it++);
    // there should already be an entry, so whine if there isn't
    if (it == l.end() || it->xid != xid) {
        fprintf(stderr, "Could not find reply struct in add_reply\n");
        l.insert(it, reply_t(xid, b, sz));
    } else {
        *it = reply_t(xid, b, sz);
    }
rpcs::free_reply_window(void)

    lock rwl(reply_window_m_);
    for (auto clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
        for (auto it = clt->second.begin(); it != clt->second.end(); it++){

    reply_window_.clear();

rpcs::rpcbind(unsigned int &r, int)

    jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
operator<<(marshall &m, uint8_t x) {

operator<<(marshall &m, uint16_t x) {
    m.rawbytes((char *)&x, 2);

operator<<(marshall &m, uint32_t x) {
    m.rawbytes((char *)&x, 4);

marshall & operator<<(marshall &m, int32_t x) { return m << (uint32_t) x; }
marshall & operator<<(marshall &m, int8_t x) { return m << (uint8_t)x; }
marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; }
marshall & operator<<(marshall &m, int16_t x) { return m << (uint16_t)x; }
marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }

operator<<(marshall &m, const string &s) {
    m << (unsigned int) s.size();
    m.rawbytes(s.data(), s.size());
void marshall::pack_req_header(const request_header &h) {
    size_t saved_sz = index_;
    // leave the first 4 bytes empty for the channel to fill in the PDU size
    index_ = sizeof(rpc_sz_t);
    *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep;

void marshall::pack_reply_header(const reply_header &h) {
    size_t saved_sz = index_;
    // leave the first 4 bytes empty for the channel to fill in the PDU size
    index_ = sizeof(rpc_sz_t);
    *this << h.xid << h.ret;

// take the contents from another unmarshall object
unmarshall::take_in(unmarshall &another)

    another.take_buf(&buf_, &sz_);
    index_ = RPC_HEADER_SZ;
    ok_ = sz_ >= RPC_HEADER_SZ;
unmarshall::ensure(size_t n) {

unmarshall::rawbyte()

    return (uint8_t)buf_[index_++];

unmarshall::rawbytes(string &ss, size_t n)

    ss.assign(buf_+index_, n);

unmarshall::rawbytes(T &t)

    const size_t n = sizeof(T);
    memcpy(&t, buf_+index_, n);

unmarshall & operator>>(unmarshall &u, bool &x) { x = (bool)u.rawbyte(); return u; }
unmarshall & operator>>(unmarshall &u, uint8_t &x) { x = u.rawbyte(); return u; }
unmarshall & operator>>(unmarshall &u, int8_t &x) { x = (int8_t)u.rawbyte(); return u; }
unmarshall & operator>>(unmarshall &u, uint16_t &x) { u.rawbytes<uint16_t>(x); return u; }
unmarshall & operator>>(unmarshall &u, int16_t &x) { u.rawbytes<int16_t>(x); return u; }
unmarshall & operator>>(unmarshall &u, uint32_t &x) { u.rawbytes<uint32_t>(x); return u; }
unmarshall & operator>>(unmarshall &u, int32_t &x) { u.rawbytes<int32_t>(x); return u; }
unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes<uint32_t>(xx); x = xx; return u; }
unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes<uint64_t>(x); return u; }
unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes<int64_t>(x); return u; }
unmarshall & operator>>(unmarshall &u, string &s) {
    unsigned sz = u.grab<unsigned>();
bool operator<(const sockaddr_in &a, const sockaddr_in &b){
    return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
            ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
             ((a.sin_port < b.sin_port))));
/*---------------auxiliary functions--------------*/
sockaddr_in make_sockaddr(const string &hostandport) {
    auto colon = hostandport.find(':');
    if (colon == string::npos)
        return make_sockaddr("127.0.0.1", hostandport);
    return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
sockaddr_in make_sockaddr(const string &host, const string &port) {
    bzero(&dst, sizeof(dst));
    dst.sin_family = AF_INET;

    struct in_addr a{inet_addr(host.c_str())};

    if(a.s_addr != INADDR_NONE)
        dst.sin_addr.s_addr = a.s_addr;

        struct hostent *hp = gethostbyname(host.c_str());
        if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
            fprintf(stderr, "cannot find host name %s\n", host.c_str());

        memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
        dst.sin_addr.s_addr = a.s_addr;

    dst.sin_port = hton((uint16_t)stoi(port));