1 #include "include/rpc/connection.h"
2 #include "include/rpc/rpc_protocol.h"
5 #include <netinet/tcp.h>
7 #include "include/rpc/marshall.h"
// Out-of-line definition of the connection_delegate destructor; it has an empty body.
9 connection_delegate::~connection_delegate() {}
// Constructs a connection around an existing socket.
//   delegate - stored in delegate_; read_cb() hands completed PDUs to it via got_pdu().
//   f1       - socket whose ownership is moved into fd.
//   l1       - stored in lossy_; send() treats it as a percentage (lossy_ * .01)
//              when deciding whether to simulate a failure.
11 connection::connection(connection_delegate * delegate, socket_t && f1, int l1)
12 : fd(std::move(f1)), delegate_(delegate), lossy_(l1)
// Add O_NONBLOCK to the socket's flags (presumably fd.flags() proxies the descriptor's file status flags).
14 fd.flags() |= O_NONBLOCK;
// Ignore SIGPIPE for the whole process so that writing to a closed peer does not raise the signal.
16 signal(SIGPIPE, SIG_IGN);
// Register this object with the shared manager for CB_RDONLY events on fd.
18 global->shared_mgr.add_callback(fd, CB_RDONLY, this);
// Destructor: shuts the socket down in both directions and removes fd from
// the shared callback manager before the object goes away.
21 connection::~connection() {
// Close both the read and the write half of the socket.
27 fd.shutdown(SHUT_RDWR);
29 // after block_remove_fd, select will never wait on fd and no callbacks
31 global->shared_mgr.block_remove_fd(fd);
// Invariant checked at destruction: the connection is marked dead and the write PDU slot is unused.
32 VERIFY(dead_ && wpdu_.status == unused);
// Factory: creates a TCP socket, connects it to dst and wraps it in a shared connection.
//   dst      - IPv4 address and port of the peer.
//   delegate - forwarded to the connection constructor.
//   lossy    - forwarded to the connection constructor.
// Returns a shared_ptr to the new connection on success. On connect() failure a
// level-1 message is logged; NOTE(review): confirm what is returned to the caller in that case.
35 shared_ptr<connection> connection::to_dst(const sockaddr_in & dst, connection_delegate * delegate, int lossy) {
36 socket_t s = socket(AF_INET, SOCK_STREAM, 0);
// Set TCP_NODELAY to 1 on the new socket.
37 s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
38 if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
// NOTE(review): inet_ntoa returns a pointer to a static buffer; confirm this is acceptable if called from several threads.
39 IF_LEVEL(1) LOG_NONMEMBER << "failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port);
43 IF_LEVEL(2) LOG_NONMEMBER << "connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port);
// Ownership of the socket moves into the connection object.
44 return std::make_shared<connection>(delegate, std::move(s), lossy);
// Sends b as a single write PDU and reports whether all of it went out.
//   b - bytes to transmit; copied into wpdu_.buf.
// Returns true only when the connection is not dead, the PDU is still marked
// inflight and the write cursor has reached b.size(); false otherwise.
47 bool connection::send(const string & b) {
// Loop while the connection is alive and the write slot is occupied by another PDU.
// NOTE(review): presumably waits on send_wait_, which is notified at the end of this function — confirm.
50 while (!dead_ && wpdu_.status != unused)
// Claim the write slot: status inflight, a copy of b, cursor at 0.
56 wpdu_ = {inflight, b, 0};
// Fault injection: with probability lossy_ percent, shut the socket down to simulate a failure.
58 if (std::bernoulli_distribution(lossy_*.01)(global->random_generator)) {
59 IF_LEVEL(1) LOG << "send LOSSY TEST shutdown fd " << fd;
60 fd.shutdown(SHUT_RDWR);
// Detach fd from the shared manager; this statement sits in the branch that precedes the `else if` below.
66 global->shared_mgr.block_remove_fd(fd);
// Otherwise, if the PDU is still in flight with bytes left to write:
68 } else if (wpdu_.status == inflight && wpdu_.cursor < b.size()) {
69 // should be rare to need to explicitly add write callback
70 global->shared_mgr.add_callback(fd, CB_WRONLY, this);
// Wait on send_complete_ until all bytes are written, the status leaves inflight, or the connection dies.
71 while (!dead_ && wpdu_.status == inflight && wpdu_.cursor < b.size())
72 send_complete_.wait(ml);
// Success requires: alive, status still inflight, and every byte of b written.
74 bool ret = (!dead_ && wpdu_.status == inflight && wpdu_.cursor == b.size());
// Release the write slot.
75 wpdu_ = {unused, "", 0};
// Wake every thread waiting on send_wait_ (NOTE(review): presumably senders queued in the loop at the top).
76 send_wait_.notify_all();
// Callback for write events on fd; continues an in-flight write PDU and
// signals the thread waiting in send().
//   s - descriptor value supplied with the callback.
80 // fd is ready to be written
81 void connection::write_cb(int s) {
// No PDU is being written: stop receiving write callbacks for fd.
85 if (wpdu_.status != inflight) {
86 global->shared_mgr.del_callback(fd, CB_WRONLY);
// Remove the CB_RDWR callbacks for fd.
// NOTE(review): presumably runs only when the write attempt failed — confirm the guarding condition.
90 global->shared_mgr.del_callback(fd, CB_RDWR);
// Beyond this point the write PDU must not be in the error state.
93 VERIFY(wpdu_.status != error);
// Bytes of the PDU remain unwritten.
94 if (wpdu_.cursor < wpdu_.buf.size())
// Wake one thread waiting on send_complete_ (send() waits on it).
97 send_complete_.notify_one();
// Writes the unsent remainder of wpdu_.buf to fd and advances the cursor.
// Precondition: wpdu_.status == inflight (checked with VERIFY).
// Returns bool; the return visible in the error handling yields (errno == EAGAIN),
// i.e. true when the write would merely block and false for any other errno.
100 bool connection::writepdu() {
101 VERIFY(wpdu_.status == inflight);
// Nothing is left to write when the cursor has reached the end of the buffer.
102 if (wpdu_.cursor == wpdu_.buf.size())
// Try to write everything from the cursor to the end of the buffer.
105 ssize_t n = fd.write(&wpdu_.buf[wpdu_.cursor], (wpdu_.buf.size()-wpdu_.cursor));
// NOTE(review): presumably the errno handling below is guarded by a failed write (n < 0) — confirm.
// Any errno other than EAGAIN is logged and recorded by putting wpdu_ into the error state.
107 if (errno != EAGAIN) {
108 IF_LEVEL(1) LOG << "writepdu fd " << fd << " failure errno=" << errno;
109 wpdu_ = {error, "", 0};
111 return (errno == EAGAIN);
// Account for the n bytes that were written.
113 wpdu_.cursor += (size_t)n;
// Callback for read events on fd; once rpdu_ holds a complete PDU it is
// handed to the delegate.
//   s - descriptor value supplied with the callback; used in log messages.
117 // fd is ready to be read
118 void connection::read_cb(int s) {
124 IF_LEVEL(5) LOG << "got data on fd " << s;
// More input is needed when no PDU has been started or the current one is incomplete.
126 if (rpdu_.status == unused || rpdu_.cursor < rpdu_.buf.size()) {
// Failure path: log, remove the CB_RDWR callbacks for fd and wake one thread waiting on send_complete_.
// NOTE(review): presumably guarded by a failed readpdu() call — confirm.
128 IF_LEVEL(5) LOG << "readpdu on fd " << s << " failed; dying";
129 global->shared_mgr.del_callback(fd, CB_RDWR);
131 send_complete_.notify_one();
// The whole PDU has been read: offer it to the delegate.
135 if (rpdu_.status == inflight && rpdu_.buf.size() == rpdu_.cursor) {
136 if (delegate_->got_pdu(shared_from_this(), rpdu_.buf)) {
137 // connection_delegate has successfully consumed the pdu
// Reset the read slot; when got_pdu() returns false rpdu_ is not reset here.
138 rpdu_ = {unused, "", 0};
// Reads PDU bytes from fd into rpdu_. When no PDU is in progress it first reads
// the size header and allocates the buffer; it then reads into the unfilled
// part of the buffer and advances the cursor.
// Returns bool. NOTE(review): presumably false signals a fatal read failure to read_cb() — confirm.
143 bool connection::readpdu() {
144 IF_LEVEL(5) LOG << "the receive buffer has length " << rpdu_.buf.size();
// Start of a new PDU: read the size header first.
145 if (rpdu_.status == unused) {
146 rpc_protocol::rpc_sz_t sz1;
147 ssize_t n = fd.read(sz1);
// NOTE(review): presumably asserted on a failed header read; EAGAIN is not expected there — confirm.
153 VERIFY(errno!=EAGAIN);
// A positive byte count smaller than the header size is a short read.
157 if (n > 0 && n != sizeof(sz1)) {
158 IF_LEVEL(0) LOG << "short read of sz";
// Convert the size from network byte order (presumably what ntoh does) to a host size_t.
162 size_t sz = ntoh(sz1);
// Sizes above rpc_protocol::MAX_PDU are logged as too big.
164 if (sz > rpc_protocol::MAX_PDU) {
165 IF_LEVEL(2) LOG << "read pdu TOO BIG " << sz << " network order=" << std::hex << sz1;
169 IF_LEVEL(5) LOG << "read size of datagram = " << sz;
// Allocate a zero-filled buffer of sz + sizeof(sz1) bytes; the cursor starts just past the header-sized prefix.
171 rpdu_ = {inflight, string(sz+sizeof(sz1), 0), sizeof(sz1)};
// Read into the unfilled tail of the buffer.
174 ssize_t n = fd.read(&rpdu_.buf[rpdu_.cursor], rpdu_.buf.size() - rpdu_.cursor);
176 IF_LEVEL(5) LOG << "read " << n << " bytes";
// Reset the read slot. NOTE(review): presumably only on a read error — confirm the guarding condition.
181 rpdu_ = {unused, "", 0};
// Account for the n bytes just read.
184 rpdu_.cursor += (size_t)n;
// Creates a TCP socket, binds it to `port`, starts listening and registers
// this listener with the shared callback manager.
//   delegate  - stored in delegate_; passed to each accepted connection.
//   port      - port to bind, converted with hton() before use.
//   lossytest - stored in lossy_; passed to each accepted connection.
188 connection_listener::connection_listener(connection_delegate * delegate, in_port_t port, int lossytest)
189 : tcp_(socket(AF_INET, SOCK_STREAM, 0)), delegate_(delegate), lossy_(lossytest)
// Socket options: address reuse, TCP_NODELAY, and receive/send timeouts of
// 0 s + 50000 us (50 ms) each.
191 tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, int{1});
192 tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, int{1});
193 tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000});
194 tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000});
// Only the family and port are set below; sin_addr keeps its zero value.
196 sockaddr_in sin = sockaddr_in(); // zero initialize
197 sin.sin_family = AF_INET;
198 sin.sin_port = hton(port);
// A bind failure is reported with perror. NOTE(review): confirm how the failure is handled after that.
200 if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
201 perror("accept_loop bind");
// Listen with a backlog of 1000; a failure is likewise reported with perror.
205 if (listen(tcp_, 1000) < 0) {
206 perror("accept_loop listen");
// Read the bound address back and record the port the socket is actually bound to.
210 socklen_t addrlen = sizeof(sin);
211 VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
212 port_ = ntoh(sin.sin_port);
// Logs the converted port followed by the raw sin_port value.
214 IF_LEVEL(2) LOG << "listen on " << port_ << " " << sin.sin_port;
// Register for CB_RDONLY events on the listening socket; they are delivered to read_cb().
216 global->shared_mgr.add_callback(tcp_, CB_RDONLY, this);
// Destructor: removes the listening socket from the shared callback manager.
219 connection_listener::~connection_listener() {
220 global->shared_mgr.block_remove_fd(tcp_);
// Callback for read events on the listening socket: accepts one pending
// connection, prunes dead connections from conns_ and stores the new one.
// The int argument is unnamed and unused.
// Throws std::runtime_error("connection listener failure") on the accept error path.
223 void connection_listener::read_cb(int) {
225 socklen_t slen = sizeof(sin);
// Accept a pending connection; sin receives the peer address.
226 int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
// NOTE(review): presumably guarded by a check that accept() failed — confirm.
228 perror("connection_listener::accept_conn error");
229 throw std::runtime_error("connection listener failure");
232 IF_LEVEL(2) LOG << "accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port);
234 // garbage collect dead connections
// The loop header has no increment expression, so the iterator is advanced inside the body.
235 for (auto i = conns_.begin(); i != conns_.end();) {
236 if (i->second->isdead())
// Wrap the accepted descriptor in a connection and store it in conns_ under key s1.
242 conns_[s1] = std::make_shared<connection>(delegate_, s1, lossy_);