Build on wheezy, and presumably precise
[invirt/third/libt4.git] / rpc / rpc.cc
1 /*
2  The rpcc class handles client-side RPC.  Each rpcc is bound to a
3  single RPC server.  The jobs of rpcc include maintaining a connection to
4  the server, sending RPC requests and waiting for responses, retransmissions,
5  at-most-once delivery etc.
6
7  The rpcs class handles the server side of RPC.  Each rpcs handles multiple
8  connections from different rpcc objects.  The jobs of rpcs include accepting
9  connections, dispatching requests to registered RPC handlers, at-most-once
10  delivery etc.
11
12  Both rpcc and rpcs use the connection class as an abstraction for the
13  underlying communication channel.  To send an RPC request/reply, one calls
14  connection::send() which blocks until data is sent or the connection has failed
15  (thus the caller can free the buffer when send() returns).  When a
16  request/reply is received, connection makes a callback into the corresponding
17  rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
18
19  Thread organization:
20  rpcc uses application threads to send RPC requests and blocks to receive the
21  reply or error. All connections use a single PollMgr object to perform async
22  socket IO.  PollMgr creates a single thread to examine the readiness of socket
23  file descriptors and informs the corresponding connection whenever a socket is
24  ready to be read or written.  (We use asynchronous socket IO to reduce the
25  number of threads needed to manage these connections; without async IO, at
26  least one thread is needed per connection to read data without blocking other
27  activities.)  Each rpcs object creates one thread for listening on the server
28  port and a pool of threads for executing RPC requests.  The
29  thread pool allows us to control the number of threads spawned at the server
30  (spawning one thread per request will hurt when the server faces thousands of
31  requests).
32
33  In order to delete a connection object, we must maintain a reference count.
34  For rpcc,
35  multiple client threads might be invoking the rpcc::call() functions and thus
36  holding multiple references to the underlying connection object. For rpcs,
37  multiple dispatch threads might be holding references to the same connection
38  object.  A connection object is deleted only when the underlying connection is
39  dead and the reference count reaches zero.
40
41  The previous version of the RPC library used pthread_cancel* routines
42  to implement the deletion of rpcc and rpcs objects. The idea was to cancel
43  all active threads that might be holding a reference to an object before
44  deleting that object. However, pthread_cancel is not robust, and there were
45  always bugs where outstanding references to deleted objects persisted.
46  This version of the RPC library does not use pthread_cancel, but explicitly
47  joins exited threads to make sure no outstanding references exist before 
48  deleting objects.
49
50  To delete a rpcc object safely, the users of the library must ensure that
51  there are no outstanding calls on the rpcc object.
52
53  To delete a rpcs object safely, we do the following in sequence: 1. stop
54  accepting new incoming connections. 2. close existing active connections.
55  3.  delete the dispatch thread pool which involves waiting for current active
56  RPC handlers to finish.  It is interesting how a thread pool can be deleted
57  without using thread cancellation. The trick is to inject x "poison pills" for
58  a thread pool of x threads. Upon getting a poison pill instead of a normal 
59  task, a worker thread will exit (and thread pool destructor waits to join all
60  x exited worker threads).
61  */
62
63 #include "rpc.h"
64 #include "method_thread.h"
65 #include "slock.h"
66
67 #include <sys/types.h>
68 #include <arpa/inet.h>
69 #include <netinet/tcp.h>
70 #include <time.h>
71 #include <netdb.h>
72 #include <unistd.h>
73
74 #include "jsl_log.h"
75 #include "gettime.h"
76 #include "lang/verify.h"
77
// Bounds on RPC timeouts.  call1() starts each per-attempt wait at
// to_min.to and doubles it after every timeout; the overall deadline comes
// from the TO passed by the caller.
// NOTE(review): to_max is not referenced in the visible part of this file,
// and the unit of TO::to (handed to add_timespec) is presumably
// milliseconds -- confirm in gettime.h.
78 const rpcc::TO rpcc::to_max = { 120000 };
79 const rpcc::TO rpcc::to_min = { 1000 };
80
81 rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
82 : xid(xxid), un(xun), done(false)
83 {
84         VERIFY(pthread_mutex_init(&m,0) == 0);
85         VERIFY(pthread_cond_init(&c, 0) == 0);
86 }
87
88 rpcc::caller::~caller()
89 {
90         VERIFY(pthread_mutex_destroy(&m) == 0);
91         VERIFY(pthread_cond_destroy(&c) == 0);
92 }
93
// Seeds random() from the nanosecond part of the wall clock XOR-ed with the
// process id, so processes started at nearly the same time still diverge.
inline
void set_rand_seed()
{
        struct timespec clock_now;
        clock_gettime(CLOCK_REALTIME, &clock_now);
        const int seed = static_cast<int>(clock_now.tv_nsec) ^ static_cast<int>(getpid());
        srandom(seed);
}
101
102 rpcc::rpcc(sockaddr_in d, bool retrans) : 
103         dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0), 
104         retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
105 {
106         VERIFY(pthread_mutex_init(&m_, 0) == 0);
107         VERIFY(pthread_mutex_init(&chan_m_, 0) == 0);
108         VERIFY(pthread_cond_init(&destroy_wait_c_, 0) == 0);
109
110         if(retrans){
111                 set_rand_seed();
112                 clt_nonce_ = random();
113         } else {
114                 // special client nonce 0 means this client does not 
115                 // require at-most-once logic from the server
116                 // because it uses tcp and never retries a failed connection
117                 clt_nonce_ = 0;
118         }
119
120         char *loss_env = getenv("RPC_LOSSY");
121         if(loss_env != NULL){
122                 lossytest_ = atoi(loss_env);
123         }
124
125         // xid starts with 1 and latest received reply starts with 0
126         xid_rep_window_.push_back(0);
127
128         jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n", 
129                         clt_nonce_, lossytest_); 
130 }
131
132 // IMPORTANT: destruction should happen only when no external threads
133 // are blocked inside rpcc or will use rpcc in the future
134 rpcc::~rpcc()
135 {
136         jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n", 
137                         clt_nonce_, chan_?chan_->channo():-1); 
138         if(chan_){
139                 chan_->closeconn();
140                 chan_->decref();
141         }
142         VERIFY(calls_.size() == 0);
143         VERIFY(pthread_mutex_destroy(&m_) == 0);
144         VERIFY(pthread_mutex_destroy(&chan_m_) == 0);
145 }
146
147 int
148 rpcc::bind(TO to)
149 {
150         int r;
151         int ret = call(rpc_const::bind, 0, r, to);
152         if(ret == 0){
153                 ScopedLock ml(&m_);
154                 bind_done_ = true;
155                 srv_nonce_ = r;
156         } else {
157                 jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n", 
158                                 inet_ntoa(dst_.sin_addr), ret);
159         }
160         return ret;
161 };
162
163 // Cancel all outstanding calls
164 void
165 rpcc::cancel(void)
166 {
167   ScopedLock ml(&m_);
168   printf("rpcc::cancel: force callers to fail\n");
169   std::map<int,caller*>::iterator iter;
170   for(iter = calls_.begin(); iter != calls_.end(); iter++){
171     caller *ca = iter->second;
172
173     jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
174     {
175       ScopedLock cl(&ca->m);
176       ca->done = true;
177       ca->intret = rpc_const::cancel_failure;
178       VERIFY(pthread_cond_signal(&ca->c) == 0);
179     }
180   }
181
182   while (calls_.size () > 0){
183     destroy_wait_ = true;
184     VERIFY(pthread_cond_wait(&destroy_wait_c_,&m_) == 0);
185   }
186   printf("rpcc::cancel: done\n");
187 }
188
// Sends one RPC and waits for its reply, a timeout, or cancellation.
//
//   proc: procedure number (rpc_const::bind is allowed only before binding,
//         every other proc only after).
//   req:  marshalled arguments; the request header is packed into it here.
//   rep:  receives the reply (got_pdu() hands it over via take_in()).
//   to:   overall time budget for the call.
//
// Returns ca.intret when the call completed (the reply header's ret, or
// cancel_failure if cancel() completed it), bind_failure if the bind state
// does not permit proc, cancel_failure if destroy_wait_ is already set, and
// timeout_failure if the deadline passes without completion.
//
// The request is sent once per connection.  If retrans_ is set and the
// connection is missing or dead after a wait, it is resent on a fresh
// connection.  Each wait starts at to_min and doubles, clipped to the final
// deadline.
189 int
190 rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
191                 TO to)
192 {
193
194         caller ca(0, &rep);
195         int xid_rep;
        // Under m_: check bind state, assign an xid, register the caller in
        // calls_, and stamp the header with the reply window's low-water mark.
196         {
197                 ScopedLock ml(&m_);
198
199                 if((proc != rpc_const::bind && !bind_done_) ||
200                                 (proc == rpc_const::bind && bind_done_)){
201                         jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
202                         return rpc_const::bind_failure;
203                 }
204
                        // cancel() is (or was) draining calls; refuse new ones
205                 if(destroy_wait_){
206                   return rpc_const::cancel_failure;
207                 }
208
209                 ca.xid = xid_++;
210                 calls_[ca.xid] = &ca;
211
212                 req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
213                              xid_rep_window_.front());
214                 req.pack_req_header(h);
215                 xid_rep = xid_rep_window_.front();
216         }
217
        // curr_to is the current per-attempt wait; finaldeadline bounds the
        // whole call.
218         TO curr_to;
219         struct timespec now, nextdeadline, finaldeadline; 
220
221         clock_gettime(CLOCK_REALTIME, &now);
222         add_timespec(now, to.to, &finaldeadline); 
223         curr_to.to = to_min.to;
224
225         bool transmit = true;
        // Counted reference obtained from get_refconn(); released at the end.
226         connection *ch = NULL;
227
228         while (1){
229                 if(transmit){
230                         get_refconn(&ch);
231                         if(ch){
232                                 if(reachable_) {
                                                // Lossy-test mode: if an earlier completed request
                                                // was stashed in dup_req_ and xid_rep_done_ has moved
                                                // past it, resend that stale request once before this
                                                // one (presumably to exercise the server's duplicate
                                                // handling -- confirm).
233                                         request forgot;
234                                         {
235                                                 ScopedLock ml(&m_);
236                                                 if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
237                                                         forgot = dup_req_;
238                                                         dup_req_.clear();
239                                                 }
240                                         }
241                                         if (forgot.isvalid()) 
242                                                 ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
243                                         ch->send(req.cstr(), req.size());
244                                 }
245                                 else jsl_log(JSL_DBG_1, "not reachable\n");
246                                 jsl_log(JSL_DBG_2, 
247                                                 "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n", 
248                                                 clt_nonce_, proc, ca.xid, clt_nonce_); 
249                         }
250                         transmit = false; // only send once on a given channel
251                 }
252
                // finaldeadline.tv_sec is zeroed below once a wait has been
                // clipped to the overall deadline, so zero here means that
                // final wait has already elapsed.
253                 if(!finaldeadline.tv_sec)
254                         break;
255
256                 clock_gettime(CLOCK_REALTIME, &now);
257                 add_timespec(now, curr_to.to, &nextdeadline); 
258                 if(cmp_timespec(nextdeadline,finaldeadline) > 0){
259                         nextdeadline = finaldeadline;
260                         finaldeadline.tv_sec = 0;
261                 }
262
                // Wait for got_pdu()/cancel() to set ca.done, or for this
                // attempt's deadline.
263                 {
264                         ScopedLock cal(&ca.m);
265                         while (!ca.done){
266                                 jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
267                                 if(pthread_cond_timedwait(&ca.c, &ca.m,
268                                                  &nextdeadline) == ETIMEDOUT){
269                                         jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
270                                         break;
271                                 }
272                         }
273                         if(ca.done){
274                                 jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
275                                 break;
276                         }
277                 }
278
279                 if(retrans_ && (!ch || ch->isdead())){
280                         // since connection is dead, retransmit
281                         // on the new connection 
282                         transmit = true; 
283                 }
                // exponential backoff of the per-attempt wait
284                 curr_to.to <<= 1;
285         }
286
        // Unregister the call and, if cancel() is waiting, tell it one more
        // call has finished.
287         { 
288                 // no locking of ca.m since only this thread changes ca.xid 
289                 ScopedLock ml(&m_);
290                 calls_.erase(ca.xid);
291                 // may need to update the xid again here, in case the
292                 // packet times out before it's even sent by the channel.
293                 // I don't think there's any harm in maybe doing it twice
294                 update_xid_rep(ca.xid);
295
296                 if(destroy_wait_){
297                   VERIFY(pthread_cond_signal(&destroy_wait_c_) == 0);
298                 }
299         }
300
        // Lossy-test mode: remember this completed request as a future
        // duplicate, and advance xid_rep_done_ to the xid_rep it carried.
301         if (ca.done && lossytest_)
302         {
303                 ScopedLock ml(&m_);
304                 if (!dup_req_.isvalid()) {
305                         dup_req_.buf.assign(req.cstr(), req.size());
306                         dup_req_.xid = ca.xid;
307                 }
308                 if (xid_rep > xid_rep_done_)
309                         xid_rep_done_ = xid_rep;
310         }
311
312         ScopedLock cal(&ca.m);
313
314         jsl_log(JSL_DBG_2, 
315                         "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n", 
316                         clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
317                         ntohs(dst_.sin_port), ca.done, ca.intret);
318
        // drop the reference taken by get_refconn()
319         if(ch)
320                 ch->decref();
321
322         // destruction of req automatically frees its buffer
323         return (ca.done? ca.intret : rpc_const::timeout_failure);
324 }
325
326 void
327 rpcc::get_refconn(connection **ch)
328 {
329         ScopedLock ml(&chan_m_);
330         if(!chan_ || chan_->isdead()){
331                 if(chan_)
332                         chan_->decref();
333                 chan_ = connect_to_dst(dst_, this, lossytest_);
334         }
335         if(ch && chan_){
336                 if(*ch){
337                         (*ch)->decref();
338                 }
339                 *ch = chan_;
340                 (*ch)->incref();
341         }
342 }
343
344 // PollMgr's thread is being used to 
345 // make this upcall from connection object to rpcc. 
346 // this funtion must not block.
347 //
348 // this function keeps no reference for connection *c 
349 bool
350 rpcc::got_pdu(connection *c, char *b, int sz)
351 {
352         unmarshall rep(b, sz);
353         reply_header h;
354         rep.unpack_reply_header(&h);
355
356         if(!rep.ok()){
357                 jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");
358                 return true;
359         }
360
361         ScopedLock ml(&m_);
362
363         update_xid_rep(h.xid);
364
365         if(calls_.find(h.xid) == calls_.end()){
366                 jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
367                 return true;
368         }
369         caller *ca = calls_[h.xid];
370
371         ScopedLock cl(&ca->m);
372         if(!ca->done){
373                 ca->un->take_in(rep);
374                 ca->intret = h.ret;
375                 if(ca->intret < 0){
376                         jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
377                                         h.xid, ca->intret);
378                 }
379                 ca->done = 1;
380         }
381         VERIFY(pthread_cond_broadcast(&ca->c) == 0);
382         return true;
383 }
384
385 // assumes the caller holds mutex m_ (got_pdu() and call1() call this with m_ held)
//
// Records that xid has been accounted for (reply received, or call given up)
// and advances the reply window.  xid_rep_window_ is kept in ascending
// order.  Its front is the largest xid N such that every xid <= N has been
// accounted for -- call1() sends front() to the server as the request
// header's xid_rep.  The remaining elements are larger xids that finished
// out of order.  xids at or below the front are ignored.
386 void 
387 rpcc::update_xid_rep(unsigned int xid)
388 {
389         std::list<unsigned int>::iterator it;
390
391         if(xid <= xid_rep_window_.front()){
392                 return;
393         }
394
        // insert xid before the first larger element, or append it
395         for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
396                 if(*it > xid){
397                         xid_rep_window_.insert(it, xid);
398                         goto compress;
399                 }
400         }
401         xid_rep_window_.push_back(xid);
402
403 compress:
        // Collapse contiguous runs into the front: while the element being
        // scanned equals front()+1, pop the front, so that element (or an
        // equal one before it) becomes the new front.  The scanned element
        // itself is never popped, so `it` stays valid.
404         it = xid_rep_window_.begin();
405         for (it++; it != xid_rep_window_.end(); it++){
406                 while (xid_rep_window_.front() + 1 == *it)
407                         xid_rep_window_.pop_front();
408         }
409 }
410
411
412 rpcs::rpcs(unsigned int p1, int count)
413   : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
414 {
415         VERIFY(pthread_mutex_init(&procs_m_, 0) == 0);
416         VERIFY(pthread_mutex_init(&count_m_, 0) == 0);
417         VERIFY(pthread_mutex_init(&reply_window_m_, 0) == 0);
418         VERIFY(pthread_mutex_init(&conss_m_, 0) == 0);
419
420         set_rand_seed();
421         nonce_ = random();
422         jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
423
424         char *loss_env = getenv("RPC_LOSSY");
425         if(loss_env != NULL){
426                 lossytest_ = atoi(loss_env);
427         }
428
429         reg(rpc_const::bind, this, &rpcs::rpcbind);
430         dispatchpool_ = new ThrPool(6,false);
431
432         listener_ = new tcpsconn(this, port_, lossytest_);
433 }
434
435 rpcs::~rpcs()
436 {
437         // must delete listener before dispatchpool
438         delete listener_;
439         delete dispatchpool_;
440         free_reply_window();
441 }
442
443 bool
444 rpcs::got_pdu(connection *c, char *b, int sz)
445 {
446         if(!reachable_){
447             jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
448             return true;
449         }
450
451         djob_t *j = new djob_t(c, b, sz);
452         c->incref();
453         bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j);
454         if(!succ || !reachable_){
455                 c->decref();
456                 delete j;
457         }
458         return succ; 
459 }
460
461 void
462 rpcs::reg1(unsigned int proc, handler *h)
463 {
464         ScopedLock pl(&procs_m_);
465         VERIFY(procs_.count(proc) == 0);
466         procs_[proc] = h;
467         VERIFY(procs_.count(proc) >= 1);
468 }
469
470 void
471 rpcs::updatestat(unsigned int proc)
472 {
473         ScopedLock cl(&count_m_);
474         counts_[proc]++;
475         curr_counts_--;
476         if(curr_counts_ == 0){
477                 std::map<int, int>::iterator i;
478                 printf("RPC STATS: ");
479                 for (i = counts_.begin(); i != counts_.end(); i++){
480                         printf("%x:%d ", i->first, i->second);
481                 }
482                 printf("\n");
483
484                 ScopedLock rwl(&reply_window_m_);
485                 std::map<unsigned int,std::list<reply_t> >::iterator clt;
486
487                 unsigned int totalrep = 0, maxrep = 0;
488                 for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
489                         totalrep += clt->second.size();
490                         if(clt->second.size() > maxrep)
491                                 maxrep = clt->second.size();
492                 }
493                 jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n", 
494                         (int) reply_window_.size()-1, totalrep, maxrep);
495                 curr_counts_ = counting_;
496         }
497 }
498
499 void
500 rpcs::dispatch(djob_t *j)
501 {
502         connection *c = j->conn;
503         unmarshall req(j->buf, j->sz);
504         delete j;
505
506         req_header h;
507         req.unpack_req_header(&h);
508         int proc = h.proc;
509
510         if(!req.ok()){
511                 jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");
512                 c->decref();
513                 return;
514         }
515
516         jsl_log(JSL_DBG_2,
517                         "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
518                         h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
519
520         marshall rep;
521         reply_header rh(h.xid,0);
522
523         // is client sending to an old instance of server?
524         if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
525                 jsl_log(JSL_DBG_2,
526                                 "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
527                                 h.srv_nonce, nonce_, h.proc);
528                 rh.ret = rpc_const::oldsrv_failure;
529                 rep.pack_reply_header(rh);
530                 c->send(rep.cstr(),rep.size());
531                 return;
532         }
533
534         handler *f;
535         // is RPC proc a registered procedure?
536         {
537                 ScopedLock pl(&procs_m_);
538                 if(procs_.count(proc) < 1){
539                         fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
540                                 proc);
541                         c->decref();
542                         VERIFY(0);
543                         return;
544                 }
545
546                 f = procs_[proc];
547         }
548
549         rpcs::rpcstate_t stat;
550         char *b1;
551         int sz1;
552
553         if(h.clt_nonce){
554                 // have i seen this client before?
555                 {
556                         ScopedLock rwl(&reply_window_m_);
557                         // if we don't know about this clt_nonce, create a cleanup object
558                         if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
559                                 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
560                 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
561                                 jsl_log(JSL_DBG_2,
562                                                 "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n", 
563                                                 h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
564                         }
565                 }
566
567                 // save the latest good connection to the client
568                 {
569                         ScopedLock rwl(&conss_m_);
570                         if(conns_.find(h.clt_nonce) == conns_.end()){
571                                 c->incref();
572                                 conns_[h.clt_nonce] = c;
573                         } else if(conns_[h.clt_nonce]->compare(c) < 0){
574                                 conns_[h.clt_nonce]->decref();
575                                 c->incref();
576                                 conns_[h.clt_nonce] = c;
577                         }
578                 }
579
580                 stat = checkduplicate_and_update(h.clt_nonce, h.xid,
581                                                  h.xid_rep, &b1, &sz1);
582         } else {
583                 // this client does not require at most once logic
584                 stat = NEW;
585         }
586
587         switch (stat){
588                 case NEW: // new request
589                         if(counting_){
590                                 updatestat(proc);
591                         }
592
593                         rh.ret = f->fn(req, rep);
594                         if (rh.ret == rpc_const::unmarshal_args_failure) {
595                                 fprintf(stderr, "rpcs::dispatch: failed to"
596                                        " unmarshall the arguments. You are"
597                                        " probably calling RPC 0x%x with wrong"
598                                        " types of arguments.\n", proc);
599                                 VERIFY(0);
600                         }
601                         VERIFY(rh.ret >= 0);
602
603                         rep.pack_reply_header(rh);
604                         rep.take_buf(&b1,&sz1);
605
606                         jsl_log(JSL_DBG_2,
607                                         "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
608                                         sz1, h.xid, proc, rh.ret, h.clt_nonce);
609
610                         if(h.clt_nonce > 0){
611                                 // only record replies for clients that require at-most-once logic
612                                 add_reply(h.clt_nonce, h.xid, b1, sz1);
613                         }
614
615                         // get the latest connection to the client
616                         {
617                                 ScopedLock rwl(&conss_m_);
618                                 if(c->isdead() && c != conns_[h.clt_nonce]){
619                                         c->decref();
620                                         c = conns_[h.clt_nonce];
621                                         c->incref();
622                                 }
623                         }
624
625                         c->send(b1, sz1);
626                         if(h.clt_nonce == 0){
627                                 // reply is not added to at-most-once window, free it
628                                 free(b1);
629                         }
630                         break;
631                 case INPROGRESS: // server is working on this request
632                         break;
633                 case DONE: // duplicate and we still have the response
634                         c->send(b1, sz1);
635                         break;
636                 case FORGOTTEN: // very old request and we don't have the response anymore
637                         jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n", 
638                                         h.xid, h.clt_nonce);
639                         rh.ret = rpc_const::atmostonce_failure;
640                         rep.pack_reply_header(rh);
641                         c->send(rep.cstr(),rep.size());
642                         break;
643         }
644         c->decref();
645 }
646
647 // rpcs::dispatch calls this when an RPC request arrives.
648 //
649 // checks to see if an RPC with xid from clt_nonce has already been received.
650 // if not, remembers the request in reply_window_.
651 //
652 // deletes remembered requests with XIDs < xid_rep (strictly less; an entry
// equal to xid_rep is kept) and records xid_rep in the list header; the client
653 // says it has received a reply for every RPC up through xid_rep.
654 // frees the reply_t::buf of each such request.
655 //
656 // returns one of:
657 //   NEW: never seen this xid before.
658 //   INPROGRESS: seen this xid, and still processing it.
659 //   DONE: seen this xid, previous reply returned in *b and *sz.
660 //   FORGOTTEN: might have seen this xid, but deleted previous reply.
//
// The first element of each client's list is a header, not a request: its
// xid holds the latest xid_rep seen from that client ((unsigned int)-1 until
// the first update, as pushed by dispatch()).  Real entries follow it in
// ascending xid order; cb_present marks entries whose reply is stored.
//
// NOTE(review): on DONE, *b points into reply_window_, and dispatch() sends
// it after reply_window_m_ has been released.  A concurrent request from the
// same client carrying a larger xid_rep could free that buffer first.
661 rpcs::rpcstate_t 
662 rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
663                 unsigned int xid_rep, char **b, int *sz)
664 {
665         ScopedLock rwl(&reply_window_m_);
666
667     std::list<reply_t> &l = reply_window_[clt_nonce];
668
669     VERIFY(l.size() > 0);
670     VERIFY(xid >= xid_rep);
671
672     unsigned int past_xid_rep = l.begin()->xid;
673
    // start and it both end up at the first real entry, just past the header
674     std::list<reply_t>::iterator start = l.begin(), it;
675     it = ++start;
676
677     if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) {
678         // scan for deletion candidates
679         for (; it != l.end() && it->xid < xid_rep; it++) {
680             if (it->cb_present)
681                 free(it->buf);
682         }
683         l.erase(start, it);
684         l.begin()->xid = xid_rep;
685     }
686
    // compared against the header value from before this call's update
687     if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1)
688         return FORGOTTEN;
689
690     // skip non-deletion candidates
691     while (it != l.end() && it->xid < xid)
692         it++;
693
694     // if it's in the list it must be right here
695     if (it != l.end() && it->xid == xid) {
696         if (it->cb_present) {
697             // return information about the remembered reply
698             *b = it->buf;
699             *sz = it->sz;
700             return DONE;
701         } else {
702             return INPROGRESS;
703         }
704     } else {
705         // remember that a new request has arrived
706         l.insert(it, reply_t(xid));
707         return NEW;
708     }
709 }
710
711 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
712 // and passes the return value in b and sz.
713 // add_reply() should remember b and sz.
714 // free_reply_window() and checkduplicate_and_update is responsible for 
715 // calling free(b).
716 void
717 rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
718                 char *b, int sz)
719 {
720         ScopedLock rwl(&reply_window_m_);
721     // remember the RPC reply value
722     std::list<reply_t> &l = reply_window_[clt_nonce];
723     std::list<reply_t>::iterator it = l.begin();
724     // skip to our place in the list
725     for (it++; it != l.end() && it->xid < xid; it++);
726     // there should already be an entry, so whine if there isn't
727     if (it == l.end() || it->xid != xid) {
728         fprintf(stderr, "Could not find reply struct in add_reply");
729         l.insert(it, reply_t(xid, b, sz));
730     } else {
731         *it = reply_t(xid, b, sz);
732     }
733 }
734
735 void
736 rpcs::free_reply_window(void)
737 {
738         std::map<unsigned int,std::list<reply_t> >::iterator clt;
739         std::list<reply_t>::iterator it;
740
741         ScopedLock rwl(&reply_window_m_);
742         for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
743                 for (it = clt->second.begin(); it != clt->second.end(); it++){
744                         if (it->cb_present)
745                 free(it->buf);
746                 }
747                 clt->second.clear();
748         }
749         reply_window_.clear();
750 }
751
752 // rpc handler
753 int 
754 rpcs::rpcbind(int a, int &r)
755 {
756         jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
757         r = nonce_;
758         return 0;
759 }
760
761 void
762 marshall::rawbyte(unsigned char x)
763 {
764         if(_ind >= _capa){
765                 _capa *= 2;
766                 VERIFY (_buf != NULL);
767                 _buf = (char *)realloc(_buf, _capa);
768                 VERIFY(_buf);
769         }
770         _buf[_ind++] = x;
771 }
772
773 void
774 marshall::rawbytes(const char *p, int n)
775 {
776         if((_ind+n) > _capa){
777                 _capa = _capa > n? 2*_capa:(_capa+n);
778                 VERIFY (_buf != NULL);
779                 _buf = (char *)realloc(_buf, _capa);
780                 VERIFY(_buf);
781         }
782         memcpy(_buf+_ind, p, n);
783         _ind += n;
784 }
785
786 marshall &
787 operator<<(marshall &m, bool x)
788 {
789         m.rawbyte(x);
790         return m;
791 }
792
793 marshall &
794 operator<<(marshall &m, unsigned char x)
795 {
796         m.rawbyte(x);
797         return m;
798 }
799
800 marshall &
801 operator<<(marshall &m, char x)
802 {
803         m << (unsigned char) x;
804         return m;
805 }
806
807
808 marshall &
809 operator<<(marshall &m, unsigned short x)
810 {
811         m.rawbyte((x >> 8) & 0xff);
812         m.rawbyte(x & 0xff);
813         return m;
814 }
815
816 marshall &
817 operator<<(marshall &m, short x)
818 {
819         m << (unsigned short) x;
820         return m;
821 }
822
823 marshall &
824 operator<<(marshall &m, unsigned int x)
825 {
826         // network order is big-endian
827         m.rawbyte((x >> 24) & 0xff);
828         m.rawbyte((x >> 16) & 0xff);
829         m.rawbyte((x >> 8) & 0xff);
830         m.rawbyte(x & 0xff);
831         return m;
832 }
833
834 marshall &
835 operator<<(marshall &m, int x)
836 {
837         m << (unsigned int) x;
838         return m;
839 }
840
841 marshall &
842 operator<<(marshall &m, const std::string &s)
843 {
844         m << (unsigned int) s.size();
845         m.rawbytes(s.data(), s.size());
846         return m;
847 }
848
849 marshall &
850 operator<<(marshall &m, unsigned long long x)
851 {
852         m << (unsigned int) (x >> 32);
853         m << (unsigned int) x;
854         return m;
855 }
856
857 void
858 marshall::pack(int x)
859 {
860         rawbyte((x >> 24) & 0xff);
861         rawbyte((x >> 16) & 0xff);
862         rawbyte((x >> 8) & 0xff);
863         rawbyte(x & 0xff);
864 }
865
866 void
867 unmarshall::unpack(int *x)
868 {
869         (*x) = (rawbyte() & 0xff) << 24;
870         (*x) |= (rawbyte() & 0xff) << 16;
871         (*x) |= (rawbyte() & 0xff) << 8;
872         (*x) |= rawbyte() & 0xff;
873 }
874
875 // take the contents from another unmarshall object
876 void
877 unmarshall::take_in(unmarshall &another)
878 {
879         if(_buf)
880                 free(_buf);
881         another.take_buf(&_buf, &_sz);
882         _ind = RPC_HEADER_SZ;
883         _ok = _sz >= RPC_HEADER_SZ?true:false;
884 }
885
886 bool
887 unmarshall::okdone()
888 {
889         if(ok() && _ind == _sz){
890                 return true;
891         } else {
892                 return false;
893         }
894 }
895
896 unsigned int
897 unmarshall::rawbyte()
898 {
899         char c = 0;
900         if(_ind >= _sz)
901                 _ok = false;
902         else
903                 c = _buf[_ind++];
904         return c;
905 }
906
907 unmarshall &
908 operator>>(unmarshall &u, bool &x)
909 {
910         x = (bool) u.rawbyte() ;
911         return u;
912 }
913
914 unmarshall &
915 operator>>(unmarshall &u, unsigned char &x)
916 {
917         x = (unsigned char) u.rawbyte() ;
918         return u;
919 }
920
921 unmarshall &
922 operator>>(unmarshall &u, char &x)
923 {
924         x = (char) u.rawbyte();
925         return u;
926 }
927
928
929 unmarshall &
930 operator>>(unmarshall &u, unsigned short &x)
931 {
932         x = (u.rawbyte() & 0xff) << 8;
933         x |= u.rawbyte() & 0xff;
934         return u;
935 }
936
937 unmarshall &
938 operator>>(unmarshall &u, short &x)
939 {
940         x = (u.rawbyte() & 0xff) << 8;
941         x |= u.rawbyte() & 0xff;
942         return u;
943 }
944
945 unmarshall &
946 operator>>(unmarshall &u, unsigned int &x)
947 {
948         x = (u.rawbyte() & 0xff) << 24;
949         x |= (u.rawbyte() & 0xff) << 16;
950         x |= (u.rawbyte() & 0xff) << 8;
951         x |= u.rawbyte() & 0xff;
952         return u;
953 }
954
955 unmarshall &
956 operator>>(unmarshall &u, int &x)
957 {
958         x = (u.rawbyte() & 0xff) << 24;
959         x |= (u.rawbyte() & 0xff) << 16;
960         x |= (u.rawbyte() & 0xff) << 8;
961         x |= u.rawbyte() & 0xff;
962         return u;
963 }
964
965 unmarshall &
966 operator>>(unmarshall &u, unsigned long long &x)
967 {
968         unsigned int h, l;
969         u >> h;
970         u >> l;
971         x = l | ((unsigned long long) h << 32);
972         return u;
973 }
974
975 unmarshall &
976 operator>>(unmarshall &u, std::string &s)
977 {
978         unsigned sz;
979         u >> sz;
980         if(u.ok())
981                 u.rawbytes(s, sz);
982         return u;
983 }
984
985 void
986 unmarshall::rawbytes(std::string &ss, unsigned int n)
987 {
988         if((_ind+n) > (unsigned)_sz){
989                 _ok = false;
990         } else {
991                 std::string tmps = std::string(_buf+_ind, n);
992                 swap(ss, tmps);
993                 VERIFY(ss.size() == n);
994                 _ind += n;
995         }
996 }
997
/*
 * Strict weak ordering on IPv4 socket addresses: compare the address first,
 * then the port. Both fields are compared as stored in the struct.
 */
bool operator<(const sockaddr_in &a, const sockaddr_in &b){
        if (a.sin_addr.s_addr != b.sin_addr.s_addr)
                return a.sin_addr.s_addr < b.sin_addr.s_addr;
        return a.sin_port < b.sin_port;
}
1003
/*---------------auxiliary functions--------------*/
1005 void
1006 make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
1007
1008         char host[200];
1009         const char *localhost = "127.0.0.1";
1010         const char *port = index(hostandport, ':');
1011         if(port == NULL){
1012                 memcpy(host, localhost, strlen(localhost)+1);
1013                 port = hostandport;
1014         } else {
1015                 memcpy(host, hostandport, port-hostandport);
1016                 host[port-hostandport] = '\0';
1017                 port++;
1018         }
1019
1020         make_sockaddr(host, port, dst);
1021
1022 }
1023
/*
 * Fill *dst with an IPv4 socket address for host and port.
 * host may be a dotted-quad literal or a name to resolve. port is a decimal
 * string converted with atoi(). If host cannot be resolved, an error is
 * printed and the process exits with status 1.
 *
 * Names are resolved with getaddrinfo() instead of gethostbyname().
 * gethostbyname() returns a pointer to shared static storage, so it is not
 * safe when several threads resolve names at once, and it is obsolete in
 * POSIX.
 */
void
make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
        memset(dst, 0, sizeof(*dst));
        dst->sin_family = AF_INET;

        const in_addr_t a = inet_addr(host);
        if (a != INADDR_NONE) {
                dst->sin_addr.s_addr = a;
        } else {
                struct addrinfo hints;
                memset(&hints, 0, sizeof(hints));
                hints.ai_family = AF_INET;   /* IPv4 only, matching sockaddr_in */
                struct addrinfo *res = NULL;
                const int rc = getaddrinfo(host, NULL, &hints, &res);
                if (rc != 0 || res == NULL) {
                        fprintf(stderr, "cannot find host name %s\n", host);
                        exit(1);
                }
                dst->sin_addr = ((struct sockaddr_in *)res->ai_addr)->sin_addr;
                freeaddrinfo(res);
        }
        dst->sin_port = htons(atoi(port));
}
1045
/*
 * Three-way comparison of two timespecs.
 * Returns -1 if a < b, 0 if they are equal, and 1 if a > b.
 * Seconds decide first; nanoseconds only break a tie.
 */
int
cmp_timespec(const struct timespec &a, const struct timespec &b)
{
        if (a.tv_sec != b.tv_sec)
                return (a.tv_sec < b.tv_sec) ? -1 : 1;
        if (a.tv_nsec != b.tv_nsec)
                return (a.tv_nsec < b.tv_nsec) ? -1 : 1;
        return 0;
}
1062
/*
 * *result = a + b, where b is an offset in milliseconds (it may be negative).
 * The result is normalized so that 0 <= tv_nsec < 1000000000.
 *
 * Fixes:
 *  - The old loop tested '> 1000000000', so it could leave
 *    tv_nsec == 1000000000. That is not a valid timespec; POSIX timed waits
 *    such as pthread_cond_timedwait() reject it with EINVAL.
 *  - A negative b with a nonzero millisecond remainder could push tv_nsec
 *    below zero and trip an assertion. We now borrow a second instead.
 */
void
add_timespec(const struct timespec &a, int b, struct timespec *result)
{
        result->tv_sec = a.tv_sec + b / 1000;
        result->tv_nsec = a.tv_nsec + (long)(b % 1000) * 1000000L;
        while (result->tv_nsec >= 1000000000L) {
                result->tv_sec++;
                result->tv_nsec -= 1000000000L;
        }
        while (result->tv_nsec < 0) {
                result->tv_sec--;
                result->tv_nsec += 1000000000L;
        }
}
1075
1076 int
1077 diff_timespec(const struct timespec &end, const struct timespec &start)
1078 {
1079         int diff = (end.tv_sec > start.tv_sec)?(end.tv_sec-start.tv_sec)*1000:0;
1080         VERIFY(diff || end.tv_sec == start.tv_sec);
1081         if(end.tv_nsec > start.tv_nsec){
1082                 diff += (end.tv_nsec-start.tv_nsec)/1000000;
1083         } else {
1084                 diff -= (start.tv_nsec-end.tv_nsec)/1000000;
1085         }
1086         return diff;
1087 }