1 #include "connection.h"
2 #include "rpc_protocol.h"
5 #include <netinet/tcp.h>
// Construct a connection that takes ownership of the socket f1.
//   delegate - stored in delegate_; read_cb() hands completed PDUs to it
//              via got_pdu().
//   f1       - socket, moved into the fd member.
//   l1       - stored in lossy_; send() compares it against random()%100
//              to decide whether to inject a test failure.
9 connection::connection(connection_delegate * delegate, socket_t && f1, int l1)
10 : fd(move(f1)), delegate_(delegate), lossy_(l1)
// Switch the descriptor to non-blocking mode.
12 fd.flags() |= O_NONBLOCK;
// Ignore SIGPIPE (signal dispositions are process-wide).
// NOTE(review): presumably so that writing to a closed peer fails with an
// error instead of terminating the process - confirm.
14 signal(SIGPIPE, SIG_IGN);
// Register this object with the shared poll manager for CB_RDONLY events
// on fd.
16 poll_mgr::shared_mgr.add_callback(fd, CB_RDONLY, this);
// Destructor: shuts the socket down and detaches it from the shared poll
// manager before the object goes away.
19 connection::~connection() {
// Shut down both the receive and the send direction of the socket.
25 shutdown(fd,SHUT_RDWR);
27 // after block_remove_fd, select will never wait on fd and no callbacks
29 poll_mgr::shared_mgr.block_remove_fd(fd);
// Check that no outgoing PDU is still held in the write buffer.
// NOTE(review): VERIFY is presumably a fatal assertion - confirm.
31 VERIFY(!wpdu_.buf.size());
// Factory: open a TCP connection to dst and wrap it in a connection object.
//   dst      - IPv4 address and port to connect to.
//   delegate - forwarded to the connection constructor.
//   lossy    - forwarded to the connection constructor.
// On success returns a shared_ptr to the new connection.
// NOTE(review): the value returned after a failed connect() is not visible
// here - presumably an empty pointer; confirm.
34 shared_ptr<connection> connection::to_dst(const sockaddr_in &dst, connection_delegate * delegate, int lossy) {
// Create an IPv4 stream (TCP) socket.
35 socket_t s = socket(AF_INET, SOCK_STREAM, 0);
// Set TCP_NODELAY on the socket.
36 s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
37 if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
// Connect failed: log the destination address and port.
38 IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
42 IF_LEVEL(2) LOG_NONMEMBER("connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
// Ownership of the socket moves into the newly created connection.
43 return make_shared<connection>(delegate, move(s), lossy);
// Send the buffer b over this connection.
// wpdu_.buf holds the outgoing PDU and wpdu_.solong counts how many of its
// bytes have been written so far (writepdu() advances it and sets it to
// size_t_max when the write fails).
// NOTE(review): the result is presumably `ret` below, i.e. true only when
// the connection is still alive and the whole buffer was written - confirm.
46 bool connection::send(const string & b) {
// Loop while the connection is alive and an outgoing PDU is still pending.
// NOTE(review): presumably waits on send_wait_ so that only one PDU is in
// flight at a time - confirm.
50 while (!dead_ && wpdu_.buf.size())
// Failure injection for testing: when random()%100 is below lossy_, the
// socket is shut down in both directions.
61 if ((random()%100) < lossy_) {
62 IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd " << fd);
63 shutdown(fd,SHUT_RDWR);
// Detach fd from the shared poll manager.
// NOTE(review): the condition guarding this call is not shown here -
// presumably a failed writepdu(); confirm.
70 poll_mgr::shared_mgr.block_remove_fd(fd);
// Not all bytes of the buffer have been written yet.
72 } else if (wpdu_.solong != wpdu_.buf.size()) {
73 // should be rare to need to explicitly add write callback
74 poll_mgr::shared_mgr.add_callback(fd, CB_WRONLY, this);
// Wait on send_complete_ until the connection dies, the write is marked as
// failed (solong == size_t_max), or every byte has been written.
// NOTE(review): ml is presumably the lock held by this function - confirm.
75 while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
76 send_complete_.wait(ml);
// Success means: still alive and the entire buffer was written.
78 bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size());
// Wake every thread waiting on send_wait_.
82 send_wait_.notify_all();
86 // fd is ready to be written
// Write-readiness callback for this connection's socket.
//   s - descriptor argument supplied by the caller of the callback.
87 void connection::write_cb(int s) {
// No outgoing data is queued: remove the CB_WRONLY callback for fd.
91 if (wpdu_.buf.size() == 0) {
92 poll_mgr::shared_mgr.del_callback(fd, CB_WRONLY);
// Remove the CB_RDWR callbacks for fd.
// NOTE(review): the condition guarding this call is not shown here -
// presumably a failed writepdu(); confirm.
96 poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
// size_t_max is the value writepdu() stores in solong when a write fails;
// it must not be set at this point.
99 VERIFY(wpdu_.solong != size_t_max);
// Part of the buffer is still unwritten.
100 if (wpdu_.solong < wpdu_.buf.size()) {
// Wake one thread waiting on send_complete_ (send() waits on it).
104 send_complete_.notify_one();
107 // fd is ready to be read
// Read-readiness callback for this connection's socket.
//   s - descriptor argument supplied by the caller; used in the log messages.
108 void connection::read_cb(int s) {
114 IF_LEVEL(5) LOG("got data on fd " << s);
// Either no PDU is in progress (empty buffer) or the current one has only
// been partially read.
116 if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
// Failure path, per the log text: the read failed and the connection is
// being given up. The callbacks for fd are removed and one thread waiting
// on send_complete_ is woken.
118 IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
119 poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
121 send_complete_.notify_one();
// A complete PDU is available: the buffer is non-empty and fully read.
125 if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
// Hand the PDU to the delegate along with a shared_ptr to this connection.
126 if (delegate_->got_pdu(shared_from_this(), rpdu_.buf)) {
127 // connection_delegate has successfully consumed the pdu
// Try to write the unsent remainder of wpdu_.buf to the socket.
// wpdu_.solong is the number of bytes written so far; it is set to
// size_t_max when the write fails with an error other than EAGAIN.
// On a failed write the function returns true only if errno is EAGAIN.
134 bool connection::writepdu() {
// Must not be called once the buffer has been marked as failed.
135 VERIFY(wpdu_.solong != size_t_max);
// Every byte has already been written.
136 if (wpdu_.solong == wpdu_.buf.size())
// Write the bytes from offset solong to the end of the buffer.
139 ssize_t n = write(fd, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
// Any error other than EAGAIN is logged and recorded as a failure.
141 if (errno != EAGAIN) {
142 IF_LEVEL(1) LOG("writepdu fd " << fd << " failure errno=" << errno);
143 wpdu_.solong = size_t_max;
146 return (errno == EAGAIN);
// Account for the n bytes just written.
148 wpdu_.solong += (size_t)n;
// Read data for the incoming PDU from the socket into rpdu_.buf.
// When the buffer is empty, a size field of type rpc_protocol::rpc_sz_t is
// read first and the buffer is allocated from it; after that, data is read
// into the buffer at offset rpdu_.solong and solong is advanced.
// NOTE(review): the return values are not shown here - presumably false
// signals a fatal read error; confirm.
152 bool connection::readpdu() {
153 IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
// Empty buffer: a new PDU starts, so read its size field.
154 if (!rpdu_.buf.size()) {
155 rpc_protocol::rpc_sz_t sz1;
156 ssize_t n = fd.read(sz1);
// EAGAIN is not expected on this path.
162 VERIFY(errno!=EAGAIN);
// Data was read, but fewer bytes than the size field occupies.
166 if (n > 0 && n != sizeof(sz1)) {
167 IF_LEVEL(0) LOG("short read of sz");
// Convert the size field with ntoh (the log below calls sz1 the
// "network order" value).
171 size_t sz = ntoh(sz1);
// A size above rpc_protocol::MAX_PDU is logged as too big.
173 if (sz > rpc_protocol::MAX_PDU) {
174 IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
178 IF_LEVEL(5) LOG("read size of datagram = " << sz);
// Allocate sz + sizeof(sz1) zero bytes and count the first sizeof(sz1)
// bytes as already read.
180 rpdu_.buf.assign(sz+sizeof(sz1), 0);
181 rpdu_.solong = sizeof(sz1);
// Read into the part of the buffer that has not been filled yet.
184 ssize_t n = fd.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
186 IF_LEVEL(5) LOG("read " << n << " bytes");
// Account for the n bytes just read.
195 rpdu_.solong += (size_t)n;
// Create a listening TCP socket and register it with the shared poll manager.
//   delegate  - stored in delegate_; passed on to each accepted connection.
//   port      - port to bind to, in host byte order (converted with hton).
//   lossytest - stored in lossy_; passed on to each accepted connection.
199 connection_listener::connection_listener(connection_delegate * delegate, in_port_t port, int lossytest)
200 : tcp_(socket(AF_INET, SOCK_STREAM, 0)), delegate_(delegate), lossy_(lossytest)
// Socket options: SO_REUSEADDR, TCP_NODELAY, and receive/send timeouts of
// timeval{0, 50000} (0 seconds, 50000 microseconds).
202 tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
203 tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
204 tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000});
205 tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000});
// Build the local address: IPv4, with the requested port.
// NOTE(review): sin_addr appears to stay at its zero-initialized value,
// i.e. the wildcard address - confirm.
207 sockaddr_in sin{}; // zero initialize
208 sin.sin_family = AF_INET;
209 sin.sin_port = hton(port);
// Bind the socket to the address; a failure is reported with perror.
211 if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
212 perror("accept_loop bind");
// Start listening with a backlog of 1000; a failure is reported with perror.
216 if (listen(tcp_, 1000) < 0) {
217 perror("accept_loop listen");
// Read back the bound address and store its port in port_.
// NOTE(review): presumably this is how the actual port is learned when the
// caller passes port 0 - confirm.
221 socklen_t addrlen = sizeof(sin);
222 VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
223 port_ = ntoh(sin.sin_port);
// Logs the converted port followed by the raw sin_port value.
225 IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
// Register this listener with the shared poll manager for CB_RDONLY events
// on the listening socket.
227 poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this);
// Destructor: detaches the listening socket from the shared poll manager.
230 connection_listener::~connection_listener() {
231 poll_mgr::shared_mgr.block_remove_fd(tcp_);
234 void connection_listener::read_cb(int) {
236 socklen_t slen = sizeof(sin);
237 int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
239 perror("connection_listener::accept_conn error");
240 throw thread_exit_exception();
243 IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
244 auto ch = make_shared<connection>(delegate_, s1, lossy_);
246 // garbage collect dead connections
247 for (auto i = conns_.begin(); i != conns_.end();) {
248 if (i->second->isdead())