More clean-ups
[invirt/third/libt4.git] / rpc / connection.cc
1 #include "connection.h"
2 #include "rpc_protocol.h"
3 #include <cerrno>
4 #include <csignal>
5 #include <netinet/tcp.h>
6 #include <unistd.h>
7 #include "marshall.h"
8
9 connection::connection(connection_delegate *m1, socket_t && f1, int l1)
10 : mgr_(m1), fd_(move(f1)), lossy_(l1)
11 {
12     fd_.flags() |= O_NONBLOCK;
13
14     signal(SIGPIPE, SIG_IGN);
15
16     create_time_ = steady_clock::now();
17
18     poll_mgr::shared_mgr.add_callback(fd_, CB_RDONLY, this);
19 }
20
21 connection::~connection() {
22     closeconn();
23     VERIFY(dead_);
24     VERIFY(!wpdu_.buf.size());
25 }
26
27 shared_ptr<connection> connection::to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy) {
28     socket_t s = socket(AF_INET, SOCK_STREAM, 0);
29     s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
30     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
31         IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
32         close(s);
33         return nullptr;
34     }
35     IF_LEVEL(2) LOG_NONMEMBER("connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
36     return make_shared<connection>(mgr, move(s), lossy);
37 }
38
39 void connection::closeconn() {
40     {
41         lock ml(m_);
42         if (dead_)
43             return;
44         dead_ = true;
45         shutdown(fd_,SHUT_RDWR);
46     }
47     //after block_remove_fd, select will never wait on fd_
48     //and no callbacks will be active
49     poll_mgr::shared_mgr.block_remove_fd(fd_);
50 }
51
52 bool connection::send(const string & b) {
53     lock ml(m_);
54
55     waiters_++;
56     while (!dead_ && wpdu_.buf.size())
57         send_wait_.wait(ml);
58     waiters_--;
59
60     if (dead_)
61         return false;
62
63     wpdu_.buf = b;
64     wpdu_.solong = 0;
65
66     if (lossy_) {
67         if ((random()%100) < lossy_) {
68             IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_);
69             shutdown(fd_,SHUT_RDWR);
70         }
71     }
72
73     if (!writepdu()) {
74         dead_ = true;
75         ml.unlock();
76         poll_mgr::shared_mgr.block_remove_fd(fd_);
77         ml.lock();
78     } else if (wpdu_.solong != wpdu_.buf.size()) {
79         // should be rare to need to explicitly add write callback
80         poll_mgr::shared_mgr.add_callback(fd_, CB_WRONLY, this);
81         while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
82             send_complete_.wait(ml);
83     }
84     bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size());
85     wpdu_.solong = 0;
86     wpdu_.buf.clear();
87     if (waiters_ > 0)
88         send_wait_.notify_all();
89     return ret;
90 }
91
92 //fd_ is ready to be written
93 void connection::write_cb(int s) {
94     lock ml(m_);
95     VERIFY(!dead_);
96     VERIFY(fd_ == s);
97     if (wpdu_.buf.size() == 0) {
98         poll_mgr::shared_mgr.del_callback(fd_,CB_WRONLY);
99         return;
100     }
101     if (!writepdu()) {
102         poll_mgr::shared_mgr.del_callback(fd_, CB_RDWR);
103         dead_ = true;
104     } else {
105         VERIFY(wpdu_.solong != size_t_max);
106         if (wpdu_.solong < wpdu_.buf.size()) {
107             return;
108         }
109     }
110     send_complete_.notify_one();
111 }
112
113 // fd_ is ready to be read
114 void connection::read_cb(int s) {
115     lock ml(m_);
116     VERIFY(fd_ == s);
117     if (dead_)
118         return;
119
120     IF_LEVEL(5) LOG("got data on fd " << s);
121
122     bool succ = true;
123     if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size())
124         succ = readpdu();
125
126     if (!succ) {
127         IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
128         poll_mgr::shared_mgr.del_callback(fd_,CB_RDWR);
129         dead_ = true;
130         send_complete_.notify_one();
131     }
132
133     if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
134         if (mgr_->got_pdu(shared_from_this(), rpdu_.buf)) {
135             // connection_delegate has successfully consumed the pdu
136             rpdu_.buf.clear();
137             rpdu_.solong = 0;
138         }
139     }
140 }
141
142 bool connection::writepdu() {
143     VERIFY(wpdu_.solong != size_t_max);
144     if (wpdu_.solong == wpdu_.buf.size())
145         return true;
146
147     ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
148     if (n < 0) {
149         if (errno != EAGAIN) {
150             IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno);
151             wpdu_.solong = size_t_max;
152             wpdu_.buf.clear();
153         }
154         return (errno == EAGAIN);
155     }
156     wpdu_.solong += (size_t)n;
157     return true;
158 }
159
160 bool connection::readpdu() {
161     IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
162     if (!rpdu_.buf.size()) {
163         rpc_protocol::rpc_sz_t sz1;
164         ssize_t n = fd_.read(sz1);
165
166         if (n == 0)
167             return false;
168
169         if (n < 0) {
170             VERIFY(errno!=EAGAIN);
171             return false;
172         }
173
174         if (n > 0 && n != sizeof(sz1)) {
175             IF_LEVEL(0) LOG("short read of sz");
176             return false;
177         }
178
179         size_t sz = ntoh(sz1);
180
181         if (sz > rpc_protocol::MAX_PDU) {
182             IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
183             return false;
184         }
185
186         IF_LEVEL(5) LOG("read size of datagram = " << sz);
187
188         VERIFY(rpdu_.buf.size() == 0);
189         rpdu_.buf = string(sz+sizeof(sz1), 0);
190         rpdu_.solong = sizeof(sz1);
191     }
192
193     ssize_t n = fd_.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
194
195     IF_LEVEL(5) LOG("read " << n << " bytes");
196
197     if (n <= 0) {
198         if (errno == EAGAIN)
199             return true;
200         rpdu_.buf.clear();
201         rpdu_.solong = 0;
202         return (errno == EAGAIN);
203     }
204     rpdu_.solong += (size_t)n;
205     return true;
206 }
207
208 tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest)
209 : tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest)
210 {
211     sockaddr_in sin{}; // zero initialize
212     sin.sin_family = AF_INET;
213     sin.sin_port = hton(port);
214
215     tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
216     tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
217     tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000});
218     tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000});
219
220     // careful to exactly match type signature of bind arguments so we don't
221     // get std::bind instead
222     if (bind((int)tcp_, (const sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
223         perror("accept_loop bind");
224         VERIFY(0);
225     }
226
227     if (listen(tcp_, 1000) < 0) {
228         perror("accept_loop listen");
229         VERIFY(0);
230     }
231
232     socklen_t addrlen = sizeof(sin);
233     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
234     port_ = ntoh(sin.sin_port);
235
236     IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
237
238     poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this);
239 }
240
241 tcpsconn::~tcpsconn()
242 {
243     poll_mgr::shared_mgr.block_remove_fd(tcp_);
244
245     for (auto & i : conns_)
246         i.second->closeconn();
247 }
248
249 void tcpsconn::read_cb(int) {
250     sockaddr_in sin;
251     socklen_t slen = sizeof(sin);
252     int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
253     if (s1 < 0) {
254         perror("tcpsconn::accept_conn error");
255         throw thread_exit_exception();
256     }
257
258     IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
259     auto ch = make_shared<connection>(mgr_, s1, lossy_);
260
261     // garbage collect dead connections
262     for (auto i = conns_.begin(); i != conns_.end();) {
263         if (i->second->isdead())
264             conns_.erase(i++);
265         else
266             ++i;
267     }
268
269     conns_[ch->channo()] = ch;
270 }