2 The rpcc class handles client-side RPC. Each rpcc is bound to a single RPC
3 server. The jobs of rpcc include maintaining a connection to server, sending
4 RPC requests and waiting for responses, retransmissions, at-most-once delivery
7 The rpcs class handles the server side of RPC. Each rpcs handles multiple
8 connections from different rpcc objects. The jobs of rpcs include accepting
9 connections, dispatching requests to registered RPC handlers, at-most-once
12 Both rpcc and rpcs use the connection class as an abstraction for the
13 underlying communication channel. To send an RPC request/reply, one calls
14 connection::send(), which blocks until data is sent or the connection has
15 failed (thus the caller can free the buffer when send() returns). When a
16 request/reply is received, connection makes a callback into the corresponding
17 rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
20 rpcc uses application threads to send RPC requests and blocks to receive the
21 reply or error. All connections use a single PollMgr object to perform async
22 socket IO. PollMgr creates a single thread to examine the readiness of socket
23 file descriptors and informs the corresponding connection whenever a socket is
24 ready to be read or written. (We use asynchronous socket IO to reduce the
25 number of threads needed to manage these connections; without async IO, at
26 least one thread is needed per connection to read data without blocking other
27 activities.) Each rpcs object creates one thread for listening on the server
28 port and a pool of threads for executing RPC requests. The thread pool allows
29 us to control the number of threads spawned at the server (spawning one thread
30 per request will hurt when the server faces thousands of requests).
32 In order to delete a connection object, we must maintain a reference count.
33 For rpcc, multiple client threads might be invoking the rpcc::call() functions
34 and thus holding multiple references to the underlying connection object. For
35 rpcs, multiple dispatch threads might be holding references to the same
36 connection object. A connection object is deleted only when the underlying
37 connection is dead and the reference count reaches zero.
39 This version of the RPC library explicitly joins exited threads to make sure
40 no outstanding references exist before deleting objects.
42 To delete an rpcc object safely, the users of the library must ensure that
43 there are no outstanding calls on the rpcc object.
45 To delete an rpcs object safely, we do the following in sequence: 1. stop
46 accepting new incoming connections. 2. close existing active connections. 3.
47 delete the dispatch thread pool which involves waiting for current active RPC
48 handlers to finish. It is interesting how a thread pool can be deleted
49 without using thread cancellation. The trick is to inject x "poison pills" for
50 a thread pool of x threads. Upon getting a poison pill instead of a normal
51 task, a worker thread will exit (and thread pool destructor waits to join all
52 x exited worker threads).
57 #include <arpa/inet.h>
58 #include <netinet/tcp.h>
62 inline void set_rand_seed() {
63 auto now = time_point_cast<nanoseconds>(steady_clock::now());
64 srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
67 static sockaddr_in make_sockaddr(const string &hostandport);
69 rpcc::rpcc(const string & d, bool retrans) :
70 dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
71 retrans_(retrans), reachable_(true), chan_(), destroy_wait_ (false), xid_rep_done_(-1)
75 clt_nonce_ = (unsigned int)random();
77 // special client nonce 0 means this client does not
78 // require at-most-once logic from the server
79 // because it uses tcp and never retries a failed connection
83 char *loss_env = getenv("RPC_LOSSY");
85 lossytest_ = atoi(loss_env);
87 // xid starts with 1 and latest received reply starts with 0
88 xid_rep_window_.push_back(0);
90 IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
93 // IMPORTANT: destruction should happen only when no external threads
94 // are blocked inside rpcc or will use rpcc in the future
97 IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
100 VERIFY(calls_.size() == 0);
103 int rpcc::bind(milliseconds to) {
105 int ret = call_timeout(rpc_protocol::bind, to, r, 0);
111 IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
116 // Cancel all outstanding calls
117 void rpcc::cancel(void) {
120 LOG("force callers to fail");
121 for (auto &p : calls_) {
122 caller *ca = p.second;
124 IF_LEVEL(2) LOG("force caller to fail");
128 ca->intret = rpc_protocol::cancel_failure;
132 destroy_wait_ = true;
133 while (calls_.size () > 0)
134 destroy_wait_c_.wait(ml);
140 int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) {
147 if ((proc != rpc_protocol::bind.id && !bind_done_) || (proc == rpc_protocol::bind.id && bind_done_)) {
148 IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
149 return rpc_protocol::bind_failure;
153 return rpc_protocol::cancel_failure;
156 calls_[ca.xid] = &ca;
158 req.pack_header(rpc_protocol::request_header{
159 ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()
161 xid_rep = xid_rep_window_.front();
164 milliseconds curr_to = rpc::to_min;
165 auto finaldeadline = steady_clock::now() + to;
167 bool transmit = true;
168 shared_ptr<connection> ch;
178 if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
183 if (forgot.isvalid())
184 ch->send(forgot.buf);
187 else IF_LEVEL(1) LOG("not reachable");
188 IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc <<
189 " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
191 transmit = false; // only send once on a given channel
194 auto nextdeadline = min(steady_clock::now() + curr_to, finaldeadline);
200 IF_LEVEL(2) LOG("wait");
201 if (ca.c.wait_until(cal, nextdeadline) == cv_status::timeout) {
202 IF_LEVEL(2) LOG("timeout");
207 IF_LEVEL(2) LOG("reply received");
212 if (nextdeadline >= finaldeadline)
215 if (retrans_ && (!ch || ch->isdead())) {
216 // since connection is dead, retransmit
217 // on the new connection
223 // no locking of ca.m since only this thread changes ca.xid
225 calls_.erase(ca.xid);
226 // may need to update the xid again here, in case the
227 // packet times out before it's even sent by the channel.
228 // I don't think there's any harm in maybe doing it twice
229 update_xid_rep(ca.xid);
232 destroy_wait_c_.notify_one();
235 if (ca.done && lossytest_)
238 if (!dup_req_.isvalid()) {
240 dup_req_.xid = ca.xid;
242 if (xid_rep > xid_rep_done_)
243 xid_rep_done_ = xid_rep;
248 IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
249 " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
250 ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
252 // destruction of req automatically frees its buffer
253 return (ca.done? ca.intret : rpc_protocol::timeout_failure);
256 void rpcc::get_refconn(shared_ptr<connection> & ch) {
258 if (!chan_ || chan_->isdead())
259 chan_ = connection::to_dst(dst_, this, lossytest_);
265 // PollMgr's thread is being used to
266 // make this upcall from connection object to rpcc.
267 // this function must not block.
269 // this function keeps no reference for connection *c
271 rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
273 unmarshall rep(b, true);
274 rpc_protocol::reply_header h;
275 rep.unpack_header(h);
278 IF_LEVEL(1) LOG("unmarshall header failed!!!");
284 update_xid_rep(h.xid);
286 if (calls_.find(h.xid) == calls_.end()) {
287 IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
290 caller *ca = calls_[h.xid];
296 if (ca->intret < 0) {
297 IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret);
305 // assumes thread holds mutex m
307 rpcc::update_xid_rep(int xid)
309 if (xid <= xid_rep_window_.front())
312 for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++) {
314 xid_rep_window_.insert(it, xid);
318 xid_rep_window_.push_back(xid);
321 auto it = xid_rep_window_.begin();
322 for (it++; it != xid_rep_window_.end(); it++) {
323 while (xid_rep_window_.front() + 1 == *it)
324 xid_rep_window_.pop_front();
328 rpcs::rpcs(in_port_t p1, size_t count)
329 : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
332 nonce_ = (unsigned int)random();
333 IF_LEVEL(2) LOG("created with nonce " << nonce_);
335 reg(rpc_protocol::bind, &rpcs::rpcbind, this);
336 dispatchpool_ = unique_ptr<thread_pool>(new thread_pool(6, false));
340 char *loss_env = getenv("RPC_LOSSY");
341 listener_ = unique_ptr<tcpsconn>(new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0));
345 // must delete listener before dispatchpool
347 dispatchpool_ = nullptr;
351 bool rpcs::got_pdu(const shared_ptr<connection> & c, const string & b) {
353 IF_LEVEL(1) LOG("not reachable");
357 return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
360 void rpcs::reg1(proc_id_t proc, handler *h) {
362 VERIFY(procs_.count(proc) == 0);
364 VERIFY(procs_.count(proc) >= 1);
367 void rpcs::updatestat(proc_id_t proc) {
371 if (curr_counts_ == 0) {
373 for (auto i = counts_.begin(); i != counts_.end(); i++)
374 LOG(hex << i->first << ":" << dec << i->second);
376 lock rwl(reply_window_m_);
378 size_t totalrep = 0, maxrep = 0;
379 for (auto clt : reply_window_) {
380 totalrep += clt.second.size();
381 if (clt.second.size() > maxrep)
382 maxrep = clt.second.size();
384 IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
385 totalrep << " max per client " << maxrep);
386 curr_counts_ = counting_;
390 void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
391 unmarshall req(buf, true);
393 rpc_protocol::request_header h;
394 req.unpack_header(h);
395 proc_id_t proc = h.proc;
398 IF_LEVEL(1) LOG("unmarshall header failed");
402 IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
403 dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
406 rpc_protocol::reply_header rh{h.xid,0};
408 // is client sending to an old instance of server?
409 if (h.srv_nonce != 0 && h.srv_nonce != nonce_) {
410 IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
411 " (current " << nonce_ << ") proc " << hex << h.proc);
412 rh.ret = rpc_protocol::oldsrv_failure;
419 // is RPC proc a registered procedure?
422 if (procs_.count(proc) < 1) {
423 LOG("unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_);
431 rpcs::rpcstate_t stat;
435 // have i seen this client before?
437 lock rwl(reply_window_m_);
438 // if we don't know about this clt_nonce, create a cleanup object
439 if (reply_window_.find(h.clt_nonce) == reply_window_.end()) {
440 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
441 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
442 IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
443 " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
447 // save the latest good connection to the client
450 if (conns_.find(h.clt_nonce) == conns_.end())
451 conns_[h.clt_nonce] = c;
452 else if (conns_[h.clt_nonce]->create_time() < c->create_time())
453 conns_[h.clt_nonce] = c;
456 stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
458 // this client does not require at most once logic
463 case NEW: // new request
467 rh.ret = (*f)(req, rep);
468 if (rh.ret == rpc_protocol::unmarshal_args_failure) {
469 LOG("failed to unmarshall the arguments. You are " <<
470 "probably calling RPC 0x" << hex << proc << " with the wrong " <<
471 "types of arguments.");
479 IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
480 h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
482 if (h.clt_nonce > 0) {
483 // only record replies for clients that require at-most-once logic
484 add_reply(h.clt_nonce, h.xid, b1);
487 // get the latest connection to the client
491 c = conns_[h.clt_nonce];
496 case INPROGRESS: // server is working on this request
498 case DONE: // duplicate and we still have the response
501 case FORGOTTEN: // very old request and we don't have the response anymore
502 IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
503 rh.ret = rpc_protocol::atmostonce_failure;
510 // rpcs::dispatch calls this when an RPC request arrives.
512 // checks to see if an RPC with xid from clt_nonce has already been received.
513 // if not, remembers the request in reply_window_.
515 // deletes remembered requests with XIDs <= xid_rep; the client
516 // says it has received a reply for every RPC up through xid_rep.
517 // frees the reply_t::buf of each such request.
520 // NEW: never seen this xid before.
521 // INPROGRESS: seen this xid, and still processing it.
522 // DONE: seen this xid, previous reply returned in b.
523 // FORGOTTEN: might have seen this xid, but deleted previous reply.
525 rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
526 int xid_rep, string & b)
528 lock rwl(reply_window_m_);
530 list<reply_t> &l = reply_window_[clt_nonce];
532 VERIFY(l.size() > 0);
533 VERIFY(xid >= xid_rep);
535 int past_xid_rep = l.begin()->xid;
537 list<reply_t>::iterator start = l.begin(), it = ++start;
539 if (past_xid_rep < xid_rep || past_xid_rep == -1) {
540 // scan for deletion candidates
541 while (it != l.end() && it->xid < xid_rep)
544 l.begin()->xid = xid_rep;
547 if (xid < past_xid_rep && past_xid_rep != -1)
550 // skip non-deletion candidates
551 while (it != l.end() && it->xid < xid)
554 // if it's in the list it must be right here
555 if (it != l.end() && it->xid == xid) {
556 if (it->cb_present) {
557 // return information about the remembered reply
563 // remember that a new request has arrived
564 l.insert(it, reply_t(xid));
569 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
570 // and passes the return value in b.
571 // add_reply() should remember b.
572 // free_reply_window() and checkduplicate_and_update are responsible for
573 // cleaning up the remembered values.
574 void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
575 lock rwl(reply_window_m_);
576 // remember the RPC reply value
577 list<reply_t> &l = reply_window_[clt_nonce];
578 list<reply_t>::iterator it = l.begin();
579 // skip to our place in the list
580 for (it++; it != l.end() && it->xid < xid; it++);
581 // there should already be an entry, so whine if there isn't
582 if (it == l.end() || it->xid != xid) {
583 LOG("Could not find reply struct in add_reply");
584 l.insert(it, reply_t(xid, b));
586 *it = reply_t(xid, b);
590 void rpcs::free_reply_window(void) {
591 lock rwl(reply_window_m_);
592 reply_window_.clear();
595 int rpcs::rpcbind(unsigned int &r, int) {
596 IF_LEVEL(2) LOG("called return nonce " << nonce_);
601 static sockaddr_in make_sockaddr(const string &hostandport) {
602 string host = "127.0.0.1";
603 string port = hostandport;
604 auto colon = hostandport.find(':');
605 if (colon != string::npos) {
606 host = hostandport.substr(0, colon);
607 port = hostandport.substr(colon+1);
610 sockaddr_in dst{}; // zero initialize
611 dst.sin_family = AF_INET;
613 struct in_addr a{inet_addr(host.c_str())};
615 if (a.s_addr != INADDR_NONE)
616 dst.sin_addr.s_addr = a.s_addr;
618 struct hostent *hp = gethostbyname(host.c_str());
620 if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
621 LOG_NONMEMBER("cannot find host name " << host);
624 memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
625 dst.sin_addr.s_addr = a.s_addr;
627 dst.sin_port = hton((in_port_t)stoi(port));