1 #include "connection.h"
2 #include "rpc_protocol.h"
5 #include <netinet/tcp.h>
9 connection_delegate::~connection_delegate() {}
11 connection::connection(connection_delegate * delegate, socket_t && f1, int l1)
12 : fd(move(f1)), delegate_(delegate), lossy_(l1)
14 fd.flags() |= O_NONBLOCK;
16 signal(SIGPIPE, SIG_IGN);
18 poll_mgr::shared_mgr.add_callback(fd, CB_RDONLY, this);
21 connection::~connection() {
27 shutdown(fd,SHUT_RDWR);
29 // after block_remove_fd, select will never wait on fd and no callbacks
31 poll_mgr::shared_mgr.block_remove_fd(fd);
33 VERIFY(!wpdu_.buf.size());
36 shared_ptr<connection> connection::to_dst(const sockaddr_in & dst, connection_delegate * delegate, int lossy) {
37 socket_t s = socket(AF_INET, SOCK_STREAM, 0);
38 s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
39 if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
40 IF_LEVEL(1) LOG_NONMEMBER << "failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port);
44 IF_LEVEL(2) LOG_NONMEMBER << "connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port);
45 return make_shared<connection>(delegate, move(s), lossy);
48 bool connection::send(const string & b) {
52 while (!dead_ && wpdu_.buf.size())
63 if ((random()%100) < lossy_) {
64 IF_LEVEL(1) LOG << "send LOSSY TEST shutdown fd " << fd;
65 shutdown(fd,SHUT_RDWR);
72 poll_mgr::shared_mgr.block_remove_fd(fd);
74 } else if (wpdu_.solong != wpdu_.buf.size()) {
75 // should be rare to need to explicitly add write callback
76 poll_mgr::shared_mgr.add_callback(fd, CB_WRONLY, this);
77 while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
78 send_complete_.wait(ml);
80 bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size());
84 send_wait_.notify_all();
88 // fd is ready to be written
89 void connection::write_cb(int s) {
93 if (wpdu_.buf.size() == 0) {
94 poll_mgr::shared_mgr.del_callback(fd, CB_WRONLY);
98 poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
101 VERIFY(wpdu_.solong != size_t_max);
102 if (wpdu_.solong < wpdu_.buf.size()) {
106 send_complete_.notify_one();
109 // fd is ready to be read
110 void connection::read_cb(int s) {
116 IF_LEVEL(5) LOG << "got data on fd " << s;
118 if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
120 IF_LEVEL(5) LOG << "readpdu on fd " << s << " failed; dying";
121 poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
123 send_complete_.notify_one();
127 if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
128 if (delegate_->got_pdu(shared_from_this(), rpdu_.buf)) {
129 // connection_delegate has successfully consumed the pdu
136 bool connection::writepdu() {
137 VERIFY(wpdu_.solong != size_t_max);
138 if (wpdu_.solong == wpdu_.buf.size())
141 ssize_t n = write(fd, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
143 if (errno != EAGAIN) {
144 IF_LEVEL(1) LOG << "writepdu fd " << fd << " failure errno=" << errno;
145 wpdu_.solong = size_t_max;
148 return (errno == EAGAIN);
150 wpdu_.solong += (size_t)n;
154 bool connection::readpdu() {
155 IF_LEVEL(5) LOG << "the receive buffer has length " << rpdu_.buf.size();
156 if (!rpdu_.buf.size()) {
157 rpc_protocol::rpc_sz_t sz1;
158 ssize_t n = fd.read(sz1);
164 VERIFY(errno!=EAGAIN);
168 if (n > 0 && n != sizeof(sz1)) {
169 IF_LEVEL(0) LOG << "short read of sz";
173 size_t sz = ntoh(sz1);
175 if (sz > rpc_protocol::MAX_PDU) {
176 IF_LEVEL(2) LOG << "read pdu TOO BIG " << sz << " network order=" << hex << sz1;
180 IF_LEVEL(5) LOG << "read size of datagram = " << sz;
182 rpdu_.buf.assign(sz+sizeof(sz1), 0);
183 rpdu_.solong = sizeof(sz1);
186 ssize_t n = fd.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
188 IF_LEVEL(5) LOG << "read " << n << " bytes";
197 rpdu_.solong += (size_t)n;
201 connection_listener::connection_listener(connection_delegate * delegate, in_port_t port, int lossytest)
202 : tcp_(socket(AF_INET, SOCK_STREAM, 0)), delegate_(delegate), lossy_(lossytest)
204 tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
205 tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
206 tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000});
207 tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000});
209 sockaddr_in sin = sockaddr_in(); // zero initialize
210 sin.sin_family = AF_INET;
211 sin.sin_port = hton(port);
213 if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
214 perror("accept_loop bind");
218 if (listen(tcp_, 1000) < 0) {
219 perror("accept_loop listen");
223 socklen_t addrlen = sizeof(sin);
224 VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
225 port_ = ntoh(sin.sin_port);
227 IF_LEVEL(2) LOG << "listen on " << port_ << " " << sin.sin_port;
229 poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this);
232 connection_listener::~connection_listener() {
233 poll_mgr::shared_mgr.block_remove_fd(tcp_);
236 void connection_listener::read_cb(int) {
238 socklen_t slen = sizeof(sin);
239 int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
241 perror("connection_listener::accept_conn error");
242 throw runtime_error("connection listener failure");
245 IF_LEVEL(2) LOG << "accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port);
246 auto ch = make_shared<connection>(delegate_, s1, lossy_);
248 // garbage collect dead connections
249 for (auto i = conns_.begin(); i != conns_.end();) {
250 if (i->second->isdead())