X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/a4175b2e216a20b86cc872dea8a08005c60617a5..16e7c282c6fcec8189425bd15ec9e8a4a0ee857d:/rpc/pollmgr.cc diff --git a/rpc/pollmgr.cc b/rpc/pollmgr.cc index 33aeae2..a938284 100644 --- a/rpc/pollmgr.cc +++ b/rpc/pollmgr.cc @@ -1,53 +1,51 @@ +#include "types.h" #include #include #include -#include "jsl_log.h" -#include "lang/verify.h" #include "pollmgr.h" -#include "lock.h" PollMgr *PollMgr::instance = NULL; -static std::once_flag pollmgr_is_initialized; +static once_flag pollmgr_is_initialized; -void +static void PollMgrInit() { - PollMgr::instance = new PollMgr(); + PollMgr::instance = new PollMgr(); } PollMgr * PollMgr::Instance() { - std::call_once(pollmgr_is_initialized, PollMgrInit); - return instance; + call_once(pollmgr_is_initialized, PollMgrInit); + return instance; } PollMgr::PollMgr() : pending_change_(false) { - bzero(callbacks_, MAX_POLL_FDS*sizeof(void *)); - aio_ = new SelectAIO(); - //aio_ = new EPollAIO(); + bzero(callbacks_, MAX_POLL_FDS*sizeof(void *)); + aio_ = new SelectAIO(); + //aio_ = new EPollAIO(); - th_ = std::thread(&PollMgr::wait_loop, this); + th_ = thread(&PollMgr::wait_loop, this); } -PollMgr::~PollMgr() +PollMgr::~PollMgr() [[noreturn]] { - //never kill me!!! - VERIFY(0); + //never kill me!!! + VERIFY(0); } void PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch) { - VERIFY(fd < MAX_POLL_FDS); + VERIFY(fd < MAX_POLL_FDS); lock ml(m_); - aio_->watch_fd(fd, flag); + aio_->watch_fd(fd, flag); - VERIFY(!callbacks_[fd] || callbacks_[fd]==ch); - callbacks_[fd] = ch; + VERIFY(!callbacks_[fd] || callbacks_[fd]==ch); + callbacks_[fd] = ch; } //remove all callbacks related to fd @@ -57,82 +55,82 @@ void PollMgr::block_remove_fd(int fd) { lock ml(m_); - aio_->unwatch_fd(fd, CB_RDWR); - pending_change_ = true; + aio_->unwatch_fd(fd, CB_RDWR); + pending_change_ = true; changedone_c_.wait(ml); - callbacks_[fd] = NULL; + callbacks_[fd] = NULL; } void PollMgr::del_callback(int fd, poll_flag flag) { lock ml(m_); - if (aio_->unwatch_fd(fd, flag)) { - callbacks_[fd] = NULL; - } + if (aio_->unwatch_fd(fd, flag)) { + callbacks_[fd] = NULL; + } } bool PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c) { lock ml(m_); - if (!callbacks_[fd] || callbacks_[fd]!=c) - return false; + if (!callbacks_[fd] || callbacks_[fd]!=c) + return false; - return aio_->is_watched(fd, flag); + return aio_->is_watched(fd, flag); } void -PollMgr::wait_loop() +PollMgr::wait_loop() [[noreturn]] { - std::vector readable; - std::vector writable; + vector readable; + vector writable; - while (1) { - { + while (1) { + { lock ml(m_); - if (pending_change_) { - pending_change_ = false; + if (pending_change_) { + pending_change_ = false; changedone_c_.notify_all(); - } - } - readable.clear(); - writable.clear(); - aio_->wait_ready(&readable,&writable); - - if (!readable.size() && !writable.size()) { - continue; - } - //no locking of m_ - //because no add_callback() and del_callback should - //modify callbacks_[fd] while the fd is not dead - for (unsigned int i = 0; i < readable.size(); i++) { - int fd = readable[i]; - if (callbacks_[fd]) - callbacks_[fd]->read_cb(fd); - } - - for (unsigned int i = 0; i < writable.size(); i++) { - int fd = writable[i]; - if (callbacks_[fd]) - callbacks_[fd]->write_cb(fd); - } - } + } + } + readable.clear(); + writable.clear(); + aio_->wait_ready(&readable,&writable); + + if (!readable.size() && !writable.size()) { + continue; + } + //no locking of m_ + //because no add_callback() and del_callback should + //modify callbacks_[fd] while the fd is not dead + for (unsigned int i = 0; i < readable.size(); i++) { + int fd = readable[i]; + if (callbacks_[fd]) + callbacks_[fd]->read_cb(fd); + } + + for (unsigned int i = 0; i < writable.size(); i++) { + int fd = writable[i]; + if (callbacks_[fd]) + callbacks_[fd]->write_cb(fd); + } + } } SelectAIO::SelectAIO() : highfds_(0) { - FD_ZERO(&rfds_); - FD_ZERO(&wfds_); + FD_ZERO(&rfds_); + FD_ZERO(&wfds_); - VERIFY(pipe(pipefd_) == 0); - FD_SET(pipefd_[0], &rfds_); - highfds_ = pipefd_[0]; + VERIFY(pipe(pipefd_) == 0); + FD_SET(pipefd_[0], &rfds_); + highfds_ = pipefd_[0]; - int flags = fcntl(pipefd_[0], F_GETFL, NULL); - flags |= O_NONBLOCK; - fcntl(pipefd_[0], F_SETFL, flags); + int flags = fcntl(pipefd_[0], F_GETFL, NULL); + flags |= O_NONBLOCK; + fcntl(pipefd_[0], F_SETFL, flags); } SelectAIO::~SelectAIO() @@ -143,210 +141,210 @@ void SelectAIO::watch_fd(int fd, poll_flag flag) { lock ml(m_); - if (highfds_ <= fd) - highfds_ = fd; - - if (flag == CB_RDONLY) { - FD_SET(fd,&rfds_); - }else if (flag == CB_WRONLY) { - FD_SET(fd,&wfds_); - }else { - FD_SET(fd,&rfds_); - FD_SET(fd,&wfds_); - } - - char tmp = 1; - VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); + if (highfds_ <= fd) + highfds_ = fd; + + if (flag == CB_RDONLY) { + FD_SET(fd,&rfds_); + }else if (flag == CB_WRONLY) { + FD_SET(fd,&wfds_); + }else { + FD_SET(fd,&rfds_); + FD_SET(fd,&wfds_); + } + + char tmp = 1; + VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); } bool SelectAIO::is_watched(int fd, poll_flag flag) { lock ml(m_); - if (flag == CB_RDONLY) { - return FD_ISSET(fd,&rfds_); - }else if (flag == CB_WRONLY) { - return FD_ISSET(fd,&wfds_); - }else{ - return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_)); - } + if (flag == CB_RDONLY) { + return FD_ISSET(fd,&rfds_); + }else if (flag == CB_WRONLY) { + return FD_ISSET(fd,&wfds_); + }else{ + return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_)); + } } bool SelectAIO::unwatch_fd(int fd, poll_flag flag) { lock ml(m_); - if (flag == CB_RDONLY) { - FD_CLR(fd, &rfds_); - }else if (flag == CB_WRONLY) { - FD_CLR(fd, &wfds_); - }else if (flag == CB_RDWR) { - FD_CLR(fd, &wfds_); - FD_CLR(fd, &rfds_); - }else{ - VERIFY(0); - } - - if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) { - if (fd == highfds_) { - int newh = pipefd_[0]; - for (int i = 0; i <= highfds_; i++) { - if (FD_ISSET(i, &rfds_)) { - newh = i; - }else if (FD_ISSET(i, &wfds_)) { - newh = i; - } - } - highfds_ = newh; - } - } - if (flag == CB_RDWR) { - char tmp = 1; - VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); - } - return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_)); + if (flag == CB_RDONLY) { + FD_CLR(fd, &rfds_); + }else if (flag == CB_WRONLY) { + FD_CLR(fd, &wfds_); + }else if (flag == CB_RDWR) { + FD_CLR(fd, &wfds_); + FD_CLR(fd, &rfds_); + }else{ + VERIFY(0); + } + + if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) { + if (fd == highfds_) { + int newh = pipefd_[0]; + for (int i = 0; i <= highfds_; i++) { + if (FD_ISSET(i, &rfds_)) { + newh = i; + }else if (FD_ISSET(i, &wfds_)) { + newh = i; + } + } + highfds_ = newh; + } + } + if (flag == CB_RDWR) { + char tmp = 1; + VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); + } + return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_)); } void -SelectAIO::wait_ready(std::vector *readable, std::vector *writable) +SelectAIO::wait_ready(vector *readable, vector *writable) { - fd_set trfds, twfds; - int high; + fd_set trfds, twfds; + int high; - { + { lock ml(m_); - trfds = rfds_; - twfds = wfds_; - high = highfds_; - } - - int ret = select(high+1, &trfds, &twfds, NULL, NULL); - - if (ret < 0) { - if (errno == EINTR) { - return; - } else { - perror("select:"); - jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno); - VERIFY(0); - } - } - - for (int fd = 0; fd <= high; fd++) { - if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) { - char tmp; - VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1); - VERIFY(tmp==1); - }else { - if (FD_ISSET(fd, &twfds)) { - writable->push_back(fd); - } - if (FD_ISSET(fd, &trfds)) { - readable->push_back(fd); - } - } - } + trfds = rfds_; + twfds = wfds_; + high = highfds_; + } + + int ret = select(high+1, &trfds, &twfds, NULL, NULL); + + if (ret < 0) { + if (errno == EINTR) { + return; + } else { + perror("select:"); + IF_LEVEL(0) LOG("select_loop failure errno " << errno); + VERIFY(0); + } + } + + for (int fd = 0; fd <= high; fd++) { + if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) { + char tmp; + VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1); + VERIFY(tmp==1); + }else { + if (FD_ISSET(fd, &twfds)) { + writable->push_back(fd); + } + if (FD_ISSET(fd, &trfds)) { + readable->push_back(fd); + } + } + } } #ifdef __linux__ EPollAIO::EPollAIO() { - pollfd_ = epoll_create(MAX_POLL_FDS); - VERIFY(pollfd_ >= 0); - bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS); + pollfd_ = epoll_create(MAX_POLL_FDS); + VERIFY(pollfd_ >= 0); + bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS); } EPollAIO::~EPollAIO() { - close(pollfd_); + close(pollfd_); } static inline int poll_flag_to_event(poll_flag flag) { - int f; - if (flag == CB_RDONLY) { - f = EPOLLIN; - }else if (flag == CB_WRONLY) { - f = EPOLLOUT; - }else { //flag == CB_RDWR - f = EPOLLIN | EPOLLOUT; - } - return f; + int f; + if (flag == CB_RDONLY) { + f = EPOLLIN; + }else if (flag == CB_WRONLY) { + f = EPOLLOUT; + }else { //flag == CB_RDWR + f = EPOLLIN | EPOLLOUT; + } + return f; } void EPollAIO::watch_fd(int fd, poll_flag flag) { - VERIFY(fd < MAX_POLL_FDS); + VERIFY(fd < MAX_POLL_FDS); - struct epoll_event ev; - int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD; - fdstatus_[fd] |= (int)flag; + struct epoll_event ev; + int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD; + fdstatus_[fd] |= (int)flag; - ev.events = EPOLLET; - ev.data.fd = fd; + ev.events = EPOLLET; + ev.data.fd = fd; - if (fdstatus_[fd] & CB_RDONLY) { - ev.events |= EPOLLIN; - } - if (fdstatus_[fd] & CB_WRONLY) { - ev.events |= EPOLLOUT; - } + if (fdstatus_[fd] & CB_RDONLY) { + ev.events |= EPOLLIN; + } + if (fdstatus_[fd] & CB_WRONLY) { + ev.events |= EPOLLOUT; + } - if (flag == CB_RDWR) { - VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT)); - } + if (flag == CB_RDWR) { + VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT)); + } - VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); + VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); } bool EPollAIO::unwatch_fd(int fd, poll_flag flag) { - VERIFY(fd < MAX_POLL_FDS); - fdstatus_[fd] &= ~(int)flag; - - struct epoll_event ev; - int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL; - - ev.events = EPOLLET; - ev.data.fd = fd; - - if (fdstatus_[fd] & CB_RDONLY) { - ev.events |= EPOLLIN; - } - if (fdstatus_[fd] & CB_WRONLY) { - ev.events |= EPOLLOUT; - } - - if (flag == CB_RDWR) { - VERIFY(op == EPOLL_CTL_DEL); - } - VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); - return (op == EPOLL_CTL_DEL); + VERIFY(fd < MAX_POLL_FDS); + fdstatus_[fd] &= ~(int)flag; + + struct epoll_event ev; + int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL; + + ev.events = EPOLLET; + ev.data.fd = fd; + + if (fdstatus_[fd] & CB_RDONLY) { + ev.events |= EPOLLIN; + } + if (fdstatus_[fd] & CB_WRONLY) { + ev.events |= EPOLLOUT; + } + + if (flag == CB_RDWR) { + VERIFY(op == EPOLL_CTL_DEL); + } + VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); + return (op == EPOLL_CTL_DEL); } bool EPollAIO::is_watched(int fd, poll_flag flag) { - VERIFY(fd < MAX_POLL_FDS); - return ((fdstatus_[fd] & CB_MASK) == flag); + VERIFY(fd < MAX_POLL_FDS); + return ((fdstatus_[fd] & CB_MASK) == flag); } void -EPollAIO::wait_ready(std::vector *readable, std::vector *writable) +EPollAIO::wait_ready(vector *readable, vector *writable) { - int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1); - for (int i = 0; i < nfds; i++) { - if (ready_[i].events & EPOLLIN) { - readable->push_back(ready_[i].data.fd); - } - if (ready_[i].events & EPOLLOUT) { - writable->push_back(ready_[i].data.fd); - } - } + int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1); + for (int i = 0; i < nfds; i++) { + if (ready_[i].events & EPOLLIN) { + readable->push_back(ready_[i].data.fd); + } + if (ready_[i].events & EPOLLOUT) { + writable->push_back(ready_[i].data.fd); + } + } } #endif