The rpcc class handles client-side RPC. Each rpcc is bound to a single RPC
server. The jobs of rpcc include maintaining a connection to the server,
sending RPC requests, waiting for responses, retransmitting lost requests,
supporting at-most-once delivery, etc.

The rpcs class handles the server side of RPC. Each rpcs handles multiple
connections from different rpcc objects. The jobs of rpcs include accepting
connections, dispatching requests to registered RPC handlers, enforcing
at-most-once delivery, etc.
Both rpcc and rpcs use the connection class as an abstraction for the
underlying communication channel. To send an RPC request or reply, one calls
connection::send(), which blocks until the data is sent or the connection has
failed (thus the caller can free the buffer when send() returns). When a
request or reply is received, connection makes a callback into the
corresponding rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
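
A rough sketch of the shape of that upcall interface (the class and member
names here are illustrative, not the library's exact declarations):

    // Whoever owns a connection implements this upcall interface.
    class pdu_handler {
     public:
        // Called by the connection when a complete PDU (request or reply)
        // has arrived.  Runs on PollMgr's thread, so it must not block
        // (see the comments near rpcc::got_pdu() below).
        virtual bool got_pdu(connection *c, char *buf, int sz) = 0;
        virtual ~pdu_handler() {}
    };

Both rpcc and rpcs implement such an upcall; rpcs merely queues the request
for its dispatch pool so the polling thread is never tied up.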
rpcc uses the application's threads to send RPC requests; the calling thread
blocks until the reply or an error arrives. All connections use a single
PollMgr object to perform asynchronous socket IO. PollMgr creates a single
thread to examine the readiness of socket file descriptors and informs the
corresponding connection whenever a socket is ready to be read or written.
(We use asynchronous socket IO to reduce the number of threads needed to
manage these connections; without async IO, at least one thread is needed per
connection to read data without blocking other activities.) Each rpcs object
creates one thread for listening on the server port and a pool of threads for
executing RPC requests. The thread pool allows us to control the number of
threads spawned at the server (spawning one thread per request would hurt
when the server faces thousands of concurrent requests).
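
The single-threaded readiness loop can be pictured roughly as follows; this
illustrates the pattern, not PollMgr's actual code:

    #include <poll.h>
    #include <vector>

    // One thread watches every connection's socket; no per-connection
    // reader thread is needed.
    void readiness_loop(std::vector<struct pollfd> &fds)
    {
        while (true) {
            if (fds.empty())
                continue;
            int n = poll(&fds[0], fds.size(), 1000 /* ms */);
            if (n <= 0)
                continue;
            for (size_t i = 0; i < fds.size(); i++) {
                if (fds[i].revents & POLLIN) {
                    // notify the connection owning fds[i].fd that it can read
                }
                if (fds[i].revents & POLLOUT) {
                    // notify the connection owning fds[i].fd that it can write
                }
            }
        }
    }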
In order to delete a connection object, we must maintain a reference count.
For rpcc, multiple client threads might be invoking the rpcc::call() functions
and thus holding multiple references to the underlying connection object. For
rpcs, multiple dispatch threads might be holding references to the same
connection object. A connection object is deleted only when the underlying
connection is dead and the reference count reaches zero.
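
In code, the pattern looks roughly like this (assuming incref()/decref()
methods on connection, like the decref() call seen in rpcs::dispatch below;
the helper function itself is only illustrative):

    void send_on_connection(connection *c, char *buf, int sz)
    {
        c->incref();        // pin the object while this thread uses it
        c->send(buf, sz);
        c->decref();        // drop the pin; the last decref on a dead
                            // connection is what actually frees it
    }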
The previous version of the RPC library used the pthread_cancel* routines to
implement the deletion of rpcc and rpcs objects. The idea was to cancel all
active threads that might be holding a reference to an object before deleting
that object. However, pthread_cancel is not robust, and there were always bugs
in which outstanding references to deleted objects persisted. This version of
the RPC library does not use pthread_cancel; instead, it explicitly joins
exited threads to make sure no outstanding references exist before deleting
objects.
To delete an rpcc object safely, the users of the library must ensure that
there are no outstanding calls on the rpcc object.
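
A typical client-side usage consistent with that rule looks roughly like this
(the procedure number and argument types are made up for illustration;
rpcc::call() is templated over them):

    sockaddr_in dst;
    make_sockaddr("127.0.0.1:3772", &dst);
    rpcc *cl = new rpcc(dst);
    if (cl->bind() == 0) {
        int r;
        cl->call(22, 1000, r);   // e.g. RPC number 22 with one int argument
    }
    // only after every thread's call() has returned is it safe to:
    delete cl;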
To delete an rpcs object safely, we do the following in sequence: 1. stop
accepting new incoming connections; 2. close existing active connections;
3. delete the dispatch thread pool, which involves waiting for currently
active RPC handlers to finish. It is interesting how a thread pool can be
deleted without using thread cancellation. The trick is to inject x "poison
pills" into a thread pool of x threads. Upon getting a poison pill instead of
a normal task, a worker thread exits (and the thread pool destructor waits to
join all x exited worker threads).
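
The following is a self-contained sketch of the poison-pill pattern, not the
library's ThrPool implementation (the names, the fixed pool size, and the use
of a plain int job queue are all illustrative):

    #include <pthread.h>
    #include <queue>

    struct demo_pool {
        enum { NTHREADS = 4, POISON = -1 };
        pthread_mutex_t m_;
        pthread_cond_t c_;
        std::queue<int> jobs_;      // a "job" is just an int here
        pthread_t th_[NTHREADS];

        static void *work(void *arg) {
            demo_pool *p = (demo_pool *) arg;
            while (true) {
                pthread_mutex_lock(&p->m_);
                while (p->jobs_.empty())
                    pthread_cond_wait(&p->c_, &p->m_);
                int job = p->jobs_.front();
                p->jobs_.pop();
                pthread_mutex_unlock(&p->m_);
                if (job == POISON)
                    return 0;       // poison pill: this worker exits
                // ... execute the job ...
            }
        }

        demo_pool() {
            pthread_mutex_init(&m_, 0);
            pthread_cond_init(&c_, 0);
            for (int i = 0; i < NTHREADS; i++)
                pthread_create(&th_[i], 0, work, this);
        }

        ~demo_pool() {
            // inject one poison pill per worker ...
            pthread_mutex_lock(&m_);
            for (int i = 0; i < NTHREADS; i++)
                jobs_.push(POISON);
            pthread_cond_broadcast(&c_);
            pthread_mutex_unlock(&m_);
            // ... then join every exited worker before freeing the pool
            for (int i = 0; i < NTHREADS; i++)
                pthread_join(th_[i], 0);
            pthread_cond_destroy(&c_);
            pthread_mutex_destroy(&m_);
        }
    };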
64 #include "method_thread.h"
67 #include <sys/types.h>
68 #include <arpa/inet.h>
69 #include <netinet/tcp.h>
76 #include "lang/verify.h"
78 const rpcc::TO rpcc::to_max = { 120000 };
79 const rpcc::TO rpcc::to_min = { 1000 };
81 rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
82 : xid(xxid), un(xun), done(false)
84 VERIFY(pthread_mutex_init(&m,0) == 0);
85 VERIFY(pthread_cond_init(&c, 0) == 0);
88 rpcc::caller::~caller()
90 VERIFY(pthread_mutex_destroy(&m) == 0);
91 VERIFY(pthread_cond_destroy(&c) == 0);
98 clock_gettime(CLOCK_REALTIME, &ts);
99 srandom((int)ts.tv_nsec^((int)getpid()));
102 rpcc::rpcc(sockaddr_in d, bool retrans) :
103 dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
104 retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1)
106 VERIFY(pthread_mutex_init(&m_, 0) == 0);
107 VERIFY(pthread_mutex_init(&chan_m_, 0) == 0);
108 VERIFY(pthread_cond_init(&destroy_wait_c_, 0) == 0);
112 clt_nonce_ = random();
114 // special client nonce 0 means this client does not
115 // require at-most-once logic from the server
116 // because it uses tcp and never retries a failed connection
120 char *loss_env = getenv("RPC_LOSSY");
121 if(loss_env != NULL){
122 lossytest_ = atoi(loss_env);
125 // xid starts with 1 and latest received reply starts with 0
126 xid_rep_window_.push_back(0);
128 jsl_log(JSL_DBG_2, "rpcc::rpcc clt_nonce is %d lossy %d\n",
129 clt_nonce_, lossytest_);
132 // IMPORTANT: destruction should happen only when no external threads
133 // are blocked inside rpcc or will use rpcc in the future
136 jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
137 clt_nonce_, chan_?chan_->channo():-1);
142 VERIFY(calls_.size() == 0);
143 VERIFY(pthread_mutex_destroy(&m_) == 0);
144 VERIFY(pthread_mutex_destroy(&chan_m_) == 0);
151 int ret = call(rpc_const::bind, 0, r, to);
157 jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
158 inet_ntoa(dst_.sin_addr), ret);
163 // Cancel all outstanding calls
168 printf("rpcc::cancel: force callers to fail\n");
169 std::map<int,caller*>::iterator iter;
170 for(iter = calls_.begin(); iter != calls_.end(); iter++){
171 caller *ca = iter->second;
173 jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
175 ScopedLock cl(&ca->m);
177 ca->intret = rpc_const::cancel_failure;
178 VERIFY(pthread_cond_signal(&ca->c) == 0);
182 while (calls_.size () > 0){
183 destroy_wait_ = true;
184 VERIFY(pthread_cond_wait(&destroy_wait_c_,&m_) == 0);
186 printf("rpcc::cancel: done\n");
190 rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,
199 if((proc != rpc_const::bind && !bind_done_) ||
200 (proc == rpc_const::bind && bind_done_)){
201 jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
202 return rpc_const::bind_failure;
206 return rpc_const::cancel_failure;
210 calls_[ca.xid] = &ca;
212 req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
213 xid_rep_window_.front());
214 req.pack_req_header(h);
215 xid_rep = xid_rep_window_.front();
219 struct timespec now, nextdeadline, finaldeadline;
221 clock_gettime(CLOCK_REALTIME, &now);
222 add_timespec(now, to.to, &finaldeadline);
223 curr_to.to = to_min.to;
225 bool transmit = true;
226 connection *ch = NULL;
236 if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {
241 if (forgot.isvalid())
242 ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
243 ch->send(req.cstr(), req.size());
245 else jsl_log(JSL_DBG_1, "not reachable\n");
247 "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n",
248 clt_nonce_, proc, ca.xid, clt_nonce_);
250 transmit = false; // only send once on a given channel
253 if(!finaldeadline.tv_sec)
256 clock_gettime(CLOCK_REALTIME, &now);
257 add_timespec(now, curr_to.to, &nextdeadline);
258 if(cmp_timespec(nextdeadline,finaldeadline) > 0){
259 nextdeadline = finaldeadline;
260 finaldeadline.tv_sec = 0;
264 ScopedLock cal(&ca.m);
266 jsl_log(JSL_DBG_2, "rpcc::call1: wait\n");
267 if(pthread_cond_timedwait(&ca.c, &ca.m,
268 &nextdeadline) == ETIMEDOUT){
269 jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
274 jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");
279 if(retrans_ && (!ch || ch->isdead())){
280 // since connection is dead, retransmit
281 // on the new connection
288 // no locking of ca.m since only this thread changes ca.xid
290 calls_.erase(ca.xid);
291 // may need to update the xid again here, in case the
292 // packet times out before it's even sent by the channel.
293 // I don't think there's any harm in maybe doing it twice
294 update_xid_rep(ca.xid);
297 VERIFY(pthread_cond_signal(&destroy_wait_c_) == 0);
301 if (ca.done && lossytest_)
304 if (!dup_req_.isvalid()) {
305 dup_req_.buf.assign(req.cstr(), req.size());
306 dup_req_.xid = ca.xid;
308 if (xid_rep > xid_rep_done_)
309 xid_rep_done_ = xid_rep;
312 ScopedLock cal(&ca.m);
315 "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n",
316 clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
317 ntohs(dst_.sin_port), ca.done, ca.intret);
322 // destruction of req automatically frees its buffer
323 return (ca.done? ca.intret : rpc_const::timeout_failure);
327 rpcc::get_refconn(connection **ch)
329 ScopedLock ml(&chan_m_);
330 if(!chan_ || chan_->isdead()){
333 chan_ = connect_to_dst(dst_, this, lossytest_);
344 // PollMgr's thread is being used to
345 // make this upcall from connection object to rpcc.
346 // this function must not block.
348 // this function keeps no reference for connection *c
350 rpcc::got_pdu(connection *c, char *b, int sz)
352 unmarshall rep(b, sz);
354 rep.unpack_reply_header(&h);
357 jsl_log(JSL_DBG_1, "rpcc::got_pdu unmarshall header failed!!!\n");
363 update_xid_rep(h.xid);
365 if(calls_.find(h.xid) == calls_.end()){
366 jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
369 caller *ca = calls_[h.xid];
371 ScopedLock cl(&ca->m);
373 ca->un->take_in(rep);
376 jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
381 VERIFY(pthread_cond_broadcast(&ca->c) == 0);
385 // assumes thread holds mutex m
387 rpcc::update_xid_rep(unsigned int xid)
389 std::list<unsigned int>::iterator it;
391 if(xid <= xid_rep_window_.front()){
395 for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
397 xid_rep_window_.insert(it, xid);
401 xid_rep_window_.push_back(xid);
404 it = xid_rep_window_.begin();
405 for (it++; it != xid_rep_window_.end(); it++){
406 while (xid_rep_window_.front() + 1 == *it)
407 xid_rep_window_.pop_front();
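// Worked example (illustrative numbers): suppose xid_rep_window_ is
// [3, 5, 6], meaning replies for every xid <= 3 have been received, plus 5
// and 6 out of order.  update_xid_rep(4) inserts 4 to get [3, 4, 5, 6] and
// then collapses the contiguous prefix, leaving [6]: the front of the window
// is the xid up through which every reply has been received, and it is what
// call1() piggybacks on outgoing requests as xid_rep.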
412 rpcs::rpcs(unsigned int p1, int count)
413 : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
415 VERIFY(pthread_mutex_init(&procs_m_, 0) == 0);
416 VERIFY(pthread_mutex_init(&count_m_, 0) == 0);
417 VERIFY(pthread_mutex_init(&reply_window_m_, 0) == 0);
418 VERIFY(pthread_mutex_init(&conss_m_, 0) == 0);
422 jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);
424 char *loss_env = getenv("RPC_LOSSY");
425 if(loss_env != NULL){
426 lossytest_ = atoi(loss_env);
429 reg(rpc_const::bind, this, &rpcs::rpcbind);
430 dispatchpool_ = new ThrPool(6,false);
432 listener_ = new tcpsconn(this, port_, lossytest_);
437 // must delete listener before dispatchpool
439 delete dispatchpool_;
444 rpcs::got_pdu(connection *c, char *b, int sz)
447 jsl_log(JSL_DBG_1, "rpcs::got_pdu: not reachable\n");
451 djob_t *j = new djob_t(c, b, sz);
453 bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j);
454 if(!succ || !reachable_){
462 rpcs::reg1(unsigned int proc, handler *h)
464 ScopedLock pl(&procs_m_);
465 VERIFY(procs_.count(proc) == 0);
467 VERIFY(procs_.count(proc) >= 1);
471 rpcs::updatestat(unsigned int proc)
473 ScopedLock cl(&count_m_);
476 if(curr_counts_ == 0){
477 std::map<int, int>::iterator i;
478 printf("RPC STATS: ");
479 for (i = counts_.begin(); i != counts_.end(); i++){
480 printf("%x:%d ", i->first, i->second);
484 ScopedLock rwl(&reply_window_m_);
485 std::map<unsigned int,std::list<reply_t> >::iterator clt;
487 unsigned int totalrep = 0, maxrep = 0;
488 for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
489 totalrep += clt->second.size();
490 if(clt->second.size() > maxrep)
491 maxrep = clt->second.size();
493 jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n",
494 (int) reply_window_.size()-1, totalrep, maxrep);
495 curr_counts_ = counting_;
500 rpcs::dispatch(djob_t *j)
502 connection *c = j->conn;
503 unmarshall req(j->buf, j->sz);
507 req.unpack_req_header(&h);
511 jsl_log(JSL_DBG_1, "rpcs::dispatch unmarshall header failed!!!\n");
517 "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
518 h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);
521 reply_header rh(h.xid,0);
523 // is client sending to an old instance of server?
524 if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
526 "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
527 h.srv_nonce, nonce_, h.proc);
528 rh.ret = rpc_const::oldsrv_failure;
529 rep.pack_reply_header(rh);
530 c->send(rep.cstr(),rep.size());
535 // is RPC proc a registered procedure?
537 ScopedLock pl(&procs_m_);
538 if(procs_.count(proc) < 1){
539 fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",
549 rpcs::rpcstate_t stat;
554 // have i seen this client before?
556 ScopedLock rwl(&reply_window_m_);
557 // if we don't know about this clt_nonce, create a cleanup object
558 if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
559 VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
560 reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
562 "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
563 h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);
567 // save the latest good connection to the client
569 ScopedLock rwl(&conss_m_);
570 if(conns_.find(h.clt_nonce) == conns_.end()){
572 conns_[h.clt_nonce] = c;
573 } else if(conns_[h.clt_nonce]->compare(c) < 0){
574 conns_[h.clt_nonce]->decref();
576 conns_[h.clt_nonce] = c;
580 stat = checkduplicate_and_update(h.clt_nonce, h.xid,
581 h.xid_rep, &b1, &sz1);
583 // this client does not require at most once logic
588 case NEW: // new request
593 rh.ret = f->fn(req, rep);
594 if (rh.ret == rpc_const::unmarshal_args_failure) {
595 fprintf(stderr, "rpcs::dispatch: failed to"
596 " unmarshall the arguments. You are"
597 " probably calling RPC 0x%x with wrong"
598 " types of arguments.\n", proc);
603 rep.pack_reply_header(rh);
604 rep.take_buf(&b1,&sz1);
607 "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
608 sz1, h.xid, proc, rh.ret, h.clt_nonce);
611 // only record replies for clients that require at-most-once logic
612 add_reply(h.clt_nonce, h.xid, b1, sz1);
615 // get the latest connection to the client
617 ScopedLock rwl(&conss_m_);
618 if(c->isdead() && c != conns_[h.clt_nonce]){
620 c = conns_[h.clt_nonce];
626 if(h.clt_nonce == 0){
627 // reply is not added to at-most-once window, free it
631 case INPROGRESS: // server is working on this request
633 case DONE: // duplicate and we still have the response
636 case FORGOTTEN: // very old request and we don't have the response anymore
637 jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
639 rh.ret = rpc_const::atmostonce_failure;
640 rep.pack_reply_header(rh);
641 c->send(rep.cstr(),rep.size());
647 // rpcs::dispatch calls this when an RPC request arrives.
649 // checks to see if an RPC with xid from clt_nonce has already been received.
650 // if not, remembers the request in reply_window_.
652 // deletes remembered requests with XIDs <= xid_rep; the client
653 // says it has received a reply for every RPC up through xid_rep.
654 // frees the reply_t::buf of each such request.
657 // NEW: never seen this xid before.
658 // INPROGRESS: seen this xid, and still processing it.
659 // DONE: seen this xid, previous reply returned in *b and *sz.
660 // FORGOTTEN: might have seen this xid, but deleted previous reply.
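// Example (illustrative numbers): suppose clt_nonce 7's window currently
// remembers xids 2 and 3 (both completed) and xid 5 (still being processed).
// A retransmission of xid 5 arrives carrying xid_rep 4: the entries for 2
// and 3 are freed because the client has acknowledged replies through 4,
// and xid 5 is found in the window without a saved reply, so INPROGRESS is
// returned and dispatch() sends nothing.  If instead xid 6 arrived, it would
// be NEW; a later retransmission of a completed xid returns DONE along with
// the remembered reply in *b/*sz.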
662 rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
663 unsigned int xid_rep, char **b, int *sz)
665 ScopedLock rwl(&reply_window_m_);
667 std::list<reply_t> &l = reply_window_[clt_nonce];
669 VERIFY(l.size() > 0);
670 VERIFY(xid >= xid_rep);
672 unsigned int past_xid_rep = l.begin()->xid;
674 std::list<reply_t>::iterator start = l.begin(), it;
677 if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) {
678 // scan for deletion candidates
679 for (; it != l.end() && it->xid < xid_rep; it++) {
684 l.begin()->xid = xid_rep;
687 if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1)
690 // skip non-deletion candidates
691 while (it != l.end() && it->xid < xid)
694 // if it's in the list it must be right here
695 if (it != l.end() && it->xid == xid) {
696 if (it->cb_present) {
697 // return information about the remembered reply
705 // remember that a new request has arrived
706 l.insert(it, reply_t(xid));
711 // rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
712 // and passes the return value in b and sz.
713 // add_reply() should remember b and sz.
714 // free_reply_window() and checkduplicate_and_update() are responsible for
715 // eventually freeing the saved buffer.
717 rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
720 ScopedLock rwl(&reply_window_m_);
721 // remember the RPC reply value
722 std::list<reply_t> &l = reply_window_[clt_nonce];
723 std::list<reply_t>::iterator it = l.begin();
724 // skip to our place in the list
725 for (it++; it != l.end() && it->xid < xid; it++);
726 // there should already be an entry, so whine if there isn't
727 if (it == l.end() || it->xid != xid) {
728 fprintf(stderr, "Could not find reply struct in add_reply\n");
729 l.insert(it, reply_t(xid, b, sz));
731 *it = reply_t(xid, b, sz);
736 rpcs::free_reply_window(void)
738 std::map<unsigned int,std::list<reply_t> >::iterator clt;
739 std::list<reply_t>::iterator it;
741 ScopedLock rwl(&reply_window_m_);
742 for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
743 for (it = clt->second.begin(); it != clt->second.end(); it++){
749 reply_window_.clear();
754 rpcs::rpcbind(int a, int &r)
756 jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
762 marshall::rawbyte(unsigned char x)
766 VERIFY (_buf != NULL);
767 _buf = (char *)realloc(_buf, _capa);
774 marshall::rawbytes(const char *p, int n)
776 if((_ind+n) > _capa){
777 _capa = _capa > n? 2*_capa:(_capa+n);
778 VERIFY (_buf != NULL);
779 _buf = (char *)realloc(_buf, _capa);
782 memcpy(_buf+_ind, p, n);
787 operator<<(marshall &m, bool x)
794 operator<<(marshall &m, unsigned char x)
801 operator<<(marshall &m, char x)
803 m << (unsigned char) x;
809 operator<<(marshall &m, unsigned short x)
811 m.rawbyte((x >> 8) & 0xff);
817 operator<<(marshall &m, short x)
819 m << (unsigned short) x;
824 operator<<(marshall &m, unsigned int x)
826 // network order is big-endian
827 m.rawbyte((x >> 24) & 0xff);
828 m.rawbyte((x >> 16) & 0xff);
829 m.rawbyte((x >> 8) & 0xff);
835 operator<<(marshall &m, int x)
837 m << (unsigned int) x;
842 operator<<(marshall &m, const std::string &s)
844 m << (unsigned int) s.size();
845 m.rawbytes(s.data(), s.size());
850 operator<<(marshall &m, unsigned long long x)
852 m << (unsigned int) (x >> 32);
853 m << (unsigned int) x;
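// For example (illustrative value): m << (unsigned int) 0x01020304 appends
// the bytes 0x01 0x02 0x03 0x04 in that order, most significant byte first,
// matching network byte order on the wire.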
858 marshall::pack(int x)
860 rawbyte((x >> 24) & 0xff);
861 rawbyte((x >> 16) & 0xff);
862 rawbyte((x >> 8) & 0xff);
867 unmarshall::unpack(int *x)
869 (*x) = (rawbyte() & 0xff) << 24;
870 (*x) |= (rawbyte() & 0xff) << 16;
871 (*x) |= (rawbyte() & 0xff) << 8;
872 (*x) |= rawbyte() & 0xff;
875 // take the contents from another unmarshall object
877 unmarshall::take_in(unmarshall &another)
881 another.take_buf(&_buf, &_sz);
882 _ind = RPC_HEADER_SZ;
883 _ok = _sz >= RPC_HEADER_SZ?true:false;
889 if(ok() && _ind == _sz){
897 unmarshall::rawbyte()
908 operator>>(unmarshall &u, bool &x)
910 x = (bool) u.rawbyte() ;
915 operator>>(unmarshall &u, unsigned char &x)
917 x = (unsigned char) u.rawbyte() ;
922 operator>>(unmarshall &u, char &x)
924 x = (char) u.rawbyte();
930 operator>>(unmarshall &u, unsigned short &x)
932 x = (u.rawbyte() & 0xff) << 8;
933 x |= u.rawbyte() & 0xff;
938 operator>>(unmarshall &u, short &x)
940 x = (u.rawbyte() & 0xff) << 8;
941 x |= u.rawbyte() & 0xff;
946 operator>>(unmarshall &u, unsigned int &x)
948 x = (u.rawbyte() & 0xff) << 24;
949 x |= (u.rawbyte() & 0xff) << 16;
950 x |= (u.rawbyte() & 0xff) << 8;
951 x |= u.rawbyte() & 0xff;
956 operator>>(unmarshall &u, int &x)
958 x = (u.rawbyte() & 0xff) << 24;
959 x |= (u.rawbyte() & 0xff) << 16;
960 x |= (u.rawbyte() & 0xff) << 8;
961 x |= u.rawbyte() & 0xff;
966 operator>>(unmarshall &u, unsigned long long &x)
971 x = l | ((unsigned long long) h << 32);
976 operator>>(unmarshall &u, std::string &s)
986 unmarshall::rawbytes(std::string &ss, unsigned int n)
988 if((_ind+n) > (unsigned)_sz){
991 std::string tmps = std::string(_buf+_ind, n);
993 VERIFY(ss.size() == n);
998 bool operator<(const sockaddr_in &a, const sockaddr_in &b){
999 return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
1000 ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
1001 ((a.sin_port < b.sin_port))));
1004 /*---------------auxiliary functions--------------*/
1006 make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
1009 const char *localhost = "127.0.0.1";
1010 const char *port = index(hostandport, ':');
1012 memcpy(host, localhost, strlen(localhost)+1);
1015 memcpy(host, hostandport, port-hostandport);
1016 host[port-hostandport] = '\0';
1020 make_sockaddr(host, port, dst);
1025 make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
1029 bzero(dst, sizeof(*dst));
1030 dst->sin_family = AF_INET;
1032 a = inet_addr(host);
1033 if(a != INADDR_NONE){
1034 dst->sin_addr.s_addr = a;
1036 struct hostent *hp = gethostbyname(host);
1037 if(hp == 0 || hp->h_length != 4){
1038 fprintf(stderr, "cannot find host name %s\n", host);
1041 dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
1043 dst->sin_port = htons(atoi(port));
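// Usage sketch (the address and port are illustrative); both forms fill in
// the same sockaddr_in:
//   struct sockaddr_in dst;
//   make_sockaddr("127.0.0.1:3772", &dst);      // "host:port" in one string
//   make_sockaddr("127.0.0.1", "3772", &dst);   // host and port separately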
1047 cmp_timespec(const struct timespec &a, const struct timespec &b)
1049 if(a.tv_sec > b.tv_sec)
1051 else if(a.tv_sec < b.tv_sec)
1054 if(a.tv_nsec > b.tv_nsec)
1056 else if(a.tv_nsec < b.tv_nsec)
1064 add_timespec(const struct timespec &a, int b, struct timespec *result)
1066 // convert to millisec, add timeout, convert back
1067 result->tv_sec = a.tv_sec + b/1000;
1068 result->tv_nsec = a.tv_nsec + (b % 1000) * 1000000;
1069 VERIFY(result->tv_nsec >= 0);
1070 while (result->tv_nsec > 1000000000){
1072 result->tv_nsec-=1000000000;
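// Example: add_timespec({5 s, 900000000 ns}, 250 /* ms */, &r) first yields
// {5 s, 1150000000 ns}; the normalization loop above then carries the excess
// nanoseconds into the seconds field, giving {6 s, 150000000 ns}.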
1077 diff_timespec(const struct timespec &end, const struct timespec &start)
1079 int diff = (end.tv_sec > start.tv_sec)?(end.tv_sec-start.tv_sec)*1000:0;
1080 VERIFY(diff || end.tv_sec == start.tv_sec);
1081 if(end.tv_nsec > start.tv_nsec){
1082 diff += (end.tv_nsec-start.tv_nsec)/1000000;
1084 diff -= (start.tv_nsec-end.tv_nsec)/1000000;