3 #include <sys/select.h>
// Out-of-line definition of aio_callback's destructor; the body is empty.
12 aio_callback::~aio_callback() {}
// Definition of the poll_mgr::shared_mgr instance. It is default-constructed,
// so poll_mgr::poll_mgr() runs here and starts the thread executing
// poll_mgr::wait_loop as part of this object's initialization.
14 poll_mgr poll_mgr::shared_mgr;
// Abstract interface implemented by SelectAIO and EPollAIO.
//
// Start watching `fd` for the events selected by `flag`.
18 virtual void watch_fd(int fd, poll_flag flag) = 0;
// Stop watching `fd` for the events selected by `flag`. Both implementations
// in this file return true when `fd` is no longer watched for any event.
19 virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
// Wait for activity and append the ready descriptors to `readable` and
// `writable`.
20 virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
// Virtual so that implementations can be destroyed through a wait_manager.
21 virtual ~wait_manager() noexcept;
// Out-of-line definition of the destructor; the body is empty.
24 wait_manager::~wait_manager() noexcept {}
// wait_manager implementation whose wait_ready() is built on select().
// NOTE(review): the three member functions below implement wait_manager's
// pure virtuals but are not marked `override`.
26 class SelectAIO : public wait_manager {
30 void watch_fd(int fd, poll_flag flag);
31 bool unwatch_fd(int fd, poll_flag flag);
32 void wait_ready(vector<int> & readable, vector<int> & writable);
// wait_manager implementation whose wait_ready() is built on epoll_wait().
42 class EPollAIO : public wait_manager {
// NOTE(review): `throw()` is the deprecated dynamic exception specification
// (removed in C++20); the base class destructor is declared `noexcept`.
45 ~EPollAIO() throw() { }
46 void watch_fd(int fd, poll_flag flag);
47 bool unwatch_fd(int fd, poll_flag flag);
48 void wait_ready(vector<int> & readable, vector<int> & writable);
// The epoll instance, created when the object is constructed.
// NOTE(review): the result of epoll_create() is not checked on this line;
// confirm that file_t rejects a failed descriptor and closes it on destruction.
51 file_t poll_ = epoll_create(MAX_POLL_FDS);
// Buffer that epoll_wait() fills in wait_ready().
52 struct epoll_event ready_[MAX_POLL_FDS];
// Per-descriptor flag bits currently being watched, indexed by fd; every entry
// starts at zero, which watch_fd() treats as "not yet added to epoll".
53 vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
// Creates the manager with a SelectAIO backend and starts th_, the thread that
// runs wait_loop() on this object.
58 poll_mgr::poll_mgr() : aio_(new SelectAIO()) {
59 th_ = thread(&poll_mgr::wait_loop, this);
// Destructor for poll_mgr.
62 poll_mgr::~poll_mgr() {
// Stops watching every descriptor that has an entry in callbacks_, then waits
// on changedone_c_, which wait_loop() signals after it clears pending_change_.
66 void poll_mgr::shutdown() {
// NOTE(review): `auto p` copies each element of callbacks_; `const auto &`
// would avoid the copy.
70 for (auto p : callbacks_)
71 aio_->unwatch_fd(p.first, CB_RDWR);
// Ask wait_loop() to acknowledge the change.
72 pending_change_ = true;
74 changedone_c_.wait(ml);
// Starts watching `fd` for the events in `flag` on behalf of callback `ch`.
79 void poll_mgr::add_callback(int fd, poll_flag flag, aio_callback *ch) {
81 aio_->watch_fd(fd, flag);
// A descriptor may have at most one callback: the slot must either be empty
// or already hold `ch`.
83 VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
87 // Remove all callbacks related to fd. After this returns, we guarantee that
88 // callbacks related to fd will never be called again.
89 void poll_mgr::block_remove_fd(int fd) {
91 aio_->unwatch_fd(fd, CB_RDWR);
// Ask wait_loop() to acknowledge the change, then block until it signals
// changedone_c_.
// NOTE(review): the wait is not wrapped in a loop that re-checks
// pending_change_; if changedone_c_ can wake spuriously (as a
// std::condition_variable can), this may return before wait_loop() has
// acknowledged the change. Confirm the type of changedone_c_.
92 pending_change_ = true;
93 changedone_c_.wait(ml);
// Clear the slot only after the wait has returned.
94 callbacks_[fd] = nullptr;
// Stops watching `fd` for the events in `flag`. The callback slot is cleared
// only when unwatch_fd() reports that `fd` is no longer watched at all.
97 void poll_mgr::del_callback(int fd, poll_flag flag) {
99 if (aio_->unwatch_fd(fd, flag))
100 callbacks_[fd] = nullptr;
// Body of the thread started by the constructor: acknowledges pending changes,
// waits for ready descriptors and dispatches their callbacks.
103 void poll_mgr::wait_loop() {
104 vector<int> readable;
105 vector<int> writable;
// Acknowledge a change requested by shutdown() or block_remove_fd() and wake
// every thread waiting on changedone_c_.
111 if (pending_change_) {
112 pending_change_ = false;
113 changedone_c_.notify_all();
120 aio_->wait_ready(readable, writable);
// m_ is held only while the callback pointer is looked up; the callback itself
// runs after the lock has been released.
122 for (auto fd : readable) {
123 { lock ml(m_); cb = callbacks_[fd]; }
124 if (cb) cb->read_cb(fd);
127 for (auto fd : writable) {
128 { lock ml(m_); cb = callbacks_[fd]; }
129 if (cb) cb->write_cb(fd);
// Sets up the read end of pipe_ as a watched descriptor. watch_fd() and
// unwatch_fd() write a byte to pipe_[1], and wait_ready() reads it back from
// pipe_[0].
134 SelectAIO::SelectAIO()
141 FD_SET(pipe_[0], &rfds_);
// Make the read end non-blocking.
144 pipe_[0].flags() |= O_NONBLOCK;
// Starts watching `fd` for the events in `flag`, then writes a byte to the
// pipe that wait_ready() also selects on.
147 void SelectAIO::watch_fd(int fd, poll_flag flag) {
// NOTE(review): only the upper bound is checked here; a negative `fd` would
// pass this check.
148 VERIFY(fd < MAX_POLL_FDS);
154 if (flag & CB_RDONLY)
157 if (flag & CB_WRONLY)
// Exactly one byte must be written.
160 VERIFY(pipe_[1].write((char)1)==1);
// Stops watching `fd` for the events in `flag`.
// Returns true when `fd` is left in neither the read set nor the write set.
163 bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
164 VERIFY(fd < MAX_POLL_FDS);
// `flag` may contain only bits that are part of CB_RDWR.
167 VERIFY((flag & ~CB_RDWR) == 0);
168 if (flag & CB_RDONLY)
170 if (flag & CB_WRONLY)
// Scan descriptors 0..highfds_ for those still present in either set.
174 for (int i = 0; i <= highfds_; i++) {
175 if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
// Write a byte to the pipe that wait_ready() also selects on.
181 VERIFY(pipe_[1].write((char)1)==1);
183 return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
// Blocks in select() and appends the descriptors it reports to `readable`
// and `writable`.
186 void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
// The NULL timeout makes select() block until a descriptor is ready or a
// signal interrupts it.
198 int ret = select(high+1, &trfds, &twfds, NULL, NULL);
// An interrupted call (EINTR) is handled separately from other failures.
200 if (ret < 0 && errno == EINTR)
204 IF_LEVEL(0) LOG << "select_loop failure errno " << errno;
208 for (int fd = 0; fd <= high; fd++) {
// Activity on the pipe: consume one byte written by watch_fd()/unwatch_fd().
209 if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
211 VERIFY(pipe_[0].read(tmp)==1);
214 if (FD_ISSET(fd, &twfds))
215 writable.push_back(fd);
217 if (FD_ISSET(fd, &trfds))
218 readable.push_back(fd);
// Adds the events in `flag` to those watched on `fd_` and registers the result
// with the epoll instance.
225 void EPollAIO::watch_fd(int fd_, poll_flag flag) {
// Converting to size_t turns a negative descriptor into a large value, so it
// is presumably rejected by the bound check below (depends on MAX_POLL_FDS).
226 size_t fd = (size_t)fd_;
228 VERIFY(fd < MAX_POLL_FDS);
230 struct epoll_event ev;
// A descriptor that already has flags recorded is modified; otherwise it is
// added to the epoll set.
231 int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
232 fdstatus_[fd] |= (unsigned)flag;
// The event mask is derived from the accumulated flags, not just from `flag`.
237 if (fdstatus_[fd] & CB_RDONLY)
238 ev.events |= EPOLLIN;
240 if (fdstatus_[fd] & CB_WRONLY)
241 ev.events |= EPOLLOUT;
244 VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
246 VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
// Removes the events in `flag` from those watched on `fd_`.
// Returns true when the descriptor was deleted from the epoll set, i.e. no
// watched flags remain for it.
249 bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
250 size_t fd = (size_t)fd_;
252 VERIFY(fd < MAX_POLL_FDS);
253 fdstatus_[fd] &= ~(unsigned)flag;
255 struct epoll_event ev;
// Modify the registration while flags remain; delete it once none are left.
256 int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
261 if (fdstatus_[fd] & CB_RDONLY)
262 ev.events |= EPOLLIN;
264 if (fdstatus_[fd] & CB_WRONLY)
265 ev.events |= EPOLLOUT;
268 VERIFY(op == EPOLL_CTL_DEL);
270 VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
271 return (op == EPOLL_CTL_DEL);
// Blocks in epoll_wait() and appends the descriptors it reports to `readable`
// and `writable`.
274 void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
// A timeout of -1 blocks until at least one event is available.
// NOTE(review): a negative return (for example on EINTR) is not checked
// before the loop; the loop simply does not run in that case.
276 int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
277 for (int i = 0; i < nfds; i++) {
278 if (ready_[i].events & EPOLLIN)
279 readable.push_back(ready_[i].data.fd);
281 if (ready_[i].events & EPOLLOUT)
282 writable.push_back(ready_[i].data.fd);