94b24e3a4ccee8fb7ca571b9a5501cd856c04928
[invirt/third/libt4.git] / rpc / poll_mgr.cc
1 #include "poll_mgr.h"
2 #include <errno.h>
3 #include <sys/select.h>
4 #include "file.h"
5
6 #ifdef __linux__
7 #include <sys/epoll.h>
8 #endif
9
10 poll_mgr poll_mgr::shared_mgr;
11
12 class wait_manager {
13     public:
14         virtual void watch_fd(int fd, poll_flag flag) = 0;
15         virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
16         virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
17         virtual ~wait_manager() throw() {}
18 };
19
20 class SelectAIO : public wait_manager {
21     public :
22         SelectAIO();
23         ~SelectAIO() {}
24         void watch_fd(int fd, poll_flag flag);
25         bool unwatch_fd(int fd, poll_flag flag);
26         void wait_ready(vector<int> & readable, vector<int> & writable);
27
28     private:
29         fd_set rfds_, wfds_;
30         int highfds_;
31         file_t pipe_[2];
32         mutex m_;
33 };
34
35 #ifdef __linux__ 
36 class EPollAIO : public wait_manager {
37     public:
38         EPollAIO() {}
39         ~EPollAIO() throw() { }
40         void watch_fd(int fd, poll_flag flag);
41         bool unwatch_fd(int fd, poll_flag flag);
42         void wait_ready(vector<int> & readable, vector<int> & writable);
43
44     private:
45         file_t poll_ = epoll_create(MAX_POLL_FDS);
46         struct epoll_event ready_[MAX_POLL_FDS];
47         vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
48 };
49 #endif
50
51
52 poll_mgr::poll_mgr() : aio_(new SelectAIO()) {
53     th_ = thread(&poll_mgr::wait_loop, this);
54 }
55
56 poll_mgr::~poll_mgr()
57 {
58     lock ml(m_);
59     for (auto p : callbacks_)
60         aio_->unwatch_fd(p.first, CB_RDWR);
61     pending_change_ = true;
62     shutdown_ = true;
63     changedone_c_.wait(ml);
64     aio_ = nullptr;
65     th_.join();
66 }
67
68 void
69 poll_mgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
70 {
71     lock ml(m_);
72     aio_->watch_fd(fd, flag);
73
74     VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
75     callbacks_[fd] = ch;
76 }
77
78 // Remove all callbacks related to fd.  After this returns, we guarantee that
79 // callbacks related to fd will never be called again.
80 void poll_mgr::block_remove_fd(int fd) {
81     lock ml(m_);
82     aio_->unwatch_fd(fd, CB_RDWR);
83     pending_change_ = true;
84     changedone_c_.wait(ml);
85     callbacks_[fd] = nullptr;
86 }
87
88 void poll_mgr::del_callback(int fd, poll_flag flag) {
89     lock ml(m_);
90     if (aio_->unwatch_fd(fd, flag))
91         callbacks_[fd] = nullptr;
92 }
93
94 void poll_mgr::wait_loop() {
95     vector<int> readable;
96     vector<int> writable;
97     aio_callback * cb;
98
99     while (1) {
100         {
101             lock ml(m_);
102             if (pending_change_) {
103                 pending_change_ = false;
104                 changedone_c_.notify_all();
105                 if (shutdown_)
106                     break;
107             }
108         }
109         readable.clear();
110         writable.clear();
111         aio_->wait_ready(readable, writable);
112
113         for (auto fd : readable) {
114             { lock ml(m_); cb = callbacks_[fd]; }
115             if (cb) cb->read_cb(fd);
116         }
117
118         for (auto fd : writable) {
119             { lock ml(m_); cb = callbacks_[fd]; }
120             if (cb) cb->write_cb(fd);
121         }
122     }
123 }
124
125 SelectAIO::SelectAIO()
126 {
127     FD_ZERO(&rfds_);
128     FD_ZERO(&wfds_);
129
130     file_t::pipe(pipe_);
131
132     FD_SET(pipe_[0], &rfds_);
133     highfds_ = pipe_[0];
134
135     pipe_[0].flags() |= O_NONBLOCK;
136 }
137
138 void SelectAIO::watch_fd(int fd, poll_flag flag) {
139     VERIFY(fd < MAX_POLL_FDS);
140
141     lock ml(m_);
142     if (highfds_ <= fd) 
143         highfds_ = fd;
144
145     if (flag & CB_RDONLY)
146         FD_SET(fd,&rfds_);
147
148     if (flag & CB_WRONLY)
149         FD_SET(fd,&wfds_);
150
151     VERIFY(pipe_[1].write((char)1)==1);
152 }
153
154 bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
155     VERIFY(fd < MAX_POLL_FDS);
156
157     lock ml(m_);
158     VERIFY((flag & ~CB_RDWR) == 0);
159     if (flag & CB_RDONLY)
160         FD_CLR(fd, &rfds_);
161     if (flag & CB_WRONLY)
162         FD_CLR(fd, &wfds_);
163
164     int newh = pipe_[0];
165     for (int i = 0; i <= highfds_; i++) {
166         if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
167             newh = i;
168     }
169     highfds_ = newh;
170
171     if (flag == CB_RDWR)
172         VERIFY(pipe_[1].write((char)1)==1);
173
174     return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
175 }
176
177 void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
178
179     fd_set trfds, twfds;
180     int high;
181
182     {
183         lock ml(m_);
184         trfds = rfds_;
185         twfds = wfds_;
186         high = highfds_;
187     }
188
189     int ret = select(high+1, &trfds, &twfds, NULL, NULL);
190
191     if (ret < 0 && errno == EINTR)
192         return;
193     else if (ret < 0) {
194         perror("select:");
195         IF_LEVEL(0) LOG("select_loop failure errno " << errno);
196         VERIFY(0);
197     }
198
199     for (int fd = 0; fd <= high; fd++) {
200         if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
201             char tmp;
202             VERIFY(pipe_[0].read(tmp)==1);
203             VERIFY(tmp==1);
204         } else {
205             if (FD_ISSET(fd, &twfds))
206                 writable.push_back(fd);
207
208             if (FD_ISSET(fd, &trfds))
209                 readable.push_back(fd);
210         }
211     }
212 }
213
214 #ifdef __linux__ 
215
216 void EPollAIO::watch_fd(int fd_, poll_flag flag) {
217     size_t fd = (size_t)fd_;
218
219     VERIFY(fd < MAX_POLL_FDS);
220
221     struct epoll_event ev;
222     int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
223     fdstatus_[fd] |= (unsigned)flag;
224
225     ev.events = EPOLLET;
226     ev.data.fd = fd_;
227
228     if (fdstatus_[fd] & CB_RDONLY)
229         ev.events |= EPOLLIN;
230
231     if (fdstatus_[fd] & CB_WRONLY)
232         ev.events |= EPOLLOUT;
233
234     if (flag == CB_RDWR)
235         VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
236
237     VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
238 }
239
240 bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
241     size_t fd = (size_t)fd_;
242
243     VERIFY(fd < MAX_POLL_FDS);
244     fdstatus_[fd] &= ~(unsigned)flag;
245
246     struct epoll_event ev;
247     int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
248
249     ev.events = EPOLLET;
250     ev.data.fd = fd_;
251
252     if (fdstatus_[fd] & CB_RDONLY)
253         ev.events |= EPOLLIN;
254
255     if (fdstatus_[fd] & CB_WRONLY)
256         ev.events |= EPOLLOUT;
257
258     if (flag == CB_RDWR)
259         VERIFY(op == EPOLL_CTL_DEL);
260
261     VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
262     return (op == EPOLL_CTL_DEL);
263 }
264
265 void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
266
267     int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
268     for (int i = 0; i < nfds; i++) {
269         if (ready_[i].events & EPOLLIN)
270             readable.push_back(ready_[i].data.fd);
271
272         if (ready_[i].events & EPOLLOUT)
273             writable.push_back(ready_[i].data.fd);
274     }
275 }
276
277 #endif