bool connection::readpdu() {
IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
if (!rpdu_.buf.size()) {
- rpc_sz_t sz1;
+ rpc_protocol::rpc_sz_t sz1;
ssize_t n = fd_.read(sz1);
if (n == 0)
size_t sz = ntoh(sz1);
- if (sz > MAX_PDU) {
+ if (sz > rpc_protocol::MAX_PDU) {
IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
return false;
}
tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest)
: tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest)
{
- sockaddr_in sin;
- memset(&sin, 0, sizeof(sin));
+ sockaddr_in sin{}; // zero initialize
sin.sin_family = AF_INET;
sin.sin_port = hton(port);
tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
-
- if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000}) < 0)
- perror("accept_loop setsockopt");
-
- if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000}) < 0)
- perror("accept_loop setsockopt");
+ tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000});
+ tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000});
// careful to exactly match type signature of bind arguments so we don't
// get std::bind instead
IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
- file_t::pipe(pipe_);
-
- pipe_[0].flags() |= O_NONBLOCK;
-
- th_ = thread(&tcpsconn::accept_conn, this);
+ poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this);
}
tcpsconn::~tcpsconn()
{
- pipe_[1].close();
- th_.join();
+ poll_mgr::shared_mgr.block_remove_fd(tcp_);
for (auto & i : conns_)
i.second->closeconn();
}
-void tcpsconn::process_accept() {
+void tcpsconn::read_cb(int) {
sockaddr_in sin;
socklen_t slen = sizeof(sin);
int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
conns_[ch->channo()] = ch;
}
-
-void tcpsconn::accept_conn() {
- fd_set rfds;
- int max_fd = max((int)pipe_[0], (int)tcp_);
-
- while (1) {
- FD_ZERO(&rfds);
- FD_SET(pipe_[0], &rfds);
- FD_SET(tcp_, &rfds);
-
- int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
-
- if (ret < 0 && errno == EINTR)
- continue;
- else if (ret < 0) {
- perror("accept_conn select:");
- IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
- VERIFY(0);
- }
-
- if (FD_ISSET(pipe_[0], &rfds))
- return;
-
- VERIFY(FD_ISSET(tcp_, &rfds));
-
- try {
- process_accept();
- } catch (thread_exit_exception e) {
- break;
- }
- }
-}
-