// Pointer to the single PollMgr object. It starts out NULL and is assigned
// by the `new PollMgr()` statement below.
8 PollMgr *PollMgr::instance = NULL;
// Flag passed to call_once() so that PollMgrInit is invoked at most once.
9 static once_flag pollmgr_is_initialized;
// Allocate the manager and publish it through PollMgr::instance.
14 PollMgr::instance = new PollMgr();
// One-time initialization: call_once() runs PollMgrInit the first time this
// statement is reached and is a no-op on later calls.
20 call_once(pollmgr_is_initialized, PollMgrInit);
// Constructs the poll manager: no change is pending, the callback table is
// zeroed, a SelectAIO backend is allocated, and a thread running wait_loop()
// on this object is started.
24 PollMgr::PollMgr() : pending_change_(false)
// Zero all MAX_POLL_FDS slots of the callback table (the slots hold
// aio_callback pointers, hence the pointer-sized element).
26 bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
// The select()-based backend is the one in use; the epoll alternative is
// left commented out below.
27 aio_ = new SelectAIO();
28 //aio_ = new EPollAIO();
// Start the thread that executes wait_loop() for this manager.
30 th_ = thread(&PollMgr::wait_loop, this);
// Destructor, annotated as never returning.
// NOTE(review): [[noreturn]] placed after the parameter list appertains to
// the function type, not to the function itself; compilers may ignore or
// reject it in this position. Confirm the intended placement (the attribute
// normally leads the declaration and must appear on the first declaration).
33 PollMgr::~PollMgr() [[noreturn]]
// Starts watching `fd` for the events in `flag` on behalf of callback
// object `ch`.
//   fd   - descriptor to watch; must be below MAX_POLL_FDS
//   flag - which events to watch for
//   ch   - callback object associated with fd
40 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
// fd is used as an index into callbacks_, so it must fit in the table.
42 VERIFY(fd < MAX_POLL_FDS);
// Ask the AIO backend to watch fd for the requested events.
45 aio_->watch_fd(fd, flag);
// At most one callback object per fd: the slot must be empty or already
// hold ch.
47 VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
51 //remove all callbacks related to fd
52 //the return guarantees that callbacks related to fd
53 //will never be called again
55 PollMgr::block_remove_fd(int fd)
// Stop watching fd for both reading and writing.
58 aio_->unwatch_fd(fd, CB_RDWR);
// Flag the change; wait_loop() acknowledges it by clearing pending_change_
// and calling changedone_c_.notify_all().
59 pending_change_ = true;
// Block until the poll thread signals that acknowledgement.
// NOTE(review): wait() is called once with no re-check of pending_change_.
// If changedone_c_ is a std::condition_variable, a spurious wakeup would let
// this function proceed before the poll thread has acknowledged the change.
// Confirm whether this should loop while pending_change_ is still true.
60 changedone_c_.wait(ml);
// Only after the acknowledgement is the callback pointer dropped.
61 callbacks_[fd] = NULL;
// Stops watching `fd` for the events in `flag`. The callback slot is cleared
// only when the backend reports that fd is no longer watched for anything.
65 PollMgr::del_callback(int fd, poll_flag flag)
// unwatch_fd() returns true when no events remain watched on fd (both
// backends in this file return that condition).
68 if (aio_->unwatch_fd(fd, flag)) {
69 callbacks_[fd] = NULL;
// Checks whether callback object `c` is the one recorded for `fd` and
// whether the backend reports fd as watched for `flag`.
74 PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
// Slot is empty or holds a different callback object (presumably this
// branch returns false -- TODO confirm).
77 if (!callbacks_[fd] || callbacks_[fd]!=c)
// Otherwise the answer is the backend's watch state for fd/flag.
80 return aio_->is_watched(fd, flag);
// Body of the thread started by the PollMgr constructor: acknowledges
// pending removals, waits for ready descriptors, and dispatches read/write
// callbacks for them.
// NOTE(review): [[noreturn]] after the parameter list appertains to the
// function type rather than the function; compilers may ignore or reject it
// here. Confirm the intended placement.
84 PollMgr::wait_loop() [[noreturn]]
// Acknowledge a change requested by block_remove_fd(): clear the flag and
// wake every thread waiting on changedone_c_.
93 if (pending_change_) {
94 pending_change_ = false;
95 changedone_c_.notify_all();
// Ask the backend for ready descriptors; results are appended to
// `readable` and `writable`.
100 aio_->wait_ready(&readable,&writable);
// Nothing became ready in this round.
102 if (!readable.size() && !writable.size()) {
106 //because no add_callback() and del_callback should
107 //modify callbacks_[fd] while the fd is not dead
// Invoke read_cb on the callback object of every readable fd.
108 for (unsigned int i = 0; i < readable.size(); i++) {
109 int fd = readable[i];
// NOTE(review): confirm callbacks_[fd] is checked for NULL before this call;
// del_callback() and block_remove_fd() can clear the slot.
111 callbacks_[fd]->read_cb(fd);
// Invoke write_cb on the callback object of every writable fd.
114 for (unsigned int i = 0; i < writable.size(); i++) {
115 int fd = writable[i];
// NOTE(review): same NULL-slot concern as for read_cb above -- confirm.
117 callbacks_[fd]->write_cb(fd);
// Sets up the select()-based backend: creates a pipe, registers its read
// end in the read set, and records it as the highest watched descriptor.
122 SelectAIO::SelectAIO() : highfds_(0)
// watch_fd()/unwatch_fd() write one byte to pipefd_[1]; wait_ready() reads
// it back from pipefd_[0].
127 VERIFY(pipe(pipefd_) == 0);
// The read end of the pipe is watched for reading from the start.
128 FD_SET(pipefd_[0], &rfds_);
129 highfds_ = pipefd_[0];
// Fetch the file status flags of the pipe's read end and store them back.
// Presumably the flags are modified between these two calls (e.g. to add
// O_NONBLOCK) -- TODO confirm.
// NOTE(review): neither fcntl() return value is checked.
131 int flags = fcntl(pipefd_[0], F_GETFL, NULL);
133 fcntl(pipefd_[0], F_SETFL, flags);
// Destructor of the select()-based backend.
136 SelectAIO::~SelectAIO()
// Starts watching `fd` for the events selected by `flag`, then writes one
// byte to the pipe whose read end wait_ready() watches and drains.
141 SelectAIO::watch_fd(int fd, poll_flag flag)
// Read-only interest.
147 if (flag == CB_RDONLY) {
// Write-only interest.
149 }else if (flag == CB_WRONLY) {
// Exactly one byte must be written to the pipe's write end.
157 VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
// Reports whether `fd` is currently watched for `flag`:
//   CB_RDONLY - fd is in the read set
//   CB_WRONLY - fd is in the write set
//   otherwise - fd is in both sets
// NOTE(review): for CB_RDONLY/CB_WRONLY this is true even when fd is also in
// the other set, whereas EPollAIO::is_watched requires an exact match of the
// stored status -- confirm which semantics callers expect.
161 SelectAIO::is_watched(int fd, poll_flag flag)
164 if (flag == CB_RDONLY) {
165 return FD_ISSET(fd,&rfds_);
166 }else if (flag == CB_WRONLY) {
167 return FD_ISSET(fd,&wfds_);
169 return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
// Stops watching `fd` for the events selected by `flag`.
// Returns true when fd ends up in neither the read set nor the write set.
174 SelectAIO::unwatch_fd(int fd, poll_flag flag)
// Dispatch on which interest is being removed.
177 if (flag == CB_RDONLY) {
179 }else if (flag == CB_WRONLY) {
181 }else if (flag == CB_RDWR) {
// fd is no longer watched at all.
188 if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
// It was the highest watched descriptor, so the maximum is recomputed.
189 if (fd == highfds_) {
// Start from the pipe's read end, which the constructor put in the read set.
190 int newh = pipefd_[0];
// Scan every descriptor up to the old maximum for one still in either set.
191 for (int i = 0; i <= highfds_; i++) {
192 if (FD_ISSET(i, &rfds_)) {
194 }else if (FD_ISSET(i, &wfds_)) {
// Only a CB_RDWR removal writes a byte to the pipe.
201 if (flag == CB_RDWR) {
203 VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
// True when nothing remains watched on fd.
205 return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
// Waits in select() and appends the descriptors it reports ready to
// *readable and *writable.
209 SelectAIO::wait_ready(vector<int> *readable, vector<int> *writable)
// NULL timeout: select() blocks until a descriptor is ready or a signal
// interrupts it.
221 int ret = select(high+1, &trfds, &twfds, NULL, NULL);
// Interrupted by a signal.
224 if (errno == EINTR) {
// Any other failure is logged with its errno.
228 IF_LEVEL(0) LOG("select_loop failure errno " << errno);
// Walk every descriptor up to the highest one passed to select().
233 for (int fd = 0; fd <= high; fd++) {
// The pipe's read end being readable means watch_fd()/unwatch_fd() wrote a
// byte; consume exactly one byte.
234 if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
236 VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
// Record descriptors reported writable.
239 if (FD_ISSET(fd, &twfds)) {
240 writable->push_back(fd);
// Record descriptors reported readable.
242 if (FD_ISSET(fd, &trfds)) {
243 readable->push_back(fd);
// Create the epoll instance, passing MAX_POLL_FDS as the size argument, and
// verify that a valid descriptor came back.
253 pollfd_ = epoll_create(MAX_POLL_FDS);
254 VERIFY(pollfd_ >= 0);
// Zero the per-descriptor watch status table (one int per possible fd).
255 bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
// Destructor of the epoll-based backend.
258 EPollAIO::~EPollAIO()
// Translates a poll_flag into epoll event bits, returned as an int.
264 int poll_flag_to_event(poll_flag flag)
// Read-only interest.
267 if (flag == CB_RDONLY) {
// Write-only interest.
269 }else if (flag == CB_WRONLY) {
// Anything else is treated as read+write interest.
271 }else { //flag == CB_RDWR
272 f = EPOLLIN | EPOLLOUT;
// Adds the events in `flag` to those already watched on `fd` and registers
// the combined set with the epoll instance.
278 EPollAIO::watch_fd(int fd, poll_flag flag)
// fd indexes fdstatus_, so it must fit in the table.
280 VERIFY(fd < MAX_POLL_FDS);
282 struct epoll_event ev;
// Chosen BEFORE the status is updated: a non-zero status means fd is
// already registered and must be modified rather than added.
283 int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
// Accumulate the newly requested interest into the stored status.
284 fdstatus_[fd] |= (int)flag;
// Build the epoll event mask from the accumulated status.
289 if (fdstatus_[fd] & CB_RDONLY) {
290 ev.events |= EPOLLIN;
292 if (fdstatus_[fd] & CB_WRONLY) {
293 ev.events |= EPOLLOUT;
// Sanity check: a read+write request must yield exactly the edge-triggered
// flag plus both directions.
296 if (flag == CB_RDWR) {
297 VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
// Apply the add/modify to the epoll instance; failure is fatal.
300 VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
// Removes the events in `flag` from those watched on `fd` and updates the
// epoll registration accordingly.
// Returns true when fd was removed from the epoll instance entirely.
304 EPollAIO::unwatch_fd(int fd, poll_flag flag)
// fd indexes fdstatus_, so it must fit in the table.
306 VERIFY(fd < MAX_POLL_FDS);
// Clear the requested interest bits first ...
307 fdstatus_[fd] &= ~(int)flag;
309 struct epoll_event ev;
// ... then decide: modify if some interest remains, delete otherwise.
310 int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
// Rebuild the event mask from what is still watched.
315 if (fdstatus_[fd] & CB_RDONLY) {
316 ev.events |= EPOLLIN;
318 if (fdstatus_[fd] & CB_WRONLY) {
319 ev.events |= EPOLLOUT;
// Removing read+write interest must leave nothing watched.
322 if (flag == CB_RDWR) {
323 VERIFY(op == EPOLL_CTL_DEL);
// Apply the modify/delete to the epoll instance; failure is fatal.
325 VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
326 return (op == EPOLL_CTL_DEL);
// Reports whether the stored watch status of `fd`, masked with CB_MASK, is
// exactly equal to `flag`.
// NOTE(review): this is an exact match, so an fd watched for read+write is
// not reported as watched for CB_RDONLY; SelectAIO::is_watched answers that
// case with true -- confirm which behavior callers rely on.
330 EPollAIO::is_watched(int fd, poll_flag flag)
// fd indexes fdstatus_, so it must fit in the table.
332 VERIFY(fd < MAX_POLL_FDS);
333 return ((fdstatus_[fd] & CB_MASK) == flag);
// Waits in epoll_wait() and appends the descriptors it reports to
// *readable (EPOLLIN) and *writable (EPOLLOUT).
337 EPollAIO::wait_ready(vector<int> *readable, vector<int> *writable)
// Timeout -1: block until at least one event arrives or the call is
// interrupted. Up to MAX_POLL_FDS events are stored in ready_.
// NOTE(review): a negative return (error, e.g. EINTR) is not reported; the
// loop below simply does not run in that case -- confirm this is intended.
339 int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
340 for (int i = 0; i < nfds; i++) {
// A descriptor may be reported for reading, writing, or both.
341 if (ready_[i].events & EPOLLIN) {
342 readable->push_back(ready_[i].data.fd);
344 if (ready_[i].events & EPOLLOUT) {
345 writable->push_back(ready_[i].data.fd);