MOAR TEMPLATE MAGIC
[invirt/third/libt4.git] / rpc / rpc.cc
1 /*
2  The rpcc class handles client-side RPC.  Each rpcc is bound to a
3  single RPC server.  The jobs of rpcc include maintaining a connection to
4  server, sending RPC requests and waiting for responses, retransmissions,
5  at-most-once delivery etc.
6
7  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
8  connections from different rpcc objects.  The jobs of rpcs include accepting
9  connections, dispatching requests to registered RPC handlers, at-most-once
10  delivery etc.
11
12  Both rpcc and rpcs use the connection class as an abstraction for the
13  underlying communication channel.  To send an RPC request/reply, one calls
14  connection::send() which blocks until data is sent or the connection has failed
15  (thus the caller can free the buffer when send() returns).  When a
16  request/reply is received, connection makes a callback into the corresponding
17  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
18
19  Thread organization:
20  rpcc uses application threads to send RPC requests and blocks to receive the
21  reply or error. All connections use a single PollMgr object to perform async
22  socket IO.  PollMgr creates a single thread to examine the readiness of socket
23  file descriptors and informs the corresponding connection whenever a socket is
24  ready to be read or written.  (We use asynchronous socket IO to reduce the
25  number of threads needed to manage these connections; without async IO, at
26  least one thread is needed per connection to read data without blocking other
27  activities.)  Each rpcs object creates one thread for listening on the server
28  port and a pool of threads for executing RPC requests.  The
29  thread pool allows us to control the number of threads spawned at the server
30  (spawning one thread per request will hurt when the server faces thousands of
31  requests).
32
33  In order to delete a connection object, we must maintain a reference count.
34  For rpcc,
35  multiple client threads might be invoking the rpcc::call() functions and thus
36  holding multiple references to the underlying connection object. For rpcs,
37  multiple dispatch threads might be holding references to the same connection
38  object.  A connection object is deleted only when the underlying connection is
39  dead and the reference count reaches zero.
40
41  This version of the RPC library explicitly joins exited threads to make sure
42  no outstanding references exist before deleting objects.
43
44  To delete a rpcc object safely, the users of the library must ensure that
45  there are no outstanding calls on the rpcc object.
46
47  To delete a rpcs object safely, we do the following in sequence: 1. stop
48  accepting new incoming connections. 2. close existing active connections.
49  3.  delete the dispatch thread pool which involves waiting for current active
50  RPC handlers to finish.  It is interesting how a thread pool can be deleted
51  without using thread cancellation. The trick is to inject x "poison pills" for
52  a thread pool of x threads. Upon getting a poison pill instead of a normal
53  task, a worker thread will exit (and thread pool destructor waits to join all
54  x exited worker threads).
55  */
56
57 #include "types.h"
58 #include "rpc.h"
59
60 #include <sys/types.h>
61 #include <arpa/inet.h>
62 #include <netinet/tcp.h>
63 #include <netdb.h>
64 #include <unistd.h>
65
66 const rpcc::TO rpcc::to_max = { 120000 };
67 const rpcc::TO rpcc::to_min = { 1000 };
68
69 inline void set_rand_seed() {
70     auto now = time_point_cast<nanoseconds>(steady_clock::now());
71     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
72 }
73
74 rpcc::rpcc(const string & d, bool retrans) :
75     dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
76     retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
77 {
78     if(retrans){
79         set_rand_seed();
80         clt_nonce_ = (unsigned int)random();
81     } else {
82         // special client nonce 0 means this client does not
83         // require at-most-once logic from the server
84         // because it uses tcp and never retries a failed connection
85         clt_nonce_ = 0;
86     }
87
88     char *loss_env = getenv("RPC_LOSSY");
89     if(loss_env != NULL){
90         lossytest_ = atoi(loss_env);
91     }
92
93     // xid starts with 1 and latest received reply starts with 0
94     xid_rep_window_.push_back(0);
95
96     IF_LEVEL(2) LOG("rpcc::rpcc cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
97 }
98
99 // IMPORTANT: destruction should happen only when no external threads
100 // are blocked inside rpcc or will use rpcc in the future
101 rpcc::~rpcc() {
102     IF_LEVEL(2) LOG("rpcc::~rpcc delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
103     if(chan_){
104         chan_->closeconn();
105         chan_->decref();
106     }
107     VERIFY(calls_.size() == 0);
108 }
109
110 int rpcc::bind(TO to) {
111     unsigned int r;
112     int ret = call_timeout(rpc_const::bind, to, r, 0);
113     if(ret == 0){
114         lock ml(m_);
115         bind_done_ = true;
116         srv_nonce_ = r;
117     } else {
118         IF_LEVEL(2) LOG("rpcc::bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret);
119     }
120     return ret;
121 };
122
123 // Cancel all outstanding calls
124 void rpcc::cancel(void) {
125     lock ml(m_);
126     LOG("rpcc::cancel: force callers to fail");
127     for(auto &p : calls_){
128         caller *ca = p.second;
129
130         IF_LEVEL(2) LOG("rpcc::cancel: force caller to fail");
131         {
132             lock cl(ca->m);
133             ca->done = true;
134             ca->intret = rpc_const::cancel_failure;
135             ca->c.notify_one();
136         }
137     }
138
139     while (calls_.size () > 0){
140         destroy_wait_ = true;
141         destroy_wait_c_.wait(ml);
142     }
143     LOG("rpcc::cancel: done");
144 }
145
146 int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) {
147
148     caller ca(0, &rep);
149     int xid_rep;
150     {
151         lock ml(m_);
152
153         if((proc != rpc_const::bind && !bind_done_) ||
154                 (proc == rpc_const::bind && bind_done_)){
155             IF_LEVEL(1) LOG("rpcc::call1 rpcc has not been bound to dst or binding twice");
156             return rpc_const::bind_failure;
157         }
158
159         if(destroy_wait_){
160           return rpc_const::cancel_failure;
161         }
162
163         ca.xid = xid_++;
164         calls_[ca.xid] = &ca;
165
166         req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
167         xid_rep = xid_rep_window_.front();
168     }
169
170     TO curr_to;
171     auto finaldeadline = steady_clock::now() + milliseconds(to.to),
172         nextdeadline = finaldeadline;
173
174     curr_to.to = to_min.to;
175
176     bool transmit = true;
177     connection *ch = NULL;
178
179     while (1){
180         if(transmit){
181             get_refconn(&ch);
182             if(ch){
183                 if(reachable_) {
184                     request forgot;
185                     {
186                         lock ml(m_);
187                         if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
188                             forgot = dup_req_;
189                             dup_req_.clear();
190                         }
191                     }
192                     if (forgot.isvalid())
193                         ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
194                     ch->send(req.cstr(), req.size());
195                 }
196                 else IF_LEVEL(1) LOG("not reachable");
197                 IF_LEVEL(2) LOG("rpcc::call1 " << clt_nonce_ << " just sent req proc " << hex << proc <<
198                                 " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_);
199             }
200             transmit = false; // only send once on a given channel
201         }
202
203         if(finaldeadline == time_point<steady_clock>::min())
204             break;
205
206         nextdeadline = steady_clock::now() + milliseconds(curr_to.to);
207         if(nextdeadline > finaldeadline) {
208             nextdeadline = finaldeadline;
209             finaldeadline = time_point<steady_clock>::min();
210         }
211
212         {
213             lock cal(ca.m);
214             while (!ca.done){
215                 IF_LEVEL(2) LOG("rpcc:call1: wait");
216                 if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
217                     IF_LEVEL(2) LOG("rpcc::call1: timeout");
218                     break;
219                 }
220             }
221             if(ca.done){
222                 IF_LEVEL(2) LOG("rpcc::call1: reply received");
223                 break;
224             }
225         }
226
227         if(retrans_ && (!ch || ch->isdead())){
228             // since connection is dead, retransmit
229             // on the new connection
230             transmit = true;
231         }
232         curr_to.to <<= 1;
233     }
234
235     {
236         // no locking of ca.m since only this thread changes ca.xid
237         lock ml(m_);
238         calls_.erase(ca.xid);
239         // may need to update the xid again here, in case the
240         // packet times out before it's even sent by the channel.
241         // I don't think there's any harm in maybe doing it twice
242         update_xid_rep(ca.xid);
243
244         if(destroy_wait_){
245           destroy_wait_c_.notify_one();
246         }
247     }
248
249     if (ca.done && lossytest_)
250     {
251         lock ml(m_);
252         if (!dup_req_.isvalid()) {
253             dup_req_.buf.assign(req.cstr(), req.size());
254             dup_req_.xid = ca.xid;
255         }
256         if (xid_rep > xid_rep_done_)
257             xid_rep_done_ = xid_rep;
258     }
259
260     lock cal(ca.m);
261
262     IF_LEVEL(2) LOG("rpcc::call1 " << clt_nonce_ << " call done for req proc " << hex << proc <<
263                     " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" <<
264                     ntohs(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
265
266     if(ch)
267         ch->decref();
268
269     // destruction of req automatically frees its buffer
270     return (ca.done? ca.intret : rpc_const::timeout_failure);
271 }
272
273 void
274 rpcc::get_refconn(connection **ch)
275 {
276     lock ml(chan_m_);
277     if(!chan_ || chan_->isdead()){
278         if(chan_)
279             chan_->decref();
280         chan_ = connect_to_dst(dst_, this, lossytest_);
281     }
282     if(ch && chan_){
283         if(*ch){
284             (*ch)->decref();
285         }
286         *ch = chan_;
287         (*ch)->incref();
288     }
289 }
290
291 // PollMgr's thread is being used to
292 // make this upcall from connection object to rpcc.
293 // this funtion must not block.
294 //
295 // this function keeps no reference for connection *c
296 bool
297 rpcc::got_pdu(connection *, char *b, size_t sz)
298 {
299     unmarshall rep(b, sz);
300     reply_header h;
301     rep.unpack_reply_header(&h);
302
303     if(!rep.ok()){
304         IF_LEVEL(1) LOG("rpcc:got_pdu unmarshall header failed!!!");
305         return true;
306     }
307
308     lock ml(m_);
309
310     update_xid_rep(h.xid);
311
312     if(calls_.find(h.xid) == calls_.end()){
313         IF_LEVEL(2) LOG("rpcc::got_pdu xid " << h.xid << " no pending request");
314         return true;
315     }
316     caller *ca = calls_[h.xid];
317
318     lock cl(ca->m);
319     if(!ca->done){
320         ca->un->take_in(rep);
321         ca->intret = h.ret;
322         if(ca->intret < 0){
323             IF_LEVEL(2) LOG("rpcc::got_pdu: RPC reply error for xid " << h.xid << " intret " << ca->intret);
324         }
325         ca->done = 1;
326     }
327     ca->c.notify_all();
328     return true;
329 }
330
331 // assumes thread holds mutex m
332 void
333 rpcc::update_xid_rep(int xid)
334 {
335     if(xid <= xid_rep_window_.front()){
336         return;
337     }
338
339     for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
340         if(*it > xid){
341             xid_rep_window_.insert(it, xid);
342             goto compress;
343         }
344     }
345     xid_rep_window_.push_back(xid);
346
347 compress:
348     auto it = xid_rep_window_.begin();
349     for (it++; it != xid_rep_window_.end(); it++){
350         while (xid_rep_window_.front() + 1 == *it)
351             xid_rep_window_.pop_front();
352     }
353 }
354
355 rpcs::rpcs(unsigned int p1, size_t count)
356   : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
357 {
358     set_rand_seed();
359     nonce_ = (unsigned int)random();
360     IF_LEVEL(2) LOG("rpcs::rpcs created with nonce " << nonce_);
361
362     char *loss_env = getenv("RPC_LOSSY");
363     if(loss_env != NULL){
364         lossytest_ = atoi(loss_env);
365     }
366
367     reg(rpc_const::bind, &rpcs::rpcbind, this);
368     dispatchpool_ = new ThrPool(6,false);
369
370     listener_ = new tcpsconn(this, port_, lossytest_);
371 }
372
373 rpcs::~rpcs()
374 {
375     // must delete listener before dispatchpool
376     delete listener_;
377     delete dispatchpool_;
378     free_reply_window();
379 }
380
381 bool
382 rpcs::got_pdu(connection *c, char *b, size_t sz)
383 {
384         if(!reachable_){
385             IF_LEVEL(1) LOG("rpcss::got_pdu: not reachable");
386             return true;
387         }
388
389     djob_t *j = new djob_t(c, b, sz);
390     c->incref();
391     bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
392     if(!succ || !reachable_){
393         c->decref();
394         delete j;
395     }
396     return succ;
397 }
398
399 void
400 rpcs::reg1(proc_t proc, handler *h)
401 {
402     lock pl(procs_m_);
403     VERIFY(procs_.count(proc) == 0);
404     procs_[proc] = h;
405     VERIFY(procs_.count(proc) >= 1);
406 }
407
408 void
409 rpcs::updatestat(proc_t proc)
410 {
411     lock cl(count_m_);
412     counts_[proc]++;
413     curr_counts_--;
414     if(curr_counts_ == 0){
415         LOG("RPC STATS: ");
416         for (auto i = counts_.begin(); i != counts_.end(); i++)
417             LOG(hex << i->first << ":" << dec << i->second);
418
419         lock rwl(reply_window_m_);
420
421         size_t totalrep = 0, maxrep = 0;
422         for (auto clt : reply_window_) {
423             totalrep += clt.second.size();
424             if(clt.second.size() > maxrep)
425                 maxrep = clt.second.size();
426         }
427         IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
428                         totalrep << " max per client " << maxrep);
429         curr_counts_ = counting_;
430     }
431 }
432
433 void
434 rpcs::dispatch(djob_t *j)
435 {
436     connection *c = j->conn;
437     unmarshall req(j->buf, j->sz);
438     delete j;
439
440     request_header h;
441     req.unpack_req_header(&h);
442     proc_t proc = h.proc;
443
444     if(!req.ok()){
445         IF_LEVEL(1) LOG("rpcs:dispatch unmarshall header failed!!!");
446         c->decref();
447         return;
448     }
449
450     IF_LEVEL(2) LOG("rpcs::dispatch: rpc " << h.xid << " (proc " << hex << proc << ", last_rep " <<
451                     dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
452
453     marshall rep;
454     reply_header rh(h.xid,0);
455
456     // is client sending to an old instance of server?
457     if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
458         IF_LEVEL(2) LOG("rpcs::dispatch: rpc for an old server instance " << h.srv_nonce <<
459                         " (current " << nonce_ << ") proc " << hex << h.proc);
460         rh.ret = rpc_const::oldsrv_failure;
461         rep.pack_reply_header(rh);
462         c->send(rep.cstr(),rep.size());
463         return;
464     }
465
466     handler *f;
467     // is RPC proc a registered procedure?
468     {
469         lock pl(procs_m_);
470         if(procs_.count(proc) < 1){
471             cerr << "rpcs::dispatch: unknown proc " << hex << proc << "." << endl;
472             c->decref();
473             VERIFY(0);
474             return;
475         }
476
477         f = procs_[proc];
478     }
479
480     rpcs::rpcstate_t stat;
481     char *b1 = nullptr;
482     size_t sz1 = 0;
483
484     if(h.clt_nonce){
485         // have i seen this client before?
486         {
487             lock rwl(reply_window_m_);
488             // if we don't know about this clt_nonce, create a cleanup object
489             if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
490                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
491                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
492                 IF_LEVEL(2) LOG("rpcs::dispatch: new client " << h.clt_nonce << " xid " << h.xid <<
493                                 " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
494             }
495         }
496
497         // save the latest good connection to the client
498         {
499             lock rwl(conss_m_);
500             if(conns_.find(h.clt_nonce) == conns_.end()){
501                 c->incref();
502                 conns_[h.clt_nonce] = c;
503             } else if(conns_[h.clt_nonce]->compare(c) < 0){
504                 conns_[h.clt_nonce]->decref();
505                 c->incref();
506                 conns_[h.clt_nonce] = c;
507             }
508         }
509
510         stat = checkduplicate_and_update(h.clt_nonce, h.xid,
511                                                  h.xid_rep, &b1, &sz1);
512     } else {
513         // this client does not require at most once logic
514         stat = NEW;
515     }
516
517     switch (stat){
518         case NEW: // new request
519             if(counting_){
520                 updatestat(proc);
521             }
522
523             rh.ret = (*f)(req, rep);
524             if (rh.ret == rpc_const::unmarshal_args_failure) {
525                 cerr << "rpcs::dispatch: failed to unmarshall the arguments. You are " <<
526                         "probably calling RPC 0x" << hex << proc << " with the wrong " <<
527                         "types of arguments." << endl;
528                 VERIFY(0);
529             }
530             VERIFY(rh.ret >= 0);
531
532             rep.pack_reply_header(rh);
533             rep.take_buf(&b1,&sz1);
534
535             IF_LEVEL(2) LOG("rpcs::dispatch: sending and saving reply of size " << sz1 << " for rpc " <<
536                             h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
537
538             if(h.clt_nonce > 0){
539                 // only record replies for clients that require at-most-once logic
540                 add_reply(h.clt_nonce, h.xid, b1, sz1);
541             }
542
543             // get the latest connection to the client
544             {
545                 lock rwl(conss_m_);
546                 if(c->isdead() && c != conns_[h.clt_nonce]){
547                     c->decref();
548                     c = conns_[h.clt_nonce];
549                     c->incref();
550                 }
551             }
552
553             c->send(b1, sz1);
554             if(h.clt_nonce == 0){
555                 // reply is not added to at-most-once window, free it
556                 free(b1);
557             }
558             break;
559         case INPROGRESS: // server is working on this request
560             break;
561         case DONE: // duplicate and we still have the response
562             c->send(b1, sz1);
563             break;
564         case FORGOTTEN: // very old request and we don't have the response anymore
565             IF_LEVEL(2) LOG("rpcs::dispatch: very old request " << h.xid << " from " << h.clt_nonce);
566             rh.ret = rpc_const::atmostonce_failure;
567             rep.pack_reply_header(rh);
568             c->send(rep.cstr(),rep.size());
569             break;
570     }
571     c->decref();
572 }
573
574 // rpcs::dispatch calls this when an RPC request arrives.
575 //
576 // checks to see if an RPC with xid from clt_nonce has already been received.
577 // if not, remembers the request in reply_window_.
578 //
579 // deletes remembered requests with XIDs <= xid_rep; the client
580 // says it has received a reply for every RPC up through xid_rep.
581 // frees the reply_t::buf of each such request.
582 //
583 // returns one of:
584 //   NEW: never seen this xid before.
585 //   INPROGRESS: seen this xid, and still processing it.
586 //   DONE: seen this xid, previous reply returned in *b and *sz.
587 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
588 rpcs::rpcstate_t
589 rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
590         int xid_rep, char **b, size_t *sz)
591 {
592     lock rwl(reply_window_m_);
593
594     list<reply_t> &l = reply_window_[clt_nonce];
595
596     VERIFY(l.size() > 0);
597     VERIFY(xid >= xid_rep);
598
599     int past_xid_rep = l.begin()->xid;
600
601     list<reply_t>::iterator start = l.begin(), it = ++start;
602
603     if (past_xid_rep < xid_rep || past_xid_rep == -1) {
604         // scan for deletion candidates
605         for (; it != l.end() && it->xid < xid_rep; it++) {
606             if (it->cb_present)
607                 free(it->buf);
608         }
609         l.erase(start, it);
610         l.begin()->xid = xid_rep;
611     }
612
613     if (xid < past_xid_rep && past_xid_rep != -1)
614         return FORGOTTEN;
615
616     // skip non-deletion candidates
617     while (it != l.end() && it->xid < xid)
618         it++;
619
620     // if it's in the list it must be right here
621     if (it != l.end() && it->xid == xid) {
622         if (it->cb_present) {
623             // return information about the remembered reply
624             *b = it->buf;
625             *sz = it->sz;
626             return DONE;
627         } else {
628             return INPROGRESS;
629         }
630     } else {
631         // remember that a new request has arrived
632         l.insert(it, reply_t(xid));
633         return NEW;
634     }
635 }
636
637 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
638 // and passes the return value in b and sz.
639 // add_reply() should remember b and sz.
640 // free_reply_window() and checkduplicate_and_update is responsible for
641 // calling free(b).
642 void
643 rpcs::add_reply(unsigned int clt_nonce, int xid,
644         char *b, size_t sz)
645 {
646     lock rwl(reply_window_m_);
647     // remember the RPC reply value
648     list<reply_t> &l = reply_window_[clt_nonce];
649     list<reply_t>::iterator it = l.begin();
650     // skip to our place in the list
651     for (it++; it != l.end() && it->xid < xid; it++);
652     // there should already be an entry, so whine if there isn't
653     if (it == l.end() || it->xid != xid) {
654         cerr << "Could not find reply struct in add_reply" << endl;
655         l.insert(it, reply_t(xid, b, sz));
656     } else {
657         *it = reply_t(xid, b, sz);
658     }
659 }
660
661 void rpcs::free_reply_window(void) {
662     lock rwl(reply_window_m_);
663     for (auto clt : reply_window_) {
664         for (auto it : clt.second){
665             if (it.cb_present)
666                 free(it.buf);
667         }
668         clt.second.clear();
669     }
670     reply_window_.clear();
671 }
672
673 int rpcs::rpcbind(unsigned int &r, int) {
674     IF_LEVEL(2) LOG("rpcs::rpcbind called return nonce " << nonce_);
675     r = nonce_;
676     return 0;
677 }
678
679 marshall &
680 operator<<(marshall &m, uint8_t x) {
681     m.rawbyte(x);
682     return m;
683 }
684
685 marshall &
686 operator<<(marshall &m, uint16_t x) {
687     x = hton(x);
688     m.rawbytes((char *)&x, 2);
689     return m;
690 }
691
692 marshall &
693 operator<<(marshall &m, uint32_t x) {
694     x = hton(x);
695     m.rawbytes((char *)&x, 4);
696     return m;
697 }
698
699 marshall & operator<<(marshall &m, int32_t x) { return m << (uint32_t) x; }
700 marshall & operator<<(marshall &m, int8_t x) { return m << (uint8_t)x; }
701 marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; }
702 marshall & operator<<(marshall &m, int16_t x) { return m << (uint16_t)x; }
703 marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }
704
705 marshall &
706 operator<<(marshall &m, const string &s) {
707     m << (unsigned int) s.size();
708     m.rawbytes(s.data(), s.size());
709     return m;
710 }
711
712 void marshall::pack_req_header(const request_header &h) {
713     size_t saved_sz = index_;
714     //leave the first 4-byte empty for channel to fill size of pdu
715     index_ = sizeof(rpc_sz_t);
716     *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep;
717     index_ = saved_sz;
718 }
719
720 void marshall::pack_reply_header(const reply_header &h) {
721     size_t saved_sz = index_;
722     //leave the first 4-byte empty for channel to fill size of pdu
723     index_ = sizeof(rpc_sz_t);
724     *this << h.xid << h.ret;
725     index_ = saved_sz;
726 }
727
728 // take the contents from another unmarshall object
729 void
730 unmarshall::take_in(unmarshall &another)
731 {
732     if(buf_)
733         free(buf_);
734     another.take_buf(&buf_, &sz_);
735     index_ = RPC_HEADER_SZ;
736     ok_ = sz_ >= RPC_HEADER_SZ?true:false;
737 }
738
739 inline bool
740 unmarshall::ensure(size_t n) {
741     if (index_+n > sz_)
742         ok_ = false;
743     return ok_;
744 }
745
746 inline uint8_t
747 unmarshall::rawbyte()
748 {
749     if (!ensure(1))
750         return 0;
751     return (uint8_t)buf_[index_++];
752 }
753
754 void
755 unmarshall::rawbytes(string &ss, size_t n)
756 {
757     VERIFY(ensure(n));
758     ss.assign(buf_+index_, n);
759     index_ += n;
760 }
761
762 template <class T>
763 void
764 unmarshall::rawbytes(T &t)
765 {
766     const size_t n = sizeof(T);
767     VERIFY(ensure(n));
768     memcpy(&t, buf_+index_, n);
769     t = ntoh(t);
770     index_ += n;
771 }
772
773 unmarshall & operator>>(unmarshall &u, bool &x) { x = (bool)u.rawbyte(); return u; }
774 unmarshall & operator>>(unmarshall &u, uint8_t &x) { x = u.rawbyte(); return u; }
775 unmarshall & operator>>(unmarshall &u, int8_t &x) { x = (int8_t)u.rawbyte(); return u; }
776 unmarshall & operator>>(unmarshall &u, uint16_t &x) { u.rawbytes<uint16_t>(x); return u; }
777 unmarshall & operator>>(unmarshall &u, int16_t &x) { u.rawbytes<int16_t>(x); return u; }
778 unmarshall & operator>>(unmarshall &u, uint32_t &x) { u.rawbytes<uint32_t>(x); return u; }
779 unmarshall & operator>>(unmarshall &u, int32_t &x) { u.rawbytes<int32_t>(x); return u; }
780 unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes<uint32_t>(xx); x = xx; return u; }
781 unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes<uint64_t>(x); return u; }
782 unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes<int64_t>(x); return u; }
783 unmarshall & operator>>(unmarshall &u, string &s) {
784     unsigned sz = u.grab<unsigned>();
785     if(u.ok())
786         u.rawbytes(s, sz);
787     return u;
788 }
789
790 bool operator<(const sockaddr_in &a, const sockaddr_in &b){
791     return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
792             ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
793              ((a.sin_port < b.sin_port))));
794 }
795
796 /*---------------auxilary function--------------*/
797 sockaddr_in make_sockaddr(const string &hostandport) {
798     auto colon = hostandport.find(':');
799     if (colon == string::npos)
800         return make_sockaddr("127.0.0.1", hostandport);
801     else
802         return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
803 }
804
805 sockaddr_in make_sockaddr(const string &host, const string &port) {
806     sockaddr_in dst;
807     bzero(&dst, sizeof(dst));
808     dst.sin_family = AF_INET;
809
810     struct in_addr a{inet_addr(host.c_str())};
811
812     if(a.s_addr != INADDR_NONE)
813         dst.sin_addr.s_addr = a.s_addr;
814     else {
815         struct hostent *hp = gethostbyname(host.c_str());
816
817         if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
818             cerr << "cannot find host name " << host << endl;
819             exit(1);
820         }
821         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
822         dst.sin_addr.s_addr = a.s_addr;
823     }
824     dst.sin_port = hton((uint16_t)stoi(port));
825     return dst;
826 }