From: Peter Iannucci Date: Thu, 19 Sep 2013 21:16:20 +0000 (-0400) Subject: TEMPLATE MAGIC FOR GREAT JUSTICE X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/4b9798f44ae94deabf87dd534337b55259272950?hp=130f2d53438eb6193accb445aca52fa8e2fe4158 TEMPLATE MAGIC FOR GREAT JUSTICE --- diff --git a/Makefile b/Makefile index b160065..a528737 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ lock_demo : $(lock_demo) rpc/librpc.a lock_tester=lock_tester.o lock_client.o tprintf.o rsm_client.o handle.o lock_client_cache_rsm.o lock_tester : $(lock_tester) rpc/librpc.a -lock_server=lock_server.o lock_smain.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o +lock_server=lock_server.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o lock_server : $(lock_server) rpc/librpc.a rsm_tester=rsm_tester.o rsmtest_client.o @@ -31,7 +31,7 @@ rsm_tester: $(rsm_tester) rpc/librpc.a -include *.d -include rpc/*.d -clean_files=rpc/rpctest rpc/*.o *.d rpc/*.d rpc/librpc.a *.o lock_server lock_tester lock_demo rsm_tester +clean_files=rpc/rpctest rpc/*.o *.d rpc/*.d rpc/librpc.a *.o config *.log lock_server lock_tester lock_demo rsm_tester .PHONY: clean $(EXTRA_TARGETS) clean: rm -rf $(clean_files) diff --git a/config.cc b/config.cc index 96e6cd7..04c869e 100644 --- a/config.cc +++ b/config.cc @@ -50,7 +50,7 @@ config::config( paxos_proposer = new proposer(this, paxos_acceptor, me); // XXX hack; maybe should have its own port number - paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, this, &config::heartbeat); + paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this); { lock ml(cfg_mutex); @@ -265,7 +265,7 @@ config::heartbeater() } paxos_protocol::status -config::heartbeat(std::string m, unsigned vid, int &r) +config::heartbeat(int &r, std::string m, unsigned vid) { lock ml(cfg_mutex); int ret = paxos_protocol::ERR; diff --git a/config.h b/config.h index cd276fb..f3fe216 100644 --- a/config.h +++ b/config.h @@ -23,10 +23,7 @@ class config : public paxos_change { std::vector mems; mutex cfg_mutex; std::condition_variable config_cond; - paxos_protocol::status heartbeat( - std::string m, - unsigned instance, - int &r); + paxos_protocol::status heartbeat(int &r, std::string m, unsigned instance); std::string value(const std::vector &mems) const; void members(const std::string &v, std::vector &m) const; void get_view_wo(unsigned instance, std::vector &m); diff --git a/lock_client_cache_rsm.cc b/lock_client_cache_rsm.cc index c0be985..2a6d6e3 100644 --- a/lock_client_cache_rsm.cc +++ b/lock_client_cache_rsm.cc @@ -59,8 +59,8 @@ lock_client_cache_rsm::lock_client_cache_rsm(string xdst, class lock_release_use id = host.str(); last_port = rlock_port; rpcs *rlsrpc = new rpcs(rlock_port); - rlsrpc->reg(rlock_protocol::revoke, this, &lock_client_cache_rsm::revoke_handler); - rlsrpc->reg(rlock_protocol::retry, this, &lock_client_cache_rsm::retry_handler); + rlsrpc->reg(rlock_protocol::revoke, &lock_client_cache_rsm::revoke_handler, this); + rlsrpc->reg(rlock_protocol::retry, &lock_client_cache_rsm::retry_handler, this); { lock sl(xid_mutex); xid = 0; @@ -178,7 +178,7 @@ lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid return lock_protocol::OK; } -rlock_protocol::status lock_client_cache_rsm::revoke_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) { +rlock_protocol::status lock_client_cache_rsm::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) { LOG("Revoke handler " << lid << " " << xid); lock_state &st = get_lock_state(lid); lock sl(st.m); @@ -201,7 +201,7 @@ rlock_protocol::status lock_client_cache_rsm::revoke_handler(lock_protocol::lock return rlock_protocol::OK; } -rlock_protocol::status lock_client_cache_rsm::retry_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) { +rlock_protocol::status lock_client_cache_rsm::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) { lock_state &st = get_lock_state(lid); lock sl(st.m); VERIFY(st.state == lock_state::acquiring); diff --git a/lock_client_cache_rsm.h b/lock_client_cache_rsm.h index 049d18a..815224c 100644 --- a/lock_client_cache_rsm.h +++ b/lock_client_cache_rsm.h @@ -73,8 +73,8 @@ class lock_client_cache_rsm : public lock_client { lock_protocol::status acquire(lock_protocol::lockid_t); virtual lock_protocol::status release(lock_protocol::lockid_t); void releaser(); - rlock_protocol::status revoke_handler(lock_protocol::lockid_t, lock_protocol::xid_t, int &); - rlock_protocol::status retry_handler(lock_protocol::lockid_t, lock_protocol::xid_t, int &); + rlock_protocol::status revoke_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t); + rlock_protocol::status retry_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t); }; diff --git a/lock_server.cc b/lock_server.cc index be0c1c9..0f82080 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -1,46 +1,37 @@ -// the lock server implementation - -#include "lock_server.h" -#include +#include "rpc/rpc.h" +#include +#include #include #include -#include -#include "lock.h" +#include "lock_server_cache_rsm.h" +#include "paxos.h" +#include "rsm.h" -lock_server::lock_server(): - nacquire (0) -{ -} +// Main loop of lock_server -// caller must hold lock_lock -mutex & -lock_server::get_lock(lock_protocol::lockid_t lid) { - lock ml(lock_lock); - // by the semantics of std::map, this will create - // the mutex if it doesn't already exist - mutex &l = locks[lid]; - return l; -} +char tprintf_thread_prefix = 's'; -lock_protocol::status -lock_server::stat(int clt, lock_protocol::lockid_t lid, int &r) +int +main(int argc, char *argv[]) { - lock_protocol::status ret = lock_protocol::OK; - printf("stat request from clt %d\n", clt); - r = nacquire; - return ret; -} + setvbuf(stdout, NULL, _IONBF, 0); + setvbuf(stderr, NULL, _IONBF, 0); -lock_protocol::status -lock_server::acquire(int clt, lock_protocol::lockid_t lid, int &r) -{ - get_lock(lid).lock(); - return lock_protocol::OK; -} + srandom(getpid()); -lock_protocol::status -lock_server::release(int clt, lock_protocol::lockid_t lid, int &r) -{ - get_lock(lid).unlock(); - return lock_protocol::OK; + if(argc != 3){ + fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]); + exit(1); + } + + rsm rsm(argv[1], argv[2]); + lock_server_cache_rsm ls(&rsm); + rsm.set_state_transfer(&ls); + + rsm.reg(lock_protocol::acquire, &lock_server_cache_rsm::acquire, &ls); + rsm.reg(lock_protocol::release, &lock_server_cache_rsm::release, &ls); + rsm.reg(lock_protocol::stat, &lock_server_cache_rsm::stat, &ls); + + while(1) + sleep(1000); } diff --git a/lock_server.h b/lock_server.h deleted file mode 100644 index f03a717..0000000 --- a/lock_server.h +++ /dev/null @@ -1,34 +0,0 @@ -// this is the lock server -// the lock client has a similar interface - -#ifndef lock_server_h -#define lock_server_h - -#include -#include "lock_protocol.h" -#include "lock_client.h" -#include "rpc/rpc.h" -#include -#include - -using std::map; - -typedef map lock_map; - -class lock_server { - - protected: - int nacquire; - mutex lock_lock; - lock_map locks; - mutex &get_lock(lock_protocol::lockid_t lid); - - public: - lock_server(); - ~lock_server() {}; - lock_protocol::status stat(int clt, lock_protocol::lockid_t lid, int &); - lock_protocol::status acquire(int clt, lock_protocol::lockid_t lid, int &); - lock_protocol::status release(int clt, lock_protocol::lockid_t lid, int &); -}; - -#endif diff --git a/lock_server_cache_rsm.cc b/lock_server_cache_rsm.cc index 00d3f54..8f3cf2b 100644 --- a/lock_server_cache_rsm.cc +++ b/lock_server_cache_rsm.cc @@ -119,7 +119,7 @@ void lock_server_cache_rsm::retryer() { } } -int lock_server_cache_rsm::acquire(lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid, int &) { +int lock_server_cache_rsm::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) { LOG_FUNC_ENTER_SERVER; holder h = holder(id, xid); lock_state &st = get_lock_state(lid); @@ -177,7 +177,7 @@ int lock_server_cache_rsm::acquire(lock_protocol::lockid_t lid, string id, lock_ return lock_protocol::RETRY; } -int lock_server_cache_rsm::release(lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid, int &r) { +int lock_server_cache_rsm::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) { LOG_FUNC_ENTER_SERVER; lock_state &st = get_lock_state(lid); lock sl(st.m); @@ -205,7 +205,7 @@ void lock_server_cache_rsm::unmarshal_state(string state) { rep >> lock_table; } -lock_protocol::status lock_server_cache_rsm::stat(lock_protocol::lockid_t lid, int &r) { +lock_protocol::status lock_server_cache_rsm::stat(int &r, lock_protocol::lockid_t lid) { printf("stat request\n"); r = nacquire; return lock_protocol::OK; diff --git a/lock_server_cache_rsm.h b/lock_server_cache_rsm.h index a765e52..c33b51e 100644 --- a/lock_server_cache_rsm.h +++ b/lock_server_cache_rsm.h @@ -45,13 +45,13 @@ class lock_server_cache_rsm : public rsm_state_transfer { class rsm *rsm; public: lock_server_cache_rsm(class rsm *rsm = 0); - lock_protocol::status stat(lock_protocol::lockid_t, int &); + lock_protocol::status stat(int &, lock_protocol::lockid_t); void revoker(); void retryer(); string marshal_state(); void unmarshal_state(string state); - int acquire(lock_protocol::lockid_t, string id, lock_protocol::xid_t, int &); - int release(lock_protocol::lockid_t, string id, lock_protocol::xid_t, int &); + int acquire(int &, lock_protocol::lockid_t, string id, lock_protocol::xid_t); + int release(int &, lock_protocol::lockid_t, string id, lock_protocol::xid_t); }; #endif diff --git a/lock_smain.cc b/lock_smain.cc deleted file mode 100644 index 69cc433..0000000 --- a/lock_smain.cc +++ /dev/null @@ -1,36 +0,0 @@ -#include "rpc/rpc.h" -#include -#include -#include -#include -#include "lock_server_cache_rsm.h" -#include "paxos.h" -#include "rsm.h" - -// Main loop of lock_server - -char tprintf_thread_prefix = 's'; - -int -main(int argc, char *argv[]) -{ - setvbuf(stdout, NULL, _IONBF, 0); - setvbuf(stderr, NULL, _IONBF, 0); - - srandom(getpid()); - - if(argc != 3){ - fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]); - exit(1); - } - - rsm rsm(argv[1], argv[2]); - lock_server_cache_rsm ls(&rsm); - rsm.set_state_transfer((rsm_state_transfer *)&ls); - rsm.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire); - rsm.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release); - rsm.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat); - - while(1) - sleep(1000); -} diff --git a/paxos.cc b/paxos.cc index 4434788..b0ec640 100644 --- a/paxos.cc +++ b/paxos.cc @@ -59,7 +59,7 @@ proposer::isrunning() // check if the servers in l2 contains a majority of servers in l1 bool -proposer::majority(const std::vector &l1, +proposer::majority(const std::vector &l1, const std::vector &l2) { unsigned n = 0; @@ -71,9 +71,9 @@ proposer::majority(const std::vector &l1, return n >= (l1.size() >> 1) + 1; } -proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, +proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, std::string _me) - : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false), + : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false), stable (true) { my_n.n = 0; @@ -145,7 +145,7 @@ proposer::run(int instance, std::vector cur_nodes, std::string newv // otherwise fill in accepts with set of nodes that accepted, // set v to the v_a with the highest n_a, and return true. bool -proposer::prepare(unsigned instance, std::vector &accepts, +proposer::prepare(unsigned instance, std::vector &accepts, std::vector nodes, std::string &v) { @@ -199,7 +199,7 @@ proposer::accept(unsigned instance, std::vector &accepts, } void -proposer::decide(unsigned instance, std::vector accepts, +proposer::decide(unsigned instance, std::vector accepts, std::string v) { struct paxos_protocol::decidearg arg = { instance, v }; @@ -213,7 +213,7 @@ proposer::decide(unsigned instance, std::vector accepts, } } -acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me, +acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me, std::string _value) : cfg(_cfg), me (_me), instance_h(0) { @@ -232,14 +232,13 @@ acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me, } pxs = new rpcs(atoi(_me.c_str())); - pxs->reg(paxos_protocol::preparereq, this, &acceptor::preparereq); - pxs->reg(paxos_protocol::acceptreq, this, &acceptor::acceptreq); - pxs->reg(paxos_protocol::decidereq, this, &acceptor::decidereq); + pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this); + pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this); + pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this); } paxos_protocol::status -acceptor::preparereq(std::string src, paxos_protocol::preparearg a, - paxos_protocol::prepareres &r) +acceptor::preparereq(paxos_protocol::prepareres &r, std::string src, paxos_protocol::preparearg a) { lock ml(pxs_mutex); r.oldinstance = false; @@ -260,7 +259,7 @@ acceptor::preparereq(std::string src, paxos_protocol::preparearg a, } paxos_protocol::status -acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r) +acceptor::acceptreq(bool &r, std::string src, paxos_protocol::acceptarg a) { lock ml(pxs_mutex); r = false; @@ -275,10 +274,10 @@ acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r) // the src argument is only for debug purpose paxos_protocol::status -acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r) +acceptor::decidereq(int &r, std::string src, paxos_protocol::decidearg a) { lock ml(pxs_mutex); - tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", + tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", a.instance, instance_h, v_a.c_str()); if (a.instance == instance_h + 1) { VERIFY(v_a == a.v); diff --git a/paxos.h b/paxos.h index c7b1af4..170292a 100644 --- a/paxos.h +++ b/paxos.h @@ -30,18 +30,17 @@ class acceptor { std::map values; // vals of each instance void commit_wo(unsigned instance, std::string v); - paxos_protocol::status preparereq(std::string src, - paxos_protocol::preparearg a, - paxos_protocol::prepareres &r); - paxos_protocol::status acceptreq(std::string src, - paxos_protocol::acceptarg a, bool &r); - paxos_protocol::status decidereq(std::string src, - paxos_protocol::decidearg a, int &r); + paxos_protocol::status preparereq(paxos_protocol::prepareres &r, + std::string src, paxos_protocol::preparearg a); + paxos_protocol::status acceptreq(bool &r, std::string src, + paxos_protocol::acceptarg a); + paxos_protocol::status decidereq(int &r, std::string src, + paxos_protocol::decidearg a); friend class log; public: - acceptor(class paxos_change *cfg, bool _first, std::string _me, + acceptor(class paxos_change *cfg, bool _first, std::string _me, std::string _value); ~acceptor() {}; void commit(unsigned instance, std::string v); @@ -73,10 +72,10 @@ class proposer { prop_t my_n; // number of the last proposal used in this instance void setn(); - bool prepare(unsigned instance, std::vector &accepts, + bool prepare(unsigned instance, std::vector &accepts, std::vector nodes, std::string &v); - void accept(unsigned instance, std::vector &accepts, + void accept(unsigned instance, std::vector &accepts, std::vector nodes, std::string v); void decide(unsigned instance, std::vector accepts, std::string v); diff --git a/rpc/marshall.h b/rpc/marshall.h index 27cebbb..448240b 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -219,13 +219,11 @@ class unmarshall { _ind = RPC_HEADER_SZ; } - template - void iterate(OutputIterator i, int n) { - while (n--) { - typename OutputIterator::value_type t; - *this >> t; - *i++ = t; - } + template + inline A grab() { + A a; + *this >> a; + return a; } }; @@ -285,4 +283,89 @@ operator>>(unmarshall &u, std::pair &d) { return u >> d.first >> d.second; } +template struct tuple_indices {}; +template struct make_indices_imp; +template struct make_indices_imp, E> { + typedef typename make_indices_imp, E>::type type; +}; +template struct make_indices_imp, E> { + typedef tuple_indices type; +}; +template struct make_tuple_indices { + typedef typename make_indices_imp, E>::type type; +}; + +struct VerifyOnFailure { + static inline int unmarshall_args_failure() { + VERIFY(0); + return 0; + } +}; + +typedef std::function handler; + +using std::move; +using std::get; +using std::tuple; +using std::decay; + +#include + +template +typename std::enable_if::value, int>::type +invoke(F f, void *, R & r, args_type & t, tuple_indices) { + return f(r, move(get(t))...); +} + +template +typename std::enable_if::value, int>::type +invoke(F f, C *c, R & r, args_type & t, tuple_indices) { + return (c->*f)(r, move(get(t))...); +} + +template struct marshalled_func_imp; + +template +struct marshalled_func_imp { + using result_type = R; + using args_type = tuple::type...>; + using index_type = typename make_tuple_indices::type; + + static inline int call(F f, C *c, unmarshall &u, marshall &m) { + args_type t{std::move(std::tuple{u.grab()...})}; + if (!u.okdone()) + return ErrorHandler::unmarshall_args_failure(); + R r; + int b = invoke(f, c, r, t, index_type()); + m << r; + return b; + } + + static inline handler *wrap(F f, C *c=nullptr) { + typename decay::type f_ = f; + return new handler([f_, c](unmarshall &u, marshall &m) -> int { + return call(f_, c, u, m); + }); + } +}; + +template struct marshalled_func; + +template +struct marshalled_func : + public marshalled_func_imp {}; + +template +struct marshalled_func : + public marshalled_func_imp {}; + +template +struct marshalled_func, ErrorHandler> : + public marshalled_func_imp {}; + #endif diff --git a/rpc/rpc.cc b/rpc/rpc.cc index a5d5b1f..a3ffef1 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -387,7 +387,6 @@ compress: } } - rpcs::rpcs(unsigned int p1, int count) : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true) { @@ -400,7 +399,7 @@ rpcs::rpcs(unsigned int p1, int count) lossytest_ = atoi(loss_env); } - reg(rpc_const::bind, this, &rpcs::rpcbind); + reg(rpc_const::bind, &rpcs::rpcbind, this); dispatchpool_ = new ThrPool(6,false); listener_ = new tcpsconn(this, port_, lossytest_); @@ -564,14 +563,14 @@ rpcs::dispatch(djob_t *j) updatestat(proc); } - rh.ret = f->fn(req, rep); - if (rh.ret == rpc_const::unmarshal_args_failure) { - fprintf(stderr, "rpcs::dispatch: failed to" - " unmarshall the arguments. You are" - " probably calling RPC 0x%x with wrong" - " types of arguments.\n", proc); - VERIFY(0); - } + rh.ret = (*f)(req, rep); + if (rh.ret == rpc_const::unmarshal_args_failure) { + fprintf(stderr, "rpcs::dispatch: failed to" + " unmarshall the arguments. You are" + " probably calling RPC 0x%x with wrong" + " types of arguments.\n", proc); + VERIFY(0); + } VERIFY(rh.ret >= 0); rep.pack_reply_header(rh); @@ -725,7 +724,7 @@ rpcs::free_reply_window(void) // rpc handler int -rpcs::rpcbind(int a, int &r) +rpcs::rpcbind(int &r, int a) { jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_); r = nonce_; diff --git a/rpc/rpc.h b/rpc/rpc.h index f7245ad..245f277 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -16,15 +16,21 @@ #endif class rpc_const { - public: - static const unsigned int bind = 1; // handler number reserved for bind - static const int timeout_failure = -1; - static const int unmarshal_args_failure = -2; - static const int unmarshal_reply_failure = -3; - static const int atmostonce_failure = -4; - static const int oldsrv_failure = -5; - static const int bind_failure = -6; - static const int cancel_failure = -7; + public: + static const unsigned int bind = 1; // handler number reserved for bind + static const int timeout_failure = -1; + static const int unmarshal_args_failure = -2; + static const int unmarshal_reply_failure = -3; + static const int atmostonce_failure = -4; + static const int oldsrv_failure = -5; + static const int bind_failure = -6; + static const int cancel_failure = -7; +}; + +struct ReturnOnFailure { + static inline int unmarshall_args_failure() { + return rpc_const::unmarshal_args_failure; + } }; // rpc client endpoint. @@ -32,44 +38,44 @@ class rpc_const { // threaded: multiple threads can be sending RPCs, class rpcc : public chanmgr { - private: + private: - //manages per rpc info - struct caller { - caller(unsigned int xxid, unmarshall *un); - ~caller(); + //manages per rpc info + struct caller { + caller(unsigned int xxid, unmarshall *un); + ~caller(); - unsigned int xid; - unmarshall *un; - int intret; - bool done; + unsigned int xid; + unmarshall *un; + int intret; + bool done; std::mutex m; std::condition_variable c; - }; + }; - void get_refconn(connection **ch); - void update_xid_rep(unsigned int xid); + void get_refconn(connection **ch); + void update_xid_rep(unsigned int xid); - sockaddr_in dst_; - unsigned int clt_nonce_; - unsigned int srv_nonce_; - bool bind_done_; - unsigned int xid_; - int lossytest_; - bool retrans_; - bool reachable_; + sockaddr_in dst_; + unsigned int clt_nonce_; + unsigned int srv_nonce_; + bool bind_done_; + unsigned int xid_; + int lossytest_; + bool retrans_; + bool reachable_; - connection *chan_; + connection *chan_; std::mutex m_; // protect insert/delete to calls[] - std::mutex chan_m_; + std::mutex chan_m_; - bool destroy_wait_; + bool destroy_wait_; std::condition_variable destroy_wait_c_; - std::map calls_; - std::list xid_rep_window_; + std::map calls_; + std::list xid_rep_window_; struct request { request() { clear(); } @@ -80,42 +86,42 @@ class rpcc : public chanmgr { }; struct request dup_req_; int xid_rep_done_; - public: + public: - rpcc(sockaddr_in d, bool retrans=true); - ~rpcc(); + rpcc(sockaddr_in d, bool retrans=true); + ~rpcc(); - struct TO { - int to; - }; - static const TO to_max; - static const TO to_min; - static TO to(int x) { TO t; t.to = x; return t;} + struct TO { + int to; + }; + static const TO to_max; + static const TO to_min; + static TO to(int x) { TO t; t.to = x; return t;} - unsigned int id() { return clt_nonce_; } + unsigned int id() { return clt_nonce_; } - int bind(TO to = to_max); + int bind(TO to = to_max); - void set_reachable(bool r) { reachable_ = r; } + void set_reachable(bool r) { reachable_ = r; } - void cancel(); + void cancel(); int islossy() { return lossytest_ > 0; } - int call1(unsigned int proc, - marshall &req, unmarshall &rep, TO to); + int call1(unsigned int proc, + marshall &req, unmarshall &rep, TO to); - bool got_pdu(connection *c, char *b, int sz); + bool got_pdu(connection *c, char *b, int sz); - template - int call_m(unsigned int proc, marshall &req, R & r, TO to); + template + int call_m(unsigned int proc, marshall &req, R & r, TO to); - template - inline int call(unsigned int proc, R & r, const Args&... args); + template + inline int call(unsigned int proc, R & r, const Args&... args); - template - inline int call_timeout(unsigned int proc, TO to, R & r, const Args&... args); + template + inline int call_timeout(unsigned int proc, TO to, R & r, const Args&... args); }; template int @@ -144,382 +150,120 @@ rpcc::call(unsigned int proc, R & r, const Args&... args) template inline int rpcc::call_timeout(unsigned int proc, const rpcc::TO to, R & r, const Args&... args) { - marshall m{args...}; - return call_m(proc, m, r, to); + marshall m{args...}; + return call_m(proc, m, r, to); } bool operator<(const sockaddr_in &a, const sockaddr_in &b); -class handler { - public: - handler() { } - virtual ~handler() { } - virtual int fn(unmarshall &, marshall &) = 0; -}; - - // rpc server endpoint. class rpcs : public chanmgr { - typedef enum { - NEW, // new RPC, not a duplicate - INPROGRESS, // duplicate of an RPC we're still processing - DONE, // duplicate of an RPC we already replied to (have reply) - FORGOTTEN, // duplicate of an old RPC whose reply we've forgotten - } rpcstate_t; + typedef enum { + NEW, // new RPC, not a duplicate + INPROGRESS, // duplicate of an RPC we're still processing + DONE, // duplicate of an RPC we already replied to (have reply) + FORGOTTEN, // duplicate of an old RPC whose reply we've forgotten + } rpcstate_t; - private: + private: // state about an in-progress or completed RPC, for at-most-once. // if cb_present is true, then the RPC is complete and a reply // has been sent; in that case buf points to a copy of the reply, // and sz holds the size of the reply. - struct reply_t { - reply_t (unsigned int _xid) { - xid = _xid; - cb_present = false; - buf = NULL; - sz = 0; - } - reply_t (unsigned int _xid, char *_buf, int _sz) { - xid = _xid; - cb_present = true; - buf = _buf; - sz = _sz; - } - unsigned int xid; - bool cb_present; // whether the reply buffer is valid - char *buf; // the reply buffer - int sz; // the size of reply buffer - }; - - int port_; - unsigned int nonce_; - - // provide at most once semantics by maintaining a window of replies - // per client that that client hasn't acknowledged receiving yet. + struct reply_t { + reply_t (unsigned int _xid) { + xid = _xid; + cb_present = false; + buf = NULL; + sz = 0; + } + reply_t (unsigned int _xid, char *_buf, int _sz) { + xid = _xid; + cb_present = true; + buf = _buf; + sz = _sz; + } + unsigned int xid; + bool cb_present; // whether the reply buffer is valid + char *buf; // the reply buffer + int sz; // the size of reply buffer + }; + + int port_; + unsigned int nonce_; + + // provide at most once semantics by maintaining a window of replies + // per client that that client hasn't acknowledged receiving yet. // indexed by client nonce. - std::map > reply_window_; - - void free_reply_window(void); - void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz); - - rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, - unsigned int xid, unsigned int rep_xid, - char **b, int *sz); - - void updatestat(unsigned int proc); - - // latest connection to the client - std::map conns_; - - // counting - const int counting_; - int curr_counts_; - std::map counts_; - - int lossytest_; - bool reachable_; - - // map proc # to function - std::map procs_; - - std::mutex procs_m_; // protect insert/delete to procs[] - std::mutex count_m_; //protect modification of counts - std::mutex reply_window_m_; // protect reply window et al - std::mutex conss_m_; // protect conns_ - - - protected: - - struct djob_t { - djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {} - char *buf; - int sz; - connection *conn; - }; - void dispatch(djob_t *); - - // internal handler registration - void reg1(unsigned int proc, handler *); - - ThrPool* dispatchpool_; - tcpsconn* listener_; - - public: - rpcs(unsigned int port, int counts=0); - ~rpcs(); - inline int port() { return listener_->port(); } - //RPC handler for clients binding - int rpcbind(int a, int &r); - - void set_reachable(bool r) { reachable_ = r; } - - bool got_pdu(connection *c, char *b, int sz); - - // register a handler - template - void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, R & r)); - template - void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, const A2, - R & r)); - template - void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, - const A3, R & r)); - template - void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, - const A3, const A4, R & r)); - template - void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, - const A3, const A4, const A5, - R & r)); - template - void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, - const A3, const A4, const A5, - const A6, R & r)); - template - void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, - const A3, const A4, const A5, - const A6, const A7, - R & r)); -}; + std::map > reply_window_; -template void -rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, R & r)) -{ - class h1 : public handler { - private: - S * sob; - int (S::*meth)(const A1 a1, R & r); - public: - h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r)) - : sob(xsob), meth(xmeth) { } - int fn(unmarshall &args, marshall &ret) { - A1 a1; - R r; - args >> a1; - if(!args.okdone()) - return rpc_const::unmarshal_args_failure; - int b = (sob->*meth)(a1, r); - ret << r; - return b; - } - }; - reg1(proc, new h1(sob, meth)); -} + void free_reply_window(void); + void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz); -template void -rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, - R & r)) -{ - class h1 : public handler { - private: - S * sob; - int (S::*meth)(const A1 a1, const A2 a2, R & r); - public: - h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r)) - : sob(xsob), meth(xmeth) { } - int fn(unmarshall &args, marshall &ret) { - A1 a1; - A2 a2; - R r; - args >> a1; - args >> a2; - if(!args.okdone()) - return rpc_const::unmarshal_args_failure; - int b = (sob->*meth)(a1, a2, r); - ret << r; - return b; - } - }; - reg1(proc, new h1(sob, meth)); -} + rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, + unsigned int xid, unsigned int rep_xid, + char **b, int *sz); -template void -rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, - const A3 a3, R & r)) -{ - class h1 : public handler { - private: - S * sob; - int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r); - public: - h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r)) - : sob(xsob), meth(xmeth) { } - int fn(unmarshall &args, marshall &ret) { - A1 a1; - A2 a2; - A3 a3; - R r; - args >> a1; - args >> a2; - args >> a3; - if(!args.okdone()) - return rpc_const::unmarshal_args_failure; - int b = (sob->*meth)(a1, a2, a3, r); - ret << r; - return b; - } - }; - reg1(proc, new h1(sob, meth)); -} + void updatestat(unsigned int proc); -template void -rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, - const A3 a3, const A4 a4, - R & r)) -{ - class h1 : public handler { - private: - S * sob; - int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r); - public: - h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, - const A4 a4, R & r)) - : sob(xsob), meth(xmeth) { } - int fn(unmarshall &args, marshall &ret) { - A1 a1; - A2 a2; - A3 a3; - A4 a4; - R r; - args >> a1; - args >> a2; - args >> a3; - args >> a4; - if(!args.okdone()) - return rpc_const::unmarshal_args_failure; - int b = (sob->*meth)(a1, a2, a3, a4, r); - ret << r; - return b; - } - }; - reg1(proc, new h1(sob, meth)); -} + // latest connection to the client + std::map conns_; -template void -rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, - const A3 a3, const A4 a4, - const A5 a5, R & r)) -{ - class h1 : public handler { - private: - S * sob; - int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, - const A5 a5, R & r); - public: - h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, - const A4 a4, const A5 a5, R & r)) - : sob(xsob), meth(xmeth) { } - int fn(unmarshall &args, marshall &ret) { - A1 a1; - A2 a2; - A3 a3; - A4 a4; - A5 a5; - R r; - args >> a1; - args >> a2; - args >> a3; - args >> a4; - args >> a5; - if(!args.okdone()) - return rpc_const::unmarshal_args_failure; - int b = (sob->*meth)(a1, a2, a3, a4, a5, r); - ret << r; - return b; - } - }; - reg1(proc, new h1(sob, meth)); -} + // counting + const int counting_; + int curr_counts_; + std::map counts_; -template void -rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, - const A3 a3, const A4 a4, - const A5 a5, const A6 a6, - R & r)) -{ - class h1 : public handler { - private: - S * sob; - int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, - const A5 a5, const A6 a6, R & r); - public: - h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, - const A4 a4, const A5 a5, const A6 a6, R & r)) - : sob(xsob), meth(xmeth) { } - int fn(unmarshall &args, marshall &ret) { - A1 a1; - A2 a2; - A3 a3; - A4 a4; - A5 a5; - A6 a6; - R r; - args >> a1; - args >> a2; - args >> a3; - args >> a4; - args >> a5; - args >> a6; - if(!args.okdone()) - return rpc_const::unmarshal_args_failure; - int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, r); - ret << r; - return b; - } - }; - reg1(proc, new h1(sob, meth)); -} + int lossytest_; + bool reachable_; -template void -rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, - const A3 a3, const A4 a4, - const A5 a5, const A6 a6, - const A7 a7, R & r)) -{ - class h1 : public handler { - private: - S * sob; - int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, - const A5 a5, const A6 a6, const A7 a7, R & r); - public: - h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, - const A4 a4, const A5 a5, const A6 a6, - const A7 a7, R & r)) - : sob(xsob), meth(xmeth) { } - int fn(unmarshall &args, marshall &ret) { - A1 a1; - A2 a2; - A3 a3; - A4 a4; - A5 a5; - A6 a6; - A7 a7; - R r; - args >> a1; - args >> a2; - args >> a3; - args >> a4; - args >> a5; - args >> a6; - args >> a7; - if(!args.okdone()) - return rpc_const::unmarshal_args_failure; - int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, a7, r); - ret << r; - return b; - } - }; - reg1(proc, new h1(sob, meth)); -} + // map proc # to function + std::map procs_; + + std::mutex procs_m_; // protect insert/delete to procs[] + std::mutex count_m_; //protect modification of counts + std::mutex reply_window_m_; // protect reply window et al + std::mutex conss_m_; // protect conns_ + + + protected: + + struct djob_t { + djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {} + char *buf; + int sz; + connection *conn; + }; + void dispatch(djob_t *); + // internal handler registration + void reg1(unsigned int proc, handler *); + + ThrPool* dispatchpool_; + tcpsconn* listener_; + + public: + rpcs(unsigned int port, int counts=0); + ~rpcs(); + inline int port() { return listener_->port(); } + //RPC handler for clients binding + int rpcbind(int &r, int a); + + void set_reachable(bool r) { reachable_ = r; } + + bool got_pdu(connection *c, char *b, int sz); + + template void reg(unsigned int proc, F f, C *c=nullptr); +}; + +template void rpcs::reg(unsigned int proc, F f, C *c) { + reg1(proc, marshalled_func::wrap(f, c)); +} void make_sockaddr(const char *hostandport, struct sockaddr_in *dst); void make_sockaddr(const char *host, const char *port, - struct sockaddr_in *dst); + struct sockaddr_in *dst); #endif diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index c43a9da..8aed748 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -24,10 +24,10 @@ int port; // from multiple classes. class srv { public: - int handle_22(const std::string a, const std::string b, std::string & r); - int handle_fast(const int a, int &r); - int handle_slow(const int a, int &r); - int handle_bigrep(const int a, std::string &r); + int handle_22(std::string & r, const std::string a, const std::string b); + int handle_fast(int &r, const int a); + int handle_slow(int &r, const int a); + int handle_bigrep(std::string &r, const int a); }; // a handler. a and b are arguments, r is the result. @@ -38,21 +38,21 @@ class srv { // at these argument types, so this function definition // does what a .x file does in SunRPC. int -srv::handle_22(const std::string a, std::string b, std::string &r) +srv::handle_22(std::string &r, const std::string a, std::string b) { r = a + b; return 0; } int -srv::handle_fast(const int a, int &r) +srv::handle_fast(int &r, const int a) { r = a + 1; return 0; } int -srv::handle_slow(const int a, int &r) +srv::handle_slow(int &r, const int a) { usleep(random() % 5000); r = a + 2; @@ -60,7 +60,7 @@ srv::handle_slow(const int a, int &r) } int -srv::handle_bigrep(const int len, std::string &r) +srv::handle_bigrep(std::string &r, const int len) { r = std::string(len, 'x'); return 0; @@ -71,10 +71,10 @@ srv service; void startserver() { server = new rpcs(port); - server->reg(22, &service, &srv::handle_22); - server->reg(23, &service, &srv::handle_fast); - server->reg(24, &service, &srv::handle_slow); - server->reg(25, &service, &srv::handle_bigrep); + server->reg(22, &srv::handle_22, &service); + server->reg(23, &srv::handle_fast, &service); + server->reg(24, &srv::handle_slow, &service); + server->reg(25, &srv::handle_bigrep, &service); } void diff --git a/rsm.cc b/rsm.cc index df2b2fc..66c8d97 100644 --- a/rsm.cc +++ b/rsm.cc @@ -108,17 +108,17 @@ rsm::rsm(std::string _first, std::string _me) : commit_change(1); } rsmrpc = cfg->get_rpcs(); - rsmrpc->reg(rsm_client_protocol::invoke, this, &rsm::client_invoke); - rsmrpc->reg(rsm_client_protocol::members, this, &rsm::client_members); - rsmrpc->reg(rsm_protocol::invoke, this, &rsm::invoke); - rsmrpc->reg(rsm_protocol::transferreq, this, &rsm::transferreq); - rsmrpc->reg(rsm_protocol::transferdonereq, this, &rsm::transferdonereq); - rsmrpc->reg(rsm_protocol::joinreq, this, &rsm::joinreq); + rsmrpc->reg(rsm_client_protocol::invoke, &rsm::client_invoke, this); + rsmrpc->reg(rsm_client_protocol::members, &rsm::client_members, this); + rsmrpc->reg(rsm_protocol::invoke, &rsm::invoke, this); + rsmrpc->reg(rsm_protocol::transferreq, &rsm::transferreq, this); + rsmrpc->reg(rsm_protocol::transferdonereq, &rsm::transferdonereq, this); + rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this); // tester must be on different port, otherwise it may partition itself testsvr = new rpcs(atoi(_me.c_str()) + 1); - testsvr->reg(rsm_test_protocol::net_repair, this, &rsm::test_net_repairreq); - testsvr->reg(rsm_test_protocol::breakpoint, this, &rsm::breakpointreq); + testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this); + testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this); { lock ml(rsm_mutex); @@ -336,7 +336,7 @@ void rsm::execute(int procno, std::string req, std::string &r) { unmarshall args(req); marshall rep; std::string reps; - rsm_protocol::status ret = h->fn(args, rep); + rsm_protocol::status ret = (*h)(args, rep); marshall rep1; rep1 << ret; rep1 << rep.str(); @@ -349,7 +349,7 @@ void rsm::execute(int procno, std::string req, std::string &r) { // number, and invokes it on all members of the replicated state // machine. // -rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std::string &r) { +rsm_client_protocol::status rsm::client_invoke(std::string &r, int procno, std::string req) { LOG("rsm::client_invoke: procno 0x" << std::hex << procno); lock ml(invoke_mutex); std::vector m; @@ -403,7 +403,7 @@ rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std: // the replica must execute requests in order (with no gaps) // according to requests' seqno -rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &dummy) { +rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req) { LOG("rsm::invoke: procno 0x" << std::hex << proc); lock ml(invoke_mutex); std::vector m; @@ -438,8 +438,8 @@ rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &d /** * RPC handler: Send back the local node's state to the caller */ -rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned vid, - rsm_protocol::transferres &r) { +rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, std::string src, + viewstamp last, unsigned vid) { lock ml(rsm_mutex); int ret = rsm_protocol::OK; tprintf("transferreq from %s (%d,%d) vs (%d,%d)\n", src.c_str(), @@ -457,7 +457,7 @@ rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned * RPC handler: Inform the local node (the primary) that node m has synchronized * for view vid */ -rsm_protocol::status rsm::transferdonereq(std::string m, unsigned vid, int &) { +rsm_protocol::status rsm::transferdonereq(int &, std::string m, unsigned vid) { lock ml(rsm_mutex); if (!insync || vid != vid_insync) return rsm_protocol::BUSY; @@ -470,7 +470,7 @@ rsm_protocol::status rsm::transferdonereq(std::string m, unsigned vid, int &) { // a node that wants to join an RSM as a server sends a // joinreq to the RSM's current primary; this is the // handler for that RPC. -rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::joinres &r) { +rsm_protocol::status rsm::joinreq(rsm_protocol::joinres &r, std::string m, viewstamp last) { int ret = rsm_protocol::OK; lock ml(rsm_mutex); @@ -508,7 +508,7 @@ rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::j * so the client can switch to a different primary * when it existing primary fails */ -rsm_client_protocol::status rsm::client_members(int i, std::vector &r) { +rsm_client_protocol::status rsm::client_members(std::vector &r, int i) { std::vector m; lock ml(rsm_mutex); cfg->get_view(vid_commit, m); @@ -568,7 +568,7 @@ void rsm::net_repair_wo(bool heal) { rsmrpc->set_reachable(heal); } -rsm_test_protocol::status rsm::test_net_repairreq(int heal, int &r) { +rsm_test_protocol::status rsm::test_net_repairreq(int &r, int heal) { lock ml(rsm_mutex); tprintf("rsm::test_net_repairreq: %d (dopartition %d, partitioned %d)\n", heal, dopartition, partitioned); @@ -607,7 +607,7 @@ void rsm::partition1() { } } -rsm_test_protocol::status rsm::breakpointreq(int b, int &r) { +rsm_test_protocol::status rsm::breakpointreq(int &r, int b) { r = rsm_test_protocol::OK; lock ml(rsm_mutex); tprintf("rsm::breakpointreq: %d\n", b); diff --git a/rsm.h b/rsm.h index ec7632d..bf37a5d 100644 --- a/rsm.h +++ b/rsm.h @@ -11,7 +11,6 @@ #include #include "config.h" - class rsm : public config_view_change { private: void reg1(int proc, handler *); @@ -39,17 +38,15 @@ class rsm : public config_view_change { bool break2; - rsm_client_protocol::status client_members(int i, - std::vector &r); - rsm_protocol::status invoke(int proc, viewstamp vs, std::string mreq, - int &dummy); - rsm_protocol::status transferreq(std::string src, viewstamp last, unsigned vid, - rsm_protocol::transferres &r); - rsm_protocol::status transferdonereq(std::string m, unsigned vid, int &); - rsm_protocol::status joinreq(std::string src, viewstamp last, - rsm_protocol::joinres &r); - rsm_test_protocol::status test_net_repairreq(int heal, int &r); - rsm_test_protocol::status breakpointreq(int b, int &r); + rsm_client_protocol::status client_members(std::vector &r, int i); + rsm_protocol::status invoke(int &, int proc, viewstamp vs, std::string mreq); + rsm_protocol::status transferreq(rsm_protocol::transferres &r, std::string src, + viewstamp last, unsigned vid); + rsm_protocol::status transferdonereq(int &, std::string m, unsigned vid); + rsm_protocol::status joinreq(rsm_protocol::joinres &r, std::string src, + viewstamp last); + rsm_test_protocol::status test_net_repairreq(int &r, int heal); + rsm_test_protocol::status breakpointreq(int &r, int b); std::mutex rsm_mutex; std::mutex invoke_mutex; @@ -57,8 +54,8 @@ class rsm : public config_view_change { std::condition_variable sync_cond; void execute(int procno, std::string req, std::string &r); - rsm_client_protocol::status client_invoke(int procno, std::string req, - std::string &r); + rsm_client_protocol::status client_invoke(std::string &r, int procno, + std::string req); bool statetransfer(std::string m); bool statetransferdone(std::string m); bool join(std::string m); @@ -80,157 +77,11 @@ class rsm : public config_view_change { void recovery(); void commit_change(unsigned vid); - template - void reg(int proc, S*, int (S::*meth)(const A1 a1, R &)); - template - void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2, R &)); - template - void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2, - const A3 a3, R &)); - template - void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2, - const A3 a3, const A4 a4, R &)); - template - void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2, - const A3 a3, const A4 a4, const A5 a5, R &)); + template void reg(int proc, F f, C *c=nullptr); }; -template -void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, R & r)) { - class h1 : public handler { - private: - S * sob; - int (S::*meth)(const A1 a1, R & r); - public: - h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r)) - : sob(xsob), meth(xmeth) { } - int fn(unmarshall &args, marshall &ret) { - A1 a1; - R r; - args >> a1; - VERIFY(args.okdone()); - int b = (sob->*meth)(a1,r); - ret << r; - return b; - } - }; - reg1(proc, new h1(sob, meth)); -} - -template -void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, R & r)) { - class h1 : public handler { - private: - S * sob; - int (S::*meth)(const A1 a1, const A2 a2, R & r); - public: - h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r)) - : sob(xsob), meth(xmeth) { } - int fn(unmarshall &args, marshall &ret) { - A1 a1; - A2 a2; - R r; - args >> a1; - args >> a2; - VERIFY(args.okdone()); - int b = (sob->*meth)(a1,a2,r); - ret << r; - return b; - } - }; - reg1(proc, new h1(sob, meth)); -} - -template -void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, - const A3 a3, R & r)) { - class h1 : public handler { - private: - S * sob; - int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r); - public: - h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r)) - : sob(xsob), meth(xmeth) { } - int fn(unmarshall &args, marshall &ret) { - A1 a1; - A2 a2; - A3 a3; - R r; - args >> a1; - args >> a2; - args >> a3; - VERIFY(args.okdone()); - int b = (sob->*meth)(a1,a2,a3,r); - ret << r; - return b; - } - }; - reg1(proc, new h1(sob, meth)); -} - -template -void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, - const A3 a3, const A4 a4, R & r)) { - class h1 : public handler { - private: - S * sob; - int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r); - public: - h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, - const A4 a4, R & r)) - : sob(xsob), meth(xmeth) { } - int fn(unmarshall &args, marshall &ret) { - A1 a1; - A2 a2; - A3 a3; - A4 a4; - R r; - args >> a1; - args >> a2; - args >> a3; - args >> a4; - VERIFY(args.okdone()); - int b = (sob->*meth)(a1,a2,a3,a4,r); - ret << r; - return b; - } - }; - reg1(proc, new h1(sob, meth)); -} - - -template void - rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, - const A3 a3, const A4 a4, - const A5 a5, R & r)) -{ - class h1 : public handler { - private: - S * sob; - int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, - const A5 a5, R & r); - public: - h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, - const A4 a4, const A5 a5, R & r)) - : sob(xsob), meth(xmeth) { } - int fn(unmarshall &args, marshall &ret) { - A1 a1; - A2 a2; - A3 a3; - A4 a4; - A5 a5; - R r; - args >> a1; - args >> a2; - args >> a3; - args >> a4; - VERIFY(args.okdone()); - int b = (sob->*meth)(a1,a2,a3,a4,a5,r); - ret << r; - return b; - } - }; - reg1(proc, new h1(sob, meth)); +template void rsm::reg(int proc, F f, C *c) { + reg1(proc, marshalled_func::wrap(f, c)); } #endif /* rsm_h */