More clean-ups and cool template stuff
[invirt/third/libt4.git] / rpc / pollmgr.cc
1 #include <errno.h>
2 #include <fcntl.h>
3 #include <unistd.h>
4
5 #include "jsl_log.h"
6 #include "lang/verify.h"
7 #include "pollmgr.h"
8 #include "lock.h"
9
10 PollMgr *PollMgr::instance = NULL;
11 static std::once_flag pollmgr_is_initialized;
12
13 static void
14 PollMgrInit()
15 {
16         PollMgr::instance = new PollMgr();
17 }
18
19 PollMgr *
20 PollMgr::Instance()
21 {
22     std::call_once(pollmgr_is_initialized, PollMgrInit);
23         return instance;
24 }
25
26 PollMgr::PollMgr() : pending_change_(false)
27 {
28         bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
29         aio_ = new SelectAIO();
30         //aio_ = new EPollAIO();
31
32     th_ = std::thread(&PollMgr::wait_loop, this);
33 }
34
35 PollMgr::~PollMgr() [[noreturn]]
36 {
37         //never kill me!!!
38         VERIFY(0);
39 }
40
41 void
42 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
43 {
44         VERIFY(fd < MAX_POLL_FDS);
45
46     lock ml(m_);
47         aio_->watch_fd(fd, flag);
48
49         VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
50         callbacks_[fd] = ch;
51 }
52
53 //remove all callbacks related to fd
54 //the return guarantees that callbacks related to fd
55 //will never be called again
56 void
57 PollMgr::block_remove_fd(int fd)
58 {
59     lock ml(m_);
60         aio_->unwatch_fd(fd, CB_RDWR);
61         pending_change_ = true;
62     changedone_c_.wait(ml);
63         callbacks_[fd] = NULL;
64 }
65
66 void
67 PollMgr::del_callback(int fd, poll_flag flag)
68 {
69     lock ml(m_);
70         if (aio_->unwatch_fd(fd, flag)) {
71                 callbacks_[fd] = NULL;
72         }
73 }
74
75 bool
76 PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
77 {
78     lock ml(m_);
79         if (!callbacks_[fd] || callbacks_[fd]!=c)
80                 return false;
81
82         return aio_->is_watched(fd, flag);
83 }
84
85 void
86 PollMgr::wait_loop() [[noreturn]]
87 {
88
89         std::vector<int> readable;
90         std::vector<int> writable;
91
92         while (1) {
93                 {
94             lock ml(m_);
95                         if (pending_change_) {
96                                 pending_change_ = false;
97                 changedone_c_.notify_all();
98                         }
99                 }
100                 readable.clear();
101                 writable.clear();
102                 aio_->wait_ready(&readable,&writable);
103
104                 if (!readable.size() && !writable.size()) {
105                         continue;
106                 } 
107                 //no locking of m_
108                 //because no add_callback() and del_callback should 
109                 //modify callbacks_[fd] while the fd is not dead
110                 for (unsigned int i = 0; i < readable.size(); i++) {
111                         int fd = readable[i];
112                         if (callbacks_[fd])
113                                 callbacks_[fd]->read_cb(fd);
114                 }
115
116                 for (unsigned int i = 0; i < writable.size(); i++) {
117                         int fd = writable[i];
118                         if (callbacks_[fd])
119                                 callbacks_[fd]->write_cb(fd);
120                 }
121         }
122 }
123
124 SelectAIO::SelectAIO() : highfds_(0)
125 {
126         FD_ZERO(&rfds_);
127         FD_ZERO(&wfds_);
128
129         VERIFY(pipe(pipefd_) == 0);
130         FD_SET(pipefd_[0], &rfds_);
131         highfds_ = pipefd_[0];
132
133         int flags = fcntl(pipefd_[0], F_GETFL, NULL);
134         flags |= O_NONBLOCK;
135         fcntl(pipefd_[0], F_SETFL, flags);
136 }
137
138 SelectAIO::~SelectAIO()
139 {
140 }
141
142 void
143 SelectAIO::watch_fd(int fd, poll_flag flag)
144 {
145     lock ml(m_);
146         if (highfds_ <= fd) 
147                 highfds_ = fd;
148
149         if (flag == CB_RDONLY) {
150                 FD_SET(fd,&rfds_);
151         }else if (flag == CB_WRONLY) {
152                 FD_SET(fd,&wfds_);
153         }else {
154                 FD_SET(fd,&rfds_);
155                 FD_SET(fd,&wfds_);
156         }
157
158         char tmp = 1;
159         VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
160 }
161
162 bool
163 SelectAIO::is_watched(int fd, poll_flag flag)
164 {
165     lock ml(m_);
166         if (flag == CB_RDONLY) {
167                 return FD_ISSET(fd,&rfds_);
168         }else if (flag == CB_WRONLY) {
169                 return FD_ISSET(fd,&wfds_);
170         }else{
171                 return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
172         }
173 }
174
175 bool 
176 SelectAIO::unwatch_fd(int fd, poll_flag flag)
177 {
178     lock ml(m_);
179         if (flag == CB_RDONLY) {
180                 FD_CLR(fd, &rfds_);
181         }else if (flag == CB_WRONLY) {
182                 FD_CLR(fd, &wfds_);
183         }else if (flag == CB_RDWR) {
184                 FD_CLR(fd, &wfds_);
185                 FD_CLR(fd, &rfds_);
186         }else{
187                 VERIFY(0);
188         }
189
190         if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
191                 if (fd == highfds_) {
192                         int newh = pipefd_[0];
193                         for (int i = 0; i <= highfds_; i++) {
194                                 if (FD_ISSET(i, &rfds_)) {
195                                         newh = i;
196                                 }else if (FD_ISSET(i, &wfds_)) {
197                                         newh = i;
198                                 }
199                         }
200                         highfds_ = newh;
201                 }
202         }
203         if (flag == CB_RDWR) {
204                 char tmp = 1;
205                 VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
206         }
207         return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
208 }
209
210 void
211 SelectAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
212 {
213         fd_set trfds, twfds;
214         int high;
215
216         {
217         lock ml(m_);
218                 trfds = rfds_;
219                 twfds = wfds_;
220                 high = highfds_;
221         }
222
223         int ret = select(high+1, &trfds, &twfds, NULL, NULL);
224
225         if (ret < 0) {
226                 if (errno == EINTR) {
227                         return;
228                 } else {
229                         perror("select:");
230                         jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno);
231                         VERIFY(0);
232                 }
233         }
234
235         for (int fd = 0; fd <= high; fd++) {
236                 if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
237                         char tmp;
238                         VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
239                         VERIFY(tmp==1);
240                 }else {
241                         if (FD_ISSET(fd, &twfds)) {
242                                 writable->push_back(fd);
243                         }
244                         if (FD_ISSET(fd, &trfds)) {
245                                 readable->push_back(fd);
246                         }
247                 }
248         }
249 }
250
251 #ifdef __linux__ 
252
253 EPollAIO::EPollAIO()
254 {
255         pollfd_ = epoll_create(MAX_POLL_FDS);
256         VERIFY(pollfd_ >= 0);
257         bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
258 }
259
260 EPollAIO::~EPollAIO()
261 {
262         close(pollfd_);
263 }
264
265 static inline
266 int poll_flag_to_event(poll_flag flag)
267 {
268         int f;
269         if (flag == CB_RDONLY) {
270                 f = EPOLLIN;
271         }else if (flag == CB_WRONLY) {
272                 f = EPOLLOUT;
273         }else { //flag == CB_RDWR
274                 f = EPOLLIN | EPOLLOUT;
275         }
276         return f;
277 }
278
279 void
280 EPollAIO::watch_fd(int fd, poll_flag flag)
281 {
282         VERIFY(fd < MAX_POLL_FDS);
283
284         struct epoll_event ev;
285         int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
286         fdstatus_[fd] |= (int)flag;
287
288         ev.events = EPOLLET;
289         ev.data.fd = fd;
290
291         if (fdstatus_[fd] & CB_RDONLY) {
292                 ev.events |= EPOLLIN;
293         }
294         if (fdstatus_[fd] & CB_WRONLY) {
295                 ev.events |= EPOLLOUT;
296         }
297
298         if (flag == CB_RDWR) {
299                 VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
300         }
301
302         VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
303 }
304
305 bool 
306 EPollAIO::unwatch_fd(int fd, poll_flag flag)
307 {
308         VERIFY(fd < MAX_POLL_FDS);
309         fdstatus_[fd] &= ~(int)flag;
310
311         struct epoll_event ev;
312         int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
313
314         ev.events = EPOLLET;
315         ev.data.fd = fd;
316
317         if (fdstatus_[fd] & CB_RDONLY) {
318                 ev.events |= EPOLLIN;
319         }
320         if (fdstatus_[fd] & CB_WRONLY) {
321                 ev.events |= EPOLLOUT;
322         }
323
324         if (flag == CB_RDWR) {
325                 VERIFY(op == EPOLL_CTL_DEL);
326         }
327         VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
328         return (op == EPOLL_CTL_DEL);
329 }
330
331 bool
332 EPollAIO::is_watched(int fd, poll_flag flag)
333 {
334         VERIFY(fd < MAX_POLL_FDS);
335         return ((fdstatus_[fd] & CB_MASK) == flag);
336 }
337
338 void
339 EPollAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
340 {
341         int nfds = epoll_wait(pollfd_, ready_,  MAX_POLL_FDS, -1);
342         for (int i = 0; i < nfds; i++) {
343                 if (ready_[i].events & EPOLLIN) {
344                         readable->push_back(ready_[i].data.fd);
345                 }
346                 if (ready_[i].events & EPOLLOUT) {
347                         writable->push_back(ready_[i].data.fd);
348                 }
349         }
350 }
351
352 #endif