cond c;
};
// Private helpers of the client.
// The connection accessor (renamed from get_refconn to get_latest_connection)
// takes a non-const shared_ptr<connection> reference.
// NOTE(review): presumably the connection is handed back through `ch` — confirm in the definition.
// update_xid_rep gains a `lock &` parameter.
// NOTE(review): presumably the caller's already-held lock on m_ — confirm at the call sites.
- void get_refconn(shared_ptr<connection> & ch);
- void update_xid_rep(xid_t xid);
+ void get_latest_connection(shared_ptr<connection> & ch);
+ void update_xid_rep(xid_t xid, lock & m_lock);
// Per-client state. The members touched here (srv_nonce_, bind_done_,
// lossytest_, reachable_, destroy_wait_, xid_, xid_rep_window_) gain default
// member initializers; retrans_ is removed with no replacement.
// dst_ and clt_nonce_ still have no in-class initializer.
sockaddr_in dst_;
nonce_t clt_nonce_;
- nonce_t srv_nonce_;
- bool bind_done_;
- xid_t xid_;
- int lossytest_;
- bool retrans_;
- bool reachable_;
+ nonce_t srv_nonce_ = 0;
+ bool bind_done_ = false;
+ int lossytest_ = 0;
+ bool reachable_ = true;
shared_ptr<connection> chan_;
mutex m_; // protects insert/delete on calls_
// NOTE(review): chan_m_ presumably guards chan_ — confirm where it is locked.
mutex chan_m_;
- bool destroy_wait_;
+ bool destroy_wait_ = false;
cond destroy_wait_c_;
map<int, caller *> calls_;
- list<xid_t> xid_rep_window_;
+
+ // xid starts with 1 and latest received reply starts with 0
+ xid_t xid_ = 1;
+ list<xid_t> xid_rep_window_ = {0};
// Request record. clear() empties `buf` and sets `xid` back to -1, the same
// value xid's default member initializer gives it.
// NOTE(review): the declaration of `buf` is not visible in this excerpt.
struct request {
void clear() { buf.clear(); xid = -1; }
xid_t xid = -1;
};
// A single `request` instance owned by the client.
// NOTE(review): presumably the saved copy of a request for duplicate/retransmit handling — confirm.
request dup_req_;
// xid_rep_done_ gains a default member initializer of -1.
- int xid_rep_done_;
+ int xid_rep_done_ = -1;
// Untyped call entry point used by call_m. The parameter order changes from
// (proc, req, rep, to) to (proc, to, rep, req).
// call_m treats a negative return value as failure and unmarshalls `rep`
// afterwards.
// NOTE(review): so `rep` is presumably filled with the raw reply — confirm in the definition.
- int call1(proc_id_t proc, marshall &req, string &rep, milliseconds to);
+ int call1(proc_id_t proc, milliseconds to, string & rep, marshall & req);
// Performs the call through call1 and unmarshalls the reply string into `r`.
// Returns call1's result. A negative result is returned immediately, and in
// the new version no unmarshall object is built before that check.
// `req` is declared `marshall &&`, but as a named parameter it is an lvalue
// inside the body, so it binds to call1's `marshall &` parameter.
template<class R>
- int call_m(proc_id_t proc, marshall &req, R & r, milliseconds to) {
+ inline int call_m(proc_id_t proc, milliseconds to, R & r, marshall && req) {
string rep;
- int intret = call1(proc, req, rep, to);
- unmarshall u(rep, true);
+ int intret = call1(proc, to, rep, req);
if (intret < 0) return intret;
- u >> r;
// The new three-argument unmarshall constructor replaces the separate `u >> r` step.
+ unmarshall u(rep, true, r);
// okdone() being false is logged and then trips VERIFY(0).
// NOTE(review): the return below is only reached if VERIFY does not abort — confirm VERIFY's behavior.
if (u.okdone() != true) {
LOG("rpcc::call_m: failed to unmarshall the reply. You are probably " <<
"calling RPC 0x" << hex << proc << " with the wrong return type.");
VERIFY(0);
- return rpc_protocol::unmarshal_reply_failure;
+ return rpc_protocol::unmarshall_reply_failure;
}
return intret;
}
public:
// Constructor: the defaulted `retrans` flag is dropped, leaving only the string `d`.
// NOTE(review): `d` presumably names the destination (cf. dst_) — confirm in the definition.
- rpcc(const string & d, bool retrans=true);
+ rpcc(const string & d);
~rpcc();
// Returns this client's nonce (clt_nonce_).
nonce_t id() { return clt_nonce_; }
void cancel();
// Convenience wrapper: forwards to call_timeout with the timeout fixed to
// rpc::to_max and returns its result. Only the spacing of the parameter pack
// declaration changes in this hunk.
template<class P, class R, typename ...Args>
- inline int call(proc_t<P> proc, R & r, const Args&... args) {
+ inline int call(proc_t<P> proc, R & r, const Args & ... args) {
return call_timeout(proc, rpc::to_max, r, args...);
}
// Typed RPC entry point with an explicit timeout `to`.
// The static_assert rejects, at compile time, any combination of P, R and
// Args for which is_valid_call<P, R, Args...>::value is false.
// The arguments are marshalled into a temporary `marshall`, which binds
// directly to call_m's `marshall &&` parameter. No forward<> is needed because
// a temporary is already an rvalue. Returns whatever call_m returns.
template<class P, class R, typename ...Args>
- inline int call_timeout(proc_t<P> proc, milliseconds to, R & r, const Args&... args) {
+ inline int call_timeout(proc_t<P> proc, milliseconds to, R & r, const Args & ... args) {
static_assert(is_valid_call<P, R, Args...>::value, "RPC called with incorrect argument types");
- marshall m{args...};
- return call_m(proc.id, m, r, to);
+ return call_m(proc.id, to, r, marshall(args...));
}
};
// indexed by client nonce.
map<nonce_t, list<reply_t>> reply_window_;
- void free_reply_window(void);
// Reply-window helpers: free_reply_window is dropped, and
// checkduplicate_and_update is renamed check_duplicate_and_update with an
// unchanged parameter list.
void add_reply(nonce_t clt_nonce, xid_t xid, const string & b);
- rpcstate_t checkduplicate_and_update(nonce_t clt_nonce, xid_t xid,
+ rpcstate_t check_duplicate_and_update(nonce_t clt_nonce, xid_t xid,
xid_t rep_xid, string & b);
// latest connection to the client
map<nonce_t, shared_ptr<connection>> conns_;
// reachable_ gains a default member initializer of true.
- bool reachable_;
+ bool reachable_ = true;
// map proc # to function
map<proc_id_t, handler *> procs_;
void dispatch(shared_ptr<connection> c, const string & buf);
// reg1 is removed; the registration template later in this class writes to
// procs_ directly under procs_m_.
- // internal handler registration
- void reg1(proc_id_t proc, handler *);
-
// The dispatch pool is now built by a default member initializer.
// NOTE(review): the meaning of (6, false) is not visible here, presumably a thread count and a flag — confirm against thread_pool's constructor.
- unique_ptr<thread_pool> dispatchpool_;
+ unique_ptr<thread_pool> dispatchpool_{new thread_pool(6, false)};
unique_ptr<connection_listener> listener_;
// RPC handler for clients binding
- rpc_protocol::status rpcbind(nonce_t &r);
+ rpc_protocol::status rpcbind(nonce_t & r);
bool got_pdu(const shared_ptr<connection> & c, const string & b);
static_assert(is_valid_registration<P, F>::value, "RPC handler registered with incorrect argument types");
struct ReturnOnFailure {
static inline int unmarshall_args_failure() {
- return rpc_protocol::unmarshal_args_failure;
+ return rpc_protocol::unmarshall_args_failure;
}
};
- reg1(proc.id, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
+ lock pl(procs_m_);
+ VERIFY(procs_.count(proc.id) == 0);
+ procs_[proc.id] = marshalled_func<F, ReturnOnFailure>::wrap(f, c);
+ VERIFY(procs_.count(proc.id) >= 1);
}
void start();