Split out marshall code into a new file
[invirt/third/libt4.git] / rpc / connection.cc
1 // std::bind and syscall bind have the same name, so don't use std::bind in this file
2 #define LIBT4_NO_FUNCTIONAL
3 #include "connection.h"
4 #include <fcntl.h>
5 #include <sys/types.h>
6 #include <netinet/tcp.h>
7 #include <errno.h>
8 #include <signal.h>
9 #include <unistd.h>
10 #include <sys/socket.h>
11
12 #define MAX_PDU (10<<20) //maximum PDF is 10M
13
14 connection::connection(chanmgr *m1, int f1, int l1)
15 : mgr_(m1), fd_(f1), lossy_(l1)
16 {
17     int flags = fcntl(fd_, F_GETFL, NULL);
18     flags |= O_NONBLOCK;
19     fcntl(fd_, F_SETFL, flags);
20
21     signal(SIGPIPE, SIG_IGN);
22
23     create_time_ = steady_clock::now();
24
25     PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
26 }
27
28 connection::~connection() {
29     VERIFY(dead_);
30     if (rpdu_.buf)
31         free(rpdu_.buf);
32     VERIFY(!wpdu_.buf);
33     close(fd_);
34 }
35
36 void connection::incref() {
37     lock rl(ref_m_);
38     refno_++;
39 }
40
41 bool connection::isdead() {
42     lock ml(m_);
43     return dead_;
44 }
45
46 void connection::closeconn() {
47     {
48         lock ml(m_);
49         if (!dead_) {
50             dead_ = true;
51             shutdown(fd_,SHUT_RDWR);
52         } else {
53             return;
54         }
55     }
56     //after block_remove_fd, select will never wait on fd_
57     //and no callbacks will be active
58     PollMgr::Instance()->block_remove_fd(fd_);
59 }
60
61 void connection::decref() {
62     bool dead = false;
63     {
64         lock rl(ref_m_);
65         refno_--;
66         VERIFY(refno_>=0);
67         if (refno_==0) {
68             lock ml(m_);
69             dead = dead_;
70         }
71     }
72     if (dead)
73         delete this;
74 }
75
76 int connection::compare(connection *another) {
77     if (create_time_ > another->create_time_)
78         return 1;
79     if (create_time_ < another->create_time_)
80         return -1;
81     return 0;
82 }
83
84 bool connection::send(char *b, size_t sz) {
85     lock ml(m_);
86     waiters_++;
87     while (!dead_ && wpdu_.buf) {
88         send_wait_.wait(ml);
89     }
90     waiters_--;
91     if (dead_) {
92         return false;
93     }
94     wpdu_.buf = b;
95     wpdu_.sz = sz;
96     wpdu_.solong = 0;
97
98     if (lossy_) {
99         if ((random()%100) < lossy_) {
100             IF_LEVEL(1) LOG("connection::send LOSSY TEST shutdown fd_ " << fd_);
101             shutdown(fd_,SHUT_RDWR);
102         }
103     }
104
105     if (!writepdu()) {
106         dead_ = true;
107         ml.unlock();
108         PollMgr::Instance()->block_remove_fd(fd_);
109         ml.lock();
110     } else {
111         if (wpdu_.solong == wpdu_.sz) {
112         } else {
113             //should be rare to need to explicitly add write callback
114             PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
115             while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) {
116                 send_complete_.wait(ml);
117             }
118         }
119     }
120     bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
121     wpdu_.solong = wpdu_.sz = 0;
122     wpdu_.buf = NULL;
123     if (waiters_ > 0)
124         send_wait_.notify_all();
125     return ret;
126 }
127
128 //fd_ is ready to be written
129 void connection::write_cb(int s) {
130     lock ml(m_);
131     VERIFY(!dead_);
132     VERIFY(fd_ == s);
133     if (wpdu_.sz == 0) {
134         PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
135         return;
136     }
137     if (!writepdu()) {
138         PollMgr::Instance()->del_callback(fd_, CB_RDWR);
139         dead_ = true;
140     } else {
141         VERIFY(wpdu_.solong != size_t_max);
142         if (wpdu_.solong < wpdu_.sz) {
143             return;
144         }
145     }
146     send_complete_.notify_one();
147 }
148
149 //fd_ is ready to be read
150 void connection::read_cb(int s) {
151     lock ml(m_);
152     VERIFY(fd_ == s);
153     if (dead_)  {
154         return;
155     }
156
157     bool succ = true;
158     if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) {
159         succ = readpdu();
160     }
161
162     if (!succ) {
163         PollMgr::Instance()->del_callback(fd_,CB_RDWR);
164         dead_ = true;
165         send_complete_.notify_one();
166     }
167
168     if (rpdu_.buf && rpdu_.sz == rpdu_.solong) {
169         if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) {
170             //chanmgr has successfully consumed the pdu
171             rpdu_.buf = NULL;
172             rpdu_.sz = rpdu_.solong = 0;
173         }
174     }
175 }
176
177 bool connection::writepdu() {
178     VERIFY(wpdu_.solong != size_t_max);
179     if (wpdu_.solong == wpdu_.sz)
180         return true;
181
182     if (wpdu_.solong == 0) {
183         uint32_t sz = htonl((uint32_t)wpdu_.sz);
184         bcopy(&sz,wpdu_.buf,sizeof(sz));
185     }
186     ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong));
187     if (n < 0) {
188         if (errno != EAGAIN) {
189             IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno);
190             wpdu_.solong = size_t_max;
191             wpdu_.sz = 0;
192         }
193         return (errno == EAGAIN);
194     }
195     wpdu_.solong += (size_t)n;
196     return true;
197 }
198
199 bool connection::readpdu() {
200     if (!rpdu_.sz) {
201         uint32_t sz1;
202         ssize_t n = read(fd_, &sz1, sizeof(sz1));
203
204         if (n == 0) {
205             return false;
206         }
207
208         if (n < 0) {
209             VERIFY(errno!=EAGAIN);
210             return false;
211         }
212
213         if (n > 0 && n != sizeof(sz1)) {
214             IF_LEVEL(0) LOG("connection::readpdu short read of sz");
215             return false;
216         }
217
218         size_t sz = ntohl(sz1);
219
220         if (sz > MAX_PDU) {
221             IF_LEVEL(2) LOG("connection::readpdu read pdu TOO BIG " << sz << " network order=" << hex << sz1);
222             return false;
223         }
224
225         rpdu_.sz = sz;
226         VERIFY(rpdu_.buf == NULL);
227         rpdu_.buf = (char *)malloc(sz+sizeof(sz1));
228         VERIFY(rpdu_.buf);
229         bcopy(&sz1,rpdu_.buf,sizeof(sz1));
230         rpdu_.solong = sizeof(sz1);
231     }
232
233     ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong);
234     if (n <= 0) {
235         if (errno == EAGAIN)
236             return true;
237         if (rpdu_.buf)
238             free(rpdu_.buf);
239         rpdu_.buf = NULL;
240         rpdu_.sz = rpdu_.solong = 0;
241         return (errno == EAGAIN);
242     }
243     rpdu_.solong += (size_t)n;
244     return true;
245 }
246
247 tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
248 : mgr_(m1), lossy_(lossytest)
249 {
250     struct sockaddr_in sin;
251     memset(&sin, 0, sizeof(sin));
252     sin.sin_family = AF_INET;
253     sin.sin_port = htons(port);
254
255     tcp_ = socket(AF_INET, SOCK_STREAM, 0);
256     if (tcp_ < 0) {
257         perror("tcpsconn::tcpsconn accept_loop socket:");
258         VERIFY(0);
259     }
260
261     int yes = 1;
262     setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
263     setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
264
265     if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
266         perror("accept_loop tcp bind:");
267         VERIFY(0);
268     }
269
270     if (listen(tcp_, 1000) < 0) {
271         perror("tcpsconn::tcpsconn listen:");
272         VERIFY(0);
273     }
274
275     socklen_t addrlen = sizeof(sin);
276     VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0);
277     port_ = ntohs(sin.sin_port);
278
279     IF_LEVEL(2) LOG("tcpsconn::tcpsconn listen on " << port_ << " " << sin.sin_port);
280
281     if (pipe(pipe_) < 0) {
282         perror("accept_loop pipe:");
283         VERIFY(0);
284     }
285
286     int flags = fcntl(pipe_[0], F_GETFL, NULL);
287     flags |= O_NONBLOCK;
288     fcntl(pipe_[0], F_SETFL, flags);
289
290     th_ = thread(&tcpsconn::accept_conn, this);
291 }
292
293 tcpsconn::~tcpsconn()
294 {
295     VERIFY(close(pipe_[1]) == 0);
296     th_.join();
297
298     //close all the active connections
299     map<int, connection *>::iterator i;
300     for (i = conns_.begin(); i != conns_.end(); i++) {
301         i->second->closeconn();
302         i->second->decref();
303     }
304 }
305
306 void tcpsconn::process_accept() {
307     sockaddr_in sin;
308     socklen_t slen = sizeof(sin);
309     int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
310     if (s1 < 0) {
311         perror("tcpsconn::accept_conn error");
312         throw thread_exit_exception();
313     }
314
315     IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntohs(sin.sin_port));
316     connection *ch = new connection(mgr_, s1, lossy_);
317
318     // garbage collect all dead connections with refcount of 1
319     for (auto i = conns_.begin(); i != conns_.end();) {
320         if (i->second->isdead() && i->second->ref() == 1) {
321             IF_LEVEL(2) LOG("accept_loop garbage collected fd=" << i->second->channo());
322             i->second->decref();
323             // Careful not to reuse i right after erase. (i++) will
324             // be evaluated before the erase call because in C++,
325             // there is a sequence point before a function call.
326             // See http://en.wikipedia.org/wiki/Sequence_point.
327             conns_.erase(i++);
328         } else
329             ++i;
330     }
331
332     conns_[ch->channo()] = ch;
333 }
334
335 void tcpsconn::accept_conn() {
336     fd_set rfds;
337     int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
338
339     try {
340         while (1) {
341             FD_ZERO(&rfds);
342             FD_SET(pipe_[0], &rfds);
343             FD_SET(tcp_, &rfds);
344
345             int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
346
347             if (ret < 0) {
348                 if (errno == EINTR) {
349                     continue;
350                 } else {
351                     perror("accept_conn select:");
352                     IF_LEVEL(0) LOG("tcpsconn::accept_conn failure errno " << errno);
353                     VERIFY(0);
354                 }
355             }
356
357             if (FD_ISSET(pipe_[0], &rfds)) {
358                 close(pipe_[0]);
359                 close(tcp_);
360                 return;
361             }
362             else if (FD_ISSET(tcp_, &rfds)) {
363                 process_accept();
364             } else {
365                 VERIFY(0);
366             }
367         }
368     }
369     catch (thread_exit_exception e)
370     {
371     }
372 }
373
374 connection * connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
375     int s = socket(AF_INET, SOCK_STREAM, 0);
376     int yes = 1;
377     setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
378     if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
379         IF_LEVEL(1) LOG_NONMEMBER("rpcc::connect_to_dst failed to " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
380         close(s);
381         return NULL;
382     }
383     IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntohs(dst.sin_port));
384     return new connection(mgr, s, lossy);
385 }
386