Includes cleanups
[invirt/third/libt4.git] / rpc / pollmgr.cc
1 #include "pollmgr.h"
2 #include <errno.h>
3 #include <sys/select.h>
4 #include "file.h"
5
6 #ifdef __linux__
7 #include <sys/epoll.h>
8 #endif
9
10 static PollMgr instance;
11
12 PollMgr & PollMgr::Instance() { return instance; }
13
14 class wait_manager {
15     public:
16         virtual void watch_fd(int fd, poll_flag flag) = 0;
17         virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
18         virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
19         virtual ~wait_manager() throw() {}
20 };
21
22 class SelectAIO : public wait_manager {
23     public :
24         SelectAIO();
25         ~SelectAIO() {}
26         void watch_fd(int fd, poll_flag flag);
27         bool unwatch_fd(int fd, poll_flag flag);
28         void wait_ready(vector<int> & readable, vector<int> & writable);
29
30     private:
31         fd_set rfds_, wfds_;
32         int highfds_;
33         file_t pipe_[2];
34         mutex m_;
35 };
36
37 #ifdef __linux__ 
38 class EPollAIO : public wait_manager {
39     public:
40         EPollAIO() {}
41         ~EPollAIO() throw() { }
42         void watch_fd(int fd, poll_flag flag);
43         bool unwatch_fd(int fd, poll_flag flag);
44         void wait_ready(vector<int> & readable, vector<int> & writable);
45
46     private:
47         file_t poll_ = epoll_create(MAX_POLL_FDS);
48         struct epoll_event ready_[MAX_POLL_FDS];
49         vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
50 };
51 #endif
52
53
54 PollMgr::PollMgr() : aio_(new SelectAIO()) {
55     th_ = thread(&PollMgr::wait_loop, this);
56 }
57
58 PollMgr::~PollMgr()
59 {
60     lock ml(m_);
61     for (auto p : callbacks_)
62         aio_->unwatch_fd(p.first, CB_RDWR);
63     pending_change_ = true;
64     shutdown_ = true;
65     changedone_c_.wait(ml);
66     delete aio_;
67     th_.join();
68 }
69
70 void
71 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
72 {
73     lock ml(m_);
74     aio_->watch_fd(fd, flag);
75
76     VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
77     callbacks_[fd] = ch;
78 }
79
80 // Remove all callbacks related to fd.  After this returns, we guarantee that
81 // callbacks related to fd will never be called again.
82 void PollMgr::block_remove_fd(int fd) {
83     lock ml(m_);
84     aio_->unwatch_fd(fd, CB_RDWR);
85     pending_change_ = true;
86     changedone_c_.wait(ml);
87     callbacks_[fd] = nullptr;
88 }
89
90 void PollMgr::del_callback(int fd, poll_flag flag) {
91     lock ml(m_);
92     if (aio_->unwatch_fd(fd, flag))
93         callbacks_[fd] = nullptr;
94 }
95
96 void PollMgr::wait_loop() {
97     vector<int> readable;
98     vector<int> writable;
99     aio_callback * cb;
100
101     while (1) {
102         {
103             lock ml(m_);
104             if (pending_change_) {
105                 pending_change_ = false;
106                 changedone_c_.notify_all();
107                 if (shutdown_)
108                     break;
109             }
110         }
111         readable.clear();
112         writable.clear();
113         aio_->wait_ready(readable, writable);
114
115         for (auto fd : readable) {
116             { lock ml(m_); cb = callbacks_[fd]; }
117             if (cb) cb->read_cb(fd);
118         }
119
120         for (auto fd : writable) {
121             { lock ml(m_); cb = callbacks_[fd]; }
122             if (cb) cb->write_cb(fd);
123         }
124     }
125 }
126
127 SelectAIO::SelectAIO()
128 {
129     FD_ZERO(&rfds_);
130     FD_ZERO(&wfds_);
131
132     file_t::pipe(pipe_);
133
134     FD_SET(pipe_[0], &rfds_);
135     highfds_ = pipe_[0];
136
137     pipe_[0].flags() |= O_NONBLOCK;
138 }
139
140 void SelectAIO::watch_fd(int fd, poll_flag flag) {
141     VERIFY(fd < MAX_POLL_FDS);
142
143     lock ml(m_);
144     if (highfds_ <= fd) 
145         highfds_ = fd;
146
147     if (flag & CB_RDONLY)
148         FD_SET(fd,&rfds_);
149
150     if (flag & CB_WRONLY)
151         FD_SET(fd,&wfds_);
152
153     VERIFY(pipe_[1].write((char)1)==1);
154 }
155
156 bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
157     VERIFY(fd < MAX_POLL_FDS);
158
159     lock ml(m_);
160     VERIFY((flag & ~CB_RDWR) == 0);
161     if (flag & CB_RDONLY)
162         FD_CLR(fd, &rfds_);
163     if (flag & CB_WRONLY)
164         FD_CLR(fd, &wfds_);
165
166     int newh = pipe_[0];
167     for (int i = 0; i <= highfds_; i++) {
168         if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
169             newh = i;
170     }
171     highfds_ = newh;
172
173     if (flag == CB_RDWR)
174         VERIFY(pipe_[1].write((char)1)==1);
175
176     return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
177 }
178
179 void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
180
181     fd_set trfds, twfds;
182     int high;
183
184     {
185         lock ml(m_);
186         trfds = rfds_;
187         twfds = wfds_;
188         high = highfds_;
189     }
190
191     int ret = select(high+1, &trfds, &twfds, NULL, NULL);
192
193     if (ret < 0 && errno == EINTR)
194         return;
195     else if (ret < 0) {
196         perror("select:");
197         IF_LEVEL(0) LOG("select_loop failure errno " << errno);
198         VERIFY(0);
199     }
200
201     for (int fd = 0; fd <= high; fd++) {
202         if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
203             char tmp;
204             VERIFY(pipe_[0].read(tmp)==1);
205             VERIFY(tmp==1);
206         } else {
207             if (FD_ISSET(fd, &twfds))
208                 writable.push_back(fd);
209
210             if (FD_ISSET(fd, &trfds))
211                 readable.push_back(fd);
212         }
213     }
214 }
215
216 #ifdef __linux__ 
217
218 void EPollAIO::watch_fd(int fd_, poll_flag flag) {
219     size_t fd = (size_t)fd_;
220
221     VERIFY(fd < MAX_POLL_FDS);
222
223     struct epoll_event ev;
224     int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
225     fdstatus_[fd] |= (unsigned)flag;
226
227     ev.events = EPOLLET;
228     ev.data.fd = fd_;
229
230     if (fdstatus_[fd] & CB_RDONLY)
231         ev.events |= EPOLLIN;
232
233     if (fdstatus_[fd] & CB_WRONLY)
234         ev.events |= EPOLLOUT;
235
236     if (flag == CB_RDWR)
237         VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
238
239     VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
240 }
241
242 bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
243     size_t fd = (size_t)fd_;
244
245     VERIFY(fd < MAX_POLL_FDS);
246     fdstatus_[fd] &= ~(unsigned)flag;
247
248     struct epoll_event ev;
249     int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
250
251     ev.events = EPOLLET;
252     ev.data.fd = fd_;
253
254     if (fdstatus_[fd] & CB_RDONLY)
255         ev.events |= EPOLLIN;
256
257     if (fdstatus_[fd] & CB_WRONLY)
258         ev.events |= EPOLLOUT;
259
260     if (flag == CB_RDWR)
261         VERIFY(op == EPOLL_CTL_DEL);
262
263     VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
264     return (op == EPOLL_CTL_DEL);
265 }
266
267 void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
268
269     int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
270     for (int i = 0; i < nfds; i++) {
271         if (ready_[i].events & EPOLLIN)
272             readable.push_back(ready_[i].data.fd);
273
274         if (ready_[i].events & EPOLLOUT)
275             writable.push_back(ready_[i].data.fd);
276     }
277 }
278
279 #endif