6 #include "lang/verify.h"
10 PollMgr *PollMgr::instance = NULL;
11 static std::once_flag pollmgr_is_initialized;
16 PollMgr::instance = new PollMgr();
22 std::call_once(pollmgr_is_initialized, PollMgrInit);
26 PollMgr::PollMgr() : pending_change_(false)
28 bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
29 aio_ = new SelectAIO();
30 //aio_ = new EPollAIO();
32 th_ = std::thread(&PollMgr::wait_loop, this);
35 PollMgr::~PollMgr() [[noreturn]]
42 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
44 VERIFY(fd < MAX_POLL_FDS);
47 aio_->watch_fd(fd, flag);
49 VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
53 //remove all callbacks related to fd
54 //the return guarantees that callbacks related to fd
55 //will never be called again
57 PollMgr::block_remove_fd(int fd)
60 aio_->unwatch_fd(fd, CB_RDWR);
61 pending_change_ = true;
62 changedone_c_.wait(ml);
63 callbacks_[fd] = NULL;
67 PollMgr::del_callback(int fd, poll_flag flag)
70 if (aio_->unwatch_fd(fd, flag)) {
71 callbacks_[fd] = NULL;
76 PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
79 if (!callbacks_[fd] || callbacks_[fd]!=c)
82 return aio_->is_watched(fd, flag);
86 PollMgr::wait_loop() [[noreturn]]
89 std::vector<int> readable;
90 std::vector<int> writable;
95 if (pending_change_) {
96 pending_change_ = false;
97 changedone_c_.notify_all();
102 aio_->wait_ready(&readable,&writable);
104 if (!readable.size() && !writable.size()) {
108 //because no add_callback() and del_callback should
109 //modify callbacks_[fd] while the fd is not dead
110 for (unsigned int i = 0; i < readable.size(); i++) {
111 int fd = readable[i];
113 callbacks_[fd]->read_cb(fd);
116 for (unsigned int i = 0; i < writable.size(); i++) {
117 int fd = writable[i];
119 callbacks_[fd]->write_cb(fd);
124 SelectAIO::SelectAIO() : highfds_(0)
129 VERIFY(pipe(pipefd_) == 0);
130 FD_SET(pipefd_[0], &rfds_);
131 highfds_ = pipefd_[0];
133 int flags = fcntl(pipefd_[0], F_GETFL, NULL);
135 fcntl(pipefd_[0], F_SETFL, flags);
138 SelectAIO::~SelectAIO()
143 SelectAIO::watch_fd(int fd, poll_flag flag)
149 if (flag == CB_RDONLY) {
151 }else if (flag == CB_WRONLY) {
159 VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
163 SelectAIO::is_watched(int fd, poll_flag flag)
166 if (flag == CB_RDONLY) {
167 return FD_ISSET(fd,&rfds_);
168 }else if (flag == CB_WRONLY) {
169 return FD_ISSET(fd,&wfds_);
171 return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
176 SelectAIO::unwatch_fd(int fd, poll_flag flag)
179 if (flag == CB_RDONLY) {
181 }else if (flag == CB_WRONLY) {
183 }else if (flag == CB_RDWR) {
190 if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
191 if (fd == highfds_) {
192 int newh = pipefd_[0];
193 for (int i = 0; i <= highfds_; i++) {
194 if (FD_ISSET(i, &rfds_)) {
196 }else if (FD_ISSET(i, &wfds_)) {
203 if (flag == CB_RDWR) {
205 VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
207 return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
211 SelectAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
223 int ret = select(high+1, &trfds, &twfds, NULL, NULL);
226 if (errno == EINTR) {
230 jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno);
235 for (int fd = 0; fd <= high; fd++) {
236 if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
238 VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
241 if (FD_ISSET(fd, &twfds)) {
242 writable->push_back(fd);
244 if (FD_ISSET(fd, &trfds)) {
245 readable->push_back(fd);
255 pollfd_ = epoll_create(MAX_POLL_FDS);
256 VERIFY(pollfd_ >= 0);
257 bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
260 EPollAIO::~EPollAIO()
266 int poll_flag_to_event(poll_flag flag)
269 if (flag == CB_RDONLY) {
271 }else if (flag == CB_WRONLY) {
273 }else { //flag == CB_RDWR
274 f = EPOLLIN | EPOLLOUT;
280 EPollAIO::watch_fd(int fd, poll_flag flag)
282 VERIFY(fd < MAX_POLL_FDS);
284 struct epoll_event ev;
285 int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
286 fdstatus_[fd] |= (int)flag;
291 if (fdstatus_[fd] & CB_RDONLY) {
292 ev.events |= EPOLLIN;
294 if (fdstatus_[fd] & CB_WRONLY) {
295 ev.events |= EPOLLOUT;
298 if (flag == CB_RDWR) {
299 VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
302 VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
306 EPollAIO::unwatch_fd(int fd, poll_flag flag)
308 VERIFY(fd < MAX_POLL_FDS);
309 fdstatus_[fd] &= ~(int)flag;
311 struct epoll_event ev;
312 int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
317 if (fdstatus_[fd] & CB_RDONLY) {
318 ev.events |= EPOLLIN;
320 if (fdstatus_[fd] & CB_WRONLY) {
321 ev.events |= EPOLLOUT;
324 if (flag == CB_RDWR) {
325 VERIFY(op == EPOLL_CTL_DEL);
327 VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
328 return (op == EPOLL_CTL_DEL);
332 EPollAIO::is_watched(int fd, poll_flag flag)
334 VERIFY(fd < MAX_POLL_FDS);
335 return ((fdstatus_[fd] & CB_MASK) == flag);
339 EPollAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
341 int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
342 for (int i = 0; i < nfds; i++) {
343 if (ready_[i].events & EPOLLIN) {
344 readable->push_back(ready_[i].data.fd);
346 if (ready_[i].events & EPOLLOUT) {
347 writable->push_back(ready_[i].data.fd);