Split out marshall code into a new file
[invirt/third/libt4.git] / rpc / rpc.h
1 #ifndef rpc_h
2 #define rpc_h
3
4 #include "types.h"
5 #include <sys/socket.h>
6 #include <netinet/in.h>
7
8 #include "thr_pool.h"
9 #include "marshall.h"
10 #include "connection.h"
11
12 class rpc_const {
13     public:
14         static const unsigned int bind = 1;   // handler number reserved for bind
15         static const int timeout_failure = -1;
16         static const int unmarshal_args_failure = -2;
17         static const int unmarshal_reply_failure = -3;
18         static const int atmostonce_failure = -4;
19         static const int oldsrv_failure = -5;
20         static const int bind_failure = -6;
21         static const int cancel_failure = -7;
22 };
23
24 // rpc client endpoint.
25 // manages a xid space per destination socket
26 // threaded: multiple threads can be sending RPCs,
27 class rpcc : public chanmgr {
28
29     private:
30
31         //manages per rpc info
32         struct caller {
33             caller(int _xid, unmarshall *_un) : xid(_xid), un(_un) {}
34
35             int xid;
36             unmarshall *un;
37             int intret;
38             bool done = false;
39             mutex m;
40             cond c;
41         };
42
43         void get_refconn(connection **ch);
44         void update_xid_rep(int xid);
45
46
47         sockaddr_in dst_;
48         unsigned int clt_nonce_;
49         unsigned int srv_nonce_;
50         bool bind_done_;
51         int xid_;
52         int lossytest_;
53         bool retrans_;
54         bool reachable_;
55
56         connection *chan_;
57
58         mutex m_; // protect insert/delete to calls[]
59         mutex chan_m_;
60
61         bool destroy_wait_;
62         cond destroy_wait_c_;
63
64         map<int, caller *> calls_;
65         list<int> xid_rep_window_;
66
67         struct request {
68             request() { clear(); }
69             void clear() { buf.clear(); xid = -1; }
70             bool isvalid() { return xid != -1; }
71             string buf;
72             int xid;
73         };
74         struct request dup_req_;
75         int xid_rep_done_;
76     public:
77
78         rpcc(const string & d, bool retrans=true);
79         ~rpcc();
80
81         struct TO {
82             int to;
83         };
84         static const TO to_max;
85         static const TO to_min;
86         static TO to(int x) { TO t; t.to = x; return t;}
87
88         unsigned int id() { return clt_nonce_; }
89
90         int bind(TO to = to_max);
91
92         void set_reachable(bool r) { reachable_ = r; }
93
94         void cancel();
95
96         int islossy() { return lossytest_ > 0; }
97
98         int call1(proc_t proc, 
99                 marshall &req, unmarshall &rep, TO to);
100
101         bool got_pdu(connection *c, char *b, size_t sz);
102
103         template<class R>
104             int call_m(proc_t proc, marshall &req, R & r, TO to);
105
106         template<class R, typename ...Args>
107             inline int call(proc_t proc, R & r, const Args&... args);
108
109         template<class R, typename ...Args>
110             inline int call_timeout(proc_t proc, TO to, R & r, const Args&... args);
111 };
112
113 template<class R> int 
114 rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) 
115 {
116     unmarshall u;
117     int intret = call1(proc, req, u, to);
118     if (intret < 0) return intret;
119     u >> r;
120     if (u.okdone() != true) {
121         cerr << "rpcc::call_m: failed to unmarshall the reply.  You are probably " <<
122                 "calling RPC 0x" << hex << proc << " with the wrong return type." << endl;
123         VERIFY(0);
124         return rpc_const::unmarshal_reply_failure;
125     }
126     return intret;
127 }
128
129 template<class R, typename... Args> inline int
130 rpcc::call(proc_t proc, R & r, const Args&... args)
131 {
132     return call_timeout(proc, rpcc::to_max, r, args...);
133 }
134
135 template<class R, typename... Args> inline int
136 rpcc::call_timeout(proc_t proc, const rpcc::TO to, R & r, const Args&... args)
137 {
138     marshall m{args...};
139     return call_m(proc, m, r, to);
140 }
141
142 bool operator<(const sockaddr_in &a, const sockaddr_in &b);
143
144 // rpc server endpoint.
145 class rpcs : public chanmgr {
146
147     typedef enum {
148         NEW,  // new RPC, not a duplicate
149         INPROGRESS, // duplicate of an RPC we're still processing
150         DONE, // duplicate of an RPC we already replied to (have reply)
151         FORGOTTEN,  // duplicate of an old RPC whose reply we've forgotten
152     } rpcstate_t;
153
154     private:
155
156         // state about an in-progress or completed RPC, for at-most-once.
157         // if cb_present is true, then the RPC is complete and a reply
158         // has been sent; in that case buf points to a copy of the reply,
159         // and sz holds the size of the reply.
160     struct reply_t {
161         reply_t (int _xid) {
162             xid = _xid;
163             cb_present = false;
164             buf = NULL;
165             sz = 0;
166         }
167         reply_t (int _xid, char *_buf, size_t _sz) {
168             xid = _xid;
169             cb_present = true;
170             buf = _buf;
171             sz = _sz;
172         }
173         int xid;
174         bool cb_present; // whether the reply buffer is valid
175         char *buf;      // the reply buffer
176         size_t sz;         // the size of reply buffer
177     };
178
179     unsigned int port_;
180     unsigned int nonce_;
181
182     // provide at most once semantics by maintaining a window of replies
183     // per client that that client hasn't acknowledged receiving yet.
184         // indexed by client nonce.
185     map<unsigned int, list<reply_t> > reply_window_;
186
187     void free_reply_window(void);
188     void add_reply(unsigned int clt_nonce, int xid, char *b, size_t sz);
189
190     rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
191             int xid, int rep_xid,
192             char **b, size_t *sz);
193
194     void updatestat(proc_t proc);
195
196     // latest connection to the client
197     map<unsigned int, connection *> conns_;
198
199     // counting
200     const size_t counting_;
201     size_t curr_counts_;
202     map<proc_t, size_t> counts_;
203
204     int lossytest_; 
205     bool reachable_;
206
207     // map proc # to function
208     map<proc_t, handler *> procs_;
209
210     mutex procs_m_; // protect insert/delete to procs[]
211     mutex count_m_;  //protect modification of counts
212     mutex reply_window_m_; // protect reply window et al
213     mutex conss_m_; // protect conns_
214
215
216     protected:
217
218     struct djob_t {
219         djob_t (connection *c, char *b, size_t bsz):buf(b),sz(bsz),conn(c) {}
220         char *buf;
221         size_t sz;
222         connection *conn;
223     };
224     void dispatch(djob_t *);
225
226     // internal handler registration
227     void reg1(proc_t proc, handler *);
228
229     ThrPool* dispatchpool_;
230     tcpsconn* listener_;
231
232     public:
233     rpcs(unsigned int port, size_t counts=0);
234     ~rpcs();
235     inline unsigned int port() { return listener_->port(); }
236     //RPC handler for clients binding
237     int rpcbind(unsigned int &r, int a);
238
239     void set_reachable(bool r) { reachable_ = r; }
240
241     bool got_pdu(connection *c, char *b, size_t sz);
242
243     template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr);
244 };
245
246 struct ReturnOnFailure {
247     static inline int unmarshall_args_failure() {
248         return rpc_const::unmarshal_args_failure;
249     }
250 };
251
252 template<class F, class C> void rpcs::reg(proc_t proc, F f, C *c) {
253     reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
254 }
255
256 sockaddr_in make_sockaddr(const string &hostandport);
257 sockaddr_in make_sockaddr(const string &host, const string &port);
258
259 #endif