3 #include <sys/select.h>
// Out-of-class definition of the poll_mgr::shared_mgr member object.
// No initializer is given, so it is default-initialized via poll_mgr::poll_mgr().
10 poll_mgr poll_mgr::shared_mgr;
// Abstract interface implemented by the SelectAIO and EPollAIO back ends.
//
// watch_fd:   start watching `fd` for the events named by `flag`.
14 virtual void watch_fd(int fd, poll_flag flag) = 0;
// unwatch_fd: stop watching `fd` for the events named by `flag`. Both
//             implementations in this file return true when `fd` is no longer
//             watched for any event afterwards.
15 virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
// wait_ready: appends ready descriptors to `readable` / `writable`. Both
//             implementations in this file wait without a timeout.
16 virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
// Virtual, non-throwing destructor so implementations can be destroyed through
// a wait_manager pointer.
17 virtual ~wait_manager() throw() {}
// wait_manager implementation built on select(); see SelectAIO::wait_ready.
// Only part of the class declaration is visible in this view.
20 class SelectAIO : public wait_manager {
// Overrides of the wait_manager interface (defined further down in the file).
24 void watch_fd(int fd, poll_flag flag);
25 bool unwatch_fd(int fd, poll_flag flag);
26 void wait_ready(vector<int> & readable, vector<int> & writable);
// wait_manager implementation built on epoll (epoll_create / epoll_ctl /
// epoll_wait). Only part of the class declaration is visible in this view.
36 class EPollAIO : public wait_manager {
39 ~EPollAIO() throw() { }
// Overrides of the wait_manager interface (defined further down in the file).
40 void watch_fd(int fd, poll_flag flag);
41 bool unwatch_fd(int fd, poll_flag flag);
42 void wait_ready(vector<int> & readable, vector<int> & writable);
// epoll instance handle, created when the object is constructed.
// NOTE(review): the size argument of epoll_create() is only a hint on modern
// Linux kernels (it must still be > 0) - confirm against the man page.
45 file_t poll_ = epoll_create(MAX_POLL_FDS);
// Output buffer handed to epoll_wait() in wait_ready().
46 struct epoll_event ready_[MAX_POLL_FDS];
// Per-descriptor bitmask of the poll_flag bits currently watched, indexed by
// fd; zero means the fd is not registered with the epoll instance.
47 vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
// Constructs the manager with a SelectAIO back end and starts the thread th_
// that runs poll_mgr::wait_loop() on this object.
52 poll_mgr::poll_mgr() : aio_(new SelectAIO()) {
53 th_ = thread(&poll_mgr::wait_loop, this);
// NOTE(review): the header of the enclosing function is not visible in this
// view; presumably this is poll_mgr teardown - confirm.
// Stops watching every descriptor that has an entry in callbacks_ (p.first is
// used as the fd), for both reading and writing.
// NOTE(review): `auto p` copies each element; a const reference would avoid it.
59 for (auto p : callbacks_)
60 aio_->unwatch_fd(p.first, CB_RDWR);
// Request an acknowledgement from wait_loop(), which clears pending_change_
// and notifies changedone_c_ at the top of an iteration.
61 pending_change_ = true;
// `ml` is a lock declared on a line that is not visible here.
63 changedone_c_.wait(ml);
// Starts watching `fd` for the events in `flag` on behalf of callback `ch`.
// (The return-type line and part of the body are not visible in this view.)
69 poll_mgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
72 aio_->watch_fd(fd, flag);
// An fd may have at most one callback object: either none is registered yet
// or the registered one is `ch` itself.
// NOTE(review): the store of `ch` into callbacks_[fd] is presumably on a line
// not shown here - confirm.
74 VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
78 // Remove all callbacks related to fd. After this returns, we guarantee that
79 // callbacks related to fd will never be called again.
80 void poll_mgr::block_remove_fd(int fd) {
// Stop watching fd for both reading and writing.
82 aio_->unwatch_fd(fd, CB_RDWR);
// Ask wait_loop() for an acknowledgement and wait for it: wait_loop() clears
// pending_change_ and calls changedone_c_.notify_all() at the top of an
// iteration.
83 pending_change_ = true;
// `ml` is a lock declared on a line that is not visible here.
// NOTE(review): this wait has no predicate; if changedone_c_ is a
// std::condition_variable, a spurious wakeup could return before wait_loop()
// has acknowledged the change - confirm.
84 changedone_c_.wait(ml);
// Drop the callback so later lookups in wait_loop() find nullptr.
85 callbacks_[fd] = nullptr;
// Stops watching `fd` for the events in `flag`. The callback entry is cleared
// only when unwatch_fd() returns true, i.e. the back end reports that fd is no
// longer watched for any event.
88 void poll_mgr::del_callback(int fd, poll_flag flag) {
90 if (aio_->unwatch_fd(fd, flag))
91 callbacks_[fd] = nullptr;
// Body of the thread started by the poll_mgr constructor: waits for ready
// descriptors and dispatches them to the registered callbacks.
// (Several lines, including the enclosing loop construct, are not visible in
// this view.)
94 void poll_mgr::wait_loop() {
// Acknowledge a change requested via pending_change_ and wake every thread
// waiting on changedone_c_ (e.g. block_remove_fd()).
102 if (pending_change_) {
103 pending_change_ = false;
104 changedone_c_.notify_all();
// Ask the back end which descriptors are ready.
111 aio_->wait_ready(readable, writable);
// The callback pointer is fetched while holding m_, but read_cb()/write_cb()
// run after that scoped lock has been released.
113 for (auto fd : readable) {
114 { lock ml(m_); cb = callbacks_[fd]; }
// cb is null when no callback is registered for fd (or it has been removed).
115 if (cb) cb->read_cb(fd);
118 for (auto fd : writable) {
119 { lock ml(m_); cb = callbacks_[fd]; }
120 if (cb) cb->write_cb(fd);
// Sets up the select() back end. pipe_ acts as a wake-up channel: watch_fd()
// and unwatch_fd() write one byte to pipe_[1], and wait_ready() reads it from
// pipe_[0]. (Creation of the pipe is on lines not visible in this view.)
125 SelectAIO::SelectAIO()
// Always watch the read end of the wake-up pipe.
132 FD_SET(pipe_[0], &rfds_);
// Put the read end into non-blocking mode.
135 pipe_[0].flags() |= O_NONBLOCK;
// Starts watching `fd` for the events named by `flag`.
138 void SelectAIO::watch_fd(int fd, poll_flag flag) {
// NOTE(review): fd is a signed int, so this check alone does not reject a
// negative fd (EPollAIO casts to size_t before the same check).
139 VERIFY(fd < MAX_POLL_FDS);
// The statements guarded by these two conditions are not visible in this
// view; presumably they add fd to rfds_ / wfds_ - confirm.
145 if (flag & CB_RDONLY)
148 if (flag & CB_WRONLY)
// Write one byte to the wake-up pipe; wait_ready() reads it from pipe_[0].
151 VERIFY(pipe_[1].write((char)1)==1);
// Stops watching `fd` for the events named by `flag`.
// Returns true when fd is in neither rfds_ nor wfds_ afterwards.
154 bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
155 VERIFY(fd < MAX_POLL_FDS);
// Only the read/write bits may be set in flag.
158 VERIFY((flag & ~CB_RDWR) == 0);
// The statements guarded by these two conditions are not visible in this
// view; presumably they clear fd from rfds_ / wfds_ - confirm.
159 if (flag & CB_RDONLY)
161 if (flag & CB_WRONLY)
// Scan every descriptor up to highfds_ that is still in either set.
// NOTE(review): the loop body is not visible; presumably it recomputes the
// highest watched descriptor - confirm.
165 for (int i = 0; i <= highfds_; i++) {
166 if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
// Write one byte to the wake-up pipe; wait_ready() reads it from pipe_[0].
172 VERIFY(pipe_[1].write((char)1)==1);
174 return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
// Waits in select() and appends ready descriptors to `readable` / `writable`.
// (Setup of trfds, twfds and high is on lines not visible in this view;
// presumably they are snapshots of rfds_, wfds_ and highfds_ - confirm.)
177 void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
// A NULL timeout makes select() wait indefinitely.
189 int ret = select(high+1, &trfds, &twfds, NULL, NULL);
// Interruption by a signal (EINTR) is tested separately from other failures;
// the statement taken in that case is not visible in this view.
191 if (ret < 0 && errno == EINTR)
195 IF_LEVEL(0) LOG("select_loop failure errno " << errno);
// Examine every descriptor from 0 through high.
199 for (int fd = 0; fd <= high; fd++) {
// The wake-up pipe's read end became readable: consume exactly one byte.
200 if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
202 VERIFY(pipe_[0].read(tmp)==1);
// NOTE(review): the branch structure linking the pipe case to the checks
// below is not visible; presumably the pipe fd is not reported to the caller
// - confirm.
205 if (FD_ISSET(fd, &twfds))
206 writable.push_back(fd);
208 if (FD_ISSET(fd, &trfds))
209 readable.push_back(fd);
// Starts watching `fd_` for the events named by `flag`, in addition to any
// events already recorded for it in fdstatus_.
216 void EPollAIO::watch_fd(int fd_, poll_flag flag) {
// Converting to size_t turns a negative fd_ into a very large value, so the
// range check below rejects it (assuming MAX_POLL_FDS is a normal small bound).
217 size_t fd = (size_t)fd_;
219 VERIFY(fd < MAX_POLL_FDS);
221 struct epoll_event ev;
// A non-zero status means the fd is already registered: modify it instead of
// adding it.
222 int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
223 fdstatus_[fd] |= (unsigned)flag;
// Translate the accumulated flags into epoll event bits.
// NOTE(review): the initialization of `ev` (events and data) is on lines not
// visible in this view.
228 if (fdstatus_[fd] & CB_RDONLY)
229 ev.events |= EPOLLIN;
231 if (fdstatus_[fd] & CB_WRONLY)
232 ev.events |= EPOLLOUT;
// NOTE(review): the condition guarding this check is not visible; presumably
// it applies only when flag is CB_RDWR - confirm.
235 VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
// Apply the change to the epoll instance; failure is treated as fatal.
237 VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
// Stops watching `fd_` for the events named by `flag`.
// Returns true when the fd was removed from the epoll instance entirely.
240 bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
// Converting to size_t turns a negative fd_ into a very large value, so the
// range check below rejects it (assuming MAX_POLL_FDS is a normal small bound).
241 size_t fd = (size_t)fd_;
243 VERIFY(fd < MAX_POLL_FDS);
// Clear the requested bits from the recorded status.
244 fdstatus_[fd] &= ~(unsigned)flag;
246 struct epoll_event ev;
// If any watched event remains, modify the registration; otherwise delete it.
247 int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
// Translate the remaining flags into epoll event bits.
// NOTE(review): the initialization of `ev` (events and data) is on lines not
// visible in this view.
252 if (fdstatus_[fd] & CB_RDONLY)
253 ev.events |= EPOLLIN;
255 if (fdstatus_[fd] & CB_WRONLY)
256 ev.events |= EPOLLOUT;
// NOTE(review): the condition guarding this check is not visible; presumably
// it applies only when flag is CB_RDWR - confirm.
259 VERIFY(op == EPOLL_CTL_DEL);
// Apply the change to the epoll instance; failure is treated as fatal.
261 VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
262 return (op == EPOLL_CTL_DEL);
// Waits in epoll_wait() and appends ready descriptors to `readable` /
// `writable`.
265 void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
// A timeout of -1 makes epoll_wait() wait indefinitely.
// NOTE(review): a negative return value (error, e.g. a signal interruption)
// is not reported here; the loop below simply does not execute.
267 int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
268 for (int i = 0; i < nfds; i++) {
// One entry may report both directions, so the two tests are independent.
269 if (ready_[i].events & EPOLLIN)
270 readable.push_back(ready_[i].data.fd);
272 if (ready_[i].events & EPOLLOUT)
273 writable.push_back(ready_[i].data.fd);