Reduced verbosity
[invirt/third/libt4.git] / rpc / rpc.cc
1 /*
2  The rpcc class handles client-side RPC.  Each rpcc is bound to a single RPC
3  server.  The jobs of rpcc include maintaining a connection to server, sending
4  RPC requests and waiting for responses, retransmissions, at-most-once delivery
5  etc.
6
7  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
8  connections from different rpcc objects.  The jobs of rpcs include accepting
9  connections, dispatching requests to registered RPC handlers, at-most-once
10  delivery etc.
11
12  Both rpcc and rpcs use the connection class as an abstraction for the
13  underlying communication channel.  To send an RPC request/reply, one calls
14  connection::send() which blocks until data is sent or the connection has
15  failed (thus the caller can free the buffer when send() returns).  When a
16  request/reply is received, connection makes a callback into the corresponding
17  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
18
19  Thread organization:
20  rpcc uses application threads to send RPC requests and blocks to receive the
21  reply or error. All connections use a single PollMgr object to perform async
22  socket IO.  PollMgr creates a single thread to examine the readiness of socket
23  file descriptors and informs the corresponding connection whenever a socket is
24  ready to be read or written.  (We use asynchronous socket IO to reduce the
25  number of threads needed to manage these connections; without async IO, at
26  least one thread is needed per connection to read data without blocking other
27  activities.)  Each rpcs object creates one thread for listening on the server
28  port and a pool of threads for executing RPC requests.  The thread pool allows
29  us to control the number of threads spawned at the server (spawning one thread
30  per request will hurt when the server faces thousands of requests).
31
32  In order to delete a connection object, we must maintain a reference count.
33  For rpcc, multiple client threads might be invoking the rpcc::call() functions
34  and thus holding multiple references to the underlying connection object. For
35  rpcs, multiple dispatch threads might be holding references to the same
36  connection object.  A connection object is deleted only when the underlying
37  connection is dead and the reference count reaches zero.
38
39  This version of the RPC library explicitly joins exited threads to make sure
40  no outstanding references exist before deleting objects.
41
42  To delete a rpcc object safely, the users of the library must ensure that
43  there are no outstanding calls on the rpcc object.
44
45  To delete a rpcs object safely, we do the following in sequence: 1. stop
46  accepting new incoming connections. 2. close existing active connections.  3.
47  delete the dispatch thread pool which involves waiting for current active RPC
48  handlers to finish.  It is interesting how a thread pool can be deleted
49  without using thread cancellation. The trick is to inject x "poison pills" for
50  a thread pool of x threads. Upon getting a poison pill instead of a normal
51  task, a worker thread will exit (and thread pool destructor waits to join all
52  x exited worker threads).
53  */
54
55 #include "types.h"
56 #include "rpc.h"
57
58 #include <sys/types.h>
59 #include <arpa/inet.h>
60 #include <netinet/tcp.h>
61 #include <netdb.h>
62 #include <unistd.h>
63
64 inline void set_rand_seed() {
65     auto now = time_point_cast<nanoseconds>(steady_clock::now());
66     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
67 }
68
69 static sockaddr_in make_sockaddr(const string &hostandport);
70
71 rpcc::rpcc(const string & d, bool retrans) :
72     dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
73     retrans_(retrans), reachable_(true), chan_(), destroy_wait_ (false), xid_rep_done_(-1)
74 {
75     if(retrans){
76         set_rand_seed();
77         clt_nonce_ = (unsigned int)random();
78     } else {
79         // special client nonce 0 means this client does not
80         // require at-most-once logic from the server
81         // because it uses tcp and never retries a failed connection
82         clt_nonce_ = 0;
83     }
84
85     char *loss_env = getenv("RPC_LOSSY");
86     if(loss_env)
87         lossytest_ = atoi(loss_env);
88
89     // xid starts with 1 and latest received reply starts with 0
90     xid_rep_window_.push_back(0);
91
92     IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
93 }
94
95 // IMPORTANT: destruction should happen only when no external threads
96 // are blocked inside rpcc or will use rpcc in the future
97 rpcc::~rpcc() {
98     cancel();
99     IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
100     if(chan_)
101         chan_->closeconn();
102     VERIFY(calls_.size() == 0);
103 }
104
105 int rpcc::bind(milliseconds to) {
106     unsigned int r;
107     int ret = call_timeout(rpc_const::bind, to, r, 0);
108     if(ret == 0){
109         lock ml(m_);
110         bind_done_ = true;
111         srv_nonce_ = r;
112     } else {
113         IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
114     }
115     return ret;
116 };
117
118 // Cancel all outstanding calls
119 void rpcc::cancel(void) {
120     lock ml(m_);
121     if (calls_.size()) {
122         LOG("force callers to fail");
123         for(auto &p : calls_){
124             caller *ca = p.second;
125
126             IF_LEVEL(2) LOG("force caller to fail");
127             {
128                 lock cl(ca->m);
129                 ca->done = true;
130                 ca->intret = rpc_const::cancel_failure;
131                 ca->c.notify_one();
132             }
133         }
134
135         while (calls_.size () > 0){
136             destroy_wait_ = true;
137             destroy_wait_c_.wait(ml);
138         }
139         LOG("done");
140     }
141 }
142
143 int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
144
145     caller ca(0, &rep);
146     int xid_rep;
147     {
148         lock ml(m_);
149
150         if((proc != rpc_const::bind && !bind_done_) ||
151                 (proc == rpc_const::bind && bind_done_)){
152             IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
153             return rpc_const::bind_failure;
154         }
155
156         if(destroy_wait_){
157           return rpc_const::cancel_failure;
158         }
159
160         ca.xid = xid_++;
161         calls_[ca.xid] = &ca;
162
163         req.pack_header(request_header{ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
164         xid_rep = xid_rep_window_.front();
165     }
166
167     milliseconds curr_to = rpc::to_min;
168     auto finaldeadline = steady_clock::now() + to, nextdeadline = finaldeadline;
169
170     bool transmit = true;
171     shared_ptr<connection> ch;
172
173     while (1) {
174         if(transmit) {
175             get_refconn(ch);
176             if (ch) {
177                 if (reachable_) {
178                     request forgot;
179                     {
180                         lock ml(m_);
181                         if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
182                             forgot = dup_req_;
183                             dup_req_.clear();
184                         }
185                     }
186                     if (forgot.isvalid())
187                         ch->send(forgot.buf);
188                     ch->send(req);
189                 }
190                 else IF_LEVEL(1) LOG("not reachable");
191                 IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc <<
192                                 " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
193             }
194             transmit = false; // only send once on a given channel
195         }
196
197         if(finaldeadline == time_point<steady_clock>::min())
198             break;
199
200         nextdeadline = steady_clock::now() + curr_to;
201         if(nextdeadline > finaldeadline) {
202             nextdeadline = finaldeadline;
203             finaldeadline = time_point<steady_clock>::min();
204         }
205
206         {
207             lock cal(ca.m);
208             while (!ca.done){
209                 IF_LEVEL(2) LOG("wait");
210                 if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
211                     IF_LEVEL(2) LOG("timeout");
212                     break;
213                 }
214             }
215             if(ca.done){
216                 IF_LEVEL(2) LOG("reply received");
217                 break;
218             }
219         }
220
221         if(retrans_ && (!ch || ch->isdead())) {
222             // since connection is dead, retransmit
223             // on the new connection
224             transmit = true;
225         }
226         curr_to *= 2;
227     }
228
229     {
230         // no locking of ca.m since only this thread changes ca.xid
231         lock ml(m_);
232         calls_.erase(ca.xid);
233         // may need to update the xid again here, in case the
234         // packet times out before it's even sent by the channel.
235         // I don't think there's any harm in maybe doing it twice
236         update_xid_rep(ca.xid);
237
238         if(destroy_wait_){
239           destroy_wait_c_.notify_one();
240         }
241     }
242
243     if (ca.done && lossytest_)
244     {
245         lock ml(m_);
246         if (!dup_req_.isvalid()) {
247             dup_req_.buf = req;
248             dup_req_.xid = ca.xid;
249         }
250         if (xid_rep > xid_rep_done_)
251             xid_rep_done_ = xid_rep;
252     }
253
254     lock cal(ca.m);
255
256     IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
257                     " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
258                     ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
259
260     // destruction of req automatically frees its buffer
261     return (ca.done? ca.intret : rpc_const::timeout_failure);
262 }
263
264 void
265 rpcc::get_refconn(shared_ptr<connection> & ch)
266 {
267     lock ml(chan_m_);
268     if (!chan_ || chan_->isdead())
269         chan_ = connect_to_dst(dst_, this, lossytest_);
270
271     if (chan_)
272         ch = chan_;
273 }
274
275 // PollMgr's thread is being used to
276 // make this upcall from connection object to rpcc.
277 // this funtion must not block.
278 //
279 // this function keeps no reference for connection *c
280 bool
281 rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
282 {
283     unmarshall rep(b, true);
284     reply_header h;
285     rep.unpack_header(h);
286
287     if(!rep.ok()){
288         IF_LEVEL(1) LOG("unmarshall header failed!!!");
289         return true;
290     }
291
292     lock ml(m_);
293
294     update_xid_rep(h.xid);
295
296     if(calls_.find(h.xid) == calls_.end()){
297         IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
298         return true;
299     }
300     caller *ca = calls_[h.xid];
301
302     lock cl(ca->m);
303     if(!ca->done){
304         *ca->rep = b;
305         ca->intret = h.ret;
306         if(ca->intret < 0){
307             IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret);
308         }
309         ca->done = 1;
310     }
311     ca->c.notify_all();
312     return true;
313 }
314
315 // assumes thread holds mutex m
316 void
317 rpcc::update_xid_rep(int xid)
318 {
319     if(xid <= xid_rep_window_.front()){
320         return;
321     }
322
323     for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
324         if(*it > xid){
325             xid_rep_window_.insert(it, xid);
326             goto compress;
327         }
328     }
329     xid_rep_window_.push_back(xid);
330
331 compress:
332     auto it = xid_rep_window_.begin();
333     for (it++; it != xid_rep_window_.end(); it++){
334         while (xid_rep_window_.front() + 1 == *it)
335             xid_rep_window_.pop_front();
336     }
337 }
338
339 rpcs::rpcs(in_port_t p1, size_t count)
340   : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
341 {
342     set_rand_seed();
343     nonce_ = (unsigned int)random();
344     IF_LEVEL(2) LOG("created with nonce " << nonce_);
345
346     reg(rpc_const::bind, &rpcs::rpcbind, this);
347     dispatchpool_ = unique_ptr<ThrPool>(new ThrPool(6, false));
348 }
349
350 void rpcs::start() {
351     char *loss_env = getenv("RPC_LOSSY");
352     listener_ = unique_ptr<tcpsconn>(new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0));
353 }
354
355 rpcs::~rpcs()
356 {
357     // must delete listener before dispatchpool
358     listener_ = nullptr;
359     dispatchpool_ = nullptr;
360     free_reply_window();
361 }
362
363 bool
364 rpcs::got_pdu(const shared_ptr<connection> & c, const string & b)
365 {
366     if(!reachable_){
367         IF_LEVEL(1) LOG("not reachable");
368         return true;
369     }
370
371     return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
372 }
373
374 void
375 rpcs::reg1(proc_t proc, handler *h)
376 {
377     lock pl(procs_m_);
378     VERIFY(procs_.count(proc) == 0);
379     procs_[proc] = h;
380     VERIFY(procs_.count(proc) >= 1);
381 }
382
383 void
384 rpcs::updatestat(proc_t proc)
385 {
386     lock cl(count_m_);
387     counts_[proc]++;
388     curr_counts_--;
389     if(curr_counts_ == 0){
390         LOG("RPC STATS: ");
391         for (auto i = counts_.begin(); i != counts_.end(); i++)
392             LOG(hex << i->first << ":" << dec << i->second);
393
394         lock rwl(reply_window_m_);
395
396         size_t totalrep = 0, maxrep = 0;
397         for (auto clt : reply_window_) {
398             totalrep += clt.second.size();
399             if(clt.second.size() > maxrep)
400                 maxrep = clt.second.size();
401         }
402         IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
403                         totalrep << " max per client " << maxrep);
404         curr_counts_ = counting_;
405     }
406 }
407
408 void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
409     unmarshall req(buf, true);
410
411     request_header h;
412     req.unpack_header(h);
413     proc_t proc = h.proc;
414
415     if (!req.ok()) {
416         IF_LEVEL(1) LOG("unmarshall header failed");
417         return;
418     }
419
420     IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
421                     dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
422
423     marshall rep;
424     reply_header rh{h.xid,0};
425
426     // is client sending to an old instance of server?
427     if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
428         IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
429                         " (current " << nonce_ << ") proc " << hex << h.proc);
430         rh.ret = rpc_const::oldsrv_failure;
431         rep.pack_header(rh);
432         c->send(rep);
433         return;
434     }
435
436     handler *f;
437     // is RPC proc a registered procedure?
438     {
439         lock pl(procs_m_);
440         if(procs_.count(proc) < 1){
441             LOG("unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_);
442             VERIFY(0);
443             return;
444         }
445
446         f = procs_[proc];
447     }
448
449     rpcs::rpcstate_t stat;
450     string b1;
451
452     if(h.clt_nonce){
453         // have i seen this client before?
454         {
455             lock rwl(reply_window_m_);
456             // if we don't know about this clt_nonce, create a cleanup object
457             if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
458                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
459                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
460                 IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
461                                 " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
462             }
463         }
464
465         // save the latest good connection to the client
466         {
467             lock rwl(conns_m_);
468             if (conns_.find(h.clt_nonce) == conns_.end())
469                 conns_[h.clt_nonce] = c;
470             else if(conns_[h.clt_nonce]->create_time() < c->create_time())
471                 conns_[h.clt_nonce] = c;
472         }
473
474         stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
475     } else {
476         // this client does not require at most once logic
477         stat = NEW;
478     }
479
480     switch (stat) {
481         case NEW: // new request
482             if (counting_)
483                 updatestat(proc);
484
485             rh.ret = (*f)(req, rep);
486             if (rh.ret == rpc_const::unmarshal_args_failure) {
487                 cerr << "failed to unmarshall the arguments. You are " <<
488                         "probably calling RPC 0x" << hex << proc << " with the wrong " <<
489                         "types of arguments." << endl;
490                 VERIFY(0);
491             }
492             VERIFY(rh.ret >= 0);
493
494             rep.pack_header(rh);
495             b1 = rep;
496
497             IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
498                             h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
499
500             if (h.clt_nonce > 0) {
501                 // only record replies for clients that require at-most-once logic
502                 add_reply(h.clt_nonce, h.xid, b1);
503             }
504
505             // get the latest connection to the client
506             {
507                 lock rwl(conns_m_);
508                 if (c->isdead())
509                     c = conns_[h.clt_nonce];
510             }
511
512             c->send(rep);
513             break;
514         case INPROGRESS: // server is working on this request
515             break;
516         case DONE: // duplicate and we still have the response
517             c->send(b1);
518             break;
519         case FORGOTTEN: // very old request and we don't have the response anymore
520             IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
521             rh.ret = rpc_const::atmostonce_failure;
522             rep.pack_header(rh);
523             c->send(rep);
524             break;
525     }
526 }
527
528 // rpcs::dispatch calls this when an RPC request arrives.
529 //
530 // checks to see if an RPC with xid from clt_nonce has already been received.
531 // if not, remembers the request in reply_window_.
532 //
533 // deletes remembered requests with XIDs <= xid_rep; the client
534 // says it has received a reply for every RPC up through xid_rep.
535 // frees the reply_t::buf of each such request.
536 //
537 // returns one of:
538 //   NEW: never seen this xid before.
539 //   INPROGRESS: seen this xid, and still processing it.
540 //   DONE: seen this xid, previous reply returned in b.
541 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
542 rpcs::rpcstate_t
543 rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
544         int xid_rep, string & b)
545 {
546     lock rwl(reply_window_m_);
547
548     list<reply_t> &l = reply_window_[clt_nonce];
549
550     VERIFY(l.size() > 0);
551     VERIFY(xid >= xid_rep);
552
553     int past_xid_rep = l.begin()->xid;
554
555     list<reply_t>::iterator start = l.begin(), it = ++start;
556
557     if (past_xid_rep < xid_rep || past_xid_rep == -1) {
558         // scan for deletion candidates
559         while (it != l.end() && it->xid < xid_rep)
560             it++;
561         l.erase(start, it);
562         l.begin()->xid = xid_rep;
563     }
564
565     if (xid < past_xid_rep && past_xid_rep != -1)
566         return FORGOTTEN;
567
568     // skip non-deletion candidates
569     while (it != l.end() && it->xid < xid)
570         it++;
571
572     // if it's in the list it must be right here
573     if (it != l.end() && it->xid == xid) {
574         if (it->cb_present) {
575             // return information about the remembered reply
576             b = it->buf;
577             return DONE;
578         }
579         return INPROGRESS;
580     } else {
581         // remember that a new request has arrived
582         l.insert(it, reply_t(xid));
583         return NEW;
584     }
585 }
586
587 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
588 // and passes the return value in b.
589 // add_reply() should remember b.
590 // free_reply_window() and checkduplicate_and_update are responsible for
591 // cleaning up the remembered values.
592 void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
593     lock rwl(reply_window_m_);
594     // remember the RPC reply value
595     list<reply_t> &l = reply_window_[clt_nonce];
596     list<reply_t>::iterator it = l.begin();
597     // skip to our place in the list
598     for (it++; it != l.end() && it->xid < xid; it++);
599     // there should already be an entry, so whine if there isn't
600     if (it == l.end() || it->xid != xid) {
601         cerr << "Could not find reply struct in add_reply" << endl;
602         l.insert(it, reply_t(xid, b));
603     } else {
604         *it = reply_t(xid, b);
605     }
606 }
607
608 void rpcs::free_reply_window(void) {
609     lock rwl(reply_window_m_);
610     reply_window_.clear();
611 }
612
613 int rpcs::rpcbind(unsigned int &r, int) {
614     IF_LEVEL(2) LOG("called return nonce " << nonce_);
615     r = nonce_;
616     return 0;
617 }
618
619 static sockaddr_in make_sockaddr(const string &host, const string &port);
620
621 static sockaddr_in make_sockaddr(const string &hostandport) {
622     auto colon = hostandport.find(':');
623     if (colon == string::npos)
624         return make_sockaddr("127.0.0.1", hostandport);
625     else
626         return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
627 }
628
629 static sockaddr_in make_sockaddr(const string &host, const string &port) {
630     sockaddr_in dst;
631     bzero(&dst, sizeof(dst));
632     dst.sin_family = AF_INET;
633
634     struct in_addr a{inet_addr(host.c_str())};
635
636     if(a.s_addr != INADDR_NONE)
637         dst.sin_addr.s_addr = a.s_addr;
638     else {
639         struct hostent *hp = gethostbyname(host.c_str());
640
641         if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
642             cerr << "cannot find host name " << host << endl;
643             exit(1);
644         }
645         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
646         dst.sin_addr.s_addr = a.s_addr;
647     }
648     dst.sin_port = hton((in_port_t)stoi(port));
649     return dst;
650 }