4 #include <sys/socket.h>
5 #include <netinet/in.h>
12 #include "connection.h"
20 static const unsigned int bind = 1; // handler number reserved for bind
21 static const int timeout_failure = -1;
22 static const int unmarshal_args_failure = -2;
23 static const int unmarshal_reply_failure = -3;
24 static const int atmostonce_failure = -4;
25 static const int oldsrv_failure = -5;
26 static const int bind_failure = -6;
27 static const int cancel_failure = -7;
30 // rpc client endpoint.
31 // manages a xid space per destination socket
32 // threaded: multiple threads can be sending RPCs,
33 class rpcc : public chanmgr {
37 //manages per rpc info
39 caller(unsigned int xxid, unmarshall *un);
50 void get_refconn(connection **ch);
51 void update_xid_rep(unsigned int xid);
55 unsigned int clt_nonce_;
56 unsigned int srv_nonce_;
65 pthread_mutex_t m_; // protect insert/delete to calls[]
66 pthread_mutex_t chan_m_;
69 pthread_cond_t destroy_wait_c_;
71 std::map<int, caller *> calls_;
72 std::list<unsigned int> xid_rep_window_;
75 request() { clear(); }
76 void clear() { buf.clear(); xid = -1; }
77 bool isvalid() { return xid != -1; }
81 struct request dup_req_;
85 rpcc(sockaddr_in d, bool retrans=true);
91 static const TO to_max;
92 static const TO to_min;
93 static TO to(int x) { TO t; t.to = x; return t;}
95 unsigned int id() { return clt_nonce_; }
97 int bind(TO to = to_max);
99 void set_reachable(bool r) { reachable_ = r; }
103 int islossy() { return lossytest_ > 0; }
105 int call1(unsigned int proc,
106 marshall &req, unmarshall &rep, TO to);
108 bool got_pdu(connection *c, char *b, int sz);
112 int call_m(unsigned int proc, marshall &req, R & r, TO to);
115 int call(unsigned int proc, R & r, TO to = to_max);
116 template<class R, class A1>
117 int call(unsigned int proc, const A1 & a1, R & r, TO to = to_max);
118 template<class R, class A1, class A2>
119 int call(unsigned int proc, const A1 & a1, const A2 & a2, R & r,
121 template<class R, class A1, class A2, class A3>
122 int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
123 R & r, TO to = to_max);
124 template<class R, class A1, class A2, class A3, class A4>
125 int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
126 const A4 & a4, R & r, TO to = to_max);
127 template<class R, class A1, class A2, class A3, class A4, class A5>
128 int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
129 const A4 & a4, const A5 & a5, R & r, TO to = to_max);
130 template<class R, class A1, class A2, class A3, class A4, class A5,
132 int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
133 const A4 & a4, const A5 & a5, const A6 & a6,
134 R & r, TO to = to_max);
135 template<class R, class A1, class A2, class A3, class A4, class A5,
137 int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3,
138 const A4 & a4, const A5 & a5, const A6 &a6, const A7 &a7,
139 R & r, TO to = to_max);
143 template<class R> int
144 rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to)
147 int intret = call1(proc, req, u, to);
148 if (intret < 0) return intret;
150 if(u.okdone() != true) {
151 fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply."
152 "You are probably calling RPC 0x%x with wrong return "
155 return rpc_const::unmarshal_reply_failure;
160 template<class R> int
161 rpcc::call(unsigned int proc, R & r, TO to)
164 return call_m(proc, m, r, to);
167 template<class R, class A1> int
168 rpcc::call(unsigned int proc, const A1 & a1, R & r, TO to)
172 return call_m(proc, m, r, to);
175 template<class R, class A1, class A2> int
176 rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
182 return call_m(proc, m, r, to);
185 template<class R, class A1, class A2, class A3> int
186 rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
187 const A3 & a3, R & r, TO to)
193 return call_m(proc, m, r, to);
196 template<class R, class A1, class A2, class A3, class A4> int
197 rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
198 const A3 & a3, const A4 & a4, R & r, TO to)
205 return call_m(proc, m, r, to);
208 template<class R, class A1, class A2, class A3, class A4, class A5> int
209 rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
210 const A3 & a3, const A4 & a4, const A5 & a5, R & r, TO to)
218 return call_m(proc, m, r, to);
221 template<class R, class A1, class A2, class A3, class A4, class A5,
223 rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
224 const A3 & a3, const A4 & a4, const A5 & a5,
225 const A6 & a6, R & r, TO to)
234 return call_m(proc, m, r, to);
237 template<class R, class A1, class A2, class A3, class A4, class A5,
238 class A6, class A7> int
239 rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
240 const A3 & a3, const A4 & a4, const A5 & a5,
241 const A6 & a6, const A7 & a7,
252 return call_m(proc, m, r, to);
255 bool operator<(const sockaddr_in &a, const sockaddr_in &b);
260 virtual ~handler() { }
261 virtual int fn(unmarshall &, marshall &) = 0;
265 // rpc server endpoint.
266 class rpcs : public chanmgr {
269 NEW, // new RPC, not a duplicate
270 INPROGRESS, // duplicate of an RPC we're still processing
271 DONE, // duplicate of an RPC we already replied to (have reply)
272 FORGOTTEN, // duplicate of an old RPC whose reply we've forgotten
277 // state about an in-progress or completed RPC, for at-most-once.
278 // if cb_present is true, then the RPC is complete and a reply
279 // has been sent; in that case buf points to a copy of the reply,
280 // and sz holds the size of the reply.
282 reply_t (unsigned int _xid) {
288 reply_t (unsigned int _xid, char *_buf, int _sz) {
295 bool cb_present; // whether the reply buffer is valid
296 char *buf; // the reply buffer
297 int sz; // the size of reply buffer
303 // provide at most once semantics by maintaining a window of replies
304 // per client that that client hasn't acknowledged receiving yet.
305 // indexed by client nonce.
306 std::map<unsigned int, std::list<reply_t> > reply_window_;
308 void free_reply_window(void);
309 void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz);
311 rpcstate_t checkduplicate_and_update(unsigned int clt_nonce,
312 unsigned int xid, unsigned int rep_xid,
315 void updatestat(unsigned int proc);
317 // latest connection to the client
318 std::map<unsigned int, connection *> conns_;
323 std::map<int, int> counts_;
328 // map proc # to function
329 std::map<int, handler *> procs_;
331 pthread_mutex_t procs_m_; // protect insert/delete to procs[]
332 pthread_mutex_t count_m_; //protect modification of counts
333 pthread_mutex_t reply_window_m_; // protect reply window et al
334 pthread_mutex_t conss_m_; // protect conns_
340 djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {}
345 void dispatch(djob_t *);
347 // internal handler registration
348 void reg1(unsigned int proc, handler *);
350 ThrPool* dispatchpool_;
354 rpcs(unsigned int port, int counts=0);
356 inline int port() { return listener_->port(); }
357 //RPC handler for clients binding
358 int rpcbind(int a, int &r);
360 void set_reachable(bool r) { reachable_ = r; }
362 bool got_pdu(connection *c, char *b, int sz);
364 // register a handler
365 template<class S, class A1, class R>
366 void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, R & r));
367 template<class S, class A1, class A2, class R>
368 void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, const A2,
370 template<class S, class A1, class A2, class A3, class R>
371 void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
373 template<class S, class A1, class A2, class A3, class A4, class R>
374 void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
375 const A3, const A4, R & r));
376 template<class S, class A1, class A2, class A3, class A4, class A5, class R>
377 void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
378 const A3, const A4, const A5,
380 template<class S, class A1, class A2, class A3, class A4, class A5, class A6,
382 void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
383 const A3, const A4, const A5,
385 template<class S, class A1, class A2, class A3, class A4, class A5, class A6,
387 void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
388 const A3, const A4, const A5,
393 template<class S, class A1, class R> void
394 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, R & r))
396 class h1 : public handler {
399 int (S::*meth)(const A1 a1, R & r);
401 h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r))
402 : sob(xsob), meth(xmeth) { }
403 int fn(unmarshall &args, marshall &ret) {
408 return rpc_const::unmarshal_args_failure;
409 int b = (sob->*meth)(a1, r);
414 reg1(proc, new h1(sob, meth));
417 template<class S, class A1, class A2, class R> void
418 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
421 class h1 : public handler {
424 int (S::*meth)(const A1 a1, const A2 a2, R & r);
426 h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r))
427 : sob(xsob), meth(xmeth) { }
428 int fn(unmarshall &args, marshall &ret) {
435 return rpc_const::unmarshal_args_failure;
436 int b = (sob->*meth)(a1, a2, r);
441 reg1(proc, new h1(sob, meth));
444 template<class S, class A1, class A2, class A3, class R> void
445 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
448 class h1 : public handler {
451 int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r);
453 h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r))
454 : sob(xsob), meth(xmeth) { }
455 int fn(unmarshall &args, marshall &ret) {
464 return rpc_const::unmarshal_args_failure;
465 int b = (sob->*meth)(a1, a2, a3, r);
470 reg1(proc, new h1(sob, meth));
473 template<class S, class A1, class A2, class A3, class A4, class R> void
474 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
475 const A3 a3, const A4 a4,
478 class h1 : public handler {
481 int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r);
483 h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
485 : sob(xsob), meth(xmeth) { }
486 int fn(unmarshall &args, marshall &ret) {
497 return rpc_const::unmarshal_args_failure;
498 int b = (sob->*meth)(a1, a2, a3, a4, r);
503 reg1(proc, new h1(sob, meth));
506 template<class S, class A1, class A2, class A3, class A4, class A5, class R> void
507 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
508 const A3 a3, const A4 a4,
511 class h1 : public handler {
514 int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
517 h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
518 const A4 a4, const A5 a5, R & r))
519 : sob(xsob), meth(xmeth) { }
520 int fn(unmarshall &args, marshall &ret) {
533 return rpc_const::unmarshal_args_failure;
534 int b = (sob->*meth)(a1, a2, a3, a4, a5, r);
539 reg1(proc, new h1(sob, meth));
542 template<class S, class A1, class A2, class A3, class A4, class A5, class A6, class R> void
543 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
544 const A3 a3, const A4 a4,
545 const A5 a5, const A6 a6,
548 class h1 : public handler {
551 int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
552 const A5 a5, const A6 a6, R & r);
554 h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
555 const A4 a4, const A5 a5, const A6 a6, R & r))
556 : sob(xsob), meth(xmeth) { }
557 int fn(unmarshall &args, marshall &ret) {
572 return rpc_const::unmarshal_args_failure;
573 int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, r);
578 reg1(proc, new h1(sob, meth));
581 template<class S, class A1, class A2, class A3, class A4, class A5,
582 class A6, class A7, class R> void
583 rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
584 const A3 a3, const A4 a4,
585 const A5 a5, const A6 a6,
588 class h1 : public handler {
591 int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
592 const A5 a5, const A6 a6, const A7 a7, R & r);
594 h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
595 const A4 a4, const A5 a5, const A6 a6,
597 : sob(xsob), meth(xmeth) { }
598 int fn(unmarshall &args, marshall &ret) {
615 return rpc_const::unmarshal_args_failure;
616 int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, a7, r);
621 reg1(proc, new h1(sob, meth));
625 void make_sockaddr(const char *hostandport, struct sockaddr_in *dst);
626 void make_sockaddr(const char *host, const char *port,
627 struct sockaddr_in *dst);
629 int cmp_timespec(const struct timespec &a, const struct timespec &b);
630 void add_timespec(const struct timespec &a, int b, struct timespec *result);
631 int diff_timespec(const struct timespec &a, const struct timespec &b);