More clean-ups and cool template stuff
[invirt/third/libt4.git] / rpc / rpc.cc
1 /*
2  The rpcc class handles client-side RPC.  Each rpcc is bound to a
3  single RPC server.  The jobs of rpcc include maintaining a connection to
4  server, sending RPC requests and waiting for responses, retransmissions,
5  at-most-once delivery etc.
6
7  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
8  connections from different rpcc objects.  The jobs of rpcs include accepting
9  connections, dispatching requests to registered RPC handlers, at-most-once
10  delivery etc.
11
12  Both rpcc and rpcs use the connection class as an abstraction for the
13  underlying communication channel.  To send an RPC request/reply, one calls
14  connection::send() which blocks until data is sent or the connection has failed
15  (thus the caller can free the buffer when send() returns).  When a
16  request/reply is received, connection makes a callback into the corresponding
17  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
18
19  Thread organization:
20  rpcc uses application threads to send RPC requests and blocks to receive the
21  reply or error. All connections use a single PollMgr object to perform async
22  socket IO.  PollMgr creates a single thread to examine the readiness of socket
23  file descriptors and informs the corresponding connection whenever a socket is
24  ready to be read or written.  (We use asynchronous socket IO to reduce the
25  number of threads needed to manage these connections; without async IO, at
26  least one thread is needed per connection to read data without blocking other
27  activities.)  Each rpcs object creates one thread for listening on the server
28  port and a pool of threads for executing RPC requests.  The
29  thread pool allows us to control the number of threads spawned at the server
30  (spawning one thread per request will hurt when the server faces thousands of
31  requests).
32
33  In order to delete a connection object, we must maintain a reference count.
34  For rpcc,
35  multiple client threads might be invoking the rpcc::call() functions and thus
36  holding multiple references to the underlying connection object. For rpcs,
37  multiple dispatch threads might be holding references to the same connection
38  object.  A connection object is deleted only when the underlying connection is
39  dead and the reference count reaches zero.
40
41  This version of the RPC library explicitly joins exited threads to make sure
42  no outstanding references exist before deleting objects.
43
44  To delete a rpcc object safely, the users of the library must ensure that
45  there are no outstanding calls on the rpcc object.
46
47  To delete a rpcs object safely, we do the following in sequence: 1. stop
48  accepting new incoming connections. 2. close existing active connections.
49  3.  delete the dispatch thread pool which involves waiting for current active
50  RPC handlers to finish.  It is interesting how a thread pool can be deleted
51  without using thread cancellation. The trick is to inject x "poison pills" for
52  a thread pool of x threads. Upon getting a poison pill instead of a normal
53  task, a worker thread will exit (and thread pool destructor waits to join all
54  x exited worker threads).
55  */
56
57 #include "rpc.h"
58
59 #include <sys/types.h>
60 #include <arpa/inet.h>
61 #include <netinet/tcp.h>
62 #include <netdb.h>
63 #include <unistd.h>
64 #include "lock.h"
65
66 #include "jsl_log.h"
67 #include "threaded_log.h"
68 #include "lang/verify.h"
69
70 using std::stoi;
71
72 const rpcc::TO rpcc::to_max = { 120000 };
73 const rpcc::TO rpcc::to_min = { 1000 };
74
75 rpcc::caller::caller(int xxid, unmarshall *xun)
76 : xid(xxid), un(xun), done(false)
77 {
78 }
79
80 rpcc::caller::~caller()
81 {
82 }
83
84 inline
85 void set_rand_seed()
86 {
87     auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
88     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
89 }
90
91 rpcc::rpcc(const string & d, bool retrans) :
92     dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
93     retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
94 {
95     if(retrans){
96         set_rand_seed();
97         clt_nonce_ = (unsigned int)random();
98     } else {
99         // special client nonce 0 means this client does not
100         // require at-most-once logic from the server
101         // because it uses tcp and never retries a failed connection
102         clt_nonce_ = 0;
103     }
104
105     char *loss_env = getenv("RPC_LOSSY");
106     if(loss_env != NULL){
107         lossytest_ = atoi(loss_env);
108     }
109
110     // xid starts with 1 and latest received reply starts with 0
111     xid_rep_window_.push_back(0);
112
113     jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
114             clt_nonce_, lossytest_);
115 }
116
117 // IMPORTANT: destruction should happen only when no external threads
118 // are blocked inside rpcc or will use rpcc in the future
119 rpcc::~rpcc()
120 {
121     jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
122             clt_nonce_, chan_?chan_->channo():-1);
123     if(chan_){
124         chan_->closeconn();
125         chan_->decref();
126     }
127     VERIFY(calls_.size() == 0);
128 }
129
130 int
131 rpcc::bind(TO to)
132 {
133     unsigned int r;
134     int ret = call_timeout(rpc_const::bind, to, r, 0);
135     if(ret == 0){
136         lock ml(m_);
137         bind_done_ = true;
138         srv_nonce_ = r;
139     } else {
140         jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
141                 inet_ntoa(dst_.sin_addr), ret);
142     }
143     return ret;
144 };
145
146 // Cancel all outstanding calls
147     void
148 rpcc::cancel(void)
149 {
150     lock ml(m_);
151     LOG("rpcc::cancel: force callers to fail");
152     for(auto &p : calls_){
153         caller *ca = p.second;
154
155         jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
156         {
157             lock cl(ca->m);
158             ca->done = true;
159             ca->intret = rpc_const::cancel_failure;
160             ca->c.notify_one();
161         }
162     }
163
164     while (calls_.size () > 0){
165         destroy_wait_ = true;
166         destroy_wait_c_.wait(ml);
167     }
168     LOG("rpcc::cancel: done");
169 }
170
171 int
172 rpcc::call1(proc_t proc, marshall &req, unmarshall &rep,
173         TO to)
174 {
175
176     caller ca(0, &rep);
177     int xid_rep;
178     {
179         lock ml(m_);
180
181         if((proc != rpc_const::bind && !bind_done_) ||
182                 (proc == rpc_const::bind && bind_done_)){
183             jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
184             return rpc_const::bind_failure;
185         }
186
187         if(destroy_wait_){
188           return rpc_const::cancel_failure;
189         }
190
191         ca.xid = xid_++;
192         calls_[ca.xid] = &ca;
193
194         req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
195         xid_rep = xid_rep_window_.front();
196     }
197
198     TO curr_to;
199     std::chrono::time_point<std::chrono::steady_clock> finaldeadline =
200         std::chrono::steady_clock::now() +
201         std::chrono::milliseconds(to.to),
202         nextdeadline;
203
204     curr_to.to = to_min.to;
205
206     bool transmit = true;
207     connection *ch = NULL;
208
209     while (1){
210         if(transmit){
211             get_refconn(&ch);
212             if(ch){
213                 if(reachable_) {
214                     request forgot;
215                     {
216                         lock ml(m_);
217                         if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
218                             forgot = dup_req_;
219                             dup_req_.clear();
220                         }
221                     }
222                     if (forgot.isvalid())
223                         ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
224                     ch->send(req.cstr(), req.size());
225                 }
226                 else jsl_log(JSL_DBG_1, "not reachable\n");
227                 jsl_log(JSL_DBG_2,
228                         "rpcc::call1 %u just sent req proc %x xid %d clt_nonce %d\n",
229                         clt_nonce_, proc, ca.xid, clt_nonce_);
230             }
231             transmit = false; // only send once on a given channel
232         }
233
234         if(finaldeadline == std::chrono::time_point<std::chrono::steady_clock>::min())
235             break;
236
237         nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to);
238         if(nextdeadline > finaldeadline) {
239             nextdeadline = finaldeadline;
240             finaldeadline = std::chrono::time_point<std::chrono::steady_clock>::min();
241         }
242
243         {
244             lock cal(ca.m);
245             while (!ca.done){
246                 jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
247                 if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){
248                     jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
249                     break;
250                 }
251             }
252             if(ca.done){
253                 jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
254                 break;
255             }
256         }
257
258         if(retrans_ && (!ch || ch->isdead())){
259             // since connection is dead, retransmit
260             // on the new connection
261             transmit = true;
262         }
263         curr_to.to <<= 1;
264     }
265
266     {
267         // no locking of ca.m since only this thread changes ca.xid
268         lock ml(m_);
269         calls_.erase(ca.xid);
270         // may need to update the xid again here, in case the
271         // packet times out before it's even sent by the channel.
272         // I don't think there's any harm in maybe doing it twice
273         update_xid_rep(ca.xid);
274
275         if(destroy_wait_){
276           destroy_wait_c_.notify_one();
277         }
278     }
279
280     if (ca.done && lossytest_)
281     {
282         lock ml(m_);
283         if (!dup_req_.isvalid()) {
284             dup_req_.buf.assign(req.cstr(), req.size());
285             dup_req_.xid = ca.xid;
286         }
287         if (xid_rep > xid_rep_done_)
288             xid_rep_done_ = xid_rep;
289     }
290
291     lock cal(ca.m);
292
293     jsl_log(JSL_DBG_2,
294             "rpcc::call1 %u call done for req proc %x xid %d %s:%d done? %d ret %d \n",
295             clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
296             ntohs(dst_.sin_port), ca.done, ca.intret);
297
298     if(ch)
299         ch->decref();
300
301     // destruction of req automatically frees its buffer
302     return (ca.done? ca.intret : rpc_const::timeout_failure);
303 }
304
305 void
306 rpcc::get_refconn(connection **ch)
307 {
308     lock ml(chan_m_);
309     if(!chan_ || chan_->isdead()){
310         if(chan_)
311             chan_->decref();
312         chan_ = connect_to_dst(dst_, this, lossytest_);
313     }
314     if(ch && chan_){
315         if(*ch){
316             (*ch)->decref();
317         }
318         *ch = chan_;
319         (*ch)->incref();
320     }
321 }
322
323 // PollMgr's thread is being used to
324 // make this upcall from connection object to rpcc.
325 // this funtion must not block.
326 //
327 // this function keeps no reference for connection *c
328 bool
329 rpcc::got_pdu(connection *, char *b, size_t sz)
330 {
331     unmarshall rep(b, sz);
332     reply_header h;
333     rep.unpack_reply_header(&h);
334
335     if(!rep.ok()){
336         jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
337         return true;
338     }
339
340     lock ml(m_);
341
342     update_xid_rep(h.xid);
343
344     if(calls_.find(h.xid) == calls_.end()){
345         jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
346         return true;
347     }
348     caller *ca = calls_[h.xid];
349
350     lock cl(ca->m);
351     if(!ca->done){
352         ca->un->take_in(rep);
353         ca->intret = h.ret;
354         if(ca->intret < 0){
355             jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
356                     h.xid, ca->intret);
357         }
358         ca->done = 1;
359     }
360     ca->c.notify_all();
361     return true;
362 }
363
364 // assumes thread holds mutex m
365 void
366 rpcc::update_xid_rep(int xid)
367 {
368     if(xid <= xid_rep_window_.front()){
369         return;
370     }
371
372     for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
373         if(*it > xid){
374             xid_rep_window_.insert(it, xid);
375             goto compress;
376         }
377     }
378     xid_rep_window_.push_back(xid);
379
380 compress:
381     auto it = xid_rep_window_.begin();
382     for (it++; it != xid_rep_window_.end(); it++){
383         while (xid_rep_window_.front() + 1 == *it)
384             xid_rep_window_.pop_front();
385     }
386 }
387
388 rpcs::rpcs(unsigned int p1, size_t count)
389   : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
390 {
391     set_rand_seed();
392     nonce_ = (unsigned int)random();
393     jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
394
395     char *loss_env = getenv("RPC_LOSSY");
396     if(loss_env != NULL){
397         lossytest_ = atoi(loss_env);
398     }
399
400     reg(rpc_const::bind, &rpcs::rpcbind, this);
401     dispatchpool_ = new ThrPool(6,false);
402
403     listener_ = new tcpsconn(this, port_, lossytest_);
404 }
405
406 rpcs::~rpcs()
407 {
408     // must delete listener before dispatchpool
409     delete listener_;
410     delete dispatchpool_;
411     free_reply_window();
412 }
413
414 bool
415 rpcs::got_pdu(connection *c, char *b, size_t sz)
416 {
417         if(!reachable_){
418             jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
419             return true;
420         }
421
422     djob_t *j = new djob_t(c, b, sz);
423     c->incref();
424     bool succ = dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, j));
425     if(!succ || !reachable_){
426         c->decref();
427         delete j;
428     }
429     return succ;
430 }
431
432 void
433 rpcs::reg1(proc_t proc, handler *h)
434 {
435     lock pl(procs_m_);
436     VERIFY(procs_.count(proc) == 0);
437     procs_[proc] = h;
438     VERIFY(procs_.count(proc) >= 1);
439 }
440
441 void
442 rpcs::updatestat(proc_t proc)
443 {
444     lock cl(count_m_);
445     counts_[proc]++;
446     curr_counts_--;
447     if(curr_counts_ == 0){
448         LOG("RPC STATS: ");
449         for (auto i = counts_.begin(); i != counts_.end(); i++)
450             LOG(std::hex << i->first << ":" << std::dec << i->second);
451
452         lock rwl(reply_window_m_);
453         map<unsigned int,list<reply_t> >::iterator clt;
454
455         size_t totalrep = 0, maxrep = 0;
456         for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
457             totalrep += clt->second.size();
458             if(clt->second.size() > maxrep)
459                 maxrep = clt->second.size();
460         }
461         jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %lu max per client %lu\n",
462                         (int) reply_window_.size()-1, totalrep, maxrep);
463         curr_counts_ = counting_;
464     }
465 }
466
467 void
468 rpcs::dispatch(djob_t *j)
469 {
470     connection *c = j->conn;
471     unmarshall req(j->buf, j->sz);
472     delete j;
473
474     request_header h;
475     req.unpack_req_header(&h);
476     proc_t proc = h.proc;
477
478     if(!req.ok()){
479         jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
480         c->decref();
481         return;
482     }
483
484     jsl_log(JSL_DBG_2,
485             "rpcs::dispatch: rpc %d (proc %x, last_rep %d) from clt %u for srv instance %u \n",
486             h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
487
488     marshall rep;
489     reply_header rh(h.xid,0);
490
491     // is client sending to an old instance of server?
492     if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
493         jsl_log(JSL_DBG_2,
494                 "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
495                 h.srv_nonce, nonce_, h.proc);
496         rh.ret = rpc_const::oldsrv_failure;
497         rep.pack_reply_header(rh);
498         c->send(rep.cstr(),rep.size());
499         return;
500     }
501
502     handler *f;
503     // is RPC proc a registered procedure?
504     {
505         lock pl(procs_m_);
506         if(procs_.count(proc) < 1){
507             fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
508                 proc);
509             c->decref();
510                         VERIFY(0);
511             return;
512         }
513
514         f = procs_[proc];
515     }
516
517     rpcs::rpcstate_t stat;
518     char *b1 = nullptr;
519     size_t sz1 = 0;
520
521     if(h.clt_nonce){
522         // have i seen this client before?
523         {
524             lock rwl(reply_window_m_);
525             // if we don't know about this clt_nonce, create a cleanup object
526             if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
527                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
528                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
529                 jsl_log(JSL_DBG_2,
530                         "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
531                         h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
532             }
533         }
534
535         // save the latest good connection to the client
536         {
537             lock rwl(conss_m_);
538             if(conns_.find(h.clt_nonce) == conns_.end()){
539                 c->incref();
540                 conns_[h.clt_nonce] = c;
541             } else if(conns_[h.clt_nonce]->compare(c) < 0){
542                 conns_[h.clt_nonce]->decref();
543                 c->incref();
544                 conns_[h.clt_nonce] = c;
545             }
546         }
547
548         stat = checkduplicate_and_update(h.clt_nonce, h.xid,
549                                                  h.xid_rep, &b1, &sz1);
550     } else {
551         // this client does not require at most once logic
552         stat = NEW;
553     }
554
555     switch (stat){
556         case NEW: // new request
557             if(counting_){
558                 updatestat(proc);
559             }
560
561             rh.ret = (*f)(req, rep);
562             if (rh.ret == rpc_const::unmarshal_args_failure) {
563                 fprintf(stderr, "rpcs::dispatch: failed to"
564                         " unmarshall the arguments. You are"
565                         " probably calling RPC 0x%x with wrong"
566                         " types of arguments.\n", proc);
567                 VERIFY(0);
568             }
569             VERIFY(rh.ret >= 0);
570
571             rep.pack_reply_header(rh);
572             rep.take_buf(&b1,&sz1);
573
574             jsl_log(JSL_DBG_2,
575                     "rpcs::dispatch: sending and saving reply of size %lu for rpc %d, proc %x ret %d, clt %u\n",
576                     sz1, h.xid, proc, rh.ret, h.clt_nonce);
577
578             if(h.clt_nonce > 0){
579                 // only record replies for clients that require at-most-once logic
580                 add_reply(h.clt_nonce, h.xid, b1, sz1);
581             }
582
583             // get the latest connection to the client
584             {
585                 lock rwl(conss_m_);
586                 if(c->isdead() && c != conns_[h.clt_nonce]){
587                     c->decref();
588                     c = conns_[h.clt_nonce];
589                     c->incref();
590                 }
591             }
592
593             c->send(b1, sz1);
594             if(h.clt_nonce == 0){
595                 // reply is not added to at-most-once window, free it
596                 free(b1);
597             }
598             break;
599         case INPROGRESS: // server is working on this request
600             break;
601         case DONE: // duplicate and we still have the response
602             c->send(b1, sz1);
603             break;
604         case FORGOTTEN: // very old request and we don't have the response anymore
605             jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %d from %u\n",
606                     h.xid, h.clt_nonce);
607             rh.ret = rpc_const::atmostonce_failure;
608             rep.pack_reply_header(rh);
609             c->send(rep.cstr(),rep.size());
610             break;
611     }
612     c->decref();
613 }
614
615 // rpcs::dispatch calls this when an RPC request arrives.
616 //
617 // checks to see if an RPC with xid from clt_nonce has already been received.
618 // if not, remembers the request in reply_window_.
619 //
620 // deletes remembered requests with XIDs <= xid_rep; the client
621 // says it has received a reply for every RPC up through xid_rep.
622 // frees the reply_t::buf of each such request.
623 //
624 // returns one of:
625 //   NEW: never seen this xid before.
626 //   INPROGRESS: seen this xid, and still processing it.
627 //   DONE: seen this xid, previous reply returned in *b and *sz.
628 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
629 rpcs::rpcstate_t
630 rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
631         int xid_rep, char **b, size_t *sz)
632 {
633     lock rwl(reply_window_m_);
634
635     list<reply_t> &l = reply_window_[clt_nonce];
636
637     VERIFY(l.size() > 0);
638     VERIFY(xid >= xid_rep);
639
640     int past_xid_rep = l.begin()->xid;
641
642     list<reply_t>::iterator start = l.begin(), it;
643     it = ++start;
644
645     if (past_xid_rep < xid_rep || past_xid_rep == -1) {
646         // scan for deletion candidates
647         for (; it != l.end() && it->xid < xid_rep; it++) {
648             if (it->cb_present)
649                 free(it->buf);
650         }
651         l.erase(start, it);
652         l.begin()->xid = xid_rep;
653     }
654
655     if (xid < past_xid_rep && past_xid_rep != -1)
656         return FORGOTTEN;
657
658     // skip non-deletion candidates
659     while (it != l.end() && it->xid < xid)
660         it++;
661
662     // if it's in the list it must be right here
663     if (it != l.end() && it->xid == xid) {
664         if (it->cb_present) {
665             // return information about the remembered reply
666             *b = it->buf;
667             *sz = it->sz;
668             return DONE;
669         } else {
670             return INPROGRESS;
671         }
672     } else {
673         // remember that a new request has arrived
674         l.insert(it, reply_t(xid));
675         return NEW;
676     }
677 }
678
679 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
680 // and passes the return value in b and sz.
681 // add_reply() should remember b and sz.
682 // free_reply_window() and checkduplicate_and_update is responsible for
683 // calling free(b).
684 void
685 rpcs::add_reply(unsigned int clt_nonce, int xid,
686         char *b, size_t sz)
687 {
688     lock rwl(reply_window_m_);
689     // remember the RPC reply value
690     list<reply_t> &l = reply_window_[clt_nonce];
691     list<reply_t>::iterator it = l.begin();
692     // skip to our place in the list
693     for (it++; it != l.end() && it->xid < xid; it++);
694     // there should already be an entry, so whine if there isn't
695     if (it == l.end() || it->xid != xid) {
696         fprintf(stderr, "Could not find reply struct in add_reply");
697         l.insert(it, reply_t(xid, b, sz));
698     } else {
699         *it = reply_t(xid, b, sz);
700     }
701 }
702
703 void
704 rpcs::free_reply_window(void)
705 {
706     lock rwl(reply_window_m_);
707     for (auto clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
708         for (auto it = clt->second.begin(); it != clt->second.end(); it++){
709             if (it->cb_present)
710                 free(it->buf);
711         }
712         clt->second.clear();
713     }
714     reply_window_.clear();
715 }
716
717 // rpc handler
718 int
719 rpcs::rpcbind(unsigned int &r, int)
720 {
721     jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
722     r = nonce_;
723     return 0;
724 }
725
726 marshall &
727 operator<<(marshall &m, uint8_t x) {
728     m.rawbyte(x);
729     return m;
730 }
731
732 marshall &
733 operator<<(marshall &m, uint16_t x) {
734     x = hton(x);
735     m.rawbytes((char *)&x, 2);
736     return m;
737 }
738
739 marshall &
740 operator<<(marshall &m, uint32_t x) {
741     x = hton(x);
742     m.rawbytes((char *)&x, 4);
743     return m;
744 }
745
746 marshall & operator<<(marshall &m, int32_t x) { return m << (uint32_t) x; }
747 marshall & operator<<(marshall &m, int8_t x) { return m << (uint8_t)x; }
748 marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; }
749 marshall & operator<<(marshall &m, int16_t x) { return m << (uint16_t)x; }
750 marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }
751
752 marshall &
753 operator<<(marshall &m, const string &s) {
754     m << (unsigned int) s.size();
755     m.rawbytes(s.data(), s.size());
756     return m;
757 }
758
759 void marshall::pack_req_header(const request_header &h) {
760     size_t saved_sz = index_;
761     //leave the first 4-byte empty for channel to fill size of pdu
762     index_ = sizeof(rpc_sz_t);
763     *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep;
764     index_ = saved_sz;
765 }
766
767 void marshall::pack_reply_header(const reply_header &h) {
768     size_t saved_sz = index_;
769     //leave the first 4-byte empty for channel to fill size of pdu
770     index_ = sizeof(rpc_sz_t);
771     *this << h.xid << h.ret;
772     index_ = saved_sz;
773 }
774
775 // take the contents from another unmarshall object
776 void
777 unmarshall::take_in(unmarshall &another)
778 {
779     if(buf_)
780         free(buf_);
781     another.take_buf(&buf_, &sz_);
782     index_ = RPC_HEADER_SZ;
783     ok_ = sz_ >= RPC_HEADER_SZ?true:false;
784 }
785
786 inline bool
787 unmarshall::ensure(size_t n) {
788     if (index_+n > sz_)
789         ok_ = false;
790     return ok_;
791 }
792
793 inline uint8_t
794 unmarshall::rawbyte()
795 {
796     if (!ensure(1))
797         return 0;
798     return (uint8_t)buf_[index_++];
799 }
800
801 void
802 unmarshall::rawbytes(string &ss, size_t n)
803 {
804     VERIFY(ensure(n));
805     ss.assign(buf_+index_, n);
806     index_ += n;
807 }
808
809 template <class T>
810 void
811 unmarshall::rawbytes(T &t)
812 {
813     const size_t n = sizeof(T);
814     VERIFY(ensure(n));
815     memcpy(&t, buf_+index_, n);
816     t = ntoh(t);
817     index_ += n;
818 }
819
820 unmarshall & operator>>(unmarshall &u, bool &x) { x = (bool)u.rawbyte(); return u; }
821 unmarshall & operator>>(unmarshall &u, uint8_t &x) { x = u.rawbyte(); return u; }
822 unmarshall & operator>>(unmarshall &u, int8_t &x) { x = (int8_t)u.rawbyte(); return u; }
823 unmarshall & operator>>(unmarshall &u, uint16_t &x) { u.rawbytes<uint16_t>(x); return u; }
824 unmarshall & operator>>(unmarshall &u, int16_t &x) { u.rawbytes<int16_t>(x); return u; }
825 unmarshall & operator>>(unmarshall &u, uint32_t &x) { u.rawbytes<uint32_t>(x); return u; }
826 unmarshall & operator>>(unmarshall &u, int32_t &x) { u.rawbytes<int32_t>(x); return u; }
827 unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes<uint32_t>(xx); x = xx; return u; }
828 unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes<uint64_t>(x); return u; }
829 unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes<int64_t>(x); return u; }
830 unmarshall & operator>>(unmarshall &u, string &s) {
831     unsigned sz = u.grab<unsigned>();
832     if(u.ok())
833         u.rawbytes(s, sz);
834     return u;
835 }
836
837 bool operator<(const sockaddr_in &a, const sockaddr_in &b){
838     return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
839             ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
840              ((a.sin_port < b.sin_port))));
841 }
842
843 /*---------------auxilary function--------------*/
844 sockaddr_in make_sockaddr(const string &hostandport) {
845     auto colon = hostandport.find(':');
846     if (colon == string::npos)
847         return make_sockaddr("127.0.0.1", hostandport);
848     else
849         return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
850 }
851
852 sockaddr_in make_sockaddr(const string &host, const string &port) {
853     sockaddr_in dst;
854     bzero(&dst, sizeof(dst));
855     dst.sin_family = AF_INET;
856
857     struct in_addr a{inet_addr(host.c_str())};
858
859     if(a.s_addr != INADDR_NONE)
860         dst.sin_addr.s_addr = a.s_addr;
861     else {
862         struct hostent *hp = gethostbyname(host.c_str());
863
864         if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
865             fprintf(stderr, "cannot find host name %s\n", host.c_str());
866             exit(1);
867         }
868         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
869         dst.sin_addr.s_addr = a.s_addr;
870     }
871     dst.sin_port = hton((uint16_t)stoi(port));
872     return dst;
873 }