Global destructor clean-ups and python test fixes
[invirt/third/libt4.git] / rpc / connection.h
1 #ifndef connection_h
2 #define connection_h
3
4 #include "types.h"
5 #include <arpa/inet.h>
6 #include <netinet/in.h>
7 #include "poll_mgr.h"
8 #include "file.h"
9
10 class connection;
11
12 class connection_delegate {
13     public:
14         virtual bool got_pdu(const shared_ptr<connection> & c, const string & b) = 0;
15         virtual ~connection_delegate();
16 };
17
// Timestamps in this header are taken from the monotonic steady_clock.
using std::chrono::steady_clock;
using time_point = std::chrono::time_point<steady_clock>;
20
21 class connection : private aio_callback, public std::enable_shared_from_this<connection> {
22     public:
23         connection(connection_delegate * delegate, socket_t && f1, int lossytest=0);
24         ~connection();
25
26         bool isdead() { return dead_; }
27
28         bool send(const string & b);
29
30         static shared_ptr<connection> to_dst(const sockaddr_in & dst, connection_delegate *mgr, int lossy=0);
31
32         const time_point create_time = steady_clock::now();
33         const file_t fd;
34
35     private:
36         void write_cb(int s);
37         void read_cb(int s);
38
39         bool readpdu();
40         bool writepdu();
41
42         connection_delegate * delegate_;
43         bool dead_ = false;
44
45         enum charbuf_status_t { unused, inflight, error };
46
47         struct charbuf {
48             charbuf_status_t status;
49             string buf;
50             size_t cursor; // number of bytes written or read so far
51         };
52
53         charbuf wpdu_ = {unused, "", 0};
54         charbuf rpdu_ = {unused, "", 0};
55
56         int lossy_ = 0;
57
58         std::mutex m_;
59         cond send_complete_;
60         cond send_wait_;
61 };
62
63 class connection_listener : private aio_callback {
64     public:
65         connection_listener(connection_delegate * delegate, in_port_t port, int lossytest=0);
66         ~connection_listener();
67         inline in_port_t port() { return port_; }
68     private:
69         void write_cb(int) {}
70         void read_cb(int s);
71
72         in_port_t port_;
73         std::mutex m_;
74
75         socket_t tcp_; // listens for connections
76         connection_delegate * delegate_;
77         int lossy_;
78         std::map<int, shared_ptr<connection>> conns_;
79 };
80 #endif