261cf9ff9ebc907b5e53e09814f20c86a8bc8edd
[invirt/third/libt4.git] / rpc / connection.h
1 #ifndef connection_h
2 #define connection_h
3
4 #include "types.h"
5 #include <sys/types.h>
6 #include <arpa/inet.h>
7 #include <netinet/in.h>
8 #include <cstddef>
9 #include "pollmgr.h"
10
11 constexpr size_t size_t_max = numeric_limits<size_t>::max();
12
13 class thread_exit_exception : exception {};
14
15 class connection;
16
17 class chanmgr {
18     public:
19         virtual bool got_pdu(connection *c, char *b, size_t sz) = 0;
20         virtual ~chanmgr() {}
21 };
22
23 class connection : public aio_callback {
24     public:
25         struct charbuf {
26             charbuf(): buf(NULL), sz(0), solong(0) {}
27             charbuf (char *b, size_t s) : buf(b), sz(s), solong(0){}
28             char *buf;
29             size_t sz;
30             size_t solong; // number of bytes written or read so far
31         };
32
33         connection(chanmgr *m1, int f1, int lossytest=0);
34         ~connection();
35
36         int channo() { return fd_; }
37         bool isdead();
38         void closeconn();
39
40         bool send(char *b, size_t sz);
41         void write_cb(int s);
42         void read_cb(int s);
43
44         void incref();
45         void decref();
46         int ref();
47
48         int compare(connection *another);
49     private:
50
51         bool readpdu();
52         bool writepdu();
53
54         chanmgr *mgr_;
55         const int fd_;
56         bool dead_;
57
58         charbuf wpdu_;
59         charbuf rpdu_;
60
61         time_point<steady_clock> create_time_;
62
63         int waiters_;
64         int refno_;
65         const int lossy_;
66
67         mutex m_;
68         mutex ref_m_;
69         cond send_complete_;
70         cond send_wait_;
71 };
72
73 class tcpsconn {
74     public:
75         tcpsconn(chanmgr *m1, unsigned int port, int lossytest=0);
76         ~tcpsconn();
77         inline unsigned int port() { return port_; }
78         void accept_conn();
79     private:
80         unsigned int port_;
81         mutex m_;
82         thread th_;
83         int pipe_[2];
84
85         int tcp_; //file desciptor for accepting connection
86         chanmgr *mgr_;
87         int lossy_;
88         map<int, connection *> conns_;
89
90         void process_accept();
91 };
92
93 struct bundle {
94     bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {}
95     chanmgr *mgr;
96     int tcp;
97     int lossy;
98 };
99
100 connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0);
101 #endif