3 #include <sys/select.h>
12 aio_callback::~aio_callback() {}
14 poll_mgr poll_mgr::shared_mgr;
18 virtual void watch_fd(int fd, poll_flag flag) = 0;
19 virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
20 virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
21 virtual ~wait_manager() noexcept;
24 wait_manager::~wait_manager() noexcept {}
26 class SelectAIO : public wait_manager {
30 void watch_fd(int fd, poll_flag flag);
31 bool unwatch_fd(int fd, poll_flag flag);
32 void wait_ready(vector<int> & readable, vector<int> & writable);
42 class EPollAIO : public wait_manager {
45 ~EPollAIO() throw() { }
46 void watch_fd(int fd, poll_flag flag);
47 bool unwatch_fd(int fd, poll_flag flag);
48 void wait_ready(vector<int> & readable, vector<int> & writable);
51 file_t poll_ = epoll_create(MAX_POLL_FDS);
52 struct epoll_event ready_[MAX_POLL_FDS];
53 vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
58 poll_mgr::poll_mgr() : aio_(new SelectAIO()) {
59 th_ = thread(&poll_mgr::wait_loop, this);
65 for (auto p : callbacks_)
66 aio_->unwatch_fd(p.first, CB_RDWR);
67 pending_change_ = true;
69 changedone_c_.wait(ml);
75 poll_mgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
78 aio_->watch_fd(fd, flag);
80 VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
84 // Remove all callbacks related to fd. After this returns, we guarantee that
85 // callbacks related to fd will never be called again.
86 void poll_mgr::block_remove_fd(int fd) {
88 aio_->unwatch_fd(fd, CB_RDWR);
89 pending_change_ = true;
90 changedone_c_.wait(ml);
91 callbacks_[fd] = nullptr;
94 void poll_mgr::del_callback(int fd, poll_flag flag) {
96 if (aio_->unwatch_fd(fd, flag))
97 callbacks_[fd] = nullptr;
100 void poll_mgr::wait_loop() {
101 vector<int> readable;
102 vector<int> writable;
108 if (pending_change_) {
109 pending_change_ = false;
110 changedone_c_.notify_all();
117 aio_->wait_ready(readable, writable);
119 for (auto fd : readable) {
120 { lock ml(m_); cb = callbacks_[fd]; }
121 if (cb) cb->read_cb(fd);
124 for (auto fd : writable) {
125 { lock ml(m_); cb = callbacks_[fd]; }
126 if (cb) cb->write_cb(fd);
131 SelectAIO::SelectAIO()
138 FD_SET(pipe_[0], &rfds_);
141 pipe_[0].flags() |= O_NONBLOCK;
144 void SelectAIO::watch_fd(int fd, poll_flag flag) {
145 VERIFY(fd < MAX_POLL_FDS);
151 if (flag & CB_RDONLY)
154 if (flag & CB_WRONLY)
157 VERIFY(pipe_[1].write((char)1)==1);
160 bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
161 VERIFY(fd < MAX_POLL_FDS);
164 VERIFY((flag & ~CB_RDWR) == 0);
165 if (flag & CB_RDONLY)
167 if (flag & CB_WRONLY)
171 for (int i = 0; i <= highfds_; i++) {
172 if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
178 VERIFY(pipe_[1].write((char)1)==1);
180 return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
183 void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
195 int ret = select(high+1, &trfds, &twfds, NULL, NULL);
197 if (ret < 0 && errno == EINTR)
201 IF_LEVEL(0) LOG << "select_loop failure errno " << errno;
205 for (int fd = 0; fd <= high; fd++) {
206 if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
208 VERIFY(pipe_[0].read(tmp)==1);
211 if (FD_ISSET(fd, &twfds))
212 writable.push_back(fd);
214 if (FD_ISSET(fd, &trfds))
215 readable.push_back(fd);
222 void EPollAIO::watch_fd(int fd_, poll_flag flag) {
223 size_t fd = (size_t)fd_;
225 VERIFY(fd < MAX_POLL_FDS);
227 struct epoll_event ev;
228 int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
229 fdstatus_[fd] |= (unsigned)flag;
234 if (fdstatus_[fd] & CB_RDONLY)
235 ev.events |= EPOLLIN;
237 if (fdstatus_[fd] & CB_WRONLY)
238 ev.events |= EPOLLOUT;
241 VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
243 VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
246 bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
247 size_t fd = (size_t)fd_;
249 VERIFY(fd < MAX_POLL_FDS);
250 fdstatus_[fd] &= ~(unsigned)flag;
252 struct epoll_event ev;
253 int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
258 if (fdstatus_[fd] & CB_RDONLY)
259 ev.events |= EPOLLIN;
261 if (fdstatus_[fd] & CB_WRONLY)
262 ev.events |= EPOLLOUT;
265 VERIFY(op == EPOLL_CTL_DEL);
267 VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
268 return (op == EPOLL_CTL_DEL);
271 void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
273 int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
274 for (int i = 0; i < nfds; i++) {
275 if (ready_[i].events & EPOLLIN)
276 readable.push_back(ready_[i].data.fd);
278 if (ready_[i].events & EPOLLOUT)
279 writable.push_back(ready_[i].data.fd);