Lots more clean-ups
[invirt/third/libt4.git] / rpc / pollmgr.cc
index a938284..4acff93 100644 (file)
@@ -1,46 +1,77 @@
 #include "types.h"
 #include <errno.h>
-#include <fcntl.h>
-#include <unistd.h>
+#include <sys/select.h>
+#include "file.h"
 
-#include "pollmgr.h"
+#ifdef __linux__
+#include <sys/epoll.h>
+#endif
 
-PollMgr *PollMgr::instance = NULL;
-static once_flag pollmgr_is_initialized;
+#include "pollmgr.h"
 
-static void
-PollMgrInit()
-{
-    PollMgr::instance = new PollMgr();
-}
+static PollMgr instance;
+
+PollMgr & PollMgr::Instance() { return instance; }
+
+class wait_manager {
+    public:
+        virtual void watch_fd(int fd, poll_flag flag) = 0;
+        virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
+        virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
+        virtual ~wait_manager() throw() {}
+};
+
+class SelectAIO : public wait_manager {
+    public :
+        SelectAIO();
+        ~SelectAIO() {}
+        void watch_fd(int fd, poll_flag flag);
+        bool unwatch_fd(int fd, poll_flag flag);
+        void wait_ready(vector<int> & readable, vector<int> & writable);
+
+    private:
+        fd_set rfds_, wfds_;
+        int highfds_;
+        file_t pipe_[2];
+        mutex m_;
+};
 
-PollMgr *
-PollMgr::Instance()
-{
-    call_once(pollmgr_is_initialized, PollMgrInit);
-    return instance;
-}
+#ifdef __linux__ 
+class EPollAIO : public wait_manager {
+    public:
+        EPollAIO() {}
+        ~EPollAIO() throw() { }
+        void watch_fd(int fd, poll_flag flag);
+        bool unwatch_fd(int fd, poll_flag flag);
+        void wait_ready(vector<int> & readable, vector<int> & writable);
+
+    private:
+        file_t poll_ = epoll_create(MAX_POLL_FDS);
+        struct epoll_event ready_[MAX_POLL_FDS];
+        vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
+};
+#endif
 
-PollMgr::PollMgr() : pending_change_(false)
-{
-    bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
-    aio_ = new SelectAIO();
-    //aio_ = new EPollAIO();
 
+PollMgr::PollMgr() : aio_(new SelectAIO()) {
     th_ = thread(&PollMgr::wait_loop, this);
 }
 
-PollMgr::~PollMgr() [[noreturn]]
+PollMgr::~PollMgr()
 {
-    //never kill me!!!
-    VERIFY(0);
+    lock ml(m_);
+    for (auto p : callbacks_)
+        aio_->unwatch_fd(p.first, CB_RDWR);
+    pending_change_ = true;
+    shutdown_ = true;
+    changedone_c_.wait(ml);
+    delete aio_;
+    th_.join();
 }
 
 void
 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
 {
-    VERIFY(fd < MAX_POLL_FDS);
-
     lock ml(m_);
     aio_->watch_fd(fd, flag);
 
@@ -48,44 +79,26 @@ PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
     callbacks_[fd] = ch;
 }
 
-//remove all callbacks related to fd
-//the return guarantees that callbacks related to fd
-//will never be called again
-void
-PollMgr::block_remove_fd(int fd)
-{
+// Remove all callbacks related to fd.  After this returns, we guarantee that
+// callbacks related to fd will never be called again.
+void PollMgr::block_remove_fd(int fd) {
     lock ml(m_);
     aio_->unwatch_fd(fd, CB_RDWR);
     pending_change_ = true;
     changedone_c_.wait(ml);
-    callbacks_[fd] = NULL;
+    callbacks_[fd] = nullptr;
 }
 
-void
-PollMgr::del_callback(int fd, poll_flag flag)
-{
+void PollMgr::del_callback(int fd, poll_flag flag) {
     lock ml(m_);
-    if (aio_->unwatch_fd(fd, flag)) {
-        callbacks_[fd] = NULL;
-    }
+    if (aio_->unwatch_fd(fd, flag))
+        callbacks_[fd] = nullptr;
 }
 
-bool
-PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
-{
-    lock ml(m_);
-    if (!callbacks_[fd] || callbacks_[fd]!=c)
-        return false;
-
-    return aio_->is_watched(fd, flag);
-}
-
-void
-PollMgr::wait_loop() [[noreturn]]
-{
-
+void PollMgr::wait_loop() {
     vector<int> readable;
     vector<int> writable;
+    aio_callback * cb;
 
     while (1) {
         {
@@ -93,121 +106,80 @@ PollMgr::wait_loop() [[noreturn]]
             if (pending_change_) {
                 pending_change_ = false;
                 changedone_c_.notify_all();
+                if (shutdown_)
+                    break;
             }
         }
         readable.clear();
         writable.clear();
-        aio_->wait_ready(&readable,&writable);
-
-        if (!readable.size() && !writable.size()) {
-            continue;
-        } 
-        //no locking of m_
-        //because no add_callback() and del_callback should 
-        //modify callbacks_[fd] while the fd is not dead
-        for (unsigned int i = 0; i < readable.size(); i++) {
-            int fd = readable[i];
-            if (callbacks_[fd])
-                callbacks_[fd]->read_cb(fd);
+        aio_->wait_ready(readable, writable);
+
+        for (auto fd : readable) {
+            { lock ml(m_); cb = callbacks_[fd]; }
+            if (cb) cb->read_cb(fd);
         }
 
-        for (unsigned int i = 0; i < writable.size(); i++) {
-            int fd = writable[i];
-            if (callbacks_[fd])
-                callbacks_[fd]->write_cb(fd);
+        for (auto fd : writable) {
+            { lock ml(m_); cb = callbacks_[fd]; }
+            if (cb) cb->write_cb(fd);
         }
     }
 }
 
-SelectAIO::SelectAIO() : highfds_(0)
+SelectAIO::SelectAIO()
 {
     FD_ZERO(&rfds_);
     FD_ZERO(&wfds_);
 
-    VERIFY(pipe(pipefd_) == 0);
-    FD_SET(pipefd_[0], &rfds_);
-    highfds_ = pipefd_[0];
+    file_t::pipe(pipe_);
 
-    int flags = fcntl(pipefd_[0], F_GETFL, NULL);
-    flags |= O_NONBLOCK;
-    fcntl(pipefd_[0], F_SETFL, flags);
-}
+    FD_SET(pipe_[0], &rfds_);
+    highfds_ = pipe_[0];
 
-SelectAIO::~SelectAIO()
-{
+    pipe_[0].flags() |= O_NONBLOCK;
 }
 
-void
-SelectAIO::watch_fd(int fd, poll_flag flag)
-{
+void SelectAIO::watch_fd(int fd, poll_flag flag) {
+    VERIFY(fd < MAX_POLL_FDS);
+
     lock ml(m_);
     if (highfds_ <= fd) 
         highfds_ = fd;
 
-    if (flag == CB_RDONLY) {
-        FD_SET(fd,&rfds_);
-    }else if (flag == CB_WRONLY) {
-        FD_SET(fd,&wfds_);
-    }else {
+    if (flag & CB_RDONLY)
         FD_SET(fd,&rfds_);
+
+    if (flag & CB_WRONLY)
         FD_SET(fd,&wfds_);
-    }
 
-    char tmp = 1;
-    VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
+    VERIFY(pipe_[1].write((char)1)==1);
 }
 
-bool
-SelectAIO::is_watched(int fd, poll_flag flag)
-{
-    lock ml(m_);
-    if (flag == CB_RDONLY) {
-        return FD_ISSET(fd,&rfds_);
-    }else if (flag == CB_WRONLY) {
-        return FD_ISSET(fd,&wfds_);
-    }else{
-        return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
-    }
-}
+bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
+    VERIFY(fd < MAX_POLL_FDS);
 
-bool 
-SelectAIO::unwatch_fd(int fd, poll_flag flag)
-{
     lock ml(m_);
-    if (flag == CB_RDONLY) {
+    VERIFY((flag & ~CB_RDWR) == 0);
+    if (flag & CB_RDONLY)
         FD_CLR(fd, &rfds_);
-    }else if (flag == CB_WRONLY) {
+    if (flag & CB_WRONLY)
         FD_CLR(fd, &wfds_);
-    }else if (flag == CB_RDWR) {
-        FD_CLR(fd, &wfds_);
-        FD_CLR(fd, &rfds_);
-    }else{
-        VERIFY(0);
-    }
 
-    if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
-        if (fd == highfds_) {
-            int newh = pipefd_[0];
-            for (int i = 0; i <= highfds_; i++) {
-                if (FD_ISSET(i, &rfds_)) {
-                    newh = i;
-                }else if (FD_ISSET(i, &wfds_)) {
-                    newh = i;
-                }
-            }
-            highfds_ = newh;
-        }
-    }
-    if (flag == CB_RDWR) {
-        char tmp = 1;
-        VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
+    int newh = pipe_[0];
+    for (int i = 0; i <= highfds_; i++) {
+        if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
+            newh = i;
     }
+    highfds_ = newh;
+
+    if (flag == CB_RDWR)
+        VERIFY(pipe_[1].write((char)1)==1);
+
     return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
 }
 
-void
-SelectAIO::wait_ready(vector<int> *readable, vector<int> *writable)
-{
+void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
+
     fd_set trfds, twfds;
     int high;
 
@@ -220,130 +192,89 @@ SelectAIO::wait_ready(vector<int> *readable, vector<int> *writable)
 
     int ret = select(high+1, &trfds, &twfds, NULL, NULL);
 
-    if (ret < 0) {
-        if (errno == EINTR) {
-            return;
-        } else {
-            perror("select:");
-            IF_LEVEL(0) LOG("select_loop failure errno " << errno);
-            VERIFY(0);
-        }
+    if (ret < 0 && errno == EINTR)
+        return;
+    else if (ret < 0) {
+        perror("select:");
+        IF_LEVEL(0) LOG("select_loop failure errno " << errno);
+        VERIFY(0);
     }
 
     for (int fd = 0; fd <= high; fd++) {
-        if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
+        if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
             char tmp;
-            VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
+            VERIFY(pipe_[0].read(tmp)==1);
             VERIFY(tmp==1);
-        }else {
-            if (FD_ISSET(fd, &twfds)) {
-                writable->push_back(fd);
-            }
-            if (FD_ISSET(fd, &trfds)) {
-                readable->push_back(fd);
-            }
+        } else {
+            if (FD_ISSET(fd, &twfds))
+                writable.push_back(fd);
+
+            if (FD_ISSET(fd, &trfds))
+                readable.push_back(fd);
         }
     }
 }
 
 #ifdef __linux__ 
 
-EPollAIO::EPollAIO()
-{
-    pollfd_ = epoll_create(MAX_POLL_FDS);
-    VERIFY(pollfd_ >= 0);
-    bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
-}
-
-EPollAIO::~EPollAIO()
-{
-    close(pollfd_);
-}
+void EPollAIO::watch_fd(int fd_, poll_flag flag) {
+    size_t fd = (size_t)fd_;
 
-static inline
-int poll_flag_to_event(poll_flag flag)
-{
-    int f;
-    if (flag == CB_RDONLY) {
-        f = EPOLLIN;
-    }else if (flag == CB_WRONLY) {
-        f = EPOLLOUT;
-    }else { //flag == CB_RDWR
-        f = EPOLLIN | EPOLLOUT;
-    }
-    return f;
-}
-
-void
-EPollAIO::watch_fd(int fd, poll_flag flag)
-{
     VERIFY(fd < MAX_POLL_FDS);
 
     struct epoll_event ev;
-    int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
-    fdstatus_[fd] |= (int)flag;
+    int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
+    fdstatus_[fd] |= (unsigned)flag;
 
     ev.events = EPOLLET;
-    ev.data.fd = fd;
+    ev.data.fd = fd_;
 
-    if (fdstatus_[fd] & CB_RDONLY) {
+    if (fdstatus_[fd] & CB_RDONLY)
         ev.events |= EPOLLIN;
-    }
-    if (fdstatus_[fd] & CB_WRONLY) {
+
+    if (fdstatus_[fd] & CB_WRONLY)
         ev.events |= EPOLLOUT;
-    }
 
-    if (flag == CB_RDWR) {
+    if (flag == CB_RDWR)
         VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
-    }
 
-    VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+    VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
 }
 
-bool 
-EPollAIO::unwatch_fd(int fd, poll_flag flag)
-{
+bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
+    size_t fd = (size_t)fd_;
+
     VERIFY(fd < MAX_POLL_FDS);
-    fdstatus_[fd] &= ~(int)flag;
+    fdstatus_[fd] &= ~(unsigned)flag;
 
     struct epoll_event ev;
-    int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
+    int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
 
     ev.events = EPOLLET;
-    ev.data.fd = fd;
+    ev.data.fd = fd_;
 
-    if (fdstatus_[fd] & CB_RDONLY) {
+    if (fdstatus_[fd] & CB_RDONLY)
         ev.events |= EPOLLIN;
-    }
-    if (fdstatus_[fd] & CB_WRONLY) {
+
+    if (fdstatus_[fd] & CB_WRONLY)
         ev.events |= EPOLLOUT;
-    }
 
-    if (flag == CB_RDWR) {
+    if (flag == CB_RDWR)
         VERIFY(op == EPOLL_CTL_DEL);
-    }
-    VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+
+    VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
     return (op == EPOLL_CTL_DEL);
 }
 
-bool
-EPollAIO::is_watched(int fd, poll_flag flag)
-{
-    VERIFY(fd < MAX_POLL_FDS);
-    return ((fdstatus_[fd] & CB_MASK) == flag);
-}
+void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
 
-void
-EPollAIO::wait_ready(vector<int> *readable, vector<int> *writable)
-{
-    int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
+    int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
     for (int i = 0; i < nfds; i++) {
-        if (ready_[i].events & EPOLLIN) {
-            readable->push_back(ready_[i].data.fd);
-        }
-        if (ready_[i].events & EPOLLOUT) {
-            writable->push_back(ready_[i].data.fd);
-        }
+        if (ready_[i].events & EPOLLIN)
+            readable.push_back(ready_[i].data.fd);
+
+        if (ready_[i].events & EPOLLOUT)
+            writable.push_back(ready_[i].data.fd);
     }
 }