More clean-ups
[invirt/third/libt4.git] / rpc / rpc.cc
1 /*
2  The rpcc class handles client-side RPC.  Each rpcc is bound to a
3  single RPC server.  The jobs of rpcc include maintaining a connection to
4  server, sending RPC requests and waiting for responses, retransmissions,
5  at-most-once delivery etc.
6
7  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
8  connections from different rpcc objects.  The jobs of rpcs include accepting
9  connections, dispatching requests to registered RPC handlers, at-most-once
10  delivery etc.
11
12  Both rpcc and rpcs use the connection class as an abstraction for the
13  underlying communication channel.  To send an RPC request/reply, one calls
14  connection::send() which blocks until data is sent or the connection has failed
15  (thus the caller can free the buffer when send() returns).  When a
16  request/reply is received, connection makes a callback into the corresponding
17  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
18
19  Thread organization:
20  rpcc uses application threads to send RPC requests and blocks to receive the
21  reply or error. All connections use a single PollMgr object to perform async
22  socket IO.  PollMgr creates a single thread to examine the readiness of socket
23  file descriptors and informs the corresponding connection whenever a socket is
24  ready to be read or written.  (We use asynchronous socket IO to reduce the
25  number of threads needed to manage these connections; without async IO, at
26  least one thread is needed per connection to read data without blocking other
27  activities.)  Each rpcs object creates one thread for listening on the server
28  port and a pool of threads for executing RPC requests.  The
29  thread pool allows us to control the number of threads spawned at the server
30  (spawning one thread per request will hurt when the server faces thousands of
31  requests).
32
33  In order to delete a connection object, we must maintain a reference count.
34  For rpcc,
35  multiple client threads might be invoking the rpcc::call() functions and thus
36  holding multiple references to the underlying connection object. For rpcs,
37  multiple dispatch threads might be holding references to the same connection
38  object.  A connection object is deleted only when the underlying connection is
39  dead and the reference count reaches zero.
40
41  This version of the RPC library explicitly joins exited threads to make sure
42  no outstanding references exist before deleting objects.
43
44  To delete a rpcc object safely, the users of the library must ensure that
45  there are no outstanding calls on the rpcc object.
46
47  To delete a rpcs object safely, we do the following in sequence: 1. stop
48  accepting new incoming connections. 2. close existing active connections.
49  3.  delete the dispatch thread pool which involves waiting for current active
50  RPC handlers to finish.  It is interesting how a thread pool can be deleted
51  without using thread cancellation. The trick is to inject x "poison pills" for
52  a thread pool of x threads. Upon getting a poison pill instead of a normal
53  task, a worker thread will exit (and thread pool destructor waits to join all
54  x exited worker threads).
55  */
56
57 #include "rpc.h"
58
59 #include <sys/types.h>
60 #include <arpa/inet.h>
61 #include <netinet/tcp.h>
62 #include <netdb.h>
63 #include <unistd.h>
64 #include "lock.h"
65
66 #include "jsl_log.h"
67 #include "lang/verify.h"
68
69 const rpcc::TO rpcc::to_max = { 120000 };
70 const rpcc::TO rpcc::to_min = { 1000 };
71
72 rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
73 : xid(xxid), un(xun), done(false)
74 {
75 }
76
77 rpcc::caller::~caller()
78 {
79 }
80
81 inline
82 void set_rand_seed()
83 {
84     auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
85     srandom((int)now.time_since_epoch().count()^((int)getpid()));
86 }
87
88 rpcc::rpcc(sockaddr_in d, bool retrans) :
89     dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
90     retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
91 {
92     if(retrans){
93         set_rand_seed();
94         clt_nonce_ = random();
95     } else {
96         // special client nonce 0 means this client does not
97         // require at-most-once logic from the server
98         // because it uses tcp and never retries a failed connection
99         clt_nonce_ = 0;
100     }
101
102     char *loss_env = getenv("RPC_LOSSY");
103     if(loss_env != NULL){
104         lossytest_ = atoi(loss_env);
105     }
106
107     // xid starts with 1 and latest received reply starts with 0
108     xid_rep_window_.push_back(0);
109
110     jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
111             clt_nonce_, lossytest_);
112 }
113
114 // IMPORTANT: destruction should happen only when no external threads
115 // are blocked inside rpcc or will use rpcc in the future
116 rpcc::~rpcc()
117 {
118     jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
119             clt_nonce_, chan_?chan_->channo():-1);
120     if(chan_){
121         chan_->closeconn();
122         chan_->decref();
123     }
124     VERIFY(calls_.size() == 0);
125 }
126
127 int
128 rpcc::bind(TO to)
129 {
130     int r;
131     int ret = call_timeout(rpc_const::bind, to, r, 0);
132     if(ret == 0){
133         lock ml(m_);
134         bind_done_ = true;
135         srv_nonce_ = r;
136     } else {
137         jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
138                 inet_ntoa(dst_.sin_addr), ret);
139     }
140     return ret;
141 };
142
143 // Cancel all outstanding calls
144     void
145 rpcc::cancel(void)
146 {
147     lock ml(m_);
148     printf("rpcc::cancel: force callers to fail\n");
149     std::map<int,caller*>::iterator iter;
150     for(iter = calls_.begin(); iter != calls_.end(); iter++){
151         caller *ca = iter->second;
152
153         jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
154         {
155             lock cl(ca->m);
156             ca->done = true;
157             ca->intret = rpc_const::cancel_failure;
158             ca->c.notify_one();
159         }
160     }
161
162     while (calls_.size () > 0){
163         destroy_wait_ = true;
164         destroy_wait_c_.wait(ml);
165     }
166     printf("rpcc::cancel: done\n");
167 }
168
169 int
170 rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
171         TO to)
172 {
173
174     caller ca(0, &rep);
175         int xid_rep;
176     {
177         lock ml(m_);
178
179         if((proc != rpc_const::bind && !bind_done_) ||
180                 (proc == rpc_const::bind && bind_done_)){
181             jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
182             return rpc_const::bind_failure;
183         }
184
185         if(destroy_wait_){
186           return rpc_const::cancel_failure;
187         }
188
189         ca.xid = xid_++;
190         calls_[ca.xid] = &ca;
191
192         req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
193         xid_rep = xid_rep_window_.front();
194     }
195
196     TO curr_to;
197     std::chrono::time_point<std::chrono::steady_clock> finaldeadline =
198         std::chrono::steady_clock::now() +
199         std::chrono::milliseconds(to.to),
200         nextdeadline;
201
202     curr_to.to = to_min.to;
203
204     bool transmit = true;
205     connection *ch = NULL;
206
207     while (1){
208         if(transmit){
209             get_refconn(&ch);
210             if(ch){
211                 if(reachable_) {
212                     request forgot;
213                     {
214                         lock ml(m_);
215                         if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
216                             forgot = dup_req_;
217                             dup_req_.clear();
218                         }
219                     }
220                     if (forgot.isvalid())
221                         ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
222                     ch->send(req.cstr(), req.size());
223                 }
224                 else jsl_log(JSL_DBG_1, "not reachable\n");
225                 jsl_log(JSL_DBG_2,
226                         "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n",
227                         clt_nonce_, proc, ca.xid, clt_nonce_);
228             }
229             transmit = false; // only send once on a given channel
230         }
231
232         if(finaldeadline == std::chrono::time_point<std::chrono::steady_clock>::min())
233             break;
234
235         nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to);
236         if(nextdeadline > finaldeadline) {
237             nextdeadline = finaldeadline;
238             finaldeadline = std::chrono::time_point<std::chrono::steady_clock>::min();
239         }
240
241         {
242             lock cal(ca.m);
243             while (!ca.done){
244                 jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
245                 if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){
246                     jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
247                     break;
248                 }
249             }
250             if(ca.done){
251                 jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
252                 break;
253             }
254         }
255
256         if(retrans_ && (!ch || ch->isdead())){
257             // since connection is dead, retransmit
258             // on the new connection
259             transmit = true;
260         }
261         curr_to.to <<= 1;
262     }
263
264     {
265         // no locking of ca.m since only this thread changes ca.xid
266         lock ml(m_);
267         calls_.erase(ca.xid);
268         // may need to update the xid again here, in case the
269         // packet times out before it's even sent by the channel.
270         // I don't think there's any harm in maybe doing it twice
271         update_xid_rep(ca.xid);
272
273         if(destroy_wait_){
274           destroy_wait_c_.notify_one();
275         }
276     }
277
278     if (ca.done && lossytest_)
279     {
280         lock ml(m_);
281         if (!dup_req_.isvalid()) {
282             dup_req_.buf.assign(req.cstr(), req.size());
283             dup_req_.xid = ca.xid;
284         }
285         if (xid_rep > xid_rep_done_)
286             xid_rep_done_ = xid_rep;
287     }
288
289     lock cal(ca.m);
290
291     jsl_log(JSL_DBG_2,
292             "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n",
293             clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
294             ntohs(dst_.sin_port), ca.done, ca.intret);
295
296     if(ch)
297         ch->decref();
298
299     // destruction of req automatically frees its buffer
300     return (ca.done? ca.intret : rpc_const::timeout_failure);
301 }
302
303 void
304 rpcc::get_refconn(connection **ch)
305 {
306     lock ml(chan_m_);
307     if(!chan_ || chan_->isdead()){
308         if(chan_)
309             chan_->decref();
310         chan_ = connect_to_dst(dst_, this, lossytest_);
311     }
312     if(ch && chan_){
313         if(*ch){
314             (*ch)->decref();
315         }
316         *ch = chan_;
317         (*ch)->incref();
318     }
319 }
320
321 // PollMgr's thread is being used to
322 // make this upcall from connection object to rpcc.
323 // this funtion must not block.
324 //
325 // this function keeps no reference for connection *c
326 bool
327 rpcc::got_pdu(connection *c, char *b, int sz)
328 {
329     unmarshall rep(b, sz);
330     reply_header h;
331     rep.unpack_reply_header(&h);
332
333     if(!rep.ok()){
334         jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
335         return true;
336     }
337
338     lock ml(m_);
339
340     update_xid_rep(h.xid);
341
342     if(calls_.find(h.xid) == calls_.end()){
343         jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
344         return true;
345     }
346     caller *ca = calls_[h.xid];
347
348     lock cl(ca->m);
349     if(!ca->done){
350         ca->un->take_in(rep);
351         ca->intret = h.ret;
352         if(ca->intret < 0){
353             jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
354                     h.xid, ca->intret);
355         }
356         ca->done = 1;
357     }
358     ca->c.notify_all();
359     return true;
360 }
361
362 // assumes thread holds mutex m
363 void
364 rpcc::update_xid_rep(unsigned int xid)
365 {
366     std::list<unsigned int>::iterator it;
367
368     if(xid <= xid_rep_window_.front()){
369         return;
370     }
371
372     for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
373         if(*it > xid){
374             xid_rep_window_.insert(it, xid);
375             goto compress;
376         }
377     }
378     xid_rep_window_.push_back(xid);
379
380 compress:
381     it = xid_rep_window_.begin();
382     for (it++; it != xid_rep_window_.end(); it++){
383         while (xid_rep_window_.front() + 1 == *it)
384             xid_rep_window_.pop_front();
385     }
386 }
387
388 rpcs::rpcs(unsigned int p1, int count)
389   : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
390 {
391     set_rand_seed();
392     nonce_ = random();
393     jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
394
395     char *loss_env = getenv("RPC_LOSSY");
396     if(loss_env != NULL){
397         lossytest_ = atoi(loss_env);
398     }
399
400     reg(rpc_const::bind, &rpcs::rpcbind, this);
401     dispatchpool_ = new ThrPool(6,false);
402
403     listener_ = new tcpsconn(this, port_, lossytest_);
404 }
405
406 rpcs::~rpcs()
407 {
408     // must delete listener before dispatchpool
409     delete listener_;
410     delete dispatchpool_;
411     free_reply_window();
412 }
413
414 bool
415 rpcs::got_pdu(connection *c, char *b, int sz)
416 {
417         if(!reachable_){
418             jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
419             return true;
420         }
421
422     djob_t *j = new djob_t(c, b, sz);
423     c->incref();
424     bool succ = dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, j));
425     if(!succ || !reachable_){
426         c->decref();
427         delete j;
428     }
429     return succ;
430 }
431
432 void
433 rpcs::reg1(unsigned int proc, handler *h)
434 {
435     lock pl(procs_m_);
436     VERIFY(procs_.count(proc) == 0);
437     procs_[proc] = h;
438     VERIFY(procs_.count(proc) >= 1);
439 }
440
441 void
442 rpcs::updatestat(unsigned int proc)
443 {
444     lock cl(count_m_);
445     counts_[proc]++;
446     curr_counts_--;
447     if(curr_counts_ == 0){
448         std::map<int, int>::iterator i;
449         printf("RPC STATS: ");
450         for (i = counts_.begin(); i != counts_.end(); i++){
451             printf("%x:%d ", i->first, i->second);
452         }
453         printf("\n");
454
455         lock rwl(reply_window_m_);
456         std::map<unsigned int,std::list<reply_t> >::iterator clt;
457
458         unsigned int totalrep = 0, maxrep = 0;
459         for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
460             totalrep += clt->second.size();
461             if(clt->second.size() > maxrep)
462                 maxrep = clt->second.size();
463         }
464         jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n",
465                         (int) reply_window_.size()-1, totalrep, maxrep);
466         curr_counts_ = counting_;
467     }
468 }
469
470 void
471 rpcs::dispatch(djob_t *j)
472 {
473     connection *c = j->conn;
474     unmarshall req(j->buf, j->sz);
475     delete j;
476
477     request_header h;
478     req.unpack_req_header(&h);
479     int proc = h.proc;
480
481     if(!req.ok()){
482         jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
483         c->decref();
484         return;
485     }
486
487     jsl_log(JSL_DBG_2,
488             "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
489             h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
490
491     marshall rep;
492     reply_header rh(h.xid,0);
493
494     // is client sending to an old instance of server?
495     if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
496         jsl_log(JSL_DBG_2,
497                 "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
498                 h.srv_nonce, nonce_, h.proc);
499         rh.ret = rpc_const::oldsrv_failure;
500         rep.pack_reply_header(rh);
501         c->send(rep.cstr(),rep.size());
502         return;
503     }
504
505     handler *f;
506     // is RPC proc a registered procedure?
507     {
508         lock pl(procs_m_);
509         if(procs_.count(proc) < 1){
510             fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
511                 proc);
512             c->decref();
513                         VERIFY(0);
514             return;
515         }
516
517         f = procs_[proc];
518     }
519
520     rpcs::rpcstate_t stat;
521     char *b1;
522     int sz1;
523
524     if(h.clt_nonce){
525         // have i seen this client before?
526         {
527             lock rwl(reply_window_m_);
528             // if we don't know about this clt_nonce, create a cleanup object
529             if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
530                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
531                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
532                 jsl_log(JSL_DBG_2,
533                         "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
534                         h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
535             }
536         }
537
538         // save the latest good connection to the client
539         {
540             lock rwl(conss_m_);
541             if(conns_.find(h.clt_nonce) == conns_.end()){
542                 c->incref();
543                 conns_[h.clt_nonce] = c;
544             } else if(conns_[h.clt_nonce]->compare(c) < 0){
545                 conns_[h.clt_nonce]->decref();
546                 c->incref();
547                 conns_[h.clt_nonce] = c;
548             }
549         }
550
551         stat = checkduplicate_and_update(h.clt_nonce, h.xid,
552                                                  h.xid_rep, &b1, &sz1);
553     } else {
554         // this client does not require at most once logic
555         stat = NEW;
556     }
557
558     switch (stat){
559         case NEW: // new request
560             if(counting_){
561                 updatestat(proc);
562             }
563
564             rh.ret = (*f)(req, rep);
565             if (rh.ret == rpc_const::unmarshal_args_failure) {
566                 fprintf(stderr, "rpcs::dispatch: failed to"
567                         " unmarshall the arguments. You are"
568                         " probably calling RPC 0x%x with wrong"
569                         " types of arguments.\n", proc);
570                 VERIFY(0);
571             }
572             VERIFY(rh.ret >= 0);
573
574             rep.pack_reply_header(rh);
575             rep.take_buf(&b1,&sz1);
576
577             jsl_log(JSL_DBG_2,
578                     "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
579                     sz1, h.xid, proc, rh.ret, h.clt_nonce);
580
581             if(h.clt_nonce > 0){
582                 // only record replies for clients that require at-most-once logic
583                 add_reply(h.clt_nonce, h.xid, b1, sz1);
584             }
585
586             // get the latest connection to the client
587             {
588                 lock rwl(conss_m_);
589                 if(c->isdead() && c != conns_[h.clt_nonce]){
590                     c->decref();
591                     c = conns_[h.clt_nonce];
592                     c->incref();
593                 }
594             }
595
596             c->send(b1, sz1);
597             if(h.clt_nonce == 0){
598                 // reply is not added to at-most-once window, free it
599                 free(b1);
600             }
601             break;
602         case INPROGRESS: // server is working on this request
603             break;
604         case DONE: // duplicate and we still have the response
605             c->send(b1, sz1);
606             break;
607         case FORGOTTEN: // very old request and we don't have the response anymore
608             jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
609                     h.xid, h.clt_nonce);
610             rh.ret = rpc_const::atmostonce_failure;
611             rep.pack_reply_header(rh);
612             c->send(rep.cstr(),rep.size());
613             break;
614     }
615     c->decref();
616 }
617
618 // rpcs::dispatch calls this when an RPC request arrives.
619 //
620 // checks to see if an RPC with xid from clt_nonce has already been received.
621 // if not, remembers the request in reply_window_.
622 //
623 // deletes remembered requests with XIDs <= xid_rep; the client
624 // says it has received a reply for every RPC up through xid_rep.
625 // frees the reply_t::buf of each such request.
626 //
627 // returns one of:
628 //   NEW: never seen this xid before.
629 //   INPROGRESS: seen this xid, and still processing it.
630 //   DONE: seen this xid, previous reply returned in *b and *sz.
631 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
632 rpcs::rpcstate_t
633 rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
634         unsigned int xid_rep, char **b, int *sz)
635 {
636     lock rwl(reply_window_m_);
637
638     std::list<reply_t> &l = reply_window_[clt_nonce];
639
640     VERIFY(l.size() > 0);
641     VERIFY(xid >= xid_rep);
642
643     unsigned int past_xid_rep = l.begin()->xid;
644
645     std::list<reply_t>::iterator start = l.begin(), it;
646     it = ++start;
647
648     if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) {
649         // scan for deletion candidates
650         for (; it != l.end() && it->xid < xid_rep; it++) {
651             if (it->cb_present)
652                 free(it->buf);
653         }
654         l.erase(start, it);
655         l.begin()->xid = xid_rep;
656     }
657
658     if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1)
659         return FORGOTTEN;
660
661     // skip non-deletion candidates
662     while (it != l.end() && it->xid < xid)
663         it++;
664
665     // if it's in the list it must be right here
666     if (it != l.end() && it->xid == xid) {
667         if (it->cb_present) {
668             // return information about the remembered reply
669             *b = it->buf;
670             *sz = it->sz;
671             return DONE;
672         } else {
673             return INPROGRESS;
674         }
675     } else {
676         // remember that a new request has arrived
677         l.insert(it, reply_t(xid));
678         return NEW;
679     }
680 }
681
682 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
683 // and passes the return value in b and sz.
684 // add_reply() should remember b and sz.
685 // free_reply_window() and checkduplicate_and_update is responsible for
686 // calling free(b).
687 void
688 rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
689         char *b, int sz)
690 {
691     lock rwl(reply_window_m_);
692     // remember the RPC reply value
693     std::list<reply_t> &l = reply_window_[clt_nonce];
694     std::list<reply_t>::iterator it = l.begin();
695     // skip to our place in the list
696     for (it++; it != l.end() && it->xid < xid; it++);
697     // there should already be an entry, so whine if there isn't
698     if (it == l.end() || it->xid != xid) {
699         fprintf(stderr, "Could not find reply struct in add_reply");
700         l.insert(it, reply_t(xid, b, sz));
701     } else {
702         *it = reply_t(xid, b, sz);
703     }
704 }
705
706 void
707 rpcs::free_reply_window(void)
708 {
709     std::map<unsigned int,std::list<reply_t> >::iterator clt;
710     std::list<reply_t>::iterator it;
711
712     lock rwl(reply_window_m_);
713     for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
714         for (it = clt->second.begin(); it != clt->second.end(); it++){
715             if (it->cb_present)
716                 free(it->buf);
717         }
718         clt->second.clear();
719     }
720     reply_window_.clear();
721 }
722
723 // rpc handler
724 int
725 rpcs::rpcbind(int &r, int a)
726 {
727     jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
728     r = nonce_;
729     return 0;
730 }
731
732 marshall &
733 operator<<(marshall &m, uint8_t x) {
734     m.rawbyte(x);
735     return m;
736 }
737
738 marshall &
739 operator<<(marshall &m, uint16_t x) {
740     x = htons(x);
741     m.rawbytes((char *)&x, 2);
742     return m;
743 }
744
745 marshall &
746 operator<<(marshall &m, uint32_t x) {
747     x = htonl(x);
748     m.rawbytes((char *)&x, 4);
749     return m;
750 }
751
752 marshall & operator<<(marshall &m, int x) { return m << (unsigned int) x; }
753 marshall & operator<<(marshall &m, char x) { return m << (uint8_t)x; }
754 marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; }
755 marshall & operator<<(marshall &m, short x) { return m << (unsigned short) x; }
756 marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; }
757
758 marshall &
759 operator<<(marshall &m, const std::string &s) {
760     m << (unsigned int) s.size();
761     m.rawbytes(s.data(), s.size());
762     return m;
763 }
764
765 void marshall::pack_req_header(const request_header &h) {
766     int saved_sz = index_;
767     //leave the first 4-byte empty for channel to fill size of pdu
768     index_ = sizeof(rpc_sz_t);
769     *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep;
770     index_ = saved_sz;
771 }
772
773 void marshall::pack_reply_header(const reply_header &h) {
774     int saved_sz = index_;
775     //leave the first 4-byte empty for channel to fill size of pdu
776     index_ = sizeof(rpc_sz_t);
777     *this << h.xid << h.ret;
778     index_ = saved_sz;
779 }
780
781 void
782 unmarshall::unpack(int *x)
783 {
784     (*x) = (rawbyte() & 0xff) << 24;
785     (*x) |= (rawbyte() & 0xff) << 16;
786     (*x) |= (rawbyte() & 0xff) << 8;
787     (*x) |= rawbyte() & 0xff;
788 }
789
790 // take the contents from another unmarshall object
791 void
792 unmarshall::take_in(unmarshall &another)
793 {
794     if(buf_)
795         free(buf_);
796     another.take_buf(&buf_, &sz_);
797     index_ = RPC_HEADER_SZ;
798     ok_ = sz_ >= RPC_HEADER_SZ?true:false;
799 }
800
801 inline bool
802 unmarshall::ensure(size_t n) {
803     if (index_+n > sz_)
804         ok_ = false;
805     return ok_;
806 }
807
808 unsigned int
809 unmarshall::rawbyte()
810 {
811     if (!ensure(1))
812         return 0;
813     return buf_[index_++];
814 }
815
816 void
817 unmarshall::rawbytes(std::string &ss, size_t n)
818 {
819     VERIFY(ensure(n));
820     ss.assign(buf_+index_, n);
821     index_ += n;
822 }
823
824 unmarshall &
825 operator>>(unmarshall &u, bool &x)
826 {
827     x = (bool)u.rawbyte();
828     return u;
829 }
830
831 unmarshall &
832 operator>>(unmarshall &u, unsigned char &x)
833 {
834     x = (unsigned char)u.rawbyte();
835     return u;
836 }
837
838 unmarshall &
839 operator>>(unmarshall &u, char &x)
840 {
841     x = (char)u.rawbyte();
842     return u;
843 }
844
845 unmarshall &
846 operator>>(unmarshall &u, unsigned short &x)
847 {
848     x = (u.rawbyte() & 0xff) << 8;
849     x |= u.rawbyte() & 0xff;
850     return u;
851 }
852
853 unmarshall &
854 operator>>(unmarshall &u, short &x)
855 {
856     x = (u.rawbyte() & 0xff) << 8;
857     x |= u.rawbyte() & 0xff;
858     return u;
859 }
860
861 unmarshall &
862 operator>>(unmarshall &u, unsigned int &x)
863 {
864     x = (u.rawbyte() & 0xff) << 24;
865     x |= (u.rawbyte() & 0xff) << 16;
866     x |= (u.rawbyte() & 0xff) << 8;
867     x |= u.rawbyte() & 0xff;
868     return u;
869 }
870
871 unmarshall &
872 operator>>(unmarshall &u, int &x)
873 {
874     x = (u.rawbyte() & 0xff) << 24;
875     x |= (u.rawbyte() & 0xff) << 16;
876     x |= (u.rawbyte() & 0xff) << 8;
877     x |= u.rawbyte() & 0xff;
878     return u;
879 }
880
881 unmarshall &
882 operator>>(unmarshall &u, unsigned long long &x)
883 {
884     unsigned int h, l;
885     u >> h;
886     u >> l;
887     x = l | ((unsigned long long) h << 32);
888     return u;
889 }
890
891 unmarshall &
892 operator>>(unmarshall &u, std::string &s)
893 {
894     unsigned sz;
895     u >> sz;
896     if(u.ok())
897         u.rawbytes(s, sz);
898     return u;
899 }
900
901 bool operator<(const sockaddr_in &a, const sockaddr_in &b){
902     return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
903             ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
904              ((a.sin_port < b.sin_port))));
905 }
906
907 /*---------------auxilary function--------------*/
908 void
909 make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
910
911     char host[200];
912     const char *localhost = "127.0.0.1";
913     const char *port = index(hostandport, ':');
914     if(port == NULL){
915         memcpy(host, localhost, strlen(localhost)+1);
916         port = hostandport;
917     } else {
918         memcpy(host, hostandport, port-hostandport);
919         host[port-hostandport] = '\0';
920         port++;
921     }
922
923     make_sockaddr(host, port, dst);
924
925 }
926
927 void
928 make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
929
930     in_addr_t a;
931
932     bzero(dst, sizeof(*dst));
933     dst->sin_family = AF_INET;
934
935     a = inet_addr(host);
936     if(a != INADDR_NONE){
937         dst->sin_addr.s_addr = a;
938     } else {
939         struct hostent *hp = gethostbyname(host);
940         if(hp == 0 || hp->h_length != 4){
941             fprintf(stderr, "cannot find host name %s\n", host);
942             exit(1);
943         }
944         dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
945     }
946     dst->sin_port = htons(atoi(port));
947 }