Reduced timeouts by 10x
[invirt/third/libt4.git] / rpc / connection.cc
1 #include "connection.h"
2 #include "rpc_protocol.h"
3 #include <cerrno>
4 #include <csignal>
5 #include <fcntl.h>
6 #include <sys/types.h>
7 #include <netinet/tcp.h>
8 #include <unistd.h>
9 #include <sys/socket.h>
10 #include "marshall.h"
11
12 connection::connection(chanmgr *m1, int f1, int l1)
13 : mgr_(m1), fd_(f1), lossy_(l1)
14 {
15     int flags = fcntl(fd_, F_GETFL, NULL);
16     fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
17
18     signal(SIGPIPE, SIG_IGN);
19
20     create_time_ = steady_clock::now();
21
22     PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
23 }
24
25 connection::~connection() {
26     VERIFY(dead_);
27     VERIFY(!wpdu_.buf.size());
28     close(fd_);
29 }
30
31 void connection::incref() {
32     lock rl(ref_m_);
33     refno_++;
34 }
35
36 bool connection::isdead() {
37     lock ml(m_);
38     return dead_;
39 }
40
41 void connection::closeconn() {
42     {
43         lock ml(m_);
44         if (dead_)
45             return;
46         dead_ = true;
47         shutdown(fd_,SHUT_RDWR);
48     }
49     //after block_remove_fd, select will never wait on fd_
50     //and no callbacks will be active
51     PollMgr::Instance()->block_remove_fd(fd_);
52 }
53
54 void connection::decref() {
55     bool dead = false;
56     {
57         lock rl(ref_m_);
58         refno_--;
59         VERIFY(refno_>=0);
60         if (refno_==0) {
61             lock ml(m_);
62             dead = dead_;
63         }
64     }
65     if (dead)
66         delete this;
67 }
68
69 int connection::compare(connection *another) {
70     if (create_time_ > another->create_time_)
71         return 1;
72     if (create_time_ < another->create_time_)
73         return -1;
74     return 0;
75 }
76
77 bool connection::send(const string & b) {
78     lock ml(m_);
79
80     waiters_++;
81     while (!dead_ && wpdu_.buf.size())
82         send_wait_.wait(ml);
83     waiters_--;
84
85     if (dead_)
86         return false;
87
88     wpdu_.buf = b;
89     wpdu_.solong = 0;
90
91     if (lossy_) {
92         if ((random()%100) < lossy_) {
93             IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_);
94             shutdown(fd_,SHUT_RDWR);
95         }
96     }
97
98     if (!writepdu()) {
99         dead_ = true;
100         ml.unlock();
101         PollMgr::Instance()->block_remove_fd(fd_);
102         ml.lock();
103     } else if (wpdu_.solong != wpdu_.buf.size()) {
104         // should be rare to need to explicitly add write callback
105         PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
106         while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
107             send_complete_.wait(ml);
108     }
109     bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size());
110     wpdu_.solong = 0;
111     wpdu_.buf.clear();
112     if (waiters_ > 0)
113         send_wait_.notify_all();
114     return ret;
115 }
116
117 //fd_ is ready to be written
118 void connection::write_cb(int s) {
119     lock ml(m_);
120     VERIFY(!dead_);
121     VERIFY(fd_ == s);
122     if (wpdu_.buf.size() == 0) {
123         PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
124         return;
125     }
126     if (!writepdu()) {
127         PollMgr::Instance()->del_callback(fd_, CB_RDWR);
128         dead_ = true;
129     } else {
130         VERIFY(wpdu_.solong != size_t_max);
131         if (wpdu_.solong < wpdu_.buf.size()) {
132             return;
133         }
134     }
135     send_complete_.notify_one();
136 }
137
138 //fd_ is ready to be read
139 void connection::read_cb(int s) {
140     lock ml(m_);
141     VERIFY(fd_ == s);
142     if (dead_)  {
143         return;
144     }
145
146     IF_LEVEL(5) LOG("got data on fd " << s);
147
148     bool succ = true;
149     if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
150         succ = readpdu();
151     }
152
153     if (!succ) {
154         IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
155         PollMgr::Instance()->del_callback(fd_,CB_RDWR);
156         dead_ = true;
157         send_complete_.notify_one();
158     }
159
160     if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
161         if (mgr_->got_pdu(this, rpdu_.buf)) {
162             //chanmgr has successfully consumed the pdu
163             rpdu_.buf.clear();
164             rpdu_.solong = 0;
165         }
166     }
167 }
168
169 bool connection::writepdu() {
170     VERIFY(wpdu_.solong != size_t_max);
171     if (wpdu_.solong == wpdu_.buf.size())
172         return true;
173
174     ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
175     if (n < 0) {
176         if (errno != EAGAIN) {
177             IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno);
178             wpdu_.solong = size_t_max;
179             wpdu_.buf.clear();
180         }
181         return (errno == EAGAIN);
182     }
183     wpdu_.solong += (size_t)n;
184     return true;
185 }
186
187 bool connection::readpdu() {
188     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
189     if (!rpdu_.buf.size()) {
190         rpc_sz_t sz1;
191         ssize_t n = read(fd_, &sz1, sizeof(sz1));
192
193         if (n == 0) {
194             return false;
195         }
196
197         if (n < 0) {
198             VERIFY(errno!=EAGAIN);
199             return false;
200         }
201
202         if (n > 0 && n != sizeof(sz1)) {
203             IF_LEVEL(0) LOG("short read of sz");
204             return false;
205         }
206
207         size_t sz = ntoh(sz1);
208
209         if (sz > MAX_PDU) {
210             IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
211             return false;
212         }
213
214         IF_LEVEL(5) LOG("read size of datagram = " << sz);
215
216         VERIFY(rpdu_.buf.size() == 0);
217         rpdu_.buf = string(sz+sizeof(sz1), 0);
218         rpdu_.solong = sizeof(sz1);
219     }
220
221     ssize_t n = read(fd_, &rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
222
223     IF_LEVEL(5) LOG("read " << n << " bytes");
224
225     if (n <= 0) {
226         if (errno == EAGAIN)
227             return true;
228         rpdu_.buf.clear();
229         rpdu_.solong = 0;
230         return (errno == EAGAIN);
231     }
232     rpdu_.solong += (size_t)n;
233     return true;
234 }
235
236 tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
237 : mgr_(m1), lossy_(lossytest)
238 {
239     struct sockaddr_in sin;
240     memset(&sin, 0, sizeof(sin));
241     sin.sin_family = AF_INET;
242     sin.sin_port = hton(port);
243
244     tcp_ = socket(AF_INET, SOCK_STREAM, 0);
245     if (tcp_ < 0) {
246         perror("accept_loop socket:");
247         VERIFY(0);
248     }
249
250     int yes = 1;
251     setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
252     setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
253
254     // careful to exactly match type signature of bind arguments so we don't
255     // get std::bind instead
256     if (bind(tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
257         perror("accept_loop tcp bind:");
258         VERIFY(0);
259     }
260
261     if (listen(tcp_, 1000) < 0) {
262         perror("listen:");
263         VERIFY(0);
264     }
265
266     socklen_t addrlen = sizeof(sin);
267     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
268     port_ = ntoh(sin.sin_port);
269
270     IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
271
272     if (pipe(pipe_) < 0) {
273         perror("accept_loop pipe:");
274         VERIFY(0);
275     }
276
277     int flags = fcntl(pipe_[0], F_GETFL, NULL);
278     flags |= O_NONBLOCK;
279     fcntl(pipe_[0], F_SETFL, flags);
280
281     th_ = thread(&tcpsconn::accept_conn, this);
282 }
283
284 tcpsconn::~tcpsconn()
285 {
286     VERIFY(close(pipe_[1]) == 0);
287     th_.join();
288
289     //close all the active connections
290     map<int, connection *>::iterator i;
291     for (i = conns_.begin(); i != conns_.end(); i++) {
292         i->second->closeconn();
293         i->second->decref();
294     }
295 }
296
297 void tcpsconn::process_accept() {
298     sockaddr_in sin;
299     socklen_t slen = sizeof(sin);
300     int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
301     if (s1 < 0) {
302         perror("tcpsconn::accept_conn error");
303         throw thread_exit_exception();
304     }
305
306     IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
307     connection *ch = new connection(mgr_, s1, lossy_);
308
309     // garbage collect all dead connections with refcount of 1
310     for (auto i = conns_.begin(); i != conns_.end();) {
311         if (i->second->isdead() && i->second->ref() == 1) {
312             IF_LEVEL(2) LOG("accept_loop garbage collected fd=" << i->second->channo());
313             i->second->decref();
314             // Careful not to reuse i right after erase. (i++) will
315             // be evaluated before the erase call because in C++,
316             // there is a sequence point before a function call.
317             // See http://en.wikipedia.org/wiki/Sequence_point.
318             conns_.erase(i++);
319         } else
320             ++i;
321     }
322
323     conns_[ch->channo()] = ch;
324 }
325
326 void tcpsconn::accept_conn() {
327     fd_set rfds;
328     int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
329
330     try {
331         while (1) {
332             FD_ZERO(&rfds);
333             FD_SET(pipe_[0], &rfds);
334             FD_SET(tcp_, &rfds);
335
336             int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
337
338             if (ret < 0) {
339                 if (errno == EINTR) {
340                     continue;
341                 } else {
342                     perror("accept_conn select:");
343                     IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
344                     VERIFY(0);
345                 }
346             }
347
348             if (FD_ISSET(pipe_[0], &rfds)) {
349                 close(pipe_[0]);
350                 close(tcp_);
351                 return;
352             }
353             else if (FD_ISSET(tcp_, &rfds)) {
354                 process_accept();
355             } else {
356                 VERIFY(0);
357             }
358         }
359     }
360     catch (thread_exit_exception e)
361     {
362     }
363 }
364
365 connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
366     int s = socket(AF_INET, SOCK_STREAM, 0);
367     int yes = 1;
368     setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
369     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
370         IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
371         close(s);
372         return NULL;
373     }
374     IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
375     return new connection(mgr, s, lossy);
376 }
377