More clean-ups and cool template stuff
[invirt/third/libt4.git] / rpc / connection.h
1 #ifndef connection_h
2 #define connection_h
3
4 #include <sys/types.h>
5 #include <sys/socket.h>
6 #include <arpa/inet.h>
7 #include <netinet/in.h>
8 #include <cstddef>
9 #include <thread>
10
11 #include <map>
12 #include <limits>
13
14 #include "pollmgr.h"
15
16 constexpr size_t size_t_max = std::numeric_limits<size_t>::max();
17
18 class thread_exit_exception : std::exception {
19 };
20
21 class connection;
22
23 class chanmgr {
24     public:
25         virtual bool got_pdu(connection *c, char *b, size_t sz) = 0;
26         virtual ~chanmgr() {}
27 };
28
29 class connection : public aio_callback {
30     public:
31         struct charbuf {
32             charbuf(): buf(NULL), sz(0), solong(0) {}
33             charbuf (char *b, size_t s) : buf(b), sz(s), solong(0){}
34             char *buf;
35             size_t sz;
36             size_t solong; // number of bytes written or read so far
37         };
38
39         connection(chanmgr *m1, int f1, int lossytest=0);
40         ~connection();
41
42         int channo() { return fd_; }
43         bool isdead();
44         void closeconn();
45
46         bool send(char *b, size_t sz);
47         void write_cb(int s);
48         void read_cb(int s);
49
50         void incref();
51         void decref();
52         int ref();
53
54         int compare(connection *another);
55     private:
56
57         bool readpdu();
58         bool writepdu();
59
60         chanmgr *mgr_;
61         const int fd_;
62         bool dead_;
63
64         charbuf wpdu_;
65         charbuf rpdu_;
66
67         std::chrono::time_point<std::chrono::steady_clock> create_time_;
68
69         int waiters_;
70         int refno_;
71         const int lossy_;
72
73         std::mutex m_;
74         std::mutex ref_m_;
75         std::condition_variable send_complete_;
76         std::condition_variable send_wait_;
77 };
78
79 class tcpsconn {
80     public:
81         tcpsconn(chanmgr *m1, unsigned int port, int lossytest=0);
82         ~tcpsconn();
83         inline unsigned int port() { return port_; }
84         void accept_conn();
85     private:
86         unsigned int port_;
87         std::mutex m_;
88         std::thread th_;
89         int pipe_[2];
90
91         int tcp_; //file desciptor for accepting connection
92         chanmgr *mgr_;
93         int lossy_;
94         std::map<int, connection *> conns_;
95
96         void process_accept();
97 };
98
99 struct bundle {
100     bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {}
101     chanmgr *mgr;
102     int tcp;
103     int lossy;
104 };
105
106 connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0);
107 #endif