Includes cleanups
[invirt/third/libt4.git] / rpc / connection.cc
1 #include "connection.h"
2 #include "rpc_protocol.h"
3 #include <cerrno>
4 #include <csignal>
5 #include <netinet/tcp.h>
6 #include <unistd.h>
7 #include "marshall.h"
8
9 connection::connection(chanmgr *m1, int f1, int l1)
10 : mgr_(m1), fd_(f1), lossy_(l1)
11 {
12     fd_.flags() |= O_NONBLOCK;
13
14     signal(SIGPIPE, SIG_IGN);
15
16     create_time_ = steady_clock::now();
17
18     PollMgr::Instance().add_callback(fd_, CB_RDONLY, this);
19 }
20
21 connection::~connection() {
22     closeconn();
23     VERIFY(dead_);
24     VERIFY(!wpdu_.buf.size());
25 }
26
27 void connection::closeconn() {
28     {
29         lock ml(m_);
30         if (dead_)
31             return;
32         dead_ = true;
33         shutdown(fd_,SHUT_RDWR);
34     }
35     //after block_remove_fd, select will never wait on fd_
36     //and no callbacks will be active
37     PollMgr::Instance().block_remove_fd(fd_);
38 }
39
40 bool connection::send(const string & b) {
41     lock ml(m_);
42
43     waiters_++;
44     while (!dead_ && wpdu_.buf.size())
45         send_wait_.wait(ml);
46     waiters_--;
47
48     if (dead_)
49         return false;
50
51     wpdu_.buf = b;
52     wpdu_.solong = 0;
53
54     if (lossy_) {
55         if ((random()%100) < lossy_) {
56             IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_);
57             shutdown(fd_,SHUT_RDWR);
58         }
59     }
60
61     if (!writepdu()) {
62         dead_ = true;
63         ml.unlock();
64         PollMgr::Instance().block_remove_fd(fd_);
65         ml.lock();
66     } else if (wpdu_.solong != wpdu_.buf.size()) {
67         // should be rare to need to explicitly add write callback
68         PollMgr::Instance().add_callback(fd_, CB_WRONLY, this);
69         while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
70             send_complete_.wait(ml);
71     }
72     bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size());
73     wpdu_.solong = 0;
74     wpdu_.buf.clear();
75     if (waiters_ > 0)
76         send_wait_.notify_all();
77     return ret;
78 }
79
80 //fd_ is ready to be written
81 void connection::write_cb(int s) {
82     lock ml(m_);
83     VERIFY(!dead_);
84     VERIFY(fd_ == s);
85     if (wpdu_.buf.size() == 0) {
86         PollMgr::Instance().del_callback(fd_,CB_WRONLY);
87         return;
88     }
89     if (!writepdu()) {
90         PollMgr::Instance().del_callback(fd_, CB_RDWR);
91         dead_ = true;
92     } else {
93         VERIFY(wpdu_.solong != size_t_max);
94         if (wpdu_.solong < wpdu_.buf.size()) {
95             return;
96         }
97     }
98     send_complete_.notify_one();
99 }
100
101 // fd_ is ready to be read
102 void connection::read_cb(int s) {
103     lock ml(m_);
104     VERIFY(fd_ == s);
105     if (dead_)  {
106         return;
107     }
108
109     IF_LEVEL(5) LOG("got data on fd " << s);
110
111     bool succ = true;
112     if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
113         succ = readpdu();
114     }
115
116     if (!succ) {
117         IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
118         PollMgr::Instance().del_callback(fd_,CB_RDWR);
119         dead_ = true;
120         send_complete_.notify_one();
121     }
122
123     if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
124         if (mgr_->got_pdu(shared_from_this(), rpdu_.buf)) {
125             // chanmgr has successfully consumed the pdu
126             rpdu_.buf.clear();
127             rpdu_.solong = 0;
128         }
129     }
130 }
131
132 bool connection::writepdu() {
133     VERIFY(wpdu_.solong != size_t_max);
134     if (wpdu_.solong == wpdu_.buf.size())
135         return true;
136
137     ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
138     if (n < 0) {
139         if (errno != EAGAIN) {
140             IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno);
141             wpdu_.solong = size_t_max;
142             wpdu_.buf.clear();
143         }
144         return (errno == EAGAIN);
145     }
146     wpdu_.solong += (size_t)n;
147     return true;
148 }
149
150 bool connection::readpdu() {
151     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
152     if (!rpdu_.buf.size()) {
153         rpc_sz_t sz1;
154         ssize_t n = fd_.read(sz1);
155
156         if (n == 0)
157             return false;
158
159         if (n < 0) {
160             VERIFY(errno!=EAGAIN);
161             return false;
162         }
163
164         if (n > 0 && n != sizeof(sz1)) {
165             IF_LEVEL(0) LOG("short read of sz");
166             return false;
167         }
168
169         size_t sz = ntoh(sz1);
170
171         if (sz > MAX_PDU) {
172             IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
173             return false;
174         }
175
176         IF_LEVEL(5) LOG("read size of datagram = " << sz);
177
178         VERIFY(rpdu_.buf.size() == 0);
179         rpdu_.buf = string(sz+sizeof(sz1), 0);
180         rpdu_.solong = sizeof(sz1);
181     }
182
183     ssize_t n = fd_.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
184
185     IF_LEVEL(5) LOG("read " << n << " bytes");
186
187     if (n <= 0) {
188         if (errno == EAGAIN)
189             return true;
190         rpdu_.buf.clear();
191         rpdu_.solong = 0;
192         return (errno == EAGAIN);
193     }
194     rpdu_.solong += (size_t)n;
195     return true;
196 }
197
198 tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
199 : tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest)
200 {
201     struct sockaddr_in sin;
202     memset(&sin, 0, sizeof(sin));
203     sin.sin_family = AF_INET;
204     sin.sin_port = hton(port);
205
206     tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
207     tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
208
209     struct timeval timeout = {0, 50000};
210
211     if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeout) < 0)
212         perror("accept_loop setsockopt");
213
214     if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeout) < 0)
215         perror("accept_loop setsockopt");
216
217     // careful to exactly match type signature of bind arguments so we don't
218     // get std::bind instead
219     if (bind((int)tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
220         perror("accept_loop bind");
221         VERIFY(0);
222     }
223
224     if (listen(tcp_, 1000) < 0) {
225         perror("accept_loop listen");
226         VERIFY(0);
227     }
228
229     socklen_t addrlen = sizeof(sin);
230     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
231     port_ = ntoh(sin.sin_port);
232
233     IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
234
235     file_t::pipe(pipe_);
236
237     pipe_[0].flags() |= O_NONBLOCK;
238
239     th_ = thread(&tcpsconn::accept_conn, this);
240 }
241
242 tcpsconn::~tcpsconn()
243 {
244     pipe_[1].close();
245     th_.join();
246
247     for (auto & i : conns_)
248         i.second->closeconn();
249 }
250
251 void tcpsconn::process_accept() {
252     sockaddr_in sin;
253     socklen_t slen = sizeof(sin);
254     int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
255     if (s1 < 0) {
256         perror("tcpsconn::accept_conn error");
257         throw thread_exit_exception();
258     }
259
260     IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
261     auto ch = make_shared<connection>(mgr_, s1, lossy_);
262
263     // garbage collect dead connections
264     for (auto i = conns_.begin(); i != conns_.end();) {
265         if (i->second->isdead())
266             conns_.erase(i++);
267         else
268             ++i;
269     }
270
271     conns_[ch->channo()] = ch;
272 }
273
274 void tcpsconn::accept_conn() {
275     fd_set rfds;
276     int max_fd = max((int)pipe_[0], (int)tcp_);
277
278     while (1) {
279         FD_ZERO(&rfds);
280         FD_SET(pipe_[0], &rfds);
281         FD_SET(tcp_, &rfds);
282
283         int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
284
285         if (ret < 0 && errno == EINTR)
286             continue;
287         else if (ret < 0) {
288             perror("accept_conn select:");
289             IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
290             VERIFY(0);
291         }
292
293         if (FD_ISSET(pipe_[0], &rfds))
294             return;
295
296         if (!FD_ISSET(tcp_, &rfds))
297             VERIFY(0);
298
299         try {
300             process_accept();
301         } catch (thread_exit_exception e) {
302             break;
303         }
304     }
305 }
306
307 shared_ptr<connection> connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
308     int s = socket(AF_INET, SOCK_STREAM, 0);
309     int yes = 1;
310     setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
311     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
312         IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
313         close(s);
314         return nullptr;
315     }
316     IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
317     return make_shared<connection>(mgr, s, lossy);
318 }
319