7032b8c56e9fbe7e87a48a027491b67dc5115b8c
[invirt/third/libt4.git] / rpc / connection.h
1 #ifndef connection_h
2 #define connection_h
3
4 #include "types.h"
5 #include <sys/types.h>
6 #include <arpa/inet.h>
7 #include <netinet/in.h>
8 #include "pollmgr.h"
9 #include "file.h"
10
11 constexpr size_t size_t_max = numeric_limits<size_t>::max();
12
13 class thread_exit_exception : exception {};
14
15 class connection;
16
17 class chanmgr {
18     public:
19         virtual bool got_pdu(const shared_ptr<connection> & c, const string & b) = 0;
20         virtual ~chanmgr() {}
21 };
22
23 class connection : public aio_callback, public enable_shared_from_this<connection> {
24     public:
25         struct charbuf {
26             string buf;
27             size_t solong = 0; // number of bytes written or read so far
28         };
29
30         connection(chanmgr *m1, int f1, int lossytest=0);
31         ~connection();
32
33         int channo() { return fd_; }
34         bool isdead() { lock ml(m_); return dead_; }
35         void closeconn();
36
37         bool send(const string & b);
38         void write_cb(int s);
39         void read_cb(int s);
40
41         time_point<steady_clock> create_time() const { return create_time_; }
42
43     private:
44
45         bool readpdu();
46         bool writepdu();
47
48         chanmgr *mgr_;
49         const file_t fd_;
50         bool dead_ = false;
51
52         charbuf wpdu_;
53         charbuf rpdu_;
54
55         time_point<steady_clock> create_time_;
56
57         int waiters_ = 0;
58         int lossy_ = 0;
59
60         mutex m_;
61         cond send_complete_;
62         cond send_wait_;
63 };
64
65 class tcpsconn {
66     public:
67         tcpsconn(chanmgr *m1, in_port_t port, int lossytest=0);
68         ~tcpsconn();
69         inline in_port_t port() { return port_; }
70         void accept_conn();
71     private:
72         in_port_t port_;
73         mutex m_;
74         thread th_;
75         file_t pipe_[2];
76
77         socket_t tcp_; // listens for connections
78         chanmgr *mgr_;
79         int lossy_;
80         map<int, shared_ptr<connection>> conns_;
81
82         void process_accept();
83 };
84
85 struct bundle {
86     bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {}
87     chanmgr *mgr;
88     int tcp;
89     int lossy;
90 };
91
92 shared_ptr<connection> connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0);
93 #endif