065cabccae53b43f799510b7f9ebc1bca4b8ce74
[invirt/third/libt4.git] / rpc / rpc.h
1 #ifndef rpc_h
2 #define rpc_h
3
4 #include "types.h"
5 #include <sys/socket.h>
6 #include <netinet/in.h>
7
8 #include "thr_pool.h"
9 #include "marshall.h"
10 #include "marshall_wrap.h"
11 #include "connection.h"
12
13 class rpc_const {
14     public:
15         static const unsigned int bind = 1;   // handler number reserved for bind
16         static const int timeout_failure = -1;
17         static const int unmarshal_args_failure = -2;
18         static const int unmarshal_reply_failure = -3;
19         static const int atmostonce_failure = -4;
20         static const int oldsrv_failure = -5;
21         static const int bind_failure = -6;
22         static const int cancel_failure = -7;
23 };
24
25 // rpc client endpoint.
26 // manages a xid space per destination socket
27 // threaded: multiple threads can be sending RPCs,
28 class rpcc : public chanmgr {
29
30     private:
31
32         //manages per rpc info
33         struct caller {
34             caller(int _xid, string *_rep) : xid(_xid), rep(_rep) {}
35
36             int xid;
37             string *rep;
38             int intret;
39             bool done = false;
40             mutex m;
41             cond c;
42         };
43
44         void get_refconn(connection **ch);
45         void update_xid_rep(int xid);
46
47
48         sockaddr_in dst_;
49         unsigned int clt_nonce_;
50         unsigned int srv_nonce_;
51         bool bind_done_;
52         int xid_;
53         int lossytest_;
54         bool retrans_;
55         bool reachable_;
56
57         connection *chan_;
58
59         mutex m_; // protect insert/delete to calls[]
60         mutex chan_m_;
61
62         bool destroy_wait_;
63         cond destroy_wait_c_;
64
65         map<int, caller *> calls_;
66         list<int> xid_rep_window_;
67
68         struct request {
69             request() { clear(); }
70             void clear() { buf.clear(); xid = -1; }
71             bool isvalid() { return xid != -1; }
72             string buf;
73             int xid;
74         };
75         struct request dup_req_;
76         int xid_rep_done_;
77     public:
78
79         rpcc(const string & d, bool retrans=true);
80         ~rpcc();
81
82         struct TO {
83             int to;
84         };
85         static const TO to_max;
86         static const TO to_min;
87         static TO to(int x) { TO t; t.to = x; return t;}
88
89         unsigned int id() { return clt_nonce_; }
90
91         int bind(TO to = to_max);
92
93         void set_reachable(bool r) { reachable_ = r; }
94
95         void cancel();
96
97         int islossy() { return lossytest_ > 0; }
98
99         int call1(proc_t proc, marshall &req, string &rep, TO to);
100
101         bool got_pdu(connection *c, const string & b);
102
103         template<class R>
104             int call_m(proc_t proc, marshall &req, R & r, TO to);
105
106         template<class R, typename ...Args>
107             inline int call(proc_t proc, R & r, const Args&... args);
108
109         template<class R, typename ...Args>
110             inline int call_timeout(proc_t proc, TO to, R & r, const Args&... args);
111 };
112
113 template<class R> int 
114 rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) 
115 {
116     string rep;
117     int intret = call1(proc, req, rep, to);
118     unmarshall u(rep, true);
119     if (intret < 0) return intret;
120     u >> r;
121     if (u.okdone() != true) {
122         cerr << "rpcc::call_m: failed to unmarshall the reply.  You are probably " <<
123                 "calling RPC 0x" << hex << proc << " with the wrong return type." << endl;
124         VERIFY(0);
125         return rpc_const::unmarshal_reply_failure;
126     }
127     return intret;
128 }
129
130 template<class R, typename... Args> inline int
131 rpcc::call(proc_t proc, R & r, const Args&... args)
132 {
133     return call_timeout(proc, rpcc::to_max, r, args...);
134 }
135
136 template<class R, typename... Args> inline int
137 rpcc::call_timeout(proc_t proc, const rpcc::TO to, R & r, const Args&... args)
138 {
139     marshall m{args...};
140     return call_m(proc, m, r, to);
141 }
142
143 // rpc server endpoint.
144 class rpcs : public chanmgr {
145
146     typedef enum {
147         NEW,  // new RPC, not a duplicate
148         INPROGRESS, // duplicate of an RPC we're still processing
149         DONE, // duplicate of an RPC we already replied to (have reply)
150         FORGOTTEN,  // duplicate of an old RPC whose reply we've forgotten
151     } rpcstate_t;
152
153     private:
154
155         // state about an in-progress or completed RPC, for at-most-once.
156         // if cb_present is true, then the RPC is complete and a reply
157         // has been sent; in that case buf points to a copy of the reply,
158         // and sz holds the size of the reply.
159     struct reply_t {
160         reply_t (int _xid) : xid(_xid), cb_present(false) {}
161         reply_t (int _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {}
162         int xid;
163         bool cb_present; // whether the reply buffer is valid
164         string buf;      // the reply buffer
165     };
166
167     in_port_t port_;
168     unsigned int nonce_;
169
170     // provide at most once semantics by maintaining a window of replies
171     // per client that that client hasn't acknowledged receiving yet.
172         // indexed by client nonce.
173     map<unsigned int, list<reply_t> > reply_window_;
174
175     void free_reply_window(void);
176     void add_reply(unsigned int clt_nonce, int xid, const string & b);
177
178     rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
179             int xid, int rep_xid, string & b);
180
181     void updatestat(proc_t proc);
182
183     // latest connection to the client
184     map<unsigned int, connection *> conns_;
185
186     // counting
187     const size_t counting_;
188     size_t curr_counts_;
189     map<proc_t, size_t> counts_;
190
191     bool reachable_;
192
193     // map proc # to function
194     map<proc_t, handler *> procs_;
195
196     mutex procs_m_; // protect insert/delete to procs[]
197     mutex count_m_;  //protect modification of counts
198     mutex reply_window_m_; // protect reply window et al
199     mutex conss_m_; // protect conns_
200
201
202     protected:
203
204     struct djob_t {
205         connection *conn;
206         string buf;
207     };
208     void dispatch(djob_t *);
209
210     // internal handler registration
211     void reg1(proc_t proc, handler *);
212
213     ThrPool* dispatchpool_;
214     tcpsconn *listener_;
215
216     public:
217     rpcs(in_port_t port, size_t counts=0);
218     ~rpcs();
219     inline in_port_t port() { return listener_->port(); }
220     //RPC handler for clients binding
221     int rpcbind(unsigned int &r, int a);
222
223     void set_reachable(bool r) { reachable_ = r; }
224
225     bool got_pdu(connection *c, const string & b);
226
227     template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr);
228 };
229
230 struct ReturnOnFailure {
231     static inline int unmarshall_args_failure() {
232         return rpc_const::unmarshal_args_failure;
233     }
234 };
235
236 template<class F, class C> void rpcs::reg(proc_t proc, F f, C *c) {
237     reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
238 }
239
240 #endif