TEMPLATE MAGIC FOR GREAT JUSTICE
authorPeter Iannucci <iannucci@mit.edu>
Thu, 19 Sep 2013 21:16:20 +0000 (17:16 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Wed, 25 Sep 2013 01:46:12 +0000 (21:46 -0400)
18 files changed:
Makefile
config.cc
config.h
lock_client_cache_rsm.cc
lock_client_cache_rsm.h
lock_server.cc
lock_server.h [deleted file]
lock_server_cache_rsm.cc
lock_server_cache_rsm.h
lock_smain.cc [deleted file]
paxos.cc
paxos.h
rpc/marshall.h
rpc/rpc.cc
rpc/rpc.h
rpc/rpctest.cc
rsm.cc
rsm.h

index b160065..a528737 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -19,7 +19,7 @@ lock_demo : $(lock_demo) rpc/librpc.a
 lock_tester=lock_tester.o lock_client.o tprintf.o rsm_client.o handle.o lock_client_cache_rsm.o
 lock_tester : $(lock_tester) rpc/librpc.a
 
-lock_server=lock_server.o lock_smain.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o
+lock_server=lock_server.o tprintf.o handle.o rsm.o paxos.o config.o log.o lock_server_cache_rsm.o
 lock_server : $(lock_server) rpc/librpc.a
 
 rsm_tester=rsm_tester.o rsmtest_client.o
@@ -31,7 +31,7 @@ rsm_tester: $(rsm_tester) rpc/librpc.a
 -include *.d
 -include rpc/*.d
 
-clean_files=rpc/rpctest rpc/*.o *.d rpc/*.d rpc/librpc.a *.o lock_server lock_tester lock_demo rsm_tester
+clean_files=rpc/rpctest rpc/*.o *.d rpc/*.d rpc/librpc.a *.o config *.log lock_server lock_tester lock_demo rsm_tester
 .PHONY: clean $(EXTRA_TARGETS)
 clean: 
        rm -rf $(clean_files)
index 96e6cd7..04c869e 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -50,7 +50,7 @@ config::config(
     paxos_proposer = new proposer(this, paxos_acceptor, me);
 
     // XXX hack; maybe should have its own port number
-    paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, this, &config::heartbeat);
+    paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
 
     {
         lock ml(cfg_mutex);
@@ -265,7 +265,7 @@ config::heartbeater()
 }
 
 paxos_protocol::status
-config::heartbeat(std::string m, unsigned vid, int &r)
+config::heartbeat(int &r, std::string m, unsigned vid)
 {
     lock ml(cfg_mutex);
     int ret = paxos_protocol::ERR;
index cd276fb..f3fe216 100644 (file)
--- a/config.h
+++ b/config.h
@@ -23,10 +23,7 @@ class config : public paxos_change {
         std::vector<std::string> mems;
         mutex cfg_mutex;
         std::condition_variable config_cond;
-        paxos_protocol::status heartbeat(
-                std::string m,
-                unsigned instance,
-                int &r);
+        paxos_protocol::status heartbeat(int &r, std::string m, unsigned instance);
         std::string value(const std::vector<std::string> &mems) const;
         void members(const std::string &v, std::vector<std::string> &m) const;
         void get_view_wo(unsigned instance, std::vector<std::string> &m);
index c0be985..2a6d6e3 100644 (file)
@@ -59,8 +59,8 @@ lock_client_cache_rsm::lock_client_cache_rsm(string xdst, class lock_release_use
     id = host.str();
     last_port = rlock_port;
     rpcs *rlsrpc = new rpcs(rlock_port);
-    rlsrpc->reg(rlock_protocol::revoke, this, &lock_client_cache_rsm::revoke_handler);
-    rlsrpc->reg(rlock_protocol::retry, this, &lock_client_cache_rsm::retry_handler);
+    rlsrpc->reg(rlock_protocol::revoke, &lock_client_cache_rsm::revoke_handler, this);
+    rlsrpc->reg(rlock_protocol::retry, &lock_client_cache_rsm::retry_handler, this);
     {
         lock sl(xid_mutex);
         xid = 0;
@@ -178,7 +178,7 @@ lock_protocol::status lock_client_cache_rsm::release(lock_protocol::lockid_t lid
     return lock_protocol::OK;
 }
 
-rlock_protocol::status lock_client_cache_rsm::revoke_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
+rlock_protocol::status lock_client_cache_rsm::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
     LOG("Revoke handler " << lid << " " << xid);
     lock_state &st = get_lock_state(lid);
     lock sl(st.m);
@@ -201,7 +201,7 @@ rlock_protocol::status lock_client_cache_rsm::revoke_handler(lock_protocol::lock
     return rlock_protocol::OK;
 }
 
-rlock_protocol::status lock_client_cache_rsm::retry_handler(lock_protocol::lockid_t lid, lock_protocol::xid_t xid, int &) {
+rlock_protocol::status lock_client_cache_rsm::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
     lock_state &st = get_lock_state(lid);
     lock sl(st.m);
     VERIFY(st.state == lock_state::acquiring);
index 049d18a..815224c 100644 (file)
@@ -73,8 +73,8 @@ class lock_client_cache_rsm : public lock_client {
         lock_protocol::status acquire(lock_protocol::lockid_t);
         virtual lock_protocol::status release(lock_protocol::lockid_t);
         void releaser();
-        rlock_protocol::status revoke_handler(lock_protocol::lockid_t, lock_protocol::xid_t, int &);
-        rlock_protocol::status retry_handler(lock_protocol::lockid_t, lock_protocol::xid_t, int &);
+        rlock_protocol::status revoke_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t);
+        rlock_protocol::status retry_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t);
 };
 
 
index be0c1c9..0f82080 100644 (file)
@@ -1,46 +1,37 @@
-// the lock server implementation
-
-#include "lock_server.h"
-#include <sstream>
+#include "rpc/rpc.h"
+#include <arpa/inet.h>
+#include <stdlib.h>
 #include <stdio.h>
 #include <unistd.h>
-#include <arpa/inet.h>
-#include "lock.h"
+#include "lock_server_cache_rsm.h"
+#include "paxos.h"
+#include "rsm.h"
 
-lock_server::lock_server():
-    nacquire (0)
-{
-}
+// Main loop of lock_server
 
-// caller must hold lock_lock
-mutex &
-lock_server::get_lock(lock_protocol::lockid_t lid) {
-    lock ml(lock_lock);
-    // by the semantics of std::map, this will create
-    // the mutex if it doesn't already exist
-    mutex &l = locks[lid];
-    return l;
-}
+char tprintf_thread_prefix = 's';
 
-lock_protocol::status
-lock_server::stat(int clt, lock_protocol::lockid_t lid, int &r)
+int
+main(int argc, char *argv[])
 {
-    lock_protocol::status ret = lock_protocol::OK;
-    printf("stat request from clt %d\n", clt);
-    r = nacquire;
-    return ret;
-}
+    setvbuf(stdout, NULL, _IONBF, 0);
+    setvbuf(stderr, NULL, _IONBF, 0);
 
-lock_protocol::status
-lock_server::acquire(int clt, lock_protocol::lockid_t lid, int &r)
-{
-    get_lock(lid).lock();
-    return lock_protocol::OK;
-}
+    srandom(getpid());
 
-lock_protocol::status
-lock_server::release(int clt, lock_protocol::lockid_t lid, int &r)
-{
-    get_lock(lid).unlock();
-    return lock_protocol::OK;
+    if(argc != 3){
+        fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
+        exit(1);
+    }
+
+    rsm rsm(argv[1], argv[2]);
+    lock_server_cache_rsm ls(&rsm);
+    rsm.set_state_transfer(&ls);
+
+    rsm.reg(lock_protocol::acquire, &lock_server_cache_rsm::acquire, &ls);
+    rsm.reg(lock_protocol::release, &lock_server_cache_rsm::release, &ls);
+    rsm.reg(lock_protocol::stat, &lock_server_cache_rsm::stat, &ls);
+
+    while(1)
+        sleep(1000);
 }
diff --git a/lock_server.h b/lock_server.h
deleted file mode 100644 (file)
index f03a717..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-// this is the lock server
-// the lock client has a similar interface
-
-#ifndef lock_server_h
-#define lock_server_h
-
-#include <string>
-#include "lock_protocol.h"
-#include "lock_client.h"
-#include "rpc/rpc.h"
-#include <list>
-#include <map>
-
-using std::map;
-
-typedef map<lock_protocol::lockid_t, mutex> lock_map;
-
-class lock_server {
-
- protected:
-  int nacquire;
-  mutex lock_lock;
-  lock_map locks;
-  mutex &get_lock(lock_protocol::lockid_t lid);
-
- public:
-  lock_server();
-  ~lock_server() {};
-  lock_protocol::status stat(int clt, lock_protocol::lockid_t lid, int &);
-  lock_protocol::status acquire(int clt, lock_protocol::lockid_t lid, int &);
-  lock_protocol::status release(int clt, lock_protocol::lockid_t lid, int &);
-};
-
-#endif
index 00d3f54..8f3cf2b 100644 (file)
@@ -119,7 +119,7 @@ void lock_server_cache_rsm::retryer() {
     }
 }
 
-int lock_server_cache_rsm::acquire(lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid, int &) {
+int lock_server_cache_rsm::acquire(int &r, lock_protocol::lockid_t lid, string id, lock_protocol::xid_t xid) {
     LOG_FUNC_ENTER_SERVER;
     holder h = holder(id, xid);
     lock_state &st = get_lock_state(lid);
@@ -177,7 +177,7 @@ int lock_server_cache_rsm::acquire(lock_protocol::lockid_t lid, string id, lock_
     return lock_protocol::RETRY;
 }
 
-int lock_server_cache_rsm::release(lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid, int &r) {
+int lock_server_cache_rsm::release(int &r, lock_protocol::lockid_t lid, callback id, lock_protocol::xid_t xid) {
     LOG_FUNC_ENTER_SERVER;
     lock_state &st = get_lock_state(lid);
     lock sl(st.m);
@@ -205,7 +205,7 @@ void lock_server_cache_rsm::unmarshal_state(string state) {
     rep >> lock_table;
 }
 
-lock_protocol::status lock_server_cache_rsm::stat(lock_protocol::lockid_t lid, int &r) {
+lock_protocol::status lock_server_cache_rsm::stat(int &r, lock_protocol::lockid_t lid) {
     printf("stat request\n");
     r = nacquire;
     return lock_protocol::OK;
index a765e52..c33b51e 100644 (file)
@@ -45,13 +45,13 @@ class lock_server_cache_rsm : public rsm_state_transfer {
         class rsm *rsm;
     public:
         lock_server_cache_rsm(class rsm *rsm = 0);
-        lock_protocol::status stat(lock_protocol::lockid_t, int &);
+        lock_protocol::status stat(int &, lock_protocol::lockid_t);
         void revoker();
         void retryer();
         string marshal_state();
         void unmarshal_state(string state);
-        int acquire(lock_protocol::lockid_t, string id, lock_protocol::xid_t, int &);
-        int release(lock_protocol::lockid_t, string id, lock_protocol::xid_t, int &);
+        int acquire(int &, lock_protocol::lockid_t, string id, lock_protocol::xid_t);
+        int release(int &, lock_protocol::lockid_t, string id, lock_protocol::xid_t);
 };
 
 #endif
diff --git a/lock_smain.cc b/lock_smain.cc
deleted file mode 100644 (file)
index 69cc433..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-#include "rpc/rpc.h"
-#include <arpa/inet.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <unistd.h>
-#include "lock_server_cache_rsm.h"
-#include "paxos.h"
-#include "rsm.h"
-
-// Main loop of lock_server
-
-char tprintf_thread_prefix = 's';
-
-int
-main(int argc, char *argv[])
-{
-    setvbuf(stdout, NULL, _IONBF, 0);
-    setvbuf(stderr, NULL, _IONBF, 0);
-
-    srandom(getpid());
-
-    if(argc != 3){
-        fprintf(stderr, "Usage: %s [master:]port [me:]port\n", argv[0]);
-        exit(1);
-    }
-
-    rsm rsm(argv[1], argv[2]);
-    lock_server_cache_rsm ls(&rsm);
-    rsm.set_state_transfer((rsm_state_transfer *)&ls);
-    rsm.reg(lock_protocol::acquire, &ls, &lock_server_cache_rsm::acquire);
-    rsm.reg(lock_protocol::release, &ls, &lock_server_cache_rsm::release);
-    rsm.reg(lock_protocol::stat, &ls, &lock_server_cache_rsm::stat);
-
-    while(1)
-        sleep(1000);
-}
index 4434788..b0ec640 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -59,7 +59,7 @@ proposer::isrunning()
 
 // check if the servers in l2 contains a majority of servers in l1
 bool
-proposer::majority(const std::vector<std::string> &l1, 
+proposer::majority(const std::vector<std::string> &l1,
                const std::vector<std::string> &l2)
 {
   unsigned n = 0;
@@ -71,9 +71,9 @@ proposer::majority(const std::vector<std::string> &l1,
   return n >= (l1.size() >> 1) + 1;
 }
 
-proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, 
+proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
                   std::string _me)
-  : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false), 
+  : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
     stable (true)
 {
   my_n.n = 0;
@@ -145,7 +145,7 @@ proposer::run(int instance, std::vector<std::string> cur_nodes, std::string newv
 // otherwise fill in accepts with set of nodes that accepted,
 // set v to the v_a with the highest n_a, and return true.
 bool
-proposer::prepare(unsigned instance, std::vector<std::string> &accepts, 
+proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
          std::vector<std::string> nodes,
          std::string &v)
 {
@@ -199,7 +199,7 @@ proposer::accept(unsigned instance, std::vector<std::string> &accepts,
 }
 
 void
-proposer::decide(unsigned instance, std::vector<std::string> accepts, 
+proposer::decide(unsigned instance, std::vector<std::string> accepts,
              std::string v)
 {
     struct paxos_protocol::decidearg arg = { instance, v };
@@ -213,7 +213,7 @@ proposer::decide(unsigned instance, std::vector<std::string> accepts,
     }
 }
 
-acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me, 
+acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me,
             std::string _value)
   : cfg(_cfg), me (_me), instance_h(0)
 {
@@ -232,14 +232,13 @@ acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me,
   }
 
   pxs = new rpcs(atoi(_me.c_str()));
-  pxs->reg(paxos_protocol::preparereq, this, &acceptor::preparereq);
-  pxs->reg(paxos_protocol::acceptreq, this, &acceptor::acceptreq);
-  pxs->reg(paxos_protocol::decidereq, this, &acceptor::decidereq);
+  pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
+  pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
+  pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
 }
 
 paxos_protocol::status
-acceptor::preparereq(std::string src, paxos_protocol::preparearg a,
-    paxos_protocol::prepareres &r)
+acceptor::preparereq(paxos_protocol::prepareres &r, std::string src, paxos_protocol::preparearg a)
 {
     lock ml(pxs_mutex);
     r.oldinstance = false;
@@ -260,7 +259,7 @@ acceptor::preparereq(std::string src, paxos_protocol::preparearg a,
 }
 
 paxos_protocol::status
-acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r)
+acceptor::acceptreq(bool &r, std::string src, paxos_protocol::acceptarg a)
 {
     lock ml(pxs_mutex);
     r = false;
@@ -275,10 +274,10 @@ acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r)
 
 // the src argument is only for debug purpose
     paxos_protocol::status
-acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r)
+acceptor::decidereq(int &r, std::string src, paxos_protocol::decidearg a)
 {
     lock ml(pxs_mutex);
-    tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", 
+    tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
             a.instance, instance_h, v_a.c_str());
     if (a.instance == instance_h + 1) {
         VERIFY(v_a == a.v);
diff --git a/paxos.h b/paxos.h
index c7b1af4..170292a 100644 (file)
--- a/paxos.h
+++ b/paxos.h
@@ -30,18 +30,17 @@ class acceptor {
   std::map<unsigned,std::string> values;       // vals of each instance
 
   void commit_wo(unsigned instance, std::string v);
-  paxos_protocol::status preparereq(std::string src, 
-          paxos_protocol::preparearg a,
-          paxos_protocol::prepareres &r);
-  paxos_protocol::status acceptreq(std::string src, 
-          paxos_protocol::acceptarg a, bool &r);
-  paxos_protocol::status decidereq(std::string src, 
-          paxos_protocol::decidearg a, int &r);
+  paxos_protocol::status preparereq(paxos_protocol::prepareres &r,
+          std::string src, paxos_protocol::preparearg a);
+  paxos_protocol::status acceptreq(bool &r, std::string src,
+          paxos_protocol::acceptarg a);
+  paxos_protocol::status decidereq(int &r, std::string src,
+          paxos_protocol::decidearg a);
 
   friend class log;
 
  public:
-  acceptor(class paxos_change *cfg, bool _first, std::string _me, 
+  acceptor(class paxos_change *cfg, bool _first, std::string _me,
        std::string _value);
   ~acceptor() {};
   void commit(unsigned instance, std::string v);
@@ -73,10 +72,10 @@ class proposer {
   prop_t my_n;         // number of the last proposal used in this instance
 
   void setn();
-  bool prepare(unsigned instance, std::vector<std::string> &accepts, 
+  bool prepare(unsigned instance, std::vector<std::string> &accepts,
          std::vector<std::string> nodes,
          std::string &v);
-  void accept(unsigned instance, std::vector<std::string> &accepts, 
+  void accept(unsigned instance, std::vector<std::string> &accepts,
         std::vector<std::string> nodes, std::string v);
   void decide(unsigned instance, std::vector<std::string> accepts,
         std::string v);
index 27cebbb..448240b 100644 (file)
@@ -219,13 +219,11 @@ class unmarshall {
                        _ind = RPC_HEADER_SZ;
                }
 
-        template <class OutputIterator>
-        void iterate(OutputIterator i, int n) {
-            while (n--) {
-                typename OutputIterator::value_type t;
-                *this >> t;
-                *i++ = t;
-            }
+        template <class A>
+        inline A grab() {
+            A a;
+            *this >> a;
+            return a;
         }
 };
 
@@ -285,4 +283,89 @@ operator>>(unmarshall &u, std::pair<A,B> &d) {
     return u >> d.first >> d.second;
 }
 
+template <size_t...> struct tuple_indices {};
+template <size_t S, class IntTuple, size_t E> struct make_indices_imp;
+template <size_t S, size_t ...Indices, size_t E> struct make_indices_imp<S, tuple_indices<Indices...>, E> {
+    typedef typename make_indices_imp<S+1, tuple_indices<Indices..., S>, E>::type type;
+};
+template <size_t E, size_t ...Indices> struct make_indices_imp<E, tuple_indices<Indices...>, E> {
+    typedef tuple_indices<Indices...> type;
+};
+template <size_t E, size_t S = 0> struct make_tuple_indices {
+    typedef typename make_indices_imp<S, tuple_indices<>, E>::type type;
+};
+
+struct VerifyOnFailure {
+    static inline int unmarshall_args_failure() {
+        VERIFY(0);
+        return 0;
+    }
+};
+
+typedef std::function<int(unmarshall &, marshall &)> handler;
+
+using std::move;
+using std::get;
+using std::tuple;
+using std::decay;
+
+#include <iostream>
+
+template <class F, class R, class args_type, size_t ...Indices>
+typename std::enable_if<!std::is_member_function_pointer<F>::value, int>::type
+invoke(F f, void *, R & r, args_type & t, tuple_indices<Indices...>) {
+    return f(r, move(get<Indices>(t))...);
+}
+
+template <class F, class C, class R, class args_type, size_t ...Indices>
+typename std::enable_if<std::is_member_function_pointer<F>::value, int>::type
+invoke(F f, C *c, R & r, args_type & t, tuple_indices<Indices...>) {
+    return (c->*f)(r, move(get<Indices>(t))...);
+}
+
+template <class Functor,
+          class Instance,
+          class Signature,
+          class ErrorHandler=VerifyOnFailure> struct marshalled_func_imp;
+
+template <class F, class C, class ErrorHandler, class R, class... Args>
+struct marshalled_func_imp<F, C, int(R&, Args...), ErrorHandler> {
+    using result_type = R;
+    using args_type = tuple<typename decay<Args>::type...>;
+    using index_type = typename make_tuple_indices<sizeof...(Args)>::type;
+
+    static inline int call(F f, C *c, unmarshall &u, marshall &m) {
+        args_type t{std::move(std::tuple<Args...>{u.grab<Args>()...})};
+        if (!u.okdone())
+            return ErrorHandler::unmarshall_args_failure();
+        R r;
+        int b = invoke(f, c, r, t, index_type());
+        m << r;
+        return b;
+    }
+
+    static inline handler *wrap(F f, C *c=nullptr) {
+        typename decay<F>::type f_ = f;
+        return new handler([f_, c](unmarshall &u, marshall &m) -> int {
+            return call(f_, c, u, m);
+        });
+    }
+};
+
+template <class Functor,
+          class Signature,
+          class ErrorHandler=VerifyOnFailure> struct marshalled_func;
+
+template <class F, class ErrorHandler, class... Args>
+struct marshalled_func<F, int(*)(Args...), ErrorHandler> :
+    public marshalled_func_imp<F, void, int(Args...), ErrorHandler> {};
+
+template <class F, class ErrorHandler, class C, class... Args>
+struct marshalled_func<F, int(C::*)(Args...), ErrorHandler> :
+    public marshalled_func_imp<F, C, int(Args...), ErrorHandler> {};
+
+template <class F, class ErrorHandler, class Signature>
+struct marshalled_func<F, std::function<Signature>, ErrorHandler> :
+    public marshalled_func_imp<F, void, Signature, ErrorHandler> {};
+
 #endif
index a5d5b1f..a3ffef1 100644 (file)
@@ -387,7 +387,6 @@ compress:
     }
 }
 
-
 rpcs::rpcs(unsigned int p1, int count)
   : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true)
 {
@@ -400,7 +399,7 @@ rpcs::rpcs(unsigned int p1, int count)
         lossytest_ = atoi(loss_env);
     }
 
-    reg(rpc_const::bind, this, &rpcs::rpcbind);
+    reg(rpc_const::bind, &rpcs::rpcbind, this);
     dispatchpool_ = new ThrPool(6,false);
 
     listener_ = new tcpsconn(this, port_, lossytest_);
@@ -564,14 +563,14 @@ rpcs::dispatch(djob_t *j)
                 updatestat(proc);
             }
 
-            rh.ret = f->fn(req, rep);
-                        if (rh.ret == rpc_const::unmarshal_args_failure) {
-                                fprintf(stderr, "rpcs::dispatch: failed to"
-                                       " unmarshall the arguments. You are"
-                                       " probably calling RPC 0x%x with wrong"
-                                       " types of arguments.\n", proc);
-                                VERIFY(0);
-                        }
+            rh.ret = (*f)(req, rep);
+            if (rh.ret == rpc_const::unmarshal_args_failure) {
+                fprintf(stderr, "rpcs::dispatch: failed to"
+                        " unmarshall the arguments. You are"
+                        " probably calling RPC 0x%x with wrong"
+                        " types of arguments.\n", proc);
+                VERIFY(0);
+            }
             VERIFY(rh.ret >= 0);
 
             rep.pack_reply_header(rh);
@@ -725,7 +724,7 @@ rpcs::free_reply_window(void)
 
 // rpc handler
 int
-rpcs::rpcbind(int a, int &r)
+rpcs::rpcbind(int &r, int a)
 {
     jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_);
     r = nonce_;
index f7245ad..245f277 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
 #endif
 
 class rpc_const {
-       public:
-               static const unsigned int bind = 1;   // handler number reserved for bind
-               static const int timeout_failure = -1;
-               static const int unmarshal_args_failure = -2;
-               static const int unmarshal_reply_failure = -3;
-               static const int atmostonce_failure = -4;
-               static const int oldsrv_failure = -5;
-               static const int bind_failure = -6;
-               static const int cancel_failure = -7;
+    public:
+        static const unsigned int bind = 1;   // handler number reserved for bind
+        static const int timeout_failure = -1;
+        static const int unmarshal_args_failure = -2;
+        static const int unmarshal_reply_failure = -3;
+        static const int atmostonce_failure = -4;
+        static const int oldsrv_failure = -5;
+        static const int bind_failure = -6;
+        static const int cancel_failure = -7;
+};
+
+struct ReturnOnFailure {
+    static inline int unmarshall_args_failure() {
+        return rpc_const::unmarshal_args_failure;
+    }
 };
 
 // rpc client endpoint.
@@ -32,44 +38,44 @@ class rpc_const {
 // threaded: multiple threads can be sending RPCs,
 class rpcc : public chanmgr {
 
-       private:
+    private:
 
-               //manages per rpc info
-               struct caller {
-                       caller(unsigned int xxid, unmarshall *un);
-                       ~caller();
+        //manages per rpc info
+        struct caller {
+            caller(unsigned int xxid, unmarshall *un);
+            ~caller();
 
-                       unsigned int xid;
-                       unmarshall *un;
-                       int intret;
-                       bool done;
+            unsigned int xid;
+            unmarshall *un;
+            int intret;
+            bool done;
             std::mutex m;
             std::condition_variable c;
-               };
+        };
 
-               void get_refconn(connection **ch);
-               void update_xid_rep(unsigned int xid);
+        void get_refconn(connection **ch);
+        void update_xid_rep(unsigned int xid);
 
 
-               sockaddr_in dst_;
-               unsigned int clt_nonce_;
-               unsigned int srv_nonce_;
-               bool bind_done_;
-               unsigned int xid_;
-               int lossytest_;
-               bool retrans_;
-               bool reachable_;
+        sockaddr_in dst_;
+        unsigned int clt_nonce_;
+        unsigned int srv_nonce_;
+        bool bind_done_;
+        unsigned int xid_;
+        int lossytest_;
+        bool retrans_;
+        bool reachable_;
 
-               connection *chan_;
+        connection *chan_;
 
         std::mutex m_; // protect insert/delete to calls[]
-               std::mutex chan_m_;
+        std::mutex chan_m_;
 
-               bool destroy_wait_;
+        bool destroy_wait_;
         std::condition_variable destroy_wait_c_;
 
-               std::map<int, caller *> calls_;
-               std::list<unsigned int> xid_rep_window_;
+        std::map<int, caller *> calls_;
+        std::list<unsigned int> xid_rep_window_;
                 
                 struct request {
                     request() { clear(); }
@@ -80,42 +86,42 @@ class rpcc : public chanmgr {
                 };
                 struct request dup_req_;
                 int xid_rep_done_;
-       public:
+    public:
 
-               rpcc(sockaddr_in d, bool retrans=true);
-               ~rpcc();
+        rpcc(sockaddr_in d, bool retrans=true);
+        ~rpcc();
 
-               struct TO {
-                       int to;
-               };
-               static const TO to_max;
-               static const TO to_min;
-               static TO to(int x) { TO t; t.to = x; return t;}
+        struct TO {
+            int to;
+        };
+        static const TO to_max;
+        static const TO to_min;
+        static TO to(int x) { TO t; t.to = x; return t;}
 
-               unsigned int id() { return clt_nonce_; }
+        unsigned int id() { return clt_nonce_; }
 
-               int bind(TO to = to_max);
+        int bind(TO to = to_max);
 
-               void set_reachable(bool r) { reachable_ = r; }
+        void set_reachable(bool r) { reachable_ = r; }
 
-               void cancel();
+        void cancel();
                 
                 int islossy() { return lossytest_ > 0; }
 
-               int call1(unsigned int proc, 
-                               marshall &req, unmarshall &rep, TO to);
+        int call1(unsigned int proc, 
+                marshall &req, unmarshall &rep, TO to);
 
-               bool got_pdu(connection *c, char *b, int sz);
+        bool got_pdu(connection *c, char *b, int sz);
 
 
-               template<class R>
-                       int call_m(unsigned int proc, marshall &req, R & r, TO to);
+        template<class R>
+            int call_m(unsigned int proc, marshall &req, R & r, TO to);
 
-               template<class R, typename ...Args>
-                       inline int call(unsigned int proc, R & r, const Args&... args);
+        template<class R, typename ...Args>
+            inline int call(unsigned int proc, R & r, const Args&... args);
 
-               template<class R, typename ...Args>
-                       inline int call_timeout(unsigned int proc, TO to, R & r, const Args&... args);
+        template<class R, typename ...Args>
+            inline int call_timeout(unsigned int proc, TO to, R & r, const Args&... args);
 };
 
 template<class R> int 
@@ -144,382 +150,120 @@ rpcc::call(unsigned int proc, R & r, const Args&... args)
 template<class R, typename... Args> inline int
 rpcc::call_timeout(unsigned int proc, const rpcc::TO to, R & r, const Args&... args)
 {
-       marshall m{args...};
-       return call_m(proc, m, r, to);
+    marshall m{args...};
+    return call_m(proc, m, r, to);
 }
 
 bool operator<(const sockaddr_in &a, const sockaddr_in &b);
 
-class handler {
-       public:
-               handler() { }
-               virtual ~handler() { }
-               virtual int fn(unmarshall &, marshall &) = 0;
-};
-
-
 // rpc server endpoint.
 class rpcs : public chanmgr {
 
-       typedef enum {
-               NEW,  // new RPC, not a duplicate
-               INPROGRESS, // duplicate of an RPC we're still processing
-               DONE, // duplicate of an RPC we already replied to (have reply)
-               FORGOTTEN,  // duplicate of an old RPC whose reply we've forgotten
-       } rpcstate_t;
+    typedef enum {
+        NEW,  // new RPC, not a duplicate
+        INPROGRESS, // duplicate of an RPC we're still processing
+        DONE, // duplicate of an RPC we already replied to (have reply)
+        FORGOTTEN,  // duplicate of an old RPC whose reply we've forgotten
+    } rpcstate_t;
 
-       private:
+    private:
 
         // state about an in-progress or completed RPC, for at-most-once.
         // if cb_present is true, then the RPC is complete and a reply
         // has been sent; in that case buf points to a copy of the reply,
         // and sz holds the size of the reply.
-       struct reply_t {
-               reply_t (unsigned int _xid) {
-                       xid = _xid;
-                       cb_present = false;
-                       buf = NULL;
-                       sz = 0;
-               }
-               reply_t (unsigned int _xid, char *_buf, int _sz) {
-                       xid = _xid;
-                       cb_present = true;
-                       buf = _buf;
-                       sz = _sz;
-               }
-               unsigned int xid;
-               bool cb_present; // whether the reply buffer is valid
-               char *buf;      // the reply buffer
-               int sz;         // the size of reply buffer
-       };
-
-       int port_;
-       unsigned int nonce_;
-
-       // provide at most once semantics by maintaining a window of replies
-       // per client that that client hasn't acknowledged receiving yet.
+    struct reply_t {
+        reply_t (unsigned int _xid) {
+            xid = _xid;
+            cb_present = false;
+            buf = NULL;
+            sz = 0;
+        }
+        reply_t (unsigned int _xid, char *_buf, int _sz) {
+            xid = _xid;
+            cb_present = true;
+            buf = _buf;
+            sz = _sz;
+        }
+        unsigned int xid;
+        bool cb_present; // whether the reply buffer is valid
+        char *buf;      // the reply buffer
+        int sz;         // the size of reply buffer
+    };
+
+    int port_;
+    unsigned int nonce_;
+
+    // provide at most once semantics by maintaining a window of replies
+    // per client that that client hasn't acknowledged receiving yet.
         // indexed by client nonce.
-       std::map<unsigned int, std::list<reply_t> > reply_window_;
-
-       void free_reply_window(void);
-       void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz);
-
-       rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
-                       unsigned int xid, unsigned int rep_xid,
-                       char **b, int *sz);
-
-       void updatestat(unsigned int proc);
-
-       // latest connection to the client
-       std::map<unsigned int, connection *> conns_;
-
-       // counting
-       const int counting_;
-       int curr_counts_;
-       std::map<int, int> counts_;
-
-       int lossytest_; 
-       bool reachable_;
-
-       // map proc # to function
-       std::map<int, handler *> procs_;
-
-       std::mutex procs_m_; // protect insert/delete to procs[]
-       std::mutex count_m_;  //protect modification of counts
-       std::mutex reply_window_m_; // protect reply window et al
-       std::mutex conss_m_; // protect conns_
-
-
-       protected:
-
-       struct djob_t {
-               djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {}
-               char *buf;
-               int sz;
-               connection *conn;
-       };
-       void dispatch(djob_t *);
-
-       // internal handler registration
-       void reg1(unsigned int proc, handler *);
-
-       ThrPool* dispatchpool_;
-       tcpsconn* listener_;
-
-       public:
-       rpcs(unsigned int port, int counts=0);
-       ~rpcs();
-        inline int port() { return listener_->port(); }
-       //RPC handler for clients binding
-       int rpcbind(int a, int &r);
-
-       void set_reachable(bool r) { reachable_ = r; }
-
-       bool got_pdu(connection *c, char *b, int sz);
-
-       // register a handler
-       template<class S, class A1, class R>
-               void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, R & r));
-       template<class S, class A1, class A2, class R>
-               void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, const A2, 
-                                       R & r));
-       template<class S, class A1, class A2, class A3, class R>
-               void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, 
-                                       const A3, R & r));
-       template<class S, class A1, class A2, class A3, class A4, class R>
-               void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, 
-                                       const A3, const A4, R & r));
-       template<class S, class A1, class A2, class A3, class A4, class A5, class R>
-               void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, 
-                                       const A3, const A4, const A5, 
-                                       R & r));
-       template<class S, class A1, class A2, class A3, class A4, class A5, class A6,
-               class R>
-                       void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, 
-                                               const A3, const A4, const A5, 
-                                               const A6, R & r));
-       template<class S, class A1, class A2, class A3, class A4, class A5, class A6,
-               class A7, class R>
-                       void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, 
-                                               const A3, const A4, const A5, 
-                                               const A6, const A7,
-                                               R & r));
-};
+    std::map<unsigned int, std::list<reply_t> > reply_window_;
 
-template<class S, class A1, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, R & r))
-{
-       class h1 : public handler {
-               private:
-                       S * sob;
-                       int (S::*meth)(const A1 a1, R & r);
-               public:
-                       h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r))
-                               : sob(xsob), meth(xmeth) { }
-                       int fn(unmarshall &args, marshall &ret) {
-                               A1 a1;
-                               R r;
-                               args >> a1;
-                               if(!args.okdone())
-                                       return rpc_const::unmarshal_args_failure;
-                               int b = (sob->*meth)(a1, r);
-                               ret << r;
-                               return b;
-                       }
-       };
-       reg1(proc, new h1(sob, meth));
-}
+    void free_reply_window(void);
+    void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz);
 
-template<class S, class A1, class A2, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
-                       R & r))
-{
-       class h1 : public handler {
-               private:
-                       S * sob;
-                       int (S::*meth)(const A1 a1, const A2 a2, R & r);
-               public:
-                       h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r))
-                               : sob(xsob), meth(xmeth) { }
-                       int fn(unmarshall &args, marshall &ret) {
-                               A1 a1;
-                               A2 a2;
-                               R r;
-                               args >> a1;
-                               args >> a2;
-                               if(!args.okdone())
-                                       return rpc_const::unmarshal_args_failure;
-                               int b = (sob->*meth)(a1, a2, r);
-                               ret << r;
-                               return b;
-                       }
-       };
-       reg1(proc, new h1(sob, meth));
-}
+    rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, 
+            unsigned int xid, unsigned int rep_xid,
+            char **b, int *sz);
 
-template<class S, class A1, class A2, class A3, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
-                       const A3 a3, R & r))
-{
-       class h1 : public handler {
-               private:
-                       S * sob;
-                       int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r);
-               public:
-                       h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r))
-                               : sob(xsob), meth(xmeth) { }
-                       int fn(unmarshall &args, marshall &ret) {
-                               A1 a1;
-                               A2 a2;
-                               A3 a3;
-                               R r;
-                               args >> a1;
-                               args >> a2;
-                               args >> a3;
-                               if(!args.okdone())
-                                       return rpc_const::unmarshal_args_failure;
-                               int b = (sob->*meth)(a1, a2, a3, r);
-                               ret << r;
-                               return b;
-                       }
-       };
-       reg1(proc, new h1(sob, meth));
-}
+    void updatestat(unsigned int proc);
 
-template<class S, class A1, class A2, class A3, class A4, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
-                       const A3 a3, const A4 a4, 
-                       R & r))
-{
-       class h1 : public handler {
-               private:
-                       S * sob;
-                       int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r);
-               public:
-                       h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, 
-                                               const A4 a4, R & r))
-                               : sob(xsob), meth(xmeth)  { }
-                       int fn(unmarshall &args, marshall &ret) {
-                               A1 a1;
-                               A2 a2;
-                               A3 a3;
-                               A4 a4;
-                               R r;
-                               args >> a1;
-                               args >> a2;
-                               args >> a3;
-                               args >> a4;
-                               if(!args.okdone())
-                                       return rpc_const::unmarshal_args_failure;
-                               int b = (sob->*meth)(a1, a2, a3, a4, r);
-                               ret << r;
-                               return b;
-                       }
-       };
-       reg1(proc, new h1(sob, meth));
-}
+    // latest connection to the client
+    std::map<unsigned int, connection *> conns_;
 
-template<class S, class A1, class A2, class A3, class A4, class A5, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
-                       const A3 a3, const A4 a4, 
-                       const A5 a5, R & r))
-{
-       class h1 : public handler {
-               private:
-                       S * sob;
-                       int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, 
-                                       const A5 a5, R & r);
-               public:
-                       h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, 
-                                               const A4 a4, const A5 a5, R & r))
-                               : sob(xsob), meth(xmeth) { }
-                       int fn(unmarshall &args, marshall &ret) {
-                               A1 a1;
-                               A2 a2;
-                               A3 a3;
-                               A4 a4;
-                               A5 a5;
-                               R r;
-                               args >> a1;
-                               args >> a2;
-                               args >> a3;
-                               args >> a4;
-                               args >> a5;
-                               if(!args.okdone())
-                                       return rpc_const::unmarshal_args_failure;
-                               int b = (sob->*meth)(a1, a2, a3, a4, a5, r);
-                               ret << r;
-                               return b;
-                       }
-       };
-       reg1(proc, new h1(sob, meth));
-}
+    // counting
+    const int counting_;
+    int curr_counts_;
+    std::map<int, int> counts_;
 
-template<class S, class A1, class A2, class A3, class A4, class A5, class A6, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
-                       const A3 a3, const A4 a4, 
-                       const A5 a5, const A6 a6, 
-                       R & r))
-{
-       class h1 : public handler {
-               private:
-                       S * sob;
-                       int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, 
-                                       const A5 a5, const A6 a6, R & r);
-               public:
-                       h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, 
-                                               const A4 a4, const A5 a5, const A6 a6, R & r))
-                               : sob(xsob), meth(xmeth) { }
-                       int fn(unmarshall &args, marshall &ret) {
-                               A1 a1;
-                               A2 a2;
-                               A3 a3;
-                               A4 a4;
-                               A5 a5;
-                               A6 a6;
-                               R r;
-                               args >> a1;
-                               args >> a2;
-                               args >> a3;
-                               args >> a4;
-                               args >> a5;
-                               args >> a6;
-                               if(!args.okdone())
-                                       return rpc_const::unmarshal_args_failure;
-                               int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, r);
-                               ret << r;
-                               return b;
-                       }
-       };
-       reg1(proc, new h1(sob, meth));
-}
+    int lossytest_; 
+    bool reachable_;
 
-template<class S, class A1, class A2, class A3, class A4, class A5, 
-       class A6, class A7, class R> void
-rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, 
-                       const A3 a3, const A4 a4, 
-                       const A5 a5, const A6 a6,
-                       const A7 a7, R & r))
-{
-       class h1 : public handler {
-               private:
-                       S * sob;
-                       int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, 
-                                       const A5 a5, const A6 a6, const A7 a7, R & r);
-               public:
-                       h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, 
-                                               const A4 a4, const A5 a5, const A6 a6,
-                                               const A7 a7, R & r))
-                               : sob(xsob), meth(xmeth) { }
-                       int fn(unmarshall &args, marshall &ret) {
-                               A1 a1;
-                               A2 a2;
-                               A3 a3;
-                               A4 a4;
-                               A5 a5;
-                               A6 a6;
-                               A7 a7;
-                               R r;
-                               args >> a1;
-                               args >> a2;
-                               args >> a3;
-                               args >> a4;
-                               args >> a5;
-                               args >> a6;
-                               args >> a7;
-                               if(!args.okdone())
-                                       return rpc_const::unmarshal_args_failure;
-                               int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, a7, r);
-                               ret << r;
-                               return b;
-                       }
-       };
-       reg1(proc, new h1(sob, meth));
-}
+    // map proc # to function
+    std::map<int, handler *> procs_;
+
+    std::mutex procs_m_; // protect insert/delete to procs[]
+    std::mutex count_m_;  //protect modification of counts
+    std::mutex reply_window_m_; // protect reply window et al
+    std::mutex conss_m_; // protect conns_
+
+
+    protected:
+
+    struct djob_t {
+        djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {}
+        char *buf;
+        int sz;
+        connection *conn;
+    };
+    void dispatch(djob_t *);
 
+    // internal handler registration
+    void reg1(unsigned int proc, handler *);
+
+    ThrPool* dispatchpool_;
+    tcpsconn* listener_;
+
+    public:
+    rpcs(unsigned int port, int counts=0);
+    ~rpcs();
+    inline int port() { return listener_->port(); }
+    //RPC handler for clients binding
+    int rpcbind(int &r, int a);
+
+    void set_reachable(bool r) { reachable_ = r; }
+
+    bool got_pdu(connection *c, char *b, int sz);
+
+    template<class F, class C=void> void reg(unsigned int proc, F f, C *c=nullptr);
+};
+
+template<class F, class C> void rpcs::reg(unsigned int proc, F f, C *c) {
+    reg1(proc, marshalled_func<F, F, ReturnOnFailure>::wrap(f, c));
+}
 
 void make_sockaddr(const char *hostandport, struct sockaddr_in *dst);
 void make_sockaddr(const char *host, const char *port,
-               struct sockaddr_in *dst);
+        struct sockaddr_in *dst);
 
 #endif
index c43a9da..8aed748 100644 (file)
@@ -24,10 +24,10 @@ int port;
 // from multiple classes.
 class srv {
        public:
-               int handle_22(const std::string a, const std::string b, std::string & r);
-               int handle_fast(const int a, int &r);
-               int handle_slow(const int a, int &r);
-               int handle_bigrep(const int a, std::string &r);
+               int handle_22(std::string & r, const std::string a, const std::string b);
+               int handle_fast(int &r, const int a);
+               int handle_slow(int &r, const int a);
+               int handle_bigrep(std::string &r, const int a);
 };
 
 // a handler. a and b are arguments, r is the result.
@@ -38,21 +38,21 @@ class srv {
 // at these argument types, so this function definition
 // does what a .x file does in SunRPC.
 int
-srv::handle_22(const std::string a, std::string b, std::string &r)
+srv::handle_22(std::string &r, const std::string a, std::string b)
 {
        r = a + b;
        return 0;
 }
 
 int
-srv::handle_fast(const int a, int &r)
+srv::handle_fast(int &r, const int a)
 {
        r = a + 1;
        return 0;
 }
 
 int
-srv::handle_slow(const int a, int &r)
+srv::handle_slow(int &r, const int a)
 {
        usleep(random() % 5000);
        r = a + 2;
@@ -60,7 +60,7 @@ srv::handle_slow(const int a, int &r)
 }
 
 int
-srv::handle_bigrep(const int len, std::string &r)
+srv::handle_bigrep(std::string &r, const int len)
 {
        r = std::string(len, 'x');
        return 0;
@@ -71,10 +71,10 @@ srv service;
 void startserver()
 {
        server = new rpcs(port);
-       server->reg(22, &service, &srv::handle_22);
-       server->reg(23, &service, &srv::handle_fast);
-       server->reg(24, &service, &srv::handle_slow);
-       server->reg(25, &service, &srv::handle_bigrep);
+       server->reg(22, &srv::handle_22, &service);
+       server->reg(23, &srv::handle_fast, &service);
+       server->reg(24, &srv::handle_slow, &service);
+       server->reg(25, &srv::handle_bigrep, &service);
 }
 
 void
diff --git a/rsm.cc b/rsm.cc
index df2b2fc..66c8d97 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -108,17 +108,17 @@ rsm::rsm(std::string _first, std::string _me) :
         commit_change(1);
     }
     rsmrpc = cfg->get_rpcs();
-    rsmrpc->reg(rsm_client_protocol::invoke, this, &rsm::client_invoke);
-    rsmrpc->reg(rsm_client_protocol::members, this, &rsm::client_members);
-    rsmrpc->reg(rsm_protocol::invoke, this, &rsm::invoke);
-    rsmrpc->reg(rsm_protocol::transferreq, this, &rsm::transferreq);
-    rsmrpc->reg(rsm_protocol::transferdonereq, this, &rsm::transferdonereq);
-    rsmrpc->reg(rsm_protocol::joinreq, this, &rsm::joinreq);
+    rsmrpc->reg(rsm_client_protocol::invoke, &rsm::client_invoke, this);
+    rsmrpc->reg(rsm_client_protocol::members, &rsm::client_members, this);
+    rsmrpc->reg(rsm_protocol::invoke, &rsm::invoke, this);
+    rsmrpc->reg(rsm_protocol::transferreq, &rsm::transferreq, this);
+    rsmrpc->reg(rsm_protocol::transferdonereq, &rsm::transferdonereq, this);
+    rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
 
     // tester must be on different port, otherwise it may partition itself
     testsvr = new rpcs(atoi(_me.c_str()) + 1);
-    testsvr->reg(rsm_test_protocol::net_repair, this, &rsm::test_net_repairreq);
-    testsvr->reg(rsm_test_protocol::breakpoint, this, &rsm::breakpointreq);
+    testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
+    testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
 
     {
         lock ml(rsm_mutex);
@@ -336,7 +336,7 @@ void rsm::execute(int procno, std::string req, std::string &r) {
     unmarshall args(req);
     marshall rep;
     std::string reps;
-    rsm_protocol::status ret = h->fn(args, rep);
+    rsm_protocol::status ret = (*h)(args, rep);
     marshall rep1;
     rep1 << ret;
     rep1 << rep.str();
@@ -349,7 +349,7 @@ void rsm::execute(int procno, std::string req, std::string &r) {
 // number, and invokes it on all members of the replicated state
 // machine.
 //
-rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std::string &r) {
+rsm_client_protocol::status rsm::client_invoke(std::string &r, int procno, std::string req) {
     LOG("rsm::client_invoke: procno 0x" << std::hex << procno);
     lock ml(invoke_mutex);
     std::vector<std::string> m;
@@ -403,7 +403,7 @@ rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std:
 // the replica must execute requests in order (with no gaps)
 // according to requests' seqno
 
-rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &dummy) {
+rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req) {
     LOG("rsm::invoke: procno 0x" << std::hex << proc);
     lock ml(invoke_mutex);
     std::vector<std::string> m;
@@ -438,8 +438,8 @@ rsm_protocol::status rsm::invoke(int proc, viewstamp vs, std::string req, int &d
 /**
  * RPC handler: Send back the local node's state to the caller
  */
-rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned vid,
-        rsm_protocol::transferres &r) {
+rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, std::string src,
+        viewstamp last, unsigned vid) {
     lock ml(rsm_mutex);
     int ret = rsm_protocol::OK;
     tprintf("transferreq from %s (%d,%d) vs (%d,%d)\n", src.c_str(),
@@ -457,7 +457,7 @@ rsm_protocol::status rsm::transferreq(std::string src, viewstamp last, unsigned
  * RPC handler: Inform the local node (the primary) that node m has synchronized
  * for view vid
  */
-rsm_protocol::status rsm::transferdonereq(std::string m, unsigned vid, int &) {
+rsm_protocol::status rsm::transferdonereq(int &, std::string m, unsigned vid) {
     lock ml(rsm_mutex);
     if (!insync || vid != vid_insync)
         return rsm_protocol::BUSY;
@@ -470,7 +470,7 @@ rsm_protocol::status rsm::transferdonereq(std::string m, unsigned vid, int &) {
 // a node that wants to join an RSM as a server sends a
 // joinreq to the RSM's current primary; this is the
 // handler for that RPC.
-rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::joinres &r) {
+rsm_protocol::status rsm::joinreq(rsm_protocol::joinres &r, std::string m, viewstamp last) {
     int ret = rsm_protocol::OK;
 
     lock ml(rsm_mutex);
@@ -508,7 +508,7 @@ rsm_protocol::status rsm::joinreq(std::string m, viewstamp last, rsm_protocol::j
  * so the client can switch to a different primary
  * when it existing primary fails
  */
-rsm_client_protocol::status rsm::client_members(int i, std::vector<std::string> &r) {
+rsm_client_protocol::status rsm::client_members(std::vector<std::string> &r, int i) {
     std::vector<std::string> m;
     lock ml(rsm_mutex);
     cfg->get_view(vid_commit, m);
@@ -568,7 +568,7 @@ void rsm::net_repair_wo(bool heal) {
     rsmrpc->set_reachable(heal);
 }
 
-rsm_test_protocol::status rsm::test_net_repairreq(int heal, int &r) {
+rsm_test_protocol::status rsm::test_net_repairreq(int &r, int heal) {
     lock ml(rsm_mutex);
     tprintf("rsm::test_net_repairreq: %d (dopartition %d, partitioned %d)\n",
             heal, dopartition, partitioned);
@@ -607,7 +607,7 @@ void rsm::partition1() {
     }
 }
 
-rsm_test_protocol::status rsm::breakpointreq(int b, int &r) {
+rsm_test_protocol::status rsm::breakpointreq(int &r, int b) {
     r = rsm_test_protocol::OK;
     lock ml(rsm_mutex);
     tprintf("rsm::breakpointreq: %d\n", b);
diff --git a/rsm.h b/rsm.h
index ec7632d..bf37a5d 100644 (file)
--- a/rsm.h
+++ b/rsm.h
@@ -11,7 +11,6 @@
 #include <arpa/inet.h>
 #include "config.h"
 
-
 class rsm : public config_view_change {
     private:
         void reg1(int proc, handler *);
@@ -39,17 +38,15 @@ class rsm : public config_view_change {
         bool break2;
 
 
-        rsm_client_protocol::status client_members(int i,
-                std::vector<std::string> &r);
-        rsm_protocol::status invoke(int proc, viewstamp vs, std::string mreq,
-                int &dummy);
-        rsm_protocol::status transferreq(std::string src, viewstamp last, unsigned vid,
-                rsm_protocol::transferres &r);
-        rsm_protocol::status transferdonereq(std::string m, unsigned vid, int &);
-        rsm_protocol::status joinreq(std::string src, viewstamp last,
-                rsm_protocol::joinres &r);
-        rsm_test_protocol::status test_net_repairreq(int heal, int &r);
-        rsm_test_protocol::status breakpointreq(int b, int &r);
+        rsm_client_protocol::status client_members(std::vector<std::string> &r, int i);
+        rsm_protocol::status invoke(int &, int proc, viewstamp vs, std::string mreq);
+        rsm_protocol::status transferreq(rsm_protocol::transferres &r, std::string src,
+                viewstamp last, unsigned vid);
+        rsm_protocol::status transferdonereq(int &, std::string m, unsigned vid);
+        rsm_protocol::status joinreq(rsm_protocol::joinres &r, std::string src,
+                viewstamp last);
+        rsm_test_protocol::status test_net_repairreq(int &r, int heal);
+        rsm_test_protocol::status breakpointreq(int &r, int b);
 
         std::mutex rsm_mutex;
         std::mutex invoke_mutex;
@@ -57,8 +54,8 @@ class rsm : public config_view_change {
         std::condition_variable sync_cond;
 
         void execute(int procno, std::string req, std::string &r);
-        rsm_client_protocol::status client_invoke(int procno, std::string req,
-                std::string &r);
+        rsm_client_protocol::status client_invoke(std::string &r, int procno,
+                std::string req);
         bool statetransfer(std::string m);
         bool statetransferdone(std::string m);
         bool join(std::string m);
@@ -80,157 +77,11 @@ class rsm : public config_view_change {
         void recovery();
         void commit_change(unsigned vid);
 
-        template<class S, class A1, class R>
-        void reg(int proc, S*, int (S::*meth)(const A1 a1, R &));
-        template<class S, class A1, class A2, class R>
-        void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2, R &));
-        template<class S, class A1, class A2, class A3, class R>
-        void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2,
-                    const A3 a3, R &));
-        template<class S, class A1, class A2, class A3, class A4, class R>
-        void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2,
-                    const A3 a3, const A4 a4, R &));
-        template<class S, class A1, class A2, class A3, class A4, class A5, class R>
-        void reg(int proc, S*, int (S::*meth)(const A1 a1, const A2 a2,
-                    const A3 a3, const A4 a4, const A5 a5, R &));
+        template<class F, class C=void> void reg(int proc, F f, C *c=nullptr);
 };
 
-template<class S, class A1, class R>
-void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, R & r)) {
-  class h1 : public handler {
-  private:
-    S * sob;
-    int (S::*meth)(const A1 a1, R & r);
-  public:
-  h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r))
-      : sob(xsob), meth(xmeth) { }
-    int fn(unmarshall &args, marshall &ret) {
-      A1 a1;
-      R r;
-      args >> a1;
-      VERIFY(args.okdone());
-      int b = (sob->*meth)(a1,r);
-      ret << r;
-      return b;
-    }
-  };
-  reg1(proc, new h1(sob, meth));
-}
-
-template<class S, class A1, class A2, class R>
-void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, R & r)) {
- class h1 : public handler {
-  private:
-    S * sob;
-    int (S::*meth)(const A1 a1, const A2 a2, R & r);
-  public:
-  h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r))
-    : sob(xsob), meth(xmeth) { }
-    int fn(unmarshall &args, marshall &ret) {
-      A1 a1;
-      A2 a2;
-      R r;
-      args >> a1;
-      args >> a2;
-      VERIFY(args.okdone());
-      int b = (sob->*meth)(a1,a2,r);
-      ret << r;
-      return b;
-    }
-  };
-  reg1(proc, new h1(sob, meth));
-}
-
-template<class S, class A1, class A2, class A3, class R>
-void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
-             const A3 a3, R & r)) {
- class h1 : public handler {
-  private:
-    S * sob;
-    int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r);
-  public:
-  h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r))
-    : sob(xsob), meth(xmeth) { }
-    int fn(unmarshall &args, marshall &ret) {
-      A1 a1;
-      A2 a2;
-      A3 a3;
-      R r;
-      args >> a1;
-      args >> a2;
-      args >> a3;
-      VERIFY(args.okdone());
-      int b = (sob->*meth)(a1,a2,a3,r);
-      ret << r;
-      return b;
-    }
-  };
-  reg1(proc, new h1(sob, meth));
-}
-
-template<class S, class A1, class A2, class A3, class A4, class R>
-void rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
-             const A3 a3, const A4 a4, R & r)) {
- class h1 : public handler {
-  private:
-    S * sob;
-    int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r);
-  public:
-  h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
-            const A4 a4, R & r))
-    : sob(xsob), meth(xmeth) { }
-    int fn(unmarshall &args, marshall &ret) {
-      A1 a1;
-      A2 a2;
-      A3 a3;
-      A4 a4;
-      R r;
-      args >> a1;
-      args >> a2;
-      args >> a3;
-      args >> a4;
-      VERIFY(args.okdone());
-      int b = (sob->*meth)(a1,a2,a3,a4,r);
-      ret << r;
-      return b;
-    }
-  };
-  reg1(proc, new h1(sob, meth));
-}
-
-
-template<class S, class A1, class A2, class A3, class A4, class A5, class R> void
-  rsm::reg(int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2,
-             const A3 a3, const A4 a4,
-             const A5 a5, R & r))
-{
- class h1 : public handler {
-  private:
-    S * sob;
-    int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4,
-       const A5 a5, R & r);
-  public:
-  h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3,
-            const A4 a4, const A5 a5, R & r))
-    : sob(xsob), meth(xmeth) { }
-    int fn(unmarshall &args, marshall &ret) {
-      A1 a1;
-      A2 a2;
-      A3 a3;
-      A4 a4;
-      A5 a5;
-      R r;
-      args >> a1;
-      args >> a2;
-      args >> a3;
-      args >> a4;
-      VERIFY(args.okdone());
-      int b = (sob->*meth)(a1,a2,a3,a4,a5,r);
-      ret << r;
-      return b;
-    }
-  };
-  reg1(proc, new h1(sob, meth));
+template<class F, class C> void rsm::reg(int proc, F f, C *c) {
+    reg1(proc, marshalled_func<F, F>::wrap(f, c));
 }
 
 #endif /* rsm_h */