5e43547fc5901c1c91a3830f6bd1d98912c77d86
[invirt/third/libt4.git] / rpc / rpc.cc
1 /*
2  The rpcc class handles client-side RPC.  Each rpcc is bound to a
3  single RPC server.  The jobs of rpcc include maintaining a connection to
4  server, sending RPC requests and waiting for responses, retransmissions,
5  at-most-once delivery etc.
6
7  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
8  connections from different rpcc objects.  The jobs of rpcs include accepting
9  connections, dispatching requests to registered RPC handlers, at-most-once
10  delivery etc.
11
12  Both rpcc and rpcs use the connection class as an abstraction for the
13  underlying communication channel.  To send an RPC request/reply, one calls
14  connection::send() which blocks until data is sent or the connection has failed
15  (thus the caller can free the buffer when send() returns).  When a
16  request/reply is received, connection makes a callback into the corresponding
17  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
18
19  Thread organization:
20  rpcc uses application threads to send RPC requests and blocks to receive the
21  reply or error. All connections use a single PollMgr object to perform async
22  socket IO.  PollMgr creates a single thread to examine the readiness of socket
23  file descriptors and informs the corresponding connection whenever a socket is
24  ready to be read or written.  (We use asynchronous socket IO to reduce the
25  number of threads needed to manage these connections; without async IO, at
26  least one thread is needed per connection to read data without blocking other
27  activities.)  Each rpcs object creates one thread for listening on the server
28  port and a pool of threads for executing RPC requests.  The
29  thread pool allows us to control the number of threads spawned at the server
30  (spawning one thread per request will hurt when the server faces thousands of
31  requests).
32
33  In order to delete a connection object, we must maintain a reference count.
34  For rpcc,
35  multiple client threads might be invoking the rpcc::call() functions and thus
36  holding multiple references to the underlying connection object. For rpcs,
37  multiple dispatch threads might be holding references to the same connection
38  object.  A connection object is deleted only when the underlying connection is
39  dead and the reference count reaches zero.
40
41  This version of the RPC library explicitly joins exited threads to make sure
42  no outstanding references exist before deleting objects.
43
44  To delete a rpcc object safely, the users of the library must ensure that
45  there are no outstanding calls on the rpcc object.
46
47  To delete a rpcs object safely, we do the following in sequence: 1. stop
48  accepting new incoming connections. 2. close existing active connections.
49  3.  delete the dispatch thread pool which involves waiting for current active
50  RPC handlers to finish.  It is interesting how a thread pool can be deleted
51  without using thread cancellation. The trick is to inject x "poison pills" for
52  a thread pool of x threads. Upon getting a poison pill instead of a normal
53  task, a worker thread will exit (and thread pool destructor waits to join all
54  x exited worker threads).
55  */
56
57 #include "rpc.h"
58
59 #include <sys/types.h>
60 #include <arpa/inet.h>
61 #include <netinet/tcp.h>
62 #include <netdb.h>
63 #include <unistd.h>
64 #include "lock.h"
65
66 #include "jsl_log.h"
67 #include "tprintf.h"
68 #include "lang/verify.h"
69
70 const rpcc::TO rpcc::to_max = { 120000 };
71 const rpcc::TO rpcc::to_min = { 1000 };
72
73 rpcc::caller::caller(int xxid, unmarshall *xun)
74 : xid(xxid), un(xun), done(false)
75 {
76 }
77
78 rpcc::caller::~caller()
79 {
80 }
81
82 inline
83 void set_rand_seed()
84 {
85     auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
86     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
87 }
88
89 rpcc::rpcc(sockaddr_in d, bool retrans) :
90     dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
91     retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
92 {
93     if(retrans){
94         set_rand_seed();
95         clt_nonce_ = (unsigned int)random();
96     } else {
97         // special client nonce 0 means this client does not
98         // require at-most-once logic from the server
99         // because it uses tcp and never retries a failed connection
100         clt_nonce_ = 0;
101     }
102
103     char *loss_env = getenv("RPC_LOSSY");
104     if(loss_env != NULL){
105         lossytest_ = atoi(loss_env);
106     }
107
108     // xid starts with 1 and latest received reply starts with 0
109     xid_rep_window_.push_back(0);
110
111     jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
112             clt_nonce_, lossytest_);
113 }
114
115 // IMPORTANT: destruction should happen only when no external threads
116 // are blocked inside rpcc or will use rpcc in the future
117 rpcc::~rpcc()
118 {
119     jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
120             clt_nonce_, chan_?chan_->channo():-1);
121     if(chan_){
122         chan_->closeconn();
123         chan_->decref();
124     }
125     VERIFY(calls_.size() == 0);
126 }
127
128 int
129 rpcc::bind(TO to)
130 {
131     unsigned int r;
132     int ret = call_timeout(rpc_const::bind, to, r, 0);
133     if(ret == 0){
134         lock ml(m_);
135         bind_done_ = true;
136         srv_nonce_ = r;
137     } else {
138         jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
139                 inet_ntoa(dst_.sin_addr), ret);
140     }
141     return ret;
142 };
143
144 // Cancel all outstanding calls
145     void
146 rpcc::cancel(void)
147 {
148     lock ml(m_);
149     tprintf("rpcc::cancel: force callers to fail");
150     for(auto &p : calls_){
151         caller *ca = p.second;
152
153         jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
154         {
155             lock cl(ca->m);
156             ca->done = true;
157             ca->intret = rpc_const::cancel_failure;
158             ca->c.notify_one();
159         }
160     }
161
162     while (calls_.size () > 0){
163         destroy_wait_ = true;
164         destroy_wait_c_.wait(ml);
165     }
166     tprintf("rpcc::cancel: done");
167 }
168
169 int
170 rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
171         TO to)
172 {
173
174     caller ca(0, &rep);
175     int xid_rep;
176     {
177         lock ml(m_);
178
179         if((proc != rpc_const::bind && !bind_done_) ||
180                 (proc == rpc_const::bind && bind_done_)){
181             jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
182             return rpc_const::bind_failure;
183         }
184
185         if(destroy_wait_){
186           return rpc_const::cancel_failure;
187         }
188
189         ca.xid = xid_++;
190         calls_[ca.xid] = &ca;
191
192         req.pack_req_header({ca.xid, (int)proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
193         xid_rep = xid_rep_window_.front();
194     }
195
196     TO curr_to;
197     std::chrono::time_point<std::chrono::steady_clock> finaldeadline =
198         std::chrono::steady_clock::now() +
199         std::chrono::milliseconds(to.to),
200         nextdeadline;
201
202     curr_to.to = to_min.to;
203
204     bool transmit = true;
205     connection *ch = NULL;
206
207     while (1){
208         if(transmit){
209             get_refconn(&ch);
210             if(ch){
211                 if(reachable_) {
212                     request forgot;
213                     {
214                         lock ml(m_);
215                         if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
216                             forgot = dup_req_;
217                             dup_req_.clear();
218                         }
219                     }
220                     if (forgot.isvalid())
221                         ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
222                     ch->send(req.cstr(), req.size());
223                 }
224                 else jsl_log(JSL_DBG_1, "not reachable\n");
225                 jsl_log(JSL_DBG_2,
226                         "rpcc::call1 %u just sent req proc %x xid %d clt_nonce %d\n",
227                         clt_nonce_, proc, ca.xid, clt_nonce_);
228             }
229             transmit = false; // only send once on a given channel
230         }
231
232         if(finaldeadline == std::chrono::time_point<std::chrono::steady_clock>::min())
233             break;
234
235         nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to);
236         if(nextdeadline > finaldeadline) {
237             nextdeadline = finaldeadline;
238             finaldeadline = std::chrono::time_point<std::chrono::steady_clock>::min();
239         }
240
241         {
242             lock cal(ca.m);
243             while (!ca.done){
244                 jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
245                 if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){
246                     jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
247                     break;
248                 }
249             }
250             if(ca.done){
251                 jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
252                 break;
253             }
254         }
255
256         if(retrans_ && (!ch || ch->isdead())){
257             // since connection is dead, retransmit
258             // on the new connection
259             transmit = true;
260         }
261         curr_to.to <<= 1;
262     }
263
264     {
265         // no locking of ca.m since only this thread changes ca.xid
266         lock ml(m_);
267         calls_.erase(ca.xid);
268         // may need to update the xid again here, in case the
269         // packet times out before it's even sent by the channel.
270         // I don't think there's any harm in maybe doing it twice
271         update_xid_rep(ca.xid);
272
273         if(destroy_wait_){
274           destroy_wait_c_.notify_one();
275         }
276     }
277
278     if (ca.done && lossytest_)
279     {
280         lock ml(m_);
281         if (!dup_req_.isvalid()) {
282             dup_req_.buf.assign(req.cstr(), req.size());
283             dup_req_.xid = ca.xid;
284         }
285         if (xid_rep > xid_rep_done_)
286             xid_rep_done_ = xid_rep;
287     }
288
289     lock cal(ca.m);
290
291     jsl_log(JSL_DBG_2,
292             "rpcc::call1 %u call done for req proc %x xid %d %s:%d done? %d ret %d \n",
293             clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
294             ntohs(dst_.sin_port), ca.done, ca.intret);
295
296     if(ch)
297         ch->decref();
298
299     // destruction of req automatically frees its buffer
300     return (ca.done? ca.intret : rpc_const::timeout_failure);
301 }
302
303 void
304 rpcc::get_refconn(connection **ch)
305 {
306     lock ml(chan_m_);
307     if(!chan_ || chan_->isdead()){
308         if(chan_)
309             chan_->decref();
310         chan_ = connect_to_dst(dst_, this, lossytest_);
311     }
312     if(ch && chan_){
313         if(*ch){
314             (*ch)->decref();
315         }
316         *ch = chan_;
317         (*ch)->incref();
318     }
319 }
320
321 // PollMgr's thread is being used to
322 // make this upcall from connection object to rpcc.
323 // this funtion must not block.
324 //
325 // this function keeps no reference for connection *c
326 bool
327 rpcc::got_pdu(connection *, char *b, size_t sz)
328 {
329     unmarshall rep(b, sz);
330     reply_header h;
331     rep.unpack_reply_header(&h);
332
333     if(!rep.ok()){
334         jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
335         return true;
336     }
337
338     lock ml(m_);
339
340     update_xid_rep(h.xid);
341
342     if(calls_.find(h.xid) == calls_.end()){
343         jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
344         return true;
345     }
346     caller *ca = calls_[h.xid];
347
348     lock cl(ca->m);
349     if(!ca->done){
350         ca->un->take_in(rep);
351         ca->intret = h.ret;
352         if(ca->intret < 0){
353             jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
354                     h.xid, ca->intret);
355         }
356         ca->done = 1;
357     }
358     ca->c.notify_all();
359     return true;
360 }
361
362 // assumes thread holds mutex m
363 void
364 rpcc::update_xid_rep(int xid)
365 {
366     if(xid <= xid_rep_window_.front()){
367         return;
368     }
369
370     for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
371         if(*it > xid){
372             xid_rep_window_.insert(it, xid);
373             goto compress;
374         }
375     }
376     xid_rep_window_.push_back(xid);
377
378 compress:
379     auto it = xid_rep_window_.begin();
380     for (it++; it != xid_rep_window_.end(); it++){
381         while (xid_rep_window_.front() + 1 == *it)
382             xid_rep_window_.pop_front();
383     }
384 }
385
386 rpcs::rpcs(unsigned int p1, size_t count)
387   : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
388 {
389     set_rand_seed();
390     nonce_ = (unsigned int)random();
391     jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
392
393     char *loss_env = getenv("RPC_LOSSY");
394     if(loss_env != NULL){
395         lossytest_ = atoi(loss_env);
396     }
397
398     reg(rpc_const::bind, &rpcs::rpcbind, this);
399     dispatchpool_ = new ThrPool(6,false);
400
401     listener_ = new tcpsconn(this, port_, lossytest_);
402 }
403
404 rpcs::~rpcs()
405 {
406     // must delete listener before dispatchpool
407     delete listener_;
408     delete dispatchpool_;
409     free_reply_window();
410 }
411
412 bool
413 rpcs::got_pdu(connection *c, char *b, size_t sz)
414 {
415         if(!reachable_){
416             jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
417             return true;
418         }
419
420     djob_t *j = new djob_t(c, b, sz);
421     c->incref();
422     bool succ = dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, j));
423     if(!succ || !reachable_){
424         c->decref();
425         delete j;
426     }
427     return succ;
428 }
429
430 void
431 rpcs::reg1(unsigned int proc, handler *h)
432 {
433     lock pl(procs_m_);
434     VERIFY(procs_.count(proc) == 0);
435     procs_[proc] = h;
436     VERIFY(procs_.count(proc) >= 1);
437 }
438
439 void
440 rpcs::updatestat(unsigned int proc)
441 {
442     lock cl(count_m_);
443     counts_[proc]++;
444     curr_counts_--;
445     if(curr_counts_ == 0){
446         tprintf("RPC STATS: ");
447         for (auto i = counts_.begin(); i != counts_.end(); i++)
448             tprintf("%x:%lu ", i->first, i->second);
449
450         lock rwl(reply_window_m_);
451         std::map<unsigned int,std::list<reply_t> >::iterator clt;
452
453         size_t totalrep = 0, maxrep = 0;
454         for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
455             totalrep += clt->second.size();
456             if(clt->second.size() > maxrep)
457                 maxrep = clt->second.size();
458         }
459         jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %lu max per client %lu\n",
460                         (int) reply_window_.size()-1, totalrep, maxrep);
461         curr_counts_ = counting_;
462     }
463 }
464
465 void
466 rpcs::dispatch(djob_t *j)
467 {
468     connection *c = j->conn;
469     unmarshall req(j->buf, j->sz);
470     delete j;
471
472     request_header h;
473     req.unpack_req_header(&h);
474     unsigned int proc = (unsigned int)h.proc;
475
476     if(!req.ok()){
477         jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
478         c->decref();
479         return;
480     }
481
482     jsl_log(JSL_DBG_2,
483             "rpcs::dispatch: rpc %d (proc %x, last_rep %d) from clt %u for srv instance %u \n",
484             h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
485
486     marshall rep;
487     reply_header rh(h.xid,0);
488
489     // is client sending to an old instance of server?
490     if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
491         jsl_log(JSL_DBG_2,
492                 "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
493                 h.srv_nonce, nonce_, h.proc);
494         rh.ret = rpc_const::oldsrv_failure;
495         rep.pack_reply_header(rh);
496         c->send(rep.cstr(),rep.size());
497         return;
498     }
499
500     handler *f;
501     // is RPC proc a registered procedure?
502     {
503         lock pl(procs_m_);
504         if(procs_.count(proc) < 1){
505             fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
506                 proc);
507             c->decref();
508                         VERIFY(0);
509             return;
510         }
511
512         f = procs_[proc];
513     }
514
515     rpcs::rpcstate_t stat;
516     char *b1 = nullptr;
517     size_t sz1 = 0;
518
519     if(h.clt_nonce){
520         // have i seen this client before?
521         {
522             lock rwl(reply_window_m_);
523             // if we don't know about this clt_nonce, create a cleanup object
524             if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
525                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
526                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
527                 jsl_log(JSL_DBG_2,
528                         "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
529                         h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
530             }
531         }
532
533         // save the latest good connection to the client
534         {
535             lock rwl(conss_m_);
536             if(conns_.find(h.clt_nonce) == conns_.end()){
537                 c->incref();
538                 conns_[h.clt_nonce] = c;
539             } else if(conns_[h.clt_nonce]->compare(c) < 0){
540                 conns_[h.clt_nonce]->decref();
541                 c->incref();
542                 conns_[h.clt_nonce] = c;
543             }
544         }
545
546         stat = checkduplicate_and_update(h.clt_nonce, h.xid,
547                                                  h.xid_rep, &b1, &sz1);
548     } else {
549         // this client does not require at most once logic
550         stat = NEW;
551     }
552
553     switch (stat){
554         case NEW: // new request
555             if(counting_){
556                 updatestat(proc);
557             }
558
559             rh.ret = (*f)(req, rep);
560             if (rh.ret == rpc_const::unmarshal_args_failure) {
561                 fprintf(stderr, "rpcs::dispatch: failed to"
562                         " unmarshall the arguments. You are"
563                         " probably calling RPC 0x%x with wrong"
564                         " types of arguments.\n", proc);
565                 VERIFY(0);
566             }
567             VERIFY(rh.ret >= 0);
568
569             rep.pack_reply_header(rh);
570             rep.take_buf(&b1,&sz1);
571
572             jsl_log(JSL_DBG_2,
573                     "rpcs::dispatch: sending and saving reply of size %lu for rpc %d, proc %x ret %d, clt %u\n",
574                     sz1, h.xid, proc, rh.ret, h.clt_nonce);
575
576             if(h.clt_nonce > 0){
577                 // only record replies for clients that require at-most-once logic
578                 add_reply(h.clt_nonce, h.xid, b1, sz1);
579             }
580
581             // get the latest connection to the client
582             {
583                 lock rwl(conss_m_);
584                 if(c->isdead() && c != conns_[h.clt_nonce]){
585                     c->decref();
586                     c = conns_[h.clt_nonce];
587                     c->incref();
588                 }
589             }
590
591             c->send(b1, sz1);
592             if(h.clt_nonce == 0){
593                 // reply is not added to at-most-once window, free it
594                 free(b1);
595             }
596             break;
597         case INPROGRESS: // server is working on this request
598             break;
599         case DONE: // duplicate and we still have the response
600             c->send(b1, sz1);
601             break;
602         case FORGOTTEN: // very old request and we don't have the response anymore
603             jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %d from %u\n",
604                     h.xid, h.clt_nonce);
605             rh.ret = rpc_const::atmostonce_failure;
606             rep.pack_reply_header(rh);
607             c->send(rep.cstr(),rep.size());
608             break;
609     }
610     c->decref();
611 }
612
613 // rpcs::dispatch calls this when an RPC request arrives.
614 //
615 // checks to see if an RPC with xid from clt_nonce has already been received.
616 // if not, remembers the request in reply_window_.
617 //
618 // deletes remembered requests with XIDs <= xid_rep; the client
619 // says it has received a reply for every RPC up through xid_rep.
620 // frees the reply_t::buf of each such request.
621 //
622 // returns one of:
623 //   NEW: never seen this xid before.
624 //   INPROGRESS: seen this xid, and still processing it.
625 //   DONE: seen this xid, previous reply returned in *b and *sz.
626 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
627 rpcs::rpcstate_t
628 rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
629         int xid_rep, char **b, size_t *sz)
630 {
631     lock rwl(reply_window_m_);
632
633     std::list<reply_t> &l = reply_window_[clt_nonce];
634
635     VERIFY(l.size() > 0);
636     VERIFY(xid >= xid_rep);
637
638     int past_xid_rep = l.begin()->xid;
639
640     std::list<reply_t>::iterator start = l.begin(), it;
641     it = ++start;
642
643     if (past_xid_rep < xid_rep || past_xid_rep == -1) {
644         // scan for deletion candidates
645         for (; it != l.end() && it->xid < xid_rep; it++) {
646             if (it->cb_present)
647                 free(it->buf);
648         }
649         l.erase(start, it);
650         l.begin()->xid = xid_rep;
651     }
652
653     if (xid < past_xid_rep && past_xid_rep != -1)
654         return FORGOTTEN;
655
656     // skip non-deletion candidates
657     while (it != l.end() && it->xid < xid)
658         it++;
659
660     // if it's in the list it must be right here
661     if (it != l.end() && it->xid == xid) {
662         if (it->cb_present) {
663             // return information about the remembered reply
664             *b = it->buf;
665             *sz = it->sz;
666             return DONE;
667         } else {
668             return INPROGRESS;
669         }
670     } else {
671         // remember that a new request has arrived
672         l.insert(it, reply_t(xid));
673         return NEW;
674     }
675 }
676
677 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
678 // and passes the return value in b and sz.
679 // add_reply() should remember b and sz.
680 // free_reply_window() and checkduplicate_and_update is responsible for
681 // calling free(b).
682 void
683 rpcs::add_reply(unsigned int clt_nonce, int xid,
684         char *b, size_t sz)
685 {
686     lock rwl(reply_window_m_);
687     // remember the RPC reply value
688     std::list<reply_t> &l = reply_window_[clt_nonce];
689     std::list<reply_t>::iterator it = l.begin();
690     // skip to our place in the list
691     for (it++; it != l.end() && it->xid < xid; it++);
692     // there should already be an entry, so whine if there isn't
693     if (it == l.end() || it->xid != xid) {
694         fprintf(stderr, "Could not find reply struct in add_reply");
695         l.insert(it, reply_t(xid, b, sz));
696     } else {
697         *it = reply_t(xid, b, sz);
698     }
699 }
700
701 void
702 rpcs::free_reply_window(void)
703 {
704     lock rwl(reply_window_m_);
705     for (auto clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
706         for (auto it = clt->second.begin(); it != clt->second.end(); it++){
707             if (it->cb_present)
708                 free(it->buf);
709         }
710         clt->second.clear();
711     }
712     reply_window_.clear();
713 }
714
715 // rpc handler
716 int
717 rpcs::rpcbind(unsigned int &r, int)
718 {
719     jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
720     r = nonce_;
721     return 0;
722 }
723
724 marshall &
725 operator<<(marshall &m, uint8_t x) {
726     m.rawbyte(x);
727     return m;
728 }
729
730 marshall &
731 operator<<(marshall &m, uint16_t x) {
732     x = hton(x);
733     m.rawbytes((char *)&x, 2);
734     return m;
735 }
736
737 marshall &
738 operator<<(marshall &m, uint32_t x) {
739     x = hton(x);
740     m.rawbytes((char *)&x, 4);
741     return m;
742 }
743
744 marshall & operator<<(marshall &m, int32_t x) { return m << (uint32_t) x; }
745 marshall & operator<<(marshall &m, int8_t x) { return m << (uint8_t)x; }
746 marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; }
747 marshall & operator<<(marshall &m, int16_t x) { return m << (uint16_t)x; }
748 marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }
749
750 marshall &
751 operator<<(marshall &m, const std::string &s) {
752     m << (unsigned int) s.size();
753     m.rawbytes(s.data(), s.size());
754     return m;
755 }
756
757 void marshall::pack_req_header(const request_header &h) {
758     size_t saved_sz = index_;
759     //leave the first 4-byte empty for channel to fill size of pdu
760     index_ = sizeof(rpc_sz_t);
761     *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep;
762     index_ = saved_sz;
763 }
764
765 void marshall::pack_reply_header(const reply_header &h) {
766     size_t saved_sz = index_;
767     //leave the first 4-byte empty for channel to fill size of pdu
768     index_ = sizeof(rpc_sz_t);
769     *this << h.xid << h.ret;
770     index_ = saved_sz;
771 }
772
773 // take the contents from another unmarshall object
774 void
775 unmarshall::take_in(unmarshall &another)
776 {
777     if(buf_)
778         free(buf_);
779     another.take_buf(&buf_, &sz_);
780     index_ = RPC_HEADER_SZ;
781     ok_ = sz_ >= RPC_HEADER_SZ?true:false;
782 }
783
784 inline bool
785 unmarshall::ensure(size_t n) {
786     if (index_+n > sz_)
787         ok_ = false;
788     return ok_;
789 }
790
791 inline uint8_t
792 unmarshall::rawbyte()
793 {
794     if (!ensure(1))
795         return 0;
796     return (uint8_t)buf_[index_++];
797 }
798
799 void
800 unmarshall::rawbytes(std::string &ss, size_t n)
801 {
802     VERIFY(ensure(n));
803     ss.assign(buf_+index_, n);
804     index_ += n;
805 }
806
807 template <class T>
808 void
809 unmarshall::rawbytes(T &t)
810 {
811     const size_t n = sizeof(T);
812     VERIFY(ensure(n));
813     memcpy(&t, buf_+index_, n);
814     t = ntoh(t);
815     index_ += n;
816 }
817
818 unmarshall & operator>>(unmarshall &u, bool &x) { x = (bool)u.rawbyte(); return u; }
819 unmarshall & operator>>(unmarshall &u, uint8_t &x) { x = u.rawbyte(); return u; }
820 unmarshall & operator>>(unmarshall &u, int8_t &x) { x = (int8_t)u.rawbyte(); return u; }
821 unmarshall & operator>>(unmarshall &u, uint16_t &x) { u.rawbytes<uint16_t>(x); return u; }
822 unmarshall & operator>>(unmarshall &u, int16_t &x) { u.rawbytes<int16_t>(x); return u; }
823 unmarshall & operator>>(unmarshall &u, uint32_t &x) { u.rawbytes<uint32_t>(x); return u; }
824 unmarshall & operator>>(unmarshall &u, int32_t &x) { u.rawbytes<int32_t>(x); return u; }
825 unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes<uint32_t>(xx); x = xx; return u; }
826 unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes<uint64_t>(x); return u; }
827 unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes<int64_t>(x); return u; }
828 unmarshall & operator>>(unmarshall &u, std::string &s) {
829     unsigned sz = u.grab<unsigned>();
830     if(u.ok())
831         u.rawbytes(s, sz);
832     return u;
833 }
834
835 bool operator<(const sockaddr_in &a, const sockaddr_in &b){
836     return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
837             ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
838              ((a.sin_port < b.sin_port))));
839 }
840
841 /*---------------auxilary function--------------*/
842 void
843 make_sockaddr(const std::string &hostandport, struct sockaddr_in *dst) {
844     auto colon = hostandport.find(':');
845     if (colon == std::string::npos)
846         make_sockaddr("127.0.0.1", hostandport, dst);
847     else
848         make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1), dst);
849 }
850
851 void
852 make_sockaddr(const std::string &host, const std::string &port, struct sockaddr_in *dst) {
853     bzero(dst, sizeof(*dst));
854     dst->sin_family = AF_INET;
855
856     struct in_addr a{inet_addr(host.c_str())};
857
858     if(a.s_addr != INADDR_NONE)
859         dst->sin_addr.s_addr = a.s_addr;
860     else {
861         struct hostent *hp = gethostbyname(host.c_str());
862
863         if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
864             fprintf(stderr, "cannot find host name %s\n", host.c_str());
865             exit(1);
866         }
867         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
868         dst->sin_addr.s_addr = a.s_addr;
869     }
870     dst->sin_port = hton((uint16_t)std::stoi(port));
871 }