1 #include "include/rpc/poll_mgr.h"
3 #include <sys/select.h>
4 #include "include/rpc/file.h"
5 #include "include/debug.h"
13 aio_callback::~aio_callback() {}
17 virtual void watch_fd(int fd, poll_flag flag) = 0;
18 virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
19 virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
20 virtual ~wait_manager() noexcept;
23 wait_manager::~wait_manager() noexcept {}
25 class SelectAIO : public wait_manager {
29 void watch_fd(int fd, poll_flag flag);
30 bool unwatch_fd(int fd, poll_flag flag);
31 void wait_ready(vector<int> & readable, vector<int> & writable);
41 class EPollAIO : public wait_manager {
44 ~EPollAIO() throw() { }
45 void watch_fd(int fd, poll_flag flag);
46 bool unwatch_fd(int fd, poll_flag flag);
47 void wait_ready(vector<int> & readable, vector<int> & writable);
50 file_t poll_ = epoll_create(MAX_POLL_FDS);
51 struct epoll_event ready_[MAX_POLL_FDS];
52 vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
57 poll_mgr::poll_mgr() : aio_(new SelectAIO()) {
58 th_ = thread(&poll_mgr::wait_loop, this);
61 poll_mgr::~poll_mgr() {
65 void poll_mgr::shutdown() {
69 for (auto p : callbacks_)
70 aio_->unwatch_fd(p.first, CB_RDWR);
71 pending_change_ = true;
73 changedone_c_.wait(ml);
78 void poll_mgr::add_callback(int fd, poll_flag flag, aio_callback *ch) {
80 aio_->watch_fd(fd, flag);
82 VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
86 // Remove all callbacks related to fd. After this returns, we guarantee that
87 // callbacks related to fd will never be called again.
88 void poll_mgr::block_remove_fd(int fd) {
90 aio_->unwatch_fd(fd, CB_RDWR);
91 pending_change_ = true;
92 changedone_c_.wait(ml);
93 callbacks_[fd] = nullptr;
96 void poll_mgr::del_callback(int fd, poll_flag flag) {
98 if (aio_->unwatch_fd(fd, flag))
99 callbacks_[fd] = nullptr;
102 void poll_mgr::wait_loop() {
103 vector<int> readable;
104 vector<int> writable;
110 if (pending_change_) {
111 pending_change_ = false;
112 changedone_c_.notify_all();
119 aio_->wait_ready(readable, writable);
121 for (auto fd : readable) {
122 { lock ml(m_); cb = callbacks_[fd]; }
123 if (cb) cb->read_cb(fd);
126 for (auto fd : writable) {
127 { lock ml(m_); cb = callbacks_[fd]; }
128 if (cb) cb->write_cb(fd);
133 SelectAIO::SelectAIO()
140 FD_SET(pipe_[0], &rfds_);
143 pipe_[0].flags() |= O_NONBLOCK;
146 void SelectAIO::watch_fd(int fd, poll_flag flag) {
147 VERIFY(fd < MAX_POLL_FDS);
153 if (flag & CB_RDONLY)
156 if (flag & CB_WRONLY)
159 VERIFY(pipe_[1].write((char)1)==1);
162 bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
163 VERIFY(fd < MAX_POLL_FDS);
166 VERIFY((flag & ~CB_RDWR) == 0);
167 if (flag & CB_RDONLY)
169 if (flag & CB_WRONLY)
173 for (int i = 0; i <= highfds_; i++) {
174 if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
180 VERIFY(pipe_[1].write((char)1)==1);
182 return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
185 void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
197 int ret = select(high+1, &trfds, &twfds, NULL, NULL);
199 if (ret < 0 && errno == EINTR)
203 IF_LEVEL(0) LOG << "select_loop failure errno " << errno;
207 for (int fd = 0; fd <= high; fd++) {
208 if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
210 VERIFY(pipe_[0].read(tmp)==1);
213 if (FD_ISSET(fd, &twfds))
214 writable.push_back(fd);
216 if (FD_ISSET(fd, &trfds))
217 readable.push_back(fd);
224 void EPollAIO::watch_fd(int fd_, poll_flag flag) {
225 size_t fd = (size_t)fd_;
227 VERIFY(fd < MAX_POLL_FDS);
229 struct epoll_event ev;
230 int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
231 fdstatus_[fd] |= (unsigned)flag;
236 if (fdstatus_[fd] & CB_RDONLY)
237 ev.events |= EPOLLIN;
239 if (fdstatus_[fd] & CB_WRONLY)
240 ev.events |= EPOLLOUT;
243 VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
245 VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
248 bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
249 size_t fd = (size_t)fd_;
251 VERIFY(fd < MAX_POLL_FDS);
252 fdstatus_[fd] &= ~(unsigned)flag;
254 struct epoll_event ev;
255 int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
260 if (fdstatus_[fd] & CB_RDONLY)
261 ev.events |= EPOLLIN;
263 if (fdstatus_[fd] & CB_WRONLY)
264 ev.events |= EPOLLOUT;
267 VERIFY(op == EPOLL_CTL_DEL);
269 VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
270 return (op == EPOLL_CTL_DEL);
273 void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
275 int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
276 for (int i = 0; i < nfds; i++) {
277 if (ready_[i].events & EPOLLIN)
278 readable.push_back(ready_[i].data.fd);
280 if (ready_[i].events & EPOLLOUT)
281 writable.push_back(ready_[i].data.fd);