02c7c626641e6fe0b651934869d152b883fb9e59
[invirt/third/libt4.git] / rpc / rpc.h
1 #ifndef rpc_h
2 #define rpc_h
3
4 #include "types.h"
5 #include <sys/socket.h>
6 #include <netinet/in.h>
7
8 #include "thr_pool.h"
9 #include "marshall.h"
10 #include "marshall_wrap.h"
11 #include "connection.h"
12
13 namespace rpc {
14     static constexpr milliseconds to_max{12000};
15     static constexpr milliseconds to_min{100};
16 }
17
18 class rpc_const {
19     public:
20         static const unsigned int bind = 1;   // handler number reserved for bind
21         static const int timeout_failure = -1;
22         static const int unmarshal_args_failure = -2;
23         static const int unmarshal_reply_failure = -3;
24         static const int atmostonce_failure = -4;
25         static const int oldsrv_failure = -5;
26         static const int bind_failure = -6;
27         static const int cancel_failure = -7;
28 };
29
30 // rpc client endpoint.
31 // manages a xid space per destination socket
32 // threaded: multiple threads can be sending RPCs,
33 class rpcc : public chanmgr {
34     private:
35
36         //manages per rpc info
37         struct caller {
38             caller(int _xid, string *_rep) : xid(_xid), rep(_rep) {}
39
40             int xid;
41             string *rep;
42             int intret;
43             bool done = false;
44             mutex m;
45             cond c;
46         };
47
48         void get_refconn(shared_ptr<connection> & ch);
49         void update_xid_rep(int xid);
50
51
52         sockaddr_in dst_;
53         unsigned int clt_nonce_;
54         unsigned int srv_nonce_;
55         bool bind_done_;
56         int xid_;
57         int lossytest_;
58         bool retrans_;
59         bool reachable_;
60
61         shared_ptr<connection> chan_;
62
63         mutex m_; // protect insert/delete to calls[]
64         mutex chan_m_;
65
66         bool destroy_wait_;
67         cond destroy_wait_c_;
68
69         map<int, caller *> calls_;
70         list<int> xid_rep_window_;
71
72         struct request {
73             void clear() { buf.clear(); xid = -1; }
74             bool isvalid() { return xid != -1; }
75             string buf;
76             int xid = -1;
77         };
78         request dup_req_;
79         int xid_rep_done_;
80
81         int call1(proc_t proc, marshall &req, string &rep, milliseconds to);
82
83         template<class R>
84             int call_m(proc_t proc, marshall &req, R & r, milliseconds to);
85     public:
86
87         rpcc(const string & d, bool retrans=true);
88         ~rpcc();
89
90         unsigned int id() { return clt_nonce_; }
91
92         int bind(milliseconds to = rpc::to_max);
93
94         void set_reachable(bool r) { reachable_ = r; }
95
96         void cancel();
97
98         bool got_pdu(const shared_ptr<connection> & c, const string & b);
99
100         template<class R, typename ...Args>
101             inline int call(proc_t proc, R & r, const Args&... args);
102
103         template<class R, typename ...Args>
104             inline int call_timeout(proc_t proc, milliseconds to, R & r, const Args&... args);
105 };
106
107 template<class R> int 
108 rpcc::call_m(proc_t proc, marshall &req, R & r, milliseconds to) 
109 {
110     string rep;
111     int intret = call1(proc, req, rep, to);
112     unmarshall u(rep, true);
113     if (intret < 0) return intret;
114     u >> r;
115     if (u.okdone() != true) {
116         cerr << "rpcc::call_m: failed to unmarshall the reply.  You are probably " <<
117                 "calling RPC 0x" << hex << proc << " with the wrong return type." << endl;
118         VERIFY(0);
119         return rpc_const::unmarshal_reply_failure;
120     }
121     return intret;
122 }
123
124 template<class R, typename... Args> inline int
125 rpcc::call(proc_t proc, R & r, const Args&... args)
126 {
127     return call_timeout(proc, rpc::to_max, r, args...);
128 }
129
130 template<class R, typename... Args> inline int
131 rpcc::call_timeout(proc_t proc, const milliseconds to, R & r, const Args&... args)
132 {
133     marshall m{args...};
134     return call_m(proc, m, r, to);
135 }
136
137 // rpc server endpoint.
138 class rpcs : public chanmgr {
139
140     typedef enum {
141         NEW,  // new RPC, not a duplicate
142         INPROGRESS, // duplicate of an RPC we're still processing
143         DONE, // duplicate of an RPC we already replied to (have reply)
144         FORGOTTEN,  // duplicate of an old RPC whose reply we've forgotten
145     } rpcstate_t;
146
147     private:
148
149         // state about an in-progress or completed RPC, for at-most-once.
150         // if cb_present is true, then the RPC is complete and a reply
151         // has been sent; in that case buf points to a copy of the reply,
152         // and sz holds the size of the reply.
153     struct reply_t {
154         reply_t (int _xid) : xid(_xid), cb_present(false) {}
155         reply_t (int _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {}
156         int xid;
157         bool cb_present; // whether the reply buffer is valid
158         string buf;      // the reply buffer
159     };
160
161     in_port_t port_;
162     unsigned int nonce_;
163
164     // provide at most once semantics by maintaining a window of replies
165     // per client that that client hasn't acknowledged receiving yet.
166         // indexed by client nonce.
167     map<unsigned int, list<reply_t> > reply_window_;
168
169     void free_reply_window(void);
170     void add_reply(unsigned int clt_nonce, int xid, const string & b);
171
172     rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
173             int xid, int rep_xid, string & b);
174
175     void updatestat(proc_t proc);
176
177     // latest connection to the client
178     map<unsigned int, shared_ptr<connection>> conns_;
179
180     // counting
181     const size_t counting_;
182     size_t curr_counts_;
183     map<proc_t, size_t> counts_;
184
185     bool reachable_;
186
187     // map proc # to function
188     map<proc_t, handler *> procs_;
189
190     mutex procs_m_; // protect insert/delete to procs[]
191     mutex count_m_;  //protect modification of counts
192     mutex reply_window_m_; // protect reply window et al
193     mutex conns_m_; // protect conns_
194
195
196     protected:
197
198     void dispatch(shared_ptr<connection> c, const string & buf);
199
200     // internal handler registration
201     void reg1(proc_t proc, handler *);
202
203     unique_ptr<ThrPool> dispatchpool_;
204     unique_ptr<tcpsconn> listener_;
205
206     public:
207     rpcs(in_port_t port, size_t counts=0);
208     ~rpcs();
209     inline in_port_t port() { return listener_->port(); }
210     //RPC handler for clients binding
211     int rpcbind(unsigned int &r, int a);
212
213     void set_reachable(bool r) { reachable_ = r; }
214
215     bool got_pdu(const shared_ptr<connection> & c, const string & b);
216
217     struct ReturnOnFailure {
218         static inline int unmarshall_args_failure() {
219             return rpc_const::unmarshal_args_failure;
220         }
221     };
222
223     template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr) {
224         reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
225     }
226
227     void start();
228 };
229
230 #endif