#include "rpc_protocol.h"
#include <cerrno>
#include <csignal>
-#include <fcntl.h>
#include <sys/types.h>
#include <netinet/tcp.h>
#include <unistd.h>
-#include <sys/socket.h>
#include "marshall.h"
connection::connection(chanmgr *m1, int f1, int l1)
: mgr_(m1), fd_(f1), lossy_(l1)
{
- int flags = fcntl(fd_, F_GETFL, NULL);
- fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
+ fd_.flags() |= O_NONBLOCK;
signal(SIGPIPE, SIG_IGN);
create_time_ = steady_clock::now();
- PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
+ PollMgr::Instance().add_callback(fd_, CB_RDONLY, this);
}
connection::~connection() {
VERIFY(dead_);
VERIFY(!wpdu_.buf.size());
- close(fd_);
}
void connection::incref() {
}
//after block_remove_fd, select will never wait on fd_
//and no callbacks will be active
- PollMgr::Instance()->block_remove_fd(fd_);
+ PollMgr::Instance().block_remove_fd(fd_);
}
void connection::decref() {
if (!writepdu()) {
dead_ = true;
ml.unlock();
- PollMgr::Instance()->block_remove_fd(fd_);
+ PollMgr::Instance().block_remove_fd(fd_);
ml.lock();
} else if (wpdu_.solong != wpdu_.buf.size()) {
// should be rare to need to explicitly add write callback
- PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
+ PollMgr::Instance().add_callback(fd_, CB_WRONLY, this);
while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
send_complete_.wait(ml);
}
VERIFY(!dead_);
VERIFY(fd_ == s);
if (wpdu_.buf.size() == 0) {
- PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
+ PollMgr::Instance().del_callback(fd_,CB_WRONLY);
return;
}
if (!writepdu()) {
- PollMgr::Instance()->del_callback(fd_, CB_RDWR);
+ PollMgr::Instance().del_callback(fd_, CB_RDWR);
dead_ = true;
} else {
VERIFY(wpdu_.solong != size_t_max);
if (!succ) {
IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
- PollMgr::Instance()->del_callback(fd_,CB_RDWR);
+ PollMgr::Instance().del_callback(fd_,CB_RDWR);
dead_ = true;
send_complete_.notify_one();
}
IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
if (!rpdu_.buf.size()) {
rpc_sz_t sz1;
- ssize_t n = read(fd_, &sz1, sizeof(sz1));
+ ssize_t n = fd_.read(sz1);
- if (n == 0) {
+ if (n == 0)
return false;
- }
if (n < 0) {
VERIFY(errno!=EAGAIN);
rpdu_.solong = sizeof(sz1);
}
- ssize_t n = read(fd_, &rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
+ ssize_t n = fd_.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
IF_LEVEL(5) LOG("read " << n << " bytes");
}
tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
-: mgr_(m1), lossy_(lossytest)
+: tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest)
{
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = hton(port);
- tcp_ = socket(AF_INET, SOCK_STREAM, 0);
- if (tcp_ < 0) {
- perror("accept_loop socket:");
- VERIFY(0);
- }
+ tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
+ tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
- int yes = 1;
- setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
- setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
+ struct timeval timeout = {0, 50000};
+
+ if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeout) < 0)
+ perror("accept_loop setsockopt");
+
+ if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeout) < 0)
+ perror("accept_loop setsockopt");
// careful to exactly match type signature of bind arguments so we don't
// get std::bind instead
- if (bind(tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
- perror("accept_loop tcp bind:");
+ if (bind((int)tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
+ perror("accept_loop bind");
VERIFY(0);
}
if (listen(tcp_, 1000) < 0) {
- perror("listen:");
+ perror("accept_loop listen");
VERIFY(0);
}
IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
- if (pipe(pipe_) < 0) {
- perror("accept_loop pipe:");
- VERIFY(0);
- }
+ file_t::pipe(pipe_);
- int flags = fcntl(pipe_[0], F_GETFL, NULL);
- flags |= O_NONBLOCK;
- fcntl(pipe_[0], F_SETFL, flags);
+ pipe_[0].flags() |= O_NONBLOCK;
th_ = thread(&tcpsconn::accept_conn, this);
}
tcpsconn::~tcpsconn()
{
- VERIFY(close(pipe_[1]) == 0);
+ pipe_[1].close();
th_.join();
- //close all the active connections
+ // close all the active connections
map<int, connection *>::iterator i;
for (i = conns_.begin(); i != conns_.end(); i++) {
i->second->closeconn();
void tcpsconn::accept_conn() {
fd_set rfds;
- int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
-
- try {
- while (1) {
- FD_ZERO(&rfds);
- FD_SET(pipe_[0], &rfds);
- FD_SET(tcp_, &rfds);
-
- int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
-
- if (ret < 0) {
- if (errno == EINTR) {
- continue;
- } else {
- perror("accept_conn select:");
- IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
- VERIFY(0);
- }
- }
-
- if (FD_ISSET(pipe_[0], &rfds)) {
- close(pipe_[0]);
- close(tcp_);
- return;
- }
- else if (FD_ISSET(tcp_, &rfds)) {
- process_accept();
- } else {
- VERIFY(0);
- }
+ int max_fd = max((int)pipe_[0], (int)tcp_);
+
+ while (1) {
+ FD_ZERO(&rfds);
+ FD_SET(pipe_[0], &rfds);
+ FD_SET(tcp_, &rfds);
+
+ int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
+
+ if (ret < 0 && errno == EINTR)
+ continue;
+ else if (ret < 0) {
+ perror("accept_conn select:");
+ IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
+ VERIFY(0);
+ }
+
+ if (FD_ISSET(pipe_[0], &rfds))
+ return;
+
+ if (!FD_ISSET(tcp_, &rfds))
+ VERIFY(0);
+
+ try {
+ process_accept();
+ } catch (thread_exit_exception e) {
+ break;
}
- }
- catch (thread_exit_exception e)
- {
}
}