So many changes. Broken.
[invirt/third/libt4.git] / rpc / poll_mgr.cc
1 #include "include/rpc/poll_mgr.h"
2 #include <errno.h>
3 #include <sys/select.h>
4 #include "include/rpc/file.h"
5 #include "include/debug.h"
6
7 #ifdef __linux__
8 #include <sys/epoll.h>
9 #endif
10
11 using std::vector;
12
13 aio_callback::~aio_callback() {}
14
15 class wait_manager {
16     public:
17         virtual void watch_fd(int fd, poll_flag flag) = 0;
18         virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
19         virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
20         virtual ~wait_manager() noexcept;
21 };
22
23 wait_manager::~wait_manager() noexcept {}
24
25 class SelectAIO : public wait_manager {
26     public :
27         SelectAIO();
28         ~SelectAIO() {}
29         void watch_fd(int fd, poll_flag flag);
30         bool unwatch_fd(int fd, poll_flag flag);
31         void wait_ready(vector<int> & readable, vector<int> & writable);
32
33     private:
34         fd_set rfds_, wfds_;
35         int highfds_;
36         file_t pipe_[2];
37         std::mutex m_;
38 };
39
40 #ifdef __linux__ 
41 class EPollAIO : public wait_manager {
42     public:
43         EPollAIO() {}
44         ~EPollAIO() throw() { }
45         void watch_fd(int fd, poll_flag flag);
46         bool unwatch_fd(int fd, poll_flag flag);
47         void wait_ready(vector<int> & readable, vector<int> & writable);
48
49     private:
50         file_t poll_ = epoll_create(MAX_POLL_FDS);
51         struct epoll_event ready_[MAX_POLL_FDS];
52         vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
53 };
54 #endif
55
56
57 poll_mgr::poll_mgr() : aio_(new SelectAIO()) {
58     th_ = thread(&poll_mgr::wait_loop, this);
59 }
60
61 poll_mgr::~poll_mgr() {
62     shutdown();
63 }
64
65 void poll_mgr::shutdown() {
66     lock ml(m_);
67     if (shutdown_)
68         return;
69     for (auto p : callbacks_)
70         aio_->unwatch_fd(p.first, CB_RDWR);
71     pending_change_ = true;
72     shutdown_ = true;
73     changedone_c_.wait(ml);
74     aio_ = nullptr;
75     th_.join();
76 }
77
78 void poll_mgr::add_callback(int fd, poll_flag flag, aio_callback *ch) {
79     lock ml(m_);
80     aio_->watch_fd(fd, flag);
81
82     VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
83     callbacks_[fd] = ch;
84 }
85
86 // Remove all callbacks related to fd.  After this returns, we guarantee that
87 // callbacks related to fd will never be called again.
88 void poll_mgr::block_remove_fd(int fd) {
89     lock ml(m_);
90     aio_->unwatch_fd(fd, CB_RDWR);
91     pending_change_ = true;
92     changedone_c_.wait(ml);
93     callbacks_[fd] = nullptr;
94 }
95
96 void poll_mgr::del_callback(int fd, poll_flag flag) {
97     lock ml(m_);
98     if (aio_->unwatch_fd(fd, flag))
99         callbacks_[fd] = nullptr;
100 }
101
102 void poll_mgr::wait_loop() {
103     vector<int> readable;
104     vector<int> writable;
105     aio_callback * cb;
106
107     while (1) {
108         {
109             lock ml(m_);
110             if (pending_change_) {
111                 pending_change_ = false;
112                 changedone_c_.notify_all();
113                 if (shutdown_)
114                     break;
115             }
116         }
117         readable.clear();
118         writable.clear();
119         aio_->wait_ready(readable, writable);
120
121         for (auto fd : readable) {
122             { lock ml(m_); cb = callbacks_[fd]; }
123             if (cb) cb->read_cb(fd);
124         }
125
126         for (auto fd : writable) {
127             { lock ml(m_); cb = callbacks_[fd]; }
128             if (cb) cb->write_cb(fd);
129         }
130     }
131 }
132
133 SelectAIO::SelectAIO()
134 {
135     FD_ZERO(&rfds_);
136     FD_ZERO(&wfds_);
137
138     file_t::pipe(pipe_);
139
140     FD_SET(pipe_[0], &rfds_);
141     highfds_ = pipe_[0];
142
143     pipe_[0].flags() |= O_NONBLOCK;
144 }
145
146 void SelectAIO::watch_fd(int fd, poll_flag flag) {
147     VERIFY(fd < MAX_POLL_FDS);
148
149     lock ml(m_);
150     if (highfds_ <= fd) 
151         highfds_ = fd;
152
153     if (flag & CB_RDONLY)
154         FD_SET(fd,&rfds_);
155
156     if (flag & CB_WRONLY)
157         FD_SET(fd,&wfds_);
158
159     VERIFY(pipe_[1].write((char)1)==1);
160 }
161
162 bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
163     VERIFY(fd < MAX_POLL_FDS);
164
165     lock ml(m_);
166     VERIFY((flag & ~CB_RDWR) == 0);
167     if (flag & CB_RDONLY)
168         FD_CLR(fd, &rfds_);
169     if (flag & CB_WRONLY)
170         FD_CLR(fd, &wfds_);
171
172     int newh = pipe_[0];
173     for (int i = 0; i <= highfds_; i++) {
174         if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
175             newh = i;
176     }
177     highfds_ = newh;
178
179     if (flag == CB_RDWR)
180         VERIFY(pipe_[1].write((char)1)==1);
181
182     return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
183 }
184
185 void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
186
187     fd_set trfds, twfds;
188     int high;
189
190     {
191         lock ml(m_);
192         trfds = rfds_;
193         twfds = wfds_;
194         high = highfds_;
195     }
196
197     int ret = select(high+1, &trfds, &twfds, NULL, NULL);
198
199     if (ret < 0 && errno == EINTR)
200         return;
201     else if (ret < 0) {
202         perror("select:");
203         IF_LEVEL(0) LOG << "select_loop failure errno " << errno;
204         VERIFY(0);
205     }
206
207     for (int fd = 0; fd <= high; fd++) {
208         if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
209             char tmp;
210             VERIFY(pipe_[0].read(tmp)==1);
211             VERIFY(tmp==1);
212         } else {
213             if (FD_ISSET(fd, &twfds))
214                 writable.push_back(fd);
215
216             if (FD_ISSET(fd, &trfds))
217                 readable.push_back(fd);
218         }
219     }
220 }
221
222 #ifdef __linux__ 
223
224 void EPollAIO::watch_fd(int fd_, poll_flag flag) {
225     size_t fd = (size_t)fd_;
226
227     VERIFY(fd < MAX_POLL_FDS);
228
229     struct epoll_event ev;
230     int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
231     fdstatus_[fd] |= (unsigned)flag;
232
233     ev.events = EPOLLET;
234     ev.data.fd = fd_;
235
236     if (fdstatus_[fd] & CB_RDONLY)
237         ev.events |= EPOLLIN;
238
239     if (fdstatus_[fd] & CB_WRONLY)
240         ev.events |= EPOLLOUT;
241
242     if (flag == CB_RDWR)
243         VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
244
245     VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
246 }
247
248 bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
249     size_t fd = (size_t)fd_;
250
251     VERIFY(fd < MAX_POLL_FDS);
252     fdstatus_[fd] &= ~(unsigned)flag;
253
254     struct epoll_event ev;
255     int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
256
257     ev.events = EPOLLET;
258     ev.data.fd = fd_;
259
260     if (fdstatus_[fd] & CB_RDONLY)
261         ev.events |= EPOLLIN;
262
263     if (fdstatus_[fd] & CB_WRONLY)
264         ev.events |= EPOLLOUT;
265
266     if (flag == CB_RDWR)
267         VERIFY(op == EPOLL_CTL_DEL);
268
269     VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
270     return (op == EPOLL_CTL_DEL);
271 }
272
273 void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
274
275     int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
276     for (int i = 0; i < nfds; i++) {
277         if (ready_[i].events & EPOLLIN)
278             readable.push_back(ready_[i].data.fd);
279
280         if (ready_[i].events & EPOLLOUT)
281             writable.push_back(ready_[i].data.fd);
282     }
283 }
284
285 #endif