Includes cleanups
[invirt/third/libt4.git] / rpc / connection.h
1 #ifndef connection_h
2 #define connection_h
3
4 #include "types.h"
5 #include <arpa/inet.h>
6 #include <netinet/in.h>
7 #include "pollmgr.h"
8 #include "file.h"
9
10 constexpr size_t size_t_max = numeric_limits<size_t>::max();
11
12 class thread_exit_exception : exception {};
13
14 class connection;
15
16 class chanmgr {
17     public:
18         virtual bool got_pdu(const shared_ptr<connection> & c, const string & b) = 0;
19         virtual ~chanmgr() {}
20 };
21
22 class connection : public aio_callback, public enable_shared_from_this<connection> {
23     public:
24         struct charbuf {
25             string buf;
26             size_t solong = 0; // number of bytes written or read so far
27         };
28
29         connection(chanmgr *m1, int f1, int lossytest=0);
30         ~connection();
31
32         int channo() { return fd_; }
33         bool isdead() { lock ml(m_); return dead_; }
34         void closeconn();
35
36         bool send(const string & b);
37         void write_cb(int s);
38         void read_cb(int s);
39
40         time_point<steady_clock> create_time() const { return create_time_; }
41
42     private:
43
44         bool readpdu();
45         bool writepdu();
46
47         chanmgr *mgr_;
48         const file_t fd_;
49         bool dead_ = false;
50
51         charbuf wpdu_;
52         charbuf rpdu_;
53
54         time_point<steady_clock> create_time_;
55
56         int waiters_ = 0;
57         int lossy_ = 0;
58
59         mutex m_;
60         cond send_complete_;
61         cond send_wait_;
62 };
63
64 class tcpsconn {
65     public:
66         tcpsconn(chanmgr *m1, in_port_t port, int lossytest=0);
67         ~tcpsconn();
68         inline in_port_t port() { return port_; }
69         void accept_conn();
70     private:
71         in_port_t port_;
72         mutex m_;
73         thread th_;
74         file_t pipe_[2];
75
76         socket_t tcp_; // listens for connections
77         chanmgr *mgr_;
78         int lossy_;
79         map<int, shared_ptr<connection>> conns_;
80
81         void process_accept();
82 };
83
84 struct bundle {
85     bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {}
86     chanmgr *mgr;
87     int tcp;
88     int lossy;
89 };
90
91 shared_ptr<connection> connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0);
92 #endif