80e429efca259984b5a737c94c61500622844c9f
[invirt/third/libt4.git] / rpc / rpc.cc
1 /*
2  The rpcc class handles client-side RPC.  Each rpcc is bound to a single RPC
3  server.  The jobs of rpcc include maintaining a connection to server, sending
4  RPC requests and waiting for responses, retransmissions, at-most-once delivery
5  etc.
6
7  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
8  connections from different rpcc objects.  The jobs of rpcs include accepting
9  connections, dispatching requests to registered RPC handlers, at-most-once
10  delivery etc.
11
12  Both rpcc and rpcs use the connection class as an abstraction for the
13  underlying communication channel.  To send an RPC request/reply, one calls
14  connection::send() which blocks until data is sent or the connection has
15  failed (thus the caller can free the buffer when send() returns).  When a
16  request/reply is received, connection makes a callback into the corresponding
17  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
18
19  Thread organization:
20  rpcc uses application threads to send RPC requests and blocks to receive the
21  reply or error. All connections use a single PollMgr object to perform async
22  socket IO.  PollMgr creates a single thread to examine the readiness of socket
23  file descriptors and informs the corresponding connection whenever a socket is
24  ready to be read or written.  (We use asynchronous socket IO to reduce the
25  number of threads needed to manage these connections; without async IO, at
26  least one thread is needed per connection to read data without blocking other
27  activities.)  Each rpcs object creates one thread for listening on the server
28  port and a pool of threads for executing RPC requests.  The thread pool allows
29  us to control the number of threads spawned at the server (spawning one thread
30  per request will hurt when the server faces thousands of requests).
31
32  In order to delete a connection object, we must maintain a reference count.
33  For rpcc, multiple client threads might be invoking the rpcc::call() functions
34  and thus holding multiple references to the underlying connection object. For
35  rpcs, multiple dispatch threads might be holding references to the same
36  connection object.  A connection object is deleted only when the underlying
37  connection is dead and the reference count reaches zero.
38
39  This version of the RPC library explicitly joins exited threads to make sure
40  no outstanding references exist before deleting objects.
41
42  To delete a rpcc object safely, the users of the library must ensure that
43  there are no outstanding calls on the rpcc object.
44
45  To delete a rpcs object safely, we do the following in sequence: 1. stop
46  accepting new incoming connections. 2. close existing active connections.  3.
47  delete the dispatch thread pool which involves waiting for current active RPC
48  handlers to finish.  It is interesting how a thread pool can be deleted
49  without using thread cancellation. The trick is to inject x "poison pills" for
50  a thread pool of x threads. Upon getting a poison pill instead of a normal
51  task, a worker thread will exit (and thread pool destructor waits to join all
52  x exited worker threads).
53  */
54
55 #include "rpc.h"
56
57 #include <arpa/inet.h>
58 #include <netinet/tcp.h>
59 #include <netdb.h>
60 #include <unistd.h>
61
62 inline void set_rand_seed() {
63     auto now = time_point_cast<nanoseconds>(steady_clock::now());
64     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
65 }
66
67 static sockaddr_in make_sockaddr(const string &hostandport);
68
69 rpcc::rpcc(const string & d, bool retrans) :
70     dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
71     retrans_(retrans), reachable_(true), chan_(), destroy_wait_ (false), xid_rep_done_(-1)
72 {
73     if (retrans) {
74         set_rand_seed();
75         clt_nonce_ = (unsigned int)random();
76     } else {
77         // special client nonce 0 means this client does not
78         // require at-most-once logic from the server
79         // because it uses tcp and never retries a failed connection
80         clt_nonce_ = 0;
81     }
82
83     char *loss_env = getenv("RPC_LOSSY");
84     if (loss_env)
85         lossytest_ = atoi(loss_env);
86
87     // xid starts with 1 and latest received reply starts with 0
88     xid_rep_window_.push_back(0);
89
90     IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
91 }
92
93 // IMPORTANT: destruction should happen only when no external threads
94 // are blocked inside rpcc or will use rpcc in the future
95 rpcc::~rpcc() {
96     cancel();
97     IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
98     if (chan_)
99         chan_->closeconn();
100     VERIFY(calls_.size() == 0);
101 }
102
103 int rpcc::bind(milliseconds to) {
104     unsigned int r;
105     int ret = call_timeout(rpc_const::bind, to, r, 0);
106     if (ret == 0) {
107         lock ml(m_);
108         bind_done_ = true;
109         srv_nonce_ = r;
110     } else {
111         IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
112     }
113     return ret;
114 };
115
116 // Cancel all outstanding calls
117 void rpcc::cancel(void) {
118     lock ml(m_);
119     if (calls_.size()) {
120         LOG("force callers to fail");
121         for (auto &p : calls_) {
122             caller *ca = p.second;
123
124             IF_LEVEL(2) LOG("force caller to fail");
125
126             lock cl(ca->m);
127             ca->done = true;
128             ca->intret = rpc_const::cancel_failure;
129             ca->c.notify_one();
130         }
131
132         destroy_wait_ = true;
133         while (calls_.size () > 0)
134             destroy_wait_c_.wait(ml);
135
136         LOG("done");
137     }
138 }
139
140 int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
141
142     caller ca(0, &rep);
143     int xid_rep;
144     {
145         lock ml(m_);
146
147         if ((proc != rpc_const::bind && !bind_done_) || (proc == rpc_const::bind && bind_done_)) {
148             IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
149             return rpc_const::bind_failure;
150         }
151
152         if (destroy_wait_)
153             return rpc_const::cancel_failure;
154
155         ca.xid = xid_++;
156         calls_[ca.xid] = &ca;
157
158         req.pack_header(request_header{ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
159         xid_rep = xid_rep_window_.front();
160     }
161
162     milliseconds curr_to = rpc::to_min;
163     auto finaldeadline = steady_clock::now() + to;
164
165     bool transmit = true;
166     shared_ptr<connection> ch;
167
168     while (1) {
169         if (transmit) {
170             get_refconn(ch);
171             if (ch) {
172                 if (reachable_) {
173                     request forgot;
174                     {
175                         lock ml(m_);
176                         if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
177                             forgot = dup_req_;
178                             dup_req_.clear();
179                         }
180                     }
181                     if (forgot.isvalid())
182                         ch->send(forgot.buf);
183                     ch->send(req);
184                 }
185                 else IF_LEVEL(1) LOG("not reachable");
186                 IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc <<
187                                 " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
188             }
189             transmit = false; // only send once on a given channel
190         }
191
192         auto nextdeadline = min(steady_clock::now() + curr_to, finaldeadline);
193         curr_to *= 2;
194
195         {
196             lock cal(ca.m);
197             while (!ca.done) {
198                 IF_LEVEL(2) LOG("wait");
199                 if (ca.c.wait_until(cal, nextdeadline) == cv_status::timeout) {
200                     IF_LEVEL(2) LOG("timeout");
201                     break;
202                 }
203             }
204             if (ca.done) {
205                 IF_LEVEL(2) LOG("reply received");
206                 break;
207             }
208         }
209
210         if (nextdeadline >= finaldeadline)
211             break;
212
213         if (retrans_ && (!ch || ch->isdead())) {
214             // since connection is dead, retransmit
215             // on the new connection
216             transmit = true;
217         }
218     }
219
220     {
221         // no locking of ca.m since only this thread changes ca.xid
222         lock ml(m_);
223         calls_.erase(ca.xid);
224         // may need to update the xid again here, in case the
225         // packet times out before it's even sent by the channel.
226         // I don't think there's any harm in maybe doing it twice
227         update_xid_rep(ca.xid);
228
229         if (destroy_wait_)
230             destroy_wait_c_.notify_one();
231     }
232
233     if (ca.done && lossytest_)
234     {
235         lock ml(m_);
236         if (!dup_req_.isvalid()) {
237             dup_req_.buf = req;
238             dup_req_.xid = ca.xid;
239         }
240         if (xid_rep > xid_rep_done_)
241             xid_rep_done_ = xid_rep;
242     }
243
244     lock cal(ca.m);
245
246     IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
247                     " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
248                     ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
249
250     // destruction of req automatically frees its buffer
251     return (ca.done? ca.intret : rpc_const::timeout_failure);
252 }
253
254 void rpcc::get_refconn(shared_ptr<connection> & ch) {
255     lock ml(chan_m_);
256     if (!chan_ || chan_->isdead())
257         chan_ = connection::to_dst(dst_, this, lossytest_);
258
259     if (chan_)
260         ch = chan_;
261 }
262
263 // PollMgr's thread is being used to
264 // make this upcall from connection object to rpcc.
265 // this funtion must not block.
266 //
267 // this function keeps no reference for connection *c
268 bool
269 rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
270 {
271     unmarshall rep(b, true);
272     reply_header h;
273     rep.unpack_header(h);
274
275     if (!rep.ok()) {
276         IF_LEVEL(1) LOG("unmarshall header failed!!!");
277         return true;
278     }
279
280     lock ml(m_);
281
282     update_xid_rep(h.xid);
283
284     if (calls_.find(h.xid) == calls_.end()) {
285         IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
286         return true;
287     }
288     caller *ca = calls_[h.xid];
289
290     lock cl(ca->m);
291     if (!ca->done) {
292         *ca->rep = b;
293         ca->intret = h.ret;
294         if (ca->intret < 0) {
295             IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret);
296         }
297         ca->done = 1;
298     }
299     ca->c.notify_all();
300     return true;
301 }
302
303 // assumes thread holds mutex m
304 void
305 rpcc::update_xid_rep(int xid)
306 {
307     if (xid <= xid_rep_window_.front())
308         return;
309
310     for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++) {
311         if (*it > xid) {
312             xid_rep_window_.insert(it, xid);
313             goto compress;
314         }
315     }
316     xid_rep_window_.push_back(xid);
317
318 compress:
319     auto it = xid_rep_window_.begin();
320     for (it++; it != xid_rep_window_.end(); it++) {
321         while (xid_rep_window_.front() + 1 == *it)
322             xid_rep_window_.pop_front();
323     }
324 }
325
326 rpcs::rpcs(in_port_t p1, size_t count)
327   : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
328 {
329     set_rand_seed();
330     nonce_ = (unsigned int)random();
331     IF_LEVEL(2) LOG("created with nonce " << nonce_);
332
333     reg(rpc_const::bind, &rpcs::rpcbind, this);
334     dispatchpool_ = unique_ptr<thread_pool>(new thread_pool(6, false));
335 }
336
337 void rpcs::start() {
338     char *loss_env = getenv("RPC_LOSSY");
339     listener_ = unique_ptr<tcpsconn>(new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0));
340 }
341
342 rpcs::~rpcs() {
343     // must delete listener before dispatchpool
344     listener_ = nullptr;
345     dispatchpool_ = nullptr;
346     free_reply_window();
347 }
348
349 bool rpcs::got_pdu(const shared_ptr<connection> & c, const string & b) {
350     if (!reachable_) {
351         IF_LEVEL(1) LOG("not reachable");
352         return true;
353     }
354
355     return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
356 }
357
358 void rpcs::reg1(proc_t proc, handler *h) {
359     lock pl(procs_m_);
360     VERIFY(procs_.count(proc) == 0);
361     procs_[proc] = h;
362     VERIFY(procs_.count(proc) >= 1);
363 }
364
365 void rpcs::updatestat(proc_t proc) {
366     lock cl(count_m_);
367     counts_[proc]++;
368     curr_counts_--;
369     if (curr_counts_ == 0) {
370         LOG("RPC STATS: ");
371         for (auto i = counts_.begin(); i != counts_.end(); i++)
372             LOG(hex << i->first << ":" << dec << i->second);
373
374         lock rwl(reply_window_m_);
375
376         size_t totalrep = 0, maxrep = 0;
377         for (auto clt : reply_window_) {
378             totalrep += clt.second.size();
379             if (clt.second.size() > maxrep)
380                 maxrep = clt.second.size();
381         }
382         IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
383                         totalrep << " max per client " << maxrep);
384         curr_counts_ = counting_;
385     }
386 }
387
388 void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
389     unmarshall req(buf, true);
390
391     request_header h;
392     req.unpack_header(h);
393     proc_t proc = h.proc;
394
395     if (!req.ok()) {
396         IF_LEVEL(1) LOG("unmarshall header failed");
397         return;
398     }
399
400     IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
401                     dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
402
403     marshall rep;
404     reply_header rh{h.xid,0};
405
406     // is client sending to an old instance of server?
407     if (h.srv_nonce != 0 && h.srv_nonce != nonce_) {
408         IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
409                         " (current " << nonce_ << ") proc " << hex << h.proc);
410         rh.ret = rpc_const::oldsrv_failure;
411         rep.pack_header(rh);
412         c->send(rep);
413         return;
414     }
415
416     handler *f;
417     // is RPC proc a registered procedure?
418     {
419         lock pl(procs_m_);
420         if (procs_.count(proc) < 1) {
421             LOG("unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_);
422             VERIFY(0);
423             return;
424         }
425
426         f = procs_[proc];
427     }
428
429     rpcs::rpcstate_t stat;
430     string b1;
431
432     if (h.clt_nonce) {
433         // have i seen this client before?
434         {
435             lock rwl(reply_window_m_);
436             // if we don't know about this clt_nonce, create a cleanup object
437             if (reply_window_.find(h.clt_nonce) == reply_window_.end()) {
438                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
439                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
440                 IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
441                                 " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
442             }
443         }
444
445         // save the latest good connection to the client
446         {
447             lock rwl(conns_m_);
448             if (conns_.find(h.clt_nonce) == conns_.end())
449                 conns_[h.clt_nonce] = c;
450             else if (conns_[h.clt_nonce]->create_time() < c->create_time())
451                 conns_[h.clt_nonce] = c;
452         }
453
454         stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
455     } else {
456         // this client does not require at most once logic
457         stat = NEW;
458     }
459
460     switch (stat) {
461         case NEW: // new request
462             if (counting_)
463                 updatestat(proc);
464
465             rh.ret = (*f)(req, rep);
466             if (rh.ret == rpc_const::unmarshal_args_failure) {
467                 cerr << "failed to unmarshall the arguments. You are " <<
468                         "probably calling RPC 0x" << hex << proc << " with the wrong " <<
469                         "types of arguments." << endl;
470                 VERIFY(0);
471             }
472             VERIFY(rh.ret >= 0);
473
474             rep.pack_header(rh);
475             b1 = rep;
476
477             IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
478                             h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
479
480             if (h.clt_nonce > 0) {
481                 // only record replies for clients that require at-most-once logic
482                 add_reply(h.clt_nonce, h.xid, b1);
483             }
484
485             // get the latest connection to the client
486             {
487                 lock rwl(conns_m_);
488                 if (c->isdead())
489                     c = conns_[h.clt_nonce];
490             }
491
492             c->send(rep);
493             break;
494         case INPROGRESS: // server is working on this request
495             break;
496         case DONE: // duplicate and we still have the response
497             c->send(b1);
498             break;
499         case FORGOTTEN: // very old request and we don't have the response anymore
500             IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
501             rh.ret = rpc_const::atmostonce_failure;
502             rep.pack_header(rh);
503             c->send(rep);
504             break;
505     }
506 }
507
508 // rpcs::dispatch calls this when an RPC request arrives.
509 //
510 // checks to see if an RPC with xid from clt_nonce has already been received.
511 // if not, remembers the request in reply_window_.
512 //
513 // deletes remembered requests with XIDs <= xid_rep; the client
514 // says it has received a reply for every RPC up through xid_rep.
515 // frees the reply_t::buf of each such request.
516 //
517 // returns one of:
518 //   NEW: never seen this xid before.
519 //   INPROGRESS: seen this xid, and still processing it.
520 //   DONE: seen this xid, previous reply returned in b.
521 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
522 rpcs::rpcstate_t
523 rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
524         int xid_rep, string & b)
525 {
526     lock rwl(reply_window_m_);
527
528     list<reply_t> &l = reply_window_[clt_nonce];
529
530     VERIFY(l.size() > 0);
531     VERIFY(xid >= xid_rep);
532
533     int past_xid_rep = l.begin()->xid;
534
535     list<reply_t>::iterator start = l.begin(), it = ++start;
536
537     if (past_xid_rep < xid_rep || past_xid_rep == -1) {
538         // scan for deletion candidates
539         while (it != l.end() && it->xid < xid_rep)
540             it++;
541         l.erase(start, it);
542         l.begin()->xid = xid_rep;
543     }
544
545     if (xid < past_xid_rep && past_xid_rep != -1)
546         return FORGOTTEN;
547
548     // skip non-deletion candidates
549     while (it != l.end() && it->xid < xid)
550         it++;
551
552     // if it's in the list it must be right here
553     if (it != l.end() && it->xid == xid) {
554         if (it->cb_present) {
555             // return information about the remembered reply
556             b = it->buf;
557             return DONE;
558         }
559         return INPROGRESS;
560     } else {
561         // remember that a new request has arrived
562         l.insert(it, reply_t(xid));
563         return NEW;
564     }
565 }
566
567 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
568 // and passes the return value in b.
569 // add_reply() should remember b.
570 // free_reply_window() and checkduplicate_and_update are responsible for
571 // cleaning up the remembered values.
572 void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
573     lock rwl(reply_window_m_);
574     // remember the RPC reply value
575     list<reply_t> &l = reply_window_[clt_nonce];
576     list<reply_t>::iterator it = l.begin();
577     // skip to our place in the list
578     for (it++; it != l.end() && it->xid < xid; it++);
579     // there should already be an entry, so whine if there isn't
580     if (it == l.end() || it->xid != xid) {
581         cerr << "Could not find reply struct in add_reply" << endl;
582         l.insert(it, reply_t(xid, b));
583     } else {
584         *it = reply_t(xid, b);
585     }
586 }
587
588 void rpcs::free_reply_window(void) {
589     lock rwl(reply_window_m_);
590     reply_window_.clear();
591 }
592
593 int rpcs::rpcbind(unsigned int &r, int) {
594     IF_LEVEL(2) LOG("called return nonce " << nonce_);
595     r = nonce_;
596     return 0;
597 }
598
599 static sockaddr_in make_sockaddr(const string &hostandport) {
600     string host = "127.0.0.1";
601     string port = hostandport;
602     auto colon = hostandport.find(':');
603     if (colon != string::npos) {
604         host = hostandport.substr(0, colon);
605         port = hostandport.substr(colon+1);
606     }
607
608     sockaddr_in dst{}; // zero initialize
609     dst.sin_family = AF_INET;
610
611     struct in_addr a{inet_addr(host.c_str())};
612
613     if (a.s_addr != INADDR_NONE)
614         dst.sin_addr.s_addr = a.s_addr;
615     else {
616         struct hostent *hp = gethostbyname(host.c_str());
617
618         if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
619             cerr << "cannot find host name " << host << endl;
620             exit(1);
621         }
622         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
623         dst.sin_addr.s_addr = a.s_addr;
624     }
625     dst.sin_port = hton((in_port_t)stoi(port));
626     return dst;
627 }