1 #include "connection.h"
2 #include "rpc_protocol.h"
5 #include <netinet/tcp.h>
// Builds a connection object around an already-open socket.
//   m1 - delegate pointer, stored in mgr_ (read_cb hands received PDUs to it)
//   f1 - socket, moved into fd_
//   l1 - stored in lossy_; send() compares random()%100 against this value
9 connection::connection(connection_delegate *m1, socket_t && f1, int l1)
10 : mgr_(m1), fd_(move(f1)), lossy_(l1)
// Turn on O_NONBLOCK in the socket's flags.
12 fd_.flags() |= O_NONBLOCK;
// Ignore SIGPIPE so that writing to a socket the peer has closed surfaces as
// an error from write() rather than a signal. The disposition is process-wide.
14 signal(SIGPIPE, SIG_IGN);
// Record the creation time of this connection.
16 create_time_ = steady_clock::now();
// Register this object with the shared poll manager under CB_RDONLY
// (presumably read-readiness delivered through read_cb -- confirm in poll_mgr).
18 poll_mgr::shared_mgr.add_callback(fd_, CB_RDONLY, this);
// Destructor.
21 connection::~connection() {
// Checks via VERIFY that the outgoing buffer wpdu_.buf is empty at this point
// (VERIFY is presumably an assertion macro -- confirm its failure behaviour).
24 VERIFY(!wpdu_.buf.size());
// Opens a TCP connection to dst and wraps it in a shared connection object.
//   dst   - IPv4 address and port to connect to
//   mgr   - delegate forwarded to the connection constructor
//   lossy - lossy-test value forwarded to the connection constructor
// Returns the new connection on success. What is returned when connect()
// fails is not visible in this excerpt.
27 shared_ptr<connection> connection::to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy) {
28 socket_t s = socket(AF_INET, SOCK_STREAM, 0);
// TCP_NODELAY: send small segments immediately instead of coalescing them.
29 s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
// connect() runs here, before the connection constructor sets O_NONBLOCK.
30 if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
31 IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
35 IF_LEVEL(2) LOG_NONMEMBER("connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
// Ownership of the socket moves into the new connection.
36 return make_shared<connection>(mgr, move(s), lossy);
// Shuts the connection down and detaches it from the poll manager.
39 void connection::closeconn() {
// Disable both the read and the write half of the socket.
45 shutdown(fd_,SHUT_RDWR);
47 //after block_remove_fd, select will never wait on fd_
48 //and no callbacks will be active
49 poll_mgr::shared_mgr.block_remove_fd(fd_);
// Sends the buffer b over this connection.
// The result computed below (ret) is true only when the connection is not
// dead and every byte of the outgoing buffer has been written; ret is
// presumably the value returned (the return statement is not visible here).
52 bool connection::send(const string & b) {
// Loop while the connection is alive and an earlier outgoing buffer is still
// pending (loop body not visible; presumably waits on send_wait_ -- confirm).
56 while (!dead_ && wpdu_.buf.size())
// Lossy test hook: when random()%100 falls below lossy_, the socket is shut
// down on purpose to simulate a failed connection.
67 if ((random()%100) < lossy_) {
68 IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_);
69 shutdown(fd_,SHUT_RDWR);
76 poll_mgr::shared_mgr.block_remove_fd(fd_);
// Not all bytes have been written yet (solong differs from the buffer size).
78 } else if (wpdu_.solong != wpdu_.buf.size()) {
79 // should be rare to need to explicitly add write callback
80 poll_mgr::shared_mgr.add_callback(fd_, CB_WRONLY, this);
// Wait on send_complete_ until the connection dies, solong becomes the
// size_t_max sentinel (set by writepdu on a write failure), or the whole
// buffer has been written.
81 while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
82 send_complete_.wait(ml);
// Success means: still alive and the byte count written equals the buffer size.
84 bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size());
// Wake every thread waiting on send_wait_.
88 send_wait_.notify_all();
// Write-readiness callback; s is the file descriptor passed by the caller.
92 //fd_ is ready to be written
93 void connection::write_cb(int s) {
// Nothing is queued for sending, so the write callback is no longer needed.
97 if (wpdu_.buf.size() == 0) {
98 poll_mgr::shared_mgr.del_callback(fd_,CB_WRONLY);
// Removes both read and write callbacks. The guarding condition is not
// visible in this excerpt (presumably a failed writepdu() -- confirm).
102 poll_mgr::shared_mgr.del_callback(fd_, CB_RDWR);
// Past this point solong must not be the size_t_max failure sentinel.
105 VERIFY(wpdu_.solong != size_t_max);
// Buffer only partially written (branch body not visible in this excerpt).
106 if (wpdu_.solong < wpdu_.buf.size()) {
// Wake one thread waiting on send_complete_ (send() waits on it).
110 send_complete_.notify_one();
// Read-readiness callback; s is the file descriptor, used here for logging.
113 // fd_ is ready to be read
114 void connection::read_cb(int s) {
120 IF_LEVEL(5) LOG("got data on fd " << s);
// Either no PDU is in progress (empty buffer) or the current PDU is still
// incomplete. The guarded statement is not visible (presumably a readpdu()
// call -- confirm).
123 if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size())
// Failure path per the log text: stop all callbacks for fd_ and wake one
// waiter on send_complete_.
127 IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
128 poll_mgr::shared_mgr.del_callback(fd_,CB_RDWR);
130 send_complete_.notify_one();
// A complete PDU is buffered: the buffer is non-empty and fully filled.
133 if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
// Hand the PDU to the delegate, passing a shared_ptr to this connection.
134 if (mgr_->got_pdu(shared_from_this(), rpdu_.buf)) {
135 // connection_delegate has successfully consumed the pdu
// Writes as much as possible of the unsent tail of wpdu_.buf to fd_.
// wpdu_.solong counts the bytes written so far; size_t_max marks a failed write.
// On the visible error path the result is true only when errno is EAGAIN.
142 bool connection::writepdu() {
// Must not be called after a previous write failure.
143 VERIFY(wpdu_.solong != size_t_max);
// Everything has already been written (branch body not visible in this excerpt).
144 if (wpdu_.solong == wpdu_.buf.size())
// Write the remaining bytes, starting at offset solong.
147 ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
// Any errno other than EAGAIN is logged and recorded by setting solong to
// the size_t_max sentinel.
149 if (errno != EAGAIN) {
150 IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno);
151 wpdu_.solong = size_t_max;
// NOTE(review): errno is read again here after the LOG call above; if LOG can
// modify errno, this result could differ from the earlier check -- confirm.
154 return (errno == EAGAIN);
// Advance the progress counter by the number of bytes written.
156 wpdu_.solong += (size_t)n;
// Reads a PDU from fd_ into rpdu_.buf.
// When the buffer is empty, the size header is read first and the buffer is
// allocated; the payload is then read into it. rpdu_.solong counts the bytes
// of the buffer filled so far.
// On the visible error path the result is true only when errno is EAGAIN.
160 bool connection::readpdu() {
161 IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
// Empty buffer: a new PDU starts here, so read its size header.
162 if (!rpdu_.buf.size()) {
163 rpc_protocol::rpc_sz_t sz1;
164 ssize_t n = fd_.read(sz1);
// EAGAIN is not expected at this point (enclosing condition not visible).
170 VERIFY(errno!=EAGAIN);
// Some bytes arrived, but fewer than a full size header.
174 if (n > 0 && n != sizeof(sz1)) {
175 IF_LEVEL(0) LOG("short read of sz");
// Convert the size from network to host byte order.
179 size_t sz = ntoh(sz1);
// Sizes above rpc_protocol::MAX_PDU take this branch (body not visible).
181 if (sz > rpc_protocol::MAX_PDU) {
182 IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
186 IF_LEVEL(5) LOG("read size of datagram = " << sz);
188 VERIFY(rpdu_.buf.size() == 0);
// Allocate sz + sizeof(sz1) zero bytes and start filling after the first
// sizeof(sz1) bytes, which are left as zeros by the visible code.
189 rpdu_.buf = string(sz+sizeof(sz1), 0);
190 rpdu_.solong = sizeof(sz1);
// Read into the unfilled tail of the buffer.
193 ssize_t n = fd_.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
195 IF_LEVEL(5) LOG("read " << n << " bytes");
// Error path (condition not visible): true only when errno is EAGAIN.
202 return (errno == EAGAIN);
// Advance the fill counter by the number of bytes read.
204 rpdu_.solong += (size_t)n;
// Creates a listening TCP socket and registers it with the shared poll manager.
//   m1        - delegate stored in mgr_, passed on to accepted connections
//   port      - port to bind, given in host byte order (converted with hton)
//   lossytest - stored in lossy_, passed on to accepted connections
208 tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest)
209 : tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest)
211 sockaddr_in sin{}; // zero initialize
212 sin.sin_family = AF_INET;
213 sin.sin_port = hton(port);
// Socket options: address reuse, no send coalescing, and 50 ms
// (0 s + 50000 us) receive and send timeouts.
215 tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
216 tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
217 tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000});
218 tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000});
220 // careful to exactly match type signature of bind arguments so we don't
221 // get std::bind instead
222 if (bind((int)tcp_, (const sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
223 perror("accept_loop bind");
// Start listening with a backlog of 1000.
227 if (listen(tcp_, 1000) < 0) {
228 perror("accept_loop listen");
// Query the address actually bound and keep its port, in host byte order,
// in port_ (this can differ from the request, e.g. if port 0 was asked for).
232 socklen_t addrlen = sizeof(sin);
233 VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
234 port_ = ntoh(sin.sin_port);
// Logs the host-order port followed by the raw network-order sin_port value.
236 IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
// Register for CB_RDONLY events on the listening socket; tcpsconn::read_cb
// calls accept() on it.
238 poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this);
// Destructor: detaches the listening socket, then closes every tracked connection.
241 tcpsconn::~tcpsconn()
// Remove the listening socket from the poll manager first.
243 poll_mgr::shared_mgr.block_remove_fd(tcp_);
// Call closeconn() on every connection held in conns_.
245 for (auto & i : conns_)
246 i.second->closeconn();
// Callback for the listening socket: accepts one incoming connection, wraps
// it in a connection object, and records it in conns_.
249 void tcpsconn::read_cb(int) {
// sin is declared on a line not visible in this excerpt.
251 socklen_t slen = sizeof(sin);
252 int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
// Error path (condition not visible): report and throw thread_exit_exception.
254 perror("tcpsconn::accept_conn error");
255 throw thread_exit_exception();
258 IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
// NOTE(review): s1 is an int while the connection constructor takes
// socket_t &&; this presumably relies on an int-to-socket_t conversion -- confirm.
259 auto ch = make_shared<connection>(mgr_, s1, lossy_);
261 // garbage collect dead connections
262 for (auto i = conns_.begin(); i != conns_.end();) {
// Entries whose connection reports isdead() are handled here (loop body not
// visible; presumably erased -- confirm).
263 if (i->second->isdead())
// Track the new connection under its channel number.
269 conns_[ch->channo()] = ch;