20a0aa91113e351beb7cf4ae436293030f46e053
[invirt/third/libt4.git] / rpc / rpc.h
1 #ifndef rpc_h
2 #define rpc_h
3
4 #include <sys/socket.h>
5 #include <netinet/in.h>
6 #include <list>
7 #include <map>
8 #include <stdio.h>
9
10 #include "thr_pool.h"
11 #include "marshall.h"
12 #include "connection.h"
13
14 #ifdef DMALLOC
15 #include "dmalloc.h"
16 #endif
17
18 class rpc_const {
19     public:
20         static const unsigned int bind = 1;   // handler number reserved for bind
21         static const int timeout_failure = -1;
22         static const int unmarshal_args_failure = -2;
23         static const int unmarshal_reply_failure = -3;
24         static const int atmostonce_failure = -4;
25         static const int oldsrv_failure = -5;
26         static const int bind_failure = -6;
27         static const int cancel_failure = -7;
28 };
29
30 // rpc client endpoint.
31 // manages a xid space per destination socket
32 // threaded: multiple threads can be sending RPCs,
33 class rpcc : public chanmgr {
34
35     private:
36
37         //manages per rpc info
38         struct caller {
39             caller(unsigned int xxid, unmarshall *un);
40             ~caller();
41
42             unsigned int xid;
43             unmarshall *un;
44             int intret;
45             bool done;
46             std::mutex m;
47             std::condition_variable c;
48         };
49
50         void get_refconn(connection **ch);
51         void update_xid_rep(unsigned int xid);
52
53
54         sockaddr_in dst_;
55         unsigned int clt_nonce_;
56         unsigned int srv_nonce_;
57         bool bind_done_;
58         unsigned int xid_;
59         int lossytest_;
60         bool retrans_;
61         bool reachable_;
62
63         connection *chan_;
64
65         std::mutex m_; // protect insert/delete to calls[]
66         std::mutex chan_m_;
67
68         bool destroy_wait_;
69         std::condition_variable destroy_wait_c_;
70
71         std::map<int, caller *> calls_;
72         std::list<unsigned int> xid_rep_window_;
73
74         struct request {
75             request() { clear(); }
76             void clear() { buf.clear(); xid = -1; }
77             bool isvalid() { return xid != -1; }
78             std::string buf;
79             int xid;
80         };
81         struct request dup_req_;
82         int xid_rep_done_;
83     public:
84
85         rpcc(sockaddr_in d, bool retrans=true);
86         ~rpcc();
87
88         struct TO {
89             int to;
90         };
91         static const TO to_max;
92         static const TO to_min;
93         static TO to(int x) { TO t; t.to = x; return t;}
94
95         unsigned int id() { return clt_nonce_; }
96
97         int bind(TO to = to_max);
98
99         void set_reachable(bool r) { reachable_ = r; }
100
101         void cancel();
102                 
103                 int islossy() { return lossytest_ > 0; }
104
105         int call1(unsigned int proc, 
106                 marshall &req, unmarshall &rep, TO to);
107
108         bool got_pdu(connection *c, char *b, int sz);
109
110
111         template<class R>
112             int call_m(unsigned int proc, marshall &req, R & r, TO to);
113
114         template<class R, typename ...Args>
115             inline int call(unsigned int proc, R & r, const Args&... args);
116
117         template<class R, typename ...Args>
118             inline int call_timeout(unsigned int proc, TO to, R & r, const Args&... args);
119 };
120
121 template<class R> int 
122 rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to) 
123 {
124     unmarshall u;
125     int intret = call1(proc, req, u, to);
126     if (intret < 0) return intret;
127     u >> r;
128     if (u.okdone() != true) {
129         fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply."
130                 "You are probably calling RPC 0x%x with wrong return "
131                 "type.\n", proc);
132         VERIFY(0);
133         return rpc_const::unmarshal_reply_failure;
134     }
135     return intret;
136 }
137
138 template<class R, typename... Args> inline int
139 rpcc::call(unsigned int proc, R & r, const Args&... args)
140 {
141     return call_timeout(proc, rpcc::to_max, r, args...);
142 }
143
144 template<class R, typename... Args> inline int
145 rpcc::call_timeout(unsigned int proc, const rpcc::TO to, R & r, const Args&... args)
146 {
147     marshall m{args...};
148     return call_m(proc, m, r, to);
149 }
150
151 bool operator<(const sockaddr_in &a, const sockaddr_in &b);
152
153 // rpc server endpoint.
154 class rpcs : public chanmgr {
155
156     typedef enum {
157         NEW,  // new RPC, not a duplicate
158         INPROGRESS, // duplicate of an RPC we're still processing
159         DONE, // duplicate of an RPC we already replied to (have reply)
160         FORGOTTEN,  // duplicate of an old RPC whose reply we've forgotten
161     } rpcstate_t;
162
163     private:
164
165         // state about an in-progress or completed RPC, for at-most-once.
166         // if cb_present is true, then the RPC is complete and a reply
167         // has been sent; in that case buf points to a copy of the reply,
168         // and sz holds the size of the reply.
169     struct reply_t {
170         reply_t (unsigned int _xid) {
171             xid = _xid;
172             cb_present = false;
173             buf = NULL;
174             sz = 0;
175         }
176         reply_t (unsigned int _xid, char *_buf, int _sz) {
177             xid = _xid;
178             cb_present = true;
179             buf = _buf;
180             sz = _sz;
181         }
182         unsigned int xid;
183         bool cb_present; // whether the reply buffer is valid
184         char *buf;      // the reply buffer
185         int sz;         // the size of reply buffer
186     };
187
188     int port_;
189     unsigned int nonce_;
190
191     // provide at most once semantics by maintaining a window of replies
192     // per client that that client hasn't acknowledged receiving yet.
193         // indexed by client nonce.
194     std::map<unsigned int, std::list<reply_t> > reply_window_;
195
196     void free_reply_window(void);
197     void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz);
198
199     rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
200             unsigned int xid, unsigned int rep_xid,
201             char **b, int *sz);
202
203     void updatestat(unsigned int proc);
204
205     // latest connection to the client
206     std::map<unsigned int, connection *> conns_;
207
208     // counting
209     const int counting_;
210     int curr_counts_;
211     std::map<int, int> counts_;
212
213     int lossytest_; 
214     bool reachable_;
215
216     // map proc # to function
217     std::map<int, handler *> procs_;
218
219     std::mutex procs_m_; // protect insert/delete to procs[]
220     std::mutex count_m_;  //protect modification of counts
221     std::mutex reply_window_m_; // protect reply window et al
222     std::mutex conss_m_; // protect conns_
223
224
225     protected:
226
227     struct djob_t {
228         djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {}
229         char *buf;
230         int sz;
231         connection *conn;
232     };
233     void dispatch(djob_t *);
234
235     // internal handler registration
236     void reg1(unsigned int proc, handler *);
237
238     ThrPool* dispatchpool_;
239     tcpsconn* listener_;
240
241     public:
242     rpcs(unsigned int port, int counts=0);
243     ~rpcs();
244     inline int port() { return listener_->port(); }
245     //RPC handler for clients binding
246     int rpcbind(int &r, int a);
247
248     void set_reachable(bool r) { reachable_ = r; }
249
250     bool got_pdu(connection *c, char *b, int sz);
251
252     template<class F, class C=void> void reg(unsigned int proc, F f, C *c=nullptr);
253 };
254
255 struct ReturnOnFailure {
256     static inline int unmarshall_args_failure() {
257         return rpc_const::unmarshal_args_failure;
258     }
259 };
260
261 template<class F, class C> void rpcs::reg(unsigned int proc, F f, C *c) {
262     reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
263 }
264
265 void make_sockaddr(const char *hostandport, struct sockaddr_in *dst);
266 void make_sockaddr(const char *host, const char *port,
267         struct sockaddr_in *dst);
268
269 #endif