3 #include <sys/select.h>
// The one file-scope PollMgr object; `static` keeps it private to this file.
10 static PollMgr instance;
// Accessor returning a reference to the file-scope `instance`.
12 PollMgr & PollMgr::Instance() { return instance; }
// Interface implemented by the select()- and epoll-based back ends in this file.
// Register interest in `fd` for the event bits given in `flag`.
16 virtual void watch_fd(int fd, poll_flag flag) = 0;
// Drop interest in the `flag` events on `fd`. Both implementations in this
// file return true when `fd` is left with no watched events at all.
17 virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
// Wait for activity, then append the ready descriptors to `readable` and
// `writable`. Both implementations in this file wait without a timeout.
18 virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
// Virtual destructor with an empty body and an empty exception specification.
19 virtual ~wait_manager() throw() {}
// wait_manager back end whose wait_ready() in this file is built on select().
22 class SelectAIO : public wait_manager {
// Implementations of the three wait_manager operations; the definitions
// appear further down in this file.
26 void watch_fd(int fd, poll_flag flag);
27 bool unwatch_fd(int fd, poll_flag flag);
28 void wait_ready(vector<int> & readable, vector<int> & writable);
// wait_manager back end built on epoll (epoll_create / epoll_ctl / epoll_wait).
38 class EPollAIO : public wait_manager {
// Empty destructor body.
// NOTE(review): poll_ is not closed here; presumably file_t releases the
// descriptor itself -- confirm against file_t's definition.
41 ~EPollAIO() throw() { }
// Implementations of the three wait_manager operations; the definitions
// appear further down in this file.
42 void watch_fd(int fd, poll_flag flag);
43 bool unwatch_fd(int fd, poll_flag flag);
44 void wait_ready(vector<int> & readable, vector<int> & writable);
// epoll instance used by every epoll_ctl/epoll_wait call in this class.
// (Per epoll_create(2) the size argument is only a hint but must be > 0.)
47 file_t poll_ = epoll_create(MAX_POLL_FDS);
// Output buffer handed to epoll_wait() in wait_ready().
48 struct epoll_event ready_[MAX_POLL_FDS];
// Indexed by fd: the poll_flag bits currently registered for that descriptor.
// Zero means "not registered", which watch_fd/unwatch_fd use to choose
// between EPOLL_CTL_ADD / EPOLL_CTL_MOD / EPOLL_CTL_DEL.
49 vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
// Constructor: allocates a SelectAIO as the polling back end.
// NOTE(review): EPollAIO is also defined in this file but is not the one
// selected here. The matching release of aio_ is not visible in this excerpt.
54 PollMgr::PollMgr() : aio_(new SelectAIO()) {
// Start the background thread that runs wait_loop() on this object.
55 th_ = thread(&PollMgr::wait_loop, this);
// NOTE(review): the enclosing function header and the declaration of `ml`
// are not visible in this excerpt; this looks like PollMgr teardown -- confirm.
// Stop watching every descriptor that has an entry in callbacks_, passing
// CB_RDWR so all event bits are dropped.
// NOTE(review): `auto p` copies each element; `const auto &` would avoid that.
61 for (auto p : callbacks_)
62 aio_->unwatch_fd(p.first, CB_RDWR);
// Raise the flag that wait_loop() clears before calling
// changedone_c_.notify_all(), then wait for that notification.
63 pending_change_ = true;
// NOTE(review): the wait has no predicate; if changedone_c_ behaves like
// std::condition_variable, a spurious wakeup could return before wait_loop()
// has acknowledged the change -- confirm.
65 changedone_c_.wait(ml);
// Register callback object `ch` for the `flag` events on `fd`.
// (The return type and the rest of the body are on lines not shown here.)
71 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
// Ask the back end to start reporting `flag` events for `fd`.
74 aio_->watch_fd(fd, flag);
// An fd may have at most one callback object: the slot must be empty or
// already hold `ch`.
76 VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
80 // Remove all callbacks related to fd. After this returns, we guarantee that
81 // callbacks related to fd will never be called again.
82 void PollMgr::block_remove_fd(int fd) {
// Stop the back end from reporting any event (CB_RDWR) for this fd.
84 aio_->unwatch_fd(fd, CB_RDWR);
// Raise the flag that wait_loop() clears before calling
// changedone_c_.notify_all(), then block until notified.
// (`ml` is declared on a line not shown; presumably a lock on m_ -- confirm.)
85 pending_change_ = true;
86 changedone_c_.wait(ml);
// Only after the wait returns is the callback slot cleared; wait_loop()
// skips descriptors whose slot is null.
87 callbacks_[fd] = nullptr;
// Stop delivering the `flag` events for `fd`. Unlike block_remove_fd(), the
// visible lines do not wait for wait_loop() to acknowledge the change.
90 void PollMgr::del_callback(int fd, poll_flag flag) {
// unwatch_fd() returns true once `fd` has no watched events left; only then
// is the callback slot cleared.
92 if (aio_->unwatch_fd(fd, flag))
93 callbacks_[fd] = nullptr;
// Body of the thread started by the PollMgr constructor. The loop structure
// and its exit condition are on lines not shown in this excerpt.
96 void PollMgr::wait_loop() {
// Acknowledge a change requested elsewhere in this file (e.g. by
// block_remove_fd): clear the flag and wake every thread waiting on
// changedone_c_.
104 if (pending_change_) {
105 pending_change_ = false;
106 changedone_c_.notify_all();
// Ask the back end for the descriptors that are ready.
113 aio_->wait_ready(readable, writable);
// Dispatch read events. The callback pointer is fetched while holding m_;
// the lock's scope ends at the closing brace, so read_cb() runs without m_.
115 for (auto fd : readable) {
116 { lock ml(m_); cb = callbacks_[fd]; }
// A null slot means the callback was removed; skip it.
117 if (cb) cb->read_cb(fd);
// Dispatch write events the same way.
120 for (auto fd : writable) {
121 { lock ml(m_); cb = callbacks_[fd]; }
122 if (cb) cb->write_cb(fd);
// Constructor. Creation of pipe_ and initialisation of the fd sets are on
// lines not shown in this excerpt.
127 SelectAIO::SelectAIO()
// Put pipe_[0] in the read set. watch_fd()/unwatch_fd() write one byte to
// pipe_[1] and wait_ready() reads it back, so the pipe acts as a wake-up
// channel for a select() call that is in progress.
134 FD_SET(pipe_[0], &rfds_);
// Set O_NONBLOCK on pipe_[0].
// (flags() belongs to a project type not shown here -- presumably it exposes
// the descriptor's status flags; confirm.)
137 pipe_[0].flags() |= O_NONBLOCK;
// Start watching `fd` for the events in `flag`.
140 void SelectAIO::watch_fd(int fd, poll_flag flag) {
// Upper-bound check only.
// NOTE(review): a negative fd is not rejected by this check, and FD_SET
// additionally requires fd < FD_SETSIZE -- confirm MAX_POLL_FDS <= FD_SETSIZE.
141 VERIFY(fd < MAX_POLL_FDS);
// The bodies of these two branches are on lines not shown; presumably they
// add `fd` to rfds_ and wfds_ respectively -- confirm.
147 if (flag & CB_RDONLY)
150 if (flag & CB_WRONLY)
// Write one byte to the wake-up pipe; its read end is in rfds_ (see the
// constructor), so a select() in progress returns and is re-issued.
153 VERIFY(pipe_[1].write((char)1)==1);
// Stop watching `fd` for the events in `flag`.
// Returns true when `fd` is left in neither rfds_ nor wfds_.
156 bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
// Upper-bound check only (a negative fd is not rejected here).
157 VERIFY(fd < MAX_POLL_FDS);
// `flag` may contain no bits outside CB_RDWR.
160 VERIFY((flag & ~CB_RDWR) == 0);
// The bodies of these two branches are on lines not shown; presumably they
// clear `fd` from rfds_ and wfds_ respectively -- confirm.
161 if (flag & CB_RDONLY)
163 if (flag & CB_WRONLY)
// Scan descriptors 0..highfds_ for ones still present in either set. The
// loop body is not shown; presumably it recomputes highfds_ -- confirm.
167 for (int i = 0; i <= highfds_; i++) {
168 if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
// Write one byte to the wake-up pipe so a select() in progress returns.
174 VERIFY(pipe_[1].write((char)1)==1);
// True only if `fd` is no longer watched for reading or writing.
176 return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
// Block in select() and append the ready descriptors to `readable` and
// `writable`. Setup of trfds/twfds/high is on lines not shown; presumably
// they are copies of rfds_/wfds_/highfds_ -- confirm.
179 void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
// NULL timeout: select() waits until a descriptor is ready or a signal arrives.
191 int ret = select(high+1, &trfds, &twfds, NULL, NULL);
// Interrupted by a signal; the handling statement is on a line not shown.
193 if (ret < 0 && errno == EINTR)
// Any other failure is logged with errno (the guarding condition is not shown).
197 IF_LEVEL(0) LOG("select_loop failure errno " << errno);
// Walk every descriptor up to `high` and classify it.
201 for (int fd = 0; fd <= high; fd++) {
// Wake-up pipe became readable: consume one byte.
// (How control continues after this branch is on lines not shown.)
202 if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
204 VERIFY(pipe_[0].read(tmp)==1);
// Report descriptors select() marked writable.
207 if (FD_ISSET(fd, &twfds))
208 writable.push_back(fd);
// Report descriptors select() marked readable.
210 if (FD_ISSET(fd, &trfds))
211 readable.push_back(fd);
// Start watching `fd_` for the events in `flag`, merged with whatever is
// already registered for that descriptor.
218 void EPollAIO::watch_fd(int fd_, poll_flag flag) {
// Converting to size_t turns a negative fd into a very large value, so the
// bound check below also rejects it (assuming MAX_POLL_FDS is an ordinary
// integer constant -- confirm).
219 size_t fd = (size_t)fd_;
221 VERIFY(fd < MAX_POLL_FDS);
223 struct epoll_event ev;
// A non-zero status means the fd is already registered: modify, not add.
224 int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
// Record the newly requested bits alongside the existing ones.
225 fdstatus_[fd] |= (unsigned)flag;
// Build the epoll event mask from the merged status. `ev` is initialised on
// lines not shown; presumably ev.events starts as EPOLLET and ev.data.fd is
// set to the descriptor -- confirm.
230 if (fdstatus_[fd] & CB_RDONLY)
231 ev.events |= EPOLLIN;
233 if (fdstatus_[fd] & CB_WRONLY)
234 ev.events |= EPOLLOUT;
// Sanity check that the mask is exactly edge-triggered read+write; the
// condition under which this check runs is on a line not shown.
237 VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
// Apply the add/modify; any epoll_ctl failure trips VERIFY.
239 VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
// Stop watching `fd_` for the events in `flag`.
// Returns true when the descriptor was removed from the epoll set entirely.
242 bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
// Converting to size_t turns a negative fd into a very large value, so the
// bound check below also rejects it (assuming MAX_POLL_FDS is an ordinary
// integer constant -- confirm).
243 size_t fd = (size_t)fd_;
245 VERIFY(fd < MAX_POLL_FDS);
// Clear the requested bits from the recorded status.
246 fdstatus_[fd] &= ~(unsigned)flag;
248 struct epoll_event ev;
// Bits remaining: modify the registration. None remaining: delete it.
// NOTE(review): if the fd was never registered its status is already 0, so
// this issues EPOLL_CTL_DEL, which epoll_ctl(2) rejects with ENOENT and the
// VERIFY below would then fail -- confirm callers never hit this.
249 int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
// Rebuild the event mask from the remaining status (`ev` is initialised on
// lines not shown).
254 if (fdstatus_[fd] & CB_RDONLY)
255 ev.events |= EPOLLIN;
257 if (fdstatus_[fd] & CB_WRONLY)
258 ev.events |= EPOLLOUT;
// Sanity check; the condition under which it runs is on a line not shown.
261 VERIFY(op == EPOLL_CTL_DEL);
// Apply the modify/delete; any epoll_ctl failure trips VERIFY.
263 VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
264 return (op == EPOLL_CTL_DEL);
// Block in epoll_wait() and append the ready descriptors to `readable` and
// `writable`.
267 void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
// Timeout -1: wait indefinitely. At most MAX_POLL_FDS events land in ready_.
// NOTE(review): a negative return (e.g. EINTR) just skips the loop below, so
// the error is silently ignored, whereas SelectAIO::wait_ready logs select()
// failures -- confirm this is intended.
269 int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
270 for (int i = 0; i < nfds; i++) {
// data.fd is presumably the descriptor stored when the fd was registered
// (that assignment is on a line not shown) -- confirm.
271 if (ready_[i].events & EPOLLIN)
272 readable.push_back(ready_[i].data.fd);
274 if (ready_[i].events & EPOLLOUT)
275 writable.push_back(ready_[i].data.fd);