TEMPLATE MAGIC FOR GREAT JUSTICE
[invirt/third/libt4.git] / rpc / rpc.h
1 #ifndef rpc_h
2 #define rpc_h
3
4 #include <sys/socket.h>
5 #include <netinet/in.h>
6 #include <list>
7 #include <map>
8 #include <stdio.h>
9
10 #include "thr_pool.h"
11 #include "marshall.h"
12 #include "connection.h"
13
14 #ifdef DMALLOC
15 #include "dmalloc.h"
16 #endif
17
18 class rpc_const {
19     public:
20         static const unsigned int bind = 1;   // handler number reserved for bind
21         static const int timeout_failure = -1;
22         static const int unmarshal_args_failure = -2;
23         static const int unmarshal_reply_failure = -3;
24         static const int atmostonce_failure = -4;
25         static const int oldsrv_failure = -5;
26         static const int bind_failure = -6;
27         static const int cancel_failure = -7;
28 };
29
30 struct ReturnOnFailure {
31     static inline int unmarshall_args_failure() {
32         return rpc_const::unmarshal_args_failure;
33     }
34 };
35
36 // rpc client endpoint.
37 // manages a xid space per destination socket
38 // threaded: multiple threads can be sending RPCs,
39 class rpcc : public chanmgr {
40
41     private:
42
43         //manages per rpc info
44         struct caller {
45             caller(unsigned int xxid, unmarshall *un);
46             ~caller();
47
48             unsigned int xid;
49             unmarshall *un;
50             int intret;
51             bool done;
52             std::mutex m;
53             std::condition_variable c;
54         };
55
56         void get_refconn(connection **ch);
57         void update_xid_rep(unsigned int xid);
58
59
60         sockaddr_in dst_;
61         unsigned int clt_nonce_;
62         unsigned int srv_nonce_;
63         bool bind_done_;
64         unsigned int xid_;
65         int lossytest_;
66         bool retrans_;
67         bool reachable_;
68
69         connection *chan_;
70
71         std::mutex m_; // protect insert/delete to calls[]
72         std::mutex chan_m_;
73
74         bool destroy_wait_;
75         std::condition_variable destroy_wait_c_;
76
77         std::map<int, caller *> calls_;
78         std::list<unsigned int> xid_rep_window_;
79                 
80                 struct request {
81                     request() { clear(); }
82                     void clear() { buf.clear(); xid = -1; }
83                     bool isvalid() { return xid != -1; }
84                     std::string buf;
85                     int xid;
86                 };
87                 struct request dup_req_;
88                 int xid_rep_done_;
89     public:
90
91         rpcc(sockaddr_in d, bool retrans=true);
92         ~rpcc();
93
94         struct TO {
95             int to;
96         };
97         static const TO to_max;
98         static const TO to_min;
99         static TO to(int x) { TO t; t.to = x; return t;}
100
101         unsigned int id() { return clt_nonce_; }
102
103         int bind(TO to = to_max);
104
105         void set_reachable(bool r) { reachable_ = r; }
106
107         void cancel();
108                 
109                 int islossy() { return lossytest_ > 0; }
110
111         int call1(unsigned int proc, 
112                 marshall &req, unmarshall &rep, TO to);
113
114         bool got_pdu(connection *c, char *b, int sz);
115
116
117         template<class R>
118             int call_m(unsigned int proc, marshall &req, R & r, TO to);
119
120         template<class R, typename ...Args>
121             inline int call(unsigned int proc, R & r, const Args&... args);
122
123         template<class R, typename ...Args>
124             inline int call_timeout(unsigned int proc, TO to, R & r, const Args&... args);
125 };
126
127 template<class R> int 
128 rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to) 
129 {
130     unmarshall u;
131     int intret = call1(proc, req, u, to);
132     if (intret < 0) return intret;
133     u >> r;
134     if (u.okdone() != true) {
135         fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply."
136                 "You are probably calling RPC 0x%x with wrong return "
137                 "type.\n", proc);
138         VERIFY(0);
139         return rpc_const::unmarshal_reply_failure;
140     }
141     return intret;
142 }
143
144 template<class R, typename... Args> inline int
145 rpcc::call(unsigned int proc, R & r, const Args&... args)
146 {
147     return call_timeout(proc, rpcc::to_max, r, args...);
148 }
149
150 template<class R, typename... Args> inline int
151 rpcc::call_timeout(unsigned int proc, const rpcc::TO to, R & r, const Args&... args)
152 {
153     marshall m{args...};
154     return call_m(proc, m, r, to);
155 }
156
157 bool operator<(const sockaddr_in &a, const sockaddr_in &b);
158
159 // rpc server endpoint.
160 class rpcs : public chanmgr {
161
162     typedef enum {
163         NEW,  // new RPC, not a duplicate
164         INPROGRESS, // duplicate of an RPC we're still processing
165         DONE, // duplicate of an RPC we already replied to (have reply)
166         FORGOTTEN,  // duplicate of an old RPC whose reply we've forgotten
167     } rpcstate_t;
168
169     private:
170
171         // state about an in-progress or completed RPC, for at-most-once.
172         // if cb_present is true, then the RPC is complete and a reply
173         // has been sent; in that case buf points to a copy of the reply,
174         // and sz holds the size of the reply.
175     struct reply_t {
176         reply_t (unsigned int _xid) {
177             xid = _xid;
178             cb_present = false;
179             buf = NULL;
180             sz = 0;
181         }
182         reply_t (unsigned int _xid, char *_buf, int _sz) {
183             xid = _xid;
184             cb_present = true;
185             buf = _buf;
186             sz = _sz;
187         }
188         unsigned int xid;
189         bool cb_present; // whether the reply buffer is valid
190         char *buf;      // the reply buffer
191         int sz;         // the size of reply buffer
192     };
193
194     int port_;
195     unsigned int nonce_;
196
197     // provide at most once semantics by maintaining a window of replies
198     // per client that that client hasn't acknowledged receiving yet.
199         // indexed by client nonce.
200     std::map<unsigned int, std::list<reply_t> > reply_window_;
201
202     void free_reply_window(void);
203     void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz);
204
205     rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
206             unsigned int xid, unsigned int rep_xid,
207             char **b, int *sz);
208
209     void updatestat(unsigned int proc);
210
211     // latest connection to the client
212     std::map<unsigned int, connection *> conns_;
213
214     // counting
215     const int counting_;
216     int curr_counts_;
217     std::map<int, int> counts_;
218
219     int lossytest_; 
220     bool reachable_;
221
222     // map proc # to function
223     std::map<int, handler *> procs_;
224
225     std::mutex procs_m_; // protect insert/delete to procs[]
226     std::mutex count_m_;  //protect modification of counts
227     std::mutex reply_window_m_; // protect reply window et al
228     std::mutex conss_m_; // protect conns_
229
230
231     protected:
232
233     struct djob_t {
234         djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {}
235         char *buf;
236         int sz;
237         connection *conn;
238     };
239     void dispatch(djob_t *);
240
241     // internal handler registration
242     void reg1(unsigned int proc, handler *);
243
244     ThrPool* dispatchpool_;
245     tcpsconn* listener_;
246
247     public:
248     rpcs(unsigned int port, int counts=0);
249     ~rpcs();
250     inline int port() { return listener_->port(); }
251     //RPC handler for clients binding
252     int rpcbind(int &r, int a);
253
254     void set_reachable(bool r) { reachable_ = r; }
255
256     bool got_pdu(connection *c, char *b, int sz);
257
258     template<class F, class C=void> void reg(unsigned int proc, F f, C *c=nullptr);
259 };
260
261 template<class F, class C> void rpcs::reg(unsigned int proc, F f, C *c) {
262     reg1(proc, marshalled_func<F, F, ReturnOnFailure>::wrap(f, c));
263 }
264
265 void make_sockaddr(const char *hostandport, struct sockaddr_in *dst);
266 void make_sockaddr(const char *host, const char *port,
267         struct sockaddr_in *dst);
268
269 #endif