X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/603bac8fcb3697f283e6537d81b4a92e457ebbad..3abd3952c1f4441f0dd6eae9883b2d01ed9cd56b:/rpc/pollmgr.cc diff --git a/rpc/pollmgr.cc b/rpc/pollmgr.cc index a938284..4acff93 100644 --- a/rpc/pollmgr.cc +++ b/rpc/pollmgr.cc @@ -1,46 +1,77 @@ #include "types.h" #include -#include -#include +#include +#include "file.h" -#include "pollmgr.h" +#ifdef __linux__ +#include +#endif -PollMgr *PollMgr::instance = NULL; -static once_flag pollmgr_is_initialized; +#include "pollmgr.h" -static void -PollMgrInit() -{ - PollMgr::instance = new PollMgr(); -} +static PollMgr instance; + +PollMgr & PollMgr::Instance() { return instance; } + +class wait_manager { + public: + virtual void watch_fd(int fd, poll_flag flag) = 0; + virtual bool unwatch_fd(int fd, poll_flag flag) = 0; + virtual void wait_ready(vector & readable, vector & writable) = 0; + virtual ~wait_manager() throw() {} +}; + +class SelectAIO : public wait_manager { + public : + SelectAIO(); + ~SelectAIO() {} + void watch_fd(int fd, poll_flag flag); + bool unwatch_fd(int fd, poll_flag flag); + void wait_ready(vector & readable, vector & writable); + + private: + fd_set rfds_, wfds_; + int highfds_; + file_t pipe_[2]; + mutex m_; +}; -PollMgr * -PollMgr::Instance() -{ - call_once(pollmgr_is_initialized, PollMgrInit); - return instance; -} +#ifdef __linux__ +class EPollAIO : public wait_manager { + public: + EPollAIO() {} + ~EPollAIO() throw() { } + void watch_fd(int fd, poll_flag flag); + bool unwatch_fd(int fd, poll_flag flag); + void wait_ready(vector & readable, vector & writable); + + private: + file_t poll_ = epoll_create(MAX_POLL_FDS); + struct epoll_event ready_[MAX_POLL_FDS]; + vector fdstatus_ = vector(MAX_POLL_FDS); +}; +#endif -PollMgr::PollMgr() : pending_change_(false) -{ - bzero(callbacks_, MAX_POLL_FDS*sizeof(void *)); - aio_ = new SelectAIO(); - //aio_ = new EPollAIO(); +PollMgr::PollMgr() : aio_(new SelectAIO()) { th_ = thread(&PollMgr::wait_loop, this); } -PollMgr::~PollMgr() [[noreturn]] +PollMgr::~PollMgr() { - //never kill me!!! - VERIFY(0); + lock ml(m_); + for (auto p : callbacks_) + aio_->unwatch_fd(p.first, CB_RDWR); + pending_change_ = true; + shutdown_ = true; + changedone_c_.wait(ml); + delete aio_; + th_.join(); } void PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch) { - VERIFY(fd < MAX_POLL_FDS); - lock ml(m_); aio_->watch_fd(fd, flag); @@ -48,44 +79,26 @@ PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch) callbacks_[fd] = ch; } -//remove all callbacks related to fd -//the return guarantees that callbacks related to fd -//will never be called again -void -PollMgr::block_remove_fd(int fd) -{ +// Remove all callbacks related to fd. After this returns, we guarantee that +// callbacks related to fd will never be called again. +void PollMgr::block_remove_fd(int fd) { lock ml(m_); aio_->unwatch_fd(fd, CB_RDWR); pending_change_ = true; changedone_c_.wait(ml); - callbacks_[fd] = NULL; + callbacks_[fd] = nullptr; } -void -PollMgr::del_callback(int fd, poll_flag flag) -{ +void PollMgr::del_callback(int fd, poll_flag flag) { lock ml(m_); - if (aio_->unwatch_fd(fd, flag)) { - callbacks_[fd] = NULL; - } + if (aio_->unwatch_fd(fd, flag)) + callbacks_[fd] = nullptr; } -bool -PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c) -{ - lock ml(m_); - if (!callbacks_[fd] || callbacks_[fd]!=c) - return false; - - return aio_->is_watched(fd, flag); -} - -void -PollMgr::wait_loop() [[noreturn]] -{ - +void PollMgr::wait_loop() { vector readable; vector writable; + aio_callback * cb; while (1) { { @@ -93,121 +106,80 @@ PollMgr::wait_loop() [[noreturn]] if (pending_change_) { pending_change_ = false; changedone_c_.notify_all(); + if (shutdown_) + break; } } readable.clear(); writable.clear(); - aio_->wait_ready(&readable,&writable); - - if (!readable.size() && !writable.size()) { - continue; - } - //no locking of m_ - //because no add_callback() and del_callback should - //modify callbacks_[fd] while the fd is not dead - for (unsigned int i = 0; i < readable.size(); i++) { - int fd = readable[i]; - if (callbacks_[fd]) - callbacks_[fd]->read_cb(fd); + aio_->wait_ready(readable, writable); + + for (auto fd : readable) { + { lock ml(m_); cb = callbacks_[fd]; } + if (cb) cb->read_cb(fd); } - for (unsigned int i = 0; i < writable.size(); i++) { - int fd = writable[i]; - if (callbacks_[fd]) - callbacks_[fd]->write_cb(fd); + for (auto fd : writable) { + { lock ml(m_); cb = callbacks_[fd]; } + if (cb) cb->write_cb(fd); } } } -SelectAIO::SelectAIO() : highfds_(0) +SelectAIO::SelectAIO() { FD_ZERO(&rfds_); FD_ZERO(&wfds_); - VERIFY(pipe(pipefd_) == 0); - FD_SET(pipefd_[0], &rfds_); - highfds_ = pipefd_[0]; + file_t::pipe(pipe_); - int flags = fcntl(pipefd_[0], F_GETFL, NULL); - flags |= O_NONBLOCK; - fcntl(pipefd_[0], F_SETFL, flags); -} + FD_SET(pipe_[0], &rfds_); + highfds_ = pipe_[0]; -SelectAIO::~SelectAIO() -{ + pipe_[0].flags() |= O_NONBLOCK; } -void -SelectAIO::watch_fd(int fd, poll_flag flag) -{ +void SelectAIO::watch_fd(int fd, poll_flag flag) { + VERIFY(fd < MAX_POLL_FDS); + lock ml(m_); if (highfds_ <= fd) highfds_ = fd; - if (flag == CB_RDONLY) { - FD_SET(fd,&rfds_); - }else if (flag == CB_WRONLY) { - FD_SET(fd,&wfds_); - }else { + if (flag & CB_RDONLY) FD_SET(fd,&rfds_); + + if (flag & CB_WRONLY) FD_SET(fd,&wfds_); - } - char tmp = 1; - VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); + VERIFY(pipe_[1].write((char)1)==1); } -bool -SelectAIO::is_watched(int fd, poll_flag flag) -{ - lock ml(m_); - if (flag == CB_RDONLY) { - return FD_ISSET(fd,&rfds_); - }else if (flag == CB_WRONLY) { - return FD_ISSET(fd,&wfds_); - }else{ - return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_)); - } -} +bool SelectAIO::unwatch_fd(int fd, poll_flag flag) { + VERIFY(fd < MAX_POLL_FDS); -bool -SelectAIO::unwatch_fd(int fd, poll_flag flag) -{ lock ml(m_); - if (flag == CB_RDONLY) { + VERIFY((flag & ~CB_RDWR) == 0); + if (flag & CB_RDONLY) FD_CLR(fd, &rfds_); - }else if (flag == CB_WRONLY) { + if (flag & CB_WRONLY) FD_CLR(fd, &wfds_); - }else if (flag == CB_RDWR) { - FD_CLR(fd, &wfds_); - FD_CLR(fd, &rfds_); - }else{ - VERIFY(0); - } - if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) { - if (fd == highfds_) { - int newh = pipefd_[0]; - for (int i = 0; i <= highfds_; i++) { - if (FD_ISSET(i, &rfds_)) { - newh = i; - }else if (FD_ISSET(i, &wfds_)) { - newh = i; - } - } - highfds_ = newh; - } - } - if (flag == CB_RDWR) { - char tmp = 1; - VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); + int newh = pipe_[0]; + for (int i = 0; i <= highfds_; i++) { + if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_)) + newh = i; } + highfds_ = newh; + + if (flag == CB_RDWR) + VERIFY(pipe_[1].write((char)1)==1); + return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_)); } -void -SelectAIO::wait_ready(vector *readable, vector *writable) -{ +void SelectAIO::wait_ready(vector & readable, vector & writable) { + fd_set trfds, twfds; int high; @@ -220,130 +192,89 @@ SelectAIO::wait_ready(vector *readable, vector *writable) int ret = select(high+1, &trfds, &twfds, NULL, NULL); - if (ret < 0) { - if (errno == EINTR) { - return; - } else { - perror("select:"); - IF_LEVEL(0) LOG("select_loop failure errno " << errno); - VERIFY(0); - } + if (ret < 0 && errno == EINTR) + return; + else if (ret < 0) { + perror("select:"); + IF_LEVEL(0) LOG("select_loop failure errno " << errno); + VERIFY(0); } for (int fd = 0; fd <= high; fd++) { - if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) { + if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) { char tmp; - VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1); + VERIFY(pipe_[0].read(tmp)==1); VERIFY(tmp==1); - }else { - if (FD_ISSET(fd, &twfds)) { - writable->push_back(fd); - } - if (FD_ISSET(fd, &trfds)) { - readable->push_back(fd); - } + } else { + if (FD_ISSET(fd, &twfds)) + writable.push_back(fd); + + if (FD_ISSET(fd, &trfds)) + readable.push_back(fd); } } } #ifdef __linux__ -EPollAIO::EPollAIO() -{ - pollfd_ = epoll_create(MAX_POLL_FDS); - VERIFY(pollfd_ >= 0); - bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS); -} - -EPollAIO::~EPollAIO() -{ - close(pollfd_); -} +void EPollAIO::watch_fd(int fd_, poll_flag flag) { + size_t fd = (size_t)fd_; -static inline -int poll_flag_to_event(poll_flag flag) -{ - int f; - if (flag == CB_RDONLY) { - f = EPOLLIN; - }else if (flag == CB_WRONLY) { - f = EPOLLOUT; - }else { //flag == CB_RDWR - f = EPOLLIN | EPOLLOUT; - } - return f; -} - -void -EPollAIO::watch_fd(int fd, poll_flag flag) -{ VERIFY(fd < MAX_POLL_FDS); struct epoll_event ev; - int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD; - fdstatus_[fd] |= (int)flag; + int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD; + fdstatus_[fd] |= (unsigned)flag; ev.events = EPOLLET; - ev.data.fd = fd; + ev.data.fd = fd_; - if (fdstatus_[fd] & CB_RDONLY) { + if (fdstatus_[fd] & CB_RDONLY) ev.events |= EPOLLIN; - } - if (fdstatus_[fd] & CB_WRONLY) { + + if (fdstatus_[fd] & CB_WRONLY) ev.events |= EPOLLOUT; - } - if (flag == CB_RDWR) { + if (flag == CB_RDWR) VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT)); - } - VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); + VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0); } -bool -EPollAIO::unwatch_fd(int fd, poll_flag flag) -{ +bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) { + size_t fd = (size_t)fd_; + VERIFY(fd < MAX_POLL_FDS); - fdstatus_[fd] &= ~(int)flag; + fdstatus_[fd] &= ~(unsigned)flag; struct epoll_event ev; - int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL; + int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; ev.events = EPOLLET; - ev.data.fd = fd; + ev.data.fd = fd_; - if (fdstatus_[fd] & CB_RDONLY) { + if (fdstatus_[fd] & CB_RDONLY) ev.events |= EPOLLIN; - } - if (fdstatus_[fd] & CB_WRONLY) { + + if (fdstatus_[fd] & CB_WRONLY) ev.events |= EPOLLOUT; - } - if (flag == CB_RDWR) { + if (flag == CB_RDWR) VERIFY(op == EPOLL_CTL_DEL); - } - VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); + + VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0); return (op == EPOLL_CTL_DEL); } -bool -EPollAIO::is_watched(int fd, poll_flag flag) -{ - VERIFY(fd < MAX_POLL_FDS); - return ((fdstatus_[fd] & CB_MASK) == flag); -} +void EPollAIO::wait_ready(vector & readable, vector & writable) { -void -EPollAIO::wait_ready(vector *readable, vector *writable) -{ - int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1); + int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1); for (int i = 0; i < nfds; i++) { - if (ready_[i].events & EPOLLIN) { - readable->push_back(ready_[i].data.fd); - } - if (ready_[i].events & EPOLLOUT) { - writable->push_back(ready_[i].data.fd); - } + if (ready_[i].events & EPOLLIN) + readable.push_back(ready_[i].data.fd); + + if (ready_[i].events & EPOLLOUT) + writable.push_back(ready_[i].data.fd); } }