84c12f346a132c4ad06fa420f5c774e46978ab0b
[invirt/third/libt4.git] / rpc / rpc.h
1 #ifndef rpc_h
2 #define rpc_h
3
4 #include "types.h"
5 #include <sys/socket.h>
6 #include <netinet/in.h>
7
8 #include "thr_pool.h"
9 #include "marshall.h"
10 #include "marshall_wrap.h"
11 #include "connection.h"
12
13 namespace rpc {
14     static constexpr milliseconds to_max{12000};
15     static constexpr milliseconds to_min{100};
16 }
17
18 class rpc_const {
19     public:
20         static const unsigned int bind = 1;   // handler number reserved for bind
21         static const int timeout_failure = -1;
22         static const int unmarshal_args_failure = -2;
23         static const int unmarshal_reply_failure = -3;
24         static const int atmostonce_failure = -4;
25         static const int oldsrv_failure = -5;
26         static const int bind_failure = -6;
27         static const int cancel_failure = -7;
28 };
29
30 // rpc client endpoint.
31 // manages a xid space per destination socket
32 // threaded: multiple threads can be sending RPCs,
33 class rpcc : private connection_delegate {
34     private:
35
36         // manages per rpc info
37         struct caller {
38             caller(int _xid, string *_rep) : xid(_xid), rep(_rep) {}
39
40             int xid;
41             string *rep;
42             int intret;
43             bool done = false;
44             mutex m;
45             cond c;
46         };
47
48         void get_refconn(shared_ptr<connection> & ch);
49         void update_xid_rep(int xid);
50
51
52         sockaddr_in dst_;
53         unsigned int clt_nonce_;
54         unsigned int srv_nonce_;
55         bool bind_done_;
56         int xid_;
57         int lossytest_;
58         bool retrans_;
59         bool reachable_;
60
61         shared_ptr<connection> chan_;
62
63         mutex m_; // protect insert/delete to calls[]
64         mutex chan_m_;
65
66         bool destroy_wait_;
67         cond destroy_wait_c_;
68
69         map<int, caller *> calls_;
70         list<int> xid_rep_window_;
71
72         struct request {
73             void clear() { buf.clear(); xid = -1; }
74             bool isvalid() { return xid != -1; }
75             string buf;
76             int xid = -1;
77         };
78         request dup_req_;
79         int xid_rep_done_;
80
81         int call1(proc_t proc, marshall &req, string &rep, milliseconds to);
82
83         template<class R>
84         int call_m(proc_t proc, marshall &req, R & r, milliseconds to) {
85             string rep;
86             int intret = call1(proc, req, rep, to);
87             unmarshall u(rep, true);
88             if (intret < 0) return intret;
89             u >> r;
90             if (u.okdone() != true) {
91                 cerr << "rpcc::call_m: failed to unmarshall the reply.  You are probably " <<
92                     "calling RPC 0x" << hex << proc << " with the wrong return type." << endl;
93                 VERIFY(0);
94                 return rpc_const::unmarshal_reply_failure;
95             }
96             return intret;
97         }
98
99         bool got_pdu(const shared_ptr<connection> & c, const string & b);
100
101     public:
102
103         rpcc(const string & d, bool retrans=true);
104         ~rpcc();
105
106         unsigned int id() { return clt_nonce_; }
107
108         int bind(milliseconds to = rpc::to_max);
109
110         void set_reachable(bool r) { reachable_ = r; }
111
112         void cancel();
113
114         template<class R, typename ...Args>
115         inline int call(proc_t proc, R & r, const Args&... args) {
116             return call_timeout(proc, rpc::to_max, r, args...);
117         }
118
119         template<class R, typename ...Args>
120         inline int call_timeout(proc_t proc, milliseconds to, R & r, const Args&... args) {
121             marshall m{args...};
122             return call_m(proc, m, r, to);
123         }
124 };
125
126 // rpc server endpoint.
127 class rpcs : private connection_delegate {
128     private:
129
130         typedef enum {
131             NEW,  // new RPC, not a duplicate
132             INPROGRESS, // duplicate of an RPC we're still processing
133             DONE, // duplicate of an RPC we already replied to (have reply)
134             FORGOTTEN,  // duplicate of an old RPC whose reply we've forgotten
135         } rpcstate_t;
136
137         // state about an in-progress or completed RPC, for at-most-once.
138         // if cb_present is true, then the RPC is complete and a reply
139         // has been sent; in that case buf points to a copy of the reply,
140         // and sz holds the size of the reply.
141         struct reply_t {
142             reply_t (int _xid) : xid(_xid), cb_present(false) {}
143             reply_t (int _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {}
144             int xid;
145             bool cb_present; // whether the reply buffer is valid
146             string buf;      // the reply buffer
147         };
148
149         in_port_t port_;
150         unsigned int nonce_;
151
152         // provide at most once semantics by maintaining a window of replies
153         // per client that that client hasn't acknowledged receiving yet.
154         // indexed by client nonce.
155         map<unsigned int, list<reply_t>> reply_window_;
156
157         void free_reply_window(void);
158         void add_reply(unsigned int clt_nonce, int xid, const string & b);
159
160         rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
161                 int xid, int rep_xid, string & b);
162
163         void updatestat(proc_t proc);
164
165         // latest connection to the client
166         map<unsigned int, shared_ptr<connection>> conns_;
167
168         // counting
169         const size_t counting_;
170         size_t curr_counts_;
171         map<proc_t, size_t> counts_;
172
173         bool reachable_;
174
175         // map proc # to function
176         map<proc_t, handler *> procs_;
177
178         mutex procs_m_; // protect insert/delete to procs[]
179         mutex count_m_;  // protect modification of counts
180         mutex reply_window_m_; // protect reply window et al
181         mutex conns_m_; // protect conns_
182
183         void dispatch(shared_ptr<connection> c, const string & buf);
184
185         // internal handler registration
186         void reg1(proc_t proc, handler *);
187
188         unique_ptr<thread_pool> dispatchpool_;
189         unique_ptr<tcpsconn> listener_;
190
191         // RPC handler for clients binding
192         int rpcbind(unsigned int &r, int a);
193
194         bool got_pdu(const shared_ptr<connection> & c, const string & b);
195
196     public:
197
198         rpcs(in_port_t port, size_t counts=0);
199         ~rpcs();
200
201         void set_reachable(bool r) { reachable_ = r; }
202
203         template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr) {
204             struct ReturnOnFailure {
205                 static inline int unmarshall_args_failure() {
206                     return rpc_const::unmarshal_args_failure;
207                 }
208             };
209             reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
210         }
211
212         void start();
213 };
214
215 #endif