More clean-ups
[invirt/third/libt4.git] / rpc / rpc.cc
1 /*
2  The rpcc class handles client-side RPC.  Each rpcc is bound to a single RPC
3  server.  The jobs of rpcc include maintaining a connection to server, sending
4  RPC requests and waiting for responses, retransmissions, at-most-once delivery
5  etc.
6
7  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
8  connections from different rpcc objects.  The jobs of rpcs include accepting
9  connections, dispatching requests to registered RPC handlers, at-most-once
10  delivery etc.
11
12  Both rpcc and rpcs use the connection class as an abstraction for the
13  underlying communication channel.  To send an RPC request/reply, one calls
14  connection::send() which blocks until data is sent or the connection has
15  failed (thus the caller can free the buffer when send() returns).  When a
16  request/reply is received, connection makes a callback into the corresponding
17  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
18
19  Thread organization:
20  rpcc uses application threads to send RPC requests and blocks to receive the
21  reply or error. All connections use a single PollMgr object to perform async
22  socket IO.  PollMgr creates a single thread to examine the readiness of socket
23  file descriptors and informs the corresponding connection whenever a socket is
24  ready to be read or written.  (We use asynchronous socket IO to reduce the
25  number of threads needed to manage these connections; without async IO, at
26  least one thread is needed per connection to read data without blocking other
27  activities.)  Each rpcs object creates one thread for listening on the server
28  port and a pool of threads for executing RPC requests.  The thread pool allows
29  us to control the number of threads spawned at the server (spawning one thread
30  per request will hurt when the server faces thousands of requests).
31
32  In order to delete a connection object, we must maintain a reference count.
33  For rpcc, multiple client threads might be invoking the rpcc::call() functions
34  and thus holding multiple references to the underlying connection object. For
35  rpcs, multiple dispatch threads might be holding references to the same
36  connection object.  A connection object is deleted only when the underlying
37  connection is dead and the reference count reaches zero.
38
39  This version of the RPC library explicitly joins exited threads to make sure
40  no outstanding references exist before deleting objects.
41
42  To delete a rpcc object safely, the users of the library must ensure that
43  there are no outstanding calls on the rpcc object.
44
45  To delete a rpcs object safely, we do the following in sequence: 1. stop
46  accepting new incoming connections. 2. close existing active connections.  3.
47  delete the dispatch thread pool which involves waiting for current active RPC
48  handlers to finish.  It is interesting how a thread pool can be deleted
49  without using thread cancellation. The trick is to inject x "poison pills" for
50  a thread pool of x threads. Upon getting a poison pill instead of a normal
51  task, a worker thread will exit (and thread pool destructor waits to join all
52  x exited worker threads).
53  */
54
55 #include "rpc.h"
56
57 #include <arpa/inet.h>
58 #include <netinet/tcp.h>
59 #include <netdb.h>
60 #include <unistd.h>
61
62 inline void set_rand_seed() {
63     auto now = time_point_cast<nanoseconds>(steady_clock::now());
64     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
65 }
66
67 static sockaddr_in make_sockaddr(const string &hostandport);
68
69 rpcc::rpcc(const string & d, bool retrans) :
70     dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
71     retrans_(retrans), reachable_(true), chan_(), destroy_wait_ (false), xid_rep_done_(-1)
72 {
73     if (retrans) {
74         set_rand_seed();
75         clt_nonce_ = (unsigned int)random();
76     } else {
77         // special client nonce 0 means this client does not
78         // require at-most-once logic from the server
79         // because it uses tcp and never retries a failed connection
80         clt_nonce_ = 0;
81     }
82
83     char *loss_env = getenv("RPC_LOSSY");
84     if (loss_env)
85         lossytest_ = atoi(loss_env);
86
87     // xid starts with 1 and latest received reply starts with 0
88     xid_rep_window_.push_back(0);
89
90     IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
91 }
92
93 // IMPORTANT: destruction should happen only when no external threads
94 // are blocked inside rpcc or will use rpcc in the future
95 rpcc::~rpcc() {
96     cancel();
97     IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
98     if (chan_)
99         chan_->closeconn();
100     VERIFY(calls_.size() == 0);
101 }
102
103 int rpcc::bind(milliseconds to) {
104     unsigned int r;
105     int ret = call_timeout(rpc_protocol::bind, to, r, 0);
106     if (ret == 0) {
107         lock ml(m_);
108         bind_done_ = true;
109         srv_nonce_ = r;
110     } else {
111         IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
112     }
113     return ret;
114 };
115
116 // Cancel all outstanding calls
117 void rpcc::cancel(void) {
118     lock ml(m_);
119     if (calls_.size()) {
120         LOG("force callers to fail");
121         for (auto &p : calls_) {
122             caller *ca = p.second;
123
124             IF_LEVEL(2) LOG("force caller to fail");
125
126             lock cl(ca->m);
127             ca->done = true;
128             ca->intret = rpc_protocol::cancel_failure;
129             ca->c.notify_one();
130         }
131
132         destroy_wait_ = true;
133         while (calls_.size () > 0)
134             destroy_wait_c_.wait(ml);
135
136         LOG("done");
137     }
138 }
139
140 int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) {
141
142     caller ca(0, &rep);
143     int xid_rep;
144     {
145         lock ml(m_);
146
147         if ((proc != rpc_protocol::bind.id && !bind_done_) || (proc == rpc_protocol::bind.id && bind_done_)) {
148             IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
149             return rpc_protocol::bind_failure;
150         }
151
152         if (destroy_wait_)
153             return rpc_protocol::cancel_failure;
154
155         ca.xid = xid_++;
156         calls_[ca.xid] = &ca;
157
158         req.pack_header(rpc_protocol::request_header{
159                 ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()
160                 });
161         xid_rep = xid_rep_window_.front();
162     }
163
164     milliseconds curr_to = rpc::to_min;
165     auto finaldeadline = steady_clock::now() + to;
166
167     bool transmit = true;
168     shared_ptr<connection> ch;
169
170     while (1) {
171         if (transmit) {
172             get_refconn(ch);
173             if (ch) {
174                 if (reachable_) {
175                     request forgot;
176                     {
177                         lock ml(m_);
178                         if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
179                             forgot = dup_req_;
180                             dup_req_.clear();
181                         }
182                     }
183                     if (forgot.isvalid())
184                         ch->send(forgot.buf);
185                     ch->send(req);
186                 }
187                 else IF_LEVEL(1) LOG("not reachable");
188                 IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc <<
189                                 " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
190             }
191             transmit = false; // only send once on a given channel
192         }
193
194         auto nextdeadline = min(steady_clock::now() + curr_to, finaldeadline);
195         curr_to *= 2;
196
197         {
198             lock cal(ca.m);
199             while (!ca.done) {
200                 IF_LEVEL(2) LOG("wait");
201                 if (ca.c.wait_until(cal, nextdeadline) == cv_status::timeout) {
202                     IF_LEVEL(2) LOG("timeout");
203                     break;
204                 }
205             }
206             if (ca.done) {
207                 IF_LEVEL(2) LOG("reply received");
208                 break;
209             }
210         }
211
212         if (nextdeadline >= finaldeadline)
213             break;
214
215         if (retrans_ && (!ch || ch->isdead())) {
216             // since connection is dead, retransmit
217             // on the new connection
218             transmit = true;
219         }
220     }
221
222     {
223         // no locking of ca.m since only this thread changes ca.xid
224         lock ml(m_);
225         calls_.erase(ca.xid);
226         // may need to update the xid again here, in case the
227         // packet times out before it's even sent by the channel.
228         // I don't think there's any harm in maybe doing it twice
229         update_xid_rep(ca.xid);
230
231         if (destroy_wait_)
232             destroy_wait_c_.notify_one();
233     }
234
235     if (ca.done && lossytest_)
236     {
237         lock ml(m_);
238         if (!dup_req_.isvalid()) {
239             dup_req_.buf = req;
240             dup_req_.xid = ca.xid;
241         }
242         if (xid_rep > xid_rep_done_)
243             xid_rep_done_ = xid_rep;
244     }
245
246     lock cal(ca.m);
247
248     IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
249                     " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
250                     ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
251
252     // destruction of req automatically frees its buffer
253     return (ca.done? ca.intret : rpc_protocol::timeout_failure);
254 }
255
256 void rpcc::get_refconn(shared_ptr<connection> & ch) {
257     lock ml(chan_m_);
258     if (!chan_ || chan_->isdead())
259         chan_ = connection::to_dst(dst_, this, lossytest_);
260
261     if (chan_)
262         ch = chan_;
263 }
264
265 // PollMgr's thread is being used to
266 // make this upcall from connection object to rpcc.
267 // this funtion must not block.
268 //
269 // this function keeps no reference for connection *c
270 bool
271 rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
272 {
273     unmarshall rep(b, true);
274     rpc_protocol::reply_header h;
275     rep.unpack_header(h);
276
277     if (!rep.ok()) {
278         IF_LEVEL(1) LOG("unmarshall header failed!!!");
279         return true;
280     }
281
282     lock ml(m_);
283
284     update_xid_rep(h.xid);
285
286     if (calls_.find(h.xid) == calls_.end()) {
287         IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
288         return true;
289     }
290     caller *ca = calls_[h.xid];
291
292     lock cl(ca->m);
293     if (!ca->done) {
294         *ca->rep = b;
295         ca->intret = h.ret;
296         if (ca->intret < 0) {
297             IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret);
298         }
299         ca->done = 1;
300     }
301     ca->c.notify_all();
302     return true;
303 }
304
305 // assumes thread holds mutex m
306 void
307 rpcc::update_xid_rep(int xid)
308 {
309     if (xid <= xid_rep_window_.front())
310         return;
311
312     for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++) {
313         if (*it > xid) {
314             xid_rep_window_.insert(it, xid);
315             goto compress;
316         }
317     }
318     xid_rep_window_.push_back(xid);
319
320 compress:
321     auto it = xid_rep_window_.begin();
322     for (it++; it != xid_rep_window_.end(); it++) {
323         while (xid_rep_window_.front() + 1 == *it)
324             xid_rep_window_.pop_front();
325     }
326 }
327
328 rpcs::rpcs(in_port_t p1, size_t count)
329   : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
330 {
331     set_rand_seed();
332     nonce_ = (unsigned int)random();
333     IF_LEVEL(2) LOG("created with nonce " << nonce_);
334
335     reg(rpc_protocol::bind, &rpcs::rpcbind, this);
336     dispatchpool_ = unique_ptr<thread_pool>(new thread_pool(6, false));
337 }
338
339 void rpcs::start() {
340     char *loss_env = getenv("RPC_LOSSY");
341     listener_ = unique_ptr<tcpsconn>(new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0));
342 }
343
344 rpcs::~rpcs() {
345     // must delete listener before dispatchpool
346     listener_ = nullptr;
347     dispatchpool_ = nullptr;
348     free_reply_window();
349 }
350
351 bool rpcs::got_pdu(const shared_ptr<connection> & c, const string & b) {
352     if (!reachable_) {
353         IF_LEVEL(1) LOG("not reachable");
354         return true;
355     }
356
357     return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
358 }
359
360 void rpcs::reg1(proc_id_t proc, handler *h) {
361     lock pl(procs_m_);
362     VERIFY(procs_.count(proc) == 0);
363     procs_[proc] = h;
364     VERIFY(procs_.count(proc) >= 1);
365 }
366
367 void rpcs::updatestat(proc_id_t proc) {
368     lock cl(count_m_);
369     counts_[proc]++;
370     curr_counts_--;
371     if (curr_counts_ == 0) {
372         LOG("RPC STATS: ");
373         for (auto i = counts_.begin(); i != counts_.end(); i++)
374             LOG(hex << i->first << ":" << dec << i->second);
375
376         lock rwl(reply_window_m_);
377
378         size_t totalrep = 0, maxrep = 0;
379         for (auto clt : reply_window_) {
380             totalrep += clt.second.size();
381             if (clt.second.size() > maxrep)
382                 maxrep = clt.second.size();
383         }
384         IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
385                         totalrep << " max per client " << maxrep);
386         curr_counts_ = counting_;
387     }
388 }
389
390 void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
391     unmarshall req(buf, true);
392
393     rpc_protocol::request_header h;
394     req.unpack_header(h);
395     proc_id_t proc = h.proc;
396
397     if (!req.ok()) {
398         IF_LEVEL(1) LOG("unmarshall header failed");
399         return;
400     }
401
402     IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
403                     dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
404
405     marshall rep;
406     rpc_protocol::reply_header rh{h.xid,0};
407
408     // is client sending to an old instance of server?
409     if (h.srv_nonce != 0 && h.srv_nonce != nonce_) {
410         IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
411                         " (current " << nonce_ << ") proc " << hex << h.proc);
412         rh.ret = rpc_protocol::oldsrv_failure;
413         rep.pack_header(rh);
414         c->send(rep);
415         return;
416     }
417
418     handler *f;
419     // is RPC proc a registered procedure?
420     {
421         lock pl(procs_m_);
422         if (procs_.count(proc) < 1) {
423             LOG("unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_);
424             VERIFY(0);
425             return;
426         }
427
428         f = procs_[proc];
429     }
430
431     rpcs::rpcstate_t stat;
432     string b1;
433
434     if (h.clt_nonce) {
435         // have i seen this client before?
436         {
437             lock rwl(reply_window_m_);
438             // if we don't know about this clt_nonce, create a cleanup object
439             if (reply_window_.find(h.clt_nonce) == reply_window_.end()) {
440                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
441                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
442                 IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
443                                 " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
444             }
445         }
446
447         // save the latest good connection to the client
448         {
449             lock rwl(conns_m_);
450             if (conns_.find(h.clt_nonce) == conns_.end())
451                 conns_[h.clt_nonce] = c;
452             else if (conns_[h.clt_nonce]->create_time() < c->create_time())
453                 conns_[h.clt_nonce] = c;
454         }
455
456         stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
457     } else {
458         // this client does not require at most once logic
459         stat = NEW;
460     }
461
462     switch (stat) {
463         case NEW: // new request
464             if (counting_)
465                 updatestat(proc);
466
467             rh.ret = (*f)(req, rep);
468             if (rh.ret == rpc_protocol::unmarshal_args_failure) {
469                 LOG("failed to unmarshall the arguments. You are " <<
470                     "probably calling RPC 0x" << hex << proc << " with the wrong " <<
471                     "types of arguments.");
472                 VERIFY(0);
473             }
474             VERIFY(rh.ret >= 0);
475
476             rep.pack_header(rh);
477             b1 = rep;
478
479             IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
480                             h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
481
482             if (h.clt_nonce > 0) {
483                 // only record replies for clients that require at-most-once logic
484                 add_reply(h.clt_nonce, h.xid, b1);
485             }
486
487             // get the latest connection to the client
488             {
489                 lock rwl(conns_m_);
490                 if (c->isdead())
491                     c = conns_[h.clt_nonce];
492             }
493
494             c->send(rep);
495             break;
496         case INPROGRESS: // server is working on this request
497             break;
498         case DONE: // duplicate and we still have the response
499             c->send(b1);
500             break;
501         case FORGOTTEN: // very old request and we don't have the response anymore
502             IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
503             rh.ret = rpc_protocol::atmostonce_failure;
504             rep.pack_header(rh);
505             c->send(rep);
506             break;
507     }
508 }
509
510 // rpcs::dispatch calls this when an RPC request arrives.
511 //
512 // checks to see if an RPC with xid from clt_nonce has already been received.
513 // if not, remembers the request in reply_window_.
514 //
515 // deletes remembered requests with XIDs <= xid_rep; the client
516 // says it has received a reply for every RPC up through xid_rep.
517 // frees the reply_t::buf of each such request.
518 //
519 // returns one of:
520 //   NEW: never seen this xid before.
521 //   INPROGRESS: seen this xid, and still processing it.
522 //   DONE: seen this xid, previous reply returned in b.
523 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
524 rpcs::rpcstate_t
525 rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
526         int xid_rep, string & b)
527 {
528     lock rwl(reply_window_m_);
529
530     list<reply_t> &l = reply_window_[clt_nonce];
531
532     VERIFY(l.size() > 0);
533     VERIFY(xid >= xid_rep);
534
535     int past_xid_rep = l.begin()->xid;
536
537     list<reply_t>::iterator start = l.begin(), it = ++start;
538
539     if (past_xid_rep < xid_rep || past_xid_rep == -1) {
540         // scan for deletion candidates
541         while (it != l.end() && it->xid < xid_rep)
542             it++;
543         l.erase(start, it);
544         l.begin()->xid = xid_rep;
545     }
546
547     if (xid < past_xid_rep && past_xid_rep != -1)
548         return FORGOTTEN;
549
550     // skip non-deletion candidates
551     while (it != l.end() && it->xid < xid)
552         it++;
553
554     // if it's in the list it must be right here
555     if (it != l.end() && it->xid == xid) {
556         if (it->cb_present) {
557             // return information about the remembered reply
558             b = it->buf;
559             return DONE;
560         }
561         return INPROGRESS;
562     } else {
563         // remember that a new request has arrived
564         l.insert(it, reply_t(xid));
565         return NEW;
566     }
567 }
568
569 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
570 // and passes the return value in b.
571 // add_reply() should remember b.
572 // free_reply_window() and checkduplicate_and_update are responsible for
573 // cleaning up the remembered values.
574 void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
575     lock rwl(reply_window_m_);
576     // remember the RPC reply value
577     list<reply_t> &l = reply_window_[clt_nonce];
578     list<reply_t>::iterator it = l.begin();
579     // skip to our place in the list
580     for (it++; it != l.end() && it->xid < xid; it++);
581     // there should already be an entry, so whine if there isn't
582     if (it == l.end() || it->xid != xid) {
583         LOG("Could not find reply struct in add_reply");
584         l.insert(it, reply_t(xid, b));
585     } else {
586         *it = reply_t(xid, b);
587     }
588 }
589
590 void rpcs::free_reply_window(void) {
591     lock rwl(reply_window_m_);
592     reply_window_.clear();
593 }
594
595 int rpcs::rpcbind(unsigned int &r, int) {
596     IF_LEVEL(2) LOG("called return nonce " << nonce_);
597     r = nonce_;
598     return 0;
599 }
600
601 static sockaddr_in make_sockaddr(const string &hostandport) {
602     string host = "127.0.0.1";
603     string port = hostandport;
604     auto colon = hostandport.find(':');
605     if (colon != string::npos) {
606         host = hostandport.substr(0, colon);
607         port = hostandport.substr(colon+1);
608     }
609
610     sockaddr_in dst{}; // zero initialize
611     dst.sin_family = AF_INET;
612
613     struct in_addr a{inet_addr(host.c_str())};
614
615     if (a.s_addr != INADDR_NONE)
616         dst.sin_addr.s_addr = a.s_addr;
617     else {
618         struct hostent *hp = gethostbyname(host.c_str());
619
620         if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
621             LOG_NONMEMBER("cannot find host name " << host);
622             exit(1);
623         }
624         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
625         dst.sin_addr.s_addr = a.s_addr;
626     }
627     dst.sin_port = hton((in_port_t)stoi(port));
628     return dst;
629 }