The rpcc class handles client-side RPC. Each rpcc is bound to a
single RPC server. The jobs of rpcc include maintaining a connection to
the server, sending RPC requests and waiting for responses, retransmission,
at-most-once delivery, etc.
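
As a rough usage sketch (the procedure number, argument, and address below
are made-up examples, not part of the library):

    sockaddr_in dst;
    make_sockaddr("127.0.0.1:3772", &dst);      // example address
    rpcc *cl = new rpcc(dst);
    if (cl->bind() == 0) {                      // fetch the server nonce first
        std::string rep;
        int ret = cl->call(22, std::string("hello"), rep);
        // ret is the handler's return value on success, or a negative
        // rpc_const::* failure code (e.g. timeout_failure) on error
    }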
The rpcs class handles the server side of RPC. Each rpcs handles multiple
connections from different rpcc objects. The jobs of rpcs include accepting
connections, dispatching requests to registered RPC handlers, at-most-once
delivery, etc.
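
A matching server-side sketch (the handler class, method, and procedure
number are hypothetical):

    class srv {
     public:
        int handle_22(const std::string a, std::string &r) { r = a; return 0; }
    };

    srv service;
    rpcs server(3772);                          // listen on this port
    server.reg(22, &service, &srv::handle_22);  // register handler for proc 22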
Both rpcc and rpcs use the connection class as an abstraction for the
underlying communication channel. To send an RPC request/reply, one calls
connection::send() which blocks until data is sent or the connection has failed
(thus the caller can free the buffer when send() returns). When a
request/reply is received, connection makes a callback into the corresponding
rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
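
The upcall interface is roughly the following (the exact declaration lives in
connection.h; treat this as an approximation):

    class chanmgr {
     public:
        virtual bool got_pdu(connection *c, char *b, int sz) = 0;
        virtual ~chanmgr() {}
    };

rpcc and rpcs both implement this interface, so a connection can hand each
received buffer to its owner without knowing which side of the RPC it is on.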
rpcc uses application threads to send RPC requests and blocks to receive the
reply or error. All connections use a single PollMgr object to perform async
socket IO. PollMgr creates a single thread to examine the readiness of socket
file descriptors and informs the corresponding connection whenever a socket is
ready to be read or written. (We use asynchronous socket IO to reduce the
number of threads needed to manage these connections; without async IO, at
least one thread is needed per connection to read data without blocking other
activities.) Each rpcs object creates one thread for listening on the server
port and a pool of threads for executing RPC requests. The
thread pool allows us to control the number of threads spawned at the server
(spawning one thread per request will hurt when the server faces thousands of
requests).
In order to delete a connection object, we must maintain a reference count.
For rpcc, multiple client threads might be invoking the rpcc::call() functions
and thus holding multiple references to the underlying connection object. For
rpcs, multiple dispatch threads might be holding references to the same
connection object. A connection object is deleted only when the underlying
connection is dead and the reference count reaches zero.
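
In rpcc::call1 the pattern looks roughly like this (a sketch of the code
below, not a separate API):

    connection *ch = NULL;
    get_refconn(&ch);          // take a reference to the current channel,
                               // reconnecting if the old one is dead
    if (ch && reachable_)
        ch->send(req.cstr(), req.size());
    // ... wait for the reply or a timeout ...
    if (ch)
        ch->decref();          // the last decref on a dead connection frees it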
The previous version of the RPC library used the pthread_cancel* routines
to implement the deletion of rpcc and rpcs objects. The idea was to cancel
all active threads that might be holding a reference to an object before
deleting that object. However, pthread_cancel is not robust and there were
always bugs where outstanding references to deleted objects persisted.
This version of the RPC library does not use pthread_cancel; instead it
explicitly joins exited threads to make sure no outstanding references exist
before deleting objects.
To delete a rpcc object safely, the users of the library must ensure that
there are no outstanding calls on the rpcc object.
To delete a rpcs object safely, we do the following in sequence: 1. stop
accepting new incoming connections. 2. close existing active connections.
3. delete the dispatch thread pool, which involves waiting for currently
active RPC handlers to finish. It is interesting how a thread pool can be
deleted without using thread cancellation. The trick is to inject x "poison
pills" into a thread pool of x threads. Upon getting a poison pill instead of
a normal task, a worker thread exits (and the thread pool destructor waits to
join all x exited worker threads).
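
A minimal sketch of the poison-pill idea (this is not the library's ThrPool;
enqueue()/dequeue() are hypothetical stand-ins for its job queue):

    struct job_t { void (*fn)(void *); void *arg; };  // fn == NULL marks a pill

    void *worker(void *ignored) {
        for (;;) {
            job_t *j = dequeue();                    // blocks until a job arrives
            if (j->fn == NULL) { delete j; break; }  // poison pill: exit the loop
            j->fn(j->arg);
            delete j;
        }
        return NULL;
    }

    void shutdown_pool(pthread_t *th, int nthreads) {
        for (int i = 0; i < nthreads; i++)
            enqueue(new job_t());                    // one pill per worker
        for (int i = 0; i < nthreads; i++)
            VERIFY(pthread_join(th[i], NULL) == 0);  // join the exited workers
    }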
#include "method_thread.h"
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include "lang/verify.h"

const rpcc::TO rpcc::to_max = { 120000 };
const rpcc::TO rpcc::to_min = { 1000 };
rpcc::caller::caller(unsigned int xxid, unmarshall *xun)
: xid(xxid), un(xun), done(false)
    VERIFY(pthread_mutex_init(&m,0) == 0);
    VERIFY(pthread_cond_init(&c, 0) == 0);

rpcc::caller::~caller()
    VERIFY(pthread_mutex_destroy(&m) == 0);
    VERIFY(pthread_cond_destroy(&c) == 0);

    clock_gettime(CLOCK_REALTIME, &ts);
    srandom((int)ts.tv_nsec^((int)getpid()));
rpcc::rpcc(sockaddr_in d, bool retrans) :
    dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
    retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_(false), xid_rep_done_(-1)
    VERIFY(pthread_mutex_init(&m_, 0) == 0);
    VERIFY(pthread_mutex_init(&chan_m_, 0) == 0);
    VERIFY(pthread_cond_init(&destroy_wait_c_, 0) == 0);

    clt_nonce_ = random();

    // special client nonce 0 means this client does not
    // require at-most-once logic from the server
    // because it uses tcp and never retries a failed connection

    char *loss_env = getenv("RPC_LOSSY");
    if(loss_env != NULL){
        lossytest_ = atoi(loss_env);

    // xid starts with 1 and latest received reply starts with 0
    xid_rep_window_.push_back(0);

    jsl_log(JSL_DBG_2, "rpcc::rpcc clt_nonce is %d lossy %d\n",
            clt_nonce_, lossytest_);
    // IMPORTANT: destruction should happen only when no external threads
    // are blocked inside rpcc or will use rpcc in the future
    jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n",
            clt_nonce_, chan_?chan_->channo():-1);
    VERIFY(calls_.size() == 0);
    VERIFY(pthread_mutex_destroy(&m_) == 0);
    VERIFY(pthread_mutex_destroy(&chan_m_) == 0);

    int ret = call(rpc_const::bind, 0, r, to);
        jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n",
                inet_ntoa(dst_.sin_addr), ret);

// Cancel all outstanding calls
    printf("rpcc::cancel: force callers to fail\n");
    std::map<int,caller*>::iterator iter;
    for(iter = calls_.begin(); iter != calls_.end(); iter++){
        caller *ca = iter->second;

        jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n");
            ScopedLock cl(&ca->m);
            ca->intret = rpc_const::cancel_failure;
            VERIFY(pthread_cond_signal(&ca->c) == 0);

    while (calls_.size () > 0){
        destroy_wait_ = true;
        VERIFY(pthread_cond_wait(&destroy_wait_c_,&m_) == 0);
    printf("rpcc::cancel: done\n");
rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep,

    if((proc != rpc_const::bind && !bind_done_) ||
            (proc == rpc_const::bind && bind_done_)){
        jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n");
        return rpc_const::bind_failure;

        return rpc_const::cancel_failure;

    calls_[ca.xid] = &ca;

    req_header h(ca.xid, proc, clt_nonce_, srv_nonce_,
            xid_rep_window_.front());
    req.pack_req_header(h);
    xid_rep = xid_rep_window_.front();

    struct timespec now, nextdeadline, finaldeadline;

    clock_gettime(CLOCK_REALTIME, &now);
    add_timespec(now, to.to, &finaldeadline);
    curr_to.to = to_min.to;

    bool transmit = true;
    connection *ch = NULL;

            if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) {

                if (forgot.isvalid())
                    ch->send((char *)forgot.buf.c_str(), forgot.buf.size());
                ch->send(req.cstr(), req.size());
            else jsl_log(JSL_DBG_1, "not reachable\n");
                    "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n",
                    clt_nonce_, proc, ca.xid, clt_nonce_);
            transmit = false; // only send once on a given channel

        if(!finaldeadline.tv_sec)

        clock_gettime(CLOCK_REALTIME, &now);
        add_timespec(now, curr_to.to, &nextdeadline);
        if(cmp_timespec(nextdeadline,finaldeadline) > 0){
            nextdeadline = finaldeadline;
            finaldeadline.tv_sec = 0;

            ScopedLock cal(&ca.m);
                jsl_log(JSL_DBG_2, "rpcc:call1: wait\n");
                if(pthread_cond_timedwait(&ca.c, &ca.m,
                        &nextdeadline) == ETIMEDOUT){
                    jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n");
                jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n");

        if(retrans_ && (!ch || ch->isdead())){
            // since connection is dead, retransmit
            // on the new connection

    // no locking of ca.m since only this thread changes ca.xid
    calls_.erase(ca.xid);
    // may need to update the xid again here, in case the
    // packet times out before it's even sent by the channel.
    // I don't think there's any harm in maybe doing it twice
    update_xid_rep(ca.xid);

        VERIFY(pthread_cond_signal(&destroy_wait_c_) == 0);

    if (ca.done && lossytest_)

        if (!dup_req_.isvalid()) {
            dup_req_.buf.assign(req.cstr(), req.size());
            dup_req_.xid = ca.xid;
        if (xid_rep > xid_rep_done_)
            xid_rep_done_ = xid_rep;

        ScopedLock cal(&ca.m);
            "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n",
            clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr),
            ntohs(dst_.sin_port), ca.done, ca.intret);

    // destruction of req automatically frees its buffer
    return (ca.done? ca.intret : rpc_const::timeout_failure);
rpcc::get_refconn(connection **ch)
    ScopedLock ml(&chan_m_);
    if(!chan_ || chan_->isdead()){
        chan_ = connect_to_dst(dst_, this, lossytest_);

// PollMgr's thread is being used to
// make this upcall from connection object to rpcc.
// this function must not block.
//
// this function keeps no reference for connection *c
rpcc::got_pdu(connection *c, char *b, int sz)
    unmarshall rep(b, sz);
    rep.unpack_reply_header(&h);
        jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n");

    update_xid_rep(h.xid);

    if(calls_.find(h.xid) == calls_.end()){
        jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid);
    caller *ca = calls_[h.xid];

    ScopedLock cl(&ca->m);
        ca->un->take_in(rep);
        jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n",
    VERIFY(pthread_cond_broadcast(&ca->c) == 0);

// assumes thread holds mutex m
rpcc::update_xid_rep(unsigned int xid)
    std::list<unsigned int>::iterator it;

    if(xid <= xid_rep_window_.front()){

    for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){
            xid_rep_window_.insert(it, xid);
    xid_rep_window_.push_back(xid);

    it = xid_rep_window_.begin();
    for (it++; it != xid_rep_window_.end(); it++){
        while (xid_rep_window_.front() + 1 == *it)
            xid_rep_window_.pop_front();
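
// Illustration of how the window evolves (the xids here are just an example):
//   start                      [0]
//   reply for xid 1 arrives    [1]        (0 and 1 collapse into 1)
//   reply for xid 2 arrives    [2]
//   reply for xid 4 arrives    [2,4]      (front stays 2 until 3 shows up)
//   reply for xid 3 arrives    [4]
// call1 piggybacks xid_rep_window_.front() as xid_rep on outgoing requests,
// telling the server which replies this client has received contiguously.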
rpcs::rpcs(unsigned int p1, int count)
: port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_(true)
    VERIFY(pthread_mutex_init(&procs_m_, 0) == 0);
    VERIFY(pthread_mutex_init(&count_m_, 0) == 0);
    VERIFY(pthread_mutex_init(&reply_window_m_, 0) == 0);
    VERIFY(pthread_mutex_init(&conss_m_, 0) == 0);

    jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_);

    char *loss_env = getenv("RPC_LOSSY");
    if(loss_env != NULL){
        lossytest_ = atoi(loss_env);

    reg(rpc_const::bind, this, &rpcs::rpcbind);
    dispatchpool_ = new ThrPool(6,false);

    listener_ = new tcpsconn(this, port_, lossytest_);

    // must delete listener before dispatchpool
    delete dispatchpool_;

rpcs::got_pdu(connection *c, char *b, int sz)
        jsl_log(JSL_DBG_1, "rpcs::got_pdu: not reachable\n");

    djob_t *j = new djob_t(c, b, sz);
    bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j);
    if(!succ || !reachable_){
rpcs::reg1(unsigned int proc, handler *h)
    ScopedLock pl(&procs_m_);
    VERIFY(procs_.count(proc) == 0);
    VERIFY(procs_.count(proc) >= 1);

rpcs::updatestat(unsigned int proc)
    ScopedLock cl(&count_m_);
    if(curr_counts_ == 0){
        std::map<int, int>::iterator i;
        printf("RPC STATS: ");
        for (i = counts_.begin(); i != counts_.end(); i++){
            printf("%x:%d ", i->first, i->second);

        ScopedLock rwl(&reply_window_m_);
        std::map<unsigned int,std::list<reply_t> >::iterator clt;

        unsigned int totalrep = 0, maxrep = 0;
        for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
            totalrep += clt->second.size();
            if(clt->second.size() > maxrep)
                maxrep = clt->second.size();
        jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n",
                (int) reply_window_.size()-1, totalrep, maxrep);
        curr_counts_ = counting_;
rpcs::dispatch(djob_t *j)
    connection *c = j->conn;
    unmarshall req(j->buf, j->sz);

    req.unpack_req_header(&h);

        jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n");

            "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n",
            h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce);

    reply_header rh(h.xid,0);

    // is client sending to an old instance of server?
    if(h.srv_nonce != 0 && h.srv_nonce != nonce_){
                "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n",
                h.srv_nonce, nonce_, h.proc);
        rh.ret = rpc_const::oldsrv_failure;
        rep.pack_reply_header(rh);
        c->send(rep.cstr(),rep.size());

    // is RPC proc a registered procedure?
        ScopedLock pl(&procs_m_);
        if(procs_.count(proc) < 1){
            fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n",

    rpcs::rpcstate_t stat;

    // have i seen this client before?
        ScopedLock rwl(&reply_window_m_);
        // if we don't know about this clt_nonce, create a cleanup object
        if(reply_window_.find(h.clt_nonce) == reply_window_.end()){
            VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
            reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
                "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n",
                h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()-1);

    // save the latest good connection to the client
        ScopedLock rwl(&conss_m_);
        if(conns_.find(h.clt_nonce) == conns_.end()){
            conns_[h.clt_nonce] = c;
        } else if(conns_[h.clt_nonce]->compare(c) < 0){
            conns_[h.clt_nonce]->decref();
            conns_[h.clt_nonce] = c;

    stat = checkduplicate_and_update(h.clt_nonce, h.xid,
            h.xid_rep, &b1, &sz1);

    // this client does not require at most once logic

    case NEW: // new request
        rh.ret = f->fn(req, rep);
        if (rh.ret == rpc_const::unmarshal_args_failure) {
            fprintf(stderr, "rpcs::dispatch: failed to"
                    " unmarshall the arguments. You are"
                    " probably calling RPC 0x%x with wrong"
                    " types of arguments.\n", proc);

        rep.pack_reply_header(rh);
        rep.take_buf(&b1,&sz1);

            "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n",
            sz1, h.xid, proc, rh.ret, h.clt_nonce);

            // only record replies for clients that require at-most-once logic
            add_reply(h.clt_nonce, h.xid, b1, sz1);

        // get the latest connection to the client
            ScopedLock rwl(&conss_m_);
            if(c->isdead() && c != conns_[h.clt_nonce]){
                c = conns_[h.clt_nonce];

        if(h.clt_nonce == 0){
            // reply is not added to at-most-once window, free it
    case INPROGRESS: // server is working on this request
    case DONE: // duplicate and we still have the response
    case FORGOTTEN: // very old request and we don't have the response anymore
        jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n",
        rh.ret = rpc_const::atmostonce_failure;
        rep.pack_reply_header(rh);
        c->send(rep.cstr(),rep.size());
// rpcs::dispatch calls this when an RPC request arrives.
//
// checks to see if an RPC with xid from clt_nonce has already been received.
// if not, remembers the request in reply_window_.
//
// deletes remembered requests with XIDs <= xid_rep; the client
// says it has received a reply for every RPC up through xid_rep.
// frees the reply_t::buf of each such request.
//
// returns one of:
//   NEW: never seen this xid before.
//   INPROGRESS: seen this xid, and still processing it.
//   DONE: seen this xid, previous reply returned in *b and *sz.
//   FORGOTTEN: might have seen this xid, but deleted previous reply.
rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
        unsigned int xid_rep, char **b, int *sz)
    ScopedLock rwl(&reply_window_m_);

    std::list<reply_t> &l = reply_window_[clt_nonce];

    VERIFY(l.size() > 0);
    VERIFY(xid >= xid_rep);

    unsigned int past_xid_rep = l.begin()->xid;

    std::list<reply_t>::iterator start = l.begin(), it;

    if (past_xid_rep < xid_rep || past_xid_rep == (unsigned int)-1) {
        // scan for deletion candidates
        for (; it != l.end() && it->xid < xid_rep; it++) {
        l.begin()->xid = xid_rep;

    if (xid < past_xid_rep && past_xid_rep != (unsigned int)-1)

    // skip non-deletion candidates
    while (it != l.end() && it->xid < xid)

    // if it's in the list it must be right here
    if (it != l.end() && it->xid == xid) {
        if (it->cb_present) {
            // return information about the remembered reply

    // remember that a new request has arrived
    l.insert(it, reply_t(xid));
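
// Illustration of the three duplicate outcomes above: if a client retransmits
// an xid whose reply is still remembered, the saved buffer is handed back and
// resent (DONE); if the handler for that xid has not finished yet, the
// retransmission is simply dropped (INPROGRESS); and if the xid is older than
// anything the window still remembers, the server can only answer FORGOTTEN,
// which dispatch() turns into rpc_const::atmostonce_failure.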
// rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
// and passes the return value in b and sz.
// add_reply() should remember b and sz.
// free_reply_window() and checkduplicate_and_update() are responsible for
// freeing these buffers.
rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
    ScopedLock rwl(&reply_window_m_);
    // remember the RPC reply value
    std::list<reply_t> &l = reply_window_[clt_nonce];
    std::list<reply_t>::iterator it = l.begin();
    // skip to our place in the list
    for (it++; it != l.end() && it->xid < xid; it++);
    // there should already be an entry, so whine if there isn't
    if (it == l.end() || it->xid != xid) {
        fprintf(stderr, "Could not find reply struct in add_reply\n");
        l.insert(it, reply_t(xid, b, sz));
        *it = reply_t(xid, b, sz);
rpcs::free_reply_window(void)
    std::map<unsigned int,std::list<reply_t> >::iterator clt;
    std::list<reply_t>::iterator it;

    ScopedLock rwl(&reply_window_m_);
    for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){
        for (it = clt->second.begin(); it != clt->second.end(); it++){
    reply_window_.clear();

rpcs::rpcbind(int a, int &r)
    jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);

marshall::rawbyte(unsigned char x)
        VERIFY (_buf != NULL);
        _buf = (char *)realloc(_buf, _capa);

marshall::rawbytes(const char *p, int n)
    if((_ind+n) > _capa){
        _capa = _capa > n? 2*_capa:(_capa+n);
        VERIFY (_buf != NULL);
        _buf = (char *)realloc(_buf, _capa);
    memcpy(_buf+_ind, p, n);
operator<<(marshall &m, bool x)

operator<<(marshall &m, unsigned char x)

operator<<(marshall &m, char x)
    m << (unsigned char) x;

operator<<(marshall &m, unsigned short x)
    m.rawbyte((x >> 8) & 0xff);

operator<<(marshall &m, short x)
    m << (unsigned short) x;

operator<<(marshall &m, unsigned int x)
    // network order is big-endian
    m.rawbyte((x >> 24) & 0xff);
    m.rawbyte((x >> 16) & 0xff);
    m.rawbyte((x >> 8) & 0xff);

operator<<(marshall &m, int x)
    m << (unsigned int) x;

operator<<(marshall &m, const std::string &s)
    m << (unsigned int) s.size();
    m.rawbytes(s.data(), s.size());

operator<<(marshall &m, unsigned long long x)
    m << (unsigned int) (x >> 32);
    m << (unsigned int) x;

marshall::pack(int x)
    rawbyte((x >> 24) & 0xff);
    rawbyte((x >> 16) & 0xff);
    rawbyte((x >> 8) & 0xff);

unmarshall::unpack(int *x)
    (*x) = (rawbyte() & 0xff) << 24;
    (*x) |= (rawbyte() & 0xff) << 16;
    (*x) |= (rawbyte() & 0xff) << 8;
    (*x) |= rawbyte() & 0xff;
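
// Example of the wire format implied above: the int 0x01020304 is packed
// big-endian (network byte order) as the bytes 0x01 0x02 0x03 0x04, and
// unpack() reassembles the same value byte by byte in that order.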
// take the contents from another unmarshall object
unmarshall::take_in(unmarshall &another)
    another.take_buf(&_buf, &_sz);
    _ind = RPC_HEADER_SZ;
    _ok = _sz >= RPC_HEADER_SZ?true:false;

    if(ok() && _ind == _sz){

unmarshall::rawbyte()

operator>>(unmarshall &u, bool &x)
    x = (bool) u.rawbyte() ;

operator>>(unmarshall &u, unsigned char &x)
    x = (unsigned char) u.rawbyte() ;

operator>>(unmarshall &u, char &x)
    x = (char) u.rawbyte();

operator>>(unmarshall &u, unsigned short &x)
    x = (u.rawbyte() & 0xff) << 8;
    x |= u.rawbyte() & 0xff;

operator>>(unmarshall &u, short &x)
    x = (u.rawbyte() & 0xff) << 8;
    x |= u.rawbyte() & 0xff;

operator>>(unmarshall &u, unsigned int &x)
    x = (u.rawbyte() & 0xff) << 24;
    x |= (u.rawbyte() & 0xff) << 16;
    x |= (u.rawbyte() & 0xff) << 8;
    x |= u.rawbyte() & 0xff;

operator>>(unmarshall &u, int &x)
    x = (u.rawbyte() & 0xff) << 24;
    x |= (u.rawbyte() & 0xff) << 16;
    x |= (u.rawbyte() & 0xff) << 8;
    x |= u.rawbyte() & 0xff;

operator>>(unmarshall &u, unsigned long long &x)
    x = l | ((unsigned long long) h << 32);

operator>>(unmarshall &u, std::string &s)

unmarshall::rawbytes(std::string &ss, unsigned int n)
    if((_ind+n) > (unsigned)_sz){
    std::string tmps = std::string(_buf+_ind, n);
    VERIFY(ss.size() == n);

bool operator<(const sockaddr_in &a, const sockaddr_in &b){
    return ((a.sin_addr.s_addr < b.sin_addr.s_addr) ||
            ((a.sin_addr.s_addr == b.sin_addr.s_addr) &&
             ((a.sin_port < b.sin_port))));
/*---------------auxiliary functions--------------*/
make_sockaddr(const char *hostandport, struct sockaddr_in *dst){
    const char *localhost = "127.0.0.1";
    const char *port = index(hostandport, ':');
        memcpy(host, localhost, strlen(localhost)+1);
        memcpy(host, hostandport, port-hostandport);
        host[port-hostandport] = '\0';

    make_sockaddr(host, port, dst);

make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){
    bzero(dst, sizeof(*dst));
    dst->sin_family = AF_INET;

    a = inet_addr(host);
    if(a != INADDR_NONE){
        dst->sin_addr.s_addr = a;
        struct hostent *hp = gethostbyname(host);
        if(hp == 0 || hp->h_length != 4){
            fprintf(stderr, "cannot find host name %s\n", host);
        dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr;
    dst->sin_port = htons(atoi(port));
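
// Usage example (assuming the host defaults to 127.0.0.1 when only a port
// is given, as the code above suggests):
//   struct sockaddr_in dst;
//   make_sockaddr("3772", &dst);            // port only -> 127.0.0.1:3772
//   make_sockaddr("10.0.0.5:3772", &dst);   // explicit host:port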
cmp_timespec(const struct timespec &a, const struct timespec &b)
    if(a.tv_sec > b.tv_sec)
    else if(a.tv_sec < b.tv_sec)
        if(a.tv_nsec > b.tv_nsec)
        else if(a.tv_nsec < b.tv_nsec)

add_timespec(const struct timespec &a, int b, struct timespec *result)
    // convert to millisec, add timeout, convert back
    result->tv_sec = a.tv_sec + b/1000;
    result->tv_nsec = a.tv_nsec + (b % 1000) * 1000000;
    VERIFY(result->tv_nsec >= 0);
    // normalize: tv_nsec must end up in [0, 1e9)
    while (result->tv_nsec >= 1000000000){
        result->tv_sec++;
        result->tv_nsec-=1000000000;

diff_timespec(const struct timespec &end, const struct timespec &start)
    int diff = (end.tv_sec > start.tv_sec)?(end.tv_sec-start.tv_sec)*1000:0;
    VERIFY(diff || end.tv_sec == start.tv_sec);
    if(end.tv_nsec > start.tv_nsec){
        diff += (end.tv_nsec-start.tv_nsec)/1000000;
        diff -= (start.tv_nsec-end.tv_nsec)/1000000;