Fixed two major bugs in paxos.cc.
[invirt/third/libt4.git] / rpc / rpc.cc
1 /*
2  The rpcc class handles client-side RPC.  Each rpcc is bound to a
3  single RPC server.  The jobs of rpcc include maintaining a connection to
4  server, sending RPC requests and waiting for responses, retransmissions,
5  at-most-once delivery etc.
6
7  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
8  connections from different rpcc objects.  The jobs of rpcs include accepting
9  connections, dispatching requests to registered RPC handlers, at-most-once
10  delivery etc.
11
12  Both rpcc and rpcs use the connection class as an abstraction for the
13  underlying communication channel.  To send an RPC request/reply, one calls
14  connection::send() which blocks until data is sent or the connection has failed
15  (thus the caller can free the buffer when send() returns).  When a
16  request/reply is received, connection makes a callback into the corresponding
17  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
18
19  Thread organization:
20  rpcc uses application threads to send RPC requests and blocks to receive the
21  reply or error. All connections use a single PollMgr object to perform async
22  socket IO.  PollMgr creates a single thread to examine the readiness of socket
23  file descriptors and informs the corresponding connection whenever a socket is
24  ready to be read or written.  (We use asynchronous socket IO to reduce the
25  number of threads needed to manage these connections; without async IO, at
26  least one thread is needed per connection to read data without blocking other
27  activities.)  Each rpcs object creates one thread for listening on the server
28  port and a pool of threads for executing RPC requests.  The
29  thread pool allows us to control the number of threads spawned at the server
30  (spawning one thread per request will hurt when the server faces thousands of
31  requests).
32
33  In order to delete a connection object, we must maintain a reference count.
34  For rpcc,
35  multiple client threads might be invoking the rpcc::call() functions and thus
36  holding multiple references to the underlying connection object. For rpcs,
37  multiple dispatch threads might be holding references to the same connection
38  object.  A connection object is deleted only when the underlying connection is
39  dead and the reference count reaches zero.
40
41  This version of the RPC library explicitly joins exited threads to make sure
42  no outstanding references exist before deleting objects.
43
44  To delete a rpcc object safely, the users of the library must ensure that
45  there are no outstanding calls on the rpcc object.
46
47  To delete a rpcs object safely, we do the following in sequence: 1. stop
48  accepting new incoming connections. 2. close existing active connections.
49  3.  delete the dispatch thread pool which involves waiting for current active
50  RPC handlers to finish.  It is interesting how a thread pool can be deleted
51  without using thread cancellation. The trick is to inject x "poison pills" for
52  a thread pool of x threads. Upon getting a poison pill instead of a normal
53  task, a worker thread will exit (and thread pool destructor waits to join all
54  x exited worker threads).
55  */
56
57 #include "types.h"
58 #include "rpc.h"
59
60 #include <sys/types.h>
61 #include <arpa/inet.h>
62 #include <netinet/tcp.h>
63 #include <netdb.h>
64 #include <unistd.h>
65
66 const rpcc::TO rpcc::to_max = { 120000 };
67 const rpcc::TO rpcc::to_min = { 1000 };
68
69 inline void set_rand_seed() {
70     auto now = time_point_cast<nanoseconds>(steady_clock::now());
71     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
72 }
73
74 static sockaddr_in make_sockaddr(const string &hostandport);
75
76 rpcc::rpcc(const string & d, bool retrans) :
77     dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
78     retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
79 {
80     if(retrans){
81         set_rand_seed();
82         clt_nonce_ = (unsigned int)random();
83     } else {
84         // special client nonce 0 means this client does not
85         // require at-most-once logic from the server
86         // because it uses tcp and never retries a failed connection
87         clt_nonce_ = 0;
88     }
89
90     char *loss_env = getenv("RPC_LOSSY");
91     if(loss_env)
92         lossytest_ = atoi(loss_env);
93
94     // xid starts with 1 and latest received reply starts with 0
95     xid_rep_window_.push_back(0);
96
97     IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
98 }
99
100 // IMPORTANT: destruction should happen only when no external threads
101 // are blocked inside rpcc or will use rpcc in the future
102 rpcc::~rpcc() {
103     IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
104     if(chan_){
105         chan_->closeconn();
106         chan_->decref();
107     }
108     VERIFY(calls_.size() == 0);
109 }
110
111 int rpcc::bind(TO to) {
112     unsigned int r;
113     int ret = call_timeout(rpc_const::bind, to, r, 0);
114     if(ret == 0){
115         lock ml(m_);
116         bind_done_ = true;
117         srv_nonce_ = r;
118     } else {
119         IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
120     }
121     return ret;
122 };
123
124 // Cancel all outstanding calls
125 void rpcc::cancel(void) {
126     lock ml(m_);
127     LOG("force callers to fail");
128     for(auto &p : calls_){
129         caller *ca = p.second;
130
131         IF_LEVEL(2) LOG("force caller to fail");
132         {
133             lock cl(ca->m);
134             ca->done = true;
135             ca->intret = rpc_const::cancel_failure;
136             ca->c.notify_one();
137         }
138     }
139
140     while (calls_.size () > 0){
141         destroy_wait_ = true;
142         destroy_wait_c_.wait(ml);
143     }
144     LOG("done");
145 }
146
147 int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
148
149     caller ca(0, &rep);
150     int xid_rep;
151     {
152         lock ml(m_);
153
154         if((proc != rpc_const::bind && !bind_done_) ||
155                 (proc == rpc_const::bind && bind_done_)){
156             IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
157             return rpc_const::bind_failure;
158         }
159
160         if(destroy_wait_){
161           return rpc_const::cancel_failure;
162         }
163
164         ca.xid = xid_++;
165         calls_[ca.xid] = &ca;
166
167         req.pack_header(request_header{ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
168         xid_rep = xid_rep_window_.front();
169     }
170
171     TO curr_to;
172     auto finaldeadline = steady_clock::now() + milliseconds(to.to),
173         nextdeadline = finaldeadline;
174
175     curr_to.to = to_min.to;
176
177     bool transmit = true;
178     connection *ch = NULL;
179
180     while (1){
181         if(transmit){
182             get_refconn(&ch);
183             if(ch){
184                 if(reachable_) {
185                     request forgot;
186                     {
187                         lock ml(m_);
188                         if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
189                             forgot = dup_req_;
190                             dup_req_.clear();
191                         }
192                     }
193                     if (forgot.isvalid())
194                         ch->send(forgot.buf);
195                     ch->send(req);
196                 }
197                 else IF_LEVEL(1) LOG("not reachable");
198                 IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc <<
199                                 " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
200             }
201             transmit = false; // only send once on a given channel
202         }
203
204         if(finaldeadline == time_point<steady_clock>::min())
205             break;
206
207         nextdeadline = steady_clock::now() + milliseconds(curr_to.to);
208         if(nextdeadline > finaldeadline) {
209             nextdeadline = finaldeadline;
210             finaldeadline = time_point<steady_clock>::min();
211         }
212
213         {
214             lock cal(ca.m);
215             while (!ca.done){
216                 IF_LEVEL(2) LOG("wait");
217                 if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
218                     IF_LEVEL(2) LOG("timeout");
219                     break;
220                 }
221             }
222             if(ca.done){
223                 IF_LEVEL(2) LOG("reply received");
224                 break;
225             }
226         }
227
228         if(retrans_ && (!ch || ch->isdead())){
229             // since connection is dead, retransmit
230             // on the new connection
231             transmit = true;
232         }
233         curr_to.to <<= 1;
234     }
235
236     {
237         // no locking of ca.m since only this thread changes ca.xid
238         lock ml(m_);
239         calls_.erase(ca.xid);
240         // may need to update the xid again here, in case the
241         // packet times out before it's even sent by the channel.
242         // I don't think there's any harm in maybe doing it twice
243         update_xid_rep(ca.xid);
244
245         if(destroy_wait_){
246           destroy_wait_c_.notify_one();
247         }
248     }
249
250     if (ca.done && lossytest_)
251     {
252         lock ml(m_);
253         if (!dup_req_.isvalid()) {
254             dup_req_.buf = req;
255             dup_req_.xid = ca.xid;
256         }
257         if (xid_rep > xid_rep_done_)
258             xid_rep_done_ = xid_rep;
259     }
260
261     lock cal(ca.m);
262
263     IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc <<
264                     " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
265                     ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
266
267     if(ch)
268         ch->decref();
269
270     // destruction of req automatically frees its buffer
271     return (ca.done? ca.intret : rpc_const::timeout_failure);
272 }
273
274 void
275 rpcc::get_refconn(connection **ch)
276 {
277     lock ml(chan_m_);
278     if(!chan_ || chan_->isdead()){
279         if(chan_)
280             chan_->decref();
281         chan_ = connect_to_dst(dst_, this, lossytest_);
282     }
283     if(ch && chan_){
284         if(*ch){
285             (*ch)->decref();
286         }
287         *ch = chan_;
288         (*ch)->incref();
289     }
290 }
291
292 // PollMgr's thread is being used to
293 // make this upcall from connection object to rpcc.
294 // this funtion must not block.
295 //
296 // this function keeps no reference for connection *c
297 bool
298 rpcc::got_pdu(connection *, const string & b)
299 {
300     unmarshall rep(b, true);
301     reply_header h;
302     rep.unpack_header(h);
303
304     if(!rep.ok()){
305         IF_LEVEL(1) LOG("unmarshall header failed!!!");
306         return true;
307     }
308
309     lock ml(m_);
310
311     update_xid_rep(h.xid);
312
313     if(calls_.find(h.xid) == calls_.end()){
314         IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
315         return true;
316     }
317     caller *ca = calls_[h.xid];
318
319     lock cl(ca->m);
320     if(!ca->done){
321         *ca->rep = b;
322         ca->intret = h.ret;
323         if(ca->intret < 0){
324             IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret);
325         }
326         ca->done = 1;
327     }
328     ca->c.notify_all();
329     return true;
330 }
331
332 // assumes thread holds mutex m
333 void
334 rpcc::update_xid_rep(int xid)
335 {
336     if(xid <= xid_rep_window_.front()){
337         return;
338     }
339
340     for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
341         if(*it > xid){
342             xid_rep_window_.insert(it, xid);
343             goto compress;
344         }
345     }
346     xid_rep_window_.push_back(xid);
347
348 compress:
349     auto it = xid_rep_window_.begin();
350     for (it++; it != xid_rep_window_.end(); it++){
351         while (xid_rep_window_.front() + 1 == *it)
352             xid_rep_window_.pop_front();
353     }
354 }
355
356 rpcs::rpcs(in_port_t p1, size_t count)
357   : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
358 {
359     set_rand_seed();
360     nonce_ = (unsigned int)random();
361     IF_LEVEL(2) LOG("created with nonce " << nonce_);
362
363     reg(rpc_const::bind, &rpcs::rpcbind, this);
364     dispatchpool_ = new ThrPool(6, false);
365
366     char *loss_env = getenv("RPC_LOSSY");
367     listener_ = new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0);
368 }
369
370 rpcs::~rpcs()
371 {
372     // must delete listener before dispatchpool
373     delete listener_;
374     delete dispatchpool_;
375     free_reply_window();
376 }
377
378 bool
379 rpcs::got_pdu(connection *c, const string & b)
380 {
381     if(!reachable_){
382         IF_LEVEL(1) LOG("not reachable");
383         return true;
384     }
385
386     djob_t *j = new djob_t{c, b};
387     c->incref();
388     bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
389     if(!succ || !reachable_){
390         c->decref();
391         delete j;
392     }
393     return succ;
394 }
395
396 void
397 rpcs::reg1(proc_t proc, handler *h)
398 {
399     lock pl(procs_m_);
400     VERIFY(procs_.count(proc) == 0);
401     procs_[proc] = h;
402     VERIFY(procs_.count(proc) >= 1);
403 }
404
405 void
406 rpcs::updatestat(proc_t proc)
407 {
408     lock cl(count_m_);
409     counts_[proc]++;
410     curr_counts_--;
411     if(curr_counts_ == 0){
412         LOG("RPC STATS: ");
413         for (auto i = counts_.begin(); i != counts_.end(); i++)
414             LOG(hex << i->first << ":" << dec << i->second);
415
416         lock rwl(reply_window_m_);
417
418         size_t totalrep = 0, maxrep = 0;
419         for (auto clt : reply_window_) {
420             totalrep += clt.second.size();
421             if(clt.second.size() > maxrep)
422                 maxrep = clt.second.size();
423         }
424         IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
425                         totalrep << " max per client " << maxrep);
426         curr_counts_ = counting_;
427     }
428 }
429
430 void
431 rpcs::dispatch(djob_t *j)
432 {
433     connection *c = j->conn;
434     unmarshall req(j->buf, true);
435     delete j;
436
437     request_header h;
438     req.unpack_header(h);
439     proc_t proc = h.proc;
440
441     if(!req.ok()){
442         IF_LEVEL(1) LOG("unmarshall header failed!!!");
443         c->decref();
444         return;
445     }
446
447     IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
448                     dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
449
450     marshall rep;
451     reply_header rh{h.xid,0};
452
453     // is client sending to an old instance of server?
454     if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
455         IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
456                         " (current " << nonce_ << ") proc " << hex << h.proc);
457         rh.ret = rpc_const::oldsrv_failure;
458         rep.pack_header(rh);
459         c->send(rep);
460         return;
461     }
462
463     handler *f;
464     // is RPC proc a registered procedure?
465     {
466         lock pl(procs_m_);
467         if(procs_.count(proc) < 1){
468             cerr << "unknown proc " << hex << proc << "." << endl;
469             c->decref();
470             VERIFY(0);
471             return;
472         }
473
474         f = procs_[proc];
475     }
476
477     rpcs::rpcstate_t stat;
478     string b1;
479
480     if(h.clt_nonce){
481         // have i seen this client before?
482         {
483             lock rwl(reply_window_m_);
484             // if we don't know about this clt_nonce, create a cleanup object
485             if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
486                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
487                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
488                 IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
489                                 " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
490             }
491         }
492
493         // save the latest good connection to the client
494         {
495             lock rwl(conss_m_);
496             if(conns_.find(h.clt_nonce) == conns_.end()){
497                 c->incref();
498                 conns_[h.clt_nonce] = c;
499             } else if(conns_[h.clt_nonce]->compare(c) < 0){
500                 conns_[h.clt_nonce]->decref();
501                 c->incref();
502                 conns_[h.clt_nonce] = c;
503             }
504         }
505
506         stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
507     } else {
508         // this client does not require at most once logic
509         stat = NEW;
510     }
511
512     switch (stat) {
513         case NEW: // new request
514             if (counting_){
515                 updatestat(proc);
516             }
517
518             rh.ret = (*f)(req, rep);
519             if (rh.ret == rpc_const::unmarshal_args_failure) {
520                 cerr << "failed to unmarshall the arguments. You are " <<
521                         "probably calling RPC 0x" << hex << proc << " with the wrong " <<
522                         "types of arguments." << endl;
523                 VERIFY(0);
524             }
525             VERIFY(rh.ret >= 0);
526
527             rep.pack_header(rh);
528             b1 = rep;
529
530             IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
531                             h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
532
533             if (h.clt_nonce > 0) {
534                 // only record replies for clients that require at-most-once logic
535                 add_reply(h.clt_nonce, h.xid, b1);
536             }
537
538             // get the latest connection to the client
539             {
540                 lock rwl(conss_m_);
541                 if(c->isdead() && c != conns_[h.clt_nonce]){
542                     c->decref();
543                     c = conns_[h.clt_nonce];
544                     c->incref();
545                 }
546             }
547
548             c->send(rep);
549             break;
550         case INPROGRESS: // server is working on this request
551             break;
552         case DONE: // duplicate and we still have the response
553             c->send(b1);
554             break;
555         case FORGOTTEN: // very old request and we don't have the response anymore
556             IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
557             rh.ret = rpc_const::atmostonce_failure;
558             rep.pack_header(rh);
559             c->send(rep);
560             break;
561     }
562     c->decref();
563 }
564
565 // rpcs::dispatch calls this when an RPC request arrives.
566 //
567 // checks to see if an RPC with xid from clt_nonce has already been received.
568 // if not, remembers the request in reply_window_.
569 //
570 // deletes remembered requests with XIDs <= xid_rep; the client
571 // says it has received a reply for every RPC up through xid_rep.
572 // frees the reply_t::buf of each such request.
573 //
574 // returns one of:
575 //   NEW: never seen this xid before.
576 //   INPROGRESS: seen this xid, and still processing it.
577 //   DONE: seen this xid, previous reply returned in b.
578 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
579 rpcs::rpcstate_t
580 rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
581         int xid_rep, string & b)
582 {
583     lock rwl(reply_window_m_);
584
585     list<reply_t> &l = reply_window_[clt_nonce];
586
587     VERIFY(l.size() > 0);
588     VERIFY(xid >= xid_rep);
589
590     int past_xid_rep = l.begin()->xid;
591
592     list<reply_t>::iterator start = l.begin(), it = ++start;
593
594     if (past_xid_rep < xid_rep || past_xid_rep == -1) {
595         // scan for deletion candidates
596         while (it != l.end() && it->xid < xid_rep)
597             it++;
598         l.erase(start, it);
599         l.begin()->xid = xid_rep;
600     }
601
602     if (xid < past_xid_rep && past_xid_rep != -1)
603         return FORGOTTEN;
604
605     // skip non-deletion candidates
606     while (it != l.end() && it->xid < xid)
607         it++;
608
609     // if it's in the list it must be right here
610     if (it != l.end() && it->xid == xid) {
611         if (it->cb_present) {
612             // return information about the remembered reply
613             b = it->buf;
614             return DONE;
615         }
616         return INPROGRESS;
617     } else {
618         // remember that a new request has arrived
619         l.insert(it, reply_t(xid));
620         return NEW;
621     }
622 }
623
624 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
625 // and passes the return value in b.
626 // add_reply() should remember b.
627 // free_reply_window() and checkduplicate_and_update are responsible for
628 // cleaning up the remembered values.
629 void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
630     lock rwl(reply_window_m_);
631     // remember the RPC reply value
632     list<reply_t> &l = reply_window_[clt_nonce];
633     list<reply_t>::iterator it = l.begin();
634     // skip to our place in the list
635     for (it++; it != l.end() && it->xid < xid; it++);
636     // there should already be an entry, so whine if there isn't
637     if (it == l.end() || it->xid != xid) {
638         cerr << "Could not find reply struct in add_reply" << endl;
639         l.insert(it, reply_t(xid, b));
640     } else {
641         *it = reply_t(xid, b);
642     }
643 }
644
645 void rpcs::free_reply_window(void) {
646     lock rwl(reply_window_m_);
647     reply_window_.clear();
648 }
649
650 int rpcs::rpcbind(unsigned int &r, int) {
651     IF_LEVEL(2) LOG("called return nonce " << nonce_);
652     r = nonce_;
653     return 0;
654 }
655
656 static sockaddr_in make_sockaddr(const string &host, const string &port);
657
658 static sockaddr_in make_sockaddr(const string &hostandport) {
659     auto colon = hostandport.find(':');
660     if (colon == string::npos)
661         return make_sockaddr("127.0.0.1", hostandport);
662     else
663         return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
664 }
665
666 static sockaddr_in make_sockaddr(const string &host, const string &port) {
667     sockaddr_in dst;
668     bzero(&dst, sizeof(dst));
669     dst.sin_family = AF_INET;
670
671     struct in_addr a{inet_addr(host.c_str())};
672
673     if(a.s_addr != INADDR_NONE)
674         dst.sin_addr.s_addr = a.s_addr;
675     else {
676         struct hostent *hp = gethostbyname(host.c_str());
677
678         if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
679             cerr << "cannot find host name " << host << endl;
680             exit(1);
681         }
682         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
683         dst.sin_addr.s_addr = a.s_addr;
684     }
685     dst.sin_port = hton((in_port_t)stoi(port));
686     return dst;
687 }