X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/5fd8cc8409d0efadc07dfe8d6774ad9ff477663d..603bac8fcb3697f283e6537d81b4a92e457ebbad:/rpc/pollmgr.cc?ds=sidebyside diff --git a/rpc/pollmgr.cc b/rpc/pollmgr.cc index f73a3a5..a938284 100644 --- a/rpc/pollmgr.cc +++ b/rpc/pollmgr.cc @@ -1,57 +1,51 @@ -#include +#include "types.h" #include #include #include -#include "slock.h" -#include "jsl_log.h" -#include "method_thread.h" -#include "lang/verify.h" #include "pollmgr.h" PollMgr *PollMgr::instance = NULL; -static pthread_once_t pollmgr_is_initialized = PTHREAD_ONCE_INIT; +static once_flag pollmgr_is_initialized; -void +static void PollMgrInit() { - PollMgr::instance = new PollMgr(); + PollMgr::instance = new PollMgr(); } PollMgr * PollMgr::Instance() { - pthread_once(&pollmgr_is_initialized, PollMgrInit); - return instance; + call_once(pollmgr_is_initialized, PollMgrInit); + return instance; } PollMgr::PollMgr() : pending_change_(false) { - bzero(callbacks_, MAX_POLL_FDS*sizeof(void *)); - aio_ = new SelectAIO(); - //aio_ = new EPollAIO(); + bzero(callbacks_, MAX_POLL_FDS*sizeof(void *)); + aio_ = new SelectAIO(); + //aio_ = new EPollAIO(); - VERIFY(pthread_mutex_init(&m_, NULL) == 0); - VERIFY(pthread_cond_init(&changedone_c_, NULL) == 0); - VERIFY((th_ = method_thread(this, false, &PollMgr::wait_loop)) != 0); + th_ = thread(&PollMgr::wait_loop, this); } -PollMgr::~PollMgr() +PollMgr::~PollMgr() [[noreturn]] { - //never kill me!!! - VERIFY(0); + //never kill me!!! + VERIFY(0); } void PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch) { - VERIFY(fd < MAX_POLL_FDS); + VERIFY(fd < MAX_POLL_FDS); - ScopedLock ml(&m_); - aio_->watch_fd(fd, flag); + lock ml(m_); + aio_->watch_fd(fd, flag); - VERIFY(!callbacks_[fd] || callbacks_[fd]==ch); - callbacks_[fd] = ch; + VERIFY(!callbacks_[fd] || callbacks_[fd]==ch); + callbacks_[fd] = ch; } //remove all callbacks related to fd @@ -60,301 +54,297 @@ PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch) void PollMgr::block_remove_fd(int fd) { - ScopedLock ml(&m_); - aio_->unwatch_fd(fd, CB_RDWR); - pending_change_ = true; - VERIFY(pthread_cond_wait(&changedone_c_, &m_)==0); - callbacks_[fd] = NULL; + lock ml(m_); + aio_->unwatch_fd(fd, CB_RDWR); + pending_change_ = true; + changedone_c_.wait(ml); + callbacks_[fd] = NULL; } void PollMgr::del_callback(int fd, poll_flag flag) { - ScopedLock ml(&m_); - if (aio_->unwatch_fd(fd, flag)) { - callbacks_[fd] = NULL; - } + lock ml(m_); + if (aio_->unwatch_fd(fd, flag)) { + callbacks_[fd] = NULL; + } } bool PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c) { - ScopedLock ml(&m_); - if (!callbacks_[fd] || callbacks_[fd]!=c) - return false; + lock ml(m_); + if (!callbacks_[fd] || callbacks_[fd]!=c) + return false; - return aio_->is_watched(fd, flag); + return aio_->is_watched(fd, flag); } void -PollMgr::wait_loop() +PollMgr::wait_loop() [[noreturn]] { - std::vector readable; - std::vector writable; - - while (1) { - { - ScopedLock ml(&m_); - if (pending_change_) { - pending_change_ = false; - VERIFY(pthread_cond_broadcast(&changedone_c_)==0); - } - } - readable.clear(); - writable.clear(); - aio_->wait_ready(&readable,&writable); - - if (!readable.size() && !writable.size()) { - continue; - } - //no locking of m_ - //because no add_callback() and del_callback should - //modify callbacks_[fd] while the fd is not dead - for (unsigned int i = 0; i < readable.size(); i++) { - int fd = readable[i]; - if (callbacks_[fd]) - callbacks_[fd]->read_cb(fd); - } - - for (unsigned int i = 0; i < writable.size(); i++) { - int fd = writable[i]; - if (callbacks_[fd]) - callbacks_[fd]->write_cb(fd); - } - } + vector readable; + vector writable; + + while (1) { + { + lock ml(m_); + if (pending_change_) { + pending_change_ = false; + changedone_c_.notify_all(); + } + } + readable.clear(); + writable.clear(); + aio_->wait_ready(&readable,&writable); + + if (!readable.size() && !writable.size()) { + continue; + } + //no locking of m_ + //because no add_callback() and del_callback should + //modify callbacks_[fd] while the fd is not dead + for (unsigned int i = 0; i < readable.size(); i++) { + int fd = readable[i]; + if (callbacks_[fd]) + callbacks_[fd]->read_cb(fd); + } + + for (unsigned int i = 0; i < writable.size(); i++) { + int fd = writable[i]; + if (callbacks_[fd]) + callbacks_[fd]->write_cb(fd); + } + } } SelectAIO::SelectAIO() : highfds_(0) { - FD_ZERO(&rfds_); - FD_ZERO(&wfds_); - - VERIFY(pipe(pipefd_) == 0); - FD_SET(pipefd_[0], &rfds_); - highfds_ = pipefd_[0]; + FD_ZERO(&rfds_); + FD_ZERO(&wfds_); - int flags = fcntl(pipefd_[0], F_GETFL, NULL); - flags |= O_NONBLOCK; - fcntl(pipefd_[0], F_SETFL, flags); + VERIFY(pipe(pipefd_) == 0); + FD_SET(pipefd_[0], &rfds_); + highfds_ = pipefd_[0]; - VERIFY(pthread_mutex_init(&m_, NULL) == 0); + int flags = fcntl(pipefd_[0], F_GETFL, NULL); + flags |= O_NONBLOCK; + fcntl(pipefd_[0], F_SETFL, flags); } SelectAIO::~SelectAIO() { - VERIFY(pthread_mutex_destroy(&m_) == 0); } void SelectAIO::watch_fd(int fd, poll_flag flag) { - ScopedLock ml(&m_); - if (highfds_ <= fd) - highfds_ = fd; - - if (flag == CB_RDONLY) { - FD_SET(fd,&rfds_); - }else if (flag == CB_WRONLY) { - FD_SET(fd,&wfds_); - }else { - FD_SET(fd,&rfds_); - FD_SET(fd,&wfds_); - } - - char tmp = 1; - VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); + lock ml(m_); + if (highfds_ <= fd) + highfds_ = fd; + + if (flag == CB_RDONLY) { + FD_SET(fd,&rfds_); + }else if (flag == CB_WRONLY) { + FD_SET(fd,&wfds_); + }else { + FD_SET(fd,&rfds_); + FD_SET(fd,&wfds_); + } + + char tmp = 1; + VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); } bool SelectAIO::is_watched(int fd, poll_flag flag) { - ScopedLock ml(&m_); - if (flag == CB_RDONLY) { - return FD_ISSET(fd,&rfds_); - }else if (flag == CB_WRONLY) { - return FD_ISSET(fd,&wfds_); - }else{ - return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_)); - } + lock ml(m_); + if (flag == CB_RDONLY) { + return FD_ISSET(fd,&rfds_); + }else if (flag == CB_WRONLY) { + return FD_ISSET(fd,&wfds_); + }else{ + return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_)); + } } bool SelectAIO::unwatch_fd(int fd, poll_flag flag) { - ScopedLock ml(&m_); - if (flag == CB_RDONLY) { - FD_CLR(fd, &rfds_); - }else if (flag == CB_WRONLY) { - FD_CLR(fd, &wfds_); - }else if (flag == CB_RDWR) { - FD_CLR(fd, &wfds_); - FD_CLR(fd, &rfds_); - }else{ - VERIFY(0); - } - - if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) { - if (fd == highfds_) { - int newh = pipefd_[0]; - for (int i = 0; i <= highfds_; i++) { - if (FD_ISSET(i, &rfds_)) { - newh = i; - }else if (FD_ISSET(i, &wfds_)) { - newh = i; - } - } - highfds_ = newh; - } - } - if (flag == CB_RDWR) { - char tmp = 1; - VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); - } - return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_)); + lock ml(m_); + if (flag == CB_RDONLY) { + FD_CLR(fd, &rfds_); + }else if (flag == CB_WRONLY) { + FD_CLR(fd, &wfds_); + }else if (flag == CB_RDWR) { + FD_CLR(fd, &wfds_); + FD_CLR(fd, &rfds_); + }else{ + VERIFY(0); + } + + if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) { + if (fd == highfds_) { + int newh = pipefd_[0]; + for (int i = 0; i <= highfds_; i++) { + if (FD_ISSET(i, &rfds_)) { + newh = i; + }else if (FD_ISSET(i, &wfds_)) { + newh = i; + } + } + highfds_ = newh; + } + } + if (flag == CB_RDWR) { + char tmp = 1; + VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); + } + return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_)); } void -SelectAIO::wait_ready(std::vector *readable, std::vector *writable) +SelectAIO::wait_ready(vector *readable, vector *writable) { - fd_set trfds, twfds; - int high; - - { - ScopedLock ml(&m_); - trfds = rfds_; - twfds = wfds_; - high = highfds_; - - } - - int ret = select(high+1, &trfds, &twfds, NULL, NULL); - - if (ret < 0) { - if (errno == EINTR) { - return; - } else { - perror("select:"); - jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno); - VERIFY(0); - } - } - - for (int fd = 0; fd <= high; fd++) { - if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) { - char tmp; - VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1); - VERIFY(tmp==1); - }else { - if (FD_ISSET(fd, &twfds)) { - writable->push_back(fd); - } - if (FD_ISSET(fd, &trfds)) { - readable->push_back(fd); - } - } - } + fd_set trfds, twfds; + int high; + + { + lock ml(m_); + trfds = rfds_; + twfds = wfds_; + high = highfds_; + } + + int ret = select(high+1, &trfds, &twfds, NULL, NULL); + + if (ret < 0) { + if (errno == EINTR) { + return; + } else { + perror("select:"); + IF_LEVEL(0) LOG("select_loop failure errno " << errno); + VERIFY(0); + } + } + + for (int fd = 0; fd <= high; fd++) { + if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) { + char tmp; + VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1); + VERIFY(tmp==1); + }else { + if (FD_ISSET(fd, &twfds)) { + writable->push_back(fd); + } + if (FD_ISSET(fd, &trfds)) { + readable->push_back(fd); + } + } + } } #ifdef __linux__ EPollAIO::EPollAIO() { - pollfd_ = epoll_create(MAX_POLL_FDS); - VERIFY(pollfd_ >= 0); - bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS); + pollfd_ = epoll_create(MAX_POLL_FDS); + VERIFY(pollfd_ >= 0); + bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS); } EPollAIO::~EPollAIO() { - close(pollfd_); + close(pollfd_); } static inline int poll_flag_to_event(poll_flag flag) { - int f; - if (flag == CB_RDONLY) { - f = EPOLLIN; - }else if (flag == CB_WRONLY) { - f = EPOLLOUT; - }else { //flag == CB_RDWR - f = EPOLLIN | EPOLLOUT; - } - return f; + int f; + if (flag == CB_RDONLY) { + f = EPOLLIN; + }else if (flag == CB_WRONLY) { + f = EPOLLOUT; + }else { //flag == CB_RDWR + f = EPOLLIN | EPOLLOUT; + } + return f; } void EPollAIO::watch_fd(int fd, poll_flag flag) { - VERIFY(fd < MAX_POLL_FDS); + VERIFY(fd < MAX_POLL_FDS); - struct epoll_event ev; - int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD; - fdstatus_[fd] |= (int)flag; + struct epoll_event ev; + int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD; + fdstatus_[fd] |= (int)flag; - ev.events = EPOLLET; - ev.data.fd = fd; + ev.events = EPOLLET; + ev.data.fd = fd; - if (fdstatus_[fd] & CB_RDONLY) { - ev.events |= EPOLLIN; - } - if (fdstatus_[fd] & CB_WRONLY) { - ev.events |= EPOLLOUT; - } + if (fdstatus_[fd] & CB_RDONLY) { + ev.events |= EPOLLIN; + } + if (fdstatus_[fd] & CB_WRONLY) { + ev.events |= EPOLLOUT; + } - if (flag == CB_RDWR) { - VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT)); - } + if (flag == CB_RDWR) { + VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT)); + } - VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); + VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); } bool EPollAIO::unwatch_fd(int fd, poll_flag flag) { - VERIFY(fd < MAX_POLL_FDS); - fdstatus_[fd] &= ~(int)flag; - - struct epoll_event ev; - int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL; - - ev.events = EPOLLET; - ev.data.fd = fd; - - if (fdstatus_[fd] & CB_RDONLY) { - ev.events |= EPOLLIN; - } - if (fdstatus_[fd] & CB_WRONLY) { - ev.events |= EPOLLOUT; - } - - if (flag == CB_RDWR) { - VERIFY(op == EPOLL_CTL_DEL); - } - VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); - return (op == EPOLL_CTL_DEL); + VERIFY(fd < MAX_POLL_FDS); + fdstatus_[fd] &= ~(int)flag; + + struct epoll_event ev; + int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL; + + ev.events = EPOLLET; + ev.data.fd = fd; + + if (fdstatus_[fd] & CB_RDONLY) { + ev.events |= EPOLLIN; + } + if (fdstatus_[fd] & CB_WRONLY) { + ev.events |= EPOLLOUT; + } + + if (flag == CB_RDWR) { + VERIFY(op == EPOLL_CTL_DEL); + } + VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); + return (op == EPOLL_CTL_DEL); } bool EPollAIO::is_watched(int fd, poll_flag flag) { - VERIFY(fd < MAX_POLL_FDS); - return ((fdstatus_[fd] & CB_MASK) == flag); + VERIFY(fd < MAX_POLL_FDS); + return ((fdstatus_[fd] & CB_MASK) == flag); } void -EPollAIO::wait_ready(std::vector *readable, std::vector *writable) +EPollAIO::wait_ready(vector *readable, vector *writable) { - int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1); - for (int i = 0; i < nfds; i++) { - if (ready_[i].events & EPOLLIN) { - readable->push_back(ready_[i].data.fd); - } - if (ready_[i].events & EPOLLOUT) { - writable->push_back(ready_[i].data.fd); - } - } + int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1); + for (int i = 0; i < nfds; i++) { + if (ready_[i].events & EPOLLIN) { + readable->push_back(ready_[i].data.fd); + } + if (ready_[i].events & EPOLLOUT) { + writable->push_back(ready_[i].data.fd); + } + } } #endif