Explicit refcounting removed from connection object
[invirt/third/libt4.git] / rpc / connection.cc
1 #include "connection.h"
2 #include "rpc_protocol.h"
3 #include <cerrno>
4 #include <csignal>
5 #include <sys/types.h>
6 #include <netinet/tcp.h>
7 #include <unistd.h>
8 #include "marshall.h"
9
10 connection::connection(chanmgr *m1, int f1, int l1)
11 : mgr_(m1), fd_(f1), lossy_(l1)
12 {
13     fd_.flags() |= O_NONBLOCK;
14
15     signal(SIGPIPE, SIG_IGN);
16
17     create_time_ = steady_clock::now();
18
19     PollMgr::Instance().add_callback(fd_, CB_RDONLY, this);
20 }
21
22 connection::~connection() {
23     closeconn();
24     VERIFY(dead_);
25     VERIFY(!wpdu_.buf.size());
26 }
27
28 void connection::closeconn() {
29     {
30         lock ml(m_);
31         if (dead_)
32             return;
33         dead_ = true;
34         shutdown(fd_,SHUT_RDWR);
35     }
36     //after block_remove_fd, select will never wait on fd_
37     //and no callbacks will be active
38     PollMgr::Instance().block_remove_fd(fd_);
39 }
40
41 bool connection::send(const string & b) {
42     lock ml(m_);
43
44     waiters_++;
45     while (!dead_ && wpdu_.buf.size())
46         send_wait_.wait(ml);
47     waiters_--;
48
49     if (dead_)
50         return false;
51
52     wpdu_.buf = b;
53     wpdu_.solong = 0;
54
55     if (lossy_) {
56         if ((random()%100) < lossy_) {
57             IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_);
58             shutdown(fd_,SHUT_RDWR);
59         }
60     }
61
62     if (!writepdu()) {
63         dead_ = true;
64         ml.unlock();
65         PollMgr::Instance().block_remove_fd(fd_);
66         ml.lock();
67     } else if (wpdu_.solong != wpdu_.buf.size()) {
68         // should be rare to need to explicitly add write callback
69         PollMgr::Instance().add_callback(fd_, CB_WRONLY, this);
70         while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
71             send_complete_.wait(ml);
72     }
73     bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size());
74     wpdu_.solong = 0;
75     wpdu_.buf.clear();
76     if (waiters_ > 0)
77         send_wait_.notify_all();
78     return ret;
79 }
80
81 //fd_ is ready to be written
82 void connection::write_cb(int s) {
83     lock ml(m_);
84     VERIFY(!dead_);
85     VERIFY(fd_ == s);
86     if (wpdu_.buf.size() == 0) {
87         PollMgr::Instance().del_callback(fd_,CB_WRONLY);
88         return;
89     }
90     if (!writepdu()) {
91         PollMgr::Instance().del_callback(fd_, CB_RDWR);
92         dead_ = true;
93     } else {
94         VERIFY(wpdu_.solong != size_t_max);
95         if (wpdu_.solong < wpdu_.buf.size()) {
96             return;
97         }
98     }
99     send_complete_.notify_one();
100 }
101
102 // fd_ is ready to be read
103 void connection::read_cb(int s) {
104     lock ml(m_);
105     VERIFY(fd_ == s);
106     if (dead_)  {
107         return;
108     }
109
110     IF_LEVEL(5) LOG("got data on fd " << s);
111
112     bool succ = true;
113     if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
114         succ = readpdu();
115     }
116
117     if (!succ) {
118         IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
119         PollMgr::Instance().del_callback(fd_,CB_RDWR);
120         dead_ = true;
121         send_complete_.notify_one();
122     }
123
124     if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
125         if (mgr_->got_pdu(shared_from_this(), rpdu_.buf)) {
126             // chanmgr has successfully consumed the pdu
127             rpdu_.buf.clear();
128             rpdu_.solong = 0;
129         }
130     }
131 }
132
133 bool connection::writepdu() {
134     VERIFY(wpdu_.solong != size_t_max);
135     if (wpdu_.solong == wpdu_.buf.size())
136         return true;
137
138     ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
139     if (n < 0) {
140         if (errno != EAGAIN) {
141             IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno);
142             wpdu_.solong = size_t_max;
143             wpdu_.buf.clear();
144         }
145         return (errno == EAGAIN);
146     }
147     wpdu_.solong += (size_t)n;
148     return true;
149 }
150
151 bool connection::readpdu() {
152     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
153     if (!rpdu_.buf.size()) {
154         rpc_sz_t sz1;
155         ssize_t n = fd_.read(sz1);
156
157         if (n == 0)
158             return false;
159
160         if (n < 0) {
161             VERIFY(errno!=EAGAIN);
162             return false;
163         }
164
165         if (n > 0 && n != sizeof(sz1)) {
166             IF_LEVEL(0) LOG("short read of sz");
167             return false;
168         }
169
170         size_t sz = ntoh(sz1);
171
172         if (sz > MAX_PDU) {
173             IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
174             return false;
175         }
176
177         IF_LEVEL(5) LOG("read size of datagram = " << sz);
178
179         VERIFY(rpdu_.buf.size() == 0);
180         rpdu_.buf = string(sz+sizeof(sz1), 0);
181         rpdu_.solong = sizeof(sz1);
182     }
183
184     ssize_t n = fd_.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
185
186     IF_LEVEL(5) LOG("read " << n << " bytes");
187
188     if (n <= 0) {
189         if (errno == EAGAIN)
190             return true;
191         rpdu_.buf.clear();
192         rpdu_.solong = 0;
193         return (errno == EAGAIN);
194     }
195     rpdu_.solong += (size_t)n;
196     return true;
197 }
198
199 tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
200 : tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest)
201 {
202     struct sockaddr_in sin;
203     memset(&sin, 0, sizeof(sin));
204     sin.sin_family = AF_INET;
205     sin.sin_port = hton(port);
206
207     tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
208     tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
209
210     struct timeval timeout = {0, 50000};
211
212     if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeout) < 0)
213         perror("accept_loop setsockopt");
214
215     if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeout) < 0)
216         perror("accept_loop setsockopt");
217
218     // careful to exactly match type signature of bind arguments so we don't
219     // get std::bind instead
220     if (bind((int)tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
221         perror("accept_loop bind");
222         VERIFY(0);
223     }
224
225     if (listen(tcp_, 1000) < 0) {
226         perror("accept_loop listen");
227         VERIFY(0);
228     }
229
230     socklen_t addrlen = sizeof(sin);
231     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
232     port_ = ntoh(sin.sin_port);
233
234     IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
235
236     file_t::pipe(pipe_);
237
238     pipe_[0].flags() |= O_NONBLOCK;
239
240     th_ = thread(&tcpsconn::accept_conn, this);
241 }
242
243 tcpsconn::~tcpsconn()
244 {
245     pipe_[1].close();
246     th_.join();
247
248     for (auto & i : conns_)
249         i.second->closeconn();
250 }
251
252 void tcpsconn::process_accept() {
253     sockaddr_in sin;
254     socklen_t slen = sizeof(sin);
255     int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
256     if (s1 < 0) {
257         perror("tcpsconn::accept_conn error");
258         throw thread_exit_exception();
259     }
260
261     IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
262     auto ch = make_shared<connection>(mgr_, s1, lossy_);
263
264     // garbage collect dead connections
265     for (auto i = conns_.begin(); i != conns_.end();) {
266         if (i->second->isdead())
267             conns_.erase(i++);
268         else
269             ++i;
270     }
271
272     conns_[ch->channo()] = ch;
273 }
274
275 void tcpsconn::accept_conn() {
276     fd_set rfds;
277     int max_fd = max((int)pipe_[0], (int)tcp_);
278
279     while (1) {
280         FD_ZERO(&rfds);
281         FD_SET(pipe_[0], &rfds);
282         FD_SET(tcp_, &rfds);
283
284         int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
285
286         if (ret < 0 && errno == EINTR)
287             continue;
288         else if (ret < 0) {
289             perror("accept_conn select:");
290             IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
291             VERIFY(0);
292         }
293
294         if (FD_ISSET(pipe_[0], &rfds))
295             return;
296
297         if (!FD_ISSET(tcp_, &rfds))
298             VERIFY(0);
299
300         try {
301             process_accept();
302         } catch (thread_exit_exception e) {
303             break;
304         }
305     }
306 }
307
308 shared_ptr<connection> connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
309     int s = socket(AF_INET, SOCK_STREAM, 0);
310     int yes = 1;
311     setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
312     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
313         IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
314         close(s);
315         return nullptr;
316     }
317     IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
318     return make_shared<connection>(mgr, s, lossy);
319 }
320