Global destructor clean-ups and python test fixes
[invirt/third/libt4.git] / rpc / poll_mgr.cc
1 #include "poll_mgr.h"
2 #include <errno.h>
3 #include <sys/select.h>
4 #include "file.h"
5
6 #ifdef __linux__
7 #include <sys/epoll.h>
8 #endif
9
10 using std::vector;
11
12 aio_callback::~aio_callback() {}
13
14 poll_mgr poll_mgr::shared_mgr;
15
16 class wait_manager {
17     public:
18         virtual void watch_fd(int fd, poll_flag flag) = 0;
19         virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
20         virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
21         virtual ~wait_manager() noexcept;
22 };
23
24 wait_manager::~wait_manager() noexcept {}
25
26 class SelectAIO : public wait_manager {
27     public :
28         SelectAIO();
29         ~SelectAIO() {}
30         void watch_fd(int fd, poll_flag flag);
31         bool unwatch_fd(int fd, poll_flag flag);
32         void wait_ready(vector<int> & readable, vector<int> & writable);
33
34     private:
35         fd_set rfds_, wfds_;
36         int highfds_;
37         file_t pipe_[2];
38         std::mutex m_;
39 };
40
41 #ifdef __linux__ 
42 class EPollAIO : public wait_manager {
43     public:
44         EPollAIO() {}
45         ~EPollAIO() throw() { }
46         void watch_fd(int fd, poll_flag flag);
47         bool unwatch_fd(int fd, poll_flag flag);
48         void wait_ready(vector<int> & readable, vector<int> & writable);
49
50     private:
51         file_t poll_ = epoll_create(MAX_POLL_FDS);
52         struct epoll_event ready_[MAX_POLL_FDS];
53         vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
54 };
55 #endif
56
57
58 poll_mgr::poll_mgr() : aio_(new SelectAIO()) {
59     th_ = thread(&poll_mgr::wait_loop, this);
60 }
61
62 poll_mgr::~poll_mgr() {
63     shutdown();
64 }
65
66 void poll_mgr::shutdown() {
67     lock ml(m_);
68     if (shutdown_)
69         return;
70     for (auto p : callbacks_)
71         aio_->unwatch_fd(p.first, CB_RDWR);
72     pending_change_ = true;
73     shutdown_ = true;
74     changedone_c_.wait(ml);
75     aio_ = nullptr;
76     th_.join();
77 }
78
79 void poll_mgr::add_callback(int fd, poll_flag flag, aio_callback *ch) {
80     lock ml(m_);
81     aio_->watch_fd(fd, flag);
82
83     VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
84     callbacks_[fd] = ch;
85 }
86
87 // Remove all callbacks related to fd.  After this returns, we guarantee that
88 // callbacks related to fd will never be called again.
89 void poll_mgr::block_remove_fd(int fd) {
90     lock ml(m_);
91     aio_->unwatch_fd(fd, CB_RDWR);
92     pending_change_ = true;
93     changedone_c_.wait(ml);
94     callbacks_[fd] = nullptr;
95 }
96
97 void poll_mgr::del_callback(int fd, poll_flag flag) {
98     lock ml(m_);
99     if (aio_->unwatch_fd(fd, flag))
100         callbacks_[fd] = nullptr;
101 }
102
103 void poll_mgr::wait_loop() {
104     vector<int> readable;
105     vector<int> writable;
106     aio_callback * cb;
107
108     while (1) {
109         {
110             lock ml(m_);
111             if (pending_change_) {
112                 pending_change_ = false;
113                 changedone_c_.notify_all();
114                 if (shutdown_)
115                     break;
116             }
117         }
118         readable.clear();
119         writable.clear();
120         aio_->wait_ready(readable, writable);
121
122         for (auto fd : readable) {
123             { lock ml(m_); cb = callbacks_[fd]; }
124             if (cb) cb->read_cb(fd);
125         }
126
127         for (auto fd : writable) {
128             { lock ml(m_); cb = callbacks_[fd]; }
129             if (cb) cb->write_cb(fd);
130         }
131     }
132 }
133
134 SelectAIO::SelectAIO()
135 {
136     FD_ZERO(&rfds_);
137     FD_ZERO(&wfds_);
138
139     file_t::pipe(pipe_);
140
141     FD_SET(pipe_[0], &rfds_);
142     highfds_ = pipe_[0];
143
144     pipe_[0].flags() |= O_NONBLOCK;
145 }
146
147 void SelectAIO::watch_fd(int fd, poll_flag flag) {
148     VERIFY(fd < MAX_POLL_FDS);
149
150     lock ml(m_);
151     if (highfds_ <= fd) 
152         highfds_ = fd;
153
154     if (flag & CB_RDONLY)
155         FD_SET(fd,&rfds_);
156
157     if (flag & CB_WRONLY)
158         FD_SET(fd,&wfds_);
159
160     VERIFY(pipe_[1].write((char)1)==1);
161 }
162
163 bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
164     VERIFY(fd < MAX_POLL_FDS);
165
166     lock ml(m_);
167     VERIFY((flag & ~CB_RDWR) == 0);
168     if (flag & CB_RDONLY)
169         FD_CLR(fd, &rfds_);
170     if (flag & CB_WRONLY)
171         FD_CLR(fd, &wfds_);
172
173     int newh = pipe_[0];
174     for (int i = 0; i <= highfds_; i++) {
175         if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
176             newh = i;
177     }
178     highfds_ = newh;
179
180     if (flag == CB_RDWR)
181         VERIFY(pipe_[1].write((char)1)==1);
182
183     return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
184 }
185
186 void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
187
188     fd_set trfds, twfds;
189     int high;
190
191     {
192         lock ml(m_);
193         trfds = rfds_;
194         twfds = wfds_;
195         high = highfds_;
196     }
197
198     int ret = select(high+1, &trfds, &twfds, NULL, NULL);
199
200     if (ret < 0 && errno == EINTR)
201         return;
202     else if (ret < 0) {
203         perror("select:");
204         IF_LEVEL(0) LOG << "select_loop failure errno " << errno;
205         VERIFY(0);
206     }
207
208     for (int fd = 0; fd <= high; fd++) {
209         if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
210             char tmp;
211             VERIFY(pipe_[0].read(tmp)==1);
212             VERIFY(tmp==1);
213         } else {
214             if (FD_ISSET(fd, &twfds))
215                 writable.push_back(fd);
216
217             if (FD_ISSET(fd, &trfds))
218                 readable.push_back(fd);
219         }
220     }
221 }
222
223 #ifdef __linux__ 
224
225 void EPollAIO::watch_fd(int fd_, poll_flag flag) {
226     size_t fd = (size_t)fd_;
227
228     VERIFY(fd < MAX_POLL_FDS);
229
230     struct epoll_event ev;
231     int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
232     fdstatus_[fd] |= (unsigned)flag;
233
234     ev.events = EPOLLET;
235     ev.data.fd = fd_;
236
237     if (fdstatus_[fd] & CB_RDONLY)
238         ev.events |= EPOLLIN;
239
240     if (fdstatus_[fd] & CB_WRONLY)
241         ev.events |= EPOLLOUT;
242
243     if (flag == CB_RDWR)
244         VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
245
246     VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
247 }
248
249 bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
250     size_t fd = (size_t)fd_;
251
252     VERIFY(fd < MAX_POLL_FDS);
253     fdstatus_[fd] &= ~(unsigned)flag;
254
255     struct epoll_event ev;
256     int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
257
258     ev.events = EPOLLET;
259     ev.data.fd = fd_;
260
261     if (fdstatus_[fd] & CB_RDONLY)
262         ev.events |= EPOLLIN;
263
264     if (fdstatus_[fd] & CB_WRONLY)
265         ev.events |= EPOLLOUT;
266
267     if (flag == CB_RDWR)
268         VERIFY(op == EPOLL_CTL_DEL);
269
270     VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
271     return (op == EPOLL_CTL_DEL);
272 }
273
274 void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
275
276     int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
277     for (int i = 0; i < nfds; i++) {
278         if (ready_[i].events & EPOLLIN)
279             readable.push_back(ready_[i].data.fd);
280
281         if (ready_[i].events & EPOLLOUT)
282             writable.push_back(ready_[i].data.fd);
283     }
284 }
285
286 #endif