1 #include "connection.h"
2 #include "rpc_protocol.h"
5 #include <netinet/tcp.h>
// Out-of-line destructor for connection_delegate; the body is empty.
9 connection_delegate::~connection_delegate() {}
// Constructs a connection around an already-open socket.
//   delegate - stored in delegate_; read_cb() hands completed PDUs to it
//   f1       - socket to use; moved into fd
//   l1       - stored in lossy_; send() uses lossy_ * .01 as the probability
//              of deliberately shutting the socket down
11 connection::connection(connection_delegate * delegate, socket_t && f1, int l1)
12 : fd(std::move(f1)), delegate_(delegate), lossy_(l1)
// Put the socket into non-blocking mode.
14 fd.flags() |= O_NONBLOCK;
// Ignore SIGPIPE (presumably so that writing to a closed peer produces an
// error return instead of terminating the process).
16 signal(SIGPIPE, SIG_IGN);
// Register this object with the shared manager for CB_RDONLY events on fd.
18 global->shared_mgr.add_callback(fd, CB_RDONLY, this);
// Destructor: shuts the socket down, removes fd from the shared manager, and
// checks that no outgoing PDU is still recorded in wpdu_.
21 connection::~connection() {
// Shut down both the receive and send directions of the socket.
27 shutdown(fd,SHUT_RDWR);
29 // after block_remove_fd, select will never wait on fd and no callbacks
31 global->shared_mgr.block_remove_fd(fd);
// The write slot is required to be idle at this point.
33 VERIFY(wpdu_.status == unused);
// Creates a TCP socket, connects it to dst, and wraps it in a connection.
//   dst      - IPv4 address and port to connect to
//   delegate - passed through to the connection constructor
//   lossy    - passed through to the connection constructor
// Returns a shared_ptr to the new connection on success.
// NOTE(review): confirm what is returned when connect() fails (presumably an
// empty pointer).
36 shared_ptr<connection> connection::to_dst(const sockaddr_in & dst, connection_delegate * delegate, int lossy) {
// IPv4 stream (TCP) socket.
37 socket_t s = socket(AF_INET, SOCK_STREAM, 0);
// Enable TCP_NODELAY on the socket.
38 s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
// A negative return from connect() is the failure case; it is logged at level 1.
39 if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
40 IF_LEVEL(1) LOG_NONMEMBER << "failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port);
// Log the socket and destination at level 2, then hand the socket over to a
// new connection object.
44 IF_LEVEL(2) LOG_NONMEMBER << "connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port);
45 return std::make_shared<connection>(delegate, std::move(s), lossy);
// Sends the buffer b over this connection.
//   b - bytes to send; a copy is placed in wpdu_.buf
// The value computed for the result is true only when the connection is not
// dead, the write slot is still inflight, and the write cursor has reached
// b.size().
48 bool connection::send(const string & b) {
// Loop while the connection is alive and the write slot is occupied.
// NOTE(review): the wait is presumably on send_wait_, which is notified at
// the end of this function -- confirm.
51 while (!dead_ && wpdu_.status != unused)
// Claim the write slot: mark it inflight with a copy of b and cursor 0.
57 wpdu_ = {inflight, b, 0};
// Fault injection: with probability lossy_ * .01, log and shut the socket
// down in both directions.
59 if (std::bernoulli_distribution(lossy_*.01)(global->random_generator)) {
60 IF_LEVEL(1) LOG << "send LOSSY TEST shutdown fd " << fd;
61 shutdown(fd,SHUT_RDWR);
// Remove fd from the shared manager.
// NOTE(review): confirm which condition guards this call (presumably a
// failed write attempt).
67 global->shared_mgr.block_remove_fd(fd);
// Otherwise, if the PDU is still inflight and not all of b has been written:
69 } else if (wpdu_.status == inflight && wpdu_.cursor < b.size()) {
70 // should be rare to need to explicitly add write callback
71 global->shared_mgr.add_callback(fd, CB_WRONLY, this);
// Wait on send_complete_ (using lock ml) until the connection dies, the slot
// leaves the inflight state, or the cursor reaches b.size().
72 while (!dead_ && wpdu_.status == inflight && wpdu_.cursor < b.size())
73 send_complete_.wait(ml);
// Success means: still alive, still inflight, and every byte of b written.
75 bool ret = (!dead_ && wpdu_.status == inflight && wpdu_.cursor == b.size());
// Release the write slot and wake every thread waiting on send_wait_.
76 wpdu_ = {unused, "", 0};
77 send_wait_.notify_all();
// Callback for write readiness on the connection's socket.
//   s - file descriptor the callback was raised for
81 // fd is ready to be written
82 void connection::write_cb(int s) {
// No PDU is inflight, so there is nothing to write: drop the CB_WRONLY
// callback for fd.
86 if (wpdu_.status != inflight) {
87 global->shared_mgr.del_callback(fd, CB_WRONLY);
// Drop the CB_RDWR callbacks for fd.
// NOTE(review): presumably the path taken when the write attempt failed --
// confirm.
91 global->shared_mgr.del_callback(fd, CB_RDWR);
// The write slot must not be in the error state here.
94 VERIFY(wpdu_.status != error);
// Checks whether part of the buffer is still unsent.
95 if (wpdu_.cursor < wpdu_.buf.size())
// Wake one waiter on send_complete_ (send() waits on this condition).
98 send_complete_.notify_one();
// Attempts to write the unsent remainder of wpdu_.buf to fd with a single
// write() call. Requires the write slot to be inflight.
101 bool connection::writepdu() {
102 VERIFY(wpdu_.status == inflight);
// Checks for the case where every byte has already been written.
103 if (wpdu_.cursor == wpdu_.buf.size())
// Write from the current cursor position to the end of the buffer.
106 ssize_t n = write(fd, &wpdu_.buf[wpdu_.cursor], (wpdu_.buf.size()-wpdu_.cursor));
// An errno other than EAGAIN is logged at level 1 and recorded by putting
// the write slot into the error state with an empty buffer.
108 if (errno != EAGAIN) {
109 IF_LEVEL(1) LOG << "writepdu fd " << fd << " failure errno=" << errno;
110 wpdu_ = {error, "", 0};
// This return yields true exactly when errno is EAGAIN.
112 return (errno == EAGAIN);
// Advance the cursor by the number of bytes written.
114 wpdu_.cursor += (size_t)n;
// Callback for read readiness on the connection's socket.
//   s - file descriptor the callback was raised for (used in log messages)
118 // fd is ready to be read
119 void connection::read_cb(int s) {
125 IF_LEVEL(5) LOG << "got data on fd " << s;
// Entered when no PDU is in progress, or the current one is not yet fully
// received.
127 if (rpdu_.status == unused || rpdu_.cursor < rpdu_.buf.size()) {
// Failure handling, per the log text: drop the CB_RDWR callbacks for fd.
129 IF_LEVEL(5) LOG << "readpdu on fd " << s << " failed; dying";
130 global->shared_mgr.del_callback(fd, CB_RDWR);
// Wake one waiter on send_complete_ (send() waits on this condition).
132 send_complete_.notify_one();
// A PDU is complete once the cursor equals the buffer size; hand it to the
// delegate together with a shared_ptr to this connection.
136 if (rpdu_.status == inflight && rpdu_.buf.size() == rpdu_.cursor) {
137 if (delegate_->got_pdu(shared_from_this(), rpdu_.buf)) {
138 // connection_delegate has successfully consumed the pdu
// Reset the read slot so the next PDU can be received.
139 rpdu_ = {unused, "", 0};
// Reads PDU data from fd into rpdu_. When no PDU is in progress it first reads
// a size value and allocates the receive buffer; it then reads into the
// unfilled part of the buffer and advances the cursor.
144 bool connection::readpdu() {
145 IF_LEVEL(5) LOG << "the receive buffer has length " << rpdu_.buf.size();
// No PDU in progress: start a new one by reading its size field.
146 if (rpdu_.status == unused) {
147 rpc_protocol::rpc_sz_t sz1;
148 ssize_t n = fd.read(sz1);
// EAGAIN is not expected here.
154 VERIFY(errno!=EAGAIN);
// A positive read that did not deliver the whole size field is logged as a
// short read.
158 if (n > 0 && n != sizeof(sz1)) {
159 IF_LEVEL(0) LOG << "short read of sz";
// Convert the size with ntoh (the log below labels sz1 as network order).
163 size_t sz = ntoh(sz1);
// Sizes above rpc_protocol::MAX_PDU are logged as too big.
165 if (sz > rpc_protocol::MAX_PDU) {
166 IF_LEVEL(2) LOG << "read pdu TOO BIG " << sz << " network order=" << std::hex << sz1;
170 IF_LEVEL(5) LOG << "read size of datagram = " << sz;
// Mark the read slot inflight with a zero-filled buffer of sz + sizeof(sz1)
// bytes; the cursor starts at sizeof(sz1).
172 rpdu_ = {inflight, string(sz+sizeof(sz1), 0), sizeof(sz1)};
// Read into the part of the buffer from the cursor to the end.
175 ssize_t n = fd.read(&rpdu_.buf[rpdu_.cursor], rpdu_.buf.size() - rpdu_.cursor);
177 IF_LEVEL(5) LOG << "read " << n << " bytes";
// Discard the partially received PDU by resetting the read slot.
182 rpdu_ = {unused, "", 0};
// Advance the cursor by the number of bytes read.
185 rpdu_.cursor += (size_t)n;
// Creates a listening TCP socket bound to the given port and registers it
// with the shared manager.
//   delegate  - stored in delegate_; passed to each accepted connection
//   port      - port to bind, in host byte order (converted with hton below)
//   lossytest - stored in lossy_; passed to each accepted connection
189 connection_listener::connection_listener(connection_delegate * delegate, in_port_t port, int lossytest)
190 : tcp_(socket(AF_INET, SOCK_STREAM, 0)), delegate_(delegate), lossy_(lossytest)
// Socket options: enable SO_REUSEADDR and TCP_NODELAY, and set the receive
// and send timeouts to timeval{0, 50000} (0 seconds, 50000 microseconds).
192 tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
193 tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
194 tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000});
195 tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000});
// Build the bind address: IPv4, requested port in network byte order; the
// address field keeps its zero-initialized value.
197 sockaddr_in sin = sockaddr_in(); // zero initialize
198 sin.sin_family = AF_INET;
199 sin.sin_port = hton(port);
// A bind() failure is reported with perror.
201 if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
202 perror("accept_loop bind");
// Start listening with a backlog of 1000; a failure is reported with perror.
206 if (listen(tcp_, 1000) < 0) {
207 perror("accept_loop listen");
// Query the address the socket is actually bound to and record its port in
// host byte order (presumably so that a request for port 0 yields the
// system-assigned port -- confirm).
211 socklen_t addrlen = sizeof(sin);
212 VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
213 port_ = ntoh(sin.sin_port);
// Logs both the converted port_ and the unconverted sin.sin_port.
215 IF_LEVEL(2) LOG << "listen on " << port_ << " " << sin.sin_port;
// Register this listener with the shared manager for CB_RDONLY events on the
// listening socket.
217 global->shared_mgr.add_callback(tcp_, CB_RDONLY, this);
// Destructor: removes the listening socket from the shared manager.
220 connection_listener::~connection_listener() {
221 global->shared_mgr.block_remove_fd(tcp_);
// Callback raised for the listening socket: accepts one incoming connection
// and stores a new connection object for it in conns_.
// Throws std::runtime_error on the accept failure path.
224 void connection_listener::read_cb(int) {
// Accept a pending connection; the peer address is written into sin.
226 socklen_t slen = sizeof(sin);
227 int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
// Failure path: report with perror and throw.
229 perror("connection_listener::accept_conn error");
230 throw std::runtime_error("connection listener failure");
233 IF_LEVEL(2) LOG << "accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port);
235 // garbage collect dead connections
// The loop header has no increment expression; each entry's connection is
// checked with isdead().
236 for (auto i = conns_.begin(); i != conns_.end();) {
237 if (i->second->isdead())
// Wrap the accepted socket in a connection and store it under key s1,
// replacing any existing entry with that key.
243 conns_[s1] = std::make_shared<connection>(delegate_, s1, lossy_);