1 // std::bind and syscall bind have the same name, so don't use std::bind in this file
2 #define LIBT4_NO_FUNCTIONAL
3 #include "connection.h"
6 #include <netinet/tcp.h>
10 #include <sys/socket.h>
12 #define MAX_PDU (10<<20) // maximum PDU size: 10 MiB (10 * 2^20 bytes)
// Wraps an already-open descriptor f1 in a connection that reports to channel
// manager m1. l1 is stored in lossy_, which send() compares against
// random()%100 for its lossy-test shutdown.
14 connection::connection(chanmgr *m1, int f1, int l1)
15 : mgr_(m1), fd_(f1), lossy_(l1)
// Fetch the descriptor's file status flags so they can be written back below
// (presumably after adding O_NONBLOCK -- TODO confirm).
// NOTE(review): neither fcntl() return value is checked here.
17 int flags = fcntl(fd_, F_GETFL, NULL);
19 fcntl(fd_, F_SETFL, flags);
// Ignore SIGPIPE for the whole process, so that writing to a peer that has
// gone away is reported through write()'s return value/errno instead of a signal.
21 signal(SIGPIPE, SIG_IGN);
// Creation timestamp; compare() orders connections by it.
23 create_time_ = steady_clock::now();
// Register this object with the poll manager for read events on fd_.
25 PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
// Destroys the connection.
// NOTE(review): presumably releases fd_ and any pending PDU buffers -- confirm against the body.
28 connection::~connection() {
// Presumably takes an additional reference on this connection (counterpart of
// decref()) -- TODO confirm.
36 void connection::incref() {
// Reports whether the connection is dead; presumably reflects the dead_ flag
// that send() tests -- TODO confirm.
41 bool connection::isdead() {
// Closes the connection from the local side: shuts the socket down in both
// directions and detaches fd_ from the poll manager.
46 void connection::closeconn() {
// Disallow further sends and receives on fd_.
51 shutdown(fd_,SHUT_RDWR);
56 // After block_remove_fd returns, select will never wait on fd_
57 // and no callbacks will be active.
58 PollMgr::Instance()->block_remove_fd(fd_);
// Presumably drops one reference on this connection (counterpart of
// incref()); whether it destroys the object at zero -- TODO confirm.
61 void connection::decref() {
// Orders this connection relative to `another` by creation time
// (create_time_ is taken from steady_clock::now() in the constructor).
// NOTE(review): presumably a three-way result (positive / negative / zero) --
// confirm the exact return values and their sign convention.
76 int connection::compare(connection *another) {
// This connection was created later than `another`.
77 if (create_time_ > another->create_time_)
// This connection was created earlier than `another`.
79 if (create_time_ < another->create_time_)
// Sends the sz-byte buffer b as one PDU and blocks until it has been fully
// written, the write fails, or the connection dies.
// Returns true only if the connection is still alive and every byte was written.
84 bool connection::send(char *b, size_t sz) {
// Only one outgoing PDU is in flight at a time: loop while another sender's
// buffer still occupies wpdu_ (presumably waiting on send_wait_ -- confirm).
87 while (!dead_ && wpdu_.buf) {
// Lossy test hook: random()%100 is in [0,99], so the socket is shut down with
// a probability of roughly lossy_ percent (never when lossy_ <= 0).
99 if ((random()%100) < lossy_) {
100 IF_LEVEL(1) LOG("connection::send LOSSY TEST shutdown fd_ " << fd_);
101 shutdown(fd_,SHUT_RDWR);
// Detach fd_ from the poll manager.
108 PollMgr::Instance()->block_remove_fd(fd_);
// The whole PDU has already been written.
111 if (wpdu_.solong == wpdu_.sz) {
113 // It should be rare to need to explicitly add the write callback.
114 PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
// Wait until the connection dies, the write fails (writepdu() marks failure
// by setting solong to size_t_max), or the PDU is complete. write_cb() and
// read_cb() signal send_complete_.
115 while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) {
116 send_complete_.wait(ml);
// Capture the outcome before the progress counters are reset.
120 bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
121 wpdu_.solong = wpdu_.sz = 0;
// Wake every thread waiting on send_wait_.
124 send_wait_.notify_all();
128 // Poll-manager callback: fd_ is ready to be written.
// NOTE(review): parameter s is presumably the ready descriptor (== fd_) -- confirm.
129 void connection::write_cb(int s) {
// Stop watching fd_ for writability.
134 PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
// Stop watching fd_ for both reading and writing (presumably the
// write-failure path -- confirm).
138 PollMgr::Instance()->del_callback(fd_, CB_RDWR);
// size_t_max is the failure marker set by writepdu(); it must not be set here.
141 VERIFY(wpdu_.solong != size_t_max);
// The PDU is only partially written.
142 if (wpdu_.solong < wpdu_.sz) {
// Wake the sender blocked on send_complete_ in send().
146 send_complete_.notify_one();
149 // Poll-manager callback: fd_ is ready to be read.
// NOTE(review): parameter s is presumably the ready descriptor (== fd_) -- confirm.
150 void connection::read_cb(int s) {
// No PDU is in progress yet, or the current one is still incomplete.
158 if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) {
// Stop watching fd_ entirely (presumably the read-failure path -- confirm) ...
163 PollMgr::Instance()->del_callback(fd_,CB_RDWR);
// ... and wake a sender blocked on send_complete_ in send() so it can
// re-check its loop condition.
165 send_complete_.notify_one();
// A complete PDU has been assembled: hand it to the channel manager.
168 if (rpdu_.buf && rpdu_.sz == rpdu_.solong) {
169 if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) {
170 // chanmgr has successfully consumed the pdu
// Reset the progress counters for the next PDU.
172 rpdu_.sz = rpdu_.solong = 0;
// Writes as much of the pending outgoing PDU (wpdu_) to fd_ as one write()
// call accepts, advancing wpdu_.solong by the number of bytes written.
// On a write error it returns true if the error was EAGAIN and false
// otherwise; a non-EAGAIN error also sets wpdu_.solong to size_t_max.
177 bool connection::writepdu() {
// A previous hard failure must not be retried.
178 VERIFY(wpdu_.solong != size_t_max);
// Nothing left to write.
179 if (wpdu_.solong == wpdu_.sz)
// First write of this PDU: stamp wpdu_.sz, in network byte order, over the
// first sizeof(uint32_t) bytes of the buffer. The buffer must therefore
// start with space reserved for this length header.
// NOTE(review): bcopy() is a legacy interface; memcpy() is the standard equivalent.
182 if (wpdu_.solong == 0) {
183 uint32_t sz = htonl((uint32_t)wpdu_.sz);
184 bcopy(&sz,wpdu_.buf,sizeof(sz));
// Write the remainder of the PDU starting at the current offset.
186 ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong));
// Any error other than EAGAIN is fatal for this PDU.
188 if (errno != EAGAIN) {
189 IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno);
// size_t_max marks the PDU as failed; send() stops waiting when it sees it.
190 wpdu_.solong = size_t_max;
// NOTE(review): errno is re-read here after the LOG call above; if logging
// can modify errno, consider capturing it in a local right after write().
193 return (errno == EAGAIN);
// Partial or full success: record the progress.
195 wpdu_.solong += (size_t)n;
// Reads incoming PDU data from fd_ into rpdu_: first the 4-byte length header,
// then as much of the body as one read() call delivers.
// On a body read error it resets rpdu_.sz/solong and returns true if the
// error was EAGAIN, false otherwise.
199 bool connection::readpdu() {
// Read the length header into sz1 (network byte order).
202 ssize_t n = read(fd_, &sz1, sizeof(sz1));
// NOTE(review): presumably reached because the caller only invokes this when
// fd_ is readable, so EAGAIN is unexpected here -- confirm.
209 VERIFY(errno!=EAGAIN);
// NOTE(review): a partial header read is logged here rather than resumed;
// on a stream socket short reads are legal, so confirm how the caller
// handles this case.
213 if (n > 0 && n != sizeof(sz1)) {
214 IF_LEVEL(0) LOG("connection::readpdu short read of sz");
// Convert the announced size to host byte order.
218 size_t sz = ntohl(sz1);
// Oversized PDU (presumably checked against MAX_PDU -- confirm).
221 IF_LEVEL(2) LOG("connection::readpdu read pdu TOO BIG " << sz << " network order=" << hex << sz1);
// No previous receive buffer may still be attached.
226 VERIFY(rpdu_.buf == NULL);
// Allocate room for the announced size plus the 4-byte header.
// NOTE(review): confirm the malloc() result is checked before use.
227 rpdu_.buf = (char *)malloc(sz+sizeof(sz1));
// Keep the raw (network-order) header at the front of the buffer and count
// it as already received.
229 bcopy(&sz1,rpdu_.buf,sizeof(sz1));
230 rpdu_.solong = sizeof(sz1);
// Read the remainder of the PDU starting at the current offset.
233 ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong);
// Read error: discard the progress counters; EAGAIN is the only non-fatal case.
240 rpdu_.sz = rpdu_.solong = 0;
241 return (errno == EAGAIN);
// Record the bytes received so far.
243 rpdu_.solong += (size_t)n;
// Creates a listening TCP socket on `port`, records the port actually bound
// in port_, creates the pipe_ pair, and starts the accept thread.
// m1 and lossytest are stored and later passed to each accepted connection.
247 tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
248 : mgr_(m1), lossy_(lossytest)
// Zero-initialise the address, then set the family and port.
// NOTE(review): htons() takes a 16-bit value, so a port above 65535 is
// silently truncated.
250 struct sockaddr_in sin;
251 memset(&sin, 0, sizeof(sin));
252 sin.sin_family = AF_INET;
253 sin.sin_port = htons(port);
255 tcp_ = socket(AF_INET, SOCK_STREAM, 0);
257 perror("tcpsconn::tcpsconn accept_loop socket:");
// Enable SO_REUSEADDR and TCP_NODELAY on the listening socket.
// NOTE(review): setsockopt() return values are not checked.
262 setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
263 setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
// This is the socket-API bind(), not std::bind.
265 if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
266 perror("accept_loop tcp bind:");
// Listen with a backlog of 1000 pending connections.
270 if (listen(tcp_, 1000) < 0) {
271 perror("tcpsconn::tcpsconn listen:");
// Read back the bound address to learn the actual port (it differs from
// `port` when the OS chose one, e.g. for port 0).
275 socklen_t addrlen = sizeof(sin);
276 VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
277 port_ = ntohs(sin.sin_port);
// Logs the port in host order followed by the raw network-order value.
279 IF_LEVEL(2) LOG("tcpsconn::tcpsconn listen on " << port_ << " " << sin.sin_port);
// pipe_[0] is watched by accept_conn()'s select(); the destructor closes pipe_[1].
281 if (pipe(pipe_) < 0) {
282 perror("accept_loop pipe:");
// Update the file status flags of the pipe's read end (presumably adding
// O_NONBLOCK -- TODO confirm). NOTE(review): fcntl() results are unchecked.
286 int flags = fcntl(pipe_[0], F_GETFL, NULL);
288 fcntl(pipe_[0], F_SETFL, flags);
// Start the accept thread running accept_conn() on this object.
290 th_ = thread(&tcpsconn::accept_conn, this);
// Tears down the listener: closes the write end of the pipe and closes every
// tracked connection.
293 tcpsconn::~tcpsconn()
// Closing the write end makes the read end pipe_[0] readable (EOF) in
// accept_conn()'s select(); presumably this is the signal for the accept
// thread to exit -- confirm.
295 VERIFY(close(pipe_[1]) == 0);
298 // Close all the active connections.
299 map<int, connection *>::iterator i;
300 for (i = conns_.begin(); i != conns_.end(); i++) {
301 i->second->closeconn();
// Accepts one pending connection on tcp_, wraps it in a connection object,
// garbage-collects dead connections, and records the new one in conns_.
// Throws thread_exit_exception if accept() fails.
306 void tcpsconn::process_accept() {
308 socklen_t slen = sizeof(sin);
309 int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
// accept() failed: report it and unwind the accept thread; accept_conn()
// catches thread_exit_exception.
311 perror("tcpsconn::accept_conn error");
312 throw thread_exit_exception();
// NOTE(review): inet_ntoa() returns a pointer to a static buffer and is not
// thread-safe; inet_ntop() is the reentrant alternative.
315 IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntohs(sin.sin_port));
// The new connection receives this listener's channel manager and lossy setting.
316 connection *ch = new connection(mgr_, s1, lossy_);
318 // Garbage collect all dead connections with a refcount of 1.
// The iterator is advanced inside the loop body, hence the empty increment clause.
319 for (auto i = conns_.begin(); i != conns_.end();) {
320 if (i->second->isdead() && i->second->ref() == 1) {
321 IF_LEVEL(2) LOG("accept_loop garbage collected fd=" << i->second->channo());
323 // Careful not to reuse i right after erase. (i++) is fully
324 // evaluated, including its side effect, before the erase call
325 // runs, because argument evaluation is sequenced before the
326 // function body. See http://en.wikipedia.org/wiki/Sequence_point.
// Track the new connection under its channel number.
332 conns_[ch->channo()] = ch;
// Body of the accept thread (started from the constructor): waits in select()
// on the listening socket tcp_ and on the pipe's read end pipe_[0].
335 void tcpsconn::accept_conn() {
// select() needs the highest watched descriptor plus one.
337 int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
342 FD_SET(pipe_[0], &rfds);
// Block indefinitely (NULL timeout) until one of the descriptors is readable.
345 int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
// Interrupted by a signal (presumably just retried -- confirm).
348 if (errno == EINTR) {
// NOTE(review): errno is read again after perror(), which may change it;
// consider saving errno in a local first.
351 perror("accept_conn select:");
352 IF_LEVEL(0) LOG("tcpsconn::accept_conn failure errno " << errno);
// Activity on the pipe (the destructor closes its write end).
357 if (FD_ISSET(pipe_[0], &rfds)) {
// A new connection is pending on the listening socket.
362 else if (FD_ISSET(tcp_, &rfds)) {
// thread_exit_exception is thrown by process_accept() when accept() fails.
// NOTE(review): this catches by value; catching by const reference avoids a
// copy and slicing.
369 catch (thread_exit_exception e)
// Opens a TCP connection to dst and, on success, returns a newly allocated
// connection wrapping the socket, with mgr and lossy passed through to it.
// NOTE(review): the object is created with raw new; confirm how callers
// release it (the class exposes incref()/decref()).
374 connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
// NOTE(review): confirm the socket() result is checked before use.
375 int s = socket(AF_INET, SOCK_STREAM, 0);
// Enable TCP_NODELAY; the setsockopt() return value is not checked.
377 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
// NOTE(review): confirm that s is closed on this failure path so the
// descriptor does not leak.
378 if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
379 IF_LEVEL(1) LOG_NONMEMBER("rpcc::connect_to_dst failed to " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
383 IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
384 return new connection(mgr, s, lossy);