ad3fcd9e8d2918a1d37d41b844884ac988dae0fa
[invirt/third/libt4.git] / rpc / rpc.cc
1 /*
2  The rpcc class handles client-side RPC.  Each rpcc is bound to a
3  single RPC server.  The jobs of rpcc include maintaining a connection to
4  server, sending RPC requests and waiting for responses, retransmissions,
5  at-most-once delivery etc.
6
7  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
8  connections from different rpcc objects.  The jobs of rpcs include accepting
9  connections, dispatching requests to registered RPC handlers, at-most-once
10  delivery etc.
11
12  Both rpcc and rpcs use the connection class as an abstraction for the
13  underlying communication channel.  To send an RPC request/reply, one calls
14  connection::send() which blocks until data is sent or the connection has failed
15  (thus the caller can free the buffer when send() returns).  When a
16  request/reply is received, connection makes a callback into the corresponding
17  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
18
19  Thread organization:
20  rpcc uses application threads to send RPC requests and blocks to receive the
21  reply or error. All connections use a single PollMgr object to perform async
22  socket IO.  PollMgr creates a single thread to examine the readiness of socket
23  file descriptors and informs the corresponding connection whenever a socket is
24  ready to be read or written.  (We use asynchronous socket IO to reduce the
25  number of threads needed to manage these connections; without async IO, at
26  least one thread is needed per connection to read data without blocking other
27  activities.)  Each rpcs object creates one thread for listening on the server
28  port and a pool of threads for executing RPC requests.  The
29  thread pool allows us to control the number of threads spawned at the server
30  (spawning one thread per request will hurt when the server faces thousands of
31  requests).
32
33  In order to delete a connection object, we must maintain a reference count.
34  For rpcc,
35  multiple client threads might be invoking the rpcc::call() functions and thus
36  holding multiple references to the underlying connection object. For rpcs,
37  multiple dispatch threads might be holding references to the same connection
38  object.  A connection object is deleted only when the underlying connection is
39  dead and the reference count reaches zero.
40
41  This version of the RPC library explicitly joins exited threads to make sure
42  no outstanding references exist before deleting objects.
43
44  To delete a rpcc object safely, the users of the library must ensure that
45  there are no outstanding calls on the rpcc object.
46
47  To delete a rpcs object safely, we do the following in sequence: 1. stop
48  accepting new incoming connections. 2. close existing active connections.
49  3.  delete the dispatch thread pool which involves waiting for current active
50  RPC handlers to finish.  It is interesting how a thread pool can be deleted
51  without using thread cancellation. The trick is to inject x "poison pills" for
52  a thread pool of x threads. Upon getting a poison pill instead of a normal
53  task, a worker thread will exit (and thread pool destructor waits to join all
54  x exited worker threads).
55  */
56
57 #include "types.h"
58 #include "rpc.h"
59
60 #include <sys/types.h>
61 #include <arpa/inet.h>
62 #include <netinet/tcp.h>
63 #include <netdb.h>
64 #include <unistd.h>
65
66 #include "jsl_log.h"
67
68 const rpcc::TO rpcc::to_max = { 120000 };
69 const rpcc::TO rpcc::to_min = { 1000 };
70
71 inline void set_rand_seed() {
72     auto now = time_point_cast<nanoseconds>(steady_clock::now());
73     srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
74 }
75
76 rpcc::rpcc(const string & d, bool retrans) :
77     dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
78     retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
79 {
80     if(retrans){
81         set_rand_seed();
82         clt_nonce_ = (unsigned int)random();
83     } else {
84         // special client nonce 0 means this client does not
85         // require at-most-once logic from the server
86         // because it uses tcp and never retries a failed connection
87         clt_nonce_ = 0;
88     }
89
90     char *loss_env = getenv("RPC_LOSSY");
91     if(loss_env != NULL){
92         lossytest_ = atoi(loss_env);
93     }
94
95     // xid starts with 1 and latest received reply starts with 0
96     xid_rep_window_.push_back(0);
97
98     jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
99             clt_nonce_, lossytest_);
100 }
101
102 // IMPORTANT: destruction should happen only when no external threads
103 // are blocked inside rpcc or will use rpcc in the future
104 rpcc::~rpcc()
105 {
106     jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
107             clt_nonce_, chan_?chan_->channo():-1);
108     if(chan_){
109         chan_->closeconn();
110         chan_->decref();
111     }
112     VERIFY(calls_.size() == 0);
113 }
114
115 int
116 rpcc::bind(TO to)
117 {
118     unsigned int r;
119     int ret = call_timeout(rpc_const::bind, to, r, 0);
120     if(ret == 0){
121         lock ml(m_);
122         bind_done_ = true;
123         srv_nonce_ = r;
124     } else {
125         jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
126                 inet_ntoa(dst_.sin_addr), ret);
127     }
128     return ret;
129 };
130
131 // Cancel all outstanding calls
132     void
133 rpcc::cancel(void)
134 {
135     lock ml(m_);
136     LOG("rpcc::cancel: force callers to fail");
137     for(auto &p : calls_){
138         caller *ca = p.second;
139
140         jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
141         {
142             lock cl(ca->m);
143             ca->done = true;
144             ca->intret = rpc_const::cancel_failure;
145             ca->c.notify_one();
146         }
147     }
148
149     while (calls_.size () > 0){
150         destroy_wait_ = true;
151         destroy_wait_c_.wait(ml);
152     }
153     LOG("rpcc::cancel: done");
154 }
155
156 int
157 rpcc::call1(proc_t proc, marshall &req, unmarshall &rep,
158         TO to)
159 {
160
161     caller ca(0, &rep);
162     int xid_rep;
163     {
164         lock ml(m_);
165
166         if((proc != rpc_const::bind && !bind_done_) ||
167                 (proc == rpc_const::bind && bind_done_)){
168             jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
169             return rpc_const::bind_failure;
170         }
171
172         if(destroy_wait_){
173           return rpc_const::cancel_failure;
174         }
175
176         ca.xid = xid_++;
177         calls_[ca.xid] = &ca;
178
179         req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
180         xid_rep = xid_rep_window_.front();
181     }
182
183     TO curr_to;
184     auto finaldeadline = steady_clock::now() + milliseconds(to.to),
185         nextdeadline = finaldeadline;
186
187     curr_to.to = to_min.to;
188
189     bool transmit = true;
190     connection *ch = NULL;
191
192     while (1){
193         if(transmit){
194             get_refconn(&ch);
195             if(ch){
196                 if(reachable_) {
197                     request forgot;
198                     {
199                         lock ml(m_);
200                         if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
201                             forgot = dup_req_;
202                             dup_req_.clear();
203                         }
204                     }
205                     if (forgot.isvalid())
206                         ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
207                     ch->send(req.cstr(), req.size());
208                 }
209                 else jsl_log(JSL_DBG_1, "not reachable\n");
210                 jsl_log(JSL_DBG_2,
211                         "rpcc::call1 %u just sent req proc %x xid %d clt_nonce %d\n",
212                         clt_nonce_, proc, ca.xid, clt_nonce_);
213             }
214             transmit = false; // only send once on a given channel
215         }
216
217         if(finaldeadline == time_point<steady_clock>::min())
218             break;
219
220         nextdeadline = steady_clock::now() + milliseconds(curr_to.to);
221         if(nextdeadline > finaldeadline) {
222             nextdeadline = finaldeadline;
223             finaldeadline = time_point<steady_clock>::min();
224         }
225
226         {
227             lock cal(ca.m);
228             while (!ca.done){
229                 jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
230                 if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){
231                     jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
232                     break;
233                 }
234             }
235             if(ca.done){
236                 jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
237                 break;
238             }
239         }
240
241         if(retrans_ && (!ch || ch->isdead())){
242             // since connection is dead, retransmit
243             // on the new connection
244             transmit = true;
245         }
246         curr_to.to <<= 1;
247     }
248
249     {
250         // no locking of ca.m since only this thread changes ca.xid
251         lock ml(m_);
252         calls_.erase(ca.xid);
253         // may need to update the xid again here, in case the
254         // packet times out before it's even sent by the channel.
255         // I don't think there's any harm in maybe doing it twice
256         update_xid_rep(ca.xid);
257
258         if(destroy_wait_){
259           destroy_wait_c_.notify_one();
260         }
261     }
262
263     if (ca.done && lossytest_)
264     {
265         lock ml(m_);
266         if (!dup_req_.isvalid()) {
267             dup_req_.buf.assign(req.cstr(), req.size());
268             dup_req_.xid = ca.xid;
269         }
270         if (xid_rep > xid_rep_done_)
271             xid_rep_done_ = xid_rep;
272     }
273
274     lock cal(ca.m);
275
276     jsl_log(JSL_DBG_2,
277             "rpcc::call1 %u call done for req proc %x xid %d %s:%d done? %d ret %d \n",
278             clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
279             ntohs(dst_.sin_port), ca.done, ca.intret);
280
281     if(ch)
282         ch->decref();
283
284     // destruction of req automatically frees its buffer
285     return (ca.done? ca.intret : rpc_const::timeout_failure);
286 }
287
288 void
289 rpcc::get_refconn(connection **ch)
290 {
291     lock ml(chan_m_);
292     if(!chan_ || chan_->isdead()){
293         if(chan_)
294             chan_->decref();
295         chan_ = connect_to_dst(dst_, this, lossytest_);
296     }
297     if(ch && chan_){
298         if(*ch){
299             (*ch)->decref();
300         }
301         *ch = chan_;
302         (*ch)->incref();
303     }
304 }
305
306 // PollMgr's thread is being used to
307 // make this upcall from connection object to rpcc.
308 // this funtion must not block.
309 //
310 // this function keeps no reference for connection *c
311 bool
312 rpcc::got_pdu(connection *, char *b, size_t sz)
313 {
314     unmarshall rep(b, sz);
315     reply_header h;
316     rep.unpack_reply_header(&h);
317
318     if(!rep.ok()){
319         jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
320         return true;
321     }
322
323     lock ml(m_);
324
325     update_xid_rep(h.xid);
326
327     if(calls_.find(h.xid) == calls_.end()){
328         jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
329         return true;
330     }
331     caller *ca = calls_[h.xid];
332
333     lock cl(ca->m);
334     if(!ca->done){
335         ca->un->take_in(rep);
336         ca->intret = h.ret;
337         if(ca->intret < 0){
338             jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
339                     h.xid, ca->intret);
340         }
341         ca->done = 1;
342     }
343     ca->c.notify_all();
344     return true;
345 }
346
347 // assumes thread holds mutex m
348 void
349 rpcc::update_xid_rep(int xid)
350 {
351     if(xid <= xid_rep_window_.front()){
352         return;
353     }
354
355     for (auto it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
356         if(*it > xid){
357             xid_rep_window_.insert(it, xid);
358             goto compress;
359         }
360     }
361     xid_rep_window_.push_back(xid);
362
363 compress:
364     auto it = xid_rep_window_.begin();
365     for (it++; it != xid_rep_window_.end(); it++){
366         while (xid_rep_window_.front() + 1 == *it)
367             xid_rep_window_.pop_front();
368     }
369 }
370
371 rpcs::rpcs(unsigned int p1, size_t count)
372   : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
373 {
374     set_rand_seed();
375     nonce_ = (unsigned int)random();
376     jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
377
378     char *loss_env = getenv("RPC_LOSSY");
379     if(loss_env != NULL){
380         lossytest_ = atoi(loss_env);
381     }
382
383     reg(rpc_const::bind, &rpcs::rpcbind, this);
384     dispatchpool_ = new ThrPool(6,false);
385
386     listener_ = new tcpsconn(this, port_, lossytest_);
387 }
388
389 rpcs::~rpcs()
390 {
391     // must delete listener before dispatchpool
392     delete listener_;
393     delete dispatchpool_;
394     free_reply_window();
395 }
396
397 bool
398 rpcs::got_pdu(connection *c, char *b, size_t sz)
399 {
400         if(!reachable_){
401             jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
402             return true;
403         }
404
405     djob_t *j = new djob_t(c, b, sz);
406     c->incref();
407     bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j));
408     if(!succ || !reachable_){
409         c->decref();
410         delete j;
411     }
412     return succ;
413 }
414
415 void
416 rpcs::reg1(proc_t proc, handler *h)
417 {
418     lock pl(procs_m_);
419     VERIFY(procs_.count(proc) == 0);
420     procs_[proc] = h;
421     VERIFY(procs_.count(proc) >= 1);
422 }
423
424 void
425 rpcs::updatestat(proc_t proc)
426 {
427     lock cl(count_m_);
428     counts_[proc]++;
429     curr_counts_--;
430     if(curr_counts_ == 0){
431         LOG("RPC STATS: ");
432         for (auto i = counts_.begin(); i != counts_.end(); i++)
433             LOG(hex << i->first << ":" << dec << i->second);
434
435         lock rwl(reply_window_m_);
436
437         size_t totalrep = 0, maxrep = 0;
438         for (auto clt : reply_window_) {
439             totalrep += clt.second.size();
440             if(clt.second.size() > maxrep)
441                 maxrep = clt.second.size();
442         }
443         jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %lu max per client %lu\n",
444                         (int) reply_window_.size()-1, totalrep, maxrep);
445         curr_counts_ = counting_;
446     }
447 }
448
449 void
450 rpcs::dispatch(djob_t *j)
451 {
452     connection *c = j->conn;
453     unmarshall req(j->buf, j->sz);
454     delete j;
455
456     request_header h;
457     req.unpack_req_header(&h);
458     proc_t proc = h.proc;
459
460     if(!req.ok()){
461         jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
462         c->decref();
463         return;
464     }
465
466     jsl_log(JSL_DBG_2,
467             "rpcs::dispatch: rpc %d (proc %x, last_rep %d) from clt %u for srv instance %u \n",
468             h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
469
470     marshall rep;
471     reply_header rh(h.xid,0);
472
473     // is client sending to an old instance of server?
474     if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
475         jsl_log(JSL_DBG_2,
476                 "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
477                 h.srv_nonce, nonce_, h.proc);
478         rh.ret = rpc_const::oldsrv_failure;
479         rep.pack_reply_header(rh);
480         c->send(rep.cstr(),rep.size());
481         return;
482     }
483
484     handler *f;
485     // is RPC proc a registered procedure?
486     {
487         lock pl(procs_m_);
488         if(procs_.count(proc) < 1){
489             fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
490                 proc);
491             c->decref();
492                         VERIFY(0);
493             return;
494         }
495
496         f = procs_[proc];
497     }
498
499     rpcs::rpcstate_t stat;
500     char *b1 = nullptr;
501     size_t sz1 = 0;
502
503     if(h.clt_nonce){
504         // have i seen this client before?
505         {
506             lock rwl(reply_window_m_);
507             // if we don't know about this clt_nonce, create a cleanup object
508             if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
509                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
510                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
511                 jsl_log(JSL_DBG_2,
512                         "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
513                         h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
514             }
515         }
516
517         // save the latest good connection to the client
518         {
519             lock rwl(conss_m_);
520             if(conns_.find(h.clt_nonce) == conns_.end()){
521                 c->incref();
522                 conns_[h.clt_nonce] = c;
523             } else if(conns_[h.clt_nonce]->compare(c) < 0){
524                 conns_[h.clt_nonce]->decref();
525                 c->incref();
526                 conns_[h.clt_nonce] = c;
527             }
528         }
529
530         stat = checkduplicate_and_update(h.clt_nonce, h.xid,
531                                                  h.xid_rep, &b1, &sz1);
532     } else {
533         // this client does not require at most once logic
534         stat = NEW;
535     }
536
537     switch (stat){
538         case NEW: // new request
539             if(counting_){
540                 updatestat(proc);
541             }
542
543             rh.ret = (*f)(req, rep);
544             if (rh.ret == rpc_const::unmarshal_args_failure) {
545                 fprintf(stderr, "rpcs::dispatch: failed to"
546                         " unmarshall the arguments. You are"
547                         " probably calling RPC 0x%x with wrong"
548                         " types of arguments.\n", proc);
549                 VERIFY(0);
550             }
551             VERIFY(rh.ret >= 0);
552
553             rep.pack_reply_header(rh);
554             rep.take_buf(&b1,&sz1);
555
556             jsl_log(JSL_DBG_2,
557                     "rpcs::dispatch: sending and saving reply of size %lu for rpc %d, proc %x ret %d, clt %u\n",
558                     sz1, h.xid, proc, rh.ret, h.clt_nonce);
559
560             if(h.clt_nonce > 0){
561                 // only record replies for clients that require at-most-once logic
562                 add_reply(h.clt_nonce, h.xid, b1, sz1);
563             }
564
565             // get the latest connection to the client
566             {
567                 lock rwl(conss_m_);
568                 if(c->isdead() && c != conns_[h.clt_nonce]){
569                     c->decref();
570                     c = conns_[h.clt_nonce];
571                     c->incref();
572                 }
573             }
574
575             c->send(b1, sz1);
576             if(h.clt_nonce == 0){
577                 // reply is not added to at-most-once window, free it
578                 free(b1);
579             }
580             break;
581         case INPROGRESS: // server is working on this request
582             break;
583         case DONE: // duplicate and we still have the response
584             c->send(b1, sz1);
585             break;
586         case FORGOTTEN: // very old request and we don't have the response anymore
587             jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %d from %u\n",
588                     h.xid, h.clt_nonce);
589             rh.ret = rpc_const::atmostonce_failure;
590             rep.pack_reply_header(rh);
591             c->send(rep.cstr(),rep.size());
592             break;
593     }
594     c->decref();
595 }
596
597 // rpcs::dispatch calls this when an RPC request arrives.
598 //
599 // checks to see if an RPC with xid from clt_nonce has already been received.
600 // if not, remembers the request in reply_window_.
601 //
602 // deletes remembered requests with XIDs <= xid_rep; the client
603 // says it has received a reply for every RPC up through xid_rep.
604 // frees the reply_t::buf of each such request.
605 //
606 // returns one of:
607 //   NEW: never seen this xid before.
608 //   INPROGRESS: seen this xid, and still processing it.
609 //   DONE: seen this xid, previous reply returned in *b and *sz.
610 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
611 rpcs::rpcstate_t
612 rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
613         int xid_rep, char **b, size_t *sz)
614 {
615     lock rwl(reply_window_m_);
616
617     list<reply_t> &l = reply_window_[clt_nonce];
618
619     VERIFY(l.size() > 0);
620     VERIFY(xid >= xid_rep);
621
622     int past_xid_rep = l.begin()->xid;
623
624     list<reply_t>::iterator start = l.begin(), it = ++start;
625
626     if (past_xid_rep < xid_rep || past_xid_rep == -1) {
627         // scan for deletion candidates
628         for (; it != l.end() && it->xid < xid_rep; it++) {
629             if (it->cb_present)
630                 free(it->buf);
631         }
632         l.erase(start, it);
633         l.begin()->xid = xid_rep;
634     }
635
636     if (xid < past_xid_rep && past_xid_rep != -1)
637         return FORGOTTEN;
638
639     // skip non-deletion candidates
640     while (it != l.end() && it->xid < xid)
641         it++;
642
643     // if it's in the list it must be right here
644     if (it != l.end() && it->xid == xid) {
645         if (it->cb_present) {
646             // return information about the remembered reply
647             *b = it->buf;
648             *sz = it->sz;
649             return DONE;
650         } else {
651             return INPROGRESS;
652         }
653     } else {
654         // remember that a new request has arrived
655         l.insert(it, reply_t(xid));
656         return NEW;
657     }
658 }
659
660 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
661 // and passes the return value in b and sz.
662 // add_reply() should remember b and sz.
663 // free_reply_window() and checkduplicate_and_update is responsible for
664 // calling free(b).
665 void
666 rpcs::add_reply(unsigned int clt_nonce, int xid,
667         char *b, size_t sz)
668 {
669     lock rwl(reply_window_m_);
670     // remember the RPC reply value
671     list<reply_t> &l = reply_window_[clt_nonce];
672     list<reply_t>::iterator it = l.begin();
673     // skip to our place in the list
674     for (it++; it != l.end() && it->xid < xid; it++);
675     // there should already be an entry, so whine if there isn't
676     if (it == l.end() || it->xid != xid) {
677         fprintf(stderr, "Could not find reply struct in add_reply");
678         l.insert(it, reply_t(xid, b, sz));
679     } else {
680         *it = reply_t(xid, b, sz);
681     }
682 }
683
684 void rpcs::free_reply_window(void) {
685     lock rwl(reply_window_m_);
686     for (auto clt : reply_window_) {
687         for (auto it : clt.second){
688             if (it.cb_present)
689                 free(it.buf);
690         }
691         clt.second.clear();
692     }
693     reply_window_.clear();
694 }
695
696 int rpcs::rpcbind(unsigned int &r, int) {
697     jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
698     r = nonce_;
699     return 0;
700 }
701
702 marshall &
703 operator<<(marshall &m, uint8_t x) {
704     m.rawbyte(x);
705     return m;
706 }
707
708 marshall &
709 operator<<(marshall &m, uint16_t x) {
710     x = hton(x);
711     m.rawbytes((char *)&x, 2);
712     return m;
713 }
714
715 marshall &
716 operator<<(marshall &m, uint32_t x) {
717     x = hton(x);
718     m.rawbytes((char *)&x, 4);
719     return m;
720 }
721
722 marshall & operator<<(marshall &m, int32_t x) { return m << (uint32_t) x; }
723 marshall & operator<<(marshall &m, int8_t x) { return m << (uint8_t)x; }
724 marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; }
725 marshall & operator<<(marshall &m, int16_t x) { return m << (uint16_t)x; }
726 marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }
727
728 marshall &
729 operator<<(marshall &m, const string &s) {
730     m << (unsigned int) s.size();
731     m.rawbytes(s.data(), s.size());
732     return m;
733 }
734
735 void marshall::pack_req_header(const request_header &h) {
736     size_t saved_sz = index_;
737     //leave the first 4-byte empty for channel to fill size of pdu
738     index_ = sizeof(rpc_sz_t);
739     *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep;
740     index_ = saved_sz;
741 }
742
743 void marshall::pack_reply_header(const reply_header &h) {
744     size_t saved_sz = index_;
745     //leave the first 4-byte empty for channel to fill size of pdu
746     index_ = sizeof(rpc_sz_t);
747     *this << h.xid << h.ret;
748     index_ = saved_sz;
749 }
750
751 // take the contents from another unmarshall object
752 void
753 unmarshall::take_in(unmarshall &another)
754 {
755     if(buf_)
756         free(buf_);
757     another.take_buf(&buf_, &sz_);
758     index_ = RPC_HEADER_SZ;
759     ok_ = sz_ >= RPC_HEADER_SZ?true:false;
760 }
761
762 inline bool
763 unmarshall::ensure(size_t n) {
764     if (index_+n > sz_)
765         ok_ = false;
766     return ok_;
767 }
768
769 inline uint8_t
770 unmarshall::rawbyte()
771 {
772     if (!ensure(1))
773         return 0;
774     return (uint8_t)buf_[index_++];
775 }
776
777 void
778 unmarshall::rawbytes(string &ss, size_t n)
779 {
780     VERIFY(ensure(n));
781     ss.assign(buf_+index_, n);
782     index_ += n;
783 }
784
785 template <class T>
786 void
787 unmarshall::rawbytes(T &t)
788 {
789     const size_t n = sizeof(T);
790     VERIFY(ensure(n));
791     memcpy(&t, buf_+index_, n);
792     t = ntoh(t);
793     index_ += n;
794 }
795
796 unmarshall & operator>>(unmarshall &u, bool &x) { x = (bool)u.rawbyte(); return u; }
797 unmarshall & operator>>(unmarshall &u, uint8_t &x) { x = u.rawbyte(); return u; }
798 unmarshall & operator>>(unmarshall &u, int8_t &x) { x = (int8_t)u.rawbyte(); return u; }
799 unmarshall & operator>>(unmarshall &u, uint16_t &x) { u.rawbytes<uint16_t>(x); return u; }
800 unmarshall & operator>>(unmarshall &u, int16_t &x) { u.rawbytes<int16_t>(x); return u; }
801 unmarshall & operator>>(unmarshall &u, uint32_t &x) { u.rawbytes<uint32_t>(x); return u; }
802 unmarshall & operator>>(unmarshall &u, int32_t &x) { u.rawbytes<int32_t>(x); return u; }
803 unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes<uint32_t>(xx); x = xx; return u; }
804 unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes<uint64_t>(x); return u; }
805 unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes<int64_t>(x); return u; }
806 unmarshall & operator>>(unmarshall &u, string &s) {
807     unsigned sz = u.grab<unsigned>();
808     if(u.ok())
809         u.rawbytes(s, sz);
810     return u;
811 }
812
813 bool operator<(const sockaddr_in &a, const sockaddr_in &b){
814     return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
815             ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
816              ((a.sin_port < b.sin_port))));
817 }
818
819 /*---------------auxilary function--------------*/
820 sockaddr_in make_sockaddr(const string &hostandport) {
821     auto colon = hostandport.find(':');
822     if (colon == string::npos)
823         return make_sockaddr("127.0.0.1", hostandport);
824     else
825         return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1));
826 }
827
828 sockaddr_in make_sockaddr(const string &host, const string &port) {
829     sockaddr_in dst;
830     bzero(&dst, sizeof(dst));
831     dst.sin_family = AF_INET;
832
833     struct in_addr a{inet_addr(host.c_str())};
834
835     if(a.s_addr != INADDR_NONE)
836         dst.sin_addr.s_addr = a.s_addr;
837     else {
838         struct hostent *hp = gethostbyname(host.c_str());
839
840         if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
841             fprintf(stderr, "cannot find host name %s\n", host.c_str());
842             exit(1);
843         }
844         memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
845         dst.sin_addr.s_addr = a.s_addr;
846     }
847     dst.sin_port = hton((uint16_t)stoi(port));
848     return dst;
849 }