c16f6dc17315917fbd122ae0d92e64879c25572a
[invirt/third/libt4.git] / rpc / connection.cc
1 #include "connection.h"
2 #include "rpc_protocol.h"
3 #include <cerrno>
4 #include <csignal>
5 #include <fcntl.h>
6 #include <sys/types.h>
7 #include <netinet/tcp.h>
8 #include <unistd.h>
9 #include <sys/socket.h>
10 #include "marshall.h"
11
12 connection::connection(chanmgr *m1, int f1, int l1)
13 : mgr_(m1), fd_(f1), lossy_(l1)
14 {
15     int flags = fcntl(fd_, F_GETFL, NULL);
16     fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
17
18     signal(SIGPIPE, SIG_IGN);
19
20     create_time_ = steady_clock::now();
21
22     PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
23 }
24
25 connection::~connection() {
26     VERIFY(dead_);
27     VERIFY(!wpdu_.buf.size());
28     close(fd_);
29 }
30
31 void connection::incref() {
32     lock rl(ref_m_);
33     refno_++;
34 }
35
36 bool connection::isdead() {
37     lock ml(m_);
38     return dead_;
39 }
40
41 void connection::closeconn() {
42     {
43         lock ml(m_);
44         if (dead_)
45             return;
46         dead_ = true;
47         shutdown(fd_,SHUT_RDWR);
48     }
49     //after block_remove_fd, select will never wait on fd_
50     //and no callbacks will be active
51     PollMgr::Instance()->block_remove_fd(fd_);
52 }
53
54 void connection::decref() {
55     bool dead = false;
56     {
57         lock rl(ref_m_);
58         refno_--;
59         VERIFY(refno_>=0);
60         if (refno_==0) {
61             lock ml(m_);
62             dead = dead_;
63         }
64     }
65     if (dead)
66         delete this;
67 }
68
69 int connection::compare(connection *another) {
70     if (create_time_ > another->create_time_)
71         return 1;
72     if (create_time_ < another->create_time_)
73         return -1;
74     return 0;
75 }
76
77 bool connection::send(const string & b) {
78     lock ml(m_);
79
80     waiters_++;
81     while (!dead_ && wpdu_.buf.size())
82         send_wait_.wait(ml);
83     waiters_--;
84
85     if (dead_)
86         return false;
87
88     wpdu_.buf = b;
89     wpdu_.solong = 0;
90
91     if (lossy_) {
92         if ((random()%100) < lossy_) {
93             IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_);
94             shutdown(fd_,SHUT_RDWR);
95         }
96     }
97
98     if (!writepdu()) {
99         dead_ = true;
100         ml.unlock();
101         PollMgr::Instance()->block_remove_fd(fd_);
102         ml.lock();
103     } else if (wpdu_.solong != wpdu_.buf.size()) {
104         // should be rare to need to explicitly add write callback
105         PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
106         while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
107             send_complete_.wait(ml);
108     }
109     bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size());
110     wpdu_.solong = 0;
111     wpdu_.buf.clear();
112     if (waiters_ > 0)
113         send_wait_.notify_all();
114     return ret;
115 }
116
117 //fd_ is ready to be written
118 void connection::write_cb(int s) {
119     lock ml(m_);
120     VERIFY(!dead_);
121     VERIFY(fd_ == s);
122     if (wpdu_.buf.size() == 0) {
123         PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
124         return;
125     }
126     if (!writepdu()) {
127         PollMgr::Instance()->del_callback(fd_, CB_RDWR);
128         dead_ = true;
129     } else {
130         VERIFY(wpdu_.solong != size_t_max);
131         if (wpdu_.solong < wpdu_.buf.size()) {
132             return;
133         }
134     }
135     send_complete_.notify_one();
136 }
137
138 //fd_ is ready to be read
139 void connection::read_cb(int s) {
140     lock ml(m_);
141     VERIFY(fd_ == s);
142     if (dead_)  {
143         return;
144     }
145
146     IF_LEVEL(5) LOG("got data on fd " << s);
147
148     bool succ = true;
149     if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
150         succ = readpdu();
151     }
152
153     if (!succ) {
154         IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
155         PollMgr::Instance()->del_callback(fd_,CB_RDWR);
156         dead_ = true;
157         send_complete_.notify_one();
158     }
159
160     if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
161         if (mgr_->got_pdu(this, rpdu_.buf)) {
162             //chanmgr has successfully consumed the pdu
163             rpdu_.buf.clear();
164             rpdu_.solong = 0;
165         }
166     }
167 }
168
169 bool connection::writepdu() {
170     VERIFY(wpdu_.solong != size_t_max);
171     if (wpdu_.solong == wpdu_.buf.size())
172         return true;
173
174     if (wpdu_.solong == 0) {
175         rpc_sz_t sz = hton((rpc_sz_t)(wpdu_.buf.size() - sizeof(uint32_t)));
176         copy((const char *)&sz, (const char *)(&sz+1), &wpdu_.buf[0]);
177     }
178     ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
179     if (n < 0) {
180         if (errno != EAGAIN) {
181             IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno);
182             wpdu_.solong = size_t_max;
183             wpdu_.buf.clear();
184         }
185         return (errno == EAGAIN);
186     }
187     wpdu_.solong += (size_t)n;
188     return true;
189 }
190
191 bool connection::readpdu() {
192     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
193     if (!rpdu_.buf.size()) {
194         rpc_sz_t sz1;
195         ssize_t n = read(fd_, &sz1, sizeof(sz1));
196
197         if (n == 0) {
198             return false;
199         }
200
201         if (n < 0) {
202             VERIFY(errno!=EAGAIN);
203             return false;
204         }
205
206         if (n > 0 && n != sizeof(sz1)) {
207             IF_LEVEL(0) LOG("short read of sz");
208             return false;
209         }
210
211         size_t sz = ntoh(sz1);
212
213         if (sz > MAX_PDU) {
214             IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
215             return false;
216         }
217
218         IF_LEVEL(5) LOG("read size of datagram = " << sz);
219
220         VERIFY(rpdu_.buf.size() == 0);
221         rpdu_.buf = string(sz+sizeof(sz1), 0);
222         copy((const char *)&sz1, (const char *)(&sz1 + 1), &rpdu_.buf[0]);
223         rpdu_.solong = sizeof(sz1);
224     }
225
226     ssize_t n = read(fd_, &rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
227
228     IF_LEVEL(5) LOG("read " << n << " bytes");
229
230     if (n <= 0) {
231         if (errno == EAGAIN)
232             return true;
233         rpdu_.buf.clear();
234         rpdu_.solong = 0;
235         return (errno == EAGAIN);
236     }
237     rpdu_.solong += (size_t)n;
238     return true;
239 }
240
241 tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
242 : mgr_(m1), lossy_(lossytest)
243 {
244     struct sockaddr_in sin;
245     memset(&sin, 0, sizeof(sin));
246     sin.sin_family = AF_INET;
247     sin.sin_port = hton(port);
248
249     tcp_ = socket(AF_INET, SOCK_STREAM, 0);
250     if (tcp_ < 0) {
251         perror("accept_loop socket:");
252         VERIFY(0);
253     }
254
255     int yes = 1;
256     setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
257     setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
258
259     // careful to exactly match type signature of bind arguments so we don't
260     // get std::bind instead
261     if (bind(tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
262         perror("accept_loop tcp bind:");
263         VERIFY(0);
264     }
265
266     if (listen(tcp_, 1000) < 0) {
267         perror("listen:");
268         VERIFY(0);
269     }
270
271     socklen_t addrlen = sizeof(sin);
272     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
273     port_ = ntoh(sin.sin_port);
274
275     IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
276
277     if (pipe(pipe_) < 0) {
278         perror("accept_loop pipe:");
279         VERIFY(0);
280     }
281
282     int flags = fcntl(pipe_[0], F_GETFL, NULL);
283     flags |= O_NONBLOCK;
284     fcntl(pipe_[0], F_SETFL, flags);
285
286     th_ = thread(&tcpsconn::accept_conn, this);
287 }
288
289 tcpsconn::~tcpsconn()
290 {
291     VERIFY(close(pipe_[1]) == 0);
292     th_.join();
293
294     //close all the active connections
295     map<int, connection *>::iterator i;
296     for (i = conns_.begin(); i != conns_.end(); i++) {
297         i->second->closeconn();
298         i->second->decref();
299     }
300 }
301
302 void tcpsconn::process_accept() {
303     sockaddr_in sin;
304     socklen_t slen = sizeof(sin);
305     int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
306     if (s1 < 0) {
307         perror("tcpsconn::accept_conn error");
308         throw thread_exit_exception();
309     }
310
311     IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
312     connection *ch = new connection(mgr_, s1, lossy_);
313
314     // garbage collect all dead connections with refcount of 1
315     for (auto i = conns_.begin(); i != conns_.end();) {
316         if (i->second->isdead() && i->second->ref() == 1) {
317             IF_LEVEL(2) LOG("accept_loop garbage collected fd=" << i->second->channo());
318             i->second->decref();
319             // Careful not to reuse i right after erase. (i++) will
320             // be evaluated before the erase call because in C++,
321             // there is a sequence point before a function call.
322             // See http://en.wikipedia.org/wiki/Sequence_point.
323             conns_.erase(i++);
324         } else
325             ++i;
326     }
327
328     conns_[ch->channo()] = ch;
329 }
330
331 void tcpsconn::accept_conn() {
332     fd_set rfds;
333     int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
334
335     try {
336         while (1) {
337             FD_ZERO(&rfds);
338             FD_SET(pipe_[0], &rfds);
339             FD_SET(tcp_, &rfds);
340
341             int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
342
343             if (ret < 0) {
344                 if (errno == EINTR) {
345                     continue;
346                 } else {
347                     perror("accept_conn select:");
348                     IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
349                     VERIFY(0);
350                 }
351             }
352
353             if (FD_ISSET(pipe_[0], &rfds)) {
354                 close(pipe_[0]);
355                 close(tcp_);
356                 return;
357             }
358             else if (FD_ISSET(tcp_, &rfds)) {
359                 process_accept();
360             } else {
361                 VERIFY(0);
362             }
363         }
364     }
365     catch (thread_exit_exception e)
366     {
367     }
368 }
369
370 connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
371     int s = socket(AF_INET, SOCK_STREAM, 0);
372     int yes = 1;
373     setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
374     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
375         IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
376         close(s);
377         return NULL;
378     }
379     IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
380     return new connection(mgr, s, lossy);
381 }
382