handlers.
handle h(m);
cfg_mutex_lock.unlock();
- int r = 0, ret = rpc_const::bind_failure;
+ int r = 0, ret = rpc_protocol::bind_failure;
if (rpcc *cl = h.safebind())
ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
cfg_mutex_lock.lock();
switch (ret) {
case paxos_protocol::OK:
break;
- case rpc_const::atmostonce_failure:
- case rpc_const::oldsrv_failure:
+ case rpc_protocol::atmostonce_failure:
+ case rpc_protocol::oldsrv_failure:
invalidate_handle(m);
//h.invalidate();
break;
int lock_client::stat(lock_protocol::lockid_t lid) {
VERIFY(0);
int r;
- auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, cl->id(), lid);
+ auto ret = (lock_protocol::status)cl->call(lock_protocol::stat, r, lid, id); // arguments follow the declared stat signature: (int &, lockid_t, callback_t)
VERIFY (ret == lock_protocol::OK);
return r;
}
int main(int argc, char *argv[]) {
if(argc != 2) {
- cerr << "Usage: " << argv[0] << " [host:]port" << endl;
+ LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port");
return 1;
}
#include "types.h"
#include "rpc/rpc.h"
-class lock_protocol {
- public:
- enum status : status_t { OK, RETRY, RPCERR, NOENT, IOERR };
- using lockid_t = string;
- using xid_t = uint64_t;
- enum rpc_numbers : proc_t {
- acquire = 0x7001,
- release,
- stat,
- };
+typedef string callback_t; // client identifier type taken by the lock server handlers; needed here because the signatures below name it
+
+namespace lock_protocol { // lock service RPC interface: status codes, id types and typed procedure descriptors
+ enum status : rpc_protocol::status { OK, RETRY, RPCERR, NOENT, IOERR };
+ using lockid_t = string;
+ using xid_t = uint64_t;
+ REMOTE_PROCEDURE_BASE(0x7000); // procedure ids are base + offset, i.e. 0x7001..0x7003, the same numbers the removed enum assigned
+ REMOTE_PROCEDURE(1, acquire, (int &, lockid_t, callback_t, xid_t)); // signature lists the reply reference first, then the call arguments
+ REMOTE_PROCEDURE(2, release, (int &, lockid_t, callback_t, xid_t));
+ REMOTE_PROCEDURE(3, stat, (int &, lockid_t, callback_t));
};
-class rlock_protocol {
- public:
- enum status : status_t { OK, RPCERR };
- enum rpc_numbers : proc_t {
- revoke = 0x8001,
- retry,
- };
+namespace rlock_protocol { // RPC interface declaring the revoke and retry procedures
+ using lockid_t = lock_protocol::lockid_t; // reuse lock_protocol's id types so both interfaces agree
+ using xid_t = lock_protocol::xid_t;
+ enum status : rpc_protocol::status { OK, RPCERR };
+ REMOTE_PROCEDURE_BASE(0x8000); // procedure ids are base + offset, i.e. 0x8001 and 0x8002, as in the removed enum
+ REMOTE_PROCEDURE(1, revoke, (int &, lockid_t, xid_t));
+ REMOTE_PROCEDURE(2, retry, (int &, lockid_t, xid_t));
};
#endif
}
}
-int lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
+lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
LOG("lid=" << lid << " client=" << id << "," << xid);
holder_t h = holder_t(id, xid);
lock_state &st = get_lock_state(lid);
return lock_protocol::RETRY;
}
-int lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
+lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
LOG("lid=" << lid << " client=" << id << "," << xid);
lock_state &st = get_lock_state(lid);
lock sl(st.m);
rep >> nacquire >> lock_table;
}
-lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) {
+lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid, const callback_t &) {
LOG("stat request for " << lid);
VERIFY(0);
r = nacquire;
#include "rsm.h"
#include "rpc/fifo.h"
-typedef string callback_t;
typedef pair<callback_t, lock_protocol::xid_t> holder_t;
class lock_state {
rsm *rsm_;
public:
lock_server(rsm *r = 0);
- lock_protocol::status stat(int &, lock_protocol::lockid_t);
void revoker();
void retryer();
string marshal_state();
void unmarshal_state(const string & state);
- int acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
- int release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
+ lock_protocol::status acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
+ lock_protocol::status release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
+ lock_protocol::status stat(int &, lock_protocol::lockid_t, const callback_t & id);
};
#endif
srandom((uint32_t)getpid());
if(argc != 3){
- cerr << "Usage: " << argv[0] << " [master:]port [me:]port" << endl;
+ LOG_NONMEMBER("Usage: " << argv[0] << " [master:]port [me:]port");
exit(1);
}
lock ml(count_mutex);
int x = lid[0] & 0x0f;
if (ct[x] != 0) {
- cout << "error: server granted " << lid << " twice" << endl;
- cerr << "error: server granted " << lid << " twice" << endl;
+ LOG_NONMEMBER("error: server granted " << lid << " twice");
exit(1);
}
ct[x] += 1;
lock ml(count_mutex);
int x = lid[0] & 0x0f;
if (ct[x] != 1) {
- cerr << "error: client released un-held lock " << lid << endl;
+ LOG_NONMEMBER("error: client released un-held lock " << lid);
exit(1);
}
ct[x] -= 1;
srandom((uint32_t)getpid());
if (argc < 2) {
- cerr << "Usage: " << argv[0] << " [host:]port [test]" << endl;
+ LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port [test]");
exit(1);
}
void proposer_acceptor::breakpoint(int b) {
if (b == 3) {
- LOG("Proposer: breakpoint 1");
+ LOG("breakpoint 1"); // b == 3 sets break1; any value other than 3 or 4 is ignored
break1 = true;
} else if (b == 4) {
- LOG("Proposer: breakpoint 2");
+ LOG("breakpoint 2"); // b == 4 sets break2
break2 = true;
}
}
#include "log.h"
using prepareres = paxos_protocol::prepareres;
-
-using node_t = string;
-using nodes_t = vector<node_t>;
-using value_t = string;
+using node_t = paxos_protocol::node_t;
+using nodes_t = paxos_protocol::nodes_t;
+using value_t = paxos_protocol::value_t;
class paxos_change {
public:
MARSHALLABLE(prop_t)
-class paxos_protocol {
- public:
- enum status : status_t { OK, ERR };
- enum rpc_numbers : proc_t {
- preparereq = 0x11001,
- acceptreq,
- decidereq,
- heartbeat,
- };
-
- struct prepareres {
- bool oldinstance;
- bool accept;
- prop_t n_a;
- string v_a;
-
- MEMBERS(oldinstance, accept, n_a, v_a)
- };
+namespace paxos_protocol { // Paxos RPC interface: status codes, reply payload, value/node types and typed procedure descriptors
+ enum status : rpc_protocol::status { OK, ERR };
+ struct prepareres { // reply payload of preparereq (see its signature below)
+ bool oldinstance;
+ bool accept;
+ prop_t n_a;
+ string v_a;
+
+ MEMBERS(oldinstance, accept, n_a, v_a)
+ };
+ using node_t = string; // these aliases must precede the REMOTE_PROCEDURE lines, which name them
+ using nodes_t = vector<node_t>;
+ using value_t = string;
+
+ REMOTE_PROCEDURE_BASE(0x11000); // procedure ids are base + offset, i.e. 0x11001..0x11004, as in the removed enum
+ REMOTE_PROCEDURE(1, preparereq, (prepareres &, node_t, unsigned, prop_t));
+ REMOTE_PROCEDURE(2, acceptreq, (bool &, node_t, unsigned, prop_t, value_t));
+ REMOTE_PROCEDURE(3, decidereq, (int &, node_t, unsigned, value_t));
+ REMOTE_PROCEDURE(4, heartbeat, (int &, string, unsigned));
};
MARSHALLABLE(paxos_protocol::prepareres)
bool connection::readpdu() {
IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
if (!rpdu_.buf.size()) {
- rpc_sz_t sz1;
+ rpc_protocol::rpc_sz_t sz1;
ssize_t n = fd_.read(sz1);
if (n == 0)
size_t sz = ntoh(sz1);
- if (sz > MAX_PDU) {
+ if (sz > rpc_protocol::MAX_PDU) {
IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1);
return false;
}
class marshall {
private:
- string buf_ = string(DEFAULT_RPC_SZ, 0); // Raw bytes buffer
- size_t index_ = RPC_HEADER_SZ; // Read/write head position
+ string buf_ = string(rpc_protocol::DEFAULT_RPC_SZ, 0); // Raw bytes buffer
+ size_t index_ = rpc_protocol::RPC_HEADER_SZ; // Read/write head position
public:
template <typename... Args>
// with header
inline operator string() const { return buf_.substr(0,index_); }
// without header
- inline string content() const { return buf_.substr(RPC_HEADER_SZ,index_-RPC_HEADER_SZ); }
+ inline string content() const { return buf_.substr(rpc_protocol::RPC_HEADER_SZ,index_-rpc_protocol::RPC_HEADER_SZ); }
// letting S be a defaulted template parameter forces the compiler to
// delay looking up operator<<(marshall&, rpc_sz_t) until we define it
// (i.e. we define an operator for marshalling uint32_t)
- template <class T, class S=rpc_sz_t> inline void
+ template <class T, class S=rpc_protocol::rpc_sz_t> inline void
pack_header(const T & h) {
- VERIFY(sizeof(T)+sizeof(S) <= RPC_HEADER_SZ);
+ VERIFY(sizeof(T)+sizeof(S) <= rpc_protocol::RPC_HEADER_SZ);
size_t saved_sz = index_;
index_ = 0;
*this << (S)(saved_sz - sizeof(S)) << (T)h;
public:
unmarshall(const string &s, bool has_header)
- : buf_(s),index_(RPC_HEADER_SZ) {
+ : buf_(s),index_(rpc_protocol::RPC_HEADER_SZ) {
if (!has_header)
- buf_.insert(0, RPC_HEADER_SZ, 0);
- ok_ = (buf_.size() >= RPC_HEADER_SZ);
+ buf_.insert(0, rpc_protocol::RPC_HEADER_SZ, 0);
+ ok_ = (buf_.size() >= rpc_protocol::RPC_HEADER_SZ);
}
bool ok() const { return ok_; }
template <class T> inline void
unpack_header(T & h) {
- VERIFY(sizeof(T)+sizeof(rpc_sz_t) <= RPC_HEADER_SZ);
+ VERIFY(sizeof(T)+sizeof(rpc_protocol::rpc_sz_t) <= rpc_protocol::RPC_HEADER_SZ);
// first 4 bytes hold length field
- index_ = sizeof(rpc_sz_t);
+ index_ = sizeof(rpc_protocol::rpc_sz_t);
*this >> h;
- index_ = RPC_HEADER_SZ;
+ index_ = rpc_protocol::RPC_HEADER_SZ;
}
template <class T> inline T _grab() { T t; *this >> t; return t; }
inline marshall & operator<<(marshall &m, const _c_ a) { return m << a._tuple_(); }
// our first two marshallable structs...
-MARSHALLABLE(request_header)
-MARSHALLABLE(reply_header)
+MARSHALLABLE(rpc_protocol::request_header)
+MARSHALLABLE(rpc_protocol::reply_header)
//
// Marshalling for STL containers
int rpcc::bind(milliseconds to) {
unsigned int r;
- int ret = call_timeout(rpc_const::bind, to, r, 0);
+ int ret = call_timeout(rpc_protocol::bind, to, r, 0);
if (ret == 0) {
lock ml(m_);
bind_done_ = true;
lock cl(ca->m);
ca->done = true;
- ca->intret = rpc_const::cancel_failure;
+ ca->intret = rpc_protocol::cancel_failure;
ca->c.notify_one();
}
}
}
-int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
+int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) {
caller ca(0, &rep);
int xid_rep;
{
lock ml(m_);
- if ((proc != rpc_const::bind && !bind_done_) || (proc == rpc_const::bind && bind_done_)) {
+ if ((proc != rpc_protocol::bind.id && !bind_done_) || (proc == rpc_protocol::bind.id && bind_done_)) {
IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice");
- return rpc_const::bind_failure;
+ return rpc_protocol::bind_failure;
}
if (destroy_wait_)
- return rpc_const::cancel_failure;
+ return rpc_protocol::cancel_failure;
ca.xid = xid_++;
calls_[ca.xid] = &ca;
- req.pack_header(request_header{ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()});
+ req.pack_header(rpc_protocol::request_header{
+ ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()
+ });
xid_rep = xid_rep_window_.front();
}
ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret);
// destruction of req automatically frees its buffer
- return (ca.done? ca.intret : rpc_const::timeout_failure);
+ return (ca.done? ca.intret : rpc_protocol::timeout_failure);
}
void rpcc::get_refconn(shared_ptr<connection> & ch) {
rpcc::got_pdu(const shared_ptr<connection> &, const string & b)
{
unmarshall rep(b, true);
- reply_header h;
+ rpc_protocol::reply_header h;
rep.unpack_header(h);
if (!rep.ok()) {
nonce_ = (unsigned int)random();
IF_LEVEL(2) LOG("created with nonce " << nonce_);
- reg(rpc_const::bind, &rpcs::rpcbind, this);
+ reg(rpc_protocol::bind, &rpcs::rpcbind, this);
dispatchpool_ = unique_ptr<thread_pool>(new thread_pool(6, false));
}
return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
}
-void rpcs::reg1(proc_t proc, handler *h) {
+void rpcs::reg1(proc_id_t proc, handler *h) { // stores h under the numeric id proc while holding procs_m_; VERIFY fails if proc is already registered
lock pl(procs_m_);
VERIFY(procs_.count(proc) == 0);
procs_[proc] = h;
VERIFY(procs_.count(proc) >= 1);
}
-void rpcs::updatestat(proc_t proc) {
+void rpcs::updatestat(proc_id_t proc) {
lock cl(count_m_);
counts_[proc]++;
curr_counts_--;
void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
unmarshall req(buf, true);
- request_header h;
+ rpc_protocol::request_header h;
req.unpack_header(h);
- proc_t proc = h.proc;
+ proc_id_t proc = h.proc;
if (!req.ok()) {
IF_LEVEL(1) LOG("unmarshall header failed");
dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce);
marshall rep;
- reply_header rh{h.xid,0};
+ rpc_protocol::reply_header rh{h.xid,0};
// is client sending to an old instance of server?
if (h.srv_nonce != 0 && h.srv_nonce != nonce_) {
IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce <<
" (current " << nonce_ << ") proc " << hex << h.proc);
- rh.ret = rpc_const::oldsrv_failure;
+ rh.ret = rpc_protocol::oldsrv_failure;
rep.pack_header(rh);
c->send(rep);
return;
updatestat(proc);
rh.ret = (*f)(req, rep);
- if (rh.ret == rpc_const::unmarshal_args_failure) {
- cerr << "failed to unmarshall the arguments. You are " <<
- "probably calling RPC 0x" << hex << proc << " with the wrong " <<
- "types of arguments." << endl;
+ if (rh.ret == rpc_protocol::unmarshal_args_failure) {
+ LOG("failed to unmarshall the arguments. You are " <<
+ "probably calling RPC 0x" << hex << proc << " with the wrong " <<
+ "types of arguments.");
VERIFY(0);
}
VERIFY(rh.ret >= 0);
break;
case FORGOTTEN: // very old request and we don't have the response anymore
IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce);
- rh.ret = rpc_const::atmostonce_failure;
+ rh.ret = rpc_protocol::atmostonce_failure;
rep.pack_header(rh);
c->send(rep);
break;
for (it++; it != l.end() && it->xid < xid; it++);
// there should already be an entry, so whine if there isn't
if (it == l.end() || it->xid != xid) {
- cerr << "Could not find reply struct in add_reply" << endl;
+ LOG("Could not find reply struct in add_reply");
l.insert(it, reply_t(xid, b));
} else {
*it = reply_t(xid, b);
struct hostent *hp = gethostbyname(host.c_str());
if (!hp || hp->h_length != 4 || hp->h_addrtype != AF_INET) {
- cerr << "cannot find host name " << host << endl;
+ LOG_NONMEMBER("cannot find host name " << host);
exit(1);
}
memcpy(&a, hp->h_addr_list[0], sizeof(in_addr_t));
#include <sys/socket.h>
#include <netinet/in.h>
+#include "rpc_protocol.h"
#include "thr_pool.h"
#include "marshall.h"
#include "marshall_wrap.h"
static constexpr milliseconds to_min{100};
}
-class rpc_const {
- public:
- static const unsigned int bind = 1; // handler number reserved for bind
- static const int timeout_failure = -1;
- static const int unmarshal_args_failure = -2;
- static const int unmarshal_reply_failure = -3;
- static const int atmostonce_failure = -4;
- static const int oldsrv_failure = -5;
- static const int bind_failure = -6;
- static const int cancel_failure = -7;
-};
+template<class P, class R, class ...Args> struct is_valid_call : false_type {}; // primary: a call with reply type R and arguments Args... does not match protocol signature P
+
+template<class S, class R, class ...Args>
+struct is_valid_call<S(R &, Args...), R, Args...> : true_type {}; // true only when P is exactly S(R &, Args...); implicit conversions are not considered
+
+template<class P, class F> struct is_valid_registration : false_type {}; // primary: handler type F does not match protocol signature P
+
+template<class S, class R, class ...Args>
+struct is_valid_registration<S(R &, typename decay<Args>::type...), S(R &, Args...)> : true_type {}; // function-type handlers: P must equal the handler signature with every argument type decayed
+
+template<class P, class C, class S, class R, class ...Args>
+struct is_valid_registration<P, S(C::*)(R &, Args...)> : is_valid_registration<P, S(R &, Args...)> {}; // pointer-to-member handlers: drop the class and re-check as a plain function type
// rpc client endpoint.
// manages a xid space per destination socket
// threaded: multiple threads can be sending RPCs,
class rpcc : private connection_delegate {
private:
+ using proc_id_t = rpc_protocol::proc_id_t;
+ template <class S>
+ using proc_t = rpc_protocol::proc_t<S>;
// manages per rpc info
struct caller {
request dup_req_;
int xid_rep_done_;
- int call1(proc_t proc, marshall &req, string &rep, milliseconds to);
+ int call1(proc_id_t proc, marshall &req, string &rep, milliseconds to);
template<class R>
- int call_m(proc_t proc, marshall &req, R & r, milliseconds to) {
+ int call_m(proc_id_t proc, marshall &req, R & r, milliseconds to) { // sends req through call1 and unmarshalls the reply into r; a negative call1 result is returned as-is and r is left untouched
string rep;
int intret = call1(proc, req, rep, to);
unmarshall u(rep, true);
if (intret < 0) return intret;
u >> r;
if (u.okdone() != true) {
- cerr << "rpcc::call_m: failed to unmarshall the reply. You are probably " <<
- "calling RPC 0x" << hex << proc << " with the wrong return type." << endl;
+ LOG("rpcc::call_m: failed to unmarshall the reply. You are probably " <<
+ "calling RPC 0x" << hex << proc << " with the wrong return type.");
VERIFY(0);
- return rpc_const::unmarshal_reply_failure;
+ return rpc_protocol::unmarshal_reply_failure; // only reached if VERIFY(0) above does not stop execution
}
return intret;
}
void cancel();
- template<class R, typename ...Args>
- inline int call(proc_t proc, R & r, const Args&... args) {
+ template<class P, class R, typename ...Args>
+ inline int call(proc_t<P> proc, R & r, const Args&... args) { // same as call_timeout with the timeout fixed to rpc::to_max; P is the procedure's declared signature
return call_timeout(proc, rpc::to_max, r, args...);
}
- template<class R, typename ...Args>
- inline int call_timeout(proc_t proc, milliseconds to, R & r, const Args&... args) {
+ template<class P, class R, typename ...Args>
+ inline int call_timeout(proc_t<P> proc, milliseconds to, R & r, const Args&... args) { // marshalls args, issues procedure proc.id with timeout to, and stores the reply in r
+ static_assert(is_valid_call<P, R, Args...>::value, "RPC called with incorrect argument types"); // compile-time check: R and Args... must match P exactly, so callers may need explicit casts
marshall m{args...};
- return call_m(proc, m, r, to);
+ return call_m(proc.id, m, r, to); // only the numeric id travels past this point
}
};
// rpc server endpoint.
class rpcs : private connection_delegate {
private:
+ using proc_id_t = rpc_protocol::proc_id_t;
+ template <class S>
+ using proc_t = rpc_protocol::proc_t<S>;
typedef enum {
NEW, // new RPC, not a duplicate
rpcstate_t checkduplicate_and_update(unsigned int clt_nonce,
int xid, int rep_xid, string & b);
- void updatestat(proc_t proc);
+ void updatestat(proc_id_t proc);
// latest connection to the client
map<unsigned int, shared_ptr<connection>> conns_;
// counting
const size_t counting_;
size_t curr_counts_;
- map<proc_t, size_t> counts_;
+ map<proc_id_t, size_t> counts_;
bool reachable_;
// map proc # to function
- map<proc_t, handler *> procs_;
+ map<proc_id_t, handler *> procs_;
mutex procs_m_; // protect insert/delete to procs[]
mutex count_m_; // protect modification of counts
void dispatch(shared_ptr<connection> c, const string & buf);
// internal handler registration
- void reg1(proc_t proc, handler *);
+ void reg1(proc_id_t proc, handler *);
unique_ptr<thread_pool> dispatchpool_;
unique_ptr<tcpsconn> listener_;
// RPC handler for clients binding
- int rpcbind(unsigned int &r, int a);
+ rpc_protocol::status rpcbind(unsigned int &r, int a);
bool got_pdu(const shared_ptr<connection> & c, const string & b);
void set_reachable(bool r) { reachable_ = r; }
- template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr) {
+ template<class P, class F, class C=void> void reg(proc_t<P> proc, F f, C *c=nullptr) { // registers handler f (optionally bound to object c) for procedure proc
+ static_assert(is_valid_registration<P, F>::value, "RPC handler registered with incorrect argument types"); // compile-time check of the handler type F against the declared signature P
struct ReturnOnFailure {
static inline int unmarshall_args_failure() {
- return rpc_const::unmarshal_args_failure;
+ return rpc_protocol::unmarshal_args_failure;
}
};
- reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
+ reg1(proc.id, marshalled_func<F, ReturnOnFailure>::wrap(f, c)); // the handler table is keyed by the numeric id only
}
void start();
#include "types.h"
-using proc_t = uint32_t;
-using status_t = int32_t;
-using rpc_sz_t = uint32_t;
-
-struct request_header {
- int xid;
- proc_t proc;
- unsigned int clt_nonce;
- unsigned int srv_nonce;
- int xid_rep;
-
- MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep)
+namespace rpc_protocol { // wire-level RPC definitions: id/status types, failure codes, packet headers, typed procedure descriptors
+ using proc_id_t = uint32_t; // numeric procedure id carried in request_header::proc
+
+ using status = int32_t;
+ using rpc_sz_t = uint32_t;
+
+ enum : status { // failure codes; all negative
+ timeout_failure = -1,
+ unmarshal_args_failure = -2,
+ unmarshal_reply_failure = -3,
+ atmostonce_failure = -4,
+ oldsrv_failure = -5,
+ bind_failure = -6,
+ cancel_failure = -7
+ };
+
+ struct request_header {
+ int xid;
+ proc_id_t proc;
+ unsigned int clt_nonce;
+ unsigned int srv_nonce;
+ int xid_rep;
+
+ MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep)
+ };
+
+ struct reply_header {
+ int xid;
+ int ret;
+
+ MEMBERS(xid, ret)
+ };
+
+ template <typename Signature>
+ struct proc_t { // typed procedure descriptor: the numeric id plus the handler signature as a compile-time tag
+ using signature = Signature;
+ proc_id_t id;
+ };
+
+ const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t); // room for the length field plus the larger of the two headers
+ const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation
+ const size_t MAX_PDU = 10<<20; //maximum PDU is 10M
+
+#define REMOTE_PROCEDURE_BASE(_base_) enum proc_no : ::rpc_protocol::proc_id_t { base = _base_ }; // declares 'base' in the enclosing namespace; use once per protocol namespace
+#define REMOTE_PROCEDURE(_offset_, _name_, _args_) static constexpr ::rpc_protocol::proc_t<status _args_> _name_{base + _offset_}; // needs 'status' and 'base' visible at the point of use; _args_ is the parenthesised parameter list
+
+ REMOTE_PROCEDURE_BASE(0); // rpc_protocol's own procedures are numbered from base 0
+ REMOTE_PROCEDURE(1, bind, (unsigned int &, int)); // handler number reserved for bind
};
-ENDIAN_SWAPPABLE(request_header)
-
-struct reply_header {
- int xid;
- int ret;
-
- MEMBERS(xid, ret)
-};
-
-ENDIAN_SWAPPABLE(reply_header)
-
-const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t);
-const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation
-const size_t MAX_PDU = 10<<20; //maximum PDF is 10M
+ENDIAN_SWAPPABLE(rpc_protocol::request_header)
+ENDIAN_SWAPPABLE(rpc_protocol::reply_header)
#endif
int handle_bigrep(string &r, const size_t a);
};
+namespace srv_protocol { // typed procedure descriptors for the test server's handlers
+ using status = rpc_protocol::status; // REMOTE_PROCEDURE builds proc_t<status(...)>, so a 'status' name must be visible here
+ REMOTE_PROCEDURE_BASE(0); // base 0 keeps the ids at 22..25, the numbers the tests previously passed as literals
+ REMOTE_PROCEDURE(22, _22, (string &, string, string));
+ REMOTE_PROCEDURE(23, fast, (int &, int));
+ REMOTE_PROCEDURE(24, slow, (int &, int));
+ REMOTE_PROCEDURE(25, bigrep, (string &, size_t)); // callers must pass exactly size_t; rpcc::call_timeout rejects other integer types at compile time
+};
+
// a handler. a and b are arguments, r is the result.
// there can be multiple arguments but only one result.
// the caller also gets to see the int return value
// rpcs::reg() decides how to unmarshall by looking
// at these argument types, so this function definition
// does what a .x file does in SunRPC.
-int
-srv::handle_22(string &r, const string a, string b)
-{
+int srv::handle_22(string &r, const string a, string b) { // replies with the concatenation a + b; always returns 0
r = a + b;
return 0;
}
-int
-srv::handle_fast(int &r, const int a)
-{
+int srv::handle_fast(int &r, const int a) { // replies with a + 1 immediately; always returns 0
r = a + 1;
return 0;
}
-int
-srv::handle_slow(int &r, const int a)
-{
+int srv::handle_slow(int &r, const int a) { // sleeps for random() % 500 microseconds, then replies with a + 2; always returns 0
usleep(random() % 500);
r = a + 2;
return 0;
}
-int
-srv::handle_bigrep(string &r, const size_t len)
-{
+int srv::handle_bigrep(string &r, const size_t len) { // replies with a string of len 'x' characters; always returns 0
r = string((size_t)len, 'x');
return 0;
}
srv service;
-void startserver()
-{
+void startserver() { // creates the rpcs instance on 'port', registers the four srv handlers and starts it
server = new rpcs(port);
- server->reg(22, &srv::handle_22, &service);
- server->reg(23, &srv::handle_fast, &service);
- server->reg(24, &srv::handle_slow, &service);
- server->reg(25, &srv::handle_bigrep, &service);
+ server->reg(srv_protocol::_22, &srv::handle_22, &service); // typed descriptors let rpcs::reg check each handler signature at compile time
+ server->reg(srv_protocol::fast, &srv::handle_fast, &service);
+ server->reg(srv_protocol::slow, &srv::handle_slow, &service);
+ server->reg(srv_protocol::bigrep, &srv::handle_bigrep, &service);
server->start();
}
-void
-testmarshall()
-{
+void testmarshall() {
marshall m;
- request_header rh{1,2,3,4,5};
+ rpc_protocol::request_header rh{1,2,3,4,5};
m.pack_header(rh);
- VERIFY(((string)m).size()==RPC_HEADER_SZ);
+ VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ);
int i = 12345;
unsigned long long l = 1223344455L;
string s = "hallo....";
m << s;
string b = m;
- VERIFY(b.size() == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
+ VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
unmarshall un(b, true);
- request_header rh1;
+ rpc_protocol::request_header rh1;
un.unpack_header(rh1);
VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
int i1;
VERIFY(i1==i && l1==l && s1==s);
}
-void
-client1(size_t cl)
-{
+void client1(size_t cl) { // worker for the concurrency test: issues bigrep calls and verifies each reply length
// test concurrency.
size_t which_cl = cl % NUM_CL;
for(int i = 0; i < 100; i++){
- int arg = (random() % 2000);
+ size_t arg = (size_t)(random() % 2000); // must be exactly size_t: bigrep is declared (string &, size_t) and call() checks argument types exactly
string rep;
- int ret = clients[which_cl]->call(25, rep, arg);
+ int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg);
VERIFY(ret == 0);
- if ((int)rep.size()!=arg)
+ if (rep.size()!=arg)
cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
- VERIFY((int)rep.size() == arg);
+ VERIFY(rep.size() == arg);
}
// test rpc replies coming back not in the order of
auto start = steady_clock::now();
- int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
+ int ret = clients[which_cl]->call(which ? srv_protocol::fast : srv_protocol::slow, rep, arg); // fast and slow share one signature, so both operands of ?: have the same descriptor type
auto end = steady_clock::now();
auto diff = duration_cast<milliseconds>(end - start).count();
if (ret != 0)
}
}
-void
-client2(size_t cl)
-{
+void client2(size_t cl) { // issues bigrep calls for 10 seconds and verifies that each reply has the requested length
size_t which_cl = cl % NUM_CL;
time_t t1;
time(&t1);
while(time(0) - t1 < 10){
- int arg = (random() % 2000);
+ size_t arg = (size_t)(random() % 2000); // must be exactly size_t: bigrep is declared (string &, size_t) and call() checks argument types exactly
string rep;
- int ret = clients[which_cl]->call(25, rep, arg);
- if ((int)rep.size()!=arg)
+ int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg);
+ if (rep.size()!=arg)
cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
- VERIFY((int)rep.size() == arg);
+ VERIFY(rep.size() == arg);
}
}
-void
-client3(void *xx)
-{
+void client3(void *xx) { // xx is cast to rpcc*; makes four slow calls with a 300 ms timeout
rpcc *c = (rpcc *) xx;
for(int i = 0; i < 4; i++){
int rep = 0;
- int ret = c->call_timeout(24, milliseconds(300), rep, i);
- VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
+ int ret = c->call_timeout(srv_protocol::slow, milliseconds(300), rep, i);
+ VERIFY(ret == rpc_protocol::timeout_failure || rep == i+2); // a timeout is accepted; otherwise the reply must be handle_slow's a + 2
}
}
-
-void
-simple_tests(rpcc *c)
-{
+void simple_tests(rpcc *c) { // runs the basic single-client checks (concat, big reply, timeouts, 1M request) against client c
cout << "simple_tests" << endl;
// an RPC call to procedure #22.
// rpcc::call() looks at the argument types to decide how
// to marshall the RPC call packet, and how to unmarshall
// the reply packet.
string rep;
- int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
+ int intret = c->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
VERIFY(intret == 0); // this is what handle_22 returns
VERIFY(rep == "hello goodbye");
cout << " -- string concat RPC .. ok" << endl;
// small request, big reply (perhaps req via UDP, reply via TCP)
- intret = c->call_timeout(25, milliseconds(20000), rep, 70000);
+ intret = c->call_timeout(srv_protocol::bigrep, milliseconds(20000), rep, (size_t)70000); // cast to size_t rather than using a ul suffix: the argument type must match bigrep's declared size_t on every platform
VERIFY(intret == 0);
VERIFY(rep.size() == 70000);
cout << " -- small request, big reply .. ok" << endl;
// specify a timeout value to an RPC that should succeed (udp)
int xx = 0;
- intret = c->call_timeout(23, milliseconds(300), xx, 77);
+ intret = c->call_timeout(srv_protocol::fast, milliseconds(300), xx, 77);
VERIFY(intret == 0 && xx == 78);
cout << " -- no spurious timeout .. ok" << endl;
{
string arg(1000, 'x');
string rep2;
- c->call_timeout(22, milliseconds(300), rep2, arg, (string)"x");
+ c->call_timeout(srv_protocol::_22, milliseconds(300), rep2, arg, (string)"x");
VERIFY(rep2.size() == 1001);
cout << " -- no spurious timeout .. ok" << endl;
}
// huge RPC
string big(1000000, 'x');
- intret = c->call(22, rep, big, (string)"z");
+ intret = c->call(srv_protocol::_22, rep, big, (string)"z");
VERIFY(rep.size() == 1000001);
cout << " -- huge 1M rpc request .. ok" << endl;
cout << "simple_tests OK" << endl;
}
-void
-concurrent_test(size_t nt)
-{
+void concurrent_test(size_t nt) {
// create threads that make lots of calls in parallel,
// to test thread synchronization for concurrent calls
// and dispatches.
cout << " OK" << endl;
}
-void
-lossy_test()
-{
+void lossy_test() {
cout << "start lossy_test ...";
VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
}
-void
-failure_test()
-{
+void failure_test() {
rpcc *client1;
rpcc *client = clients[0];
startserver();
string rep;
- int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
- VERIFY(intret == rpc_const::oldsrv_failure);
+ int intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
+ VERIFY(intret == rpc_protocol::oldsrv_failure);
cout << " -- call recovered server with old client .. failed ok" << endl;
delete client;
VERIFY (client->bind() >= 0);
VERIFY (client->bind() < 0);
- intret = client->call(22, rep, (string)"hello", (string)" goodbye");
+ intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
VERIFY(intret == 0);
VERIFY(rep == "hello goodbye");
cout << "failure_test OK" << endl;
}
-int
-main(int argc, char *argv[])
-{
+int main(int argc, char *argv[]) {
setvbuf(stdout, NULL, _IONBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
testmarshall();
if (isserver) {
- cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
+ cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)rpc_protocol::RPC_HEADER_SZ << endl;
startserver();
}
exit(0);
}
- while (1) {
+ while (1)
usleep(100000);
- }
}
thread(&rsm::recovery, this).detach();
}
-void rsm::reg1(int proc, handler *h) {
+void rsm::reg1(rpc_protocol::proc_id_t proc, handler *h) { // stores h under the numeric id proc while holding rsm_mutex; an existing entry for proc is overwritten
lock ml(rsm_mutex);
procs[proc] = h;
}
// XXX iannucci 2013/09/15 -- I don't understand whether accessing
// cfg->view_id in this manner involves a race. I suspect not.
if (join(primary, ml)) {
- LOG("recovery: joined");
+ LOG("joined");
commit_change(cfg->view_id(), ml);
} else {
ml.unlock();
}
}
vid_insync = vid_commit;
- LOG("recovery: sync vid_insync " << vid_insync);
+ LOG("sync vid_insync " << vid_insync);
if (primary == cfg->myaddr()) {
r = sync_with_backups(ml);
} else {
r = sync_with_primary(ml);
}
- LOG("recovery: sync done");
+ LOG("sync done");
// If there was a commited viewchange during the synchronization, restart
// the recovery
myvs.seqno = 1;
inviewchange = false;
}
- LOG("recovery: go to sleep " << insync << " " << inviewchange);
+ LOG("go to sleep " << insync << " " << inviewchange);
recovery_cond.wait(ml);
}
}
void rsm::commit_change(unsigned vid, lock &) {
if (vid <= vid_commit)
return;
- LOG("commit_change: new view (" << vid << ") last vs (" << last_myvs.vid << "," <<
+ LOG("new view (" << vid << ") last vs (" << last_myvs.vid << "," <<
last_myvs.seqno << ") " << primary << " insync " << insync);
vid_commit = vid;
inviewchange = true;
}
-void rsm::execute(int procno, const string & req, string & r) {
+void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r) {
LOG("execute");
handler *h = procs[procno];
VERIFY(h);
// number, and invokes it on all members of the replicated state
// machine.
//
-rsm_client_protocol::status rsm::client_invoke(string & r, int procno, const string & req) {
+rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req) {
LOG("invoke procno 0x" << hex << procno);
lock ml(invoke_mutex);
vector<string> m;
// the replica must execute requests in order (with no gaps)
// according to requests' seqno
-rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, const string & req) {
+rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
LOG("invoke procno 0x" << hex << proc);
lock ml(invoke_mutex);
vector<string> m;
VERIFY (c.size() > 0);
if (isamember(primary,c)) {
- LOG("set_primary: primary stays " << primary);
+ LOG("primary stays " << primary);
return;
}
for (unsigned i = 0; i < p.size(); i++) {
if (isamember(p[i], c)) {
primary = p[i];
- LOG("set_primary: primary is " << primary);
+ LOG("primary is " << primary);
return;
}
}
class rsm : public config_view_change {
private:
- void reg1(int proc, handler *);
+ void reg1(rpc_protocol::proc_id_t proc, handler *);
protected:
- map<int, handler *> procs;
+ map<rpc_protocol::proc_id_t, handler *> procs;
unique_ptr<config> cfg;
rsm_state_transfer *stf = nullptr;
rpcs *rsmrpc;
bool break2;
rsm_client_protocol::status client_members(vector<string> &r, int i);
- rsm_protocol::status invoke(int &, int proc, viewstamp vs, const string & mreq);
+ rsm_protocol::status invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & mreq);
rsm_protocol::status transferreq(rsm_protocol::transferres &r, const string & src,
viewstamp last, unsigned vid);
rsm_protocol::status transferdonereq(int &, const string & m, unsigned vid);
mutex rsm_mutex, invoke_mutex;
cond recovery_cond, sync_cond;
- void execute(int procno, const string & req, string & r);
- rsm_client_protocol::status client_invoke(string & r, int procno, const string & req);
+ void execute(rpc_protocol::proc_id_t procno, const string & req, string & r);
+ rsm_client_protocol::status client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req);
bool statetransfer(const string & m, lock & rsm_mutex_lock);
bool statetransferdone(const string & m, lock & rsm_mutex_lock);
bool join(const string & m, lock & rsm_mutex_lock);
void recovery();
void commit_change(unsigned vid);
- template<class F, class C=void> void reg(int proc, F f, C *c=nullptr);
+ template<class P, class F, class C=void> void reg(rpc_protocol::proc_t<P> proc, F f, C *c=nullptr) { // typed registration of f as the handler for procedure proc
+ static_assert(is_valid_registration<P, F>::value, "RSM handler registered with incorrect argument types"); // compile-time check of F against the type P carried by proc
+ reg1(proc.id, marshalled_func<F>::wrap(f, c)); // pass the wrapped handler to reg1 under the numeric id; NOTE(review): c is presumably the object f is invoked on - confirm in marshalled_func
+ }
void start();
};
-template<class F, class C> void rsm::reg(int proc, F f, C *c) {
- reg1(proc, marshalled_func<F>::wrap(f, c));
-}
-
#endif /* rsm_h */
LOG("create rsm_client");
lock ml(rsm_client_mutex);
VERIFY (init_members(ml));
- LOG("rsm_client: done");
+ LOG("done");
}
void rsm_client::primary_failure(lock &) {
mutex rsm_client_mutex;
void primary_failure(lock & rsm_client_mutex_lock);
bool init_members(lock & rsm_client_mutex_lock);
+ rsm_protocol::status invoke(unsigned int proc, string &rep, const string &req);
+ template<class R> int call_m(unsigned int proc, R & r, const marshall & req);
public:
rsm_client(string dst);
- rsm_protocol::status invoke(unsigned int proc, string &rep, const string &req);
- template<class R, class ...Args>
- int call(unsigned int proc, R & r, const Args & ...a1);
- private:
- template<class R> int call_m(unsigned int proc, R & r, const marshall & req);
+ template<class P, class R, class ...Args> // P: type carried by proc; R: type of the out-parameter r; Args: types of the call arguments
+ int call(rpc_protocol::proc_t<P> proc, R & r, const Args & ...a1) { // typed front end to call_m; returns whatever call_m returns
+ static_assert(is_valid_call<P, R, Args...>::value, "RSM method invoked with incorrect argument types"); // reject a mismatch between P and (R, Args...) at compile time
+ return call_m(proc.id, r, marshall{a1...}); // pack the arguments into a single marshall and dispatch by numeric procedure id
+ }
};
inline string hexify(const string & s) {
"0x" << hex << proc << " with the wrong return type");
LOG("here's what I got: \"" << hexify(rep) << "\"");
VERIFY(0);
- return rpc_const::unmarshal_reply_failure;
+ return rpc_protocol::unmarshal_reply_failure;
}
unmarshall u1(res, false);
u1 >> r;
" with the wrong return type.");
LOG("here's what I got: \"" << hexify(res) << "\"");
VERIFY(0);
- return rpc_const::unmarshal_reply_failure;
+ return rpc_protocol::unmarshal_reply_failure;
}
return intret;
}
-template<class R, class ...Args>
-int rsm_client::call(unsigned int proc, R & r, const Args & ...a1) {
- return call_m(proc, r, marshall{a1...});
-}
-
#endif
#include "types.h"
#include "rpc/rpc.h"
-class rsm_client_protocol {
- public:
- enum status : status_t {OK, ERR, NOTPRIMARY, BUSY};
- enum rpc_numbers : proc_t {
- invoke = 0x9001,
- members,
- };
+namespace rsm_client_protocol { // NOTE(review): presumably the client-facing RSM RPC interface - confirm
+ enum status : rpc_protocol::status {OK, ERR, NOTPRIMARY, BUSY}; // status codes; underlying type is rpc_protocol::status
+ REMOTE_PROCEDURE_BASE(0x9000); // NOTE(review): procedure ids are presumably base + index - confirm against the macro definition
+ REMOTE_PROCEDURE(1, invoke, (string &, rpc_protocol::proc_id_t, string)); // parameter list mirrors rsm::client_invoke(r, procno, req)
+ REMOTE_PROCEDURE(2, members, (vector<string> &, int)); // parameter list mirrors rsm::client_members(r, i)
};
struct viewstamp {
MARSHALLABLE(viewstamp)
-class rsm_protocol {
- public:
- enum status : status_t { OK, ERR, BUSY};
- enum rpc_numbers : proc_t {
- invoke = 0xa001,
- transferreq,
- transferdonereq,
- joinreq,
- };
-
- struct transferres {
- string state;
- viewstamp last;
-
- MEMBERS(state, last)
- };
+namespace rsm_protocol { // NOTE(review): presumably the RPC interface used between RSM replicas - confirm
+ enum status : rpc_protocol::status { OK, ERR, BUSY}; // status codes; underlying type is rpc_protocol::status
+
+ struct transferres { // type of the first (reference) parameter of transferreq below
+ string state;
+ viewstamp last;
+
+ MEMBERS(state, last) // NOTE(review): presumably enumerates the fields for marshalling - confirm against the macro definition
+ };
+
+ REMOTE_PROCEDURE_BASE(0xa000); // NOTE(review): procedure ids are presumably base + index - confirm against the macro definition
+ REMOTE_PROCEDURE(1, invoke, (int &, rpc_protocol::proc_id_t, viewstamp, string)); // parameter list mirrors rsm::invoke(int &, proc, vs, mreq)
+ REMOTE_PROCEDURE(2, transferreq, (transferres &, string, viewstamp, unsigned)); // parameter list mirrors rsm::transferreq(r, src, last, vid)
+ REMOTE_PROCEDURE(3, transferdonereq, (int &, string, unsigned)); // parameter list mirrors rsm::transferdonereq(int &, m, vid)
+ REMOTE_PROCEDURE(4, joinreq, (string &, string, viewstamp)); // handler declaration is not visible in this chunk
};
MARSHALLABLE(rsm_protocol::transferres)
-class rsm_test_protocol {
- public:
- enum status : status_t {OK, ERR};
- enum rpc_numbers : proc_t {
- net_repair = 0x12001,
- breakpoint = 0x12002,
- };
+namespace rsm_test_protocol { // NOTE(review): presumably test-only control RPCs, judging by the name - confirm
+ enum status : rpc_protocol::status {OK, ERR}; // status codes; underlying type is rpc_protocol::status
+ REMOTE_PROCEDURE_BASE(0x12000); // NOTE(review): procedure ids are presumably base + index - confirm against the macro definition
+ REMOTE_PROCEDURE(1, net_repair, (status &, int)); // first parameter is a status reference; meaning of the int is not visible here
+ REMOTE_PROCEDURE(2, breakpoint, (status &, int)); // first parameter is a status reference; meaning of the int is not visible here
};
#endif
int main(int argc, char *argv[]) {
if(argc != 4){
- cerr << "Usage: " << argv[0] << " [host:]port [partition] arg" << endl;
+ LOG_NONMEMBER("Usage: " << argv[0] << " [host:]port [partition] arg");
return 1;
}
int b = stoi(argv[3]);
cout << "breakpoint " << b << " returned " << lc->breakpoint(b);
} else {
- cerr << "Unknown command " << argv[2] << endl;
+ LOG_NONMEMBER("Unknown command " << argv[2]);
}
return 0;
}