3 #include <sys/select.h>
// The one file-scope PollMgr object handed out by PollMgr::Instance().
12 static PollMgr instance;
// Returns a reference to the file-scope PollMgr object defined just above.
14 PollMgr & PollMgr::Instance() { return instance; }
// Pure-virtual polling interface; SelectAIO and EPollAIO in this file derive
// from wait_manager and implement these three operations.
// watch_fd: start watching fd for the events named by flag.
18 virtual void watch_fd(int fd, poll_flag flag) = 0;
// unwatch_fd: stop watching fd for the events named by flag. Both
// implementations in this file return true when fd is left with no watched
// events at all.
19 virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
// wait_ready: appends ready descriptors to the two output vectors.
20 virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
// Virtual destructor so implementations can be destroyed through the base.
// NOTE(review): throw() is a dynamic exception specification (deprecated since
// C++11, removed in C++20); noexcept is the modern spelling.
21 virtual ~wait_manager() throw() {}
// wait_manager implementation whose wait_ready is built on select().
// (Only part of the class body is visible in this view.)
24 class SelectAIO : public wait_manager {
// Implementations of the three wait_manager operations; defined out of line
// later in this file.
28 void watch_fd(int fd, poll_flag flag);
29 bool unwatch_fd(int fd, poll_flag flag);
30 void wait_ready(vector<int> & readable, vector<int> & writable);
// wait_manager implementation built on epoll (epoll_create / epoll_ctl /
// epoll_wait). (Only part of the class body is visible in this view.)
40 class EPollAIO : public wait_manager {
// Empty destructor body. NOTE(review): whether poll_ is closed depends on
// file_t, which is declared elsewhere — confirm.
43 ~EPollAIO() throw() { }
// Implementations of the three wait_manager operations; defined out of line
// later in this file.
44 void watch_fd(int fd, poll_flag flag);
45 bool unwatch_fd(int fd, poll_flag flag);
46 void wait_ready(vector<int> & readable, vector<int> & writable);
// Handle of the epoll instance used by epoll_ctl and epoll_wait below.
49 file_t poll_ = epoll_create(MAX_POLL_FDS);
// Output buffer handed to epoll_wait in wait_ready.
50 struct epoll_event ready_[MAX_POLL_FDS];
// Per-descriptor bitmask of currently watched poll_flag bits, indexed by fd;
// zero means the fd is not registered with epoll.
51 vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
// Constructs the manager with a SelectAIO backend and starts the thread that
// runs wait_loop on this object.
// NOTE(review): aio_ is initialised from a raw `new`; its declared type and
// who releases it are not visible in this view — confirm ownership.
56 PollMgr::PollMgr() : aio_(new SelectAIO()) {
57 th_ = thread(&PollMgr::wait_loop, this);
// NOTE(review): the header of the enclosing function is not visible in this
// view; presumably this is PollMgr teardown — confirm.
// Stop watching every registered descriptor for both reading and writing.
// p is a by-value copy of each callbacks_ element; p.first is the fd.
63 for (auto p : callbacks_)
64 aio_->unwatch_fd(p.first, CB_RDWR);
// Request an acknowledgement: wait_loop clears pending_change_ and notifies
// changedone_c_ when it observes the flag.
65 pending_change_ = true;
// NOTE(review): wait() is called without a predicate here; if changedone_c_
// is a std::condition_variable, a spurious wakeup would let this continue
// before wait_loop has acknowledged — confirm.
67 changedone_c_.wait(ml);
// Registers ch as the callback object for fd and asks the backend to watch fd
// for the events in flag. (Return type and parts of the body are on lines not
// visible in this view.)
73 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
76 aio_->watch_fd(fd, flag);
// An fd may have at most one callback object: either none is registered yet or
// the registered one is ch itself.
78 VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
82 // Remove all callbacks related to fd. After this returns, we guarantee that
83 // callbacks related to fd will never be called again.
84 void PollMgr::block_remove_fd(int fd) {
// Stop watching fd for both reading and writing.
86 aio_->unwatch_fd(fd, CB_RDWR);
// Ask wait_loop for an acknowledgement and wait for it: wait_loop clears
// pending_change_ and calls changedone_c_.notify_all() when it sees the flag.
// NOTE(review): ml is declared on a line not visible here, and wait() has no
// predicate — if changedone_c_ is a std::condition_variable a spurious wakeup
// could return early; confirm.
87 pending_change_ = true;
88 changedone_c_.wait(ml);
// Only after the acknowledgement is the callback pointer cleared.
89 callbacks_[fd] = nullptr;
// Stops watching fd for the events in flag. The callback pointer is cleared
// only when unwatch_fd returns true, i.e. when the backend reports that fd is
// no longer watched for any event.
92 void PollMgr::del_callback(int fd, poll_flag flag) {
94 if (aio_->unwatch_fd(fd, flag))
95 callbacks_[fd] = nullptr;
// Body of the thread started in the PollMgr constructor. (Only part of the
// body is visible in this view; the loop construct itself is not shown.)
98 void PollMgr::wait_loop() {
100 vector<int> writable;
// Acknowledge a pending change request: clear the flag and wake every thread
// waiting on changedone_c_ (e.g. block_remove_fd).
106 if (pending_change_) {
107 pending_change_ = false;
108 changedone_c_.notify_all();
// Ask the backend for ready descriptors. Both backends in this file wait
// without a timeout inside wait_ready.
115 aio_->wait_ready(readable, writable);
// Dispatch read events. The callback pointer is copied while ml (constructed
// from m_) is in scope; the callback itself runs after that scope has ended.
117 for (auto fd : readable) {
118 { lock ml(m_); cb = callbacks_[fd]; }
// A null entry means no callback is registered for fd; skip it.
119 if (cb) cb->read_cb(fd);
// Dispatch write events the same way.
122 for (auto fd : writable) {
123 { lock ml(m_); cb = callbacks_[fd]; }
124 if (cb) cb->write_cb(fd);
// Sets up the select() backend. (Only part of the body is visible in this
// view; creation of pipe_ is not shown.)
129 SelectAIO::SelectAIO()
// Always watch the read end of the internal pipe: watch_fd and unwatch_fd
// write a byte to pipe_[1], and wait_ready reads it back from pipe_[0].
136 FD_SET(pipe_[0], &rfds_);
// Make the read end non-blocking.
139 pipe_[0].flags() |= O_NONBLOCK;
// Starts watching fd for the events in flag. (The statements guarded by the
// two flag tests are on lines not visible in this view.)
142 void SelectAIO::watch_fd(int fd, poll_flag flag) {
// NOTE(review): fd is a signed int, so this check does not reject negative
// values; select()'s fd_set additionally requires fd < FD_SETSIZE — confirm
// MAX_POLL_FDS does not exceed it.
143 VERIFY(fd < MAX_POLL_FDS);
// Read interest requested.
149 if (flag & CB_RDONLY)
// Write interest requested.
152 if (flag & CB_WRONLY)
// Write one byte to the internal pipe; wait_ready consumes it when pipe_[0]
// becomes readable (presumably to wake a blocked select() so it picks up the
// new descriptor sets).
155 VERIFY(pipe_[1].write((char)1)==1);
// Stops watching fd for the events in flag.
// Returns true when fd is afterwards in neither the read set nor the write
// set. (The statements guarded by the flag tests and the loop body are on
// lines not visible in this view.)
158 bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
159 VERIFY(fd < MAX_POLL_FDS);
// flag may contain only bits that are part of CB_RDWR.
162 VERIFY((flag & ~CB_RDWR) == 0);
163 if (flag & CB_RDONLY)
165 if (flag & CB_WRONLY)
// Scan all descriptors up to the current highfds_ for ones still present in
// either set (presumably to recompute highfds_ — the body is not visible).
169 for (int i = 0; i <= highfds_; i++) {
170 if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
// Write one byte to the internal pipe; wait_ready consumes it when pipe_[0]
// becomes readable.
176 VERIFY(pipe_[1].write((char)1)==1);
178 return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
// Waits in select() and appends the descriptors it reports to readable and
// writable. (Setup of high, trfds and twfds is on lines not visible here.)
181 void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
// No timeout is passed (last argument NULL), so this blocks until a
// descriptor is ready or the call fails.
193 int ret = select(high+1, &trfds, &twfds, NULL, NULL);
// Interruption by a signal is tested separately from other failures (the
// statement it guards is not visible in this view).
195 if (ret < 0 && errno == EINTR)
199 IF_LEVEL(0) LOG("select_loop failure errno " << errno);
// Walk every descriptor up to high and sort the ready ones into the outputs.
203 for (int fd = 0; fd <= high; fd++) {
// The internal pipe being readable means watch_fd/unwatch_fd wrote a byte;
// the read is VERIFYed to return 1.
204 if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
206 VERIFY(pipe_[0].read(tmp)==1);
209 if (FD_ISSET(fd, &twfds))
210 writable.push_back(fd);
212 if (FD_ISSET(fd, &trfds))
213 readable.push_back(fd);
// Adds the events in flag to the set watched for fd_ and registers or updates
// the descriptor with epoll. (Initialisation of ev and the guard around the
// events check are on lines not visible in this view.)
220 void EPollAIO::watch_fd(int fd_, poll_flag flag) {
// Converting to size_t turns a negative fd_ into a very large value, so the
// bound check below also rejects negative descriptors and fd is safe to use
// as an index into fdstatus_.
221 size_t fd = (size_t)fd_;
223 VERIFY(fd < MAX_POLL_FDS);
225 struct epoll_event ev;
// A non-zero status means fd is already registered, so modify it rather than
// add it.
226 int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
227 fdstatus_[fd] |= (unsigned)flag;
// The epoll event mask is built from the accumulated status, not only from
// the flag passed to this call.
232 if (fdstatus_[fd] & CB_RDONLY)
233 ev.events |= EPOLLIN;
235 if (fdstatus_[fd] & CB_WRONLY)
236 ev.events |= EPOLLOUT;
// Sanity check that the mask is edge-triggered read+write. NOTE(review): the
// condition guarding this check is not visible in this view.
239 VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
241 VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
// Removes the events in flag from the set watched for fd_ and updates or
// deletes the epoll registration.
// Returns true when the descriptor was removed from epoll entirely, i.e. no
// watched events remain. (Initialisation of ev and the guard around the
// EPOLL_CTL_DEL check are on lines not visible in this view.)
244 bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
// Converting to size_t turns a negative fd_ into a very large value, so the
// bound check below also rejects negative descriptors.
245 size_t fd = (size_t)fd_;
247 VERIFY(fd < MAX_POLL_FDS);
248 fdstatus_[fd] &= ~(unsigned)flag;
250 struct epoll_event ev;
// If any watched bit remains, modify the registration; otherwise delete it.
251 int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
// Rebuild the epoll event mask from whatever status bits are left.
256 if (fdstatus_[fd] & CB_RDONLY)
257 ev.events |= EPOLLIN;
259 if (fdstatus_[fd] & CB_WRONLY)
260 ev.events |= EPOLLOUT;
// NOTE(review): the condition guarding this check is not visible in this view.
263 VERIFY(op == EPOLL_CTL_DEL);
265 VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
266 return (op == EPOLL_CTL_DEL);
// Waits in epoll_wait() and appends the reported descriptors to readable
// and/or writable according to the event bits set.
269 void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
// A timeout of -1 waits indefinitely. NOTE(review): a negative return value
// simply skips the loop below; no error handling is visible in this view.
271 int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
272 for (int i = 0; i < nfds; i++) {
// NOTE(review): this reads data.fd, which assumes watch_fd stored the
// descriptor there; that assignment is not visible in this view — confirm.
273 if (ready_[i].events & EPOLLIN)
274 readable.push_back(ready_[i].data.fd);
276 if (ready_[i].events & EPOLLOUT)
277 writable.push_back(ready_[i].data.fd);