More clean-ups and cool template stuff
[invirt/third/libt4.git] / rpc / rpc.h
1 #ifndef rpc_h
2 #define rpc_h
3
4 #include <sys/socket.h>
5 #include <netinet/in.h>
6 #include <list>
7 #include <map>
8 #include <stdio.h>
9
10 #include "thr_pool.h"
11 #include "marshall.h"
12 #include "connection.h"
13 #include "lock.h"
14
15 using std::string;
16 using std::map;
17 using std::list;
18
19 class rpc_const {
20     public:
21         static const unsigned int bind = 1;   // handler number reserved for bind
22         static const int timeout_failure = -1;
23         static const int unmarshal_args_failure = -2;
24         static const int unmarshal_reply_failure = -3;
25         static const int atmostonce_failure = -4;
26         static const int oldsrv_failure = -5;
27         static const int bind_failure = -6;
28         static const int cancel_failure = -7;
29 };
30
31 // rpc client endpoint.
32 // manages a xid space per destination socket
33 // threaded: multiple threads can be sending RPCs,
34 class rpcc : public chanmgr {
35
36     private:
37
38         //manages per rpc info
39         struct caller {
40             caller(int xxid, unmarshall *un);
41             ~caller();
42
43             int xid;
44             unmarshall *un;
45             int intret;
46             bool done;
47             mutex m;
48             cond c;
49         };
50
51         void get_refconn(connection **ch);
52         void update_xid_rep(int xid);
53
54
55         sockaddr_in dst_;
56         unsigned int clt_nonce_;
57         unsigned int srv_nonce_;
58         bool bind_done_;
59         int xid_;
60         int lossytest_;
61         bool retrans_;
62         bool reachable_;
63
64         connection *chan_;
65
66         mutex m_; // protect insert/delete to calls[]
67         mutex chan_m_;
68
69         bool destroy_wait_;
70         cond destroy_wait_c_;
71
72         map<int, caller *> calls_;
73         list<int> xid_rep_window_;
74
75         struct request {
76             request() { clear(); }
77             void clear() { buf.clear(); xid = -1; }
78             bool isvalid() { return xid != -1; }
79             string buf;
80             int xid;
81         };
82         struct request dup_req_;
83         int xid_rep_done_;
84     public:
85
86         rpcc(const string & d, bool retrans=true);
87         ~rpcc();
88
89         struct TO {
90             int to;
91         };
92         static const TO to_max;
93         static const TO to_min;
94         static TO to(int x) { TO t; t.to = x; return t;}
95
96         unsigned int id() { return clt_nonce_; }
97
98         int bind(TO to = to_max);
99
100         void set_reachable(bool r) { reachable_ = r; }
101
102         void cancel();
103
104         int islossy() { return lossytest_ > 0; }
105
106         int call1(proc_t proc, 
107                 marshall &req, unmarshall &rep, TO to);
108
109         bool got_pdu(connection *c, char *b, size_t sz);
110
111         template<class R>
112             int call_m(proc_t proc, marshall &req, R & r, TO to);
113
114         template<class R, typename ...Args>
115             inline int call(proc_t proc, R & r, const Args&... args);
116
117         template<class R, typename ...Args>
118             inline int call_timeout(proc_t proc, TO to, R & r, const Args&... args);
119 };
120
121 template<class R> int 
122 rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) 
123 {
124     unmarshall u;
125     int intret = call1(proc, req, u, to);
126     if (intret < 0) return intret;
127     u >> r;
128     if (u.okdone() != true) {
129         fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply."
130                 "You are probably calling RPC 0x%x with wrong return "
131                 "type.\n", proc);
132         VERIFY(0);
133         return rpc_const::unmarshal_reply_failure;
134     }
135     return intret;
136 }
137
138 template<class R, typename... Args> inline int
139 rpcc::call(proc_t proc, R & r, const Args&... args)
140 {
141     return call_timeout(proc, rpcc::to_max, r, args...);
142 }
143
144 template<class R, typename... Args> inline int
145 rpcc::call_timeout(proc_t proc, const rpcc::TO to, R & r, const Args&... args)
146 {
147     marshall m{args...};
148     return call_m(proc, m, r, to);
149 }
150
151 bool operator<(const sockaddr_in &a, const sockaddr_in &b);
152
153 // rpc server endpoint.
154 class rpcs : public chanmgr {
155
156     typedef enum {
157         NEW,  // new RPC, not a duplicate
158         INPROGRESS, // duplicate of an RPC we're still processing
159         DONE, // duplicate of an RPC we already replied to (have reply)
160         FORGOTTEN,  // duplicate of an old RPC whose reply we've forgotten
161     } rpcstate_t;
162
163     private:
164
165         // state about an in-progress or completed RPC, for at-most-once.
166         // if cb_present is true, then the RPC is complete and a reply
167         // has been sent; in that case buf points to a copy of the reply,
168         // and sz holds the size of the reply.
169     struct reply_t {
170         reply_t (int _xid) {
171             xid = _xid;
172             cb_present = false;
173             buf = NULL;
174             sz = 0;
175         }
176         reply_t (int _xid, char *_buf, size_t _sz) {
177             xid = _xid;
178             cb_present = true;
179             buf = _buf;
180             sz = _sz;
181         }
182         int xid;
183         bool cb_present; // whether the reply buffer is valid
184         char *buf;      // the reply buffer
185         size_t sz;         // the size of reply buffer
186     };
187
188     unsigned int port_;
189     unsigned int nonce_;
190
191     // provide at most once semantics by maintaining a window of replies
192     // per client that that client hasn't acknowledged receiving yet.
193         // indexed by client nonce.
194     map<unsigned int, list<reply_t> > reply_window_;
195
196     void free_reply_window(void);
197     void add_reply(unsigned int clt_nonce, int xid, char *b, size_t sz);
198
199     rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
200             int xid, int rep_xid,
201             char **b, size_t *sz);
202
203     void updatestat(proc_t proc);
204
205     // latest connection to the client
206     map<unsigned int, connection *> conns_;
207
208     // counting
209     const size_t counting_;
210     size_t curr_counts_;
211     map<proc_t, size_t> counts_;
212
213     int lossytest_; 
214     bool reachable_;
215
216     // map proc # to function
217     map<proc_t, handler *> procs_;
218
219     mutex procs_m_; // protect insert/delete to procs[]
220     mutex count_m_;  //protect modification of counts
221     mutex reply_window_m_; // protect reply window et al
222     mutex conss_m_; // protect conns_
223
224
225     protected:
226
227     struct djob_t {
228         djob_t (connection *c, char *b, size_t bsz):buf(b),sz(bsz),conn(c) {}
229         char *buf;
230         size_t sz;
231         connection *conn;
232     };
233     void dispatch(djob_t *);
234
235     // internal handler registration
236     void reg1(proc_t proc, handler *);
237
238     ThrPool* dispatchpool_;
239     tcpsconn* listener_;
240
241     public:
242     rpcs(unsigned int port, size_t counts=0);
243     ~rpcs();
244     inline unsigned int port() { return listener_->port(); }
245     //RPC handler for clients binding
246     int rpcbind(unsigned int &r, int a);
247
248     void set_reachable(bool r) { reachable_ = r; }
249
250     bool got_pdu(connection *c, char *b, size_t sz);
251
252     template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr);
253 };
254
255 struct ReturnOnFailure {
256     static inline int unmarshall_args_failure() {
257         return rpc_const::unmarshal_args_failure;
258     }
259 };
260
261 template<class F, class C> void rpcs::reg(proc_t proc, F f, C *c) {
262     reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
263 }
264
265 sockaddr_in make_sockaddr(const string &hostandport);
266 sockaddr_in make_sockaddr(const string &host, const string &port);
267
268 #endif