Variadic templates for RPCs
[invirt/third/libt4.git] / rpc / rpc.h
1 #ifndef rpc_h
2 #define rpc_h
3
4 #include <sys/socket.h>
5 #include <netinet/in.h>
6 #include <list>
7 #include <map>
8 #include <stdio.h>
9
10 #include "thr_pool.h"
11 #include "marshall.h"
12 #include "connection.h"
13
14 #ifdef DMALLOC
15 #include "dmalloc.h"
16 #endif
17
18 class rpc_const {
19         public:
20                 static const unsigned int bind = 1;   // handler number reserved for bind
21                 static const int timeout_failure = -1;
22                 static const int unmarshal_args_failure = -2;
23                 static const int unmarshal_reply_failure = -3;
24                 static const int atmostonce_failure = -4;
25                 static const int oldsrv_failure = -5;
26                 static const int bind_failure = -6;
27                 static const int cancel_failure = -7;
28 };
29
30 // rpc client endpoint.
31 // manages a xid space per destination socket
32 // threaded: multiple threads can be sending RPCs,
33 class rpcc : public chanmgr {
34
35         private:
36
37                 //manages per rpc info
38                 struct caller {
39                         caller(unsigned int xxid, unmarshall *un);
40                         ~caller();
41
42                         unsigned int xid;
43                         unmarshall *un;
44                         int intret;
45                         bool done;
46             std::mutex m;
47             std::condition_variable c;
48                 };
49
50                 void get_refconn(connection **ch);
51                 void update_xid_rep(unsigned int xid);
52
53
54                 sockaddr_in dst_;
55                 unsigned int clt_nonce_;
56                 unsigned int srv_nonce_;
57                 bool bind_done_;
58                 unsigned int xid_;
59                 int lossytest_;
60                 bool retrans_;
61                 bool reachable_;
62
63                 connection *chan_;
64
65         std::mutex m_; // protect insert/delete to calls[]
66                 std::mutex chan_m_;
67
68                 bool destroy_wait_;
69         std::condition_variable destroy_wait_c_;
70
71                 std::map<int, caller *> calls_;
72                 std::list<unsigned int> xid_rep_window_;
73                 
74                 struct request {
75                     request() { clear(); }
76                     void clear() { buf.clear(); xid = -1; }
77                     bool isvalid() { return xid != -1; }
78                     std::string buf;
79                     int xid;
80                 };
81                 struct request dup_req_;
82                 int xid_rep_done_;
83         public:
84
85                 rpcc(sockaddr_in d, bool retrans=true);
86                 ~rpcc();
87
88                 struct TO {
89                         int to;
90                 };
91                 static const TO to_max;
92                 static const TO to_min;
93                 static TO to(int x) { TO t; t.to = x; return t;}
94
95                 unsigned int id() { return clt_nonce_; }
96
97                 int bind(TO to = to_max);
98
99                 void set_reachable(bool r) { reachable_ = r; }
100
101                 void cancel();
102                 
103                 int islossy() { return lossytest_ > 0; }
104
105                 int call1(unsigned int proc, 
106                                 marshall &req, unmarshall &rep, TO to);
107
108                 bool got_pdu(connection *c, char *b, int sz);
109
110
111                 template<class R>
112                         int call_m(unsigned int proc, marshall &req, R & r, TO to);
113
114                 template<class R, typename ...Args>
115                         inline int call(unsigned int proc, R & r, const Args&... args);
116
117                 template<class R, typename ...Args>
118                         inline int call_timeout(unsigned int proc, TO to, R & r, const Args&... args);
119 };
120
121 template<class R> int 
122 rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to) 
123 {
124     unmarshall u;
125     int intret = call1(proc, req, u, to);
126     if (intret < 0) return intret;
127     u >> r;
128     if (u.okdone() != true) {
129         fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply."
130                 "You are probably calling RPC 0x%x with wrong return "
131                 "type.\n", proc);
132         VERIFY(0);
133         return rpc_const::unmarshal_reply_failure;
134     }
135     return intret;
136 }
137
138 template<class R, typename... Args> inline int
139 rpcc::call(unsigned int proc, R & r, const Args&... args)
140 {
141     return call_timeout(proc, rpcc::to_max, r, args...);
142 }
143
144 template<class R, typename... Args> inline int
145 rpcc::call_timeout(unsigned int proc, const rpcc::TO to, R & r, const Args&... args)
146 {
147         marshall m{args...};
148         return call_m(proc, m, r, to);
149 }
150
151 bool operator<(const sockaddr_in &a, const sockaddr_in &b);
152
153 class handler {
154         public:
155                 handler() { }
156                 virtual ~handler() { }
157                 virtual int fn(unmarshall &, marshall &) = 0;
158 };
159
160
161 // rpc server endpoint.
162 class rpcs : public chanmgr {
163
164         typedef enum {
165                 NEW,  // new RPC, not a duplicate
166                 INPROGRESS, // duplicate of an RPC we're still processing
167                 DONE, // duplicate of an RPC we already replied to (have reply)
168                 FORGOTTEN,  // duplicate of an old RPC whose reply we've forgotten
169         } rpcstate_t;
170
171         private:
172
173         // state about an in-progress or completed RPC, for at-most-once.
174         // if cb_present is true, then the RPC is complete and a reply
175         // has been sent; in that case buf points to a copy of the reply,
176         // and sz holds the size of the reply.
177         struct reply_t {
178                 reply_t (unsigned int _xid) {
179                         xid = _xid;
180                         cb_present = false;
181                         buf = NULL;
182                         sz = 0;
183                 }
184                 reply_t (unsigned int _xid, char *_buf, int _sz) {
185                         xid = _xid;
186                         cb_present = true;
187                         buf = _buf;
188                         sz = _sz;
189                 }
190                 unsigned int xid;
191                 bool cb_present; // whether the reply buffer is valid
192                 char *buf;      // the reply buffer
193                 int sz;         // the size of reply buffer
194         };
195
196         int port_;
197         unsigned int nonce_;
198
199         // provide at most once semantics by maintaining a window of replies
200         // per client that that client hasn't acknowledged receiving yet.
201         // indexed by client nonce.
202         std::map<unsigned int, std::list<reply_t> > reply_window_;
203
204         void free_reply_window(void);
205         void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz);
206
207         rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
208                         unsigned int xid, unsigned int rep_xid,
209                         char **b, int *sz);
210
211         void updatestat(unsigned int proc);
212
213         // latest connection to the client
214         std::map<unsigned int, connection *> conns_;
215
216         // counting
217         const int counting_;
218         int curr_counts_;
219         std::map<int, int> counts_;
220
221         int lossytest_; 
222         bool reachable_;
223
224         // map proc # to function
225         std::map<int, handler *> procs_;
226
227         std::mutex procs_m_; // protect insert/delete to procs[]
228         std::mutex count_m_;  //protect modification of counts
229         std::mutex reply_window_m_; // protect reply window et al
230         std::mutex conss_m_; // protect conns_
231
232
233         protected:
234
235         struct djob_t {
236                 djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {}
237                 char *buf;
238                 int sz;
239                 connection *conn;
240         };
241         void dispatch(djob_t *);
242
243         // internal handler registration
244         void reg1(unsigned int proc, handler *);
245
246         ThrPool* dispatchpool_;
247         tcpsconn* listener_;
248
249         public:
250         rpcs(unsigned int port, int counts=0);
251         ~rpcs();
252         inline int port() { return listener_->port(); }
253         //RPC handler for clients binding
254         int rpcbind(int a, int &r);
255
256         void set_reachable(bool r) { reachable_ = r; }
257
258         bool got_pdu(connection *c, char *b, int sz);
259
260         // register a handler
261         template<class S, class A1, class R>
262                 void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, R & r));
263         template<class S, class A1, class A2, class R>
264                 void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, const A2, 
265                                         R & r));
266         template<class S, class A1, class A2, class A3, class R>
267                 void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, 
268                                         const A3, R & r));
269         template<class S, class A1, class A2, class A3, class A4, class R>
270                 void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, 
271                                         const A3, const A4, R & r));
272         template<class S, class A1, class A2, class A3, class A4, class A5, class R>
273                 void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, 
274                                         const A3, const A4, const A5, 
275                                         R & r));
276         template<class S, class A1, class A2, class A3, class A4, class A5, class A6,
277                 class R>
278                         void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, 
279                                                 const A3, const A4, const A5, 
280                                                 const A6, R & r));
281         template<class S, class A1, class A2, class A3, class A4, class A5, class A6,
282                 class A7, class R>
283                         void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, 
284                                                 const A3, const A4, const A5, 
285                                                 const A6, const A7,
286                                                 R & r));
287 };
288
289 template<class S, class A1, class R> void
290 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, R & r))
291 {
292         class h1 : public handler {
293                 private:
294                         S * sob;
295                         int (S::*meth)(const A1 a1, R & r);
296                 public:
297                         h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r))
298                                 : sob(xsob), meth(xmeth) { }
299                         int fn(unmarshall &args, marshall &ret) {
300                                 A1 a1;
301                                 R r;
302                                 args >> a1;
303                                 if(!args.okdone())
304                                         return rpc_const::unmarshal_args_failure;
305                                 int b = (sob->*meth)(a1, r);
306                                 ret << r;
307                                 return b;
308                         }
309         };
310         reg1(proc, new h1(sob, meth));
311 }
312
313 template<class S, class A1, class A2, class R> void
314 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
315                         R & r))
316 {
317         class h1 : public handler {
318                 private:
319                         S * sob;
320                         int (S::*meth)(const A1 a1, const A2 a2, R & r);
321                 public:
322                         h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r))
323                                 : sob(xsob), meth(xmeth) { }
324                         int fn(unmarshall &args, marshall &ret) {
325                                 A1 a1;
326                                 A2 a2;
327                                 R r;
328                                 args >> a1;
329                                 args >> a2;
330                                 if(!args.okdone())
331                                         return rpc_const::unmarshal_args_failure;
332                                 int b = (sob->*meth)(a1, a2, r);
333                                 ret << r;
334                                 return b;
335                         }
336         };
337         reg1(proc, new h1(sob, meth));
338 }
339
340 template<class S, class A1, class A2, class A3, class R> void
341 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
342                         const A3 a3, R & r))
343 {
344         class h1 : public handler {
345                 private:
346                         S * sob;
347                         int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r);
348                 public:
349                         h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r))
350                                 : sob(xsob), meth(xmeth) { }
351                         int fn(unmarshall &args, marshall &ret) {
352                                 A1 a1;
353                                 A2 a2;
354                                 A3 a3;
355                                 R r;
356                                 args >> a1;
357                                 args >> a2;
358                                 args >> a3;
359                                 if(!args.okdone())
360                                         return rpc_const::unmarshal_args_failure;
361                                 int b = (sob->*meth)(a1, a2, a3, r);
362                                 ret << r;
363                                 return b;
364                         }
365         };
366         reg1(proc, new h1(sob, meth));
367 }
368
369 template<class S, class A1, class A2, class A3, class A4, class R> void
370 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
371                         const A3 a3, const A4 a4, 
372                         R & r))
373 {
374         class h1 : public handler {
375                 private:
376                         S * sob;
377                         int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r);
378                 public:
379                         h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, 
380                                                 const A4 a4, R & r))
381                                 : sob(xsob), meth(xmeth)  { }
382                         int fn(unmarshall &args, marshall &ret) {
383                                 A1 a1;
384                                 A2 a2;
385                                 A3 a3;
386                                 A4 a4;
387                                 R r;
388                                 args >> a1;
389                                 args >> a2;
390                                 args >> a3;
391                                 args >> a4;
392                                 if(!args.okdone())
393                                         return rpc_const::unmarshal_args_failure;
394                                 int b = (sob->*meth)(a1, a2, a3, a4, r);
395                                 ret << r;
396                                 return b;
397                         }
398         };
399         reg1(proc, new h1(sob, meth));
400 }
401
402 template<class S, class A1, class A2, class A3, class A4, class A5, class R> void
403 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
404                         const A3 a3, const A4 a4, 
405                         const A5 a5, R & r))
406 {
407         class h1 : public handler {
408                 private:
409                         S * sob;
410                         int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, 
411                                         const A5 a5, R & r);
412                 public:
413                         h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, 
414                                                 const A4 a4, const A5 a5, R & r))
415                                 : sob(xsob), meth(xmeth) { }
416                         int fn(unmarshall &args, marshall &ret) {
417                                 A1 a1;
418                                 A2 a2;
419                                 A3 a3;
420                                 A4 a4;
421                                 A5 a5;
422                                 R r;
423                                 args >> a1;
424                                 args >> a2;
425                                 args >> a3;
426                                 args >> a4;
427                                 args >> a5;
428                                 if(!args.okdone())
429                                         return rpc_const::unmarshal_args_failure;
430                                 int b = (sob->*meth)(a1, a2, a3, a4, a5, r);
431                                 ret << r;
432                                 return b;
433                         }
434         };
435         reg1(proc, new h1(sob, meth));
436 }
437
438 template<class S, class A1, class A2, class A3, class A4, class A5, class A6, class R> void
439 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
440                         const A3 a3, const A4 a4, 
441                         const A5 a5, const A6 a6, 
442                         R & r))
443 {
444         class h1 : public handler {
445                 private:
446                         S * sob;
447                         int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, 
448                                         const A5 a5, const A6 a6, R & r);
449                 public:
450                         h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, 
451                                                 const A4 a4, const A5 a5, const A6 a6, R & r))
452                                 : sob(xsob), meth(xmeth) { }
453                         int fn(unmarshall &args, marshall &ret) {
454                                 A1 a1;
455                                 A2 a2;
456                                 A3 a3;
457                                 A4 a4;
458                                 A5 a5;
459                                 A6 a6;
460                                 R r;
461                                 args >> a1;
462                                 args >> a2;
463                                 args >> a3;
464                                 args >> a4;
465                                 args >> a5;
466                                 args >> a6;
467                                 if(!args.okdone())
468                                         return rpc_const::unmarshal_args_failure;
469                                 int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, r);
470                                 ret << r;
471                                 return b;
472                         }
473         };
474         reg1(proc, new h1(sob, meth));
475 }
476
477 template<class S, class A1, class A2, class A3, class A4, class A5, 
478         class A6, class A7, class R> void
479 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
480                         const A3 a3, const A4 a4, 
481                         const A5 a5, const A6 a6,
482                         const A7 a7, R & r))
483 {
484         class h1 : public handler {
485                 private:
486                         S * sob;
487                         int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, 
488                                         const A5 a5, const A6 a6, const A7 a7, R & r);
489                 public:
490                         h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, 
491                                                 const A4 a4, const A5 a5, const A6 a6,
492                                                 const A7 a7, R & r))
493                                 : sob(xsob), meth(xmeth) { }
494                         int fn(unmarshall &args, marshall &ret) {
495                                 A1 a1;
496                                 A2 a2;
497                                 A3 a3;
498                                 A4 a4;
499                                 A5 a5;
500                                 A6 a6;
501                                 A7 a7;
502                                 R r;
503                                 args >> a1;
504                                 args >> a2;
505                                 args >> a3;
506                                 args >> a4;
507                                 args >> a5;
508                                 args >> a6;
509                                 args >> a7;
510                                 if(!args.okdone())
511                                         return rpc_const::unmarshal_args_failure;
512                                 int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, a7, r);
513                                 ret << r;
514                                 return b;
515                         }
516         };
517         reg1(proc, new h1(sob, meth));
518 }
519
520
521 void make_sockaddr(const char *hostandport, struct sockaddr_in *dst);
522 void make_sockaddr(const char *host, const char *port,
523                 struct sockaddr_in *dst);
524
525 #endif