#include "types.h"
#include <errno.h>
-#include <fcntl.h>
-#include <unistd.h>
+#include <sys/select.h>
+#include "file.h"
-#include "pollmgr.h"
+#ifdef __linux__
+#include <sys/epoll.h>
+#endif
-PollMgr *PollMgr::instance = NULL;
-static once_flag pollmgr_is_initialized;
+#include "pollmgr.h"
-static void
-PollMgrInit()
-{
- PollMgr::instance = new PollMgr();
-}
+static PollMgr instance;
+
+PollMgr & PollMgr::Instance() { return instance; }
+
+class wait_manager {
+ public:
+ virtual void watch_fd(int fd, poll_flag flag) = 0;
+ virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
+ virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
+ virtual ~wait_manager() throw() {}
+};
+
+class SelectAIO : public wait_manager {
+ public :
+ SelectAIO();
+ ~SelectAIO() {}
+ void watch_fd(int fd, poll_flag flag);
+ bool unwatch_fd(int fd, poll_flag flag);
+ void wait_ready(vector<int> & readable, vector<int> & writable);
+
+ private:
+ fd_set rfds_, wfds_;
+ int highfds_;
+ file_t pipe_[2];
+ mutex m_;
+};
-PollMgr *
-PollMgr::Instance()
-{
- call_once(pollmgr_is_initialized, PollMgrInit);
- return instance;
-}
+#ifdef __linux__
+class EPollAIO : public wait_manager {
+ public:
+ EPollAIO() {}
+ ~EPollAIO() throw() { }
+ void watch_fd(int fd, poll_flag flag);
+ bool unwatch_fd(int fd, poll_flag flag);
+ void wait_ready(vector<int> & readable, vector<int> & writable);
+
+ private:
+ file_t poll_ = epoll_create(MAX_POLL_FDS);
+ struct epoll_event ready_[MAX_POLL_FDS];
+ vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
+};
+#endif
-PollMgr::PollMgr() : pending_change_(false)
-{
- bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
- aio_ = new SelectAIO();
- //aio_ = new EPollAIO();
+PollMgr::PollMgr() : aio_(new SelectAIO()) {
th_ = thread(&PollMgr::wait_loop, this);
}
-PollMgr::~PollMgr() [[noreturn]]
+PollMgr::~PollMgr()
{
- //never kill me!!!
- VERIFY(0);
+ lock ml(m_);
+ for (auto p : callbacks_)
+ aio_->unwatch_fd(p.first, CB_RDWR);
+ pending_change_ = true;
+ shutdown_ = true;
+ changedone_c_.wait(ml);
+ delete aio_;
+ th_.join();
}
void
PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
{
- VERIFY(fd < MAX_POLL_FDS);
-
lock ml(m_);
aio_->watch_fd(fd, flag);
callbacks_[fd] = ch;
}
-//remove all callbacks related to fd
-//the return guarantees that callbacks related to fd
-//will never be called again
-void
-PollMgr::block_remove_fd(int fd)
-{
+// Remove all callbacks related to fd. After this returns, we guarantee that
+// callbacks related to fd will never be called again.
+void PollMgr::block_remove_fd(int fd) {
lock ml(m_);
aio_->unwatch_fd(fd, CB_RDWR);
pending_change_ = true;
changedone_c_.wait(ml);
- callbacks_[fd] = NULL;
+ callbacks_[fd] = nullptr;
}
-void
-PollMgr::del_callback(int fd, poll_flag flag)
-{
+void PollMgr::del_callback(int fd, poll_flag flag) {
lock ml(m_);
- if (aio_->unwatch_fd(fd, flag)) {
- callbacks_[fd] = NULL;
- }
+ if (aio_->unwatch_fd(fd, flag))
+ callbacks_[fd] = nullptr;
}
-bool
-PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
-{
- lock ml(m_);
- if (!callbacks_[fd] || callbacks_[fd]!=c)
- return false;
-
- return aio_->is_watched(fd, flag);
-}
-
-void
-PollMgr::wait_loop() [[noreturn]]
-{
-
+void PollMgr::wait_loop() {
vector<int> readable;
vector<int> writable;
+ aio_callback * cb;
while (1) {
{
if (pending_change_) {
pending_change_ = false;
changedone_c_.notify_all();
+ if (shutdown_)
+ break;
}
}
readable.clear();
writable.clear();
- aio_->wait_ready(&readable,&writable);
-
- if (!readable.size() && !writable.size()) {
- continue;
- }
- //no locking of m_
- //because no add_callback() and del_callback should
- //modify callbacks_[fd] while the fd is not dead
- for (unsigned int i = 0; i < readable.size(); i++) {
- int fd = readable[i];
- if (callbacks_[fd])
- callbacks_[fd]->read_cb(fd);
+ aio_->wait_ready(readable, writable);
+
+ for (auto fd : readable) {
+ { lock ml(m_); cb = callbacks_[fd]; }
+ if (cb) cb->read_cb(fd);
}
- for (unsigned int i = 0; i < writable.size(); i++) {
- int fd = writable[i];
- if (callbacks_[fd])
- callbacks_[fd]->write_cb(fd);
+ for (auto fd : writable) {
+ { lock ml(m_); cb = callbacks_[fd]; }
+ if (cb) cb->write_cb(fd);
}
}
}
-SelectAIO::SelectAIO() : highfds_(0)
+SelectAIO::SelectAIO()
{
FD_ZERO(&rfds_);
FD_ZERO(&wfds_);
- VERIFY(pipe(pipefd_) == 0);
- FD_SET(pipefd_[0], &rfds_);
- highfds_ = pipefd_[0];
+ file_t::pipe(pipe_);
- int flags = fcntl(pipefd_[0], F_GETFL, NULL);
- flags |= O_NONBLOCK;
- fcntl(pipefd_[0], F_SETFL, flags);
-}
+ FD_SET(pipe_[0], &rfds_);
+ highfds_ = pipe_[0];
-SelectAIO::~SelectAIO()
-{
+ pipe_[0].flags() |= O_NONBLOCK;
}
-void
-SelectAIO::watch_fd(int fd, poll_flag flag)
-{
+void SelectAIO::watch_fd(int fd, poll_flag flag) {
+ VERIFY(fd < MAX_POLL_FDS);
+
lock ml(m_);
if (highfds_ <= fd)
highfds_ = fd;
- if (flag == CB_RDONLY) {
- FD_SET(fd,&rfds_);
- }else if (flag == CB_WRONLY) {
- FD_SET(fd,&wfds_);
- }else {
+ if (flag & CB_RDONLY)
FD_SET(fd,&rfds_);
+
+ if (flag & CB_WRONLY)
FD_SET(fd,&wfds_);
- }
- char tmp = 1;
- VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
+ VERIFY(pipe_[1].write((char)1)==1);
}
-bool
-SelectAIO::is_watched(int fd, poll_flag flag)
-{
- lock ml(m_);
- if (flag == CB_RDONLY) {
- return FD_ISSET(fd,&rfds_);
- }else if (flag == CB_WRONLY) {
- return FD_ISSET(fd,&wfds_);
- }else{
- return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
- }
-}
+bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
+ VERIFY(fd < MAX_POLL_FDS);
-bool
-SelectAIO::unwatch_fd(int fd, poll_flag flag)
-{
lock ml(m_);
- if (flag == CB_RDONLY) {
+ VERIFY((flag & ~CB_RDWR) == 0);
+ if (flag & CB_RDONLY)
FD_CLR(fd, &rfds_);
- }else if (flag == CB_WRONLY) {
+ if (flag & CB_WRONLY)
FD_CLR(fd, &wfds_);
- }else if (flag == CB_RDWR) {
- FD_CLR(fd, &wfds_);
- FD_CLR(fd, &rfds_);
- }else{
- VERIFY(0);
- }
- if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
- if (fd == highfds_) {
- int newh = pipefd_[0];
- for (int i = 0; i <= highfds_; i++) {
- if (FD_ISSET(i, &rfds_)) {
- newh = i;
- }else if (FD_ISSET(i, &wfds_)) {
- newh = i;
- }
- }
- highfds_ = newh;
- }
- }
- if (flag == CB_RDWR) {
- char tmp = 1;
- VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
+ int newh = pipe_[0];
+ for (int i = 0; i <= highfds_; i++) {
+ if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
+ newh = i;
}
+ highfds_ = newh;
+
+ if (flag == CB_RDWR)
+ VERIFY(pipe_[1].write((char)1)==1);
+
return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
}
-void
-SelectAIO::wait_ready(vector<int> *readable, vector<int> *writable)
-{
+void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
+
fd_set trfds, twfds;
int high;
int ret = select(high+1, &trfds, &twfds, NULL, NULL);
- if (ret < 0) {
- if (errno == EINTR) {
- return;
- } else {
- perror("select:");
- IF_LEVEL(0) LOG("select_loop failure errno " << errno);
- VERIFY(0);
- }
+ if (ret < 0 && errno == EINTR)
+ return;
+ else if (ret < 0) {
+ perror("select:");
+ IF_LEVEL(0) LOG("select_loop failure errno " << errno);
+ VERIFY(0);
}
for (int fd = 0; fd <= high; fd++) {
- if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
+ if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
char tmp;
- VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
+ VERIFY(pipe_[0].read(tmp)==1);
VERIFY(tmp==1);
- }else {
- if (FD_ISSET(fd, &twfds)) {
- writable->push_back(fd);
- }
- if (FD_ISSET(fd, &trfds)) {
- readable->push_back(fd);
- }
+ } else {
+ if (FD_ISSET(fd, &twfds))
+ writable.push_back(fd);
+
+ if (FD_ISSET(fd, &trfds))
+ readable.push_back(fd);
}
}
}
#ifdef __linux__
-EPollAIO::EPollAIO()
-{
- pollfd_ = epoll_create(MAX_POLL_FDS);
- VERIFY(pollfd_ >= 0);
- bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
-}
-
-EPollAIO::~EPollAIO()
-{
- close(pollfd_);
-}
+void EPollAIO::watch_fd(int fd_, poll_flag flag) {
+ size_t fd = (size_t)fd_;
-static inline
-int poll_flag_to_event(poll_flag flag)
-{
- int f;
- if (flag == CB_RDONLY) {
- f = EPOLLIN;
- }else if (flag == CB_WRONLY) {
- f = EPOLLOUT;
- }else { //flag == CB_RDWR
- f = EPOLLIN | EPOLLOUT;
- }
- return f;
-}
-
-void
-EPollAIO::watch_fd(int fd, poll_flag flag)
-{
VERIFY(fd < MAX_POLL_FDS);
struct epoll_event ev;
- int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
- fdstatus_[fd] |= (int)flag;
+ int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
+ fdstatus_[fd] |= (unsigned)flag;
ev.events = EPOLLET;
- ev.data.fd = fd;
+ ev.data.fd = fd_;
- if (fdstatus_[fd] & CB_RDONLY) {
+ if (fdstatus_[fd] & CB_RDONLY)
ev.events |= EPOLLIN;
- }
- if (fdstatus_[fd] & CB_WRONLY) {
+
+ if (fdstatus_[fd] & CB_WRONLY)
ev.events |= EPOLLOUT;
- }
- if (flag == CB_RDWR) {
+ if (flag == CB_RDWR)
VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
- }
- VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+ VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
}
-bool
-EPollAIO::unwatch_fd(int fd, poll_flag flag)
-{
+bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
+ size_t fd = (size_t)fd_;
+
VERIFY(fd < MAX_POLL_FDS);
- fdstatus_[fd] &= ~(int)flag;
+ fdstatus_[fd] &= ~(unsigned)flag;
struct epoll_event ev;
- int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
+ int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
ev.events = EPOLLET;
- ev.data.fd = fd;
+ ev.data.fd = fd_;
- if (fdstatus_[fd] & CB_RDONLY) {
+ if (fdstatus_[fd] & CB_RDONLY)
ev.events |= EPOLLIN;
- }
- if (fdstatus_[fd] & CB_WRONLY) {
+
+ if (fdstatus_[fd] & CB_WRONLY)
ev.events |= EPOLLOUT;
- }
- if (flag == CB_RDWR) {
+ if (flag == CB_RDWR)
VERIFY(op == EPOLL_CTL_DEL);
- }
- VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+
+ VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
return (op == EPOLL_CTL_DEL);
}
-bool
-EPollAIO::is_watched(int fd, poll_flag flag)
-{
- VERIFY(fd < MAX_POLL_FDS);
- return ((fdstatus_[fd] & CB_MASK) == flag);
-}
+void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
-void
-EPollAIO::wait_ready(vector<int> *readable, vector<int> *writable)
-{
- int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
+ int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
for (int i = 0; i < nfds; i++) {
- if (ready_[i].events & EPOLLIN) {
- readable->push_back(ready_[i].data.fd);
- }
- if (ready_[i].events & EPOLLOUT) {
- writable->push_back(ready_[i].data.fd);
- }
+ if (ready_[i].events & EPOLLIN)
+ readable.push_back(ready_[i].data.fd);
+
+ if (ready_[i].events & EPOLLOUT)
+ writable.push_back(ready_[i].data.fd);
}
}