023a7aac2b2c6dd0bef630d782fe37ec836d36d6
[invirt/third/libt4.git] / rpc / pollmgr.cc
1 #include "types.h"
2 #include <errno.h>
3 #include <fcntl.h>
4 #include <unistd.h>
5
6 #include "jsl_log.h"
7 #include "pollmgr.h"
8
9 PollMgr *PollMgr::instance = NULL;
10 static std::once_flag pollmgr_is_initialized;
11
12 static void
13 PollMgrInit()
14 {
15         PollMgr::instance = new PollMgr();
16 }
17
18 PollMgr *
19 PollMgr::Instance()
20 {
21     std::call_once(pollmgr_is_initialized, PollMgrInit);
22         return instance;
23 }
24
25 PollMgr::PollMgr() : pending_change_(false)
26 {
27         bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
28         aio_ = new SelectAIO();
29         //aio_ = new EPollAIO();
30
31     th_ = std::thread(&PollMgr::wait_loop, this);
32 }
33
34 PollMgr::~PollMgr() [[noreturn]]
35 {
36         //never kill me!!!
37         VERIFY(0);
38 }
39
40 void
41 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
42 {
43         VERIFY(fd < MAX_POLL_FDS);
44
45     lock ml(m_);
46         aio_->watch_fd(fd, flag);
47
48         VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
49         callbacks_[fd] = ch;
50 }
51
52 //remove all callbacks related to fd
53 //the return guarantees that callbacks related to fd
54 //will never be called again
55 void
56 PollMgr::block_remove_fd(int fd)
57 {
58     lock ml(m_);
59         aio_->unwatch_fd(fd, CB_RDWR);
60         pending_change_ = true;
61     changedone_c_.wait(ml);
62         callbacks_[fd] = NULL;
63 }
64
65 void
66 PollMgr::del_callback(int fd, poll_flag flag)
67 {
68     lock ml(m_);
69         if (aio_->unwatch_fd(fd, flag)) {
70                 callbacks_[fd] = NULL;
71         }
72 }
73
74 bool
75 PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
76 {
77     lock ml(m_);
78         if (!callbacks_[fd] || callbacks_[fd]!=c)
79                 return false;
80
81         return aio_->is_watched(fd, flag);
82 }
83
84 void
85 PollMgr::wait_loop() [[noreturn]]
86 {
87
88         std::vector<int> readable;
89         std::vector<int> writable;
90
91         while (1) {
92                 {
93             lock ml(m_);
94                         if (pending_change_) {
95                                 pending_change_ = false;
96                 changedone_c_.notify_all();
97                         }
98                 }
99                 readable.clear();
100                 writable.clear();
101                 aio_->wait_ready(&readable,&writable);
102
103                 if (!readable.size() && !writable.size()) {
104                         continue;
105                 } 
106                 //no locking of m_
107                 //because no add_callback() and del_callback should 
108                 //modify callbacks_[fd] while the fd is not dead
109                 for (unsigned int i = 0; i < readable.size(); i++) {
110                         int fd = readable[i];
111                         if (callbacks_[fd])
112                                 callbacks_[fd]->read_cb(fd);
113                 }
114
115                 for (unsigned int i = 0; i < writable.size(); i++) {
116                         int fd = writable[i];
117                         if (callbacks_[fd])
118                                 callbacks_[fd]->write_cb(fd);
119                 }
120         }
121 }
122
123 SelectAIO::SelectAIO() : highfds_(0)
124 {
125         FD_ZERO(&rfds_);
126         FD_ZERO(&wfds_);
127
128         VERIFY(pipe(pipefd_) == 0);
129         FD_SET(pipefd_[0], &rfds_);
130         highfds_ = pipefd_[0];
131
132         int flags = fcntl(pipefd_[0], F_GETFL, NULL);
133         flags |= O_NONBLOCK;
134         fcntl(pipefd_[0], F_SETFL, flags);
135 }
136
137 SelectAIO::~SelectAIO()
138 {
139 }
140
141 void
142 SelectAIO::watch_fd(int fd, poll_flag flag)
143 {
144     lock ml(m_);
145         if (highfds_ <= fd) 
146                 highfds_ = fd;
147
148         if (flag == CB_RDONLY) {
149                 FD_SET(fd,&rfds_);
150         }else if (flag == CB_WRONLY) {
151                 FD_SET(fd,&wfds_);
152         }else {
153                 FD_SET(fd,&rfds_);
154                 FD_SET(fd,&wfds_);
155         }
156
157         char tmp = 1;
158         VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
159 }
160
161 bool
162 SelectAIO::is_watched(int fd, poll_flag flag)
163 {
164     lock ml(m_);
165         if (flag == CB_RDONLY) {
166                 return FD_ISSET(fd,&rfds_);
167         }else if (flag == CB_WRONLY) {
168                 return FD_ISSET(fd,&wfds_);
169         }else{
170                 return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
171         }
172 }
173
174 bool 
175 SelectAIO::unwatch_fd(int fd, poll_flag flag)
176 {
177     lock ml(m_);
178         if (flag == CB_RDONLY) {
179                 FD_CLR(fd, &rfds_);
180         }else if (flag == CB_WRONLY) {
181                 FD_CLR(fd, &wfds_);
182         }else if (flag == CB_RDWR) {
183                 FD_CLR(fd, &wfds_);
184                 FD_CLR(fd, &rfds_);
185         }else{
186                 VERIFY(0);
187         }
188
189         if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
190                 if (fd == highfds_) {
191                         int newh = pipefd_[0];
192                         for (int i = 0; i <= highfds_; i++) {
193                                 if (FD_ISSET(i, &rfds_)) {
194                                         newh = i;
195                                 }else if (FD_ISSET(i, &wfds_)) {
196                                         newh = i;
197                                 }
198                         }
199                         highfds_ = newh;
200                 }
201         }
202         if (flag == CB_RDWR) {
203                 char tmp = 1;
204                 VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
205         }
206         return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
207 }
208
209 void
210 SelectAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
211 {
212         fd_set trfds, twfds;
213         int high;
214
215         {
216         lock ml(m_);
217                 trfds = rfds_;
218                 twfds = wfds_;
219                 high = highfds_;
220         }
221
222         int ret = select(high+1, &trfds, &twfds, NULL, NULL);
223
224         if (ret < 0) {
225                 if (errno == EINTR) {
226                         return;
227                 } else {
228                         perror("select:");
229                         jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno);
230                         VERIFY(0);
231                 }
232         }
233
234         for (int fd = 0; fd <= high; fd++) {
235                 if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
236                         char tmp;
237                         VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
238                         VERIFY(tmp==1);
239                 }else {
240                         if (FD_ISSET(fd, &twfds)) {
241                                 writable->push_back(fd);
242                         }
243                         if (FD_ISSET(fd, &trfds)) {
244                                 readable->push_back(fd);
245                         }
246                 }
247         }
248 }
249
250 #ifdef __linux__ 
251
252 EPollAIO::EPollAIO()
253 {
254         pollfd_ = epoll_create(MAX_POLL_FDS);
255         VERIFY(pollfd_ >= 0);
256         bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
257 }
258
259 EPollAIO::~EPollAIO()
260 {
261         close(pollfd_);
262 }
263
264 static inline
265 int poll_flag_to_event(poll_flag flag)
266 {
267         int f;
268         if (flag == CB_RDONLY) {
269                 f = EPOLLIN;
270         }else if (flag == CB_WRONLY) {
271                 f = EPOLLOUT;
272         }else { //flag == CB_RDWR
273                 f = EPOLLIN | EPOLLOUT;
274         }
275         return f;
276 }
277
278 void
279 EPollAIO::watch_fd(int fd, poll_flag flag)
280 {
281         VERIFY(fd < MAX_POLL_FDS);
282
283         struct epoll_event ev;
284         int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
285         fdstatus_[fd] |= (int)flag;
286
287         ev.events = EPOLLET;
288         ev.data.fd = fd;
289
290         if (fdstatus_[fd] & CB_RDONLY) {
291                 ev.events |= EPOLLIN;
292         }
293         if (fdstatus_[fd] & CB_WRONLY) {
294                 ev.events |= EPOLLOUT;
295         }
296
297         if (flag == CB_RDWR) {
298                 VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
299         }
300
301         VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
302 }
303
304 bool 
305 EPollAIO::unwatch_fd(int fd, poll_flag flag)
306 {
307         VERIFY(fd < MAX_POLL_FDS);
308         fdstatus_[fd] &= ~(int)flag;
309
310         struct epoll_event ev;
311         int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
312
313         ev.events = EPOLLET;
314         ev.data.fd = fd;
315
316         if (fdstatus_[fd] & CB_RDONLY) {
317                 ev.events |= EPOLLIN;
318         }
319         if (fdstatus_[fd] & CB_WRONLY) {
320                 ev.events |= EPOLLOUT;
321         }
322
323         if (flag == CB_RDWR) {
324                 VERIFY(op == EPOLL_CTL_DEL);
325         }
326         VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
327         return (op == EPOLL_CTL_DEL);
328 }
329
330 bool
331 EPollAIO::is_watched(int fd, poll_flag flag)
332 {
333         VERIFY(fd < MAX_POLL_FDS);
334         return ((fdstatus_[fd] & CB_MASK) == flag);
335 }
336
337 void
338 EPollAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
339 {
340         int nfds = epoll_wait(pollfd_, ready_,  MAX_POLL_FDS, -1);
341         for (int i = 0; i < nfds; i++) {
342                 if (ready_[i].events & EPOLLIN) {
343                         readable->push_back(ready_[i].data.fd);
344                 }
345                 if (ready_[i].events & EPOLLOUT) {
346                         writable->push_back(ready_[i].data.fd);
347                 }
348         }
349 }
350
351 #endif