Removed explicit reference counting in handle.cc
[invirt/third/libt4.git] / rpc / pollmgr.cc
index 023a7aa..4acff93 100644 (file)
 #include "types.h"
 #include <errno.h>
 #include "types.h"
 #include <errno.h>
-#include <fcntl.h>
-#include <unistd.h>
+#include <sys/select.h>
+#include "file.h"
 
 
-#include "jsl_log.h"
-#include "pollmgr.h"
+#ifdef __linux__
+#include <sys/epoll.h>
+#endif
 
 
-PollMgr *PollMgr::instance = NULL;
-static std::once_flag pollmgr_is_initialized;
+#include "pollmgr.h"
 
 
-static void
-PollMgrInit()
-{
-       PollMgr::instance = new PollMgr();
-}
+static PollMgr instance;  // the one file-local PollMgr object; replaces the removed call_once/new initialisation
+
+PollMgr & PollMgr::Instance() { return instance; }  // hands out a reference to the file-local object above
+
+class wait_manager {  // abstract interface (every operation is pure virtual) implemented by SelectAIO and EPollAIO
+    public:
+        virtual void watch_fd(int fd, poll_flag flag) = 0;  // start watching fd for the events named by flag
+        virtual bool unwatch_fd(int fd, poll_flag flag) = 0;  // stop watching flag's events; both implementations in this file return true once fd has nothing left watched
+        virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;  // appends the fds found ready to the two output vectors
+        virtual ~wait_manager() throw() {}
+};
+
+class SelectAIO : public wait_manager {  // wait_manager implemented on top of select()
+    public :
+        SelectAIO();
+        ~SelectAIO() {}
+        void watch_fd(int fd, poll_flag flag);
+        bool unwatch_fd(int fd, poll_flag flag);
+        void wait_ready(vector<int> & readable, vector<int> & writable);
+
+    private:
+        fd_set rfds_, wfds_;  // fds currently watched for reading / for writing
+        int highfds_;  // highest watched fd; wait_ready passes highfds_+1 to select()
+        file_t pipe_[2];  // pipe_[0] is always in rfds_; a byte written to pipe_[1] makes a blocked select() return
+        mutex m_;  // taken by watch_fd, unwatch_fd and wait_ready around their accesses to rfds_, wfds_ and highfds_
+};
 
 
-PollMgr *
-PollMgr::Instance()
-{
-    std::call_once(pollmgr_is_initialized, PollMgrInit);
-       return instance;
-}
+#ifdef __linux__ 
+class EPollAIO : public wait_manager {  // wait_manager implemented on top of epoll; only compiled under __linux__
+    public:
+        EPollAIO() {}
+        ~EPollAIO() throw() { }
+        void watch_fd(int fd, poll_flag flag);
+        bool unwatch_fd(int fd, poll_flag flag);
+        void wait_ready(vector<int> & readable, vector<int> & writable);
+
+    private:
+        file_t poll_ = epoll_create(MAX_POLL_FDS);  // the epoll instance; NOTE(review): the removed constructor VERIFYed this result, nothing checks it here — confirm file_t copes with -1
+        struct epoll_event ready_[MAX_POLL_FDS];  // buffer that epoll_wait() fills in wait_ready
+        vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);  // per-fd bitmask of the poll_flag bits currently watched, indexed by fd
+};
+#endif
 
 
-PollMgr::PollMgr() : pending_change_(false)
-{
-       bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
-       aio_ = new SelectAIO();
-       //aio_ = new EPollAIO();
 
 
-    th_ = std::thread(&PollMgr::wait_loop, this);
+PollMgr::PollMgr() : aio_(new SelectAIO()) {  // always selects the select()-based backend
+    th_ = thread(&PollMgr::wait_loop, this);  // start the thread that runs wait_loop() on this object
 }
 
 }
 
-PollMgr::~PollMgr() [[noreturn]]
+PollMgr::~PollMgr()  // asks the wait_loop thread to exit, waits for its acknowledgement, then frees aio_ and joins the thread
 {
 {
-       //never kill me!!!
-       VERIFY(0);
+    lock ml(m_);
+    for (auto p : callbacks_)
+        aio_->unwatch_fd(p.first, CB_RDWR);  // SelectAIO::unwatch_fd also writes to its pipe for CB_RDWR, which wakes a blocked select()
+    pending_change_ = true;
+    shutdown_ = true;  // wait_loop only tests this inside its pending_change_ branch, so both flags are set together
+    changedone_c_.wait(ml);  // returns after wait_loop calls notify_all(); NOTE(review): if callbacks_ is empty nothing above wakes SelectAIO::wait_ready — confirm this cannot hang
+    delete aio_;
+    th_.join();
 }
 
 void
 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
 {
 }
 
 void
 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
 {
-       VERIFY(fd < MAX_POLL_FDS);
-
     lock ml(m_);
     lock ml(m_);
-       aio_->watch_fd(fd, flag);
+    aio_->watch_fd(fd, flag);  // register fd with the wait backend; the fd range check now lives in the backend's watch_fd
 
 
-       VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
-       callbacks_[fd] = ch;
+    VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);  // an fd may not be rebound to a different callback object while one is set
+    callbacks_[fd] = ch;  // wait_loop looks the callback up here when fd becomes ready
 }
 
 }
 
-//remove all callbacks related to fd
-//the return guarantees that callbacks related to fd
-//will never be called again
-void
-PollMgr::block_remove_fd(int fd)
-{
+// Remove all callbacks related to fd.  After this returns, we guarantee that
+// callbacks related to fd will never be called again.
+void PollMgr::block_remove_fd(int fd) {
     lock ml(m_);
     lock ml(m_);
-       aio_->unwatch_fd(fd, CB_RDWR);
-       pending_change_ = true;
+    aio_->unwatch_fd(fd, CB_RDWR);  // stop watching fd for both reading and writing
+    pending_change_ = true;  // asks wait_loop to notify changedone_c_ at the top of its next iteration
     changedone_c_.wait(ml);
     changedone_c_.wait(ml);
-       callbacks_[fd] = NULL;
+    callbacks_[fd] = nullptr;  // clear the binding only after wait_loop has acknowledged the change
 }
 
 }
 
-void
-PollMgr::del_callback(int fd, poll_flag flag)
-{
+void PollMgr::del_callback(int fd, poll_flag flag) {  // stop watching flag's events on fd without waiting for wait_loop
     lock ml(m_);
     lock ml(m_);
-       if (aio_->unwatch_fd(fd, flag)) {
-               callbacks_[fd] = NULL;
-       }
+    if (aio_->unwatch_fd(fd, flag))  // true once fd has no watched events left
+        callbacks_[fd] = nullptr;  // so the callback binding is cleared as well
 }
 
 }
 
-bool
-PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
-{
-    lock ml(m_);
-       if (!callbacks_[fd] || callbacks_[fd]!=c)
-               return false;
-
-       return aio_->is_watched(fd, flag);
-}
+void PollMgr::wait_loop() {  // body of the thread started in the constructor: waits for ready fds and invokes their callbacks
+    vector<int> readable;
+    vector<int> writable;
+    aio_callback * cb;
 
 
-void
-PollMgr::wait_loop() [[noreturn]]
-{
-
-       std::vector<int> readable;
-       std::vector<int> writable;
-
-       while (1) {
-               {
+    while (1) {
+        {
             lock ml(m_);
             lock ml(m_);
-                       if (pending_change_) {
-                               pending_change_ = false;
+            if (pending_change_) {  // set by block_remove_fd() and ~PollMgr(), which then wait on changedone_c_
+                pending_change_ = false;
                 changedone_c_.notify_all();
                 changedone_c_.notify_all();
-                       }
-               }
-               readable.clear();
-               writable.clear();
-               aio_->wait_ready(&readable,&writable);
-
-               if (!readable.size() && !writable.size()) {
-                       continue;
-               } 
-               //no locking of m_
-               //because no add_callback() and del_callback should 
-               //modify callbacks_[fd] while the fd is not dead
-               for (unsigned int i = 0; i < readable.size(); i++) {
-                       int fd = readable[i];
-                       if (callbacks_[fd])
-                               callbacks_[fd]->read_cb(fd);
-               }
-
-               for (unsigned int i = 0; i < writable.size(); i++) {
-                       int fd = writable[i];
-                       if (callbacks_[fd])
-                               callbacks_[fd]->write_cb(fd);
-               }
-       }
+                if (shutdown_)  // set by ~PollMgr(): leave the loop so the thread can be joined
+                    break;
+            }
+        }
+        readable.clear();
+        writable.clear();
+        aio_->wait_ready(readable, writable);  // called without m_ held; fills the vectors with the ready fds
+
+        for (auto fd : readable) {
+            { lock ml(m_); cb = callbacks_[fd]; }  // look the callback up under m_, then invoke it with m_ released
+            if (cb) cb->read_cb(fd);
+        }
+
+        for (auto fd : writable) {
+            { lock ml(m_); cb = callbacks_[fd]; }  // same lookup-then-call pattern as for readable fds
+            if (cb) cb->write_cb(fd);
+        }
+    }
 }
 
 }
 
-SelectAIO::SelectAIO() : highfds_(0)
+SelectAIO::SelectAIO()  // starts with empty fd sets plus the wake-up pipe's read end
 {
 {
-       FD_ZERO(&rfds_);
-       FD_ZERO(&wfds_);
+    FD_ZERO(&rfds_);
+    FD_ZERO(&wfds_);
 
 
-       VERIFY(pipe(pipefd_) == 0);
-       FD_SET(pipefd_[0], &rfds_);
-       highfds_ = pipefd_[0];
+    file_t::pipe(pipe_);  // takes over from the removed VERIFY(pipe(pipefd_) == 0); presumably fills both ends of pipe_ — confirm in file.h
 
 
-       int flags = fcntl(pipefd_[0], F_GETFL, NULL);
-       flags |= O_NONBLOCK;
-       fcntl(pipefd_[0], F_SETFL, flags);
-}
+    FD_SET(pipe_[0], &rfds_);  // always watch the pipe's read end so a write to pipe_[1] interrupts select()
+    highfds_ = pipe_[0];  // the pipe is the only watched fd so far
 
 
-SelectAIO::~SelectAIO()
-{
+    pipe_[0].flags() |= O_NONBLOCK;  // takes over from the removed fcntl F_GETFL/F_SETFL pair; presumably applied by file_t — confirm in file.h
 }
 
 }
 
-void
-SelectAIO::watch_fd(int fd, poll_flag flag)
-{
-    lock ml(m_);
-       if (highfds_ <= fd) 
-               highfds_ = fd;
-
-       if (flag == CB_RDONLY) {
-               FD_SET(fd,&rfds_);
-       }else if (flag == CB_WRONLY) {
-               FD_SET(fd,&wfds_);
-       }else {
-               FD_SET(fd,&rfds_);
-               FD_SET(fd,&wfds_);
-       }
-
-       char tmp = 1;
-       VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
-}
+void SelectAIO::watch_fd(int fd, poll_flag flag) {  // adds fd to the read and/or write set according to flag's bits
+    VERIFY(fd < MAX_POLL_FDS);
 
 
-bool
-SelectAIO::is_watched(int fd, poll_flag flag)
-{
     lock ml(m_);
     lock ml(m_);
-       if (flag == CB_RDONLY) {
-               return FD_ISSET(fd,&rfds_);
-       }else if (flag == CB_WRONLY) {
-               return FD_ISSET(fd,&wfds_);
-       }else{
-               return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
-       }
+    if (highfds_ <= fd) 
+        highfds_ = fd;
+
+    if (flag & CB_RDONLY)  // flag is tested as a bitmask; assumes CB_RDWR carries both the CB_RDONLY and CB_WRONLY bits — TODO confirm in pollmgr.h
+        FD_SET(fd,&rfds_);
+
+    if (flag & CB_WRONLY)
+        FD_SET(fd,&wfds_);
+
+    VERIFY(pipe_[1].write((char)1)==1);  // wake a select() already in progress; it runs on a snapshot and would otherwise not see the new fd
 }
 
 }
 
-bool 
-SelectAIO::unwatch_fd(int fd, poll_flag flag)
-{
+bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {  // removes fd from the sets named by flag; returns true when fd is then in neither set
+    VERIFY(fd < MAX_POLL_FDS);
+
     lock ml(m_);
     lock ml(m_);
-       if (flag == CB_RDONLY) {
-               FD_CLR(fd, &rfds_);
-       }else if (flag == CB_WRONLY) {
-               FD_CLR(fd, &wfds_);
-       }else if (flag == CB_RDWR) {
-               FD_CLR(fd, &wfds_);
-               FD_CLR(fd, &rfds_);
-       }else{
-               VERIFY(0);
-       }
-
-       if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
-               if (fd == highfds_) {
-                       int newh = pipefd_[0];
-                       for (int i = 0; i <= highfds_; i++) {
-                               if (FD_ISSET(i, &rfds_)) {
-                                       newh = i;
-                               }else if (FD_ISSET(i, &wfds_)) {
-                                       newh = i;
-                               }
-                       }
-                       highfds_ = newh;
-               }
-       }
-       if (flag == CB_RDWR) {
-               char tmp = 1;
-               VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
-       }
-       return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
+    VERIFY((flag & ~CB_RDWR) == 0);  // reject any bit outside CB_RDWR (replaces the removed else { VERIFY(0); } branch)
+    if (flag & CB_RDONLY)
+        FD_CLR(fd, &rfds_);
+    if (flag & CB_WRONLY)
+        FD_CLR(fd, &wfds_);
+
+    int newh = pipe_[0];  // now recomputed on every call; the removed code rescanned only when fd was the highest and fully unwatched
+    for (int i = 0; i <= highfds_; i++) {
+        if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
+            newh = i;  // the last hit of this ascending scan is the highest fd still watched
+    }
+    highfds_ = newh;
+
+    if (flag == CB_RDWR)  // only a CB_RDWR removal wakes select(); other removals are seen when wait_ready next snapshots the sets
+        VERIFY(pipe_[1].write((char)1)==1);
+
+    return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
 }
 
 }
 
-void
-SelectAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
-{
-       fd_set trfds, twfds;
-       int high;
+void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {  // blocks in select() and appends the ready fds to readable / writable
+
+    fd_set trfds, twfds;
+    int high;
 
 
-       {
+    {  // snapshot the watched sets under m_ so that select() itself runs without the lock
         lock ml(m_);
         lock ml(m_);
-               trfds = rfds_;
-               twfds = wfds_;
-               high = highfds_;
-       }
-
-       int ret = select(high+1, &trfds, &twfds, NULL, NULL);
-
-       if (ret < 0) {
-               if (errno == EINTR) {
-                       return;
-               } else {
-                       perror("select:");
-                       jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno);
-                       VERIFY(0);
-               }
-       }
-
-       for (int fd = 0; fd <= high; fd++) {
-               if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
-                       char tmp;
-                       VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
-                       VERIFY(tmp==1);
-               }else {
-                       if (FD_ISSET(fd, &twfds)) {
-                               writable->push_back(fd);
-                       }
-                       if (FD_ISSET(fd, &trfds)) {
-                               readable->push_back(fd);
-                       }
-               }
-       }
+        trfds = rfds_;
+        twfds = wfds_;
+        high = highfds_;
+    }
+
+    int ret = select(high+1, &trfds, &twfds, NULL, NULL);  // NULL timeout: no time limit on the wait
+
+    if (ret < 0 && errno == EINTR)  // interrupted by a signal: return with nothing appended
+        return;
+    else if (ret < 0) {  // any other select() failure is treated as fatal
+        perror("select:");
+        IF_LEVEL(0) LOG("select_loop failure errno " << errno);
+        VERIFY(0);
+    }
+
+    for (int fd = 0; fd <= high; fd++) {
+        if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {  // wake-up pipe: consume one byte and do not report the fd to the caller
+            char tmp;
+            VERIFY(pipe_[0].read(tmp)==1);
+            VERIFY(tmp==1);  // watch_fd/unwatch_fd only ever write the value 1
+        } else {
+            if (FD_ISSET(fd, &twfds))
+                writable.push_back(fd);
+
+            if (FD_ISSET(fd, &trfds))
+                readable.push_back(fd);
+        }
+    }
 }
 
 #ifdef __linux__ 
 
 }
 
 #ifdef __linux__ 
 
-EPollAIO::EPollAIO()
-{
-       pollfd_ = epoll_create(MAX_POLL_FDS);
-       VERIFY(pollfd_ >= 0);
-       bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
-}
+void EPollAIO::watch_fd(int fd_, poll_flag flag) {  // adds flag's bits to fd_'s watched set and updates the epoll registration
+    size_t fd = (size_t)fd_;  // unsigned copy used to index fdstatus_
 
 
-EPollAIO::~EPollAIO()
-{
-       close(pollfd_);
-}
+    VERIFY(fd < MAX_POLL_FDS);
 
 
-static inline
-int poll_flag_to_event(poll_flag flag)
-{
-       int f;
-       if (flag == CB_RDONLY) {
-               f = EPOLLIN;
-       }else if (flag == CB_WRONLY) {
-               f = EPOLLOUT;
-       }else { //flag == CB_RDWR
-               f = EPOLLIN | EPOLLOUT;
-       }
-       return f;
+    struct epoll_event ev;
+    int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;  // MOD when fd already has watched events, ADD otherwise
+    fdstatus_[fd] |= (unsigned)flag;
+
+    ev.events = EPOLLET;  // edge-triggered is always requested
+    ev.data.fd = fd_;  // wait_ready reads the fd back from this field
+
+    if (fdstatus_[fd] & CB_RDONLY)
+        ev.events |= EPOLLIN;
+
+    if (fdstatus_[fd] & CB_WRONLY)
+        ev.events |= EPOLLOUT;
+
+    if (flag == CB_RDWR)  // sanity check: a CB_RDWR request must end up watching both directions
+        VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
+
+    VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
 }
 
 }
 
-void
-EPollAIO::watch_fd(int fd, poll_flag flag)
-{
-       VERIFY(fd < MAX_POLL_FDS);
+bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {  // clears flag's bits for fd_; returns true when fd_ was removed from the epoll set
+    size_t fd = (size_t)fd_;  // unsigned copy used to index fdstatus_
 
 
-       struct epoll_event ev;
-       int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
-       fdstatus_[fd] |= (int)flag;
+    VERIFY(fd < MAX_POLL_FDS);
+    fdstatus_[fd] &= ~(unsigned)flag;
 
 
-       ev.events = EPOLLET;
-       ev.data.fd = fd;
+    struct epoll_event ev;
+    int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;  // MOD while some bit is still watched, DEL once none is
 
 
-       if (fdstatus_[fd] & CB_RDONLY) {
-               ev.events |= EPOLLIN;
-       }
-       if (fdstatus_[fd] & CB_WRONLY) {
-               ev.events |= EPOLLOUT;
-       }
+    ev.events = EPOLLET;
+    ev.data.fd = fd_;
 
 
-       if (flag == CB_RDWR) {
-               VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
-       }
+    if (fdstatus_[fd] & CB_RDONLY)
+        ev.events |= EPOLLIN;
 
 
-       VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
-}
+    if (fdstatus_[fd] & CB_WRONLY)
+        ev.events |= EPOLLOUT;
 
 
-bool 
-EPollAIO::unwatch_fd(int fd, poll_flag flag)
-{
-       VERIFY(fd < MAX_POLL_FDS);
-       fdstatus_[fd] &= ~(int)flag;
-
-       struct epoll_event ev;
-       int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
-
-       ev.events = EPOLLET;
-       ev.data.fd = fd;
-
-       if (fdstatus_[fd] & CB_RDONLY) {
-               ev.events |= EPOLLIN;
-       }
-       if (fdstatus_[fd] & CB_WRONLY) {
-               ev.events |= EPOLLOUT;
-       }
-
-       if (flag == CB_RDWR) {
-               VERIFY(op == EPOLL_CTL_DEL);
-       }
-       VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
-       return (op == EPOLL_CTL_DEL);
-}
+    if (flag == CB_RDWR)  // sanity check: clearing both directions must leave nothing watched
+        VERIFY(op == EPOLL_CTL_DEL);
 
 
-bool
-EPollAIO::is_watched(int fd, poll_flag flag)
-{
-       VERIFY(fd < MAX_POLL_FDS);
-       return ((fdstatus_[fd] & CB_MASK) == flag);
+    VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
+    return (op == EPOLL_CTL_DEL);
 }
 
 }
 
-void
-EPollAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
-{
-       int nfds = epoll_wait(pollfd_, ready_,  MAX_POLL_FDS, -1);
-       for (int i = 0; i < nfds; i++) {
-               if (ready_[i].events & EPOLLIN) {
-                       readable->push_back(ready_[i].data.fd);
-               }
-               if (ready_[i].events & EPOLLOUT) {
-                       writable->push_back(ready_[i].data.fd);
-               }
-       }
+void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {  // waits in epoll_wait() and appends the ready fds to readable / writable
+
+    int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);  // timeout -1: no time limit; a negative result is not reported, the loop below just does not run
+    for (int i = 0; i < nfds; i++) {
+        if (ready_[i].events & EPOLLIN)
+            readable.push_back(ready_[i].data.fd);  // data.fd was stored by watch_fd/unwatch_fd
+
+        if (ready_[i].events & EPOLLOUT)
+            writable.push_back(ready_[i].data.fd);
+    }
 }
 
 #endif
 }
 
 #endif