Working on g++ compatibility
[invirt/third/libt4.git] / rpc / connection.cc
1 #include "connection.h"
2 #include "rpc_protocol.h"
3 #include <cerrno>
4 #include <csignal>
5 #include <netinet/tcp.h>
6 #include <unistd.h>
7 #include "marshall.h"
8
9 connection::connection(connection_delegate * delegate, socket_t && f1, int l1)
10 : fd(move(f1)), delegate_(delegate), lossy_(l1)
11 {
12     fd.flags() |= O_NONBLOCK;
13
14     signal(SIGPIPE, SIG_IGN);
15
16     poll_mgr::shared_mgr.add_callback(fd, CB_RDONLY, this);
17 }
18
19 connection::~connection() {
20     {
21         lock ml(m_);
22         if (dead_)
23             return;
24         dead_ = true;
25         shutdown(fd,SHUT_RDWR);
26     }
27     // after block_remove_fd, select will never wait on fd and no callbacks
28     // will be active
29     poll_mgr::shared_mgr.block_remove_fd(fd);
30     VERIFY(dead_);
31     VERIFY(!wpdu_.buf.size());
32 }
33
34 shared_ptr<connection> connection::to_dst(const sockaddr_in &dst, connection_delegate * delegate, int lossy) {
35     socket_t s = socket(AF_INET, SOCK_STREAM, 0);
36     s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
37     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
38         IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
39         close(s);
40         return nullptr;
41     }
42     IF_LEVEL(2) LOG_NONMEMBER("connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
43     return make_shared<connection>(delegate, move(s), lossy);
44 }
45
46 bool connection::send(const string & b) {
47     lock ml(m_);
48
49     waiters_++;
50     while (!dead_ && wpdu_.buf.size())
51         send_wait_.wait(ml);
52     waiters_--;
53
54     if (dead_)
55         return false;
56
57     wpdu_.buf = b;
58     wpdu_.solong = 0;
59
60     if (lossy_) {
61         if ((random()%100) < lossy_) {
62             IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd " << fd);
63             shutdown(fd,SHUT_RDWR);
64         }
65     }
66
67     if (!writepdu()) {
68         dead_ = true;
69         ml.unlock();
70         poll_mgr::shared_mgr.block_remove_fd(fd);
71         ml.lock();
72     } else if (wpdu_.solong != wpdu_.buf.size()) {
73         // should be rare to need to explicitly add write callback
74         poll_mgr::shared_mgr.add_callback(fd, CB_WRONLY, this);
75         while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
76             send_complete_.wait(ml);
77     }
78     bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size());
79     wpdu_.solong = 0;
80     wpdu_.buf.clear();
81     if (waiters_ > 0)
82         send_wait_.notify_all();
83     return ret;
84 }
85
86 // fd is ready to be written
87 void connection::write_cb(int s) {
88     lock ml(m_);
89     VERIFY(!dead_);
90     VERIFY(fd == s);
91     if (wpdu_.buf.size() == 0) {
92         poll_mgr::shared_mgr.del_callback(fd, CB_WRONLY);
93         return;
94     }
95     if (!writepdu()) {
96         poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
97         dead_ = true;
98     } else {
99         VERIFY(wpdu_.solong != size_t_max);
100         if (wpdu_.solong < wpdu_.buf.size()) {
101             return;
102         }
103     }
104     send_complete_.notify_one();
105 }
106
107 // fd is ready to be read
108 void connection::read_cb(int s) {
109     lock ml(m_);
110     VERIFY(fd == s);
111     if (dead_)
112         return;
113
114     IF_LEVEL(5) LOG("got data on fd " << s);
115
116     if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
117         if (!readpdu()) {
118             IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
119             poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
120             dead_ = true;
121             send_complete_.notify_one();
122         }
123     }
124
125     if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
126         if (delegate_->got_pdu(shared_from_this(), rpdu_.buf)) {
127             // connection_delegate has successfully consumed the pdu
128             rpdu_.buf.clear();
129             rpdu_.solong = 0;
130         }
131     }
132 }
133
134 bool connection::writepdu() {
135     VERIFY(wpdu_.solong != size_t_max);
136     if (wpdu_.solong == wpdu_.buf.size())
137         return true;
138
139     ssize_t n = write(fd, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
140     if (n < 0) {
141         if (errno != EAGAIN) {
142             IF_LEVEL(1) LOG("writepdu fd " << fd << " failure errno=" << errno);
143             wpdu_.solong = size_t_max;
144             wpdu_.buf.clear();
145         }
146         return (errno == EAGAIN);
147     }
148     wpdu_.solong += (size_t)n;
149     return true;
150 }
151
152 bool connection::readpdu() {
153     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
154     if (!rpdu_.buf.size()) {
155         rpc_protocol::rpc_sz_t sz1;
156         ssize_t n = fd.read(sz1);
157
158         if (n == 0)
159             return false;
160
161         if (n < 0) {
162             VERIFY(errno!=EAGAIN);
163             return false;
164         }
165
166         if (n > 0 && n != sizeof(sz1)) {
167             IF_LEVEL(0) LOG("short read of sz");
168             return false;
169         }
170
171         size_t sz = ntoh(sz1);
172
173         if (sz > rpc_protocol::MAX_PDU) {
174             IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
175             return false;
176         }
177
178         IF_LEVEL(5) LOG("read size of datagram = " << sz);
179
180         rpdu_.buf.assign(sz+sizeof(sz1), 0);
181         rpdu_.solong = sizeof(sz1);
182     }
183
184     ssize_t n = fd.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
185
186     IF_LEVEL(5) LOG("read " << n << " bytes");
187
188     if (n <= 0) {
189         if (errno == EAGAIN)
190             return true;
191         rpdu_.buf.clear();
192         rpdu_.solong = 0;
193         return false;
194     }
195     rpdu_.solong += (size_t)n;
196     return true;
197 }
198
199 connection_listener::connection_listener(connection_delegate * delegate, in_port_t port, int lossytest)
200 : tcp_(socket(AF_INET, SOCK_STREAM, 0)), delegate_(delegate), lossy_(lossytest)
201 {
202     tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
203     tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
204     tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000});
205     tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000});
206
207     sockaddr_in sin{}; // zero initialize
208     sin.sin_family = AF_INET;
209     sin.sin_port = hton(port);
210
211     if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
212         perror("accept_loop bind");
213         VERIFY(0);
214     }
215
216     if (listen(tcp_, 1000) < 0) {
217         perror("accept_loop listen");
218         VERIFY(0);
219     }
220
221     socklen_t addrlen = sizeof(sin);
222     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
223     port_ = ntoh(sin.sin_port);
224
225     IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
226
227     poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this);
228 }
229
230 connection_listener::~connection_listener() {
231     poll_mgr::shared_mgr.block_remove_fd(tcp_);
232 }
233
234 void connection_listener::read_cb(int) {
235     sockaddr_in sin;
236     socklen_t slen = sizeof(sin);
237     int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
238     if (s1 < 0) {
239         perror("connection_listener::accept_conn error");
240         throw thread_exit_exception();
241     }
242
243     IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
244     auto ch = make_shared<connection>(delegate_, s1, lossy_);
245
246     // garbage collect dead connections
247     for (auto i = conns_.begin(); i != conns_.end();) {
248         if (i->second->isdead())
249             conns_.erase(i++);
250         else
251             ++i;
252     }
253
254     conns_[s1] = ch;
255 }