TEMPLATE MAGIC FOR GREAT JUSTICE
[invirt/third/libt4.git] / rpc / rpc.cc
1 /*
2  The rpcc class handles client-side RPC.  Each rpcc is bound to a
3  single RPC server.  The jobs of rpcc include maintaining a connection to
4  server, sending RPC requests and waiting for responses, retransmissions,
5  at-most-once delivery etc.
6
7  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
8  connections from different rpcc objects.  The jobs of rpcs include accepting
9  connections, dispatching requests to registered RPC handlers, at-most-once
10  delivery etc.
11
12  Both rpcc and rpcs use the connection class as an abstraction for the
13  underlying communication channel.  To send an RPC request/reply, one calls
14  connection::send() which blocks until data is sent or the connection has failed
15  (thus the caller can free the buffer when send() returns).  When a
16  request/reply is received, connection makes a callback into the corresponding
17  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
18
19  Thread organization:
20  rpcc uses application threads to send RPC requests and blocks to receive the
21  reply or error. All connections use a single PollMgr object to perform async
22  socket IO.  PollMgr creates a single thread to examine the readiness of socket
23  file descriptors and informs the corresponding connection whenever a socket is
24  ready to be read or written.  (We use asynchronous socket IO to reduce the
25  number of threads needed to manage these connections; without async IO, at
26  least one thread is needed per connection to read data without blocking other
27  activities.)  Each rpcs object creates one thread for listening on the server
28  port and a pool of threads for executing RPC requests.  The
29  thread pool allows us to control the number of threads spawned at the server
30  (spawning one thread per request will hurt when the server faces thousands of
31  requests).
32
33  In order to delete a connection object, we must maintain a reference count.
34  For rpcc,
35  multiple client threads might be invoking the rpcc::call() functions and thus
36  holding multiple references to the underlying connection object. For rpcs,
37  multiple dispatch threads might be holding references to the same connection
38  object.  A connection object is deleted only when the underlying connection is
39  dead and the reference count reaches zero.
40
41  This version of the RPC library explicitly joins exited threads to make sure
42  no outstanding references exist before deleting objects.
43
44  To delete a rpcc object safely, the users of the library must ensure that
45  there are no outstanding calls on the rpcc object.
46
47  To delete a rpcs object safely, we do the following in sequence: 1. stop
48  accepting new incoming connections. 2. close existing active connections.
49  3.  delete the dispatch thread pool which involves waiting for current active
50  RPC handlers to finish.  It is interesting how a thread pool can be deleted
51  without using thread cancellation. The trick is to inject x "poison pills" for
52  a thread pool of x threads. Upon getting a poison pill instead of a normal
53  task, a worker thread will exit (and thread pool destructor waits to join all
54  x exited worker threads).
55  */
56
57 #include "rpc.h"
58
59 #include <sys/types.h>
60 #include <arpa/inet.h>
61 #include <netinet/tcp.h>
62 #include <netdb.h>
63 #include <unistd.h>
64 #include "lock.h"
65
66 #include "jsl_log.h"
67 #include "lang/verify.h"
68
69 const rpcc::TO rpcc::to_max = { 120000 };
70 const rpcc::TO rpcc::to_min = { 1000 };
71
72 rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
73 : xid(xxid), un(xun), done(false)
74 {
75 }
76
77 rpcc::caller::~caller()
78 {
79 }
80
81 inline
82 void set_rand_seed()
83 {
84     auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
85     srandom((int)now.time_since_epoch().count()^((int)getpid()));
86 }
87
88 rpcc::rpcc(sockaddr_in d, bool retrans) :
89     dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
90     retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
91 {
92     if(retrans){
93         set_rand_seed();
94         clt_nonce_ = random();
95     } else {
96         // special client nonce 0 means this client does not
97         // require at-most-once logic from the server
98         // because it uses tcp and never retries a failed connection
99         clt_nonce_ = 0;
100     }
101
102     char *loss_env = getenv("RPC_LOSSY");
103     if(loss_env != NULL){
104         lossytest_ = atoi(loss_env);
105     }
106
107     // xid starts with 1 and latest received reply starts with 0
108     xid_rep_window_.push_back(0);
109
110     jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
111             clt_nonce_, lossytest_);
112 }
113
114 // IMPORTANT: destruction should happen only when no external threads
115 // are blocked inside rpcc or will use rpcc in the future
116 rpcc::~rpcc()
117 {
118     jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
119             clt_nonce_, chan_?chan_->channo():-1);
120     if(chan_){
121         chan_->closeconn();
122         chan_->decref();
123     }
124     VERIFY(calls_.size() == 0);
125 }
126
127 int
128 rpcc::bind(TO to)
129 {
130     int r;
131     int ret = call_timeout(rpc_const::bind, to, r, 0);
132     if(ret == 0){
133         lock ml(m_);
134         bind_done_ = true;
135         srv_nonce_ = r;
136     } else {
137         jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
138                 inet_ntoa(dst_.sin_addr), ret);
139     }
140     return ret;
141 };
142
143 // Cancel all outstanding calls
144     void
145 rpcc::cancel(void)
146 {
147     lock ml(m_);
148     printf("rpcc::cancel: force callers to fail\n");
149     std::map<int,caller*>::iterator iter;
150     for(iter = calls_.begin(); iter != calls_.end(); iter++){
151         caller *ca = iter->second;
152
153         jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
154         {
155             lock cl(ca->m);
156             ca->done = true;
157             ca->intret = rpc_const::cancel_failure;
158             ca->c.notify_one();
159         }
160     }
161
162     while (calls_.size () > 0){
163         destroy_wait_ = true;
164         destroy_wait_c_.wait(ml);
165     }
166     printf("rpcc::cancel: done\n");
167 }
168
169 int
170 rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
171         TO to)
172 {
173
174     caller ca(0, &rep);
175         int xid_rep;
176     {
177         lock ml(m_);
178
179         if((proc != rpc_const::bind && !bind_done_) ||
180                 (proc == rpc_const::bind && bind_done_)){
181             jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
182             return rpc_const::bind_failure;
183         }
184
185         if(destroy_wait_){
186           return rpc_const::cancel_failure;
187         }
188
189         ca.xid = xid_++;
190         calls_[ca.xid] = &ca;
191
192         req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
193                              xid_rep_window_.front());
194         req.pack_req_header(h);
195                 xid_rep = xid_rep_window_.front();
196     }
197
198     TO curr_to;
199     std::chrono::time_point<std::chrono::steady_clock> finaldeadline =
200         std::chrono::steady_clock::now() +
201         std::chrono::milliseconds(to.to),
202         nextdeadline;
203
204     curr_to.to = to_min.to;
205
206     bool transmit = true;
207     connection *ch = NULL;
208
209     while (1){
210         if(transmit){
211             get_refconn(&ch);
212             if(ch){
213                 if(reachable_) {
214                     request forgot;
215                     {
216                         lock ml(m_);
217                         if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
218                             forgot = dup_req_;
219                             dup_req_.clear();
220                         }
221                     }
222                     if (forgot.isvalid())
223                         ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
224                     ch->send(req.cstr(), req.size());
225                 }
226                 else jsl_log(JSL_DBG_1, "not reachable\n");
227                 jsl_log(JSL_DBG_2,
228                         "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n",
229                         clt_nonce_, proc, ca.xid, clt_nonce_);
230             }
231             transmit = false; // only send once on a given channel
232         }
233
234         if(finaldeadline == std::chrono::time_point<std::chrono::steady_clock>::min())
235             break;
236
237         nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to);
238         if(nextdeadline > finaldeadline) {
239             nextdeadline = finaldeadline;
240             finaldeadline = std::chrono::time_point<std::chrono::steady_clock>::min();
241         }
242
243         {
244             lock cal(ca.m);
245             while (!ca.done){
246                 jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
247                 if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){
248                     jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
249                     break;
250                 }
251             }
252             if(ca.done){
253                 jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
254                 break;
255             }
256         }
257
258         if(retrans_ && (!ch || ch->isdead())){
259             // since connection is dead, retransmit
260             // on the new connection
261             transmit = true;
262         }
263         curr_to.to <<= 1;
264     }
265
266     {
267         // no locking of ca.m since only this thread changes ca.xid
268         lock ml(m_);
269         calls_.erase(ca.xid);
270         // may need to update the xid again here, in case the
271         // packet times out before it's even sent by the channel.
272         // I don't think there's any harm in maybe doing it twice
273         update_xid_rep(ca.xid);
274
275         if(destroy_wait_){
276           destroy_wait_c_.notify_one();
277         }
278     }
279
280     if (ca.done && lossytest_)
281     {
282         lock ml(m_);
283         if (!dup_req_.isvalid()) {
284             dup_req_.buf.assign(req.cstr(), req.size());
285             dup_req_.xid = ca.xid;
286         }
287         if (xid_rep > xid_rep_done_)
288             xid_rep_done_ = xid_rep;
289     }
290
291     lock cal(ca.m);
292
293     jsl_log(JSL_DBG_2,
294             "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n",
295             clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
296             ntohs(dst_.sin_port), ca.done, ca.intret);
297
298     if(ch)
299         ch->decref();
300
301     // destruction of req automatically frees its buffer
302     return (ca.done? ca.intret : rpc_const::timeout_failure);
303 }
304
305 void
306 rpcc::get_refconn(connection **ch)
307 {
308     lock ml(chan_m_);
309     if(!chan_ || chan_->isdead()){
310         if(chan_)
311             chan_->decref();
312         chan_ = connect_to_dst(dst_, this, lossytest_);
313     }
314     if(ch && chan_){
315         if(*ch){
316             (*ch)->decref();
317         }
318         *ch = chan_;
319         (*ch)->incref();
320     }
321 }
322
323 // PollMgr's thread is being used to
324 // make this upcall from connection object to rpcc.
325 // this funtion must not block.
326 //
327 // this function keeps no reference for connection *c
328 bool
329 rpcc::got_pdu(connection *c, char *b, int sz)
330 {
331     unmarshall rep(b, sz);
332     reply_header h;
333     rep.unpack_reply_header(&h);
334
335     if(!rep.ok()){
336         jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
337         return true;
338     }
339
340     lock ml(m_);
341
342     update_xid_rep(h.xid);
343
344     if(calls_.find(h.xid) == calls_.end()){
345         jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
346         return true;
347     }
348     caller *ca = calls_[h.xid];
349
350     lock cl(ca->m);
351     if(!ca->done){
352         ca->un->take_in(rep);
353         ca->intret = h.ret;
354         if(ca->intret < 0){
355             jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
356                     h.xid, ca->intret);
357         }
358         ca->done = 1;
359     }
360     ca->c.notify_all();
361     return true;
362 }
363
364 // assumes thread holds mutex m
365 void
366 rpcc::update_xid_rep(unsigned int xid)
367 {
368     std::list<unsigned int>::iterator it;
369
370     if(xid <= xid_rep_window_.front()){
371         return;
372     }
373
374     for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
375         if(*it > xid){
376             xid_rep_window_.insert(it, xid);
377             goto compress;
378         }
379     }
380     xid_rep_window_.push_back(xid);
381
382 compress:
383     it = xid_rep_window_.begin();
384     for (it++; it != xid_rep_window_.end(); it++){
385         while (xid_rep_window_.front() + 1 == *it)
386             xid_rep_window_.pop_front();
387     }
388 }
389
390 rpcs::rpcs(unsigned int p1, int count)
391   : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
392 {
393     set_rand_seed();
394     nonce_ = random();
395     jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
396
397     char *loss_env = getenv("RPC_LOSSY");
398     if(loss_env != NULL){
399         lossytest_ = atoi(loss_env);
400     }
401
402     reg(rpc_const::bind, &rpcs::rpcbind, this);
403     dispatchpool_ = new ThrPool(6,false);
404
405     listener_ = new tcpsconn(this, port_, lossytest_);
406 }
407
408 rpcs::~rpcs()
409 {
410     // must delete listener before dispatchpool
411     delete listener_;
412     delete dispatchpool_;
413     free_reply_window();
414 }
415
416 bool
417 rpcs::got_pdu(connection *c, char *b, int sz)
418 {
419         if(!reachable_){
420             jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
421             return true;
422         }
423
424     djob_t *j = new djob_t(c, b, sz);
425     c->incref();
426     bool succ = dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, j));
427     if(!succ || !reachable_){
428         c->decref();
429         delete j;
430     }
431     return succ;
432 }
433
434 void
435 rpcs::reg1(unsigned int proc, handler *h)
436 {
437     lock pl(procs_m_);
438     VERIFY(procs_.count(proc) == 0);
439     procs_[proc] = h;
440     VERIFY(procs_.count(proc) >= 1);
441 }
442
443 void
444 rpcs::updatestat(unsigned int proc)
445 {
446     lock cl(count_m_);
447     counts_[proc]++;
448     curr_counts_--;
449     if(curr_counts_ == 0){
450         std::map<int, int>::iterator i;
451         printf("RPC STATS: ");
452         for (i = counts_.begin(); i != counts_.end(); i++){
453             printf("%x:%d ", i->first, i->second);
454         }
455         printf("\n");
456
457         lock rwl(reply_window_m_);
458         std::map<unsigned int,std::list<reply_t> >::iterator clt;
459
460         unsigned int totalrep = 0, maxrep = 0;
461         for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
462             totalrep += clt->second.size();
463             if(clt->second.size() > maxrep)
464                 maxrep = clt->second.size();
465         }
466         jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n",
467                         (int) reply_window_.size()-1, totalrep, maxrep);
468         curr_counts_ = counting_;
469     }
470 }
471
472 void
473 rpcs::dispatch(djob_t *j)
474 {
475     connection *c = j->conn;
476     unmarshall req(j->buf, j->sz);
477     delete j;
478
479     req_header h;
480     req.unpack_req_header(&h);
481     int proc = h.proc;
482
483     if(!req.ok()){
484         jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
485         c->decref();
486         return;
487     }
488
489     jsl_log(JSL_DBG_2,
490             "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
491             h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
492
493     marshall rep;
494     reply_header rh(h.xid,0);
495
496     // is client sending to an old instance of server?
497     if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
498         jsl_log(JSL_DBG_2,
499                 "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
500                 h.srv_nonce, nonce_, h.proc);
501         rh.ret = rpc_const::oldsrv_failure;
502         rep.pack_reply_header(rh);
503         c->send(rep.cstr(),rep.size());
504         return;
505     }
506
507     handler *f;
508     // is RPC proc a registered procedure?
509     {
510         lock pl(procs_m_);
511         if(procs_.count(proc) < 1){
512             fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
513                 proc);
514             c->decref();
515                         VERIFY(0);
516             return;
517         }
518
519         f = procs_[proc];
520     }
521
522     rpcs::rpcstate_t stat;
523     char *b1;
524     int sz1;
525
526     if(h.clt_nonce){
527         // have i seen this client before?
528         {
529             lock rwl(reply_window_m_);
530             // if we don't know about this clt_nonce, create a cleanup object
531             if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
532                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
533                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
534                 jsl_log(JSL_DBG_2,
535                         "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
536                         h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
537             }
538         }
539
540         // save the latest good connection to the client
541         {
542             lock rwl(conss_m_);
543             if(conns_.find(h.clt_nonce) == conns_.end()){
544                 c->incref();
545                 conns_[h.clt_nonce] = c;
546             } else if(conns_[h.clt_nonce]->compare(c) < 0){
547                 conns_[h.clt_nonce]->decref();
548                 c->incref();
549                 conns_[h.clt_nonce] = c;
550             }
551         }
552
553         stat = checkduplicate_and_update(h.clt_nonce, h.xid,
554                                                  h.xid_rep, &b1, &sz1);
555     } else {
556         // this client does not require at most once logic
557         stat = NEW;
558     }
559
560     switch (stat){
561         case NEW: // new request
562             if(counting_){
563                 updatestat(proc);
564             }
565
566             rh.ret = (*f)(req, rep);
567             if (rh.ret == rpc_const::unmarshal_args_failure) {
568                 fprintf(stderr, "rpcs::dispatch: failed to"
569                         " unmarshall the arguments. You are"
570                         " probably calling RPC 0x%x with wrong"
571                         " types of arguments.\n", proc);
572                 VERIFY(0);
573             }
574             VERIFY(rh.ret >= 0);
575
576             rep.pack_reply_header(rh);
577             rep.take_buf(&b1,&sz1);
578
579             jsl_log(JSL_DBG_2,
580                     "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
581                     sz1, h.xid, proc, rh.ret, h.clt_nonce);
582
583             if(h.clt_nonce > 0){
584                 // only record replies for clients that require at-most-once logic
585                 add_reply(h.clt_nonce, h.xid, b1, sz1);
586             }
587
588             // get the latest connection to the client
589             {
590                 lock rwl(conss_m_);
591                 if(c->isdead() && c != conns_[h.clt_nonce]){
592                     c->decref();
593                     c = conns_[h.clt_nonce];
594                     c->incref();
595                 }
596             }
597
598             c->send(b1, sz1);
599             if(h.clt_nonce == 0){
600                 // reply is not added to at-most-once window, free it
601                 free(b1);
602             }
603             break;
604         case INPROGRESS: // server is working on this request
605             break;
606         case DONE: // duplicate and we still have the response
607             c->send(b1, sz1);
608             break;
609         case FORGOTTEN: // very old request and we don't have the response anymore
610             jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
611                     h.xid, h.clt_nonce);
612             rh.ret = rpc_const::atmostonce_failure;
613             rep.pack_reply_header(rh);
614             c->send(rep.cstr(),rep.size());
615             break;
616     }
617     c->decref();
618 }
619
620 // rpcs::dispatch calls this when an RPC request arrives.
621 //
622 // checks to see if an RPC with xid from clt_nonce has already been received.
623 // if not, remembers the request in reply_window_.
624 //
625 // deletes remembered requests with XIDs <= xid_rep; the client
626 // says it has received a reply for every RPC up through xid_rep.
627 // frees the reply_t::buf of each such request.
628 //
629 // returns one of:
630 //   NEW: never seen this xid before.
631 //   INPROGRESS: seen this xid, and still processing it.
632 //   DONE: seen this xid, previous reply returned in *b and *sz.
633 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
634 rpcs::rpcstate_t
635 rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
636         unsigned int xid_rep, char **b, int *sz)
637 {
638     lock rwl(reply_window_m_);
639
640     std::list<reply_t> &l = reply_window_[clt_nonce];
641
642     VERIFY(l.size() > 0);
643     VERIFY(xid >= xid_rep);
644
645     unsigned int past_xid_rep = l.begin()->xid;
646
647     std::list<reply_t>::iterator start = l.begin(), it;
648     it = ++start;
649
650     if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) {
651         // scan for deletion candidates
652         for (; it != l.end() && it->xid < xid_rep; it++) {
653             if (it->cb_present)
654                 free(it->buf);
655         }
656         l.erase(start, it);
657         l.begin()->xid = xid_rep;
658     }
659
660     if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1)
661         return FORGOTTEN;
662
663     // skip non-deletion candidates
664     while (it != l.end() && it->xid < xid)
665         it++;
666
667     // if it's in the list it must be right here
668     if (it != l.end() && it->xid == xid) {
669         if (it->cb_present) {
670             // return information about the remembered reply
671             *b = it->buf;
672             *sz = it->sz;
673             return DONE;
674         } else {
675             return INPROGRESS;
676         }
677     } else {
678         // remember that a new request has arrived
679         l.insert(it, reply_t(xid));
680         return NEW;
681     }
682 }
683
684 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
685 // and passes the return value in b and sz.
686 // add_reply() should remember b and sz.
687 // free_reply_window() and checkduplicate_and_update is responsible for
688 // calling free(b).
689 void
690 rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
691         char *b, int sz)
692 {
693     lock rwl(reply_window_m_);
694     // remember the RPC reply value
695     std::list<reply_t> &l = reply_window_[clt_nonce];
696     std::list<reply_t>::iterator it = l.begin();
697     // skip to our place in the list
698     for (it++; it != l.end() && it->xid < xid; it++);
699     // there should already be an entry, so whine if there isn't
700     if (it == l.end() || it->xid != xid) {
701         fprintf(stderr, "Could not find reply struct in add_reply");
702         l.insert(it, reply_t(xid, b, sz));
703     } else {
704         *it = reply_t(xid, b, sz);
705     }
706 }
707
708 void
709 rpcs::free_reply_window(void)
710 {
711     std::map<unsigned int,std::list<reply_t> >::iterator clt;
712     std::list<reply_t>::iterator it;
713
714     lock rwl(reply_window_m_);
715     for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
716         for (it = clt->second.begin(); it != clt->second.end(); it++){
717             if (it->cb_present)
718                 free(it->buf);
719         }
720         clt->second.clear();
721     }
722     reply_window_.clear();
723 }
724
725 // rpc handler
726 int
727 rpcs::rpcbind(int &r, int a)
728 {
729     jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
730     r = nonce_;
731     return 0;
732 }
733
734 void
735 marshall::rawbyte(unsigned char x)
736 {
737     if(_ind >= _capa){
738         _capa *= 2;
739         VERIFY (_buf != NULL);
740         _buf = (char *)realloc(_buf, _capa);
741         VERIFY(_buf);
742     }
743     _buf[_ind++] = x;
744 }
745
746 void
747 marshall::rawbytes(const char *p, int n)
748 {
749     if((_ind+n) > _capa){
750         _capa = _capa > n? 2*_capa:(_capa+n);
751         VERIFY (_buf != NULL);
752         _buf = (char *)realloc(_buf, _capa);
753         VERIFY(_buf);
754     }
755     memcpy(_buf+_ind, p, n);
756     _ind += n;
757 }
758
759 marshall &
760 operator<<(marshall &m, bool x)
761 {
762     m.rawbyte(x);
763     return m;
764 }
765
766 marshall &
767 operator<<(marshall &m, unsigned char x)
768 {
769     m.rawbyte(x);
770     return m;
771 }
772
773 marshall &
774 operator<<(marshall &m, char x)
775 {
776     m << (unsigned char) x;
777     return m;
778 }
779
780
781 marshall &
782 operator<<(marshall &m, unsigned short x)
783 {
784     m.rawbyte((x >> 8) & 0xff);
785     m.rawbyte(x & 0xff);
786     return m;
787 }
788
789 marshall &
790 operator<<(marshall &m, short x)
791 {
792     m << (unsigned short) x;
793     return m;
794 }
795
796 marshall &
797 operator<<(marshall &m, unsigned int x)
798 {
799     // network order is big-endian
800     m.rawbyte((x >> 24) & 0xff);
801     m.rawbyte((x >> 16) & 0xff);
802     m.rawbyte((x >> 8) & 0xff);
803     m.rawbyte(x & 0xff);
804     return m;
805 }
806
807 marshall &
808 operator<<(marshall &m, int x)
809 {
810     m << (unsigned int) x;
811     return m;
812 }
813
814 marshall &
815 operator<<(marshall &m, const std::string &s)
816 {
817     m << (unsigned int) s.size();
818     m.rawbytes(s.data(), s.size());
819     return m;
820 }
821
822 marshall &
823 operator<<(marshall &m, unsigned long long x)
824 {
825     m << (unsigned int) (x >> 32);
826     m << (unsigned int) x;
827     return m;
828 }
829
830 void
831 marshall::pack(int x)
832 {
833     rawbyte((x >> 24) & 0xff);
834     rawbyte((x >> 16) & 0xff);
835     rawbyte((x >> 8) & 0xff);
836     rawbyte(x & 0xff);
837 }
838
839 void
840 unmarshall::unpack(int *x)
841 {
842     (*x) = (rawbyte() & 0xff) << 24;
843     (*x) |= (rawbyte() & 0xff) << 16;
844     (*x) |= (rawbyte() & 0xff) << 8;
845     (*x) |= rawbyte() & 0xff;
846 }
847
848 // take the contents from another unmarshall object
849 void
850 unmarshall::take_in(unmarshall &another)
851 {
852     if(_buf)
853         free(_buf);
854     another.take_buf(&_buf, &_sz);
855     _ind = RPC_HEADER_SZ;
856     _ok = _sz >= RPC_HEADER_SZ?true:false;
857 }
858
859 bool
860 unmarshall::okdone()
861 {
862     if(ok() && _ind == _sz){
863         return true;
864     } else {
865         return false;
866     }
867 }
868
869 unsigned int
870 unmarshall::rawbyte()
871 {
872     char c = 0;
873     if(_ind >= _sz)
874         _ok = false;
875     else
876         c = _buf[_ind++];
877     return c;
878 }
879
880 unmarshall &
881 operator>>(unmarshall &u, bool &x)
882 {
883     x = (bool) u.rawbyte() ;
884     return u;
885 }
886
887 unmarshall &
888 operator>>(unmarshall &u, unsigned char &x)
889 {
890     x = (unsigned char) u.rawbyte() ;
891     return u;
892 }
893
894 unmarshall &
895 operator>>(unmarshall &u, char &x)
896 {
897     x = (char) u.rawbyte();
898     return u;
899 }
900
901
902 unmarshall &
903 operator>>(unmarshall &u, unsigned short &x)
904 {
905     x = (u.rawbyte() & 0xff) << 8;
906     x |= u.rawbyte() & 0xff;
907     return u;
908 }
909
910 unmarshall &
911 operator>>(unmarshall &u, short &x)
912 {
913     x = (u.rawbyte() & 0xff) << 8;
914     x |= u.rawbyte() & 0xff;
915     return u;
916 }
917
918 unmarshall &
919 operator>>(unmarshall &u, unsigned int &x)
920 {
921     x = (u.rawbyte() & 0xff) << 24;
922     x |= (u.rawbyte() & 0xff) << 16;
923     x |= (u.rawbyte() & 0xff) << 8;
924     x |= u.rawbyte() & 0xff;
925     return u;
926 }
927
928 unmarshall &
929 operator>>(unmarshall &u, int &x)
930 {
931     x = (u.rawbyte() & 0xff) << 24;
932     x |= (u.rawbyte() & 0xff) << 16;
933     x |= (u.rawbyte() & 0xff) << 8;
934     x |= u.rawbyte() & 0xff;
935     return u;
936 }
937
938 unmarshall &
939 operator>>(unmarshall &u, unsigned long long &x)
940 {
941     unsigned int h, l;
942     u >> h;
943     u >> l;
944     x = l | ((unsigned long long) h << 32);
945     return u;
946 }
947
948 unmarshall &
949 operator>>(unmarshall &u, std::string &s)
950 {
951     unsigned sz;
952     u >> sz;
953     if(u.ok())
954         u.rawbytes(s, sz);
955     return u;
956 }
957
958 void
959 unmarshall::rawbytes(std::string &ss, unsigned int n)
960 {
961     if((_ind+n) > (unsigned)_sz){
962         _ok = false;
963     } else {
964         std::string tmps = std::string(_buf+_ind, n);
965         swap(ss, tmps);
966         VERIFY(ss.size() == n);
967         _ind += n;
968     }
969 }
970
971 bool operator<(const sockaddr_in &a, const sockaddr_in &b){
972     return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
973             ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
974              ((a.sin_port < b.sin_port))));
975 }
976
977 /*---------------auxilary function--------------*/
978 void
979 make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
980
981     char host[200];
982     const char *localhost = "127.0.0.1";
983     const char *port = index(hostandport, ':');
984     if(port == NULL){
985         memcpy(host, localhost, strlen(localhost)+1);
986         port = hostandport;
987     } else {
988         memcpy(host, hostandport, port-hostandport);
989         host[port-hostandport] = '\0';
990         port++;
991     }
992
993     make_sockaddr(host, port, dst);
994
995 }
996
997 void
998 make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
999
1000     in_addr_t a;
1001
1002     bzero(dst, sizeof(*dst));
1003     dst->sin_family = AF_INET;
1004
1005     a = inet_addr(host);
1006     if(a != INADDR_NONE){
1007         dst->sin_addr.s_addr = a;
1008     } else {
1009         struct hostent *hp = gethostbyname(host);
1010         if(hp == 0 || hp->h_length != 4){
1011             fprintf(stderr, "cannot find host name %s\n", host);
1012             exit(1);
1013         }
1014         dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
1015     }
1016     dst->sin_port = htons(atoi(port));
1017 }