Major clean-ups. Migrating to C++11.
[invirt/third/libt4.git] / rpc / rpc.cc
1 /*
2  The rpcc class handles client-side RPC.  Each rpcc is bound to a
3  single RPC server.  The jobs of rpcc include maintaining a connection to
4  server, sending RPC requests and waiting for responses, retransmissions,
5  at-most-once delivery etc.
6
7  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
8  connections from different rpcc objects.  The jobs of rpcs include accepting
9  connections, dispatching requests to registered RPC handlers, at-most-once
10  delivery etc.
11
12  Both rpcc and rpcs use the connection class as an abstraction for the
13  underlying communication channel.  To send an RPC request/reply, one calls
14  connection::send() which blocks until data is sent or the connection has failed
15  (thus the caller can free the buffer when send() returns).  When a
16  request/reply is received, connection makes a callback into the corresponding
17  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
18
19  Thread organization:
20  rpcc uses application threads to send RPC requests and blocks to receive the
21  reply or error. All connections use a single PollMgr object to perform async
22  socket IO.  PollMgr creates a single thread to examine the readiness of socket
23  file descriptors and informs the corresponding connection whenever a socket is
24  ready to be read or written.  (We use asynchronous socket IO to reduce the
25  number of threads needed to manage these connections; without async IO, at
26  least one thread is needed per connection to read data without blocking other
27  activities.)  Each rpcs object creates one thread for listening on the server
28  port and a pool of threads for executing RPC requests.  The
29  thread pool allows us to control the number of threads spawned at the server
30  (spawning one thread per request will hurt when the server faces thousands of
31  requests).
32
33  In order to delete a connection object, we must maintain a reference count.
34  For rpcc,
35  multiple client threads might be invoking the rpcc::call() functions and thus
36  holding multiple references to the underlying connection object. For rpcs,
37  multiple dispatch threads might be holding references to the same connection
38  object.  A connection object is deleted only when the underlying connection is
39  dead and the reference count reaches zero.
40
41  This version of the RPC library explicitly joins exited threads to make sure
42  no outstanding references exist before deleting objects.
43
44  To delete a rpcc object safely, the users of the library must ensure that
45  there are no outstanding calls on the rpcc object.
46
47  To delete a rpcs object safely, we do the following in sequence: 1. stop
48  accepting new incoming connections. 2. close existing active connections.
49  3.  delete the dispatch thread pool which involves waiting for current active
50  RPC handlers to finish.  It is interesting how a thread pool can be deleted
51  without using thread cancellation. The trick is to inject x "poison pills" for
52  a thread pool of x threads. Upon getting a poison pill instead of a normal
53  task, a worker thread will exit (and thread pool destructor waits to join all
54  x exited worker threads).
55  */
56
57 #include "rpc.h"
58
59 #include <sys/types.h>
60 #include <arpa/inet.h>
61 #include <netinet/tcp.h>
62 #include <netdb.h>
63 #include <unistd.h>
64 #include "lock.h"
65
66 #include "jsl_log.h"
67 #include "lang/verify.h"
68
69 const rpcc::TO rpcc::to_max = { 120000 };
70 const rpcc::TO rpcc::to_min = { 1000 };
71
72 rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
73 : xid(xxid), un(xun), done(false)
74 {
75 }
76
77 rpcc::caller::~caller()
78 {
79 }
80
81 inline
82 void set_rand_seed()
83 {
84     auto now = std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now());
85     srandom((int)now.time_since_epoch().count()^((int)getpid()));
86 }
87
88 rpcc::rpcc(sockaddr_in d, bool retrans) :
89     dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
90     retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
91 {
92     if(retrans){
93         set_rand_seed();
94         clt_nonce_ = random();
95     } else {
96         // special client nonce 0 means this client does not
97         // require at-most-once logic from the server
98         // because it uses tcp and never retries a failed connection
99         clt_nonce_ = 0;
100     }
101
102     char *loss_env = getenv("RPC_LOSSY");
103     if(loss_env != NULL){
104         lossytest_ = atoi(loss_env);
105     }
106
107     // xid starts with 1 and latest received reply starts with 0
108     xid_rep_window_.push_back(0);
109
110     jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n",
111             clt_nonce_, lossytest_);
112 }
113
114 // IMPORTANT: destruction should happen only when no external threads
115 // are blocked inside rpcc or will use rpcc in the future
116 rpcc::~rpcc()
117 {
118     jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
119             clt_nonce_, chan_?chan_->channo():-1);
120     if(chan_){
121         chan_->closeconn();
122         chan_->decref();
123     }
124     VERIFY(calls_.size() == 0);
125 }
126
127 int
128 rpcc::bind(TO to)
129 {
130     int r;
131     int ret = call(rpc_const::bind, 0, r, to);
132     if(ret == 0){
133         lock ml(m_);
134         bind_done_ = true;
135         srv_nonce_ = r;
136     } else {
137         jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
138                 inet_ntoa(dst_.sin_addr), ret);
139     }
140     return ret;
141 };
142
143 // Cancel all outstanding calls
144     void
145 rpcc::cancel(void)
146 {
147     lock ml(m_);
148     printf("rpcc::cancel: force callers to fail\n");
149     std::map<int,caller*>::iterator iter;
150     for(iter = calls_.begin(); iter != calls_.end(); iter++){
151         caller *ca = iter->second;
152
153         jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
154         {
155             lock cl(ca->m);
156             ca->done = true;
157             ca->intret = rpc_const::cancel_failure;
158             ca->c.notify_one();
159         }
160     }
161
162     while (calls_.size () > 0){
163         destroy_wait_ = true;
164         destroy_wait_c_.wait(ml);
165     }
166     printf("rpcc::cancel: done\n");
167 }
168
169 int
170 rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
171         TO to)
172 {
173
174     caller ca(0, &rep);
175         int xid_rep;
176     {
177         lock ml(m_);
178
179         if((proc != rpc_const::bind && !bind_done_) ||
180                 (proc == rpc_const::bind && bind_done_)){
181             jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
182             return rpc_const::bind_failure;
183         }
184
185         if(destroy_wait_){
186           return rpc_const::cancel_failure;
187         }
188
189         ca.xid = xid_++;
190         calls_[ca.xid] = &ca;
191
192         req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
193                              xid_rep_window_.front());
194         req.pack_req_header(h);
195                 xid_rep = xid_rep_window_.front();
196     }
197
198     TO curr_to;
199     std::chrono::time_point<std::chrono::steady_clock> finaldeadline =
200         std::chrono::steady_clock::now() +
201         std::chrono::milliseconds(to.to),
202         nextdeadline;
203
204     curr_to.to = to_min.to;
205
206     bool transmit = true;
207     connection *ch = NULL;
208
209     while (1){
210         if(transmit){
211             get_refconn(&ch);
212             if(ch){
213                 if(reachable_) {
214                     request forgot;
215                     {
216                         lock ml(m_);
217                         if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
218                             forgot = dup_req_;
219                             dup_req_.clear();
220                         }
221                     }
222                     if (forgot.isvalid())
223                         ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
224                     ch->send(req.cstr(), req.size());
225                 }
226                 else jsl_log(JSL_DBG_1, "not reachable\n");
227                 jsl_log(JSL_DBG_2,
228                         "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n",
229                         clt_nonce_, proc, ca.xid, clt_nonce_);
230             }
231             transmit = false; // only send once on a given channel
232         }
233
234         if(finaldeadline == std::chrono::time_point<std::chrono::steady_clock>::min())
235             break;
236
237         nextdeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(curr_to.to);
238         if(nextdeadline > finaldeadline) {
239             nextdeadline = finaldeadline;
240             finaldeadline = std::chrono::time_point<std::chrono::steady_clock>::min();
241         }
242
243         {
244             lock cal(ca.m);
245             while (!ca.done){
246                 jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
247                 if(ca.c.wait_until(cal, nextdeadline) == std::cv_status::timeout){
248                     jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
249                     break;
250                 }
251             }
252             if(ca.done){
253                 jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
254                 break;
255             }
256         }
257
258         if(retrans_ && (!ch || ch->isdead())){
259             // since connection is dead, retransmit
260             // on the new connection
261             transmit = true;
262         }
263         curr_to.to <<= 1;
264     }
265
266     {
267         // no locking of ca.m since only this thread changes ca.xid
268         lock ml(m_);
269         calls_.erase(ca.xid);
270         // may need to update the xid again here, in case the
271         // packet times out before it's even sent by the channel.
272         // I don't think there's any harm in maybe doing it twice
273         update_xid_rep(ca.xid);
274
275         if(destroy_wait_){
276           destroy_wait_c_.notify_one();
277         }
278     }
279
280     if (ca.done && lossytest_)
281     {
282         lock ml(m_);
283         if (!dup_req_.isvalid()) {
284             dup_req_.buf.assign(req.cstr(), req.size());
285             dup_req_.xid = ca.xid;
286         }
287         if (xid_rep > xid_rep_done_)
288             xid_rep_done_ = xid_rep;
289     }
290
291     lock cal(ca.m);
292
293     jsl_log(JSL_DBG_2,
294             "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n",
295             clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
296             ntohs(dst_.sin_port), ca.done, ca.intret);
297
298     if(ch)
299         ch->decref();
300
301     // destruction of req automatically frees its buffer
302     return (ca.done? ca.intret : rpc_const::timeout_failure);
303 }
304
305 void
306 rpcc::get_refconn(connection **ch)
307 {
308     lock ml(chan_m_);
309     if(!chan_ || chan_->isdead()){
310         if(chan_)
311             chan_->decref();
312         chan_ = connect_to_dst(dst_, this, lossytest_);
313     }
314     if(ch && chan_){
315         if(*ch){
316             (*ch)->decref();
317         }
318         *ch = chan_;
319         (*ch)->incref();
320     }
321 }
322
323 // PollMgr's thread is being used to
324 // make this upcall from connection object to rpcc.
325 // this funtion must not block.
326 //
327 // this function keeps no reference for connection *c
328 bool
329 rpcc::got_pdu(connection *c, char *b, int sz)
330 {
331     unmarshall rep(b, sz);
332     reply_header h;
333     rep.unpack_reply_header(&h);
334
335     if(!rep.ok()){
336         jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
337         return true;
338     }
339
340     lock ml(m_);
341
342     update_xid_rep(h.xid);
343
344     if(calls_.find(h.xid) == calls_.end()){
345         jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
346         return true;
347     }
348     caller *ca = calls_[h.xid];
349
350     lock cl(ca->m);
351     if(!ca->done){
352         ca->un->take_in(rep);
353         ca->intret = h.ret;
354         if(ca->intret < 0){
355             jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
356                     h.xid, ca->intret);
357         }
358         ca->done = 1;
359     }
360     ca->c.notify_all();
361     return true;
362 }
363
364 // assumes thread holds mutex m
365 void
366 rpcc::update_xid_rep(unsigned int xid)
367 {
368     std::list<unsigned int>::iterator it;
369
370     if(xid <= xid_rep_window_.front()){
371         return;
372     }
373
374     for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
375         if(*it > xid){
376             xid_rep_window_.insert(it, xid);
377             goto compress;
378         }
379     }
380     xid_rep_window_.push_back(xid);
381
382 compress:
383     it = xid_rep_window_.begin();
384     for (it++; it != xid_rep_window_.end(); it++){
385         while (xid_rep_window_.front() + 1 == *it)
386             xid_rep_window_.pop_front();
387     }
388 }
389
390
391 rpcs::rpcs(unsigned int p1, int count)
392   : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
393 {
394     set_rand_seed();
395     nonce_ = random();
396     jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
397
398     char *loss_env = getenv("RPC_LOSSY");
399     if(loss_env != NULL){
400         lossytest_ = atoi(loss_env);
401     }
402
403     reg(rpc_const::bind, this, &rpcs::rpcbind);
404     dispatchpool_ = new ThrPool(6,false);
405
406     listener_ = new tcpsconn(this, port_, lossytest_);
407 }
408
409 rpcs::~rpcs()
410 {
411     // must delete listener before dispatchpool
412     delete listener_;
413     delete dispatchpool_;
414     free_reply_window();
415 }
416
417 bool
418 rpcs::got_pdu(connection *c, char *b, int sz)
419 {
420         if(!reachable_){
421             jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
422             return true;
423         }
424
425     djob_t *j = new djob_t(c, b, sz);
426     c->incref();
427     bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j);
428     if(!succ || !reachable_){
429         c->decref();
430         delete j;
431     }
432     return succ;
433 }
434
435 void
436 rpcs::reg1(unsigned int proc, handler *h)
437 {
438     lock pl(procs_m_);
439     VERIFY(procs_.count(proc) == 0);
440     procs_[proc] = h;
441     VERIFY(procs_.count(proc) >= 1);
442 }
443
444 void
445 rpcs::updatestat(unsigned int proc)
446 {
447     lock cl(count_m_);
448     counts_[proc]++;
449     curr_counts_--;
450     if(curr_counts_ == 0){
451         std::map<int, int>::iterator i;
452         printf("RPC STATS: ");
453         for (i = counts_.begin(); i != counts_.end(); i++){
454             printf("%x:%d ", i->first, i->second);
455         }
456         printf("\n");
457
458         lock rwl(reply_window_m_);
459         std::map<unsigned int,std::list<reply_t> >::iterator clt;
460
461         unsigned int totalrep = 0, maxrep = 0;
462         for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
463             totalrep += clt->second.size();
464             if(clt->second.size() > maxrep)
465                 maxrep = clt->second.size();
466         }
467         jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n",
468                         (int) reply_window_.size()-1, totalrep, maxrep);
469         curr_counts_ = counting_;
470     }
471 }
472
473 void
474 rpcs::dispatch(djob_t *j)
475 {
476     connection *c = j->conn;
477     unmarshall req(j->buf, j->sz);
478     delete j;
479
480     req_header h;
481     req.unpack_req_header(&h);
482     int proc = h.proc;
483
484     if(!req.ok()){
485         jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
486         c->decref();
487         return;
488     }
489
490     jsl_log(JSL_DBG_2,
491             "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
492             h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
493
494     marshall rep;
495     reply_header rh(h.xid,0);
496
497     // is client sending to an old instance of server?
498     if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
499         jsl_log(JSL_DBG_2,
500                 "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
501                 h.srv_nonce, nonce_, h.proc);
502         rh.ret = rpc_const::oldsrv_failure;
503         rep.pack_reply_header(rh);
504         c->send(rep.cstr(),rep.size());
505         return;
506     }
507
508     handler *f;
509     // is RPC proc a registered procedure?
510     {
511         lock pl(procs_m_);
512         if(procs_.count(proc) < 1){
513             fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
514                 proc);
515             c->decref();
516                         VERIFY(0);
517             return;
518         }
519
520         f = procs_[proc];
521     }
522
523     rpcs::rpcstate_t stat;
524     char *b1;
525     int sz1;
526
527     if(h.clt_nonce){
528         // have i seen this client before?
529         {
530             lock rwl(reply_window_m_);
531             // if we don't know about this clt_nonce, create a cleanup object
532             if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
533                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
534                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
535                 jsl_log(JSL_DBG_2,
536                         "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
537                         h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
538             }
539         }
540
541         // save the latest good connection to the client
542         {
543             lock rwl(conss_m_);
544             if(conns_.find(h.clt_nonce) == conns_.end()){
545                 c->incref();
546                 conns_[h.clt_nonce] = c;
547             } else if(conns_[h.clt_nonce]->compare(c) < 0){
548                 conns_[h.clt_nonce]->decref();
549                 c->incref();
550                 conns_[h.clt_nonce] = c;
551             }
552         }
553
554         stat = checkduplicate_and_update(h.clt_nonce, h.xid,
555                                                  h.xid_rep, &b1, &sz1);
556     } else {
557         // this client does not require at most once logic
558         stat = NEW;
559     }
560
561     switch (stat){
562         case NEW: // new request
563             if(counting_){
564                 updatestat(proc);
565             }
566
567             rh.ret = f->fn(req, rep);
568                         if (rh.ret == rpc_const::unmarshal_args_failure) {
569                                 fprintf(stderr, "rpcs::dispatch: failed to"
570                                        " unmarshall the arguments. You are"
571                                        " probably calling RPC 0x%x with wrong"
572                                        " types of arguments.\n", proc);
573                                 VERIFY(0);
574                         }
575             VERIFY(rh.ret >= 0);
576
577             rep.pack_reply_header(rh);
578             rep.take_buf(&b1,&sz1);
579
580             jsl_log(JSL_DBG_2,
581                     "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
582                     sz1, h.xid, proc, rh.ret, h.clt_nonce);
583
584             if(h.clt_nonce > 0){
585                 // only record replies for clients that require at-most-once logic
586                 add_reply(h.clt_nonce, h.xid, b1, sz1);
587             }
588
589             // get the latest connection to the client
590             {
591                 lock rwl(conss_m_);
592                 if(c->isdead() && c != conns_[h.clt_nonce]){
593                     c->decref();
594                     c = conns_[h.clt_nonce];
595                     c->incref();
596                 }
597             }
598
599             c->send(b1, sz1);
600             if(h.clt_nonce == 0){
601                 // reply is not added to at-most-once window, free it
602                 free(b1);
603             }
604             break;
605         case INPROGRESS: // server is working on this request
606             break;
607         case DONE: // duplicate and we still have the response
608             c->send(b1, sz1);
609             break;
610         case FORGOTTEN: // very old request and we don't have the response anymore
611             jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
612                     h.xid, h.clt_nonce);
613             rh.ret = rpc_const::atmostonce_failure;
614             rep.pack_reply_header(rh);
615             c->send(rep.cstr(),rep.size());
616             break;
617     }
618     c->decref();
619 }
620
621 // rpcs::dispatch calls this when an RPC request arrives.
622 //
623 // checks to see if an RPC with xid from clt_nonce has already been received.
624 // if not, remembers the request in reply_window_.
625 //
626 // deletes remembered requests with XIDs <= xid_rep; the client
627 // says it has received a reply for every RPC up through xid_rep.
628 // frees the reply_t::buf of each such request.
629 //
630 // returns one of:
631 //   NEW: never seen this xid before.
632 //   INPROGRESS: seen this xid, and still processing it.
633 //   DONE: seen this xid, previous reply returned in *b and *sz.
634 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
635 rpcs::rpcstate_t
636 rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
637         unsigned int xid_rep, char **b, int *sz)
638 {
639     lock rwl(reply_window_m_);
640
641     std::list<reply_t> &l = reply_window_[clt_nonce];
642
643     VERIFY(l.size() > 0);
644     VERIFY(xid >= xid_rep);
645
646     unsigned int past_xid_rep = l.begin()->xid;
647
648     std::list<reply_t>::iterator start = l.begin(), it;
649     it = ++start;
650
651     if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) {
652         // scan for deletion candidates
653         for (; it != l.end() && it->xid < xid_rep; it++) {
654             if (it->cb_present)
655                 free(it->buf);
656         }
657         l.erase(start, it);
658         l.begin()->xid = xid_rep;
659     }
660
661     if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1)
662         return FORGOTTEN;
663
664     // skip non-deletion candidates
665     while (it != l.end() && it->xid < xid)
666         it++;
667
668     // if it's in the list it must be right here
669     if (it != l.end() && it->xid == xid) {
670         if (it->cb_present) {
671             // return information about the remembered reply
672             *b = it->buf;
673             *sz = it->sz;
674             return DONE;
675         } else {
676             return INPROGRESS;
677         }
678     } else {
679         // remember that a new request has arrived
680         l.insert(it, reply_t(xid));
681         return NEW;
682     }
683 }
684
685 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
686 // and passes the return value in b and sz.
687 // add_reply() should remember b and sz.
688 // free_reply_window() and checkduplicate_and_update is responsible for
689 // calling free(b).
690 void
691 rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
692         char *b, int sz)
693 {
694     lock rwl(reply_window_m_);
695     // remember the RPC reply value
696     std::list<reply_t> &l = reply_window_[clt_nonce];
697     std::list<reply_t>::iterator it = l.begin();
698     // skip to our place in the list
699     for (it++; it != l.end() && it->xid < xid; it++);
700     // there should already be an entry, so whine if there isn't
701     if (it == l.end() || it->xid != xid) {
702         fprintf(stderr, "Could not find reply struct in add_reply");
703         l.insert(it, reply_t(xid, b, sz));
704     } else {
705         *it = reply_t(xid, b, sz);
706     }
707 }
708
709 void
710 rpcs::free_reply_window(void)
711 {
712     std::map<unsigned int,std::list<reply_t> >::iterator clt;
713     std::list<reply_t>::iterator it;
714
715     lock rwl(reply_window_m_);
716     for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
717         for (it = clt->second.begin(); it != clt->second.end(); it++){
718             if (it->cb_present)
719                 free(it->buf);
720         }
721         clt->second.clear();
722     }
723     reply_window_.clear();
724 }
725
726 // rpc handler
727 int
728 rpcs::rpcbind(int a, int &r)
729 {
730     jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
731     r = nonce_;
732     return 0;
733 }
734
735 void
736 marshall::rawbyte(unsigned char x)
737 {
738     if(_ind >= _capa){
739         _capa *= 2;
740         VERIFY (_buf != NULL);
741         _buf = (char *)realloc(_buf, _capa);
742         VERIFY(_buf);
743     }
744     _buf[_ind++] = x;
745 }
746
747 void
748 marshall::rawbytes(const char *p, int n)
749 {
750     if((_ind+n) > _capa){
751         _capa = _capa > n? 2*_capa:(_capa+n);
752         VERIFY (_buf != NULL);
753         _buf = (char *)realloc(_buf, _capa);
754         VERIFY(_buf);
755     }
756     memcpy(_buf+_ind, p, n);
757     _ind += n;
758 }
759
760 marshall &
761 operator<<(marshall &m, bool x)
762 {
763     m.rawbyte(x);
764     return m;
765 }
766
767 marshall &
768 operator<<(marshall &m, unsigned char x)
769 {
770     m.rawbyte(x);
771     return m;
772 }
773
774 marshall &
775 operator<<(marshall &m, char x)
776 {
777     m << (unsigned char) x;
778     return m;
779 }
780
781
782 marshall &
783 operator<<(marshall &m, unsigned short x)
784 {
785     m.rawbyte((x >> 8) & 0xff);
786     m.rawbyte(x & 0xff);
787     return m;
788 }
789
790 marshall &
791 operator<<(marshall &m, short x)
792 {
793     m << (unsigned short) x;
794     return m;
795 }
796
797 marshall &
798 operator<<(marshall &m, unsigned int x)
799 {
800     // network order is big-endian
801     m.rawbyte((x >> 24) & 0xff);
802     m.rawbyte((x >> 16) & 0xff);
803     m.rawbyte((x >> 8) & 0xff);
804     m.rawbyte(x & 0xff);
805     return m;
806 }
807
808 marshall &
809 operator<<(marshall &m, int x)
810 {
811     m << (unsigned int) x;
812     return m;
813 }
814
815 marshall &
816 operator<<(marshall &m, const std::string &s)
817 {
818     m << (unsigned int) s.size();
819     m.rawbytes(s.data(), s.size());
820     return m;
821 }
822
823 marshall &
824 operator<<(marshall &m, unsigned long long x)
825 {
826     m << (unsigned int) (x >> 32);
827     m << (unsigned int) x;
828     return m;
829 }
830
831 void
832 marshall::pack(int x)
833 {
834     rawbyte((x >> 24) & 0xff);
835     rawbyte((x >> 16) & 0xff);
836     rawbyte((x >> 8) & 0xff);
837     rawbyte(x & 0xff);
838 }
839
840 void
841 unmarshall::unpack(int *x)
842 {
843     (*x) = (rawbyte() & 0xff) << 24;
844     (*x) |= (rawbyte() & 0xff) << 16;
845     (*x) |= (rawbyte() & 0xff) << 8;
846     (*x) |= rawbyte() & 0xff;
847 }
848
849 // take the contents from another unmarshall object
850 void
851 unmarshall::take_in(unmarshall &another)
852 {
853     if(_buf)
854         free(_buf);
855     another.take_buf(&_buf, &_sz);
856     _ind = RPC_HEADER_SZ;
857     _ok = _sz >= RPC_HEADER_SZ?true:false;
858 }
859
860 bool
861 unmarshall::okdone()
862 {
863     if(ok() && _ind == _sz){
864         return true;
865     } else {
866         return false;
867     }
868 }
869
870 unsigned int
871 unmarshall::rawbyte()
872 {
873     char c = 0;
874     if(_ind >= _sz)
875         _ok = false;
876     else
877         c = _buf[_ind++];
878     return c;
879 }
880
881 unmarshall &
882 operator>>(unmarshall &u, bool &x)
883 {
884     x = (bool) u.rawbyte() ;
885     return u;
886 }
887
888 unmarshall &
889 operator>>(unmarshall &u, unsigned char &x)
890 {
891     x = (unsigned char) u.rawbyte() ;
892     return u;
893 }
894
895 unmarshall &
896 operator>>(unmarshall &u, char &x)
897 {
898     x = (char) u.rawbyte();
899     return u;
900 }
901
902
903 unmarshall &
904 operator>>(unmarshall &u, unsigned short &x)
905 {
906     x = (u.rawbyte() & 0xff) << 8;
907     x |= u.rawbyte() & 0xff;
908     return u;
909 }
910
911 unmarshall &
912 operator>>(unmarshall &u, short &x)
913 {
914     x = (u.rawbyte() & 0xff) << 8;
915     x |= u.rawbyte() & 0xff;
916     return u;
917 }
918
919 unmarshall &
920 operator>>(unmarshall &u, unsigned int &x)
921 {
922     x = (u.rawbyte() & 0xff) << 24;
923     x |= (u.rawbyte() & 0xff) << 16;
924     x |= (u.rawbyte() & 0xff) << 8;
925     x |= u.rawbyte() & 0xff;
926     return u;
927 }
928
929 unmarshall &
930 operator>>(unmarshall &u, int &x)
931 {
932     x = (u.rawbyte() & 0xff) << 24;
933     x |= (u.rawbyte() & 0xff) << 16;
934     x |= (u.rawbyte() & 0xff) << 8;
935     x |= u.rawbyte() & 0xff;
936     return u;
937 }
938
939 unmarshall &
940 operator>>(unmarshall &u, unsigned long long &x)
941 {
942     unsigned int h, l;
943     u >> h;
944     u >> l;
945     x = l | ((unsigned long long) h << 32);
946     return u;
947 }
948
949 unmarshall &
950 operator>>(unmarshall &u, std::string &s)
951 {
952     unsigned sz;
953     u >> sz;
954     if(u.ok())
955         u.rawbytes(s, sz);
956     return u;
957 }
958
959 void
960 unmarshall::rawbytes(std::string &ss, unsigned int n)
961 {
962     if((_ind+n) > (unsigned)_sz){
963         _ok = false;
964     } else {
965         std::string tmps = std::string(_buf+_ind, n);
966         swap(ss, tmps);
967         VERIFY(ss.size() == n);
968         _ind += n;
969     }
970 }
971
972 bool operator<(const sockaddr_in &a, const sockaddr_in &b){
973     return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
974             ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
975              ((a.sin_port < b.sin_port))));
976 }
977
978 /*---------------auxilary function--------------*/
979 void
980 make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
981
982     char host[200];
983     const char *localhost = "127.0.0.1";
984     const char *port = index(hostandport, ':');
985     if(port == NULL){
986         memcpy(host, localhost, strlen(localhost)+1);
987         port = hostandport;
988     } else {
989         memcpy(host, hostandport, port-hostandport);
990         host[port-hostandport] = '\0';
991         port++;
992     }
993
994     make_sockaddr(host, port, dst);
995
996 }
997
998 void
999 make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
1000
1001     in_addr_t a;
1002
1003     bzero(dst, sizeof(*dst));
1004     dst->sin_family = AF_INET;
1005
1006     a = inet_addr(host);
1007     if(a != INADDR_NONE){
1008         dst->sin_addr.s_addr = a;
1009     } else {
1010         struct hostent *hp = gethostbyname(host);
1011         if(hp == 0 || hp->h_length != 4){
1012             fprintf(stderr, "cannot find host name %s\n", host);
1013             exit(1);
1014         }
1015         dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
1016     }
1017     dst->sin_port = htons(atoi(port));
1018 }