8 #include "method_thread.h"
9 #include "lang/verify.h"
12 PollMgr *PollMgr::instance = NULL;
13 static pthread_once_t pollmgr_is_initialized = PTHREAD_ONCE_INIT;
18 PollMgr::instance = new PollMgr();
24 pthread_once(&pollmgr_is_initialized, PollMgrInit);
28 PollMgr::PollMgr() : pending_change_(false)
30 bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
31 aio_ = new SelectAIO();
32 //aio_ = new EPollAIO();
34 VERIFY(pthread_mutex_init(&m_, NULL) == 0);
35 VERIFY(pthread_cond_init(&changedone_c_, NULL) == 0);
36 VERIFY((th_ = method_thread(this, false, &PollMgr::wait_loop)) != 0);
46 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
48 VERIFY(fd < MAX_POLL_FDS);
51 aio_->watch_fd(fd, flag);
53 VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
57 //remove all callbacks related to fd
58 //the return guarantees that callbacks related to fd
59 //will never be called again
61 PollMgr::block_remove_fd(int fd)
64 aio_->unwatch_fd(fd, CB_RDWR);
65 pending_change_ = true;
66 VERIFY(pthread_cond_wait(&changedone_c_, &m_)==0);
67 callbacks_[fd] = NULL;
71 PollMgr::del_callback(int fd, poll_flag flag)
74 if (aio_->unwatch_fd(fd, flag)) {
75 callbacks_[fd] = NULL;
80 PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
83 if (!callbacks_[fd] || callbacks_[fd]!=c)
86 return aio_->is_watched(fd, flag);
93 std::vector<int> readable;
94 std::vector<int> writable;
99 if (pending_change_) {
100 pending_change_ = false;
101 VERIFY(pthread_cond_broadcast(&changedone_c_)==0);
106 aio_->wait_ready(&readable,&writable);
108 if (!readable.size() && !writable.size()) {
112 //because no add_callback() and del_callback should
113 //modify callbacks_[fd] while the fd is not dead
114 for (unsigned int i = 0; i < readable.size(); i++) {
115 int fd = readable[i];
117 callbacks_[fd]->read_cb(fd);
120 for (unsigned int i = 0; i < writable.size(); i++) {
121 int fd = writable[i];
123 callbacks_[fd]->write_cb(fd);
128 SelectAIO::SelectAIO() : highfds_(0)
133 VERIFY(pipe(pipefd_) == 0);
134 FD_SET(pipefd_[0], &rfds_);
135 highfds_ = pipefd_[0];
137 int flags = fcntl(pipefd_[0], F_GETFL, NULL);
139 fcntl(pipefd_[0], F_SETFL, flags);
141 VERIFY(pthread_mutex_init(&m_, NULL) == 0);
144 SelectAIO::~SelectAIO()
146 VERIFY(pthread_mutex_destroy(&m_) == 0);
150 SelectAIO::watch_fd(int fd, poll_flag flag)
156 if (flag == CB_RDONLY) {
158 }else if (flag == CB_WRONLY) {
166 VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
170 SelectAIO::is_watched(int fd, poll_flag flag)
173 if (flag == CB_RDONLY) {
174 return FD_ISSET(fd,&rfds_);
175 }else if (flag == CB_WRONLY) {
176 return FD_ISSET(fd,&wfds_);
178 return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
183 SelectAIO::unwatch_fd(int fd, poll_flag flag)
186 if (flag == CB_RDONLY) {
188 }else if (flag == CB_WRONLY) {
190 }else if (flag == CB_RDWR) {
197 if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
198 if (fd == highfds_) {
199 int newh = pipefd_[0];
200 for (int i = 0; i <= highfds_; i++) {
201 if (FD_ISSET(i, &rfds_)) {
203 }else if (FD_ISSET(i, &wfds_)) {
210 if (flag == CB_RDWR) {
212 VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
214 return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
218 SelectAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
231 int ret = select(high+1, &trfds, &twfds, NULL, NULL);
234 if (errno == EINTR) {
238 jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno);
243 for (int fd = 0; fd <= high; fd++) {
244 if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
246 VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
249 if (FD_ISSET(fd, &twfds)) {
250 writable->push_back(fd);
252 if (FD_ISSET(fd, &trfds)) {
253 readable->push_back(fd);
263 pollfd_ = epoll_create(MAX_POLL_FDS);
264 VERIFY(pollfd_ >= 0);
265 bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
268 EPollAIO::~EPollAIO()
274 int poll_flag_to_event(poll_flag flag)
277 if (flag == CB_RDONLY) {
279 }else if (flag == CB_WRONLY) {
281 }else { //flag == CB_RDWR
282 f = EPOLLIN | EPOLLOUT;
288 EPollAIO::watch_fd(int fd, poll_flag flag)
290 VERIFY(fd < MAX_POLL_FDS);
292 struct epoll_event ev;
293 int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
294 fdstatus_[fd] |= (int)flag;
299 if (fdstatus_[fd] & CB_RDONLY) {
300 ev.events |= EPOLLIN;
302 if (fdstatus_[fd] & CB_WRONLY) {
303 ev.events |= EPOLLOUT;
306 if (flag == CB_RDWR) {
307 VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
310 VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
314 EPollAIO::unwatch_fd(int fd, poll_flag flag)
316 VERIFY(fd < MAX_POLL_FDS);
317 fdstatus_[fd] &= ~(int)flag;
319 struct epoll_event ev;
320 int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
325 if (fdstatus_[fd] & CB_RDONLY) {
326 ev.events |= EPOLLIN;
328 if (fdstatus_[fd] & CB_WRONLY) {
329 ev.events |= EPOLLOUT;
332 if (flag == CB_RDWR) {
333 VERIFY(op == EPOLL_CTL_DEL);
335 VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
336 return (op == EPOLL_CTL_DEL);
340 EPollAIO::is_watched(int fd, poll_flag flag)
342 VERIFY(fd < MAX_POLL_FDS);
343 return ((fdstatus_[fd] & CB_MASK) == flag);
347 EPollAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
349 int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
350 for (int i = 0; i < nfds; i++) {
351 if (ready_[i].events & EPOLLIN) {
352 readable->push_back(ready_[i].data.fd);
354 if (ready_[i].events & EPOLLOUT) {
355 writable->push_back(ready_[i].data.fd);