1eb625b09018ec6e428edb3626f344d9776f9790
[invirt/third/libt4.git] / rpc / connection.h
1 #ifndef connection_h
2 #define connection_h
3
4 #include "types.h"
5 #include <sys/types.h>
6 #include <arpa/inet.h>
7 #include <netinet/in.h>
8 #include "pollmgr.h"
9
10 constexpr size_t size_t_max = numeric_limits<size_t>::max();
11
12 class thread_exit_exception : exception {};
13
14 class connection;
15
16 class chanmgr {
17     public:
18         virtual bool got_pdu(connection *c, const string & b) = 0;
19         virtual ~chanmgr() {}
20 };
21
22 class connection : public aio_callback {
23     public:
24         struct charbuf {
25             string buf;
26             size_t solong = 0; // number of bytes written or read so far
27         };
28
29         connection(chanmgr *m1, int f1, int lossytest=0);
30         ~connection();
31
32         int channo() { return fd_; }
33         bool isdead();
34         void closeconn();
35
36         bool send(const string & b);
37         void write_cb(int s);
38         void read_cb(int s);
39
40         void incref();
41         void decref();
42         int ref() { lock rl(ref_m_); return refno_; }
43
44         int compare(connection *another);
45
46     private:
47
48         bool readpdu();
49         bool writepdu();
50
51         chanmgr *mgr_;
52         const int fd_;
53         bool dead_ = false;
54
55         charbuf wpdu_;
56         charbuf rpdu_;
57
58         time_point<steady_clock> create_time_;
59
60         int waiters_ = 0;
61         int refno_ = 1;
62         int lossy_ = 0;
63
64         mutex m_;
65         mutex ref_m_;
66         cond send_complete_;
67         cond send_wait_;
68 };
69
70 class tcpsconn {
71     public:
72         tcpsconn(chanmgr *m1, in_port_t port, int lossytest=0);
73         ~tcpsconn();
74         inline in_port_t port() { return port_; }
75         void accept_conn();
76     private:
77         in_port_t port_;
78         mutex m_;
79         thread th_;
80         int pipe_[2];
81
82         int tcp_; //file desciptor for accepting connection
83         chanmgr *mgr_;
84         int lossy_;
85         map<int, connection *> conns_;
86
87         void process_accept();
88 };
89
90 struct bundle {
91     bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {}
92     chanmgr *mgr;
93     int tcp;
94     int lossy;
95 };
96
97 connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0);
98 #endif