4 #include <sys/socket.h>
5 #include <netinet/in.h>
12 #include "connection.h"
// RPC-layer constants: the reserved handler number for bind, followed by the
// negative status codes that the RPC layer returns on failure.
// NOTE(review): other code in this file refers to these as rpc_const::<name>,
// so they presumably sit inside a class/struct rpc_const whose header is not
// visible here — confirm.
// NOTE(review): apart from the two unmarshal codes, the meaning of each failure
// code is suggested only by its name — confirm against the implementation.
20 static const unsigned int bind = 1; // handler number reserved for bind
21 static const int timeout_failure = -1;
22 static const int unmarshal_args_failure = -2; // returned by the h1::fn wrappers built in rpcs::reg
23 static const int unmarshal_reply_failure = -3; // returned by rpcc::call_m when the reply does not unmarshal cleanly
24 static const int atmostonce_failure = -4;
25 static const int oldsrv_failure = -5;
26 static const int bind_failure = -6;
27 static const int cancel_failure = -7;
30 // rpc client endpoint.
31 // manages an xid space per destination socket
32 // threaded: multiple threads can be sending RPCs,
33 class rpcc : public chanmgr {
37 // manages per-RPC info
39 caller(unsigned int xxid, unmarshall *un);
47 std::condition_variable c;
50 void get_refconn(connection **ch);
51 void update_xid_rep(unsigned int xid);
55 unsigned int clt_nonce_;
56 unsigned int srv_nonce_;
65 std::mutex m_; // protects insert/delete on calls_
69 std::condition_variable destroy_wait_c_;
71 std::map<int, caller *> calls_;
72 std::list<unsigned int> xid_rep_window_;
// Default constructor: starts the request in the cleared state by calling clear().
75 request() { clear(); }
// Resets the request: empties buf and sets xid to -1, the value that
// isvalid() reports as "not valid".
76 void clear() { buf.clear(); xid = -1; }
// Returns true when xid holds anything other than -1 (the value stored by clear()).
77 bool isvalid() { return xid != -1; }
81 struct request dup_req_;
85 rpcc(sockaddr_in d, bool retrans=true);
91 static const TO to_max;
92 static const TO to_min;
// Wraps x in a TO: returns a TO whose `to` member is set to x.
// NOTE(review): the unit of x is not stated in the visible code — confirm
// against the TO declaration and its users.
93 static TO to(int x) { TO t; t.to = x; return t;}
// Returns this client's nonce (clt_nonce_) as its identifier.
95 unsigned int id() { return clt_nonce_; }
97 int bind(TO to = to_max);
// Stores r in reachable_.
// NOTE(review): reachable_ is declared outside the visible lines; how the flag
// is consumed is not shown here — confirm in the implementation.
99 void set_reachable(bool r) { reachable_ = r; }
// Returns nonzero when lossytest_ is greater than 0, and 0 otherwise.
// NOTE(review): lossytest_ is declared outside the visible lines.
103 int islossy() { return lossytest_ > 0; }
105 int call1(unsigned int proc,
106 marshall &req, unmarshall &rep, TO to);
108 bool got_pdu(connection *c, char *b, int sz);
112 int call_m(unsigned int proc, marshall &req, R & r, TO to);
114 template<class R, typename ...Args>
115 inline int call(unsigned int proc, R & r, const Args&... args);
117 template<class R, typename ...Args>
118 inline int call_timeout(unsigned int proc, TO to, R & r, const Args&... args);
121 template<class R> int
122 rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to)
125 int intret = call1(proc, req, u, to);
126 if (intret < 0) return intret;
128 if (u.okdone() != true) {
129 fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply."
130 "You are probably calling RPC 0x%x with wrong return "
133 return rpc_const::unmarshal_reply_failure;
138 template<class R, typename... Args> inline int
139 rpcc::call(unsigned int proc, R & r, const Args&... args)
141 return call_timeout(proc, rpcc::to_max, r, args...);
144 template<class R, typename... Args> inline int
145 rpcc::call_timeout(unsigned int proc, const rpcc::TO to, R & r, const Args&... args)
148 return call_m(proc, m, r, to);
151 bool operator<(const sockaddr_in &a, const sockaddr_in &b);
// Virtual destructor: handlers are held and passed around as handler*
// (see procs_ and reg1), so destruction through that base pointer must
// dispatch to the derived class.
156 virtual ~handler() { }
// Handler entry point, pure virtual. The h1 wrappers created by rpcs::reg
// override it as fn(unmarshall &args, marshall &ret) and return an int status
// (for example rpc_const::unmarshal_args_failure).
157 virtual int fn(unmarshall &, marshall &) = 0;
161 // rpc server endpoint.
162 class rpcs : public chanmgr {
165 NEW, // new RPC, not a duplicate
166 INPROGRESS, // duplicate of an RPC we're still processing
167 DONE, // duplicate of an RPC we already replied to (have reply)
168 FORGOTTEN, // duplicate of an old RPC whose reply we've forgotten
173 // state about an in-progress or completed RPC, for at-most-once.
174 // if cb_present is true, then the RPC is complete and a reply
175 // has been sent; in that case buf points to a copy of the reply,
176 // and sz holds the size of the reply.
178 reply_t (unsigned int _xid) {
184 reply_t (unsigned int _xid, char *_buf, int _sz) {
191 bool cb_present; // whether the reply buffer is valid
192 char *buf; // the reply buffer
193 int sz; // the size of reply buffer
199 // provide at-most-once semantics by keeping, for each client, a window
200 // of the replies that client has not yet acknowledged receiving.
201 // indexed by client nonce.
202 std::map<unsigned int, std::list<reply_t> > reply_window_;
204 void free_reply_window(void);
205 void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz);
207 rpcstate_t checkduplicate_and_update(unsigned int clt_nonce,
208 unsigned int xid, unsigned int rep_xid,
211 void updatestat(unsigned int proc);
213 // latest connection to the client
214 std::map<unsigned int, connection *> conns_;
219 std::map<int, int> counts_;
224 // map proc # to function
225 std::map<int, handler *> procs_;
// One mutex per piece of shared server state; each comment names what it guards.
227 std::mutex procs_m_; // protects insert/delete on procs_
228 std::mutex count_m_; // protects modification of counts_
229 std::mutex reply_window_m_; // protects reply_window_ et al
230 std::mutex conss_m_; // protects conns_ (NOTE(review): name looks like a typo for conns_m_; a rename must also update every user)
// Builds a dispatch job from connection c, buffer b and size bsz. The two
// pointers are stored as given (conn = c, buf = b); nothing is copied here.
236 djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {}
241 void dispatch(djob_t *);
243 // internal handler registration
244 void reg1(unsigned int proc, handler *);
246 ThrPool* dispatchpool_;
250 rpcs(unsigned int port, int counts=0);
// Returns the port number reported by listener_->port().
// NOTE(review): listener_ is declared outside the visible lines.
252 inline int port() { return listener_->port(); }
253 //RPC handler for clients binding
254 int rpcbind(int a, int &r);
// Stores r in reachable_.
// NOTE(review): reachable_ is declared outside the visible lines; how the flag
// is consumed is not shown here — confirm in the implementation.
256 void set_reachable(bool r) { reachable_ = r; }
258 bool got_pdu(connection *c, char *b, int sz);
260 // register a handler
261 template<class S, class A1, class R>
262 void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, R & r));
263 template<class S, class A1, class A2, class R>
264 void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, const A2,
266 template<class S, class A1, class A2, class A3, class R>
267 void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
269 template<class S, class A1, class A2, class A3, class A4, class R>
270 void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
271 const A3, const A4, R & r));
272 template<class S, class A1, class A2, class A3, class A4, class A5, class R>
273 void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
274 const A3, const A4, const A5,
276 template<class S, class A1, class A2, class A3, class A4, class A5, class A6,
278 void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
279 const A3, const A4, const A5,
281 template<class S, class A1, class A2, class A3, class A4, class A5, class A6,
283 void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
284 const A3, const A4, const A5,
289 template<class S, class A1, class R> void
290 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, R & r))
292 class h1 : public handler {
295 int (S::*meth)(const A1 a1, R & r);
297 h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r))
298 : sob(xsob), meth(xmeth) { }
299 int fn(unmarshall &args, marshall &ret) {
304 return rpc_const::unmarshal_args_failure;
305 int b = (sob->*meth)(a1, r);
310 reg1(proc, new h1(sob, meth));
313 template<class S, class A1, class A2, class R> void
314 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
317 class h1 : public handler {
320 int (S::*meth)(const A1 a1, const A2 a2, R & r);
322 h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r))
323 : sob(xsob), meth(xmeth) { }
324 int fn(unmarshall &args, marshall &ret) {
331 return rpc_const::unmarshal_args_failure;
332 int b = (sob->*meth)(a1, a2, r);
337 reg1(proc, new h1(sob, meth));
340 template<class S, class A1, class A2, class A3, class R> void
341 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
344 class h1 : public handler {
347 int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r);
349 h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r))
350 : sob(xsob), meth(xmeth) { }
351 int fn(unmarshall &args, marshall &ret) {
360 return rpc_const::unmarshal_args_failure;
361 int b = (sob->*meth)(a1, a2, a3, r);
366 reg1(proc, new h1(sob, meth));
369 template<class S, class A1, class A2, class A3, class A4, class R> void
370 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
371 const A3 a3, const A4 a4,
374 class h1 : public handler {
377 int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r);
379 h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
381 : sob(xsob), meth(xmeth) { }
382 int fn(unmarshall &args, marshall &ret) {
393 return rpc_const::unmarshal_args_failure;
394 int b = (sob->*meth)(a1, a2, a3, a4, r);
399 reg1(proc, new h1(sob, meth));
402 template<class S, class A1, class A2, class A3, class A4, class A5, class R> void
403 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
404 const A3 a3, const A4 a4,
407 class h1 : public handler {
410 int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
413 h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
414 const A4 a4, const A5 a5, R & r))
415 : sob(xsob), meth(xmeth) { }
416 int fn(unmarshall &args, marshall &ret) {
429 return rpc_const::unmarshal_args_failure;
430 int b = (sob->*meth)(a1, a2, a3, a4, a5, r);
435 reg1(proc, new h1(sob, meth));
438 template<class S, class A1, class A2, class A3, class A4, class A5, class A6, class R> void
439 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
440 const A3 a3, const A4 a4,
441 const A5 a5, const A6 a6,
444 class h1 : public handler {
447 int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
448 const A5 a5, const A6 a6, R & r);
450 h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
451 const A4 a4, const A5 a5, const A6 a6, R & r))
452 : sob(xsob), meth(xmeth) { }
453 int fn(unmarshall &args, marshall &ret) {
468 return rpc_const::unmarshal_args_failure;
469 int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, r);
474 reg1(proc, new h1(sob, meth));
477 template<class S, class A1, class A2, class A3, class A4, class A5,
478 class A6, class A7, class R> void
479 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
480 const A3 a3, const A4 a4,
481 const A5 a5, const A6 a6,
484 class h1 : public handler {
487 int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
488 const A5 a5, const A6 a6, const A7 a7, R & r);
490 h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
491 const A4 a4, const A5 a5, const A6 a6,
493 : sob(xsob), meth(xmeth) { }
494 int fn(unmarshall &args, marshall &ret) {
511 return rpc_const::unmarshal_args_failure;
512 int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, a7, r);
517 reg1(proc, new h1(sob, meth));
521 void make_sockaddr(const char *hostandport, struct sockaddr_in *dst);
522 void make_sockaddr(const char *host, const char *port,
523 struct sockaddr_in *dst);