Lots more clean-ups
[invirt/third/libt4.git] / rpc / rpc.cc
1 /*
2  The rpcc class handles client-side RPC.  Each rpcc is bound to a single RPC
3  server.  The jobs of rpcc include maintaining a connection to server, sending
4  RPC requests and waiting for responses, retransmissions, at-most-once delivery
5  etc.
6
7  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
8  connections from different rpcc objects.  The jobs of rpcs include accepting
9  connections, dispatching requests to registered RPC handlers, at-most-once
10  delivery etc.
11
12  Both rpcc and rpcs use the connection class as an abstraction for the
13  underlying communication channel.  To send an RPC request/reply, one calls
14  connection::send() which blocks until data is sent or the connection has
15  failed (thus the caller can free the buffer when send() returns).  When a
16  request/reply is received, connection makes a callback into the corresponding
17  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
18
19  Thread organization:
20  rpcc uses application threads to send RPC requests and blocks to receive the
21  reply or error. All connections use a single PollMgr object to perform async
22  socket IO.  PollMgr creates a single thread to examine the readiness of socket
23  file descriptors and informs the corresponding connection whenever a socket is
24  ready to be read or written.  (We use asynchronous socket IO to reduce the
25  number of threads needed to manage these connections; without async IO, at
26  least one thread is needed per connection to read data without blocking other
27  activities.)  Each rpcs object creates one thread for listening on the server
28  port and a pool of threads for executing RPC requests.  The thread pool allows
29  us to control the number of threads spawned at the server (spawning one thread
30  per request will hurt when the server faces thousands of requests).
31
32  In order to delete a connection object, we must maintain a reference count.
33  For rpcc, multiple client threads might be invoking the rpcc::call() functions
34  and thus holding multiple references to the underlying connection object. For
35  rpcs, multiple dispatch threads might be holding references to the same
36  connection object.  A connection object is deleted only when the underlying
37  connection is dead and the reference count reaches zero.
38
39  This version of the RPC library explicitly joins exited threads to make sure
40  no outstanding references exist before deleting objects.
41
42  To delete a rpcc object safely, the users of the library must ensure that
43  there are no outstanding calls on the rpcc object.
44
45  To delete a rpcs object safely, we do the following in sequence: 1. stop
46  accepting new incoming connections. 2. close existing active connections.  3.
47  delete the dispatch thread pool which involves waiting for current active RPC
48  handlers to finish.  It is interesting how a thread pool can be deleted
49  without using thread cancellation. The trick is to inject x "poison pills" for
50  a thread pool of x threads. Upon getting a poison pill instead of a normal
51  task, a worker thread will exit (and thread pool destructor waits to join all
52  x exited worker threads).
53  */
54
55 #include "types.h"
56 #include "rpc.h"
57
58 #include <sys/types.h>
59 #include <arpa/inet.h>
60 #include <netinet/tcp.h>
61 #include <netdb.h>
62 #include <unistd.h>
63
64 inline void set_rand_seed() {
65     auto now = time_point_cast<nanoseconds>(steady_clock::now());
66     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
67 }
68
69 static sockaddr_in make_sockaddr(const string &hostandport);
70
71 rpcc::rpcc(const string & d, bool retrans) :
72     dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
73     retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
74 {
75     if(retrans){
76         set_rand_seed();
77         clt_nonce_ = (unsigned int)random();
78     } else {
79         // special client nonce 0 means this client does not
80         // require at-most-once logic from the server
81         // because it uses tcp and never retries a failed connection
82         clt_nonce_ = 0;
83     }
84
85     char *loss_env = getenv("RPC_LOSSY");
86     if(loss_env)
87         lossytest_ = atoi(loss_env);
88
89     // xid starts with 1 and latest received reply starts with 0
90     xid_rep_window_.push_back(0);
91
92     IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
93 }
94
95 // IMPORTANT: destruction should happen only when no external threads
96 // are blocked inside rpcc or will use rpcc in the future
97 rpcc::~rpcc() {
98     IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
99     if(chan_){
100         chan_->closeconn();
101         chan_->decref();
102     }
103     VERIFY(calls_.size() == 0);
104 }
105
106 int rpcc::bind(milliseconds to) {
107     unsigned int r;
108     int ret = call_timeout(rpc_const::bind, to, r, 0);
109     if(ret == 0){
110         lock ml(m_);
111         bind_done_ = true;
112         srv_nonce_ = r;
113     } else {
114         IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
115     }
116     return ret;
117 };
118
119 // Cancel all outstanding calls
120 void rpcc::cancel(void) {
121     lock ml(m_);
122     LOG("force callers to fail");
123     for(auto &p : calls_){
124         caller *ca = p.second;
125
126         IF_LEVEL(2) LOG("force caller to fail");
127         {
128             lock cl(ca->m);
129             ca->done = true;
130             ca->intret = rpc_const::cancel_failure;
131             ca->c.notify_one();
132         }
133     }
134
135     while (calls_.size () > 0){
136         destroy_wait_ = true;
137         destroy_wait_c_.wait(ml);
138     }
139     LOG("done");
140 }
141
142 int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
143
144     caller ca(0, &rep);
145     int xid_rep;
146     {
147         lock ml(m_);
148
149         if((proc != rpc_const::bind && !bind_done_) ||
150                 (proc == rpc_const::bind && bind_done_)){
151             IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
152             return rpc_const::bind_failure;
153         }
154
155         if(destroy_wait_){
156           return rpc_const::cancel_failure;
157         }
158
159         ca.xid = xid_++;
160         calls_[ca.xid] = &ca;
161
162         req.pack_header(request_header{ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
163         xid_rep = xid_rep_window_.front();
164     }
165
166     milliseconds curr_to = rpc::to_min;
167     auto finaldeadline = steady_clock::now() + to, nextdeadline = finaldeadline;
168
169     bool transmit = true;
170     connection *ch = NULL;
171
172     while (1){
173         if(transmit){
174             get_refconn(&ch);
175             if(ch){
176                 if(reachable_) {
177                     request forgot;
178                     {
179                         lock ml(m_);
180                         if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
181                             forgot = dup_req_;
182                             dup_req_.clear();
183                         }
184                     }
185                     if (forgot.isvalid())
186                         ch->send(forgot.buf);
187                     ch->send(req);
188                 }
189                 else IF_LEVEL(1) LOG("not reachable");
190                 IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc <<
191                                 " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
192             }
193             transmit = false; // only send once on a given channel
194         }
195
196         if(finaldeadline == time_point<steady_clock>::min())
197             break;
198
199         nextdeadline = steady_clock::now() + curr_to;
200         if(nextdeadline > finaldeadline) {
201             nextdeadline = finaldeadline;
202             finaldeadline = time_point<steady_clock>::min();
203         }
204
205         {
206             lock cal(ca.m);
207             while (!ca.done){
208                 IF_LEVEL(2) LOG("wait");
209                 if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
210                     IF_LEVEL(2) LOG("timeout");
211                     break;
212                 }
213             }
214             if(ca.done){
215                 IF_LEVEL(2) LOG("reply received");
216                 break;
217             }
218         }
219
220         if(retrans_ && (!ch || ch->isdead())){
221             // since connection is dead, retransmit
222             // on the new connection
223             transmit = true;
224         }
225         curr_to *= 2;
226     }
227
228     {
229         // no locking of ca.m since only this thread changes ca.xid
230         lock ml(m_);
231         calls_.erase(ca.xid);
232         // may need to update the xid again here, in case the
233         // packet times out before it's even sent by the channel.
234         // I don't think there's any harm in maybe doing it twice
235         update_xid_rep(ca.xid);
236
237         if(destroy_wait_){
238           destroy_wait_c_.notify_one();
239         }
240     }
241
242     if (ca.done && lossytest_)
243     {
244         lock ml(m_);
245         if (!dup_req_.isvalid()) {
246             dup_req_.buf = req;
247             dup_req_.xid = ca.xid;
248         }
249         if (xid_rep > xid_rep_done_)
250             xid_rep_done_ = xid_rep;
251     }
252
253     lock cal(ca.m);
254
255     IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
256                     " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
257                     ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
258
259     if(ch)
260         ch->decref();
261
262     // destruction of req automatically frees its buffer
263     return (ca.done? ca.intret : rpc_const::timeout_failure);
264 }
265
266 void
267 rpcc::get_refconn(connection **ch)
268 {
269     lock ml(chan_m_);
270     if(!chan_ || chan_->isdead()){
271         if(chan_)
272             chan_->decref();
273         chan_ = connect_to_dst(dst_, this, lossytest_);
274     }
275     if(ch && chan_){
276         if(*ch){
277             (*ch)->decref();
278         }
279         *ch = chan_;
280         (*ch)->incref();
281     }
282 }
283
284 // PollMgr's thread is being used to
285 // make this upcall from connection object to rpcc.
286 // this funtion must not block.
287 //
288 // this function keeps no reference for connection *c
289 bool
290 rpcc::got_pdu(connection *, const string & b)
291 {
292     unmarshall rep(b, true);
293     reply_header h;
294     rep.unpack_header(h);
295
296     if(!rep.ok()){
297         IF_LEVEL(1) LOG("unmarshall header failed!!!");
298         return true;
299     }
300
301     lock ml(m_);
302
303     update_xid_rep(h.xid);
304
305     if(calls_.find(h.xid) == calls_.end()){
306         IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
307         return true;
308     }
309     caller *ca = calls_[h.xid];
310
311     lock cl(ca->m);
312     if(!ca->done){
313         *ca->rep = b;
314         ca->intret = h.ret;
315         if(ca->intret < 0){
316             IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret);
317         }
318         ca->done = 1;
319     }
320     ca->c.notify_all();
321     return true;
322 }
323
324 // assumes thread holds mutex m
325 void
326 rpcc::update_xid_rep(int xid)
327 {
328     if(xid <= xid_rep_window_.front()){
329         return;
330     }
331
332     for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
333         if(*it > xid){
334             xid_rep_window_.insert(it, xid);
335             goto compress;
336         }
337     }
338     xid_rep_window_.push_back(xid);
339
340 compress:
341     auto it = xid_rep_window_.begin();
342     for (it++; it != xid_rep_window_.end(); it++){
343         while (xid_rep_window_.front() + 1 == *it)
344             xid_rep_window_.pop_front();
345     }
346 }
347
348 rpcs::rpcs(in_port_t p1, size_t count)
349   : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
350 {
351     set_rand_seed();
352     nonce_ = (unsigned int)random();
353     IF_LEVEL(2) LOG("created with nonce " << nonce_);
354
355     reg(rpc_const::bind, &rpcs::rpcbind, this);
356     dispatchpool_ = new ThrPool(6, false);
357
358     char *loss_env = getenv("RPC_LOSSY");
359     listener_ = new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0);
360 }
361
362 rpcs::~rpcs()
363 {
364     // must delete listener before dispatchpool
365     delete listener_;
366     delete dispatchpool_;
367     free_reply_window();
368 }
369
370 bool
371 rpcs::got_pdu(connection *c, const string & b)
372 {
373     if(!reachable_){
374         IF_LEVEL(1) LOG("not reachable");
375         return true;
376     }
377
378     djob_t *j = new djob_t{c, b};
379     c->incref();
380     bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
381     if(!succ || !reachable_){
382         c->decref();
383         delete j;
384     }
385     return succ;
386 }
387
388 void
389 rpcs::reg1(proc_t proc, handler *h)
390 {
391     lock pl(procs_m_);
392     VERIFY(procs_.count(proc) == 0);
393     procs_[proc] = h;
394     VERIFY(procs_.count(proc) >= 1);
395 }
396
397 void
398 rpcs::updatestat(proc_t proc)
399 {
400     lock cl(count_m_);
401     counts_[proc]++;
402     curr_counts_--;
403     if(curr_counts_ == 0){
404         LOG("RPC STATS: ");
405         for (auto i = counts_.begin(); i != counts_.end(); i++)
406             LOG(hex << i->first << ":" << dec << i->second);
407
408         lock rwl(reply_window_m_);
409
410         size_t totalrep = 0, maxrep = 0;
411         for (auto clt : reply_window_) {
412             totalrep += clt.second.size();
413             if(clt.second.size() > maxrep)
414                 maxrep = clt.second.size();
415         }
416         IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
417                         totalrep << " max per client " << maxrep);
418         curr_counts_ = counting_;
419     }
420 }
421
422 void
423 rpcs::dispatch(djob_t *j)
424 {
425     connection *c = j->conn;
426     unmarshall req(j->buf, true);
427     delete j;
428
429     request_header h;
430     req.unpack_header(h);
431     proc_t proc = h.proc;
432
433     if(!req.ok()){
434         IF_LEVEL(1) LOG("unmarshall header failed!!!");
435         c->decref();
436         return;
437     }
438
439     IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
440                     dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
441
442     marshall rep;
443     reply_header rh{h.xid,0};
444
445     // is client sending to an old instance of server?
446     if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
447         IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
448                         " (current " << nonce_ << ") proc " << hex << h.proc);
449         rh.ret = rpc_const::oldsrv_failure;
450         rep.pack_header(rh);
451         c->send(rep);
452         return;
453     }
454
455     handler *f;
456     // is RPC proc a registered procedure?
457     {
458         lock pl(procs_m_);
459         if(procs_.count(proc) < 1){
460             cerr << "unknown proc " << hex << proc << "." << endl;
461             c->decref();
462             VERIFY(0);
463             return;
464         }
465
466         f = procs_[proc];
467     }
468
469     rpcs::rpcstate_t stat;
470     string b1;
471
472     if(h.clt_nonce){
473         // have i seen this client before?
474         {
475             lock rwl(reply_window_m_);
476             // if we don't know about this clt_nonce, create a cleanup object
477             if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
478                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
479                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
480                 IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
481                                 " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
482             }
483         }
484
485         // save the latest good connection to the client
486         {
487             lock rwl(conns_m_);
488             if(conns_.find(h.clt_nonce) == conns_.end()){
489                 c->incref();
490                 conns_[h.clt_nonce] = c;
491             } else if(conns_[h.clt_nonce]->compare(c) < 0){
492                 conns_[h.clt_nonce]->decref();
493                 c->incref();
494                 conns_[h.clt_nonce] = c;
495             }
496         }
497
498         stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
499     } else {
500         // this client does not require at most once logic
501         stat = NEW;
502     }
503
504     switch (stat) {
505         case NEW: // new request
506             if (counting_){
507                 updatestat(proc);
508             }
509
510             rh.ret = (*f)(req, rep);
511             if (rh.ret == rpc_const::unmarshal_args_failure) {
512                 cerr << "failed to unmarshall the arguments. You are " <<
513                         "probably calling RPC 0x" << hex << proc << " with the wrong " <<
514                         "types of arguments." << endl;
515                 VERIFY(0);
516             }
517             VERIFY(rh.ret >= 0);
518
519             rep.pack_header(rh);
520             b1 = rep;
521
522             IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
523                             h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
524
525             if (h.clt_nonce > 0) {
526                 // only record replies for clients that require at-most-once logic
527                 add_reply(h.clt_nonce, h.xid, b1);
528             }
529
530             // get the latest connection to the client
531             {
532                 lock rwl(conns_m_);
533                 if(c->isdead() && c != conns_[h.clt_nonce]){
534                     c->decref();
535                     c = conns_[h.clt_nonce];
536                     c->incref();
537                 }
538             }
539
540             c->send(rep);
541             break;
542         case INPROGRESS: // server is working on this request
543             break;
544         case DONE: // duplicate and we still have the response
545             c->send(b1);
546             break;
547         case FORGOTTEN: // very old request and we don't have the response anymore
548             IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
549             rh.ret = rpc_const::atmostonce_failure;
550             rep.pack_header(rh);
551             c->send(rep);
552             break;
553     }
554     c->decref();
555 }
556
557 // rpcs::dispatch calls this when an RPC request arrives.
558 //
559 // checks to see if an RPC with xid from clt_nonce has already been received.
560 // if not, remembers the request in reply_window_.
561 //
562 // deletes remembered requests with XIDs <= xid_rep; the client
563 // says it has received a reply for every RPC up through xid_rep.
564 // frees the reply_t::buf of each such request.
565 //
566 // returns one of:
567 //   NEW: never seen this xid before.
568 //   INPROGRESS: seen this xid, and still processing it.
569 //   DONE: seen this xid, previous reply returned in b.
570 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
571 rpcs::rpcstate_t
572 rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
573         int xid_rep, string & b)
574 {
575     lock rwl(reply_window_m_);
576
577     list<reply_t> &l = reply_window_[clt_nonce];
578
579     VERIFY(l.size() > 0);
580     VERIFY(xid >= xid_rep);
581
582     int past_xid_rep = l.begin()->xid;
583
584     list<reply_t>::iterator start = l.begin(), it = ++start;
585
586     if (past_xid_rep < xid_rep || past_xid_rep == -1) {
587         // scan for deletion candidates
588         while (it != l.end() && it->xid < xid_rep)
589             it++;
590         l.erase(start, it);
591         l.begin()->xid = xid_rep;
592     }
593
594     if (xid < past_xid_rep && past_xid_rep != -1)
595         return FORGOTTEN;
596
597     // skip non-deletion candidates
598     while (it != l.end() && it->xid < xid)
599         it++;
600
601     // if it's in the list it must be right here
602     if (it != l.end() && it->xid == xid) {
603         if (it->cb_present) {
604             // return information about the remembered reply
605             b = it->buf;
606             return DONE;
607         }
608         return INPROGRESS;
609     } else {
610         // remember that a new request has arrived
611         l.insert(it, reply_t(xid));
612         return NEW;
613     }
614 }
615
616 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
617 // and passes the return value in b.
618 // add_reply() should remember b.
619 // free_reply_window() and checkduplicate_and_update are responsible for
620 // cleaning up the remembered values.
621 void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
622     lock rwl(reply_window_m_);
623     // remember the RPC reply value
624     list<reply_t> &l = reply_window_[clt_nonce];
625     list<reply_t>::iterator it = l.begin();
626     // skip to our place in the list
627     for (it++; it != l.end() && it->xid < xid; it++);
628     // there should already be an entry, so whine if there isn't
629     if (it == l.end() || it->xid != xid) {
630         cerr << "Could not find reply struct in add_reply" << endl;
631         l.insert(it, reply_t(xid, b));
632     } else {
633         *it = reply_t(xid, b);
634     }
635 }
636
637 void rpcs::free_reply_window(void) {
638     lock rwl(reply_window_m_);
639     reply_window_.clear();
640 }
641
642 int rpcs::rpcbind(unsigned int &r, int) {
643     IF_LEVEL(2) LOG("called return nonce " << nonce_);
644     r = nonce_;
645     return 0;
646 }
647
648 static sockaddr_in make_sockaddr(const string &host, const string &port);
649
650 static sockaddr_in make_sockaddr(const string &hostandport) {
651     auto colon = hostandport.find(':');
652     if (colon == string::npos)
653         return make_sockaddr("127.0.0.1", hostandport);
654     else
655         return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
656 }
657
658 static sockaddr_in make_sockaddr(const string &host, const string &port) {
659     sockaddr_in dst;
660     bzero(&dst, sizeof(dst));
661     dst.sin_family = AF_INET;
662
663     struct in_addr a{inet_addr(host.c_str())};
664
665     if(a.s_addr != INADDR_NONE)
666         dst.sin_addr.s_addr = a.s_addr;
667     else {
668         struct hostent *hp = gethostbyname(host.c_str());
669
670         if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
671             cerr << "cannot find host name " << host << endl;
672             exit(1);
673         }
674         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
675         dst.sin_addr.s_addr = a.s_addr;
676     }
677     dst.sin_port = hton((in_port_t)stoi(port));
678     return dst;
679 }