55e374aa68501aafe87b7dc575751f11cbd0b972
[invirt/third/libt4.git] / rpc / connection.cc
1 // std::bind and syscall bind have the same name, so don't use std::bind in this file
2 #define LIBT4_NO_FUNCTIONAL
3 #include "connection.h"
4 #include <cerrno>
5 #include <csignal>
6 #include <fcntl.h>
7 #include <sys/types.h>
8 #include <netinet/tcp.h>
9 #include <unistd.h>
10 #include <sys/socket.h>
11
12 #define MAX_PDU (10<<20) //maximum PDF is 10M
13
14 connection::connection(chanmgr *m1, int f1, int l1)
15 : mgr_(m1), fd_(f1), lossy_(l1)
16 {
17     int flags = fcntl(fd_, F_GETFL, NULL);
18     fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
19
20     signal(SIGPIPE, SIG_IGN);
21
22     create_time_ = steady_clock::now();
23
24     PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
25 }
26
27 connection::~connection() {
28     VERIFY(dead_);
29     VERIFY(!wpdu_.buf.size());
30     close(fd_);
31 }
32
33 void connection::incref() {
34     lock rl(ref_m_);
35     refno_++;
36 }
37
38 bool connection::isdead() {
39     lock ml(m_);
40     return dead_;
41 }
42
43 void connection::closeconn() {
44     {
45         lock ml(m_);
46         if (dead_)
47             return;
48         dead_ = true;
49         shutdown(fd_,SHUT_RDWR);
50     }
51     //after block_remove_fd, select will never wait on fd_
52     //and no callbacks will be active
53     PollMgr::Instance()->block_remove_fd(fd_);
54 }
55
56 void connection::decref() {
57     bool dead = false;
58     {
59         lock rl(ref_m_);
60         refno_--;
61         VERIFY(refno_>=0);
62         if (refno_==0) {
63             lock ml(m_);
64             dead = dead_;
65         }
66     }
67     if (dead)
68         delete this;
69 }
70
71 int connection::compare(connection *another) {
72     if (create_time_ > another->create_time_)
73         return 1;
74     if (create_time_ < another->create_time_)
75         return -1;
76     return 0;
77 }
78
79 bool connection::send(const string & b) {
80     lock ml(m_);
81
82     waiters_++;
83     while (!dead_ && wpdu_.buf.size())
84         send_wait_.wait(ml);
85     waiters_--;
86
87     if (dead_)
88         return false;
89
90     wpdu_.buf = b;
91     wpdu_.solong = 0;
92
93     if (lossy_) {
94         if ((random()%100) < lossy_) {
95             IF_LEVEL(1) LOG("connection::send LOSSY TEST shutdown fd_ " << fd_);
96             shutdown(fd_,SHUT_RDWR);
97         }
98     }
99
100     if (!writepdu()) {
101         dead_ = true;
102         ml.unlock();
103         PollMgr::Instance()->block_remove_fd(fd_);
104         ml.lock();
105     } else if (wpdu_.solong != wpdu_.buf.size()) {
106         // should be rare to need to explicitly add write callback
107         PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
108         while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
109             send_complete_.wait(ml);
110     }
111     bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size());
112     wpdu_.solong = 0;
113     wpdu_.buf.clear();
114     if (waiters_ > 0)
115         send_wait_.notify_all();
116     return ret;
117 }
118
119 //fd_ is ready to be written
120 void connection::write_cb(int s) {
121     lock ml(m_);
122     VERIFY(!dead_);
123     VERIFY(fd_ == s);
124     if (wpdu_.buf.size() == 0) {
125         PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
126         return;
127     }
128     if (!writepdu()) {
129         PollMgr::Instance()->del_callback(fd_, CB_RDWR);
130         dead_ = true;
131     } else {
132         VERIFY(wpdu_.solong != size_t_max);
133         if (wpdu_.solong < wpdu_.buf.size()) {
134             return;
135         }
136     }
137     send_complete_.notify_one();
138 }
139
140 //fd_ is ready to be read
141 void connection::read_cb(int s) {
142     lock ml(m_);
143     VERIFY(fd_ == s);
144     if (dead_)  {
145         return;
146     }
147
148     IF_LEVEL(5) LOG("got data on fd " << s);
149
150     bool succ = true;
151     if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
152         succ = readpdu();
153     }
154
155     if (!succ) {
156         IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
157         PollMgr::Instance()->del_callback(fd_,CB_RDWR);
158         dead_ = true;
159         send_complete_.notify_one();
160     }
161
162     if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
163         if (mgr_->got_pdu(this, rpdu_.buf)) {
164             //chanmgr has successfully consumed the pdu
165             rpdu_.buf.clear();
166             rpdu_.solong = 0;
167         }
168     }
169 }
170
171 bool connection::writepdu() {
172     VERIFY(wpdu_.solong != size_t_max);
173     if (wpdu_.solong == wpdu_.buf.size())
174         return true;
175
176     if (wpdu_.solong == 0) {
177         uint32_t sz = htonl((uint32_t)wpdu_.buf.size() - sizeof(uint32_t));
178         copy((const char *)&sz, (const char *)(&sz+1), &wpdu_.buf[0]);
179     }
180     ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
181     if (n < 0) {
182         if (errno != EAGAIN) {
183             IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno);
184             wpdu_.solong = size_t_max;
185             wpdu_.buf.clear();
186         }
187         return (errno == EAGAIN);
188     }
189     wpdu_.solong += (size_t)n;
190     return true;
191 }
192
193 bool connection::readpdu() {
194     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
195     if (!rpdu_.buf.size()) {
196         uint32_t sz1;
197         ssize_t n = read(fd_, &sz1, sizeof(sz1));
198
199         if (n == 0) {
200             return false;
201         }
202
203         if (n < 0) {
204             VERIFY(errno!=EAGAIN);
205             return false;
206         }
207
208         if (n > 0 && n != sizeof(sz1)) {
209             IF_LEVEL(0) LOG("short read of sz");
210             return false;
211         }
212
213         size_t sz = ntohl(sz1);
214
215         if (sz > MAX_PDU) {
216             IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
217             return false;
218         }
219
220         IF_LEVEL(5) LOG("read size of datagram = " << sz);
221
222         VERIFY(rpdu_.buf.size() == 0);
223         rpdu_.buf = string(sz+sizeof(sz1), 0);
224         copy((const char *)&sz1, (const char *)(&sz1 + 1), &rpdu_.buf[0]);
225         rpdu_.solong = sizeof(sz1);
226     }
227
228     ssize_t n = read(fd_, &rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
229
230     IF_LEVEL(5) LOG("read " << n << " bytes");
231
232     if (n <= 0) {
233         if (errno == EAGAIN)
234             return true;
235         rpdu_.buf.clear();
236         rpdu_.solong = 0;
237         return (errno == EAGAIN);
238     }
239     rpdu_.solong += (size_t)n;
240     return true;
241 }
242
243 tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
244 : mgr_(m1), lossy_(lossytest)
245 {
246     struct sockaddr_in sin;
247     memset(&sin, 0, sizeof(sin));
248     sin.sin_family = AF_INET;
249     sin.sin_port = htons(port);
250
251     tcp_ = socket(AF_INET, SOCK_STREAM, 0);
252     if (tcp_ < 0) {
253         perror("accept_loop socket:");
254         VERIFY(0);
255     }
256
257     int yes = 1;
258     setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
259     setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
260
261     if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
262         perror("accept_loop tcp bind:");
263         VERIFY(0);
264     }
265
266     if (listen(tcp_, 1000) < 0) {
267         perror("listen:");
268         VERIFY(0);
269     }
270
271     socklen_t addrlen = sizeof(sin);
272     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
273     port_ = ntohs(sin.sin_port);
274
275     IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
276
277     if (pipe(pipe_) < 0) {
278         perror("accept_loop pipe:");
279         VERIFY(0);
280     }
281
282     int flags = fcntl(pipe_[0], F_GETFL, NULL);
283     flags |= O_NONBLOCK;
284     fcntl(pipe_[0], F_SETFL, flags);
285
286     th_ = thread(&tcpsconn::accept_conn, this);
287 }
288
289 tcpsconn::~tcpsconn()
290 {
291     VERIFY(close(pipe_[1]) == 0);
292     th_.join();
293
294     //close all the active connections
295     map<int, connection *>::iterator i;
296     for (i = conns_.begin(); i != conns_.end(); i++) {
297         i->second->closeconn();
298         i->second->decref();
299     }
300 }
301
302 void tcpsconn::process_accept() {
303     sockaddr_in sin;
304     socklen_t slen = sizeof(sin);
305     int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
306     if (s1 < 0) {
307         perror("tcpsconn::accept_conn error");
308         throw thread_exit_exception();
309     }
310
311     IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntohs(sin.sin_port));
312     connection *ch = new connection(mgr_, s1, lossy_);
313
314     // garbage collect all dead connections with refcount of 1
315     for (auto i = conns_.begin(); i != conns_.end();) {
316         if (i->second->isdead() && i->second->ref() == 1) {
317             IF_LEVEL(2) LOG("accept_loop garbage collected fd=" << i->second->channo());
318             i->second->decref();
319             // Careful not to reuse i right after erase. (i++) will
320             // be evaluated before the erase call because in C++,
321             // there is a sequence point before a function call.
322             // See http://en.wikipedia.org/wiki/Sequence_point.
323             conns_.erase(i++);
324         } else
325             ++i;
326     }
327
328     conns_[ch->channo()] = ch;
329 }
330
331 void tcpsconn::accept_conn() {
332     fd_set rfds;
333     int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
334
335     try {
336         while (1) {
337             FD_ZERO(&rfds);
338             FD_SET(pipe_[0], &rfds);
339             FD_SET(tcp_, &rfds);
340
341             int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
342
343             if (ret < 0) {
344                 if (errno == EINTR) {
345                     continue;
346                 } else {
347                     perror("accept_conn select:");
348                     IF_LEVEL(0) LOG("tcpsconn::accept_conn failure errno " << errno);
349                     VERIFY(0);
350                 }
351             }
352
353             if (FD_ISSET(pipe_[0], &rfds)) {
354                 close(pipe_[0]);
355                 close(tcp_);
356                 return;
357             }
358             else if (FD_ISSET(tcp_, &rfds)) {
359                 process_accept();
360             } else {
361                 VERIFY(0);
362             }
363         }
364     }
365     catch (thread_exit_exception e)
366     {
367     }
368 }
369
370 connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
371     int s = socket(AF_INET, SOCK_STREAM, 0);
372     int yes = 1;
373     setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
374     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
375         IF_LEVEL(1) LOG_NONMEMBER("rpcc::connect_to_dst failed to " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
376         close(s);
377         return NULL;
378     }
379     IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
380     return new connection(mgr, s, lossy);
381 }
382