e18506a537d2711d25b5698e918d17884e3fc3a6
[invirt/third/libt4.git] / rpc / rpc.cc
1 /*
2  The rpcc class handles client-side RPC.  Each rpcc is bound to a
3  single RPC server.  The jobs of rpcc include maintaining a connection to the
4  server, sending RPC requests and waiting for responses, retransmissions,
5  at-most-once delivery etc.
6
7  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
8  connections from different rpcc objects.  The jobs of rpcs include accepting
9  connections, dispatching requests to registered RPC handlers, at-most-once
10  delivery etc.
11
12  Both rpcc and rpcs use the connection class as an abstraction for the
13  underlying communication channel.  To send an RPC request/reply, one calls
14  connection::send() which blocks until data is sent or the connection has failed
15  (thus the caller can free the buffer when send() returns).  When a
16  request/reply is received, connection makes a callback into the corresponding
17  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
18
19  Thread organization:
20  rpcc uses application threads to send RPC requests and blocks to receive the
21  reply or error. All connections use a single PollMgr object to perform async
22  socket IO.  PollMgr creates a single thread to examine the readiness of socket
23  file descriptors and informs the corresponding connection whenever a socket is
24  ready to be read or written.  (We use asynchronous socket IO to reduce the
25  number of threads needed to manage these connections; without async IO, at
26  least one thread is needed per connection to read data without blocking other
27  activities.)  Each rpcs object creates one thread for listening on the server
28  port and a pool of threads for executing RPC requests.  The
29  thread pool allows us to control the number of threads spawned at the server
30  (spawning one thread per request will hurt when the server faces thousands of
31  requests).
32
33  In order to delete a connection object, we must maintain a reference count.
34  For rpcc,
35  multiple client threads might be invoking the rpcc::call() functions and thus
36  holding multiple references to the underlying connection object. For rpcs,
37  multiple dispatch threads might be holding references to the same connection
38  object.  A connection object is deleted only when the underlying connection is
39  dead and the reference count reaches zero.
40
41  The previous version of the RPC library used pthread_cancel* routines 
42  to implement the deletion of rpcc and rpcs objects. The idea is to cancel 
43  all active threads that might be holding a reference to an object before 
44  deleting that object. However, pthread_cancel is not robust and there are
45  always bugs where outstanding references to deleted objects persist.
46  This version of the RPC library does not do pthread_cancel, but explicitly 
47  joins exited threads to make sure no outstanding references exist before 
48  deleting objects.
49
50  To delete a rpcc object safely, the users of the library must ensure that
51  there are no outstanding calls on the rpcc object.
52
53  To delete a rpcs object safely, we do the following in sequence: 1. stop
54  accepting new incoming connections. 2. close existing active connections.
55  3.  delete the dispatch thread pool which involves waiting for current active
56  RPC handlers to finish.  It is interesting how a thread pool can be deleted
57  without using thread cancellation. The trick is to inject x "poison pills" for
58  a thread pool of x threads. Upon getting a poison pill instead of a normal 
59  task, a worker thread will exit (and thread pool destructor waits to join all
60  x exited worker threads).
61  */
62
63 #include "rpc.h"
64 #include "method_thread.h"
65 #include "slock.h"
66
67 #include <sys/types.h>
68 #include <arpa/inet.h>
69 #include <netinet/tcp.h>
70 #include <time.h>
71 #include <netdb.h>
72
73 #include "jsl_log.h"
74 #include "gettime.h"
75 #include "lang/verify.h"
76
77 const rpcc::TO rpcc::to_max = { 120000 };
78 const rpcc::TO rpcc::to_min = { 1000 };
79
80 rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
81 : xid(xxid), un(xun), done(false)
82 {
83         VERIFY(pthread_mutex_init(&m,0) == 0);
84         VERIFY(pthread_cond_init(&c, 0) == 0);
85 }
86
87 rpcc::caller::~caller()
88 {
89         VERIFY(pthread_mutex_destroy(&m) == 0);
90         VERIFY(pthread_cond_destroy(&c) == 0);
91 }
92
// Seed random() by mixing the nanosecond field of the wall clock with the
// process id, so two processes started at the same moment still diverge.
inline
void set_rand_seed()
{
        struct timespec clock_now;
        clock_gettime(CLOCK_REALTIME, &clock_now);
        int seed = (int)clock_now.tv_nsec ^ (int)getpid();
        srandom(seed);
}
100
101 rpcc::rpcc(sockaddr_in d, bool retrans) : 
102         dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0), 
103         retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
104 {
105         VERIFY(pthread_mutex_init(&m_, 0) == 0);
106         VERIFY(pthread_mutex_init(&chan_m_, 0) == 0);
107         VERIFY(pthread_cond_init(&destroy_wait_c_, 0) == 0);
108
109         if(retrans){
110                 set_rand_seed();
111                 clt_nonce_ = random();
112         } else {
113                 // special client nonce 0 means this client does not 
114                 // require at-most-once logic from the server
115                 // because it uses tcp and never retries a failed connection
116                 clt_nonce_ = 0;
117         }
118
119         char *loss_env = getenv("RPC_LOSSY");
120         if(loss_env != NULL){
121                 lossytest_ = atoi(loss_env);
122         }
123
124         // xid starts with 1 and latest received reply starts with 0
125         xid_rep_window_.push_back(0);
126
127         jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n", 
128                         clt_nonce_, lossytest_); 
129 }
130
131 // IMPORTANT: destruction should happen only when no external threads
132 // are blocked inside rpcc or will use rpcc in the future
133 rpcc::~rpcc()
134 {
135         jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n", 
136                         clt_nonce_, chan_?chan_->channo():-1); 
137         if(chan_){
138                 chan_->closeconn();
139                 chan_->decref();
140         }
141         VERIFY(calls_.size() == 0);
142         VERIFY(pthread_mutex_destroy(&m_) == 0);
143         VERIFY(pthread_mutex_destroy(&chan_m_) == 0);
144 }
145
146 int
147 rpcc::bind(TO to)
148 {
149         int r;
150         int ret = call(rpc_const::bind, 0, r, to);
151         if(ret == 0){
152                 ScopedLock ml(&m_);
153                 bind_done_ = true;
154                 srv_nonce_ = r;
155         } else {
156                 jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n", 
157                                 inet_ntoa(dst_.sin_addr), ret);
158         }
159         return ret;
160 };
161
162 // Cancel all outstanding calls
163 void
164 rpcc::cancel(void)
165 {
166   ScopedLock ml(&m_);
167   printf("rpcc::cancel: force callers to fail\n");
168   std::map<int,caller*>::iterator iter;
169   for(iter = calls_.begin(); iter != calls_.end(); iter++){
170     caller *ca = iter->second;
171
172     jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
173     {
174       ScopedLock cl(&ca->m);
175       ca->done = true;
176       ca->intret = rpc_const::cancel_failure;
177       VERIFY(pthread_cond_signal(&ca->c) == 0);
178     }
179   }
180
181   while (calls_.size () > 0){
182     destroy_wait_ = true;
183     VERIFY(pthread_cond_wait(&destroy_wait_c_,&m_) == 0);
184   }
185   printf("rpcc::cancel: done\n");
186 }
187
// Send the marshalled request `req` for procedure `proc` and wait for the
// reply, which got_pdu() unmarshalls into `rep`.
//
// Returns:
//   rpc_const::bind_failure    if proc is not bind and bind() has not
//                              succeeded, or proc is bind and it already has;
//   rpc_const::cancel_failure  if cancel() has set destroy_wait_;
//   ca.intret                  once the caller is marked done (the reply's
//                              return value, or cancel_failure set by cancel());
//   rpc_const::timeout_failure if nothing completed the call before `to`.
//
// The caller is registered in calls_ under a fresh xid so got_pdu() and
// cancel() can find it; it is always removed again before returning
// (except on the two early returns, which happen before registration).
188 int
189 rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
190                 TO to)
191 {
192
193         caller ca(0, &rep);
194         int xid_rep;
195         {
196                 ScopedLock ml(&m_);
197
198                 if((proc != rpc_const::bind && !bind_done_) ||
199                                 (proc == rpc_const::bind && bind_done_)){
200                         jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
201                         return rpc_const::bind_failure;
202                 }
203
204                 if(destroy_wait_){
205                   return rpc_const::cancel_failure;
206                 }
207
208                 ca.xid = xid_++;
209                 calls_[ca.xid] = &ca;
210
                // The header carries the front of the reply window, i.e. the
                // highest xid up to which every reply has been received.
211                 req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
212                              xid_rep_window_.front());
213                 req.pack_req_header(h);
214                 xid_rep = xid_rep_window_.front();
215         }
216
        // Overall deadline is now + to.  The first wait lasts to_min and each
        // later wait doubles (curr_to.to <<= 1 at the bottom of the loop).
217         TO curr_to;
218         struct timespec now, nextdeadline, finaldeadline; 
219
220         clock_gettime(CLOCK_REALTIME, &now);
221         add_timespec(now, to.to, &finaldeadline); 
222         curr_to.to = to_min.to;
223
224         bool transmit = true;
225         connection *ch = NULL;
226
227         while (1){
228                 if(transmit){
                        // get_refconn() swaps ch for a reference to the current
                        // (possibly newly connected) channel.
229                         get_refconn(&ch);
230                         if(ch){
                                // When reachable_ is false nothing is sent; the
                                // "just sent req" log below is still emitted.
231                                 if(reachable_) {
                                        // Lossy-test mode: a previously completed request
                                        // saved in dup_req_ is re-sent ahead of this one once
                                        // xid_rep_done_ has moved past its xid (presumably to
                                        // exercise the server's duplicate handling -- confirm).
232                                         request forgot;
233                                         {
234                                                 ScopedLock ml(&m_);
235                                                 if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
236                                                         forgot = dup_req_;
237                                                         dup_req_.clear();
238                                                 }
239                                         }
240                                         if (forgot.isvalid()) 
241                                                 ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
242                                         ch->send(req.cstr(), req.size());
243                                 }
244                                 else jsl_log(JSL_DBG_1, "not reachable\n");
245                                 jsl_log(JSL_DBG_2, 
246                                                 "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n", 
247                                                 clt_nonce_, proc, ca.xid, clt_nonce_); 
248                         }
249                         transmit = false; // only send once on a given channel
250                 }
251
                // finaldeadline.tv_sec == 0 is a sentinel set below: the wait
                // that ended at the overall deadline has already happened.
252                 if(!finaldeadline.tv_sec)
253                         break;
254
255                 clock_gettime(CLOCK_REALTIME, &now);
256                 add_timespec(now, curr_to.to, &nextdeadline); 
257                 if(cmp_timespec(nextdeadline,finaldeadline) > 0){
258                         nextdeadline = finaldeadline;
259                         finaldeadline.tv_sec = 0;
260                 }
261
                // Wait until got_pdu() or cancel() marks the caller done, or
                // until nextdeadline passes.
262                 {
263                         ScopedLock cal(&ca.m);
264                         while (!ca.done){
265                                 jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
266                                 if(pthread_cond_timedwait(&ca.c, &ca.m,
267                                                  &nextdeadline) == ETIMEDOUT){
268                                         jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
269                                         break;
270                                 }
271                         }
272                         if(ca.done){
273                                 jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
274                                 break;
275                         }
276                 }
277
278                 if(retrans_ && (!ch || ch->isdead())){
279                         // since connection is dead, retransmit
280                         // on the new connection 
281                         transmit = true; 
282                 }
283                 curr_to.to <<= 1;
284         }
285
286         { 
287                 // no locking of ca.m since only this thread changes ca.xid 
288                 ScopedLock ml(&m_);
289                 calls_.erase(ca.xid);
290                 // may need to update the xid again here, in case the
291                 // packet times out before it's even sent by the channel.
292                 // I don't think there's any harm in maybe doing it twice
293                 update_xid_rep(ca.xid);
294
                // cancel() sets destroy_wait_ and waits for calls_ to drain.
295                 if(destroy_wait_){
296                   VERIFY(pthread_cond_signal(&destroy_wait_c_) == 0);
297                 }
298         }
299
        // Lossy-test mode: remember this completed request (unless one is
        // already pending) and the highest xid_rep any completed request carried.
300         if (ca.done && lossytest_)
301         {
302                 ScopedLock ml(&m_);
303                 if (!dup_req_.isvalid()) {
304                         dup_req_.buf.assign(req.cstr(), req.size());
305                         dup_req_.xid = ca.xid;
306                 }
307                 if (xid_rep > xid_rep_done_)
308                         xid_rep_done_ = xid_rep;
309         }
310
311         ScopedLock cal(&ca.m);
312
313         jsl_log(JSL_DBG_2, 
314                         "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n", 
315                         clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
316                         ntohs(dst_.sin_port), ca.done, ca.intret);
317
        // Drop the connection reference obtained through get_refconn().
318         if(ch)
319                 ch->decref();
320
321         // destruction of req automatically frees its buffer
322         return (ca.done? ca.intret : rpc_const::timeout_failure);
323 }
324
325 void
326 rpcc::get_refconn(connection **ch)
327 {
328         ScopedLock ml(&chan_m_);
329         if(!chan_ || chan_->isdead()){
330                 if(chan_)
331                         chan_->decref();
332                 chan_ = connect_to_dst(dst_, this, lossytest_);
333         }
334         if(ch && chan_){
335                 if(*ch){
336                         (*ch)->decref();
337                 }
338                 *ch = chan_;
339                 (*ch)->incref();
340         }
341 }
342
343 // PollMgr's thread is being used to 
344 // make this upcall from connection object to rpcc. 
345 // this funtion must not block.
346 //
347 // this function keeps no reference for connection *c 
348 bool
349 rpcc::got_pdu(connection *c, char *b, int sz)
350 {
351         unmarshall rep(b, sz);
352         reply_header h;
353         rep.unpack_reply_header(&h);
354
355         if(!rep.ok()){
356                 jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
357                 return true;
358         }
359
360         ScopedLock ml(&m_);
361
362         update_xid_rep(h.xid);
363
364         if(calls_.find(h.xid) == calls_.end()){
365                 jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
366                 return true;
367         }
368         caller *ca = calls_[h.xid];
369
370         ScopedLock cl(&ca->m);
371         if(!ca->done){
372                 ca->un->take_in(rep);
373                 ca->intret = h.ret;
374                 if(ca->intret < 0){
375                         jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
376                                         h.xid, ca->intret);
377                 }
378                 ca->done = 1;
379         }
380         VERIFY(pthread_cond_broadcast(&ca->c) == 0);
381         return true;
382 }
383
384 // assumes thread holds mutex m
385 void 
386 rpcc::update_xid_rep(unsigned int xid)
387 {
388         std::list<unsigned int>::iterator it;
389
390         if(xid <= xid_rep_window_.front()){
391                 return;
392         }
393
394         for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
395                 if(*it > xid){
396                         xid_rep_window_.insert(it, xid);
397                         goto compress;
398                 }
399         }
400         xid_rep_window_.push_back(xid);
401
402 compress:
403         it = xid_rep_window_.begin();
404         for (it++; it != xid_rep_window_.end(); it++){
405                 while (xid_rep_window_.front() + 1 == *it)
406                         xid_rep_window_.pop_front();
407         }
408 }
409
410
411 rpcs::rpcs(unsigned int p1, int count)
412   : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
413 {
414         VERIFY(pthread_mutex_init(&procs_m_, 0) == 0);
415         VERIFY(pthread_mutex_init(&count_m_, 0) == 0);
416         VERIFY(pthread_mutex_init(&reply_window_m_, 0) == 0);
417         VERIFY(pthread_mutex_init(&conss_m_, 0) == 0);
418
419         set_rand_seed();
420         nonce_ = random();
421         jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
422
423         char *loss_env = getenv("RPC_LOSSY");
424         if(loss_env != NULL){
425                 lossytest_ = atoi(loss_env);
426         }
427
428         reg(rpc_const::bind, this, &rpcs::rpcbind);
429         dispatchpool_ = new ThrPool(6,false);
430
431         listener_ = new tcpsconn(this, port_, lossytest_);
432 }
433
434 rpcs::~rpcs()
435 {
436         // must delete listener before dispatchpool
437         delete listener_;
438         delete dispatchpool_;
439         free_reply_window();
440 }
441
442 bool
443 rpcs::got_pdu(connection *c, char *b, int sz)
444 {
445         if(!reachable_){
446             jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
447             return true;
448         }
449
450         djob_t *j = new djob_t(c, b, sz);
451         c->incref();
452         bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j);
453         if(!succ || !reachable_){
454                 c->decref();
455                 delete j;
456         }
457         return succ; 
458 }
459
460 void
461 rpcs::reg1(unsigned int proc, handler *h)
462 {
463         ScopedLock pl(&procs_m_);
464         VERIFY(procs_.count(proc) == 0);
465         procs_[proc] = h;
466         VERIFY(procs_.count(proc) >= 1);
467 }
468
469 void
470 rpcs::updatestat(unsigned int proc)
471 {
472         ScopedLock cl(&count_m_);
473         counts_[proc]++;
474         curr_counts_--;
475         if(curr_counts_ == 0){
476                 std::map<int, int>::iterator i;
477                 printf("RPC STATS: ");
478                 for (i = counts_.begin(); i != counts_.end(); i++){
479                         printf("%x:%d ", i->first, i->second);
480                 }
481                 printf("\n");
482
483                 ScopedLock rwl(&reply_window_m_);
484                 std::map<unsigned int,std::list<reply_t> >::iterator clt;
485
486                 unsigned int totalrep = 0, maxrep = 0;
487                 for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
488                         totalrep += clt->second.size();
489                         if(clt->second.size() > maxrep)
490                                 maxrep = clt->second.size();
491                 }
492                 jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n", 
493                         (int) reply_window_.size()-1, totalrep, maxrep);
494                 curr_counts_ = counting_;
495         }
496 }
497
498 void
499 rpcs::dispatch(djob_t *j)
500 {
501         connection *c = j->conn;
502         unmarshall req(j->buf, j->sz);
503         delete j;
504
505         req_header h;
506         req.unpack_req_header(&h);
507         int proc = h.proc;
508
509         if(!req.ok()){
510                 jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
511                 c->decref();
512                 return;
513         }
514
515         jsl_log(JSL_DBG_2,
516                         "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
517                         h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
518
519         marshall rep;
520         reply_header rh(h.xid,0);
521
522         // is client sending to an old instance of server?
523         if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
524                 jsl_log(JSL_DBG_2,
525                                 "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
526                                 h.srv_nonce, nonce_, h.proc);
527                 rh.ret = rpc_const::oldsrv_failure;
528                 rep.pack_reply_header(rh);
529                 c->send(rep.cstr(),rep.size());
530                 return;
531         }
532
533         handler *f;
534         // is RPC proc a registered procedure?
535         {
536                 ScopedLock pl(&procs_m_);
537                 if(procs_.count(proc) < 1){
538                         fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
539                                 proc);
540                         c->decref();
541                         VERIFY(0);
542                         return;
543                 }
544
545                 f = procs_[proc];
546         }
547
548         rpcs::rpcstate_t stat;
549         char *b1;
550         int sz1;
551
552         if(h.clt_nonce){
553                 // have i seen this client before?
554                 {
555                         ScopedLock rwl(&reply_window_m_);
556                         // if we don't know about this clt_nonce, create a cleanup object
557                         if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
558                                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
559                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
560                                 jsl_log(JSL_DBG_2,
561                                                 "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n", 
562                                                 h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
563                         }
564                 }
565
566                 // save the latest good connection to the client
567                 {
568                         ScopedLock rwl(&conss_m_);
569                         if(conns_.find(h.clt_nonce) == conns_.end()){
570                                 c->incref();
571                                 conns_[h.clt_nonce] = c;
572                         } else if(conns_[h.clt_nonce]->compare(c) < 0){
573                                 conns_[h.clt_nonce]->decref();
574                                 c->incref();
575                                 conns_[h.clt_nonce] = c;
576                         }
577                 }
578
579                 stat = checkduplicate_and_update(h.clt_nonce, h.xid,
580                                                  h.xid_rep, &b1, &sz1);
581         } else {
582                 // this client does not require at most once logic
583                 stat = NEW;
584         }
585
586         switch (stat){
587                 case NEW: // new request
588                         if(counting_){
589                                 updatestat(proc);
590                         }
591
592                         rh.ret = f->fn(req, rep);
593                         if (rh.ret == rpc_const::unmarshal_args_failure) {
594                                 fprintf(stderr, "rpcs::dispatch: failed to"
595                                        " unmarshall the arguments. You are"
596                                        " probably calling RPC 0x%x with wrong"
597                                        " types of arguments.\n", proc);
598                                 VERIFY(0);
599                         }
600                         VERIFY(rh.ret >= 0);
601
602                         rep.pack_reply_header(rh);
603                         rep.take_buf(&b1,&sz1);
604
605                         jsl_log(JSL_DBG_2,
606                                         "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
607                                         sz1, h.xid, proc, rh.ret, h.clt_nonce);
608
609                         if(h.clt_nonce > 0){
610                                 // only record replies for clients that require at-most-once logic
611                                 add_reply(h.clt_nonce, h.xid, b1, sz1);
612                         }
613
614                         // get the latest connection to the client
615                         {
616                                 ScopedLock rwl(&conss_m_);
617                                 if(c->isdead() && c != conns_[h.clt_nonce]){
618                                         c->decref();
619                                         c = conns_[h.clt_nonce];
620                                         c->incref();
621                                 }
622                         }
623
624                         c->send(b1, sz1);
625                         if(h.clt_nonce == 0){
626                                 // reply is not added to at-most-once window, free it
627                                 free(b1);
628                         }
629                         break;
630                 case INPROGRESS: // server is working on this request
631                         break;
632                 case DONE: // duplicate and we still have the response
633                         c->send(b1, sz1);
634                         break;
635                 case FORGOTTEN: // very old request and we don't have the response anymore
636                         jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n", 
637                                         h.xid, h.clt_nonce);
638                         rh.ret = rpc_const::atmostonce_failure;
639                         rep.pack_reply_header(rh);
640                         c->send(rep.cstr(),rep.size());
641                         break;
642         }
643         c->decref();
644 }
645
646 // rpcs::dispatch calls this when an RPC request arrives.
647 //
648 // checks to see if an RPC with xid from clt_nonce has already been received.
649 // if not, remembers the request in reply_window_.
650 //
651 // deletes remembered requests with XIDs <= xid_rep; the client
652 // says it has received a reply for every RPC up through xid_rep.
653 // frees the reply_t::buf of each such request.
654 //
655 // returns one of:
656 //   NEW: never seen this xid before.
657 //   INPROGRESS: seen this xid, and still processing it.
658 //   DONE: seen this xid, previous reply returned in *b and *sz.
659 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
660 rpcs::rpcstate_t 
661 rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
662                 unsigned int xid_rep, char **b, int *sz)
663 {
664         ScopedLock rwl(&reply_window_m_);
665
666     std::list<reply_t> &l = reply_window_[clt_nonce];
667
668     VERIFY(l.size() > 0);
669     VERIFY(xid >= xid_rep);
670
671     unsigned int past_xid_rep = l.begin()->xid;
672
673     std::list<reply_t>::iterator start = l.begin(), it;
674     it = ++start;
675
676     if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) {
677         // scan for deletion candidates
678         for (; it != l.end() && it->xid < xid_rep; it++) {
679             if (it->cb_present)
680                 free(it->buf);
681         }
682         l.erase(start, it);
683         l.begin()->xid = xid_rep;
684     }
685
686     if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1)
687         return FORGOTTEN;
688
689     // skip non-deletion candidates
690     while (it != l.end() && it->xid < xid)
691         it++;
692
693     // if it's in the list it must be right here
694     if (it != l.end() && it->xid == xid) {
695         if (it->cb_present) {
696             // return information about the remembered reply
697             *b = it->buf;
698             *sz = it->sz;
699             return DONE;
700         } else {
701             return INPROGRESS;
702         }
703     } else {
704         // remember that a new request has arrived
705         l.insert(it, reply_t(xid));
706         return NEW;
707     }
708 }
709
710 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
711 // and passes the return value in b and sz.
712 // add_reply() should remember b and sz.
713 // free_reply_window() and checkduplicate_and_update is responsible for 
714 // calling free(b).
715 void
716 rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
717                 char *b, int sz)
718 {
719         ScopedLock rwl(&reply_window_m_);
720     // remember the RPC reply value
721     std::list<reply_t> &l = reply_window_[clt_nonce];
722     std::list<reply_t>::iterator it = l.begin();
723     // skip to our place in the list
724     for (it++; it != l.end() && it->xid < xid; it++);
725     // there should already be an entry, so whine if there isn't
726     if (it == l.end() || it->xid != xid) {
727         fprintf(stderr, "Could not find reply struct in add_reply");
728         l.insert(it, reply_t(xid, b, sz));
729     } else {
730         *it = reply_t(xid, b, sz);
731     }
732 }
733
734 void
735 rpcs::free_reply_window(void)
736 {
737         std::map<unsigned int,std::list<reply_t> >::iterator clt;
738         std::list<reply_t>::iterator it;
739
740         ScopedLock rwl(&reply_window_m_);
741         for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
742                 for (it = clt->second.begin(); it != clt->second.end(); it++){
743                         if (it->cb_present)
744                 free(it->buf);
745                 }
746                 clt->second.clear();
747         }
748         reply_window_.clear();
749 }
750
751 // rpc handler
752 int 
753 rpcs::rpcbind(int a, int &r)
754 {
755         jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
756         r = nonce_;
757         return 0;
758 }
759
760 void
761 marshall::rawbyte(unsigned char x)
762 {
763         if(_ind >= _capa){
764                 _capa *= 2;
765                 VERIFY (_buf != NULL);
766                 _buf = (char *)realloc(_buf, _capa);
767                 VERIFY(_buf);
768         }
769         _buf[_ind++] = x;
770 }
771
772 void
773 marshall::rawbytes(const char *p, int n)
774 {
775         if((_ind+n) > _capa){
776                 _capa = _capa > n? 2*_capa:(_capa+n);
777                 VERIFY (_buf != NULL);
778                 _buf = (char *)realloc(_buf, _capa);
779                 VERIFY(_buf);
780         }
781         memcpy(_buf+_ind, p, n);
782         _ind += n;
783 }
784
785 marshall &
786 operator<<(marshall &m, bool x)
787 {
788         m.rawbyte(x);
789         return m;
790 }
791
792 marshall &
793 operator<<(marshall &m, unsigned char x)
794 {
795         m.rawbyte(x);
796         return m;
797 }
798
799 marshall &
800 operator<<(marshall &m, char x)
801 {
802         m << (unsigned char) x;
803         return m;
804 }
805
806
807 marshall &
808 operator<<(marshall &m, unsigned short x)
809 {
810         m.rawbyte((x >> 8) & 0xff);
811         m.rawbyte(x & 0xff);
812         return m;
813 }
814
815 marshall &
816 operator<<(marshall &m, short x)
817 {
818         m << (unsigned short) x;
819         return m;
820 }
821
822 marshall &
823 operator<<(marshall &m, unsigned int x)
824 {
825         // network order is big-endian
826         m.rawbyte((x >> 24) & 0xff);
827         m.rawbyte((x >> 16) & 0xff);
828         m.rawbyte((x >> 8) & 0xff);
829         m.rawbyte(x & 0xff);
830         return m;
831 }
832
833 marshall &
834 operator<<(marshall &m, int x)
835 {
836         m << (unsigned int) x;
837         return m;
838 }
839
840 marshall &
841 operator<<(marshall &m, const std::string &s)
842 {
843         m << (unsigned int) s.size();
844         m.rawbytes(s.data(), s.size());
845         return m;
846 }
847
848 marshall &
849 operator<<(marshall &m, unsigned long long x)
850 {
851         m << (unsigned int) (x >> 32);
852         m << (unsigned int) x;
853         return m;
854 }
855
856 void
857 marshall::pack(int x)
858 {
859         rawbyte((x >> 24) & 0xff);
860         rawbyte((x >> 16) & 0xff);
861         rawbyte((x >> 8) & 0xff);
862         rawbyte(x & 0xff);
863 }
864
865 void
866 unmarshall::unpack(int *x)
867 {
868         (*x) = (rawbyte() & 0xff) << 24;
869         (*x) |= (rawbyte() & 0xff) << 16;
870         (*x) |= (rawbyte() & 0xff) << 8;
871         (*x) |= rawbyte() & 0xff;
872 }
873
874 // take the contents from another unmarshall object
875 void
876 unmarshall::take_in(unmarshall &another)
877 {
878         if(_buf)
879                 free(_buf);
880         another.take_buf(&_buf, &_sz);
881         _ind = RPC_HEADER_SZ;
882         _ok = _sz >= RPC_HEADER_SZ?true:false;
883 }
884
885 bool
886 unmarshall::okdone()
887 {
888         if(ok() && _ind == _sz){
889                 return true;
890         } else {
891                 return false;
892         }
893 }
894
895 unsigned int
896 unmarshall::rawbyte()
897 {
898         char c = 0;
899         if(_ind >= _sz)
900                 _ok = false;
901         else
902                 c = _buf[_ind++];
903         return c;
904 }
905
906 unmarshall &
907 operator>>(unmarshall &u, bool &x)
908 {
909         x = (bool) u.rawbyte() ;
910         return u;
911 }
912
913 unmarshall &
914 operator>>(unmarshall &u, unsigned char &x)
915 {
916         x = (unsigned char) u.rawbyte() ;
917         return u;
918 }
919
920 unmarshall &
921 operator>>(unmarshall &u, char &x)
922 {
923         x = (char) u.rawbyte();
924         return u;
925 }
926
927
928 unmarshall &
929 operator>>(unmarshall &u, unsigned short &x)
930 {
931         x = (u.rawbyte() & 0xff) << 8;
932         x |= u.rawbyte() & 0xff;
933         return u;
934 }
935
936 unmarshall &
937 operator>>(unmarshall &u, short &x)
938 {
939         x = (u.rawbyte() & 0xff) << 8;
940         x |= u.rawbyte() & 0xff;
941         return u;
942 }
943
944 unmarshall &
945 operator>>(unmarshall &u, unsigned int &x)
946 {
947         x = (u.rawbyte() & 0xff) << 24;
948         x |= (u.rawbyte() & 0xff) << 16;
949         x |= (u.rawbyte() & 0xff) << 8;
950         x |= u.rawbyte() & 0xff;
951         return u;
952 }
953
954 unmarshall &
955 operator>>(unmarshall &u, int &x)
956 {
957         x = (u.rawbyte() & 0xff) << 24;
958         x |= (u.rawbyte() & 0xff) << 16;
959         x |= (u.rawbyte() & 0xff) << 8;
960         x |= u.rawbyte() & 0xff;
961         return u;
962 }
963
964 unmarshall &
965 operator>>(unmarshall &u, unsigned long long &x)
966 {
967         unsigned int h, l;
968         u >> h;
969         u >> l;
970         x = l | ((unsigned long long) h << 32);
971         return u;
972 }
973
974 unmarshall &
975 operator>>(unmarshall &u, std::string &s)
976 {
977         unsigned sz;
978         u >> sz;
979         if(u.ok())
980                 u.rawbytes(s, sz);
981         return u;
982 }
983
984 void
985 unmarshall::rawbytes(std::string &ss, unsigned int n)
986 {
987         if((_ind+n) > (unsigned)_sz){
988                 _ok = false;
989         } else {
990                 std::string tmps = std::string(_buf+_ind, n);
991                 swap(ss, tmps);
992                 VERIFY(ss.size() == n);
993                 _ind += n;
994         }
995 }
996
bool operator<(const sockaddr_in &a, const sockaddr_in &b){
	// Order by IPv4 address, then by port. Both fields are compared as
	// stored (network byte order): the result is a consistent strict weak
	// ordering, though not numeric address order.
	if(a.sin_addr.s_addr != b.sin_addr.s_addr)
		return a.sin_addr.s_addr < b.sin_addr.s_addr;
	return a.sin_port < b.sin_port;
}
1002
/*---------------auxiliary functions--------------*/
1004 void
1005 make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
1006
1007         char host[200];
1008         const char *localhost = "127.0.0.1";
1009         const char *port = index(hostandport, ':');
1010         if(port == NULL){
1011                 memcpy(host, localhost, strlen(localhost)+1);
1012                 port = hostandport;
1013         } else {
1014                 memcpy(host, hostandport, port-hostandport);
1015                 host[port-hostandport] = '\0';
1016                 port++;
1017         }
1018
1019         make_sockaddr(host, port, dst);
1020
1021 }
1022
void
make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
	// Fill *dst with an AF_INET address for host and the decimal port.
	// Numeric hosts are parsed with inet_addr(). Anything else is resolved
	// with getaddrinfo(), which, unlike gethostbyname(), does not hand back
	// shared static storage and is safe to call concurrently.
	// An unresolvable host is fatal: a message is printed and exit(1) called.

	bzero(dst, sizeof(*dst));
	dst->sin_family = AF_INET;

	in_addr_t a = inet_addr(host);
	if(a != INADDR_NONE){
		dst->sin_addr.s_addr = a;
	} else {
		struct addrinfo hints;
		memset(&hints, 0, sizeof(hints));
		hints.ai_family = AF_INET;	// only IPv4 fits in a sockaddr_in
		struct addrinfo *res = NULL;
		if(getaddrinfo(host, NULL, &hints, &res) != 0 || res == NULL){
			fprintf(stderr, "cannot find host name %s\n", host);
			exit(1);
		}
		dst->sin_addr = ((struct sockaddr_in *)res->ai_addr)->sin_addr;
		freeaddrinfo(res);
	}
	dst->sin_port = htons(atoi(port));
}
1044
int
cmp_timespec(const struct timespec &a, const struct timespec &b)
{
	// Three-way comparison: 1 if a is later than b, -1 if earlier, 0 if
	// equal. Seconds decide first; nanoseconds only break ties.
	if(a.tv_sec != b.tv_sec)
		return (a.tv_sec > b.tv_sec) ? 1 : -1;
	if(a.tv_nsec != b.tv_nsec)
		return (a.tv_nsec > b.tv_nsec) ? 1 : -1;
	return 0;
}
1061
void
add_timespec(const struct timespec &a, int b, struct timespec *result)
{
	// *result = a + b milliseconds (b may be negative). The result is
	// normalized so that 0 <= tv_nsec < 1000000000. That range is required
	// by POSIX timed waits such as pthread_cond_timedwait, which reject
	// tv_nsec == 1000000000 with EINVAL.
	result->tv_sec = a.tv_sec + b / 1000;
	result->tv_nsec = a.tv_nsec + (long)(b % 1000) * 1000000L;
	// Carry whole seconds out of the nanosecond field in either direction
	// (b % 1000 is negative when b is).
	while(result->tv_nsec >= 1000000000L){
		result->tv_sec++;
		result->tv_nsec -= 1000000000L;
	}
	while(result->tv_nsec < 0){
		result->tv_sec--;
		result->tv_nsec += 1000000000L;
	}
}
1074
1075 int
1076 diff_timespec(const struct timespec &end, const struct timespec &start)
1077 {
1078         int diff = (end.tv_sec > start.tv_sec)?(end.tv_sec-start.tv_sec)*1000:0;
1079         VERIFY(diff || end.tv_sec == start.tv_sec);
1080         if(end.tv_nsec > start.tv_nsec){
1081                 diff += (end.tv_nsec-start.tv_nsec)/1000000;
1082         } else {
1083                 diff -= (start.tv_nsec-end.tv_nsec)/1000000;
1084         }
1085         return diff;
1086 }