lock_tester=lock_tester.o lock_client.o tprintf.o rsm_client.o handle.o lock_client_cache_rsm.o
lock_tester : $(lock_tester) rpc/librpc.a
-lock_server=lock_server.o lock_smain.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o
+lock_server=lock_server.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o
lock_server : $(lock_server) rpc/librpc.a
rsm_tester=rsm_tester.o rsmtest_client.o
-include *.d
-include rpc/*.d
-clean_files=rpc/rpctest rpc/*.o *.d rpc/*.d rpc/librpc.a *.o lock_server lock_tester lock_demo rsm_tester
+clean_files=rpc/rpctest rpc/*.o *.d rpc/*.d rpc/librpc.a *.o config *.log lock_server lock_tester lock_demo rsm_tester
.PHONY: clean $(EXTRA_TARGETS)
clean:
rm -rf $(clean_files)
paxos_proposer = new proposer(this, paxos_acceptor, me);
// XXX hack; maybe should have its own port number
- paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, this, &config::heartbeat);
+ paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
{
lock ml(cfg_mutex);
}
paxos_protocol::status
-config::heartbeat(std::string m, unsigned vid, int &r)
+config::heartbeat(int &r, std::string m, unsigned vid)
{
lock ml(cfg_mutex);
int ret = paxos_protocol::ERR;
std::vector<std::string> mems;
mutex cfg_mutex;
std::condition_variable config_cond;
- paxos_protocol::status heartbeat(
- std::string m,
- unsigned instance,
- int &r);
+ paxos_protocol::status heartbeat(int &r, std::string m, unsigned instance);
std::string value(const std::vector<std::string> &mems) const;
void members(const std::string &v, std::vector<std::string> &m) const;
void get_view_wo(unsigned instance, std::vector<std::string> &m);
id = host.str();
last_port = rlock_port;
rpcs *rlsrpc = new rpcs(rlock_port);
- rlsrpc->reg(rlock_protocol::revoke, this, &lock_client_cache_rsm::revoke_handler);
- rlsrpc->reg(rlock_protocol::retry, this, &lock_client_cache_rsm::retry_handler);
+ rlsrpc->reg(rlock_protocol::revoke, &lock_client_cache_rsm::revoke_handler, this);
+ rlsrpc->reg(rlock_protocol::retry, &lock_client_cache_rsm::retry_handler, this);
{
lock sl(xid_mutex);
xid = 0;
return lock_protocol::OK;
}
-rlock_protocol::status lock_client_cache_rsm::revoke_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
+rlock_protocol::status lock_client_cache_rsm::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
LOG("Revoke handler " << lid << " " << xid);
lock_state &st = get_lock_state(lid);
lock sl(st.m);
return rlock_protocol::OK;
}
-rlock_protocol::status lock_client_cache_rsm::retry_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
+rlock_protocol::status lock_client_cache_rsm::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
lock_state &st = get_lock_state(lid);
lock sl(st.m);
VERIFY(st.state == lock_state::acquiring);
lock_protocol::status acquire(lock_protocol::lockid_t);
virtual lock_protocol::status release(lock_protocol::lockid_t);
void releaser();
- rlock_protocol::status revoke_handler(lock_protocol::lockid_t, lock_protocol::xid_t, int &);
- rlock_protocol::status retry_handler(lock_protocol::lockid_t, lock_protocol::xid_t, int &);
+ rlock_protocol::status revoke_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t);
+ rlock_protocol::status retry_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t);
};
-// the lock server implementation
-
-#include "lock_server.h"
-#include <sstream>
+#include "rpc/rpc.h"
+#include <arpa/inet.h>
+#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
-#include <arpa/inet.h>
-#include "lock.h"
+#include "lock_server_cache_rsm.h"
+#include "paxos.h"
+#include "rsm.h"
-lock_server::lock_server():
- nacquire (0)
-{
-}
+// Main loop of lock_server
-// caller must hold lock_lock
-mutex &
-lock_server::get_lock(lock_protocol::lockid_t lid) {
- lock ml(lock_lock);
- // by the semantics of std::map, this will create
- // the mutex if it doesn't already exist
- mutex &l = locks[lid];
- return l;
-}
+char tprintf_thread_prefix = 's';
-lock_protocol::status
-lock_server::stat(int clt, lock_protocol::lockid_t lid, int &r)
+int
+main(int argc, char *argv[]) // entry point of the lock server: builds an rsm from the two command-line arguments and registers the lock handlers on it
{
- lock_protocol::status ret = lock_protocol::OK;
- printf("stat request from clt %d\n", clt);
- r = nacquire;
- return ret;
-}
+ setvbuf(stdout, NULL, _IONBF, 0); // _IONBF: make stdout unbuffered
+ setvbuf(stderr, NULL, _IONBF, 0); // likewise for stderr
-lock_protocol::status
-lock_server::acquire(int clt, lock_protocol::lockid_t lid, int &r)
-{
- get_lock(lid).lock();
- return lock_protocol::OK;
-}
+ srandom(getpid()); // seed random() with the process id
-lock_protocol::status
-lock_server::release(int clt, lock_protocol::lockid_t lid, int &r)
-{
- get_lock(lid).unlock();
- return lock_protocol::OK;
+ if(argc != 3){ // exactly two arguments are required
+ fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
+ exit(1);
+ }
+
+ rsm rsm(argv[1], argv[2]); // per the usage string: argv[1] is the master endpoint, argv[2] is this node's endpoint
+ lock_server_cache_rsm ls(&rsm);
+ rsm.set_state_transfer(&ls); // NOTE(review): presumably lets rsm use ls.marshal_state()/unmarshal_state() - confirm against rsm.h
+
+ rsm.reg(lock_protocol::acquire, &lock_server_cache_rsm::acquire, &ls); // handler first, instance second
+ rsm.reg(lock_protocol::release, &lock_server_cache_rsm::release, &ls);
+ rsm.reg(lock_protocol::stat, &lock_server_cache_rsm::stat, &ls);
+
+ while(1) // never returns: the main thread only sleeps
+ sleep(1000);
}
+++ /dev/null
-// this is the lock server
-// the lock client has a similar interface
-
-#ifndef lock_server_h
-#define lock_server_h
-
-#include <string>
-#include "lock_protocol.h"
-#include "lock_client.h"
-#include "rpc/rpc.h"
-#include <list>
-#include <map>
-
-using std::map;
-
-typedef map<lock_protocol::lockid_t, mutex> lock_map;
-
-class lock_server {
-
- protected:
- int nacquire;
- mutex lock_lock;
- lock_map locks;
- mutex &get_lock(lock_protocol::lockid_t lid);
-
- public:
- lock_server();
- ~lock_server() {};
- lock_protocol::status stat(int clt, lock_protocol::lockid_t lid, int &);
- lock_protocol::status acquire(int clt, lock_protocol::lockid_t lid, int &);
- lock_protocol::status release(int clt, lock_protocol::lockid_t lid, int &);
-};
-
-#endif
}
}
-int lock_server_cache_rsm::acquire(lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid, int &) {
+int lock_server_cache_rsm::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
LOG_FUNC_ENTER_SERVER;
holder h = holder(id, xid);
lock_state &st = get_lock_state(lid);
return lock_protocol::RETRY;
}
-int lock_server_cache_rsm::release(lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid, int &r) {
+int lock_server_cache_rsm::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
LOG_FUNC_ENTER_SERVER;
lock_state &st = get_lock_state(lid);
lock sl(st.m);
rep >> lock_table;
}
-lock_protocol::status lock_server_cache_rsm::stat(lock_protocol::lockid_t lid, int &r) {
+lock_protocol::status lock_server_cache_rsm::stat(int &r, lock_protocol::lockid_t lid) {
printf("stat request\n");
r = nacquire;
return lock_protocol::OK;
class rsm *rsm;
public:
lock_server_cache_rsm(class rsm *rsm = 0);
- lock_protocol::status stat(lock_protocol::lockid_t, int &);
+ lock_protocol::status stat(int &, lock_protocol::lockid_t);
void revoker();
void retryer();
string marshal_state();
void unmarshal_state(string state);
- int acquire(lock_protocol::lockid_t, string id, lock_protocol::xid_t, int &);
- int release(lock_protocol::lockid_t, string id, lock_protocol::xid_t, int &);
+ int acquire(int &, lock_protocol::lockid_t, string id, lock_protocol::xid_t);
+ int release(int &, lock_protocol::lockid_t, string id, lock_protocol::xid_t);
};
#endif
+++ /dev/null
-#include "rpc/rpc.h"
-#include <arpa/inet.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <unistd.h>
-#include "lock_server_cache_rsm.h"
-#include "paxos.h"
-#include "rsm.h"
-
-// Main loop of lock_server
-
-char tprintf_thread_prefix = 's';
-
-int
-main(int argc, char *argv[])
-{
- setvbuf(stdout, NULL, _IONBF, 0);
- setvbuf(stderr, NULL, _IONBF, 0);
-
- srandom(getpid());
-
- if(argc != 3){
- fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
- exit(1);
- }
-
- rsm rsm(argv[1], argv[2]);
- lock_server_cache_rsm ls(&rsm);
- rsm.set_state_transfer((rsm_state_transfer *)&ls);
- rsm.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire);
- rsm.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release);
- rsm.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat);
-
- while(1)
- sleep(1000);
-}
// check if the servers in l2 contains a majority of servers in l1
bool
-proposer::majority(const std::vector<std::string> &l1,
+proposer::majority(const std::vector<std::string> &l1,
const std::vector<std::string> &l2)
{
unsigned n = 0;
return n >= (l1.size() >> 1) + 1;
}
-proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
+proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
std::string _me)
- : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
+ : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
stable (true)
{
my_n.n = 0;
// otherwise fill in accepts with set of nodes that accepted,
// set v to the v_a with the highest n_a, and return true.
bool
-proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
+proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
std::vector<std::string> nodes,
std::string &v)
{
}
void
-proposer::decide(unsigned instance, std::vector<std::string> accepts,
+proposer::decide(unsigned instance, std::vector<std::string> accepts,
std::string v)
{
struct paxos_protocol::decidearg arg = { instance, v };
}
}
-acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me,
+acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me,
std::string _value)
: cfg(_cfg), me (_me), instance_h(0)
{
}
pxs = new rpcs(atoi(_me.c_str()));
- pxs->reg(paxos_protocol::preparereq, this, &acceptor::preparereq);
- pxs->reg(paxos_protocol::acceptreq, this, &acceptor::acceptreq);
- pxs->reg(paxos_protocol::decidereq, this, &acceptor::decidereq);
+ pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
+ pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
+ pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
}
paxos_protocol::status
-acceptor::preparereq(std::string src, paxos_protocol::preparearg a,
- paxos_protocol::prepareres &r)
+acceptor::preparereq(paxos_protocol::prepareres &r, std::string src, paxos_protocol::preparearg a)
{
lock ml(pxs_mutex);
r.oldinstance = false;
}
paxos_protocol::status
-acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r)
+acceptor::acceptreq(bool &r, std::string src, paxos_protocol::acceptarg a)
{
lock ml(pxs_mutex);
r = false;
// the src argument is only for debug purpose
paxos_protocol::status
-acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r)
+acceptor::decidereq(int &r, std::string src, paxos_protocol::decidearg a)
{
lock ml(pxs_mutex);
- tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
+ tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
a.instance, instance_h, v_a.c_str());
if (a.instance == instance_h + 1) {
VERIFY(v_a == a.v);
std::map<unsigned,std::string> values; // vals of each instance
void commit_wo(unsigned instance, std::string v);
- paxos_protocol::status preparereq(std::string src,
- paxos_protocol::preparearg a,
- paxos_protocol::prepareres &r);
- paxos_protocol::status acceptreq(std::string src,
- paxos_protocol::acceptarg a, bool &r);
- paxos_protocol::status decidereq(std::string src,
- paxos_protocol::decidearg a, int &r);
+ paxos_protocol::status preparereq(paxos_protocol::prepareres &r,
+ std::string src, paxos_protocol::preparearg a);
+ paxos_protocol::status acceptreq(bool &r, std::string src,
+ paxos_protocol::acceptarg a);
+ paxos_protocol::status decidereq(int &r, std::string src,
+ paxos_protocol::decidearg a);
friend class log;
public:
- acceptor(class paxos_change *cfg, bool _first, std::string _me,
+ acceptor(class paxos_change *cfg, bool _first, std::string _me,
std::string _value);
~acceptor() {};
void commit(unsigned instance, std::string v);
prop_t my_n; // number of the last proposal used in this instance
void setn();
- bool prepare(unsigned instance, std::vector<std::string> &accepts,
+ bool prepare(unsigned instance, std::vector<std::string> &accepts,
std::vector<std::string> nodes,
std::string &v);
- void accept(unsigned instance, std::vector<std::string> &accepts,
+ void accept(unsigned instance, std::vector<std::string> &accepts,
std::vector<std::string> nodes, std::string v);
void decide(unsigned instance, std::vector<std::string> accepts,
std::string v);
_ind = RPC_HEADER_SZ;
}
- template <class OutputIterator>
- void iterate(OutputIterator i, int n) {
- while (n--) {
- typename OutputIterator::value_type t;
- *this >> t;
- *i++ = t;
- }
+ // Extracts the next value of type A from this unmarshall stream and
+ // returns it by value. A must be default-constructible and have a
+ // matching operator>> overload.
+ template <class A>
+ inline A grab() {
+     A value;
+     (*this) >> value;
+     return value;
}
};
return u >> d.first >> d.second;
}
+template <size_t...> struct tuple_indices {}; // compile-time list of size_t indices
+template <size_t S, class IntTuple, size_t E> struct make_indices_imp; // builds a tuple_indices by counting S up to E
+template <size_t S, size_t ...Indices, size_t E> struct make_indices_imp<S, tuple_indices<Indices...>, E> { // recursive step: append S, continue with S+1
+ typedef typename make_indices_imp<S+1, tuple_indices<Indices..., S>, E>::type type;
+};
+template <size_t E, size_t ...Indices> struct make_indices_imp<E, tuple_indices<Indices...>, E> { // base case: the counter has reached E
+ typedef tuple_indices<Indices...> type; // the indices accumulated so far
+};
+template <size_t E, size_t S = 0> struct make_tuple_indices { // ::type is tuple_indices<S, ..., E-1>; S defaults to 0
+ typedef typename make_indices_imp<S, tuple_indices<>, E>::type type;
+};
+
+struct VerifyOnFailure { // error policy used as the default ErrorHandler of marshalled_func / marshalled_func_imp
+ static inline int unmarshall_args_failure() { // invoked when the arguments could not be unmarshalled
+ VERIFY(0); // NOTE(review): VERIFY is defined elsewhere; presumably aborts on a false condition - confirm
+ return 0; // only reached if VERIFY(0) returns
+ }
+};
+
+typedef std::function<int(unmarshall &, marshall &)> handler;
+
+using std::move;
+using std::get;
+using std::tuple;
+using std::decay;
+
+#include <iostream>
+
+template <class F, class R, class args_type, size_t ...Indices> // overload for callables that are not member-function pointers
+typename std::enable_if<!std::is_member_function_pointer<F>::value, int>::type
+invoke(F f, void *, R & r, args_type & t, tuple_indices<Indices...>) { // the instance pointer is ignored
+ return f(r, move(get<Indices>(t))...); // reply first, then each tuple element moved into the call
+}
+
+template <class F, class C, class R, class args_type, size_t ...Indices> // overload for member-function pointers
+typename std::enable_if<std::is_member_function_pointer<F>::value, int>::type
+invoke(F f, C *c, R & r, args_type & t, tuple_indices<Indices...>) { // c is the object the member function is called on
+ return (c->*f)(r, move(get<Indices>(t))...); // reply first, then each tuple element moved into the call
+}
+
+template <class Functor,
+ class Instance,
+ class Signature,
+ class ErrorHandler=VerifyOnFailure> struct marshalled_func_imp;
+
+// Adapter between the wire format and a handler of signature int(R&, Args...).
+// F is the callable type, C the class of the instance it is invoked on (void
+// when there is none), and ErrorHandler supplies unmarshall_args_failure().
+template <class F, class C, class ErrorHandler, class R, class... Args>
+struct marshalled_func_imp<F, C, int(R&, Args...), ErrorHandler> {
+    using result_type = R;
+    // Arguments are stored by value, stripped of references and cv-qualifiers.
+    using args_type = tuple<typename decay<Args>::type...>;
+    using index_type = typename make_tuple_indices<sizeof...(Args)>::type;
+
+    // Unmarshalls the arguments from u, runs the handler and marshalls the
+    // reply into m. Returns the handler's status, or the ErrorHandler's
+    // result when u.okdone() is false after the arguments were read.
+    static inline int call(F f, C *c, unmarshall &u, marshall &m) {
+        // Grab each argument as its decayed type so that handlers taking
+        // `const T &` work: a reference or const object cannot be
+        // default-constructed inside grab(), and a tuple of references would
+        // dangle. The braced initialiser makes the grabs run left to right,
+        // i.e. in the order the arguments appear on the wire.
+        args_type t{u.grab<typename decay<Args>::type>()...};
+        if (!u.okdone())
+            return ErrorHandler::unmarshall_args_failure();
+        // Value-initialise the reply so that a handler which never assigns
+        // it does not cause an indeterminate value to be marshalled.
+        R r{};
+        int b = invoke(f, c, r, t, index_type());
+        m << r;
+        return b;
+    }
+
+    // Wraps f (and the optional instance c) in a handler allocated with new
+    // and returns the raw pointer.
+    static inline handler *wrap(F f, C *c=nullptr) {
+        typename decay<F>::type f_ = f;
+        return new handler([f_, c](unmarshall &u, marshall &m) -> int {
+            return call(f_, c, u, m);
+        });
+    }
+};
+
+template <class Functor, // primary template: selects a marshalled_func_imp from the callable's Signature type
+ class Signature,
+ class ErrorHandler=VerifyOnFailure> struct marshalled_func;
+
+template <class F, class ErrorHandler, class... Args>
+struct marshalled_func<F, int(*)(Args...), ErrorHandler> : // plain function pointer: no instance, so C = void
+ public marshalled_func_imp<F, void, int(Args...), ErrorHandler> {};
+
+template <class F, class ErrorHandler, class C, class... Args>
+struct marshalled_func<F, int(C::*)(Args...), ErrorHandler> : // pointer to a non-const member function of C
+ public marshalled_func_imp<F, C, int(Args...), ErrorHandler> {};
+
+template <class F, class ErrorHandler, class Signature>
+struct marshalled_func<F, std::function<Signature>, ErrorHandler> : // std::function<Signature>: no instance, so C = void
+ public marshalled_func_imp<F, void, Signature, ErrorHandler> {};
+
#endif
}
}
-
rpcs::rpcs(unsigned int p1, int count)
: port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
{
lossytest_ = atoi(loss_env);
}
- reg(rpc_const::bind, this, &rpcs::rpcbind);
+ reg(rpc_const::bind, &rpcs::rpcbind, this);
dispatchpool_ = new ThrPool(6,false);
listener_ = new tcpsconn(this, port_, lossytest_);
updatestat(proc);
}
- rh.ret = f->fn(req, rep);
- if (rh.ret == rpc_const::unmarshal_args_failure) {
- fprintf(stderr, "rpcs::dispatch: failed to"
- " unmarshall the arguments. You are"
- " probably calling RPC 0x%x with wrong"
- " types of arguments.\n", proc);
- VERIFY(0);
- }
+ rh.ret = (*f)(req, rep);
+ if (rh.ret == rpc_const::unmarshal_args_failure) {
+ fprintf(stderr, "rpcs::dispatch: failed to"
+ " unmarshall the arguments. You are"
+ " probably calling RPC 0x%x with wrong"
+ " types of arguments.\n", proc);
+ VERIFY(0);
+ }
VERIFY(rh.ret >= 0);
rep.pack_reply_header(rh);
// rpc handler
int
-rpcs::rpcbind(int a, int &r)
+rpcs::rpcbind(int &r, int a)
{
jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
r = nonce_;
#endif
class rpc_const {
- public:
- static const unsigned int bind = 1; // handler number reserved for bind
- static const int timeout_failure = -1;
- static const int unmarshal_args_failure = -2;
- static const int unmarshal_reply_failure = -3;
- static const int atmostonce_failure = -4;
- static const int oldsrv_failure = -5;
- static const int bind_failure = -6;
- static const int cancel_failure = -7;
+ public:
+ static const unsigned int bind = 1; // handler number reserved for bind
+ static const int timeout_failure = -1;
+ static const int unmarshal_args_failure = -2;
+ static const int unmarshal_reply_failure = -3;
+ static const int atmostonce_failure = -4;
+ static const int oldsrv_failure = -5;
+ static const int bind_failure = -6;
+ static const int cancel_failure = -7;
+};
+
+struct ReturnOnFailure { // error policy: report an argument-unmarshalling failure through the return code
+ static inline int unmarshall_args_failure() {
+ return rpc_const::unmarshal_args_failure; // the -2 constant declared in rpc_const
+ }
};
// rpc client endpoint.
// threaded: multiple threads can be sending RPCs,
class rpcc : public chanmgr {
- private:
+ private:
- //manages per rpc info
- struct caller {
- caller(unsigned int xxid, unmarshall *un);
- ~caller();
+ //manages per rpc info
+ struct caller {
+ caller(unsigned int xxid, unmarshall *un);
+ ~caller();
- unsigned int xid;
- unmarshall *un;
- int intret;
- bool done;
+ unsigned int xid;
+ unmarshall *un;
+ int intret;
+ bool done;
std::mutex m;
std::condition_variable c;
- };
+ };
- void get_refconn(connection **ch);
- void update_xid_rep(unsigned int xid);
+ void get_refconn(connection **ch);
+ void update_xid_rep(unsigned int xid);
- sockaddr_in dst_;
- unsigned int clt_nonce_;
- unsigned int srv_nonce_;
- bool bind_done_;
- unsigned int xid_;
- int lossytest_;
- bool retrans_;
- bool reachable_;
+ sockaddr_in dst_;
+ unsigned int clt_nonce_;
+ unsigned int srv_nonce_;
+ bool bind_done_;
+ unsigned int xid_;
+ int lossytest_;
+ bool retrans_;
+ bool reachable_;
- connection *chan_;
+ connection *chan_;
std::mutex m_; // protect insert/delete to calls[]
- std::mutex chan_m_;
+ std::mutex chan_m_;
- bool destroy_wait_;
+ bool destroy_wait_;
std::condition_variable destroy_wait_c_;
- std::map<int, caller *> calls_;
- std::list<unsigned int> xid_rep_window_;
+ std::map<int, caller *> calls_;
+ std::list<unsigned int> xid_rep_window_;
struct request {
request() { clear(); }
};
struct request dup_req_;
int xid_rep_done_;
- public:
+ public:
- rpcc(sockaddr_in d, bool retrans=true);
- ~rpcc();
+ rpcc(sockaddr_in d, bool retrans=true);
+ ~rpcc();
- struct TO {
- int to;
- };
- static const TO to_max;
- static const TO to_min;
- static TO to(int x) { TO t; t.to = x; return t;}
+ struct TO {
+ int to;
+ };
+ static const TO to_max;
+ static const TO to_min;
+ static TO to(int x) { TO t; t.to = x; return t;}
- unsigned int id() { return clt_nonce_; }
+ unsigned int id() { return clt_nonce_; }
- int bind(TO to = to_max);
+ int bind(TO to = to_max);
- void set_reachable(bool r) { reachable_ = r; }
+ void set_reachable(bool r) { reachable_ = r; }
- void cancel();
+ void cancel();
int islossy() { return lossytest_ > 0; }
- int call1(unsigned int proc,
- marshall &req, unmarshall &rep, TO to);
+ int call1(unsigned int proc,
+ marshall &req, unmarshall &rep, TO to);
- bool got_pdu(connection *c, char *b, int sz);
+ bool got_pdu(connection *c, char *b, int sz);
- template<class R>
- int call_m(unsigned int proc, marshall &req, R & r, TO to);
+ template<class R>
+ int call_m(unsigned int proc, marshall &req, R & r, TO to);
- template<class R, typename ...Args>
- inline int call(unsigned int proc, R & r, const Args&... args);
+ template<class R, typename ...Args>
+ inline int call(unsigned int proc, R & r, const Args&... args);
- template<class R, typename ...Args>
- inline int call_timeout(unsigned int proc, TO to, R & r, const Args&... args);
+ template<class R, typename ...Args>
+ inline int call_timeout(unsigned int proc, TO to, R & r, const Args&... args);
};
template<class R> int
template<class R, typename... Args> inline int
rpcc::call_timeout(unsigned int proc, const rpcc::TO to, R & r, const Args&... args)
{
- marshall m{args...};
- return call_m(proc, m, r, to);
+ marshall m{args...};
+ return call_m(proc, m, r, to);
}
bool operator<(const sockaddr_in &a, const sockaddr_in &b);
-class handler {
- public:
- handler() { }
- virtual ~handler() { }
- virtual int fn(unmarshall &, marshall &) = 0;
-};
-
-
// rpc server endpoint.
class rpcs : public chanmgr {
- typedef enum {
- NEW, // new RPC, not a duplicate
- INPROGRESS, // duplicate of an RPC we're still processing
- DONE, // duplicate of an RPC we already replied to (have reply)
- FORGOTTEN, // duplicate of an old RPC whose reply we've forgotten
- } rpcstate_t;
+ typedef enum {
+ NEW, // new RPC, not a duplicate
+ INPROGRESS, // duplicate of an RPC we're still processing
+ DONE, // duplicate of an RPC we already replied to (have reply)
+ FORGOTTEN, // duplicate of an old RPC whose reply we've forgotten
+ } rpcstate_t;
- private:
+ private:
// state about an in-progress or completed RPC, for at-most-once.
// if cb_present is true, then the RPC is complete and a reply
// has been sent; in that case buf points to a copy of the reply,
// and sz holds the size of the reply.
- struct reply_t {
- reply_t (unsigned int _xid) {
- xid = _xid;
- cb_present = false;
- buf = NULL;
- sz = 0;
- }
- reply_t (unsigned int _xid, char *_buf, int _sz) {
- xid = _xid;
- cb_present = true;
- buf = _buf;
- sz = _sz;
- }
- unsigned int xid;
- bool cb_present; // whether the reply buffer is valid
- char *buf; // the reply buffer
- int sz; // the size of reply buffer
- };
-
- int port_;
- unsigned int nonce_;
-
- // provide at most once semantics by maintaining a window of replies
- // per client that that client hasn't acknowledged receiving yet.
+ struct reply_t {
+ reply_t (unsigned int _xid) {
+ xid = _xid;
+ cb_present = false;
+ buf = NULL;
+ sz = 0;
+ }
+ reply_t (unsigned int _xid, char *_buf, int _sz) {
+ xid = _xid;
+ cb_present = true;
+ buf = _buf;
+ sz = _sz;
+ }
+ unsigned int xid;
+ bool cb_present; // whether the reply buffer is valid
+ char *buf; // the reply buffer
+ int sz; // the size of reply buffer
+ };
+
+ int port_;
+ unsigned int nonce_;
+
+ // provide at most once semantics by maintaining a window of replies
+ // per client that that client hasn't acknowledged receiving yet.
// indexed by client nonce.
- std::map<unsigned int, std::list<reply_t> > reply_window_;
-
- void free_reply_window(void);
- void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz);
-
- rpcstate_t checkduplicate_and_update(unsigned int clt_nonce,
- unsigned int xid, unsigned int rep_xid,
- char **b, int *sz);
-
- void updatestat(unsigned int proc);
-
- // latest connection to the client
- std::map<unsigned int, connection *> conns_;
-
- // counting
- const int counting_;
- int curr_counts_;
- std::map<int, int> counts_;
-
- int lossytest_;
- bool reachable_;
-
- // map proc # to function
- std::map<int, handler *> procs_;
-
- std::mutex procs_m_; // protect insert/delete to procs[]
- std::mutex count_m_; //protect modification of counts
- std::mutex reply_window_m_; // protect reply window et al
- std::mutex conss_m_; // protect conns_
-
-
- protected:
-
- struct djob_t {
- djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {}
- char *buf;
- int sz;
- connection *conn;
- };
- void dispatch(djob_t *);
-
- // internal handler registration
- void reg1(unsigned int proc, handler *);
-
- ThrPool* dispatchpool_;
- tcpsconn* listener_;
-
- public:
- rpcs(unsigned int port, int counts=0);
- ~rpcs();
- inline int port() { return listener_->port(); }
- //RPC handler for clients binding
- int rpcbind(int a, int &r);
-
- void set_reachable(bool r) { reachable_ = r; }
-
- bool got_pdu(connection *c, char *b, int sz);
-
- // register a handler
- template<class S, class A1, class R>
- void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, R & r));
- template<class S, class A1, class A2, class R>
- void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, const A2,
- R & r));
- template<class S, class A1, class A2, class A3, class R>
- void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
- const A3, R & r));
- template<class S, class A1, class A2, class A3, class A4, class R>
- void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
- const A3, const A4, R & r));
- template<class S, class A1, class A2, class A3, class A4, class A5, class R>
- void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
- const A3, const A4, const A5,
- R & r));
- template<class S, class A1, class A2, class A3, class A4, class A5, class A6,
- class R>
- void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
- const A3, const A4, const A5,
- const A6, R & r));
- template<class S, class A1, class A2, class A3, class A4, class A5, class A6,
- class A7, class R>
- void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2,
- const A3, const A4, const A5,
- const A6, const A7,
- R & r));
-};
+ std::map<unsigned int, std::list<reply_t> > reply_window_;
-template<class S, class A1, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, R & r))
-{
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- R r;
- args >> a1;
- if(!args.okdone())
- return rpc_const::unmarshal_args_failure;
- int b = (sob->*meth)(a1, r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
+ void free_reply_window(void);
+ void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz);
-template<class S, class A1, class A2, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
- R & r))
-{
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, const A2 a2, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- A2 a2;
- R r;
- args >> a1;
- args >> a2;
- if(!args.okdone())
- return rpc_const::unmarshal_args_failure;
- int b = (sob->*meth)(a1, a2, r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
+ rpcstate_t checkduplicate_and_update(unsigned int clt_nonce,
+ unsigned int xid, unsigned int rep_xid,
+ char **b, int *sz);
-template<class S, class A1, class A2, class A3, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
- const A3 a3, R & r))
-{
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- A2 a2;
- A3 a3;
- R r;
- args >> a1;
- args >> a2;
- args >> a3;
- if(!args.okdone())
- return rpc_const::unmarshal_args_failure;
- int b = (sob->*meth)(a1, a2, a3, r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
+ void updatestat(unsigned int proc);
-template<class S, class A1, class A2, class A3, class A4, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
- const A3 a3, const A4 a4,
- R & r))
-{
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
- const A4 a4, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- A2 a2;
- A3 a3;
- A4 a4;
- R r;
- args >> a1;
- args >> a2;
- args >> a3;
- args >> a4;
- if(!args.okdone())
- return rpc_const::unmarshal_args_failure;
- int b = (sob->*meth)(a1, a2, a3, a4, r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
+ // latest connection to the client
+ std::map<unsigned int, connection *> conns_;
-template<class S, class A1, class A2, class A3, class A4, class A5, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
- const A3 a3, const A4 a4,
- const A5 a5, R & r))
-{
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
- const A5 a5, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
- const A4 a4, const A5 a5, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- A2 a2;
- A3 a3;
- A4 a4;
- A5 a5;
- R r;
- args >> a1;
- args >> a2;
- args >> a3;
- args >> a4;
- args >> a5;
- if(!args.okdone())
- return rpc_const::unmarshal_args_failure;
- int b = (sob->*meth)(a1, a2, a3, a4, a5, r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
+ // counting
+ const int counting_;
+ int curr_counts_;
+ std::map<int, int> counts_;
-template<class S, class A1, class A2, class A3, class A4, class A5, class A6, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
- const A3 a3, const A4 a4,
- const A5 a5, const A6 a6,
- R & r))
-{
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
- const A5 a5, const A6 a6, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
- const A4 a4, const A5 a5, const A6 a6, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- A2 a2;
- A3 a3;
- A4 a4;
- A5 a5;
- A6 a6;
- R r;
- args >> a1;
- args >> a2;
- args >> a3;
- args >> a4;
- args >> a5;
- args >> a6;
- if(!args.okdone())
- return rpc_const::unmarshal_args_failure;
- int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
+ int lossytest_;
+ bool reachable_;
-template<class S, class A1, class A2, class A3, class A4, class A5,
- class A6, class A7, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
- const A3 a3, const A4 a4,
- const A5 a5, const A6 a6,
- const A7 a7, R & r))
-{
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
- const A5 a5, const A6 a6, const A7 a7, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
- const A4 a4, const A5 a5, const A6 a6,
- const A7 a7, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- A2 a2;
- A3 a3;
- A4 a4;
- A5 a5;
- A6 a6;
- A7 a7;
- R r;
- args >> a1;
- args >> a2;
- args >> a3;
- args >> a4;
- args >> a5;
- args >> a6;
- args >> a7;
- if(!args.okdone())
- return rpc_const::unmarshal_args_failure;
- int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, a7, r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
+ // map proc # to function
+ std::map<int, handler *> procs_;
+
+ std::mutex procs_m_; // protect insert/delete to procs[]
+ std::mutex count_m_; //protect modification of counts
+ std::mutex reply_window_m_; // protect reply window et al
+ std::mutex conss_m_; // protect conns_
+
+
+ protected:
+
+ struct djob_t {
+ djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {}
+ char *buf;
+ int sz;
+ connection *conn;
+ };
+ void dispatch(djob_t *);
+ // internal handler registration
+ void reg1(unsigned int proc, handler *);
+
+ ThrPool* dispatchpool_;
+ tcpsconn* listener_;
+
+ public:
+ rpcs(unsigned int port, int counts=0);
+ ~rpcs();
+ inline int port() { return listener_->port(); }
+ //RPC handler for clients binding
+ int rpcbind(int &r, int a);
+
+ void set_reachable(bool r) { reachable_ = r; }
+
+ bool got_pdu(connection *c, char *b, int sz);
+
+ template<class F, class C=void> void reg(unsigned int proc, F f, C *c=nullptr);
+};
+
+template<class F, class C> void rpcs::reg(unsigned int proc, F f, C *c) { // registers f as the handler for procedure number proc; c is the instance for member-function handlers
+ reg1(proc, marshalled_func<F, F, ReturnOnFailure>::wrap(f, c)); // F is passed as both the functor type and the signature that selects the specialisation
+}
void make_sockaddr(const char *hostandport, struct sockaddr_in *dst);
void make_sockaddr(const char *host, const char *port,
- struct sockaddr_in *dst);
+ struct sockaddr_in *dst);
#endif
// from multiple classes.
class srv {
public:
- int handle_22(const std::string a, const std::string b, std::string & r);
- int handle_fast(const int a, int &r);
- int handle_slow(const int a, int &r);
- int handle_bigrep(const int a, std::string &r);
+ int handle_22(std::string & r, const std::string a, const std::string b); // new convention: result reference first, then the arguments
+ int handle_fast(int &r, const int a); // r is the result, a the argument
+ int handle_slow(int &r, const int a); // same shape as handle_fast
+ int handle_bigrep(std::string &r, const int a); // r is the result, a the argument
};
// a handler. a and b are arguments, r is the result.
// at these argument types, so this function definition
// does what a .x file does in SunRPC.
int
-srv::handle_22(const std::string a, std::string b, std::string &r)
+srv::handle_22(std::string &r, const std::string a, const std::string b) // r: result (now first); a, b: arguments, const to match the declaration in class srv
{
r = a + b;
return 0;
}
int
-srv::handle_fast(const int a, int &r)
+srv::handle_fast(int &r, const int a)
{
r = a + 1;
return 0;
}
int
-srv::handle_slow(const int a, int &r)
+srv::handle_slow(int &r, const int a)
{
usleep(random() % 5000);
r = a + 2;
}
int
-srv::handle_bigrep(const int len, std::string &r)
+srv::handle_bigrep(std::string &r, const int len)
{
r = std::string(len, 'x');
return 0;
void startserver()
{
server = new rpcs(port);
- server->reg(22, &service, &srv::handle_22);
- server->reg(23, &service, &srv::handle_fast);
- server->reg(24, &service, &srv::handle_slow);
- server->reg(25, &service, &srv::handle_bigrep);
+ server->reg(22, &srv::handle_22, &service); // new reg() order: proc number, handler, then the object it is invoked on
+ server->reg(23, &srv::handle_fast, &service);
+ server->reg(24, &srv::handle_slow, &service);
+ server->reg(25, &srv::handle_bigrep, &service);
}
void
commit_change(1);
}
rsmrpc = cfg->get_rpcs();
- rsmrpc->reg(rsm_client_protocol::invoke, this, &rsm::client_invoke);
- rsmrpc->reg(rsm_client_protocol::members, this, &rsm::client_members);
- rsmrpc->reg(rsm_protocol::invoke, this, &rsm::invoke);
- rsmrpc->reg(rsm_protocol::transferreq, this, &rsm::transferreq);
- rsmrpc->reg(rsm_protocol::transferdonereq, this, &rsm::transferdonereq);
- rsmrpc->reg(rsm_protocol::joinreq, this, &rsm::joinreq);
+ rsmrpc->reg(rsm_client_protocol::invoke, &rsm::client_invoke, this);
+ rsmrpc->reg(rsm_client_protocol::members, &rsm::client_members, this);
+ rsmrpc->reg(rsm_protocol::invoke, &rsm::invoke, this);
+ rsmrpc->reg(rsm_protocol::transferreq, &rsm::transferreq, this);
+ rsmrpc->reg(rsm_protocol::transferdonereq, &rsm::transferdonereq, this);
+ rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
// tester must be on different port, otherwise it may partition itself
testsvr = new rpcs(atoi(_me.c_str()) + 1);
- testsvr->reg(rsm_test_protocol::net_repair, this, &rsm::test_net_repairreq);
- testsvr->reg(rsm_test_protocol::breakpoint, this, &rsm::breakpointreq);
+ testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
+ testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
{
lock ml(rsm_mutex);
unmarshall args(req);
marshall rep;
std::string reps;
- rsm_protocol::status ret = h->fn(args, rep);
+ rsm_protocol::status ret = (*h)(args, rep);
marshall rep1;
rep1 << ret;
rep1 << rep.str();
// number, and invokes it on all members of the replicated state
// machine.
//
-rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std::string &r) {
+rsm_client_protocol::status rsm::client_invoke(std::string &r, int procno, std::string req) {
LOG("rsm::client_invoke: procno 0x" << std::hex << procno);
lock ml(invoke_mutex);
std::vector<std::string> m;
// the replica must execute requests in order (with no gaps)
// according to requests' seqno
-rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &dummy) {
+rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req) {
LOG("rsm::invoke: procno 0x" << std::hex << proc);
lock ml(invoke_mutex);
std::vector<std::string> m;
/**
* RPC handler: Send back the local node's state to the caller
*/
-rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned vid,
- rsm_protocol::transferres &r) {
+rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, std::string src,
+ viewstamp last, unsigned vid) {
lock ml(rsm_mutex);
int ret = rsm_protocol::OK;
tprintf("transferreq from %s (%d,%d) vs (%d,%d)\n", src.c_str(),
* RPC handler: Inform the local node (the primary) that node m has synchronized
* for view vid
*/
-rsm_protocol::status rsm::transferdonereq(std::string m, unsigned vid, int &) {
+rsm_protocol::status rsm::transferdonereq(int &, std::string m, unsigned vid) {
lock ml(rsm_mutex);
if (!insync || vid != vid_insync)
return rsm_protocol::BUSY;
// a node that wants to join an RSM as a server sends a
// joinreq to the RSM's current primary; this is the
// handler for that RPC.
-rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::joinres &r) {
+rsm_protocol::status rsm::joinreq(rsm_protocol::joinres &r, std::string m, viewstamp last) {
int ret = rsm_protocol::OK;
lock ml(rsm_mutex);
* so the client can switch to a different primary
* when it existing primary fails
*/
-rsm_client_protocol::status rsm::client_members(int i, std::vector<std::string> &r) {
+rsm_client_protocol::status rsm::client_members(std::vector<std::string> &r, int i) {
std::vector<std::string> m;
lock ml(rsm_mutex);
cfg->get_view(vid_commit, m);
rsmrpc->set_reachable(heal);
}
-rsm_test_protocol::status rsm::test_net_repairreq(int heal, int &r) {
+rsm_test_protocol::status rsm::test_net_repairreq(int &r, int heal) {
lock ml(rsm_mutex);
tprintf("rsm::test_net_repairreq: %d (dopartition %d, partitioned %d)\n",
heal, dopartition, partitioned);
}
}
-rsm_test_protocol::status rsm::breakpointreq(int b, int &r) {
+rsm_test_protocol::status rsm::breakpointreq(int &r, int b) {
r = rsm_test_protocol::OK;
lock ml(rsm_mutex);
tprintf("rsm::breakpointreq: %d\n", b);
#include <arpa/inet.h>
#include "config.h"
-
class rsm : public config_view_change {
private:
void reg1(int proc, handler *);
bool break2;
- rsm_client_protocol::status client_members(int i,
- std::vector<std::string> &r);
- rsm_protocol::status invoke(int proc, viewstamp vs, std::string mreq,
- int &dummy);
- rsm_protocol::status transferreq(std::string src, viewstamp last, unsigned vid,
- rsm_protocol::transferres &r);
- rsm_protocol::status transferdonereq(std::string m, unsigned vid, int &);
- rsm_protocol::status joinreq(std::string src, viewstamp last,
- rsm_protocol::joinres &r);
- rsm_test_protocol::status test_net_repairreq(int heal, int &r);
- rsm_test_protocol::status breakpointreq(int b, int &r);
+ rsm_client_protocol::status client_members(std::vector<std::string> &r, int i);
+ rsm_protocol::status invoke(int &, int proc, viewstamp vs, std::string mreq);
+ rsm_protocol::status transferreq(rsm_protocol::transferres &r, std::string src,
+ viewstamp last, unsigned vid);
+ rsm_protocol::status transferdonereq(int &, std::string m, unsigned vid);
+ rsm_protocol::status joinreq(rsm_protocol::joinres &r, std::string src,
+ viewstamp last);
+ rsm_test_protocol::status test_net_repairreq(int &r, int heal);
+ rsm_test_protocol::status breakpointreq(int &r, int b);
std::mutex rsm_mutex;
std::mutex invoke_mutex;
std::condition_variable sync_cond;
void execute(int procno, std::string req, std::string &r);
- rsm_client_protocol::status client_invoke(int procno, std::string req,
- std::string &r);
+ rsm_client_protocol::status client_invoke(std::string &r, int procno,
+ std::string req);
bool statetransfer(std::string m);
bool statetransferdone(std::string m);
bool join(std::string m);
void recovery();
void commit_change(unsigned vid);
- template<class S, class A1, class R>
- void reg(int proc, S*, int (S::*meth)(const A1 a1, R &));
- template<class S, class A1, class A2, class R>
- void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2, R &));
- template<class S, class A1, class A2, class A3, class R>
- void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2,
- const A3 a3, R &));
- template<class S, class A1, class A2, class A3, class A4, class R>
- void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2,
- const A3 a3, const A4 a4, R &));
- template<class S, class A1, class A2, class A3, class A4, class A5, class R>
- void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2,
- const A3 a3, const A4 a4, const A5 a5, R &));
+ template<class F, class C=void> void reg(int proc, F f, C *c=nullptr);
};
-template<class S, class A1, class R>
-void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, R & r)) {
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- R r;
- args >> a1;
- VERIFY(args.okdone());
- int b = (sob->*meth)(a1,r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
-
-template<class S, class A1, class A2, class R>
-void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, R & r)) {
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, const A2 a2, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- A2 a2;
- R r;
- args >> a1;
- args >> a2;
- VERIFY(args.okdone());
- int b = (sob->*meth)(a1,a2,r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
-
-template<class S, class A1, class A2, class A3, class R>
-void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
- const A3 a3, R & r)) {
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- A2 a2;
- A3 a3;
- R r;
- args >> a1;
- args >> a2;
- args >> a3;
- VERIFY(args.okdone());
- int b = (sob->*meth)(a1,a2,a3,r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
-
-template<class S, class A1, class A2, class A3, class A4, class R>
-void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
- const A3 a3, const A4 a4, R & r)) {
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
- const A4 a4, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- A2 a2;
- A3 a3;
- A4 a4;
- R r;
- args >> a1;
- args >> a2;
- args >> a3;
- args >> a4;
- VERIFY(args.okdone());
- int b = (sob->*meth)(a1,a2,a3,a4,r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
-}
-
-
-template<class S, class A1, class A2, class A3, class A4, class A5, class R> void
- rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
- const A3 a3, const A4 a4,
- const A5 a5, R & r))
-{
- class h1 : public handler {
- private:
- S * sob;
- int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
- const A5 a5, R & r);
- public:
- h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
- const A4 a4, const A5 a5, R & r))
- : sob(xsob), meth(xmeth) { }
- int fn(unmarshall &args, marshall &ret) {
- A1 a1;
- A2 a2;
- A3 a3;
- A4 a4;
- A5 a5;
- R r;
- args >> a1;
- args >> a2;
- args >> a3;
- args >> a4;
- VERIFY(args.okdone());
- int b = (sob->*meth)(a1,a2,a3,a4,a5,r);
- ret << r;
- return b;
- }
- };
- reg1(proc, new h1(sob, meth));
+template<class F, class C> void rsm::reg(int proc, F f, C *c) { // register f (invoked on c for member functions) as the handler for proc
+ reg1(proc, marshalled_func<F, F>::wrap(f, c)); // no failure-policy argument is passed here; NOTE(review): the removed overloads used VERIFY(args.okdone()) -- confirm marshalled_func's default matches
}
#endif /* rsm_h */