a9382843ec698a4de9d9b08784d7cdff7df4a35b
[invirt/third/libt4.git] / rpc / pollmgr.cc
1 #include "types.h"
2 #include <errno.h>
3 #include <fcntl.h>
4 #include <unistd.h>
5
6 #include "pollmgr.h"
7
8 PollMgr *PollMgr::instance = NULL;
9 static once_flag pollmgr_is_initialized;
10
11 static void
12 PollMgrInit()
13 {
14     PollMgr::instance = new PollMgr();
15 }
16
17 PollMgr *
18 PollMgr::Instance()
19 {
20     call_once(pollmgr_is_initialized, PollMgrInit);
21     return instance;
22 }
23
24 PollMgr::PollMgr() : pending_change_(false)
25 {
26     bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
27     aio_ = new SelectAIO();
28     //aio_ = new EPollAIO();
29
30     th_ = thread(&PollMgr::wait_loop, this);
31 }
32
33 PollMgr::~PollMgr() [[noreturn]]
34 {
35     //never kill me!!!
36     VERIFY(0);
37 }
38
39 void
40 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
41 {
42     VERIFY(fd < MAX_POLL_FDS);
43
44     lock ml(m_);
45     aio_->watch_fd(fd, flag);
46
47     VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
48     callbacks_[fd] = ch;
49 }
50
51 //remove all callbacks related to fd
52 //the return guarantees that callbacks related to fd
53 //will never be called again
54 void
55 PollMgr::block_remove_fd(int fd)
56 {
57     lock ml(m_);
58     aio_->unwatch_fd(fd, CB_RDWR);
59     pending_change_ = true;
60     changedone_c_.wait(ml);
61     callbacks_[fd] = NULL;
62 }
63
64 void
65 PollMgr::del_callback(int fd, poll_flag flag)
66 {
67     lock ml(m_);
68     if (aio_->unwatch_fd(fd, flag)) {
69         callbacks_[fd] = NULL;
70     }
71 }
72
73 bool
74 PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
75 {
76     lock ml(m_);
77     if (!callbacks_[fd] || callbacks_[fd]!=c)
78         return false;
79
80     return aio_->is_watched(fd, flag);
81 }
82
83 void
84 PollMgr::wait_loop() [[noreturn]]
85 {
86
87     vector<int> readable;
88     vector<int> writable;
89
90     while (1) {
91         {
92             lock ml(m_);
93             if (pending_change_) {
94                 pending_change_ = false;
95                 changedone_c_.notify_all();
96             }
97         }
98         readable.clear();
99         writable.clear();
100         aio_->wait_ready(&readable,&writable);
101
102         if (!readable.size() && !writable.size()) {
103             continue;
104         } 
105         //no locking of m_
106         //because no add_callback() and del_callback should 
107         //modify callbacks_[fd] while the fd is not dead
108         for (unsigned int i = 0; i < readable.size(); i++) {
109             int fd = readable[i];
110             if (callbacks_[fd])
111                 callbacks_[fd]->read_cb(fd);
112         }
113
114         for (unsigned int i = 0; i < writable.size(); i++) {
115             int fd = writable[i];
116             if (callbacks_[fd])
117                 callbacks_[fd]->write_cb(fd);
118         }
119     }
120 }
121
122 SelectAIO::SelectAIO() : highfds_(0)
123 {
124     FD_ZERO(&rfds_);
125     FD_ZERO(&wfds_);
126
127     VERIFY(pipe(pipefd_) == 0);
128     FD_SET(pipefd_[0], &rfds_);
129     highfds_ = pipefd_[0];
130
131     int flags = fcntl(pipefd_[0], F_GETFL, NULL);
132     flags |= O_NONBLOCK;
133     fcntl(pipefd_[0], F_SETFL, flags);
134 }
135
136 SelectAIO::~SelectAIO()
137 {
138 }
139
140 void
141 SelectAIO::watch_fd(int fd, poll_flag flag)
142 {
143     lock ml(m_);
144     if (highfds_ <= fd) 
145         highfds_ = fd;
146
147     if (flag == CB_RDONLY) {
148         FD_SET(fd,&rfds_);
149     }else if (flag == CB_WRONLY) {
150         FD_SET(fd,&wfds_);
151     }else {
152         FD_SET(fd,&rfds_);
153         FD_SET(fd,&wfds_);
154     }
155
156     char tmp = 1;
157     VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
158 }
159
160 bool
161 SelectAIO::is_watched(int fd, poll_flag flag)
162 {
163     lock ml(m_);
164     if (flag == CB_RDONLY) {
165         return FD_ISSET(fd,&rfds_);
166     }else if (flag == CB_WRONLY) {
167         return FD_ISSET(fd,&wfds_);
168     }else{
169         return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
170     }
171 }
172
173 bool 
174 SelectAIO::unwatch_fd(int fd, poll_flag flag)
175 {
176     lock ml(m_);
177     if (flag == CB_RDONLY) {
178         FD_CLR(fd, &rfds_);
179     }else if (flag == CB_WRONLY) {
180         FD_CLR(fd, &wfds_);
181     }else if (flag == CB_RDWR) {
182         FD_CLR(fd, &wfds_);
183         FD_CLR(fd, &rfds_);
184     }else{
185         VERIFY(0);
186     }
187
188     if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
189         if (fd == highfds_) {
190             int newh = pipefd_[0];
191             for (int i = 0; i <= highfds_; i++) {
192                 if (FD_ISSET(i, &rfds_)) {
193                     newh = i;
194                 }else if (FD_ISSET(i, &wfds_)) {
195                     newh = i;
196                 }
197             }
198             highfds_ = newh;
199         }
200     }
201     if (flag == CB_RDWR) {
202         char tmp = 1;
203         VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
204     }
205     return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
206 }
207
208 void
209 SelectAIO::wait_ready(vector<int> *readable, vector<int> *writable)
210 {
211     fd_set trfds, twfds;
212     int high;
213
214     {
215         lock ml(m_);
216         trfds = rfds_;
217         twfds = wfds_;
218         high = highfds_;
219     }
220
221     int ret = select(high+1, &trfds, &twfds, NULL, NULL);
222
223     if (ret < 0) {
224         if (errno == EINTR) {
225             return;
226         } else {
227             perror("select:");
228             IF_LEVEL(0) LOG("select_loop failure errno " << errno);
229             VERIFY(0);
230         }
231     }
232
233     for (int fd = 0; fd <= high; fd++) {
234         if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
235             char tmp;
236             VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
237             VERIFY(tmp==1);
238         }else {
239             if (FD_ISSET(fd, &twfds)) {
240                 writable->push_back(fd);
241             }
242             if (FD_ISSET(fd, &trfds)) {
243                 readable->push_back(fd);
244             }
245         }
246     }
247 }
248
249 #ifdef __linux__ 
250
251 EPollAIO::EPollAIO()
252 {
253     pollfd_ = epoll_create(MAX_POLL_FDS);
254     VERIFY(pollfd_ >= 0);
255     bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
256 }
257
258 EPollAIO::~EPollAIO()
259 {
260     close(pollfd_);
261 }
262
263 static inline
264 int poll_flag_to_event(poll_flag flag)
265 {
266     int f;
267     if (flag == CB_RDONLY) {
268         f = EPOLLIN;
269     }else if (flag == CB_WRONLY) {
270         f = EPOLLOUT;
271     }else { //flag == CB_RDWR
272         f = EPOLLIN | EPOLLOUT;
273     }
274     return f;
275 }
276
277 void
278 EPollAIO::watch_fd(int fd, poll_flag flag)
279 {
280     VERIFY(fd < MAX_POLL_FDS);
281
282     struct epoll_event ev;
283     int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
284     fdstatus_[fd] |= (int)flag;
285
286     ev.events = EPOLLET;
287     ev.data.fd = fd;
288
289     if (fdstatus_[fd] & CB_RDONLY) {
290         ev.events |= EPOLLIN;
291     }
292     if (fdstatus_[fd] & CB_WRONLY) {
293         ev.events |= EPOLLOUT;
294     }
295
296     if (flag == CB_RDWR) {
297         VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
298     }
299
300     VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
301 }
302
303 bool 
304 EPollAIO::unwatch_fd(int fd, poll_flag flag)
305 {
306     VERIFY(fd < MAX_POLL_FDS);
307     fdstatus_[fd] &= ~(int)flag;
308
309     struct epoll_event ev;
310     int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
311
312     ev.events = EPOLLET;
313     ev.data.fd = fd;
314
315     if (fdstatus_[fd] & CB_RDONLY) {
316         ev.events |= EPOLLIN;
317     }
318     if (fdstatus_[fd] & CB_WRONLY) {
319         ev.events |= EPOLLOUT;
320     }
321
322     if (flag == CB_RDWR) {
323         VERIFY(op == EPOLL_CTL_DEL);
324     }
325     VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
326     return (op == EPOLL_CTL_DEL);
327 }
328
329 bool
330 EPollAIO::is_watched(int fd, poll_flag flag)
331 {
332     VERIFY(fd < MAX_POLL_FDS);
333     return ((fdstatus_[fd] & CB_MASK) == flag);
334 }
335
336 void
337 EPollAIO::wait_ready(vector<int> *readable, vector<int> *writable)
338 {
339     int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
340     for (int i = 0; i < nfds; i++) {
341         if (ready_[i].events & EPOLLIN) {
342             readable->push_back(ready_[i].data.fd);
343         }
344         if (ready_[i].events & EPOLLOUT) {
345             writable->push_back(ready_[i].data.fd);
346         }
347     }
348 }
349
350 #endif