More cleaning
[invirt/third/libt4.git] / rpc / connection.cc
1 // std::bind and syscall bind have the same name, so don't use std::bind in this file
2 #define LIBT4_NO_FUNCTIONAL
3 #include "connection.h"
4 #include <fcntl.h>
5 #include <sys/types.h>
6 #include <netinet/tcp.h>
7 #include <errno.h>
8 #include <signal.h>
9 #include <unistd.h>
10 #include "jsl_log.h"
11 #include <sys/socket.h>
12
// Largest PDU (header included) accepted from a peer: 10 MiB.
#define MAX_PDU (10 * 1024 * 1024)
14
15 connection::connection(chanmgr *m1, int f1, int l1)
16 : mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1)
17 {
18
19     int flags = fcntl(fd_, F_GETFL, NULL);
20     flags |= O_NONBLOCK;
21     fcntl(fd_, F_SETFL, flags);
22
23     signal(SIGPIPE, SIG_IGN);
24
25     create_time_ = steady_clock::now();
26
27     PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
28 }
29
30 connection::~connection()
31 {
32     VERIFY(dead_);
33     if (rpdu_.buf)
34         free(rpdu_.buf);
35     VERIFY(!wpdu_.buf);
36     close(fd_);
37 }
38
39 void
40 connection::incref()
41 {
42     lock rl(ref_m_);
43     refno_++;
44 }
45
46 bool
47 connection::isdead()
48 {
49     lock ml(m_);
50     return dead_;
51 }
52
53 void
54 connection::closeconn()
55 {
56     {
57         lock ml(m_);
58         if (!dead_) {
59             dead_ = true;
60             shutdown(fd_,SHUT_RDWR);
61         } else {
62             return;
63         }
64     }
65     //after block_remove_fd, select will never wait on fd_
66     //and no callbacks will be active
67     PollMgr::Instance()->block_remove_fd(fd_);
68 }
69
70 void
71 connection::decref()
72 {
73     bool dead = false;
74     {
75         lock rl(ref_m_);
76         refno_--;
77         VERIFY(refno_>=0);
78         if (refno_==0) {
79             lock ml(m_);
80             dead = dead_;
81         }
82     }
83     if (dead) {
84         delete this;
85     }
86 }
87
88 int
89 connection::ref()
90 {
91     lock rl(ref_m_);
92         return refno_;
93 }
94
95 int
96 connection::compare(connection *another)
97 {
98     if (create_time_ > another->create_time_)
99         return 1;
100     if (create_time_ < another->create_time_)
101         return -1;
102     return 0;
103 }
104
105 bool
106 connection::send(char *b, size_t sz)
107 {
108     lock ml(m_);
109         waiters_++;
110         while (!dead_ && wpdu_.buf) {
111         send_wait_.wait(ml);
112         }
113         waiters_--;
114         if (dead_) {
115                 return false;
116         }
117         wpdu_.buf = b;
118         wpdu_.sz = sz;
119         wpdu_.solong = 0;
120
121         if (lossy_) {
122                 if ((random()%100) < lossy_) {
123                         jsl_log(JSL_DBG_1, "connection::send LOSSY TEST shutdown fd_ %d\n", fd_);
124                         shutdown(fd_,SHUT_RDWR);
125                 }
126         }
127
128         if (!writepdu()) {
129                 dead_ = true;
130         ml.unlock();
131                 PollMgr::Instance()->block_remove_fd(fd_);
132         ml.lock();
133         } else {
134                 if (wpdu_.solong == wpdu_.sz) {
135                 } else {
136                         //should be rare to need to explicitly add write callback
137                         PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
138                         while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) {
139                 send_complete_.wait(ml);
140                         }
141                 }
142         }
143         bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
144         wpdu_.solong = wpdu_.sz = 0;
145         wpdu_.buf = NULL;
146         if (waiters_ > 0)
147         send_wait_.notify_all();
148         return ret;
149 }
150
151 //fd_ is ready to be written
152 void
153 connection::write_cb(int s)
154 {
155     lock ml(m_);
156         VERIFY(!dead_);
157         VERIFY(fd_ == s);
158         if (wpdu_.sz == 0) {
159                 PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
160                 return;
161         }
162         if (!writepdu()) {
163                 PollMgr::Instance()->del_callback(fd_, CB_RDWR);
164                 dead_ = true;
165         } else {
166                 VERIFY(wpdu_.solong != size_t_max);
167                 if (wpdu_.solong < wpdu_.sz) {
168                         return;
169                 }
170     }
171         send_complete_.notify_one();
172 }
173
174 //fd_ is ready to be read
175 void
176 connection::read_cb(int s)
177 {
178     lock ml(m_);
179         VERIFY(fd_ == s);
180         if (dead_)  {
181                 return;
182         }
183
184         bool succ = true;
185         if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) {
186                 succ = readpdu();
187         }
188
189         if (!succ) {
190                 PollMgr::Instance()->del_callback(fd_,CB_RDWR);
191                 dead_ = true;
192                 send_complete_.notify_one();
193         }
194
195         if (rpdu_.buf && rpdu_.sz == rpdu_.solong) {
196                 if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) {
197                         //chanmgr has successfully consumed the pdu
198                         rpdu_.buf = NULL;
199                         rpdu_.sz = rpdu_.solong = 0;
200                 }
201         }
202 }
203
204 bool
205 connection::writepdu()
206 {
207         VERIFY(wpdu_.solong != size_t_max);
208         if (wpdu_.solong == wpdu_.sz)
209                 return true;
210
211         if (wpdu_.solong == 0) {
212                 uint32_t sz = htonl((uint32_t)wpdu_.sz);
213                 bcopy(&sz,wpdu_.buf,sizeof(sz));
214         }
215         ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong));
216         if (n < 0) {
217                 if (errno != EAGAIN) {
218                         jsl_log(JSL_DBG_1, "connection::writepdu fd_ %d failure errno=%d\n", fd_, errno);
219                         wpdu_.solong = size_t_max;
220                         wpdu_.sz = 0;
221                 }
222                 return (errno == EAGAIN);
223         }
224         wpdu_.solong += (size_t)n;
225         return true;
226 }
227
228 bool
229 connection::readpdu()
230 {
231         if (!rpdu_.sz) {
232                 uint32_t sz1;
233                 ssize_t n = read(fd_, &sz1, sizeof(sz1));
234
235                 if (n == 0) {
236                         return false;
237                 }
238
239                 if (n < 0) {
240                         VERIFY(errno!=EAGAIN);
241                         return false;
242                 }
243
244                 if (n > 0 && n != sizeof(sz1)) {
245                         jsl_log(JSL_DBG_OFF, "connection::readpdu short read of sz\n");
246                         return false;
247                 }
248
249                 size_t sz = ntohl(sz1);
250
251                 if (sz > MAX_PDU) {
252                         char *tmpb = (char *)&sz1;
253                         jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %lu network order=%x %x %x %x %x\n", sz,
254                                         sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]);
255                         return false;
256                 }
257
258                 rpdu_.sz = sz;
259                 VERIFY(rpdu_.buf == NULL);
260                 rpdu_.buf = (char *)malloc(sz+sizeof(sz1));
261                 VERIFY(rpdu_.buf);
262                 bcopy(&sz1,rpdu_.buf,sizeof(sz1));
263                 rpdu_.solong = sizeof(sz1);
264         }
265
266         ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong);
267         if (n <= 0) {
268                 if (errno == EAGAIN)
269                         return true;
270                 if (rpdu_.buf)
271                         free(rpdu_.buf);
272                 rpdu_.buf = NULL;
273                 rpdu_.sz = rpdu_.solong = 0;
274                 return (errno == EAGAIN);
275         }
276         rpdu_.solong += (size_t)n;
277         return true;
278 }
279
280 tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
281 : mgr_(m1), lossy_(lossytest)
282 {
283         struct sockaddr_in sin;
284         memset(&sin, 0, sizeof(sin));
285         sin.sin_family = AF_INET;
286         sin.sin_port = htons(port);
287
288         tcp_ = socket(AF_INET, SOCK_STREAM, 0);
289         if (tcp_ < 0) {
290                 perror("tcpsconn::tcpsconn accept_loop socket:");
291                 VERIFY(0);
292         }
293
294         int yes = 1;
295         setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
296         setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
297
298         if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
299                 perror("accept_loop tcp bind:");
300                 VERIFY(0);
301         }
302
303         if (listen(tcp_, 1000) < 0) {
304                 perror("tcpsconn::tcpsconn listen:");
305                 VERIFY(0);
306         }
307
308     socklen_t addrlen = sizeof(sin);
309     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
310     port_ = ntohs(sin.sin_port);
311
312         jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port_,
313                 sin.sin_port);
314
315         if (pipe(pipe_) < 0) {
316                 perror("accept_loop pipe:");
317                 VERIFY(0);
318         }
319
320         int flags = fcntl(pipe_[0], F_GETFL, NULL);
321         flags |= O_NONBLOCK;
322         fcntl(pipe_[0], F_SETFL, flags);
323
324     th_ = thread(&tcpsconn::accept_conn, this);
325 }
326
327 tcpsconn::~tcpsconn()
328 {
329         VERIFY(close(pipe_[1]) == 0);
330     th_.join();
331
332         //close all the active connections
333         map<int, connection *>::iterator i;
334         for (i = conns_.begin(); i != conns_.end(); i++) {
335                 i->second->closeconn();
336                 i->second->decref();
337         }
338 }
339
340 void
341 tcpsconn::process_accept()
342 {
343         sockaddr_in sin;
344         socklen_t slen = sizeof(sin);
345         int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
346         if (s1 < 0) {
347                 perror("tcpsconn::accept_conn error");
348                 throw thread_exit_exception();
349         }
350
351         jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n",
352                         s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
353         connection *ch = new connection(mgr_, s1, lossy_);
354
355     // garbage collect all dead connections with refcount of 1
356     for (auto i = conns_.begin(); i != conns_.end();) {
357         if (i->second->isdead() && i->second->ref() == 1) {
358             jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n",
359                     i->second->channo());
360             i->second->decref();
361             // Careful not to reuse i right after erase. (i++) will
362             // be evaluated before the erase call because in C++,
363             // there is a sequence point before a function call.
364             // See http://en.wikipedia.org/wiki/Sequence_point.
365             conns_.erase(i++);
366         } else
367             ++i;
368     }
369
370         conns_[ch->channo()] = ch;
371 }
372
373 void
374 tcpsconn::accept_conn()
375 {
376         fd_set rfds;
377         int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
378
379     try {
380         while (1) {
381             FD_ZERO(&rfds);
382             FD_SET(pipe_[0], &rfds);
383             FD_SET(tcp_, &rfds);
384
385             int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
386
387             if (ret < 0) {
388                 if (errno == EINTR) {
389                     continue;
390                 } else {
391                     perror("accept_conn select:");
392                     jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno);
393                     VERIFY(0);
394                 }
395             }
396
397             if (FD_ISSET(pipe_[0], &rfds)) {
398                 close(pipe_[0]);
399                 close(tcp_);
400                 return;
401             }
402             else if (FD_ISSET(tcp_, &rfds)) {
403                 process_accept();
404             } else {
405                 VERIFY(0);
406             }
407         }
408     }
409     catch (thread_exit_exception e)
410     {
411     }
412 }
413
414 connection *
415 connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy)
416 {
417         int s = socket(AF_INET, SOCK_STREAM, 0);
418         int yes = 1;
419         setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
420         if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
421                 jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n",
422                                 inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port));
423                 close(s);
424                 return NULL;
425         }
426         jsl_log(JSL_DBG_2, "connect_to_dst fd=%d to dst %s:%d\n",
427                         s, inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port));
428         return new connection(mgr, s, lossy);
429 }
430