So many changes. Broken.
[invirt/third/libt4.git] / rpc / connection.cc
1 #include "include/rpc/connection.h"
2 #include "include/rpc/rpc_protocol.h"
3 #include <cerrno>
4 #include <csignal>
5 #include <netinet/tcp.h>
6 #include <unistd.h>
7 #include "include/rpc/marshall.h"
8
9 connection_delegate::~connection_delegate() {}
10
11 connection::connection(connection_delegate * delegate, socket_t && f1, int l1)
12 : fd(std::move(f1)), delegate_(delegate), lossy_(l1)
13 {
14     fd.flags() |= O_NONBLOCK;
15
16     signal(SIGPIPE, SIG_IGN);
17
18     global->shared_mgr.add_callback(fd, CB_RDONLY, this);
19 }
20
21 connection::~connection() {
22     {
23         lock ml(m_);
24         if (dead_)
25             return;
26         dead_ = true;
27         fd.shutdown(SHUT_RDWR);
28     }
29     // after block_remove_fd, select will never wait on fd and no callbacks
30     // will be active
31     global->shared_mgr.block_remove_fd(fd);
32     VERIFY(dead_ && wpdu_.status == unused);
33 }
34
35 shared_ptr<connection> connection::to_dst(const sockaddr_in & dst, connection_delegate * delegate, int lossy) {
36     socket_t s = socket(AF_INET, SOCK_STREAM, 0);
37     s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
38     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
39         IF_LEVEL(1) LOG_NONMEMBER << "failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port);
40         close(s);
41         return nullptr;
42     }
43     IF_LEVEL(2) LOG_NONMEMBER << "connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port);
44     return std::make_shared<connection>(delegate, std::move(s), lossy);
45 }
46
47 bool connection::send(const string & b) {
48     lock ml(m_);
49
50     while (!dead_ && wpdu_.status != unused)
51         send_wait_.wait(ml);
52
53     if (dead_)
54         return false;
55
56     wpdu_ = {inflight, b, 0};
57
58     if (std::bernoulli_distribution(lossy_*.01)(global->random_generator)) {
59         IF_LEVEL(1) LOG << "send LOSSY TEST shutdown fd " << fd;
60         fd.shutdown(SHUT_RDWR);
61     }
62
63     if (!writepdu()) {
64         dead_ = true;
65         ml.unlock();
66         global->shared_mgr.block_remove_fd(fd);
67         ml.lock();
68     } else if (wpdu_.status == inflight && wpdu_.cursor < b.size()) {
69         // should be rare to need to explicitly add write callback
70         global->shared_mgr.add_callback(fd, CB_WRONLY, this);
71         while (!dead_ && wpdu_.status == inflight && wpdu_.cursor < b.size())
72             send_complete_.wait(ml);
73     }
74     bool ret = (!dead_ && wpdu_.status == inflight && wpdu_.cursor == b.size());
75     wpdu_ = {unused, "", 0};
76     send_wait_.notify_all();
77     return ret;
78 }
79
80 // fd is ready to be written
81 void connection::write_cb(int s) {
82     lock ml(m_);
83     VERIFY(!dead_);
84     VERIFY(fd == s);
85     if (wpdu_.status != inflight) {
86         global->shared_mgr.del_callback(fd, CB_WRONLY);
87         return;
88     }
89     if (!writepdu()) {
90         global->shared_mgr.del_callback(fd, CB_RDWR);
91         dead_ = true;
92     } else {
93         VERIFY(wpdu_.status != error);
94         if (wpdu_.cursor < wpdu_.buf.size())
95             return;
96     }
97     send_complete_.notify_one();
98 }
99
100 bool connection::writepdu() {
101     VERIFY(wpdu_.status == inflight);
102     if (wpdu_.cursor == wpdu_.buf.size())
103         return true;
104
105     ssize_t n = fd.write(&wpdu_.buf[wpdu_.cursor], (wpdu_.buf.size()-wpdu_.cursor));
106     if (n < 0) {
107         if (errno != EAGAIN) {
108             IF_LEVEL(1) LOG << "writepdu fd " << fd << " failure errno=" << errno;
109             wpdu_ = {error, "", 0};
110         }
111         return (errno == EAGAIN);
112     }
113     wpdu_.cursor += (size_t)n;
114     return true;
115 }
116
117 // fd is ready to be read
118 void connection::read_cb(int s) {
119     lock ml(m_);
120     VERIFY(fd == s);
121     if (dead_)
122         return;
123
124     IF_LEVEL(5) LOG << "got data on fd " << s;
125
126     if (rpdu_.status == unused || rpdu_.cursor < rpdu_.buf.size()) {
127         if (!readpdu()) {
128             IF_LEVEL(5) LOG << "readpdu on fd " << s << " failed; dying";
129             global->shared_mgr.del_callback(fd, CB_RDWR);
130             dead_ = true;
131             send_complete_.notify_one();
132         }
133     }
134
135     if (rpdu_.status == inflight && rpdu_.buf.size() == rpdu_.cursor) {
136         if (delegate_->got_pdu(shared_from_this(), rpdu_.buf)) {
137             // connection_delegate has successfully consumed the pdu
138             rpdu_ = {unused, "", 0};
139         }
140     }
141 }
142
143 bool connection::readpdu() {
144     IF_LEVEL(5) LOG << "the receive buffer has length " << rpdu_.buf.size();
145     if (rpdu_.status == unused) {
146         rpc_protocol::rpc_sz_t sz1;
147         ssize_t n = fd.read(sz1);
148
149         if (n == 0)
150             return false;
151
152         if (n < 0) {
153             VERIFY(errno!=EAGAIN);
154             return false;
155         }
156
157         if (n > 0 && n != sizeof(sz1)) {
158             IF_LEVEL(0) LOG << "short read of sz";
159             return false;
160         }
161
162         size_t sz = ntoh(sz1);
163
164         if (sz > rpc_protocol::MAX_PDU) {
165             IF_LEVEL(2) LOG << "read pdu TOO BIG " << sz << " network order=" << std::hex << sz1;
166             return false;
167         }
168
169         IF_LEVEL(5) LOG << "read size of datagram = " << sz;
170
171         rpdu_ = {inflight, string(sz+sizeof(sz1), 0), sizeof(sz1)};
172     }
173
174     ssize_t n = fd.read(&rpdu_.buf[rpdu_.cursor], rpdu_.buf.size() - rpdu_.cursor);
175
176     IF_LEVEL(5) LOG << "read " << n << " bytes";
177
178     if (n <= 0) {
179         if (errno == EAGAIN)
180             return true;
181         rpdu_ = {unused, "", 0};
182         return false;
183     }
184     rpdu_.cursor += (size_t)n;
185     return true;
186 }
187
188 connection_listener::connection_listener(connection_delegate * delegate, in_port_t port, int lossytest)
189 : tcp_(socket(AF_INET, SOCK_STREAM, 0)), delegate_(delegate), lossy_(lossytest)
190 {
191     tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, int{1});
192     tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, int{1});
193     tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000});
194     tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000});
195
196     sockaddr_in sin = sockaddr_in(); // zero initialize
197     sin.sin_family = AF_INET;
198     sin.sin_port = hton(port);
199
200     if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
201         perror("accept_loop bind");
202         VERIFY(0);
203     }
204
205     if (listen(tcp_, 1000) < 0) {
206         perror("accept_loop listen");
207         VERIFY(0);
208     }
209
210     socklen_t addrlen = sizeof(sin);
211     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
212     port_ = ntoh(sin.sin_port);
213
214     IF_LEVEL(2) LOG << "listen on " << port_ << " " << sin.sin_port;
215
216     global->shared_mgr.add_callback(tcp_, CB_RDONLY, this);
217 }
218
219 connection_listener::~connection_listener() {
220     global->shared_mgr.block_remove_fd(tcp_);
221 }
222
223 void connection_listener::read_cb(int) {
224     sockaddr_in sin;
225     socklen_t slen = sizeof(sin);
226     int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
227     if (s1 < 0) {
228         perror("connection_listener::accept_conn error");
229         throw std::runtime_error("connection listener failure");
230     }
231
232     IF_LEVEL(2) LOG << "accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port);
233
234     // garbage collect dead connections
235     for (auto i = conns_.begin(); i != conns_.end();) {
236         if (i->second->isdead())
237             conns_.erase(i++);
238         else
239             ++i;
240     }
241
242     conns_[s1] = std::make_shared<connection>(delegate_, s1, lossy_);
243 }