2b32e2867185192e8c3b8dff9770eec6d3d53e89
[invirt/third/libt4.git] / rpc / rpc.h
1 #ifndef rpc_h
2 #define rpc_h
3
4 #include "types.h"
5 #include <sys/socket.h>
6 #include <netinet/in.h>
7
8 #include "thr_pool.h"
9 #include "marshall.h"
10 #include "connection.h"
11
12 class rpc_const {
13     public:
14         static const unsigned int bind = 1;   // handler number reserved for bind
15         static const int timeout_failure = -1;
16         static const int unmarshal_args_failure = -2;
17         static const int unmarshal_reply_failure = -3;
18         static const int atmostonce_failure = -4;
19         static const int oldsrv_failure = -5;
20         static const int bind_failure = -6;
21         static const int cancel_failure = -7;
22 };
23
24 // rpc client endpoint.
25 // manages a xid space per destination socket
26 // threaded: multiple threads can be sending RPCs,
27 class rpcc : public chanmgr {
28
29     private:
30
31         //manages per rpc info
32         struct caller {
33             caller(int _xid, string *_rep) : xid(_xid), rep(_rep) {}
34
35             int xid;
36             string *rep;
37             int intret;
38             bool done = false;
39             mutex m;
40             cond c;
41         };
42
43         void get_refconn(connection **ch);
44         void update_xid_rep(int xid);
45
46
47         sockaddr_in dst_;
48         unsigned int clt_nonce_;
49         unsigned int srv_nonce_;
50         bool bind_done_;
51         int xid_;
52         int lossytest_;
53         bool retrans_;
54         bool reachable_;
55
56         connection *chan_;
57
58         mutex m_; // protect insert/delete to calls[]
59         mutex chan_m_;
60
61         bool destroy_wait_;
62         cond destroy_wait_c_;
63
64         map<int, caller *> calls_;
65         list<int> xid_rep_window_;
66
67         struct request {
68             request() { clear(); }
69             void clear() { buf.clear(); xid = -1; }
70             bool isvalid() { return xid != -1; }
71             string buf;
72             int xid;
73         };
74         struct request dup_req_;
75         int xid_rep_done_;
76     public:
77
78         rpcc(const string & d, bool retrans=true);
79         ~rpcc();
80
81         struct TO {
82             int to;
83         };
84         static const TO to_max;
85         static const TO to_min;
86         static TO to(int x) { TO t; t.to = x; return t;}
87
88         unsigned int id() { return clt_nonce_; }
89
90         int bind(TO to = to_max);
91
92         void set_reachable(bool r) { reachable_ = r; }
93
94         void cancel();
95
96         int islossy() { return lossytest_ > 0; }
97
98         int call1(proc_t proc, marshall &req, string &rep, TO to);
99
100         bool got_pdu(connection *c, const string & b);
101
102         template<class R>
103             int call_m(proc_t proc, marshall &req, R & r, TO to);
104
105         template<class R, typename ...Args>
106             inline int call(proc_t proc, R & r, const Args&... args);
107
108         template<class R, typename ...Args>
109             inline int call_timeout(proc_t proc, TO to, R & r, const Args&... args);
110 };
111
112 template<class R> int 
113 rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) 
114 {
115     string rep;
116     int intret = call1(proc, req, rep, to);
117     unmarshall u(rep, true);
118     if (intret < 0) return intret;
119     u >> r;
120     if (u.okdone() != true) {
121         cerr << "rpcc::call_m: failed to unmarshall the reply.  You are probably " <<
122                 "calling RPC 0x" << hex << proc << " with the wrong return type." << endl;
123         VERIFY(0);
124         return rpc_const::unmarshal_reply_failure;
125     }
126     return intret;
127 }
128
129 template<class R, typename... Args> inline int
130 rpcc::call(proc_t proc, R & r, const Args&... args)
131 {
132     return call_timeout(proc, rpcc::to_max, r, args...);
133 }
134
135 template<class R, typename... Args> inline int
136 rpcc::call_timeout(proc_t proc, const rpcc::TO to, R & r, const Args&... args)
137 {
138     marshall m{args...};
139     return call_m(proc, m, r, to);
140 }
141
142 // rpc server endpoint.
143 class rpcs : public chanmgr {
144
145     typedef enum {
146         NEW,  // new RPC, not a duplicate
147         INPROGRESS, // duplicate of an RPC we're still processing
148         DONE, // duplicate of an RPC we already replied to (have reply)
149         FORGOTTEN,  // duplicate of an old RPC whose reply we've forgotten
150     } rpcstate_t;
151
152     private:
153
154         // state about an in-progress or completed RPC, for at-most-once.
155         // if cb_present is true, then the RPC is complete and a reply
156         // has been sent; in that case buf points to a copy of the reply,
157         // and sz holds the size of the reply.
158     struct reply_t {
159         reply_t (int _xid) : xid(_xid), cb_present(false) {}
160         reply_t (int _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {}
161         int xid;
162         bool cb_present; // whether the reply buffer is valid
163         string buf;      // the reply buffer
164     };
165
166     in_port_t port_;
167     unsigned int nonce_;
168
169     // provide at most once semantics by maintaining a window of replies
170     // per client that that client hasn't acknowledged receiving yet.
171         // indexed by client nonce.
172     map<unsigned int, list<reply_t> > reply_window_;
173
174     void free_reply_window(void);
175     void add_reply(unsigned int clt_nonce, int xid, const string & b);
176
177     rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
178             int xid, int rep_xid, string & b);
179
180     void updatestat(proc_t proc);
181
182     // latest connection to the client
183     map<unsigned int, connection *> conns_;
184
185     // counting
186     const size_t counting_;
187     size_t curr_counts_;
188     map<proc_t, size_t> counts_;
189
190     bool reachable_;
191
192     // map proc # to function
193     map<proc_t, handler *> procs_;
194
195     mutex procs_m_; // protect insert/delete to procs[]
196     mutex count_m_;  //protect modification of counts
197     mutex reply_window_m_; // protect reply window et al
198     mutex conss_m_; // protect conns_
199
200
201     protected:
202
203     struct djob_t {
204         connection *conn;
205         string buf;
206     };
207     void dispatch(djob_t *);
208
209     // internal handler registration
210     void reg1(proc_t proc, handler *);
211
212     ThrPool* dispatchpool_;
213     tcpsconn *listener_;
214
215     public:
216     rpcs(in_port_t port, size_t counts=0);
217     ~rpcs();
218     inline in_port_t port() { return listener_->port(); }
219     //RPC handler for clients binding
220     int rpcbind(unsigned int &r, int a);
221
222     void set_reachable(bool r) { reachable_ = r; }
223
224     bool got_pdu(connection *c, const string & b);
225
226     template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr);
227 };
228
229 struct ReturnOnFailure {
230     static inline int unmarshall_args_failure() {
231         return rpc_const::unmarshal_args_failure;
232     }
233 };
234
235 template<class F, class C> void rpcs::reg(proc_t proc, F f, C *c) {
236     reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
237 }
238
239 #endif