Includes cleanups
[invirt/third/libt4.git] / rpc / rpc.cc
1 /*
2  The rpcc class handles client-side RPC.  Each rpcc is bound to a single RPC
3  server.  The jobs of rpcc include maintaining a connection to server, sending
4  RPC requests and waiting for responses, retransmissions, at-most-once delivery
5  etc.
6
7  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
8  connections from different rpcc objects.  The jobs of rpcs include accepting
9  connections, dispatching requests to registered RPC handlers, at-most-once
10  delivery etc.
11
12  Both rpcc and rpcs use the connection class as an abstraction for the
13  underlying communication channel.  To send an RPC request/reply, one calls
14  connection::send() which blocks until data is sent or the connection has
15  failed (thus the caller can free the buffer when send() returns).  When a
16  request/reply is received, connection makes a callback into the corresponding
17  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
18
19  Thread organization:
20  rpcc uses application threads to send RPC requests and blocks to receive the
21  reply or error. All connections use a single PollMgr object to perform async
22  socket IO.  PollMgr creates a single thread to examine the readiness of socket
23  file descriptors and informs the corresponding connection whenever a socket is
24  ready to be read or written.  (We use asynchronous socket IO to reduce the
25  number of threads needed to manage these connections; without async IO, at
26  least one thread is needed per connection to read data without blocking other
27  activities.)  Each rpcs object creates one thread for listening on the server
28  port and a pool of threads for executing RPC requests.  The thread pool allows
29  us to control the number of threads spawned at the server (spawning one thread
30  per request will hurt when the server faces thousands of requests).
31
32  In order to delete a connection object, we must maintain a reference count.
33  For rpcc, multiple client threads might be invoking the rpcc::call() functions
34  and thus holding multiple references to the underlying connection object. For
35  rpcs, multiple dispatch threads might be holding references to the same
36  connection object.  A connection object is deleted only when the underlying
37  connection is dead and the reference count reaches zero.
38
39  This version of the RPC library explicitly joins exited threads to make sure
40  no outstanding references exist before deleting objects.
41
42  To delete a rpcc object safely, the users of the library must ensure that
43  there are no outstanding calls on the rpcc object.
44
45  To delete a rpcs object safely, we do the following in sequence: 1. stop
46  accepting new incoming connections. 2. close existing active connections.  3.
47  delete the dispatch thread pool which involves waiting for current active RPC
48  handlers to finish.  It is interesting how a thread pool can be deleted
49  without using thread cancellation. The trick is to inject x "poison pills" for
50  a thread pool of x threads. Upon getting a poison pill instead of a normal
51  task, a worker thread will exit (and thread pool destructor waits to join all
52  x exited worker threads).
53  */
54
55 #include "rpc.h"
56
57 #include <arpa/inet.h>
58 #include <netinet/tcp.h>
59 #include <netdb.h>
60 #include <unistd.h>
61
62 inline void set_rand_seed() {
63     auto now = time_point_cast<nanoseconds>(steady_clock::now());
64     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
65 }
66
67 static sockaddr_in make_sockaddr(const string &hostandport);
68
69 rpcc::rpcc(const string & d, bool retrans) :
70     dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
71     retrans_(retrans), reachable_(true), chan_(), destroy_wait_ (false), xid_rep_done_(-1)
72 {
73     if(retrans){
74         set_rand_seed();
75         clt_nonce_ = (unsigned int)random();
76     } else {
77         // special client nonce 0 means this client does not
78         // require at-most-once logic from the server
79         // because it uses tcp and never retries a failed connection
80         clt_nonce_ = 0;
81     }
82
83     char *loss_env = getenv("RPC_LOSSY");
84     if(loss_env)
85         lossytest_ = atoi(loss_env);
86
87     // xid starts with 1 and latest received reply starts with 0
88     xid_rep_window_.push_back(0);
89
90     IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
91 }
92
93 // IMPORTANT: destruction should happen only when no external threads
94 // are blocked inside rpcc or will use rpcc in the future
95 rpcc::~rpcc() {
96     cancel();
97     IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
98     if(chan_)
99         chan_->closeconn();
100     VERIFY(calls_.size() == 0);
101 }
102
103 int rpcc::bind(milliseconds to) {
104     unsigned int r;
105     int ret = call_timeout(rpc_const::bind, to, r, 0);
106     if(ret == 0){
107         lock ml(m_);
108         bind_done_ = true;
109         srv_nonce_ = r;
110     } else {
111         IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
112     }
113     return ret;
114 };
115
116 // Cancel all outstanding calls
117 void rpcc::cancel(void) {
118     lock ml(m_);
119     if (calls_.size()) {
120         LOG("force callers to fail");
121         for(auto &p : calls_){
122             caller *ca = p.second;
123
124             IF_LEVEL(2) LOG("force caller to fail");
125             {
126                 lock cl(ca->m);
127                 ca->done = true;
128                 ca->intret = rpc_const::cancel_failure;
129                 ca->c.notify_one();
130             }
131         }
132
133         while (calls_.size () > 0){
134             destroy_wait_ = true;
135             destroy_wait_c_.wait(ml);
136         }
137         LOG("done");
138     }
139 }
140
141 int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
142
143     caller ca(0, &rep);
144     int xid_rep;
145     {
146         lock ml(m_);
147
148         if((proc != rpc_const::bind && !bind_done_) ||
149                 (proc == rpc_const::bind && bind_done_)){
150             IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
151             return rpc_const::bind_failure;
152         }
153
154         if(destroy_wait_){
155           return rpc_const::cancel_failure;
156         }
157
158         ca.xid = xid_++;
159         calls_[ca.xid] = &ca;
160
161         req.pack_header(request_header{ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
162         xid_rep = xid_rep_window_.front();
163     }
164
165     milliseconds curr_to = rpc::to_min;
166     auto finaldeadline = steady_clock::now() + to, nextdeadline = finaldeadline;
167
168     bool transmit = true;
169     shared_ptr<connection> ch;
170
171     while (1) {
172         if(transmit) {
173             get_refconn(ch);
174             if (ch) {
175                 if (reachable_) {
176                     request forgot;
177                     {
178                         lock ml(m_);
179                         if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
180                             forgot = dup_req_;
181                             dup_req_.clear();
182                         }
183                     }
184                     if (forgot.isvalid())
185                         ch->send(forgot.buf);
186                     ch->send(req);
187                 }
188                 else IF_LEVEL(1) LOG("not reachable");
189                 IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc <<
190                                 " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
191             }
192             transmit = false; // only send once on a given channel
193         }
194
195         if(finaldeadline == time_point<steady_clock>::min())
196             break;
197
198         nextdeadline = steady_clock::now() + curr_to;
199         if(nextdeadline > finaldeadline) {
200             nextdeadline = finaldeadline;
201             finaldeadline = time_point<steady_clock>::min();
202         }
203
204         {
205             lock cal(ca.m);
206             while (!ca.done){
207                 IF_LEVEL(2) LOG("wait");
208                 if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
209                     IF_LEVEL(2) LOG("timeout");
210                     break;
211                 }
212             }
213             if(ca.done){
214                 IF_LEVEL(2) LOG("reply received");
215                 break;
216             }
217         }
218
219         if(retrans_ && (!ch || ch->isdead())) {
220             // since connection is dead, retransmit
221             // on the new connection
222             transmit = true;
223         }
224         curr_to *= 2;
225     }
226
227     {
228         // no locking of ca.m since only this thread changes ca.xid
229         lock ml(m_);
230         calls_.erase(ca.xid);
231         // may need to update the xid again here, in case the
232         // packet times out before it's even sent by the channel.
233         // I don't think there's any harm in maybe doing it twice
234         update_xid_rep(ca.xid);
235
236         if(destroy_wait_){
237           destroy_wait_c_.notify_one();
238         }
239     }
240
241     if (ca.done && lossytest_)
242     {
243         lock ml(m_);
244         if (!dup_req_.isvalid()) {
245             dup_req_.buf = req;
246             dup_req_.xid = ca.xid;
247         }
248         if (xid_rep > xid_rep_done_)
249             xid_rep_done_ = xid_rep;
250     }
251
252     lock cal(ca.m);
253
254     IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
255                     " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
256                     ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
257
258     // destruction of req automatically frees its buffer
259     return (ca.done? ca.intret : rpc_const::timeout_failure);
260 }
261
262 void
263 rpcc::get_refconn(shared_ptr<connection> & ch)
264 {
265     lock ml(chan_m_);
266     if (!chan_ || chan_->isdead())
267         chan_ = connect_to_dst(dst_, this, lossytest_);
268
269     if (chan_)
270         ch = chan_;
271 }
272
273 // PollMgr's thread is being used to
274 // make this upcall from connection object to rpcc.
275 // this funtion must not block.
276 //
277 // this function keeps no reference for connection *c
278 bool
279 rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
280 {
281     unmarshall rep(b, true);
282     reply_header h;
283     rep.unpack_header(h);
284
285     if(!rep.ok()){
286         IF_LEVEL(1) LOG("unmarshall header failed!!!");
287         return true;
288     }
289
290     lock ml(m_);
291
292     update_xid_rep(h.xid);
293
294     if(calls_.find(h.xid) == calls_.end()){
295         IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
296         return true;
297     }
298     caller *ca = calls_[h.xid];
299
300     lock cl(ca->m);
301     if(!ca->done){
302         *ca->rep = b;
303         ca->intret = h.ret;
304         if(ca->intret < 0){
305             IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret);
306         }
307         ca->done = 1;
308     }
309     ca->c.notify_all();
310     return true;
311 }
312
313 // assumes thread holds mutex m
314 void
315 rpcc::update_xid_rep(int xid)
316 {
317     if(xid <= xid_rep_window_.front()){
318         return;
319     }
320
321     for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
322         if(*it > xid){
323             xid_rep_window_.insert(it, xid);
324             goto compress;
325         }
326     }
327     xid_rep_window_.push_back(xid);
328
329 compress:
330     auto it = xid_rep_window_.begin();
331     for (it++; it != xid_rep_window_.end(); it++){
332         while (xid_rep_window_.front() + 1 == *it)
333             xid_rep_window_.pop_front();
334     }
335 }
336
337 rpcs::rpcs(in_port_t p1, size_t count)
338   : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
339 {
340     set_rand_seed();
341     nonce_ = (unsigned int)random();
342     IF_LEVEL(2) LOG("created with nonce " << nonce_);
343
344     reg(rpc_const::bind, &rpcs::rpcbind, this);
345     dispatchpool_ = unique_ptr<ThrPool>(new ThrPool(6, false));
346 }
347
348 void rpcs::start() {
349     char *loss_env = getenv("RPC_LOSSY");
350     listener_ = unique_ptr<tcpsconn>(new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0));
351 }
352
353 rpcs::~rpcs()
354 {
355     // must delete listener before dispatchpool
356     listener_ = nullptr;
357     dispatchpool_ = nullptr;
358     free_reply_window();
359 }
360
361 bool
362 rpcs::got_pdu(const shared_ptr<connection> & c, const string & b)
363 {
364     if(!reachable_){
365         IF_LEVEL(1) LOG("not reachable");
366         return true;
367     }
368
369     return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
370 }
371
372 void
373 rpcs::reg1(proc_t proc, handler *h)
374 {
375     lock pl(procs_m_);
376     VERIFY(procs_.count(proc) == 0);
377     procs_[proc] = h;
378     VERIFY(procs_.count(proc) >= 1);
379 }
380
381 void
382 rpcs::updatestat(proc_t proc)
383 {
384     lock cl(count_m_);
385     counts_[proc]++;
386     curr_counts_--;
387     if(curr_counts_ == 0){
388         LOG("RPC STATS: ");
389         for (auto i = counts_.begin(); i != counts_.end(); i++)
390             LOG(hex << i->first << ":" << dec << i->second);
391
392         lock rwl(reply_window_m_);
393
394         size_t totalrep = 0, maxrep = 0;
395         for (auto clt : reply_window_) {
396             totalrep += clt.second.size();
397             if(clt.second.size() > maxrep)
398                 maxrep = clt.second.size();
399         }
400         IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
401                         totalrep << " max per client " << maxrep);
402         curr_counts_ = counting_;
403     }
404 }
405
406 void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
407     unmarshall req(buf, true);
408
409     request_header h;
410     req.unpack_header(h);
411     proc_t proc = h.proc;
412
413     if (!req.ok()) {
414         IF_LEVEL(1) LOG("unmarshall header failed");
415         return;
416     }
417
418     IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
419                     dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
420
421     marshall rep;
422     reply_header rh{h.xid,0};
423
424     // is client sending to an old instance of server?
425     if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
426         IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
427                         " (current " << nonce_ << ") proc " << hex << h.proc);
428         rh.ret = rpc_const::oldsrv_failure;
429         rep.pack_header(rh);
430         c->send(rep);
431         return;
432     }
433
434     handler *f;
435     // is RPC proc a registered procedure?
436     {
437         lock pl(procs_m_);
438         if(procs_.count(proc) < 1){
439             LOG("unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_);
440             VERIFY(0);
441             return;
442         }
443
444         f = procs_[proc];
445     }
446
447     rpcs::rpcstate_t stat;
448     string b1;
449
450     if(h.clt_nonce){
451         // have i seen this client before?
452         {
453             lock rwl(reply_window_m_);
454             // if we don't know about this clt_nonce, create a cleanup object
455             if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
456                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
457                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
458                 IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
459                                 " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
460             }
461         }
462
463         // save the latest good connection to the client
464         {
465             lock rwl(conns_m_);
466             if (conns_.find(h.clt_nonce) == conns_.end())
467                 conns_[h.clt_nonce] = c;
468             else if(conns_[h.clt_nonce]->create_time() < c->create_time())
469                 conns_[h.clt_nonce] = c;
470         }
471
472         stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
473     } else {
474         // this client does not require at most once logic
475         stat = NEW;
476     }
477
478     switch (stat) {
479         case NEW: // new request
480             if (counting_)
481                 updatestat(proc);
482
483             rh.ret = (*f)(req, rep);
484             if (rh.ret == rpc_const::unmarshal_args_failure) {
485                 cerr << "failed to unmarshall the arguments. You are " <<
486                         "probably calling RPC 0x" << hex << proc << " with the wrong " <<
487                         "types of arguments." << endl;
488                 VERIFY(0);
489             }
490             VERIFY(rh.ret >= 0);
491
492             rep.pack_header(rh);
493             b1 = rep;
494
495             IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
496                             h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
497
498             if (h.clt_nonce > 0) {
499                 // only record replies for clients that require at-most-once logic
500                 add_reply(h.clt_nonce, h.xid, b1);
501             }
502
503             // get the latest connection to the client
504             {
505                 lock rwl(conns_m_);
506                 if (c->isdead())
507                     c = conns_[h.clt_nonce];
508             }
509
510             c->send(rep);
511             break;
512         case INPROGRESS: // server is working on this request
513             break;
514         case DONE: // duplicate and we still have the response
515             c->send(b1);
516             break;
517         case FORGOTTEN: // very old request and we don't have the response anymore
518             IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
519             rh.ret = rpc_const::atmostonce_failure;
520             rep.pack_header(rh);
521             c->send(rep);
522             break;
523     }
524 }
525
526 // rpcs::dispatch calls this when an RPC request arrives.
527 //
528 // checks to see if an RPC with xid from clt_nonce has already been received.
529 // if not, remembers the request in reply_window_.
530 //
531 // deletes remembered requests with XIDs <= xid_rep; the client
532 // says it has received a reply for every RPC up through xid_rep.
533 // frees the reply_t::buf of each such request.
534 //
535 // returns one of:
536 //   NEW: never seen this xid before.
537 //   INPROGRESS: seen this xid, and still processing it.
538 //   DONE: seen this xid, previous reply returned in b.
539 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
540 rpcs::rpcstate_t
541 rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
542         int xid_rep, string & b)
543 {
544     lock rwl(reply_window_m_);
545
546     list<reply_t> &l = reply_window_[clt_nonce];
547
548     VERIFY(l.size() > 0);
549     VERIFY(xid >= xid_rep);
550
551     int past_xid_rep = l.begin()->xid;
552
553     list<reply_t>::iterator start = l.begin(), it = ++start;
554
555     if (past_xid_rep < xid_rep || past_xid_rep == -1) {
556         // scan for deletion candidates
557         while (it != l.end() && it->xid < xid_rep)
558             it++;
559         l.erase(start, it);
560         l.begin()->xid = xid_rep;
561     }
562
563     if (xid < past_xid_rep && past_xid_rep != -1)
564         return FORGOTTEN;
565
566     // skip non-deletion candidates
567     while (it != l.end() && it->xid < xid)
568         it++;
569
570     // if it's in the list it must be right here
571     if (it != l.end() && it->xid == xid) {
572         if (it->cb_present) {
573             // return information about the remembered reply
574             b = it->buf;
575             return DONE;
576         }
577         return INPROGRESS;
578     } else {
579         // remember that a new request has arrived
580         l.insert(it, reply_t(xid));
581         return NEW;
582     }
583 }
584
585 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
586 // and passes the return value in b.
587 // add_reply() should remember b.
588 // free_reply_window() and checkduplicate_and_update are responsible for
589 // cleaning up the remembered values.
590 void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
591     lock rwl(reply_window_m_);
592     // remember the RPC reply value
593     list<reply_t> &l = reply_window_[clt_nonce];
594     list<reply_t>::iterator it = l.begin();
595     // skip to our place in the list
596     for (it++; it != l.end() && it->xid < xid; it++);
597     // there should already be an entry, so whine if there isn't
598     if (it == l.end() || it->xid != xid) {
599         cerr << "Could not find reply struct in add_reply" << endl;
600         l.insert(it, reply_t(xid, b));
601     } else {
602         *it = reply_t(xid, b);
603     }
604 }
605
606 void rpcs::free_reply_window(void) {
607     lock rwl(reply_window_m_);
608     reply_window_.clear();
609 }
610
611 int rpcs::rpcbind(unsigned int &r, int) {
612     IF_LEVEL(2) LOG("called return nonce " << nonce_);
613     r = nonce_;
614     return 0;
615 }
616
617 static sockaddr_in make_sockaddr(const string &host, const string &port);
618
619 static sockaddr_in make_sockaddr(const string &hostandport) {
620     auto colon = hostandport.find(':');
621     if (colon == string::npos)
622         return make_sockaddr("127.0.0.1", hostandport);
623     else
624         return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
625 }
626
627 static sockaddr_in make_sockaddr(const string &host, const string &port) {
628     sockaddr_in dst;
629     bzero(&dst, sizeof(dst));
630     dst.sin_family = AF_INET;
631
632     struct in_addr a{inet_addr(host.c_str())};
633
634     if(a.s_addr != INADDR_NONE)
635         dst.sin_addr.s_addr = a.s_addr;
636     else {
637         struct hostent *hp = gethostbyname(host.c_str());
638
639         if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
640             cerr << "cannot find host name " << host << endl;
641             exit(1);
642         }
643         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
644         dst.sin_addr.s_addr = a.s_addr;
645     }
646     dst.sin_port = hton((in_port_t)stoi(port));
647     return dst;
648 }