Got rid of most using directives. Ported tests to python.
[invirt/third/libt4.git] / rpc / poll_mgr.cc
1 #include "poll_mgr.h"
2 #include <errno.h>
3 #include <sys/select.h>
4 #include "file.h"
5
6 #ifdef __linux__
7 #include <sys/epoll.h>
8 #endif
9
10 using std::vector;
11
12 aio_callback::~aio_callback() {}
13
14 poll_mgr poll_mgr::shared_mgr;
15
16 class wait_manager {
17     public:
18         virtual void watch_fd(int fd, poll_flag flag) = 0;
19         virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
20         virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
21         virtual ~wait_manager() noexcept;
22 };
23
24 wait_manager::~wait_manager() noexcept {}
25
26 class SelectAIO : public wait_manager {
27     public :
28         SelectAIO();
29         ~SelectAIO() {}
30         void watch_fd(int fd, poll_flag flag);
31         bool unwatch_fd(int fd, poll_flag flag);
32         void wait_ready(vector<int> & readable, vector<int> & writable);
33
34     private:
35         fd_set rfds_, wfds_;
36         int highfds_;
37         file_t pipe_[2];
38         std::mutex m_;
39 };
40
41 #ifdef __linux__ 
42 class EPollAIO : public wait_manager {
43     public:
44         EPollAIO() {}
45         ~EPollAIO() throw() { }
46         void watch_fd(int fd, poll_flag flag);
47         bool unwatch_fd(int fd, poll_flag flag);
48         void wait_ready(vector<int> & readable, vector<int> & writable);
49
50     private:
51         file_t poll_ = epoll_create(MAX_POLL_FDS);
52         struct epoll_event ready_[MAX_POLL_FDS];
53         vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
54 };
55 #endif
56
57
58 poll_mgr::poll_mgr() : aio_(new SelectAIO()) {
59     th_ = thread(&poll_mgr::wait_loop, this);
60 }
61
62 poll_mgr::~poll_mgr()
63 {
64     lock ml(m_);
65     for (auto p : callbacks_)
66         aio_->unwatch_fd(p.first, CB_RDWR);
67     pending_change_ = true;
68     shutdown_ = true;
69     changedone_c_.wait(ml);
70     aio_ = nullptr;
71     th_.join();
72 }
73
74 void
75 poll_mgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
76 {
77     lock ml(m_);
78     aio_->watch_fd(fd, flag);
79
80     VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
81     callbacks_[fd] = ch;
82 }
83
84 // Remove all callbacks related to fd.  After this returns, we guarantee that
85 // callbacks related to fd will never be called again.
86 void poll_mgr::block_remove_fd(int fd) {
87     lock ml(m_);
88     aio_->unwatch_fd(fd, CB_RDWR);
89     pending_change_ = true;
90     changedone_c_.wait(ml);
91     callbacks_[fd] = nullptr;
92 }
93
94 void poll_mgr::del_callback(int fd, poll_flag flag) {
95     lock ml(m_);
96     if (aio_->unwatch_fd(fd, flag))
97         callbacks_[fd] = nullptr;
98 }
99
100 void poll_mgr::wait_loop() {
101     vector<int> readable;
102     vector<int> writable;
103     aio_callback * cb;
104
105     while (1) {
106         {
107             lock ml(m_);
108             if (pending_change_) {
109                 pending_change_ = false;
110                 changedone_c_.notify_all();
111                 if (shutdown_)
112                     break;
113             }
114         }
115         readable.clear();
116         writable.clear();
117         aio_->wait_ready(readable, writable);
118
119         for (auto fd : readable) {
120             { lock ml(m_); cb = callbacks_[fd]; }
121             if (cb) cb->read_cb(fd);
122         }
123
124         for (auto fd : writable) {
125             { lock ml(m_); cb = callbacks_[fd]; }
126             if (cb) cb->write_cb(fd);
127         }
128     }
129 }
130
131 SelectAIO::SelectAIO()
132 {
133     FD_ZERO(&rfds_);
134     FD_ZERO(&wfds_);
135
136     file_t::pipe(pipe_);
137
138     FD_SET(pipe_[0], &rfds_);
139     highfds_ = pipe_[0];
140
141     pipe_[0].flags() |= O_NONBLOCK;
142 }
143
144 void SelectAIO::watch_fd(int fd, poll_flag flag) {
145     VERIFY(fd < MAX_POLL_FDS);
146
147     lock ml(m_);
148     if (highfds_ <= fd) 
149         highfds_ = fd;
150
151     if (flag & CB_RDONLY)
152         FD_SET(fd,&rfds_);
153
154     if (flag & CB_WRONLY)
155         FD_SET(fd,&wfds_);
156
157     VERIFY(pipe_[1].write((char)1)==1);
158 }
159
160 bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
161     VERIFY(fd < MAX_POLL_FDS);
162
163     lock ml(m_);
164     VERIFY((flag & ~CB_RDWR) == 0);
165     if (flag & CB_RDONLY)
166         FD_CLR(fd, &rfds_);
167     if (flag & CB_WRONLY)
168         FD_CLR(fd, &wfds_);
169
170     int newh = pipe_[0];
171     for (int i = 0; i <= highfds_; i++) {
172         if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
173             newh = i;
174     }
175     highfds_ = newh;
176
177     if (flag == CB_RDWR)
178         VERIFY(pipe_[1].write((char)1)==1);
179
180     return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
181 }
182
183 void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
184
185     fd_set trfds, twfds;
186     int high;
187
188     {
189         lock ml(m_);
190         trfds = rfds_;
191         twfds = wfds_;
192         high = highfds_;
193     }
194
195     int ret = select(high+1, &trfds, &twfds, NULL, NULL);
196
197     if (ret < 0 && errno == EINTR)
198         return;
199     else if (ret < 0) {
200         perror("select:");
201         IF_LEVEL(0) LOG << "select_loop failure errno " << errno;
202         VERIFY(0);
203     }
204
205     for (int fd = 0; fd <= high; fd++) {
206         if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
207             char tmp;
208             VERIFY(pipe_[0].read(tmp)==1);
209             VERIFY(tmp==1);
210         } else {
211             if (FD_ISSET(fd, &twfds))
212                 writable.push_back(fd);
213
214             if (FD_ISSET(fd, &trfds))
215                 readable.push_back(fd);
216         }
217     }
218 }
219
220 #ifdef __linux__ 
221
222 void EPollAIO::watch_fd(int fd_, poll_flag flag) {
223     size_t fd = (size_t)fd_;
224
225     VERIFY(fd < MAX_POLL_FDS);
226
227     struct epoll_event ev;
228     int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
229     fdstatus_[fd] |= (unsigned)flag;
230
231     ev.events = EPOLLET;
232     ev.data.fd = fd_;
233
234     if (fdstatus_[fd] & CB_RDONLY)
235         ev.events |= EPOLLIN;
236
237     if (fdstatus_[fd] & CB_WRONLY)
238         ev.events |= EPOLLOUT;
239
240     if (flag == CB_RDWR)
241         VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
242
243     VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
244 }
245
246 bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
247     size_t fd = (size_t)fd_;
248
249     VERIFY(fd < MAX_POLL_FDS);
250     fdstatus_[fd] &= ~(unsigned)flag;
251
252     struct epoll_event ev;
253     int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
254
255     ev.events = EPOLLET;
256     ev.data.fd = fd_;
257
258     if (fdstatus_[fd] & CB_RDONLY)
259         ev.events |= EPOLLIN;
260
261     if (fdstatus_[fd] & CB_WRONLY)
262         ev.events |= EPOLLOUT;
263
264     if (flag == CB_RDWR)
265         VERIFY(op == EPOLL_CTL_DEL);
266
267     VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
268     return (op == EPOLL_CTL_DEL);
269 }
270
271 void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
272
273     int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
274     for (int i = 0; i < nfds; i++) {
275         if (ready_[i].events & EPOLLIN)
276             readable.push_back(ready_[i].data.fd);
277
278         if (ready_[i].events & EPOLLOUT)
279             writable.push_back(ready_[i].data.fd);
280     }
281 }
282
283 #endif