//Pointer to the shared PollMgr object; starts out NULL and is assigned by the
//`new PollMgr()` statement below.
9 PollMgr *PollMgr::instance = NULL;
//Flag passed to std::call_once so that PollMgrInit is invoked at most once.
10 static std::once_flag pollmgr_is_initialized;
//NOTE(review): the enclosing function header is not visible in this listing;
//presumably this statement is the body of PollMgrInit -- confirm.
15 PollMgr::instance = new PollMgr();
//NOTE(review): enclosing function not visible; presumably the accessor that
//hands out PollMgr::instance -- confirm.
21 std::call_once(pollmgr_is_initialized, PollMgrInit);
//Constructs the poll manager: starts with no pending change, zeroes the
//callback table, creates the select()-based backend and launches the thread
//that runs wait_loop().
25 PollMgr::PollMgr() : pending_change_(false)
//Clear MAX_POLL_FDS pointer-sized slots of callbacks_.
27 bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
28 aio_ = new SelectAIO();
//Alternative epoll-based backend, currently disabled.
29 //aio_ = new EPollAIO();
//wait_loop() is started on th_ with this object as its receiver.
31 th_ = std::thread(&PollMgr::wait_loop, this);
//Destructor, annotated [[noreturn]]; its body is not visible in this listing.
//NOTE(review): the attribute is written after the parameter list -- confirm
//that the compiler in use accepts and honors [[noreturn]] in this position.
34 PollMgr::~PollMgr() [[noreturn]]
//Starts watching fd for the events named by flag on behalf of callback ch.
//  fd   - descriptor to watch; must be below MAX_POLL_FDS (enforced by VERIFY)
//  flag - events to watch for, forwarded to the backend's watch_fd()
//  ch   - callback object; fd must have no callback yet or already use ch
//NOTE(review): the statement that stores ch into callbacks_[fd] is not visible
//in this listing -- presumably it follows the last VERIFY.
41 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
43 VERIFY(fd < MAX_POLL_FDS);
46 aio_->watch_fd(fd, flag);
//A descriptor may not be re-registered with a different callback object.
48 VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
52 //Remove all callbacks related to fd: stop watching it for both reading and
53 //writing, wait until wait_loop() has acknowledged the change, then clear its
54 //callback slot. The return guarantees that callbacks related to fd will never be called again.
56 PollMgr::block_remove_fd(int fd)
59 aio_->unwatch_fd(fd, CB_RDWR);
//Ask wait_loop() to acknowledge; it resets the flag and notifies changedone_c_.
60 pending_change_ = true;
//Re-check the flag after every wakeup: a condition variable may wake
//spuriously, and returning before wait_loop() has cleared pending_change_
//would break the guarantee documented above.
61 while (pending_change_) changedone_c_.wait(ml);
62 callbacks_[fd] = NULL;
//Stops watching fd for the events named by flag.
//When the backend's unwatch_fd() returns true (both backends in this file
//return true once fd is no longer watched for any event), the callback slot
//for fd is cleared as well.
66 PollMgr::del_callback(int fd, poll_flag flag)
69 if (aio_->unwatch_fd(fd, flag)) {
70 callbacks_[fd] = NULL;
//Checks whether callback c is the one recorded for fd and, if so, returns
//whether the backend is watching fd for flag.
//NOTE(review): the statement taken when the callback check fails is not
//visible in this listing -- presumably it returns false.
75 PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
//Fails when fd has no callback or a different callback than c.
78 if (!callbacks_[fd] || callbacks_[fd]!=c)
81 return aio_->is_watched(fd, flag);
//Body of the polling thread started by the constructor, annotated [[noreturn]].
//Visible steps: acknowledge a pending change requested by block_remove_fd(),
//ask the backend for ready descriptors, then invoke read_cb()/write_cb() on
//the callback recorded for each ready descriptor.
//NOTE(review): the enclosing loop, the locking around pending_change_ and any
//null checks on callbacks_[fd] are not visible in this listing.
85 PollMgr::wait_loop() [[noreturn]]
88 std::vector<int> readable;
89 std::vector<int> writable;
//Acknowledge block_remove_fd(): reset the flag and wake every waiter.
94 if (pending_change_) {
95 pending_change_ = false;
96 changedone_c_.notify_all();
//Backend fills the two vectors with descriptors that are ready.
101 aio_->wait_ready(&readable,&writable);
//Nothing became ready (the action taken here is elided in this listing).
103 if (!readable.size() && !writable.size()) {
107 //because neither add_callback() nor del_callback() should
108 //modify callbacks_[fd] while the fd is not dead
109 for (unsigned int i = 0; i < readable.size(); i++) {
110 int fd = readable[i];
112 callbacks_[fd]->read_cb(fd);
115 for (unsigned int i = 0; i < writable.size(); i++) {
116 int fd = writable[i];
118 callbacks_[fd]->write_cb(fd);
//Sets up the select()-based backend: creates a pipe, adds its read end to the
//read set and makes it the highest watched descriptor. watch_fd() and
//unwatch_fd() write a byte to this pipe and wait_ready() reads it back.
123 SelectAIO::SelectAIO() : highfds_(0)
128 VERIFY(pipe(pipefd_) == 0);
129 FD_SET(pipefd_[0], &rfds_);
130 highfds_ = pipefd_[0];
//Read the pipe's file status flags and write them back.
//NOTE(review): the modification made in between is elided in this listing;
//presumably it sets O_NONBLOCK -- confirm.
132 int flags = fcntl(pipefd_[0], F_GETFL, NULL);
134 fcntl(pipefd_[0], F_SETFL, flags);
//Destructor of the select()-based backend; its body is not visible in this listing.
137 SelectAIO::~SelectAIO()
//Starts watching fd for the events named by flag, then writes one byte to the
//pipe whose read end is part of the read set.
//NOTE(review): the branch bodies are elided in this listing; presumably they
//add fd to rfds_ and/or wfds_ and update highfds_ -- confirm.
142 SelectAIO::watch_fd(int fd, poll_flag flag)
148 if (flag == CB_RDONLY) {
150 }else if (flag == CB_WRONLY) {
//Exactly one byte must be written, otherwise VERIFY fails.
158 VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
//Reports whether fd is currently watched for the events named by flag:
//  CB_RDONLY   - fd is in the read set
//  CB_WRONLY   - fd is in the write set
//  otherwise   - fd is in both the read set and the write set
162 SelectAIO::is_watched(int fd, poll_flag flag)
165 if (flag == CB_RDONLY) {
166 return FD_ISSET(fd,&rfds_);
167 }else if (flag == CB_WRONLY) {
168 return FD_ISSET(fd,&wfds_);
170 return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
//Stops watching fd for the events named by flag.
//Returns true when fd is left in neither the read set nor the write set.
//NOTE(review): the branch bodies are elided in this listing; presumably they
//clear fd from rfds_ and/or wfds_ -- confirm.
175 SelectAIO::unwatch_fd(int fd, poll_flag flag)
178 if (flag == CB_RDONLY) {
180 }else if (flag == CB_WRONLY) {
182 }else if (flag == CB_RDWR) {
//fd is no longer watched at all; if it was the highest descriptor, rescan
//the sets for a new highest one, starting from the pipe's read end.
189 if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
190 if (fd == highfds_) {
191 int newh = pipefd_[0];
192 for (int i = 0; i <= highfds_; i++) {
193 if (FD_ISSET(i, &rfds_)) {
195 }else if (FD_ISSET(i, &wfds_)) {
//Only a full (read and write) removal writes a byte to the pipe.
202 if (flag == CB_RDWR) {
204 VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
206 return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
//Waits in select() without a timeout and appends the descriptors reported
//ready to *readable and *writable.
//NOTE(review): the setup of high, trfds and twfds is elided in this listing;
//presumably they are copies of highfds_, rfds_ and wfds_ -- confirm.
210 SelectAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
222 int ret = select(high+1, &trfds, &twfds, NULL, NULL);
//An interrupted call (EINTR) is distinguished from other failures, which are logged.
225 if (errno == EINTR) {
229 jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno);
//Scan every descriptor up to high.
234 for (int fd = 0; fd <= high; fd++) {
//The pipe's read end is readable: consume the single byte written by
//watch_fd()/unwatch_fd().
235 if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
237 VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
240 if (FD_ISSET(fd, &twfds)) {
241 writable->push_back(fd);
243 if (FD_ISSET(fd, &trfds)) {
244 readable->push_back(fd);
//NOTE(review): the enclosing function header is not visible in this listing;
//presumably this is the EPollAIO constructor -- confirm.
//Creates the epoll instance (failure is fatal via VERIFY) and zeroes the
//per-descriptor status table.
254 pollfd_ = epoll_create(MAX_POLL_FDS);
255 VERIFY(pollfd_ >= 0);
256 bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
//Destructor of the epoll-based backend; its body is not visible in this listing.
259 EPollAIO::~EPollAIO()
//Translates a poll_flag into an epoll event mask.
//CB_RDWR maps to EPOLLIN | EPOLLOUT.
//NOTE(review): the assignments for CB_RDONLY and CB_WRONLY are elided in this
//listing; presumably EPOLLIN and EPOLLOUT respectively -- confirm.
265 int poll_flag_to_event(poll_flag flag)
268 if (flag == CB_RDONLY) {
270 }else if (flag == CB_WRONLY) {
272 }else { //flag == CB_RDWR
273 f = EPOLLIN | EPOLLOUT;
//Starts watching fd for the events named by flag, in addition to whatever it
//is already watched for. fd must be below MAX_POLL_FDS (enforced by VERIFY).
279 EPollAIO::watch_fd(int fd, poll_flag flag)
281 VERIFY(fd < MAX_POLL_FDS);
283 struct epoll_event ev;
//A descriptor with a non-zero recorded status is modified; a new one is added.
284 int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
//Merge the new flag bits into the recorded status.
285 fdstatus_[fd] |= (int)flag;
//Rebuild the event mask from the merged status.
//NOTE(review): the initialization of ev is elided in this listing; the check
//below implies ev.events starts as EPOLLET -- confirm.
290 if (fdstatus_[fd] & CB_RDONLY) {
291 ev.events |= EPOLLIN;
293 if (fdstatus_[fd] & CB_WRONLY) {
294 ev.events |= EPOLLOUT;
//Sanity check: a read+write request must yield the full mask.
297 if (flag == CB_RDWR) {
298 VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
301 VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
//Stops watching fd for the events named by flag.
//Returns true when fd was removed from the epoll set entirely, i.e. no status
//bits remain for it. fd must be below MAX_POLL_FDS (enforced by VERIFY).
305 EPollAIO::unwatch_fd(int fd, poll_flag flag)
307 VERIFY(fd < MAX_POLL_FDS);
//Drop the requested bits from the recorded status.
308 fdstatus_[fd] &= ~(int)flag;
310 struct epoll_event ev;
//Remaining bits mean the registration is modified; otherwise it is deleted.
311 int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
//Rebuild the event mask from what is left.
//NOTE(review): the initialization of ev is elided in this listing.
316 if (fdstatus_[fd] & CB_RDONLY) {
317 ev.events |= EPOLLIN;
319 if (fdstatus_[fd] & CB_WRONLY) {
320 ev.events |= EPOLLOUT;
//Removing both directions must always result in a delete.
323 if (flag == CB_RDWR) {
324 VERIFY(op == EPOLL_CTL_DEL);
326 VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
327 return (op == EPOLL_CTL_DEL);
//Reports whether fd is currently watched for every event named by flag.
//fdstatus_[fd] is maintained as a set of poll_flag bits by watch_fd() and
//unwatch_fd(), so the answer is a bit test: CB_RDONLY needs the read bit,
//CB_WRONLY the write bit and CB_RDWR both. This matches the semantics of
//SelectAIO::is_watched, so both backends give PollMgr::has_callback() the
//same result. fd must be below MAX_POLL_FDS (enforced by VERIFY).
331 EPollAIO::is_watched(int fd, poll_flag flag)
333 VERIFY(fd < MAX_POLL_FDS);
334 return ((fdstatus_[fd] & (int)flag) == (int)flag);
//Waits in epoll_wait() without a timeout for up to MAX_POLL_FDS events and
//appends each reported descriptor to *readable (EPOLLIN) and/or *writable
//(EPOLLOUT). A negative return from epoll_wait() skips the loop, leaving both
//vectors untouched.
338 EPollAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
340 int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
341 for (int i = 0; i < nfds; i++) {
342 if (ready_[i].events & EPOLLIN) {
343 readable->push_back(ready_[i].data.fd);
345 if (ready_[i].events & EPOLLOUT) {
346 writable->push_back(ready_[i].data.fd);