}
}
-int rpcc::call_marshalled(proc_id_t proc, milliseconds to, string & rep, marshall & req) {
+int rpcc::call_marshalled(const rpc_protocol::proc_t & proc, milliseconds to, string & rep, const marshall & req) {
caller ca(0, &rep);
xid_t xid_rep;
+ marshall datagram = req;
{
lock ml(m_);
- if ((proc != rpc_protocol::bind.id && !bind_done_) || (proc == rpc_protocol::bind.id && bind_done_)) {
+ if ((proc.id != rpc_protocol::bind.id && !bind_done_) || (proc.id == rpc_protocol::bind.id && bind_done_)) {
IF_LEVEL(1) LOG << "rpcc has not been bound to dst or binding twice";
return rpc_protocol::bind_failure;
}
ca.xid = xid_++;
calls_[ca.xid] = &ca;
- req.write_header(rpc_protocol::request_header{
- ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()
+ datagram.write_header(rpc_protocol::request_header{
+ ca.xid, proc.id, clt_nonce_, srv_nonce_, xid_rep_window_.front()
});
xid_rep = xid_rep_window_.front();
}
}
if (forgot.isvalid())
ch->send(forgot.buf);
- ch->send(req);
+ ch->send(datagram);
}
else IF_LEVEL(1) LOG << "not reachable";
- IF_LEVEL(2) LOG << clt_nonce_ << " just sent req proc " << std::hex << proc
+ IF_LEVEL(2) LOG << clt_nonce_ << " just sent req proc " << std::hex << proc.id
<< " xid " << std::dec << ca.xid << " clt_nonce " << clt_nonce_;
}
transmit = false; // only send once on a given channel
{
lock ml(m_);
if (!dup_req_.isvalid()) {
- dup_req_.buf = req;
+ dup_req_.buf = datagram;
dup_req_.xid = ca.xid;
}
if (xid_rep > xid_rep_done_)
lock cal(ca.m);
- IF_LEVEL(2) LOG << clt_nonce_ << " call done for req proc " << std::hex << proc
+ IF_LEVEL(2) LOG << clt_nonce_ << " call done for req proc " << std::hex << proc.id
<< " xid " << std::dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":"
<< ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret;
// is client sending to an old instance of server?
if (h.srv_nonce != 0 && h.srv_nonce != nonce_) {
IF_LEVEL(2) LOG << "rpc for an old server instance " << h.srv_nonce
- << " (current " << nonce_ << ") proc " << std::hex << h.proc;
+ << " (current " << nonce_ << ") proc " << std::hex << proc;
rh.ret = rpc_protocol::oldsrv_failure;
rep.write_header(rh);
c->send(rep);
class rpcc : private connection_delegate {
private:
using proc_id_t = rpc_protocol::proc_id_t;
- template <class S>
- using proc_t = rpc_protocol::proc_t<S>;
using nonce_t = rpc_protocol::nonce_t;
using xid_t = rpc_protocol::xid_t;
request dup_req_;
int xid_rep_done_ = -1;
- int call_marshalled(proc_id_t proc, milliseconds to, string & rep, marshall & req);
-
- template<class R>
- inline int call_m(proc_id_t proc, milliseconds to, R & r, marshall && req) {
- string rep;
- int intret = call_marshalled(proc, to, rep, req);
- if (intret >= 0)
- VERIFY(unmarshall(rep, true, r).okdone()); // guaranteed by static type checking
- return intret;
- }
+ int call_marshalled(const rpc_protocol::proc_t & proc, milliseconds to, string & rep, const marshall & req);
bool got_pdu(const shared_ptr<connection> & c, const string & b);
void cancel(lock & m_lock);
template<class P, class R, typename ...Args>
- inline int call(proc_t<P> proc, R & r, const Args & ... args) {
+ inline int call(const rpc_protocol::proc_checked_t<P> & proc, R & r, const Args & ... args) { // same as call_timeout, with rpc::to_max as the timeout
return call_timeout(proc, rpc::to_max, r, args...);
}
template<class P, class R, typename ...Args>
- inline int call_timeout(proc_t<P> proc, milliseconds to, R & r, const Args & ... args) {
+ inline int call_timeout(const rpc_protocol::proc_checked_t<P> & proc, milliseconds to, R & r, const Args & ... args) { // typed entry point: checks P against R/Args, then calls call_marshalled
static_assert(is_valid_call<P, R, Args...>::value, "RPC called with incorrect argument types");
- return call_m(proc.id, to, r, std::forward<marshall>(marshall(args...)));
+ string rep; // reply buffer handed to call_marshalled
+ int intret = call_marshalled(proc, to, rep, marshall(args...)); // args are packed into a temporary marshall
+ if (intret >= 0) { // only a non-negative return has its reply unmarshalled into r
+ VERIFY(unmarshall(rep, true, r).okdone()); // guaranteed by static type checking
+ }
+ return intret; // returned unchanged to the caller, negative values included
}
};
class rpcs : private connection_delegate {
private:
using proc_id_t = rpc_protocol::proc_id_t;
- template <class S>
- using proc_t = rpc_protocol::proc_t<S>;
using nonce_t = rpc_protocol::nonce_t;
using xid_t = rpc_protocol::xid_t;
void set_reachable(bool r) { reachable_ = r; }
- template<class P, class F, class C=void> inline void reg(proc_t<P> proc, F f, C *c=nullptr) {
+ template<class P, class F, class C=void>
+ inline void reg(const rpc_protocol::proc_checked_t<P> & proc, F f, C *c=nullptr) {
static_assert(is_valid_registration<P, F>::value, "RPC handler registered with incorrect argument types");
struct ReturnOnFailure {
static inline int unmarshall_args_failure() {
MEMBERS(xid, ret)
};
- template <typename Signature>
struct proc_t {
- using signature = Signature;
proc_id_t id;
+ const char * name; // procedure name; REMOTE_PROCEDURE supplies the stringized identifier
+ };
+
+ template <typename Signature>
+ struct proc_checked_t : proc_t { // proc_t plus the handler signature as a compile-time type
+ using signature = Signature;
+ constexpr inline proc_checked_t(proc_id_t id, const char * name) : proc_t{id, name} {} // constexpr so REMOTE_PROCEDURE can declare constexpr instances
};
union header_t { request_header req; reply_header rep; };
const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation
const size_t MAX_PDU = 10<<20; // maximum PDF is 10M
-#define REMOTE_PROCEDURE_BASE(_base_) static constexpr rpc_protocol::proc_id_t base = _base_
-#define REMOTE_PROCEDURE(_offset_, _name_, _args_) static constexpr rpc_protocol::proc_t<status _args_> _name_{base + _offset_}
+#define REMOTE_PROCEDURE_BASE(_base_) \
+ static constexpr rpc_protocol::proc_id_t base = _base_ // base id that REMOTE_PROCEDURE offsets are added to
+
+#define REMOTE_PROCEDURE(_offset_, _name_, _args_) \
+ static constexpr rpc_protocol::proc_checked_t<status _args_> _name_{base + _offset_, #_name_} // id is base + offset; #_name_ records the name as a string
REMOTE_PROCEDURE_BASE(0);
REMOTE_PROCEDURE(1, bind, (nonce_t &)); // handler number reserved for bind
void set_state_transfer(rsm_state_transfer *_stf) { stf = _stf; }
void commit_change(unsigned vid);
- template<class P, class F, class C=void> void reg(rpc_protocol::proc_t<P> proc, F f, C *c=nullptr) {
+ template<class P, class F, class C=void> void reg(rpc_protocol::proc_checked_t<P> proc, F f, C *c=nullptr) {
static_assert(is_valid_registration<P, F>::value, "RSM handler registered with incorrect argument types");
lock ml(rsm_mutex);
procs[proc.id] = marshalled_func<F>::wrap(f, c);
std::mutex rsm_client_mutex;
void primary_failure(lock & rsm_client_mutex_lock);
bool init_members(lock & rsm_client_mutex_lock);
- rsm_protocol::status invoke(unsigned int proc, string & rep, const string & req);
- template<class R> int call_m(unsigned int proc, R & r, const marshall & req);
+ rsm_protocol::status invoke(rpc_protocol::proc_id_t proc, string & rep, const string & req);
+ template<class R> int call_marshalled(rpc_protocol::proc_t & proc, R & r, const marshall & req);
public:
rsm_client(string dst);
template<class P, class R, class ...Args>
- inline int call(rpc_protocol::proc_t<P> proc, R & r, const Args & ...a1) {
+ inline int call(rpc_protocol::proc_checked_t<P> proc, R & r, const Args & ...a1) { // typed entry point: checks P against R/Args at compile time
static_assert(is_valid_call<P, R, Args...>::value, "RSM method invoked with incorrect argument types");
- return call_m(proc.id, r, marshall(a1...));
+ return call_marshalled(proc, r, marshall(a1...)); // whole proc is passed so call_marshalled can log proc.name as well as proc.id
}
};
}
template<class R>
-int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) {
+int rsm_client::call_marshalled(rpc_protocol::proc_t & proc, R & r, const marshall & req) {
string rep;
- int intret = invoke(proc, rep, req.content());
+ int intret = invoke(proc.id, rep, req.content());
VERIFY( intret == rsm_client_protocol::OK );
unmarshall u(rep, false, intret);
if (intret < 0) return intret;
LOG << "failed to unmarshall the reply.";
LOG << "You probably forgot to set the reply string in "
<< "rsm::client_invoke, or you may have called RPC "
- << "0x" << std::hex << proc << " with the wrong return type";
+ << proc.name << " (0x" << std::hex << proc.id << ") with the wrong return type";
LOG << "here's what I got: \"" << hexify(rep) << "\"";
VERIFY(0);
return rpc_protocol::unmarshall_reply_failure;
}
if(!unmarshall(res, false, r).okdone()) {
LOG << "failed to unmarshall the reply.";
- LOG << "You are probably calling RPC 0x" << std::hex << proc
- << " with the wrong return type.";
+ LOG << "You are probably calling RPC " << proc.name << " (0x"
+ << std::hex << proc.id << ") with the wrong return type.";
LOG << "here's what I got: \"" << hexify(res) << "\"";
VERIFY(0);
return rpc_protocol::unmarshall_reply_failure;