Explicit refcounting removed from connection object
[invirt/third/libt4.git] / rpc / pollmgr.cc
1 #include "types.h"
2 #include <errno.h>
3 #include <sys/select.h>
4 #include "file.h"
5
6 #ifdef __linux__
7 #include <sys/epoll.h>
8 #endif
9
10 #include "pollmgr.h"
11
12 static PollMgr instance;
13
14 PollMgr & PollMgr::Instance() { return instance; }
15
16 class wait_manager {
17     public:
18         virtual void watch_fd(int fd, poll_flag flag) = 0;
19         virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
20         virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
21         virtual ~wait_manager() throw() {}
22 };
23
24 class SelectAIO : public wait_manager {
25     public :
26         SelectAIO();
27         ~SelectAIO() {}
28         void watch_fd(int fd, poll_flag flag);
29         bool unwatch_fd(int fd, poll_flag flag);
30         void wait_ready(vector<int> & readable, vector<int> & writable);
31
32     private:
33         fd_set rfds_, wfds_;
34         int highfds_;
35         file_t pipe_[2];
36         mutex m_;
37 };
38
39 #ifdef __linux__ 
40 class EPollAIO : public wait_manager {
41     public:
42         EPollAIO() {}
43         ~EPollAIO() throw() { }
44         void watch_fd(int fd, poll_flag flag);
45         bool unwatch_fd(int fd, poll_flag flag);
46         void wait_ready(vector<int> & readable, vector<int> & writable);
47
48     private:
49         file_t poll_ = epoll_create(MAX_POLL_FDS);
50         struct epoll_event ready_[MAX_POLL_FDS];
51         vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
52 };
53 #endif
54
55
56 PollMgr::PollMgr() : aio_(new SelectAIO()) {
57     th_ = thread(&PollMgr::wait_loop, this);
58 }
59
60 PollMgr::~PollMgr()
61 {
62     lock ml(m_);
63     for (auto p : callbacks_)
64         aio_->unwatch_fd(p.first, CB_RDWR);
65     pending_change_ = true;
66     shutdown_ = true;
67     changedone_c_.wait(ml);
68     delete aio_;
69     th_.join();
70 }
71
72 void
73 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
74 {
75     lock ml(m_);
76     aio_->watch_fd(fd, flag);
77
78     VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
79     callbacks_[fd] = ch;
80 }
81
82 // Remove all callbacks related to fd.  After this returns, we guarantee that
83 // callbacks related to fd will never be called again.
84 void PollMgr::block_remove_fd(int fd) {
85     lock ml(m_);
86     aio_->unwatch_fd(fd, CB_RDWR);
87     pending_change_ = true;
88     changedone_c_.wait(ml);
89     callbacks_[fd] = nullptr;
90 }
91
92 void PollMgr::del_callback(int fd, poll_flag flag) {
93     lock ml(m_);
94     if (aio_->unwatch_fd(fd, flag))
95         callbacks_[fd] = nullptr;
96 }
97
98 void PollMgr::wait_loop() {
99     vector<int> readable;
100     vector<int> writable;
101     aio_callback * cb;
102
103     while (1) {
104         {
105             lock ml(m_);
106             if (pending_change_) {
107                 pending_change_ = false;
108                 changedone_c_.notify_all();
109                 if (shutdown_)
110                     break;
111             }
112         }
113         readable.clear();
114         writable.clear();
115         aio_->wait_ready(readable, writable);
116
117         for (auto fd : readable) {
118             { lock ml(m_); cb = callbacks_[fd]; }
119             if (cb) cb->read_cb(fd);
120         }
121
122         for (auto fd : writable) {
123             { lock ml(m_); cb = callbacks_[fd]; }
124             if (cb) cb->write_cb(fd);
125         }
126     }
127 }
128
129 SelectAIO::SelectAIO()
130 {
131     FD_ZERO(&rfds_);
132     FD_ZERO(&wfds_);
133
134     file_t::pipe(pipe_);
135
136     FD_SET(pipe_[0], &rfds_);
137     highfds_ = pipe_[0];
138
139     pipe_[0].flags() |= O_NONBLOCK;
140 }
141
142 void SelectAIO::watch_fd(int fd, poll_flag flag) {
143     VERIFY(fd < MAX_POLL_FDS);
144
145     lock ml(m_);
146     if (highfds_ <= fd) 
147         highfds_ = fd;
148
149     if (flag & CB_RDONLY)
150         FD_SET(fd,&rfds_);
151
152     if (flag & CB_WRONLY)
153         FD_SET(fd,&wfds_);
154
155     VERIFY(pipe_[1].write((char)1)==1);
156 }
157
158 bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
159     VERIFY(fd < MAX_POLL_FDS);
160
161     lock ml(m_);
162     VERIFY((flag & ~CB_RDWR) == 0);
163     if (flag & CB_RDONLY)
164         FD_CLR(fd, &rfds_);
165     if (flag & CB_WRONLY)
166         FD_CLR(fd, &wfds_);
167
168     int newh = pipe_[0];
169     for (int i = 0; i <= highfds_; i++) {
170         if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
171             newh = i;
172     }
173     highfds_ = newh;
174
175     if (flag == CB_RDWR)
176         VERIFY(pipe_[1].write((char)1)==1);
177
178     return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
179 }
180
181 void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
182
183     fd_set trfds, twfds;
184     int high;
185
186     {
187         lock ml(m_);
188         trfds = rfds_;
189         twfds = wfds_;
190         high = highfds_;
191     }
192
193     int ret = select(high+1, &trfds, &twfds, NULL, NULL);
194
195     if (ret < 0 && errno == EINTR)
196         return;
197     else if (ret < 0) {
198         perror("select:");
199         IF_LEVEL(0) LOG("select_loop failure errno " << errno);
200         VERIFY(0);
201     }
202
203     for (int fd = 0; fd <= high; fd++) {
204         if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
205             char tmp;
206             VERIFY(pipe_[0].read(tmp)==1);
207             VERIFY(tmp==1);
208         } else {
209             if (FD_ISSET(fd, &twfds))
210                 writable.push_back(fd);
211
212             if (FD_ISSET(fd, &trfds))
213                 readable.push_back(fd);
214         }
215     }
216 }
217
218 #ifdef __linux__ 
219
220 void EPollAIO::watch_fd(int fd_, poll_flag flag) {
221     size_t fd = (size_t)fd_;
222
223     VERIFY(fd < MAX_POLL_FDS);
224
225     struct epoll_event ev;
226     int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
227     fdstatus_[fd] |= (unsigned)flag;
228
229     ev.events = EPOLLET;
230     ev.data.fd = fd_;
231
232     if (fdstatus_[fd] & CB_RDONLY)
233         ev.events |= EPOLLIN;
234
235     if (fdstatus_[fd] & CB_WRONLY)
236         ev.events |= EPOLLOUT;
237
238     if (flag == CB_RDWR)
239         VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
240
241     VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
242 }
243
244 bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
245     size_t fd = (size_t)fd_;
246
247     VERIFY(fd < MAX_POLL_FDS);
248     fdstatus_[fd] &= ~(unsigned)flag;
249
250     struct epoll_event ev;
251     int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
252
253     ev.events = EPOLLET;
254     ev.data.fd = fd_;
255
256     if (fdstatus_[fd] & CB_RDONLY)
257         ev.events |= EPOLLIN;
258
259     if (fdstatus_[fd] & CB_WRONLY)
260         ev.events |= EPOLLOUT;
261
262     if (flag == CB_RDWR)
263         VERIFY(op == EPOLL_CTL_DEL);
264
265     VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
266     return (op == EPOLL_CTL_DEL);
267 }
268
269 void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
270
271     int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
272     for (int i = 0; i < nfds; i++) {
273         if (ready_[i].events & EPOLLIN)
274             readable.push_back(ready_[i].data.fd);
275
276         if (ready_[i].events & EPOLLOUT)
277             writable.push_back(ready_[i].data.fd);
278     }
279 }
280
281 #endif