Refactoring
[invirt/third/libt4.git] / rpc / connection.cc
1 #include "connection.h"
2 #include "rpc_protocol.h"
3 #include <cerrno>
4 #include <csignal>
5 #include <netinet/tcp.h>
6 #include <unistd.h>
7 #include "marshall.h"
8
9 connection::connection(connection_delegate *m1, socket_t && f1, int l1)
10 : mgr_(m1), fd_(move(f1)), lossy_(l1)
11 {
12     fd_.flags() |= O_NONBLOCK;
13
14     signal(SIGPIPE, SIG_IGN);
15
16     create_time_ = steady_clock::now();
17
18     poll_mgr::shared_mgr.add_callback(fd_, CB_RDONLY, this);
19 }
20
21 connection::~connection() {
22     closeconn();
23     VERIFY(dead_);
24     VERIFY(!wpdu_.buf.size());
25 }
26
27 shared_ptr<connection> connection::to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy) {
28     socket_t s = socket(AF_INET, SOCK_STREAM, 0);
29     s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
30     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
31         IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
32         close(s);
33         return nullptr;
34     }
35     IF_LEVEL(2) LOG_NONMEMBER("connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
36     return make_shared<connection>(mgr, move(s), lossy);
37 }
38
39 void connection::closeconn() {
40     {
41         lock ml(m_);
42         if (dead_)
43             return;
44         dead_ = true;
45         shutdown(fd_,SHUT_RDWR);
46     }
47     //after block_remove_fd, select will never wait on fd_
48     //and no callbacks will be active
49     poll_mgr::shared_mgr.block_remove_fd(fd_);
50 }
51
52 bool connection::send(const string & b) {
53     lock ml(m_);
54
55     waiters_++;
56     while (!dead_ && wpdu_.buf.size())
57         send_wait_.wait(ml);
58     waiters_--;
59
60     if (dead_)
61         return false;
62
63     wpdu_.buf = b;
64     wpdu_.solong = 0;
65
66     if (lossy_) {
67         if ((random()%100) < lossy_) {
68             IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_);
69             shutdown(fd_,SHUT_RDWR);
70         }
71     }
72
73     if (!writepdu()) {
74         dead_ = true;
75         ml.unlock();
76         poll_mgr::shared_mgr.block_remove_fd(fd_);
77         ml.lock();
78     } else if (wpdu_.solong != wpdu_.buf.size()) {
79         // should be rare to need to explicitly add write callback
80         poll_mgr::shared_mgr.add_callback(fd_, CB_WRONLY, this);
81         while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
82             send_complete_.wait(ml);
83     }
84     bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size());
85     wpdu_.solong = 0;
86     wpdu_.buf.clear();
87     if (waiters_ > 0)
88         send_wait_.notify_all();
89     return ret;
90 }
91
92 //fd_ is ready to be written
93 void connection::write_cb(int s) {
94     lock ml(m_);
95     VERIFY(!dead_);
96     VERIFY(fd_ == s);
97     if (wpdu_.buf.size() == 0) {
98         poll_mgr::shared_mgr.del_callback(fd_,CB_WRONLY);
99         return;
100     }
101     if (!writepdu()) {
102         poll_mgr::shared_mgr.del_callback(fd_, CB_RDWR);
103         dead_ = true;
104     } else {
105         VERIFY(wpdu_.solong != size_t_max);
106         if (wpdu_.solong < wpdu_.buf.size()) {
107             return;
108         }
109     }
110     send_complete_.notify_one();
111 }
112
113 // fd_ is ready to be read
114 void connection::read_cb(int s) {
115     lock ml(m_);
116     VERIFY(fd_ == s);
117     if (dead_)
118         return;
119
120     IF_LEVEL(5) LOG("got data on fd " << s);
121
122     bool succ = true;
123     if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size())
124         succ = readpdu();
125
126     if (!succ) {
127         IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
128         poll_mgr::shared_mgr.del_callback(fd_,CB_RDWR);
129         dead_ = true;
130         send_complete_.notify_one();
131     }
132
133     if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
134         if (mgr_->got_pdu(shared_from_this(), rpdu_.buf)) {
135             // connection_delegate has successfully consumed the pdu
136             rpdu_.buf.clear();
137             rpdu_.solong = 0;
138         }
139     }
140 }
141
142 bool connection::writepdu() {
143     VERIFY(wpdu_.solong != size_t_max);
144     if (wpdu_.solong == wpdu_.buf.size())
145         return true;
146
147     ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
148     if (n < 0) {
149         if (errno != EAGAIN) {
150             IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno);
151             wpdu_.solong = size_t_max;
152             wpdu_.buf.clear();
153         }
154         return (errno == EAGAIN);
155     }
156     wpdu_.solong += (size_t)n;
157     return true;
158 }
159
160 bool connection::readpdu() {
161     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
162     if (!rpdu_.buf.size()) {
163         rpc_sz_t sz1;
164         ssize_t n = fd_.read(sz1);
165
166         if (n == 0)
167             return false;
168
169         if (n < 0) {
170             VERIFY(errno!=EAGAIN);
171             return false;
172         }
173
174         if (n > 0 && n != sizeof(sz1)) {
175             IF_LEVEL(0) LOG("short read of sz");
176             return false;
177         }
178
179         size_t sz = ntoh(sz1);
180
181         if (sz > MAX_PDU) {
182             IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
183             return false;
184         }
185
186         IF_LEVEL(5) LOG("read size of datagram = " << sz);
187
188         VERIFY(rpdu_.buf.size() == 0);
189         rpdu_.buf = string(sz+sizeof(sz1), 0);
190         rpdu_.solong = sizeof(sz1);
191     }
192
193     ssize_t n = fd_.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
194
195     IF_LEVEL(5) LOG("read " << n << " bytes");
196
197     if (n <= 0) {
198         if (errno == EAGAIN)
199             return true;
200         rpdu_.buf.clear();
201         rpdu_.solong = 0;
202         return (errno == EAGAIN);
203     }
204     rpdu_.solong += (size_t)n;
205     return true;
206 }
207
208 tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest)
209 : tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest)
210 {
211     sockaddr_in sin;
212     memset(&sin, 0, sizeof(sin));
213     sin.sin_family = AF_INET;
214     sin.sin_port = hton(port);
215
216     tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
217     tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
218
219     if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000}) < 0)
220         perror("accept_loop setsockopt");
221
222     if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000}) < 0)
223         perror("accept_loop setsockopt");
224
225     // careful to exactly match type signature of bind arguments so we don't
226     // get std::bind instead
227     if (bind((int)tcp_, (const sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
228         perror("accept_loop bind");
229         VERIFY(0);
230     }
231
232     if (listen(tcp_, 1000) < 0) {
233         perror("accept_loop listen");
234         VERIFY(0);
235     }
236
237     socklen_t addrlen = sizeof(sin);
238     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
239     port_ = ntoh(sin.sin_port);
240
241     IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
242
243     file_t::pipe(pipe_);
244
245     pipe_[0].flags() |= O_NONBLOCK;
246
247     th_ = thread(&tcpsconn::accept_conn, this);
248 }
249
250 tcpsconn::~tcpsconn()
251 {
252     pipe_[1].close();
253     th_.join();
254
255     for (auto & i : conns_)
256         i.second->closeconn();
257 }
258
259 void tcpsconn::process_accept() {
260     sockaddr_in sin;
261     socklen_t slen = sizeof(sin);
262     int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
263     if (s1 < 0) {
264         perror("tcpsconn::accept_conn error");
265         throw thread_exit_exception();
266     }
267
268     IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
269     auto ch = make_shared<connection>(mgr_, s1, lossy_);
270
271     // garbage collect dead connections
272     for (auto i = conns_.begin(); i != conns_.end();) {
273         if (i->second->isdead())
274             conns_.erase(i++);
275         else
276             ++i;
277     }
278
279     conns_[ch->channo()] = ch;
280 }
281
282 void tcpsconn::accept_conn() {
283     fd_set rfds;
284     int max_fd = max((int)pipe_[0], (int)tcp_);
285
286     while (1) {
287         FD_ZERO(&rfds);
288         FD_SET(pipe_[0], &rfds);
289         FD_SET(tcp_, &rfds);
290
291         int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
292
293         if (ret < 0 && errno == EINTR)
294             continue;
295         else if (ret < 0) {
296             perror("accept_conn select:");
297             IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
298             VERIFY(0);
299         }
300
301         if (FD_ISSET(pipe_[0], &rfds))
302             return;
303
304         VERIFY(FD_ISSET(tcp_, &rfds));
305
306         try {
307             process_accept();
308         } catch (thread_exit_exception e) {
309             break;
310         }
311     }
312 }
313