fec7a4c56fee776bbafa16b8fe334247ec98f6fc
[invirt/third/libt4.git] / rpc / connection.cc
1 #include <fcntl.h>
2 #include <sys/types.h>
3 #include <sys/time.h>
4 #include <netinet/tcp.h>
5 #include <errno.h>
6 #include <signal.h>
7 #include <unistd.h>
8
9 #include "connection.h"
10 #include "pollmgr.h"
11 #include "jsl_log.h"
12 #include "lang/verify.h"
13 #include "lock.h"
14
#define MAX_PDU (10<<20) //maximum PDU size is 10M; readpdu() rejects larger length fields
16
17
18 connection::connection(chanmgr *m1, int f1, int l1)
19 : mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1)
20 {
21
22     int flags = fcntl(fd_, F_GETFL, NULL);
23     flags |= O_NONBLOCK;
24     fcntl(fd_, F_SETFL, flags);
25
26     signal(SIGPIPE, SIG_IGN);
27
28     VERIFY(gettimeofday(&create_time_, NULL) == 0);
29
30     PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
31 }
32
33 connection::~connection()
34 {
35     VERIFY(dead_);
36     if (rpdu_.buf)
37         free(rpdu_.buf);
38     VERIFY(!wpdu_.buf);
39     close(fd_);
40 }
41
42 void
43 connection::incref()
44 {
45     lock rl(ref_m_);
46     refno_++;
47 }
48
49 bool
50 connection::isdead()
51 {
52     lock ml(m_);
53     return dead_;
54 }
55
56 void
57 connection::closeconn()
58 {
59     {
60         lock ml(m_);
61         if (!dead_) {
62             dead_ = true;
63             shutdown(fd_,SHUT_RDWR);
64         } else {
65             return;
66         }
67     }
68     //after block_remove_fd, select will never wait on fd_
69     //and no callbacks will be active
70     PollMgr::Instance()->block_remove_fd(fd_);
71 }
72
73 void
74 connection::decref()
75 {
76     bool dead = false;
77     {
78         lock rl(ref_m_);
79         refno_--;
80         VERIFY(refno_>=0);
81         if (refno_==0) {
82             lock ml(m_);
83             dead = dead_;
84         }
85     }
86     if (dead) {
87         delete this;
88     }
89 }
90
91 int
92 connection::ref()
93 {
94     lock rl(ref_m_);
95         return refno_;
96 }
97
98 int
99 connection::compare(connection *another)
100 {
101     if (create_time_.tv_sec > another->create_time_.tv_sec)
102         return 1;
103     if (create_time_.tv_sec < another->create_time_.tv_sec)
104         return -1;
105     if (create_time_.tv_usec > another->create_time_.tv_usec)
106         return 1;
107     if (create_time_.tv_usec < another->create_time_.tv_usec)
108         return -1;
109     return 0;
110 }
111
112 bool
113 connection::send(char *b, int sz)
114 {
115     lock ml(m_);
116         waiters_++;
117         while (!dead_ && wpdu_.buf) {
118         send_wait_.wait(ml);
119         }
120         waiters_--;
121         if (dead_) {
122                 return false;
123         }
124         wpdu_.buf = b;
125         wpdu_.sz = sz;
126         wpdu_.solong = 0;
127
128         if (lossy_) {
129                 if ((random()%100) < lossy_) {
130                         jsl_log(JSL_DBG_1, "connection::send LOSSY TEST shutdown fd_ %d\n", fd_);
131                         shutdown(fd_,SHUT_RDWR);
132                 }
133         }
134
135         if (!writepdu()) {
136                 dead_ = true;
137         ml.unlock();
138                 PollMgr::Instance()->block_remove_fd(fd_);
139         ml.lock();
140         }else{
141                 if (wpdu_.solong == wpdu_.sz) {
142                 }else{
143                         //should be rare to need to explicitly add write callback
144                         PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
145                         while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) {
146                 send_complete_.wait(ml);
147                         }
148                 }
149         }
150         bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
151         wpdu_.solong = wpdu_.sz = 0;
152         wpdu_.buf = NULL;
153         if (waiters_ > 0)
154         send_wait_.notify_all();
155         return ret;
156 }
157
158 //fd_ is ready to be written
159 void
160 connection::write_cb(int s)
161 {
162     lock ml(m_);
163         VERIFY(!dead_);
164         VERIFY(fd_ == s);
165         if (wpdu_.sz == 0) {
166                 PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
167                 return;
168         }
169         if (!writepdu()) {
170                 PollMgr::Instance()->del_callback(fd_, CB_RDWR);
171                 dead_ = true;
172         } else {
173                 VERIFY(wpdu_.solong >= 0);
174                 if (wpdu_.solong < wpdu_.sz) {
175                         return;
176                 }
177     }
178         send_complete_.notify_one();
179 }
180
181 //fd_ is ready to be read
182 void
183 connection::read_cb(int s)
184 {
185     lock ml(m_);
186         VERIFY(fd_ == s);
187         if (dead_)  {
188                 return;
189         }
190
191         bool succ = true;
192         if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) {
193                 succ = readpdu();
194         }
195
196         if (!succ) {
197                 PollMgr::Instance()->del_callback(fd_,CB_RDWR);
198                 dead_ = true;
199                 send_complete_.notify_one();
200         }
201
202         if (rpdu_.buf && rpdu_.sz == rpdu_.solong) {
203                 if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) {
204                         //chanmgr has successfully consumed the pdu
205                         rpdu_.buf = NULL;
206                         rpdu_.sz = rpdu_.solong = 0;
207                 }
208         }
209 }
210
211 bool
212 connection::writepdu()
213 {
214         VERIFY(wpdu_.solong >= 0);
215         if (wpdu_.solong == wpdu_.sz)
216                 return true;
217
218         if (wpdu_.solong == 0) {
219                 int sz = htonl(wpdu_.sz);
220                 bcopy(&sz,wpdu_.buf,sizeof(sz));
221         }
222         int n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong));
223         if (n < 0) {
224                 if (errno != EAGAIN) {
225                         jsl_log(JSL_DBG_1, "connection::writepdu fd_ %d failure errno=%d\n", fd_, errno);
226                         wpdu_.solong = -1;
227                         wpdu_.sz = 0;
228                 }
229                 return (errno == EAGAIN);
230         }
231         wpdu_.solong += n;
232         return true;
233 }
234
235 bool
236 connection::readpdu()
237 {
238         if (!rpdu_.sz) {
239                 int sz, sz1;
240                 int n = read(fd_, &sz1, sizeof(sz1));
241
242                 if (n == 0) {
243                         return false;
244                 }
245
246                 if (n < 0) {
247                         VERIFY(errno!=EAGAIN);
248                         return false;
249                 }
250
251                 if (n >0 && n!= sizeof(sz)) {
252                         jsl_log(JSL_DBG_OFF, "connection::readpdu short read of sz\n");
253                         return false;
254                 }
255
256                 sz = ntohl(sz1);
257
258                 if (sz > MAX_PDU) {
259                         char *tmpb = (char *)&sz1;
260                         jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz,
261                                         sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]);
262                         return false;
263                 }
264
265                 rpdu_.sz = sz;
266                 VERIFY(rpdu_.buf == NULL);
267                 rpdu_.buf = (char *)malloc(sz+sizeof(sz));
268                 VERIFY(rpdu_.buf);
269                 bcopy(&sz1,rpdu_.buf,sizeof(sz));
270                 rpdu_.solong = sizeof(sz);
271         }
272
273         int n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong);
274         if (n <= 0) {
275                 if (errno == EAGAIN)
276                         return true;
277                 if (rpdu_.buf)
278                         free(rpdu_.buf);
279                 rpdu_.buf = NULL;
280                 rpdu_.sz = rpdu_.solong = 0;
281                 return (errno == EAGAIN);
282         }
283         rpdu_.solong += n;
284         return true;
285 }
286
287 tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest)
288 : mgr_(m1), lossy_(lossytest)
289 {
290         struct sockaddr_in sin;
291         memset(&sin, 0, sizeof(sin));
292         sin.sin_family = AF_INET;
293         sin.sin_port = htons(port);
294
295         tcp_ = socket(AF_INET, SOCK_STREAM, 0);
296         if(tcp_ < 0){
297                 perror("tcpsconn::tcpsconn accept_loop socket:");
298                 VERIFY(0);
299         }
300
301         int yes = 1;
302         setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
303         setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
304
305         if(bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0){
306                 perror("accept_loop tcp bind:");
307                 VERIFY(0);
308         }
309
310         if(listen(tcp_, 1000) < 0) {
311                 perror("tcpsconn::tcpsconn listen:");
312                 VERIFY(0);
313         }
314
315     socklen_t addrlen = sizeof(sin);
316     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
317     port_ = ntohs(sin.sin_port);
318
319         jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_,
320                 sin.sin_port);
321
322         if (pipe(pipe_) < 0) {
323                 perror("accept_loop pipe:");
324                 VERIFY(0);
325         }
326
327         int flags = fcntl(pipe_[0], F_GETFL, NULL);
328         flags |= O_NONBLOCK;
329         fcntl(pipe_[0], F_SETFL, flags);
330
331     th_ = std::thread(&tcpsconn::accept_conn, this);
332 }
333
334 tcpsconn::~tcpsconn()
335 {
336         VERIFY(close(pipe_[1]) == 0);
337     th_.join();
338
339         //close all the active connections
340         std::map<int, connection *>::iterator i;
341         for (i = conns_.begin(); i != conns_.end(); i++) {
342                 i->second->closeconn();
343                 i->second->decref();
344         }
345 }
346
347 void
348 tcpsconn::process_accept()
349 {
350         sockaddr_in sin;
351         socklen_t slen = sizeof(sin);
352         int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
353         if (s1 < 0) {
354                 perror("tcpsconn::accept_conn error");
355                 throw thread_exit_exception();
356         }
357
358         jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n",
359                         s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
360         connection *ch = new connection(mgr_, s1, lossy_);
361
362         // garbage collect all dead connections with refcount of 1
363         std::map<int, connection *>::iterator i;
364         for (i = conns_.begin(); i != conns_.end();) {
365                 if (i->second->isdead() && i->second->ref() == 1) {
366                         jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n",
367                                         i->second->channo());
368                         i->second->decref();
369                         // Careful not to reuse i right after erase. (i++) will
370                         // be evaluated before the erase call because in C++,
371                         // there is a sequence point before a function call.
372                         // See http://en.wikipedia.org/wiki/Sequence_point.
373                         conns_.erase(i++);
374                 } else
375                         ++i;
376         }
377
378         conns_[ch->channo()] = ch;
379 }
380
381 void
382 tcpsconn::accept_conn()
383 {
384         fd_set rfds;
385         int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
386
387     try {
388
389         while (1) {
390             FD_ZERO(&rfds);
391             FD_SET(pipe_[0], &rfds);
392             FD_SET(tcp_, &rfds);
393
394             int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
395
396             if (ret < 0) {
397                 if (errno == EINTR) {
398                     continue;
399                 } else {
400                     perror("accept_conn select:");
401                     jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno);
402                     VERIFY(0);
403                 }
404             }
405
406             if (FD_ISSET(pipe_[0], &rfds)) {
407                 close(pipe_[0]);
408                 close(tcp_);
409                 return;
410             }
411             else if (FD_ISSET(tcp_, &rfds)) {
412                 process_accept();
413             } else {
414                 VERIFY(0);
415             }
416         }
417     }
418     catch (thread_exit_exception e)
419     {
420         return;
421     }
422 }
423
424 connection *
425 connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy)
426 {
427         int s= socket(AF_INET, SOCK_STREAM, 0);
428         int yes = 1;
429         setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
430         if(connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
431                 jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n",
432                                 inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port));
433                 close(s);
434                 return NULL;
435         }
436         jsl_log(JSL_DBG_2, "connect_to_dst fd=%d to dst %s:%d\n",
437                         s, inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port));
438         return new connection(mgr, s, lossy);
439 }
440