X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/5a5c578e2e358a121cdb9234a6cb11c4ecfbf323..ded1e837093f09ec1234be29320525ad6ff200ae:/rpc/pollmgr.cc diff --git a/rpc/pollmgr.cc b/rpc/pollmgr.cc index 023a7aa..4acff93 100644 --- a/rpc/pollmgr.cc +++ b/rpc/pollmgr.cc @@ -1,351 +1,281 @@ #include "types.h" #include -#include -#include +#include +#include "file.h" -#include "jsl_log.h" -#include "pollmgr.h" +#ifdef __linux__ +#include +#endif -PollMgr *PollMgr::instance = NULL; -static std::once_flag pollmgr_is_initialized; +#include "pollmgr.h" -static void -PollMgrInit() -{ - PollMgr::instance = new PollMgr(); -} +static PollMgr instance; + +PollMgr & PollMgr::Instance() { return instance; } + +class wait_manager { + public: + virtual void watch_fd(int fd, poll_flag flag) = 0; + virtual bool unwatch_fd(int fd, poll_flag flag) = 0; + virtual void wait_ready(vector & readable, vector & writable) = 0; + virtual ~wait_manager() throw() {} +}; + +class SelectAIO : public wait_manager { + public : + SelectAIO(); + ~SelectAIO() {} + void watch_fd(int fd, poll_flag flag); + bool unwatch_fd(int fd, poll_flag flag); + void wait_ready(vector & readable, vector & writable); + + private: + fd_set rfds_, wfds_; + int highfds_; + file_t pipe_[2]; + mutex m_; +}; -PollMgr * -PollMgr::Instance() -{ - std::call_once(pollmgr_is_initialized, PollMgrInit); - return instance; -} +#ifdef __linux__ +class EPollAIO : public wait_manager { + public: + EPollAIO() {} + ~EPollAIO() throw() { } + void watch_fd(int fd, poll_flag flag); + bool unwatch_fd(int fd, poll_flag flag); + void wait_ready(vector & readable, vector & writable); + + private: + file_t poll_ = epoll_create(MAX_POLL_FDS); + struct epoll_event ready_[MAX_POLL_FDS]; + vector fdstatus_ = vector(MAX_POLL_FDS); +}; +#endif -PollMgr::PollMgr() : pending_change_(false) -{ - bzero(callbacks_, MAX_POLL_FDS*sizeof(void *)); - aio_ = new SelectAIO(); - //aio_ = new EPollAIO(); - th_ = std::thread(&PollMgr::wait_loop, this); +PollMgr::PollMgr() : aio_(new SelectAIO()) { + th_ = thread(&PollMgr::wait_loop, this); } -PollMgr::~PollMgr() [[noreturn]] +PollMgr::~PollMgr() { - //never kill me!!! - VERIFY(0); + lock ml(m_); + for (auto p : callbacks_) + aio_->unwatch_fd(p.first, CB_RDWR); + pending_change_ = true; + shutdown_ = true; + changedone_c_.wait(ml); + delete aio_; + th_.join(); } void PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch) { - VERIFY(fd < MAX_POLL_FDS); - lock ml(m_); - aio_->watch_fd(fd, flag); + aio_->watch_fd(fd, flag); - VERIFY(!callbacks_[fd] || callbacks_[fd]==ch); - callbacks_[fd] = ch; + VERIFY(!callbacks_[fd] || callbacks_[fd]==ch); + callbacks_[fd] = ch; } -//remove all callbacks related to fd -//the return guarantees that callbacks related to fd -//will never be called again -void -PollMgr::block_remove_fd(int fd) -{ +// Remove all callbacks related to fd. After this returns, we guarantee that +// callbacks related to fd will never be called again. +void PollMgr::block_remove_fd(int fd) { lock ml(m_); - aio_->unwatch_fd(fd, CB_RDWR); - pending_change_ = true; + aio_->unwatch_fd(fd, CB_RDWR); + pending_change_ = true; changedone_c_.wait(ml); - callbacks_[fd] = NULL; + callbacks_[fd] = nullptr; } -void -PollMgr::del_callback(int fd, poll_flag flag) -{ +void PollMgr::del_callback(int fd, poll_flag flag) { lock ml(m_); - if (aio_->unwatch_fd(fd, flag)) { - callbacks_[fd] = NULL; - } + if (aio_->unwatch_fd(fd, flag)) + callbacks_[fd] = nullptr; } -bool -PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c) -{ - lock ml(m_); - if (!callbacks_[fd] || callbacks_[fd]!=c) - return false; - - return aio_->is_watched(fd, flag); -} +void PollMgr::wait_loop() { + vector readable; + vector writable; + aio_callback * cb; -void -PollMgr::wait_loop() [[noreturn]] -{ - - std::vector readable; - std::vector writable; - - while (1) { - { + while (1) { + { lock ml(m_); - if (pending_change_) { - pending_change_ = false; + if (pending_change_) { + pending_change_ = false; changedone_c_.notify_all(); - } - } - readable.clear(); - writable.clear(); - aio_->wait_ready(&readable,&writable); - - if (!readable.size() && !writable.size()) { - continue; - } - //no locking of m_ - //because no add_callback() and del_callback should - //modify callbacks_[fd] while the fd is not dead - for (unsigned int i = 0; i < readable.size(); i++) { - int fd = readable[i]; - if (callbacks_[fd]) - callbacks_[fd]->read_cb(fd); - } - - for (unsigned int i = 0; i < writable.size(); i++) { - int fd = writable[i]; - if (callbacks_[fd]) - callbacks_[fd]->write_cb(fd); - } - } + if (shutdown_) + break; + } + } + readable.clear(); + writable.clear(); + aio_->wait_ready(readable, writable); + + for (auto fd : readable) { + { lock ml(m_); cb = callbacks_[fd]; } + if (cb) cb->read_cb(fd); + } + + for (auto fd : writable) { + { lock ml(m_); cb = callbacks_[fd]; } + if (cb) cb->write_cb(fd); + } + } } -SelectAIO::SelectAIO() : highfds_(0) +SelectAIO::SelectAIO() { - FD_ZERO(&rfds_); - FD_ZERO(&wfds_); + FD_ZERO(&rfds_); + FD_ZERO(&wfds_); - VERIFY(pipe(pipefd_) == 0); - FD_SET(pipefd_[0], &rfds_); - highfds_ = pipefd_[0]; + file_t::pipe(pipe_); - int flags = fcntl(pipefd_[0], F_GETFL, NULL); - flags |= O_NONBLOCK; - fcntl(pipefd_[0], F_SETFL, flags); -} + FD_SET(pipe_[0], &rfds_); + highfds_ = pipe_[0]; -SelectAIO::~SelectAIO() -{ + pipe_[0].flags() |= O_NONBLOCK; } -void -SelectAIO::watch_fd(int fd, poll_flag flag) -{ - lock ml(m_); - if (highfds_ <= fd) - highfds_ = fd; - - if (flag == CB_RDONLY) { - FD_SET(fd,&rfds_); - }else if (flag == CB_WRONLY) { - FD_SET(fd,&wfds_); - }else { - FD_SET(fd,&rfds_); - FD_SET(fd,&wfds_); - } - - char tmp = 1; - VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); -} +void SelectAIO::watch_fd(int fd, poll_flag flag) { + VERIFY(fd < MAX_POLL_FDS); -bool -SelectAIO::is_watched(int fd, poll_flag flag) -{ lock ml(m_); - if (flag == CB_RDONLY) { - return FD_ISSET(fd,&rfds_); - }else if (flag == CB_WRONLY) { - return FD_ISSET(fd,&wfds_); - }else{ - return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_)); - } + if (highfds_ <= fd) + highfds_ = fd; + + if (flag & CB_RDONLY) + FD_SET(fd,&rfds_); + + if (flag & CB_WRONLY) + FD_SET(fd,&wfds_); + + VERIFY(pipe_[1].write((char)1)==1); } -bool -SelectAIO::unwatch_fd(int fd, poll_flag flag) -{ +bool SelectAIO::unwatch_fd(int fd, poll_flag flag) { + VERIFY(fd < MAX_POLL_FDS); + lock ml(m_); - if (flag == CB_RDONLY) { - FD_CLR(fd, &rfds_); - }else if (flag == CB_WRONLY) { - FD_CLR(fd, &wfds_); - }else if (flag == CB_RDWR) { - FD_CLR(fd, &wfds_); - FD_CLR(fd, &rfds_); - }else{ - VERIFY(0); - } - - if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) { - if (fd == highfds_) { - int newh = pipefd_[0]; - for (int i = 0; i <= highfds_; i++) { - if (FD_ISSET(i, &rfds_)) { - newh = i; - }else if (FD_ISSET(i, &wfds_)) { - newh = i; - } - } - highfds_ = newh; - } - } - if (flag == CB_RDWR) { - char tmp = 1; - VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); - } - return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_)); + VERIFY((flag & ~CB_RDWR) == 0); + if (flag & CB_RDONLY) + FD_CLR(fd, &rfds_); + if (flag & CB_WRONLY) + FD_CLR(fd, &wfds_); + + int newh = pipe_[0]; + for (int i = 0; i <= highfds_; i++) { + if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_)) + newh = i; + } + highfds_ = newh; + + if (flag == CB_RDWR) + VERIFY(pipe_[1].write((char)1)==1); + + return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_)); } -void -SelectAIO::wait_ready(std::vector *readable, std::vector *writable) -{ - fd_set trfds, twfds; - int high; +void SelectAIO::wait_ready(vector & readable, vector & writable) { + + fd_set trfds, twfds; + int high; - { + { lock ml(m_); - trfds = rfds_; - twfds = wfds_; - high = highfds_; - } - - int ret = select(high+1, &trfds, &twfds, NULL, NULL); - - if (ret < 0) { - if (errno == EINTR) { - return; - } else { - perror("select:"); - jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno); - VERIFY(0); - } - } - - for (int fd = 0; fd <= high; fd++) { - if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) { - char tmp; - VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1); - VERIFY(tmp==1); - }else { - if (FD_ISSET(fd, &twfds)) { - writable->push_back(fd); - } - if (FD_ISSET(fd, &trfds)) { - readable->push_back(fd); - } - } - } + trfds = rfds_; + twfds = wfds_; + high = highfds_; + } + + int ret = select(high+1, &trfds, &twfds, NULL, NULL); + + if (ret < 0 && errno == EINTR) + return; + else if (ret < 0) { + perror("select:"); + IF_LEVEL(0) LOG("select_loop failure errno " << errno); + VERIFY(0); + } + + for (int fd = 0; fd <= high; fd++) { + if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) { + char tmp; + VERIFY(pipe_[0].read(tmp)==1); + VERIFY(tmp==1); + } else { + if (FD_ISSET(fd, &twfds)) + writable.push_back(fd); + + if (FD_ISSET(fd, &trfds)) + readable.push_back(fd); + } + } } #ifdef __linux__ -EPollAIO::EPollAIO() -{ - pollfd_ = epoll_create(MAX_POLL_FDS); - VERIFY(pollfd_ >= 0); - bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS); -} +void EPollAIO::watch_fd(int fd_, poll_flag flag) { + size_t fd = (size_t)fd_; -EPollAIO::~EPollAIO() -{ - close(pollfd_); -} + VERIFY(fd < MAX_POLL_FDS); -static inline -int poll_flag_to_event(poll_flag flag) -{ - int f; - if (flag == CB_RDONLY) { - f = EPOLLIN; - }else if (flag == CB_WRONLY) { - f = EPOLLOUT; - }else { //flag == CB_RDWR - f = EPOLLIN | EPOLLOUT; - } - return f; + struct epoll_event ev; + int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD; + fdstatus_[fd] |= (unsigned)flag; + + ev.events = EPOLLET; + ev.data.fd = fd_; + + if (fdstatus_[fd] & CB_RDONLY) + ev.events |= EPOLLIN; + + if (fdstatus_[fd] & CB_WRONLY) + ev.events |= EPOLLOUT; + + if (flag == CB_RDWR) + VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT)); + + VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0); } -void -EPollAIO::watch_fd(int fd, poll_flag flag) -{ - VERIFY(fd < MAX_POLL_FDS); +bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) { + size_t fd = (size_t)fd_; - struct epoll_event ev; - int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD; - fdstatus_[fd] |= (int)flag; + VERIFY(fd < MAX_POLL_FDS); + fdstatus_[fd] &= ~(unsigned)flag; - ev.events = EPOLLET; - ev.data.fd = fd; + struct epoll_event ev; + int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; - if (fdstatus_[fd] & CB_RDONLY) { - ev.events |= EPOLLIN; - } - if (fdstatus_[fd] & CB_WRONLY) { - ev.events |= EPOLLOUT; - } + ev.events = EPOLLET; + ev.data.fd = fd_; - if (flag == CB_RDWR) { - VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT)); - } + if (fdstatus_[fd] & CB_RDONLY) + ev.events |= EPOLLIN; - VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); -} + if (fdstatus_[fd] & CB_WRONLY) + ev.events |= EPOLLOUT; -bool -EPollAIO::unwatch_fd(int fd, poll_flag flag) -{ - VERIFY(fd < MAX_POLL_FDS); - fdstatus_[fd] &= ~(int)flag; - - struct epoll_event ev; - int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL; - - ev.events = EPOLLET; - ev.data.fd = fd; - - if (fdstatus_[fd] & CB_RDONLY) { - ev.events |= EPOLLIN; - } - if (fdstatus_[fd] & CB_WRONLY) { - ev.events |= EPOLLOUT; - } - - if (flag == CB_RDWR) { - VERIFY(op == EPOLL_CTL_DEL); - } - VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); - return (op == EPOLL_CTL_DEL); -} + if (flag == CB_RDWR) + VERIFY(op == EPOLL_CTL_DEL); -bool -EPollAIO::is_watched(int fd, poll_flag flag) -{ - VERIFY(fd < MAX_POLL_FDS); - return ((fdstatus_[fd] & CB_MASK) == flag); + VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0); + return (op == EPOLL_CTL_DEL); } -void -EPollAIO::wait_ready(std::vector *readable, std::vector *writable) -{ - int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1); - for (int i = 0; i < nfds; i++) { - if (ready_[i].events & EPOLLIN) { - readable->push_back(ready_[i].data.fd); - } - if (ready_[i].events & EPOLLOUT) { - writable->push_back(ready_[i].data.fd); - } - } +void EPollAIO::wait_ready(vector & readable, vector & writable) { + + int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1); + for (int i = 0; i < nfds; i++) { + if (ready_[i].events & EPOLLIN) + readable.push_back(ready_[i].data.fd); + + if (ready_[i].events & EPOLLOUT) + writable.push_back(ready_[i].data.fd); + } } #endif