Lots more clean-ups
[invirt/third/libt4.git] / rpc / connection.h
1 #ifndef connection_h
2 #define connection_h
3
4 #include "types.h"
5 #include <sys/types.h>
6 #include <arpa/inet.h>
7 #include <netinet/in.h>
8 #include "pollmgr.h"
9 #include "file.h"
10
11 constexpr size_t size_t_max = numeric_limits<size_t>::max();
12
13 class thread_exit_exception : exception {};
14
15 class connection;
16
17 class chanmgr {
18     public:
19         virtual bool got_pdu(connection *c, const string & b) = 0;
20         virtual ~chanmgr() {}
21 };
22
23 class connection : public aio_callback {
24     public:
25         struct charbuf {
26             string buf;
27             size_t solong = 0; // number of bytes written or read so far
28         };
29
30         connection(chanmgr *m1, int f1, int lossytest=0);
31         ~connection();
32
33         int channo() { return fd_; }
34         bool isdead();
35         void closeconn();
36
37         bool send(const string & b);
38         void write_cb(int s);
39         void read_cb(int s);
40
41         void incref();
42         void decref();
43         int ref() { lock rl(ref_m_); return refno_; }
44
45         int compare(connection *another);
46
47     private:
48
49         bool readpdu();
50         bool writepdu();
51
52         chanmgr *mgr_;
53         const file_t fd_;
54         bool dead_ = false;
55
56         charbuf wpdu_;
57         charbuf rpdu_;
58
59         time_point<steady_clock> create_time_;
60
61         int waiters_ = 0;
62         int refno_ = 1;
63         int lossy_ = 0;
64
65         mutex m_;
66         mutex ref_m_;
67         cond send_complete_;
68         cond send_wait_;
69 };
70
71 class tcpsconn {
72     public:
73         tcpsconn(chanmgr *m1, in_port_t port, int lossytest=0);
74         ~tcpsconn();
75         inline in_port_t port() { return port_; }
76         void accept_conn();
77     private:
78         in_port_t port_;
79         mutex m_;
80         thread th_;
81         file_t pipe_[2];
82
83         socket_t tcp_; // listens for connections
84         chanmgr *mgr_;
85         int lossy_;
86         map<int, connection *> conns_;
87
88         void process_accept();
89 };
90
91 struct bundle {
92     bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {}
93     chanmgr *mgr;
94     int tcp;
95     int lossy;
96 };
97
98 connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0);
99 #endif