Build on wheezy, and presumably precise
[invirt/third/libt4.git] / rpc / pollmgr.cc
1 #include <sys/time.h>
2 #include <errno.h>
3 #include <fcntl.h>
4 #include <unistd.h>
5
6 #include "slock.h"
7 #include "jsl_log.h"
8 #include "method_thread.h"
9 #include "lang/verify.h"
10 #include "pollmgr.h"
11
12 PollMgr *PollMgr::instance = NULL;
13 static pthread_once_t pollmgr_is_initialized = PTHREAD_ONCE_INIT;
14
15 void
16 PollMgrInit()
17 {
18         PollMgr::instance = new PollMgr();
19 }
20
21 PollMgr *
22 PollMgr::Instance()
23 {
24         pthread_once(&pollmgr_is_initialized, PollMgrInit);
25         return instance;
26 }
27
28 PollMgr::PollMgr() : pending_change_(false)
29 {
30         bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
31         aio_ = new SelectAIO();
32         //aio_ = new EPollAIO();
33
34         VERIFY(pthread_mutex_init(&m_, NULL) == 0);
35         VERIFY(pthread_cond_init(&changedone_c_, NULL) == 0);
36         VERIFY((th_ = method_thread(this, false, &PollMgr::wait_loop)) != 0);
37 }
38
39 PollMgr::~PollMgr()
40 {
41         //never kill me!!!
42         VERIFY(0);
43 }
44
45 void
46 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
47 {
48         VERIFY(fd < MAX_POLL_FDS);
49
50         ScopedLock ml(&m_);
51         aio_->watch_fd(fd, flag);
52
53         VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
54         callbacks_[fd] = ch;
55 }
56
57 //remove all callbacks related to fd
58 //the return guarantees that callbacks related to fd
59 //will never be called again
60 void
61 PollMgr::block_remove_fd(int fd)
62 {
63         ScopedLock ml(&m_);
64         aio_->unwatch_fd(fd, CB_RDWR);
65         pending_change_ = true;
66         VERIFY(pthread_cond_wait(&changedone_c_, &m_)==0);
67         callbacks_[fd] = NULL;
68 }
69
70 void
71 PollMgr::del_callback(int fd, poll_flag flag)
72 {
73         ScopedLock ml(&m_);
74         if (aio_->unwatch_fd(fd, flag)) {
75                 callbacks_[fd] = NULL;
76         }
77 }
78
79 bool
80 PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
81 {
82         ScopedLock ml(&m_);
83         if (!callbacks_[fd] || callbacks_[fd]!=c)
84                 return false;
85
86         return aio_->is_watched(fd, flag);
87 }
88
89 void
90 PollMgr::wait_loop()
91 {
92
93         std::vector<int> readable;
94         std::vector<int> writable;
95
96         while (1) {
97                 {
98                         ScopedLock ml(&m_);
99                         if (pending_change_) {
100                                 pending_change_ = false;
101                                 VERIFY(pthread_cond_broadcast(&changedone_c_)==0);
102                         }
103                 }
104                 readable.clear();
105                 writable.clear();
106                 aio_->wait_ready(&readable,&writable);
107
108                 if (!readable.size() && !writable.size()) {
109                         continue;
110                 } 
111                 //no locking of m_
112                 //because no add_callback() and del_callback should 
113                 //modify callbacks_[fd] while the fd is not dead
114                 for (unsigned int i = 0; i < readable.size(); i++) {
115                         int fd = readable[i];
116                         if (callbacks_[fd])
117                                 callbacks_[fd]->read_cb(fd);
118                 }
119
120                 for (unsigned int i = 0; i < writable.size(); i++) {
121                         int fd = writable[i];
122                         if (callbacks_[fd])
123                                 callbacks_[fd]->write_cb(fd);
124                 }
125         }
126 }
127
128 SelectAIO::SelectAIO() : highfds_(0)
129 {
130         FD_ZERO(&rfds_);
131         FD_ZERO(&wfds_);
132
133         VERIFY(pipe(pipefd_) == 0);
134         FD_SET(pipefd_[0], &rfds_);
135         highfds_ = pipefd_[0];
136
137         int flags = fcntl(pipefd_[0], F_GETFL, NULL);
138         flags |= O_NONBLOCK;
139         fcntl(pipefd_[0], F_SETFL, flags);
140
141         VERIFY(pthread_mutex_init(&m_, NULL) == 0);
142 }
143
144 SelectAIO::~SelectAIO()
145 {
146         VERIFY(pthread_mutex_destroy(&m_) == 0);
147 }
148
149 void
150 SelectAIO::watch_fd(int fd, poll_flag flag)
151 {
152         ScopedLock ml(&m_);
153         if (highfds_ <= fd) 
154                 highfds_ = fd;
155
156         if (flag == CB_RDONLY) {
157                 FD_SET(fd,&rfds_);
158         }else if (flag == CB_WRONLY) {
159                 FD_SET(fd,&wfds_);
160         }else {
161                 FD_SET(fd,&rfds_);
162                 FD_SET(fd,&wfds_);
163         }
164
165         char tmp = 1;
166         VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
167 }
168
169 bool
170 SelectAIO::is_watched(int fd, poll_flag flag)
171 {
172         ScopedLock ml(&m_);
173         if (flag == CB_RDONLY) {
174                 return FD_ISSET(fd,&rfds_);
175         }else if (flag == CB_WRONLY) {
176                 return FD_ISSET(fd,&wfds_);
177         }else{
178                 return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
179         }
180 }
181
182 bool 
183 SelectAIO::unwatch_fd(int fd, poll_flag flag)
184 {
185         ScopedLock ml(&m_);
186         if (flag == CB_RDONLY) {
187                 FD_CLR(fd, &rfds_);
188         }else if (flag == CB_WRONLY) {
189                 FD_CLR(fd, &wfds_);
190         }else if (flag == CB_RDWR) {
191                 FD_CLR(fd, &wfds_);
192                 FD_CLR(fd, &rfds_);
193         }else{
194                 VERIFY(0);
195         }
196
197         if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
198                 if (fd == highfds_) {
199                         int newh = pipefd_[0];
200                         for (int i = 0; i <= highfds_; i++) {
201                                 if (FD_ISSET(i, &rfds_)) {
202                                         newh = i;
203                                 }else if (FD_ISSET(i, &wfds_)) {
204                                         newh = i;
205                                 }
206                         }
207                         highfds_ = newh;
208                 }
209         }
210         if (flag == CB_RDWR) {
211                 char tmp = 1;
212                 VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
213         }
214         return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
215 }
216
217 void
218 SelectAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
219 {
220         fd_set trfds, twfds;
221         int high;
222
223         {
224                 ScopedLock ml(&m_);
225                 trfds = rfds_;
226                 twfds = wfds_;
227                 high = highfds_;
228
229         }
230
231         int ret = select(high+1, &trfds, &twfds, NULL, NULL);
232
233         if (ret < 0) {
234                 if (errno == EINTR) {
235                         return;
236                 } else {
237                         perror("select:");
238                         jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno);
239                         VERIFY(0);
240                 }
241         }
242
243         for (int fd = 0; fd <= high; fd++) {
244                 if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
245                         char tmp;
246                         VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
247                         VERIFY(tmp==1);
248                 }else {
249                         if (FD_ISSET(fd, &twfds)) {
250                                 writable->push_back(fd);
251                         }
252                         if (FD_ISSET(fd, &trfds)) {
253                                 readable->push_back(fd);
254                         }
255                 }
256         }
257 }
258
259 #ifdef __linux__ 
260
261 EPollAIO::EPollAIO()
262 {
263         pollfd_ = epoll_create(MAX_POLL_FDS);
264         VERIFY(pollfd_ >= 0);
265         bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
266 }
267
268 EPollAIO::~EPollAIO()
269 {
270         close(pollfd_);
271 }
272
273 static inline
274 int poll_flag_to_event(poll_flag flag)
275 {
276         int f;
277         if (flag == CB_RDONLY) {
278                 f = EPOLLIN;
279         }else if (flag == CB_WRONLY) {
280                 f = EPOLLOUT;
281         }else { //flag == CB_RDWR
282                 f = EPOLLIN | EPOLLOUT;
283         }
284         return f;
285 }
286
287 void
288 EPollAIO::watch_fd(int fd, poll_flag flag)
289 {
290         VERIFY(fd < MAX_POLL_FDS);
291
292         struct epoll_event ev;
293         int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
294         fdstatus_[fd] |= (int)flag;
295
296         ev.events = EPOLLET;
297         ev.data.fd = fd;
298
299         if (fdstatus_[fd] & CB_RDONLY) {
300                 ev.events |= EPOLLIN;
301         }
302         if (fdstatus_[fd] & CB_WRONLY) {
303                 ev.events |= EPOLLOUT;
304         }
305
306         if (flag == CB_RDWR) {
307                 VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
308         }
309
310         VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
311 }
312
313 bool 
314 EPollAIO::unwatch_fd(int fd, poll_flag flag)
315 {
316         VERIFY(fd < MAX_POLL_FDS);
317         fdstatus_[fd] &= ~(int)flag;
318
319         struct epoll_event ev;
320         int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
321
322         ev.events = EPOLLET;
323         ev.data.fd = fd;
324
325         if (fdstatus_[fd] & CB_RDONLY) {
326                 ev.events |= EPOLLIN;
327         }
328         if (fdstatus_[fd] & CB_WRONLY) {
329                 ev.events |= EPOLLOUT;
330         }
331
332         if (flag == CB_RDWR) {
333                 VERIFY(op == EPOLL_CTL_DEL);
334         }
335         VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
336         return (op == EPOLL_CTL_DEL);
337 }
338
339 bool
340 EPollAIO::is_watched(int fd, poll_flag flag)
341 {
342         VERIFY(fd < MAX_POLL_FDS);
343         return ((fdstatus_[fd] & CB_MASK) == flag);
344 }
345
346 void
347 EPollAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
348 {
349         int nfds = epoll_wait(pollfd_, ready_,  MAX_POLL_FDS, -1);
350         for (int i = 0; i < nfds; i++) {
351                 if (ready_[i].events & EPOLLIN) {
352                         readable->push_back(ready_[i].data.fd);
353                 }
354                 if (ready_[i].events & EPOLLOUT) {
355                         writable->push_back(ready_[i].data.fd);
356                 }
357         }
358 }
359
360 #endif