Variadic templates for RPCs
authorPeter Iannucci <iannucci@mit.edu>
Tue, 17 Sep 2013 01:57:08 +0000 (21:57 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Wed, 25 Sep 2013 01:46:12 +0000 (21:46 -0400)
13 files changed:
.gitignore
config.cc
lock_client.cc
lock_client_cache_rsm.cc
lock_server_cache_rsm.cc
paxos.cc
rpc/marshall.h
rpc/rpc.cc
rpc/rpc.h
rpc/rpctest.cc
rsm.cc
rsm_client.cc
rsmtest_client.cc

index 510791b..2dde284 100644 (file)
@@ -10,3 +10,4 @@ lock_server
 *.a
 *.log
 rsm_tester
+config
index 5127cb2..96e6cd7 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -298,8 +298,7 @@ config::doheartbeat(const std::string &m)
         ml.unlock();
         rpcc *cl = h.safebind();
         if (cl) {
-            ret = cl->call(paxos_protocol::heartbeat, me, vid, r,
-                                         rpcc::to(1000));
+            ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid);
         }
         ml.lock();
     }
index 11bc476..a71a206 100644 (file)
@@ -22,7 +22,7 @@ int
 lock_client::stat(lock_protocol::lockid_t lid)
 {
     int r;
-    lock_protocol::status ret = cl->call(lock_protocol::stat, cl->id(), lid, r);
+    lock_protocol::status ret = cl->call(lock_protocol::stat, r, cl->id(), lid);
     VERIFY (ret == lock_protocol::OK);
     return r;
 }
@@ -31,14 +31,14 @@ lock_protocol::status
 lock_client::acquire(lock_protocol::lockid_t lid)
 {
     int r;
-    return cl->call(lock_protocol::acquire, cl->id(), lid, r);
+    return cl->call(lock_protocol::acquire, r, cl->id(), lid);
 }
 
 lock_protocol::status
 lock_client::release(lock_protocol::lockid_t lid)
 {
     int r;
-    return cl->call(lock_protocol::release, cl->id(), lid, r);
+    return cl->call(lock_protocol::release, r, cl->id(), lid);
 }
 
 t4_lock_client *t4_lock_client_new(const char *dst) {
index 80bc87f..c0be985 100644 (file)
@@ -82,7 +82,7 @@ void lock_client_cache_rsm::releaser() {
         {
             sl.unlock();
             int r;
-            rsmc->call(lock_protocol::release, lid, id, st.xid, r);
+            rsmc->call(lock_protocol::release, r, lid, id, st.xid);
             sl.lock();
         }
         st.state = lock_state::none;
@@ -117,7 +117,7 @@ lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid
             {
                 sl.unlock();
                 int r;
-                result = rsmc->call(lock_protocol::acquire, lid, id, st.xid, r);
+                result = rsmc->call(lock_protocol::acquire, r, lid, id, st.xid);
                 sl.lock();
             }
             LOG("acquire returned " << result);
index 0e43ec5..00d3f54 100644 (file)
@@ -81,7 +81,7 @@ void lock_server_cache_rsm::revoker() {
         proxy = handle(held_by.first).safebind();
         if (proxy) {
             int r;
-            rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, lid, held_by.second, r);
+            rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second);
             LOG("Revoke returned " << ret);
         }
     }
@@ -113,7 +113,7 @@ void lock_server_cache_rsm::retryer() {
         proxy = handle(front.first).safebind();
         if (proxy) {
             int r;
-            ret = proxy->call(rlock_protocol::retry, lid, front.second, r);
+            ret = proxy->call(rlock_protocol::retry, r, lid, front.second);
             LOG("Retry returned " << ret);
         }
     }
index 89f1714..4434788 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -157,7 +157,7 @@ proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
         handle h(*i);
         if (!(r = h.safebind()))
             continue;
-        int status = r->call(paxos_protocol::preparereq, me, arg, res, rpcc::to(1000));
+        int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
         if (status == paxos_protocol::OK) {
             if (res.oldinstance) {
                 tprintf("commiting old instance!\n");
@@ -190,7 +190,7 @@ proposer::accept(unsigned instance, std::vector<std::string> &accepts,
         if (!(r = h.safebind()))
             continue;
         bool accept = false;
-        int status = r->call(paxos_protocol::acceptreq, me, arg, accept, rpcc::to(1000));
+        int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg);
         if (status == paxos_protocol::OK) {
             if (accept)
                 accepts.push_back(*i);
@@ -209,7 +209,7 @@ proposer::decide(unsigned instance, std::vector<std::string> accepts,
         if (!(r = h.safebind()))
             continue;
         int res = 0;
-        r->call(paxos_protocol::decidereq, me, arg, res, rpcc::to(1000));
+        r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, arg);
     }
 }
 
index fcb5bab..27cebbb 100644 (file)
@@ -34,6 +34,10 @@ typedef int rpc_sz_t;
 #define DEFAULT_RPC_SZ 1024
 #define RPC_HEADER_SZ (std::max(sizeof(req_header), sizeof(reply_header)) + sizeof(rpc_sz_t))
 
+struct pass {
+    template<typename... Args> inline pass(Args&&...) {}
+};
+
 class marshall {
        private:
                char *_buf;     // Base of the raw bytes buffer (dynamically readjusted)
@@ -48,6 +52,10 @@ class marshall {
                        _ind = RPC_HEADER_SZ;
                }
 
+        template <typename... Args> marshall(const Args&... args) : marshall() {
+            (void)pass{(*this << args)...};
+        }
+
                ~marshall() { 
                        if (_buf) 
                                free(_buf); 
@@ -210,6 +218,15 @@ class unmarshall {
                        unpack(&h->ret);
                        _ind = RPC_HEADER_SZ;
                }
+
+        template <class OutputIterator>
+        void iterate(OutputIterator i, int n) {
+            while (n--) {
+                typename OutputIterator::value_type t;
+                *this >> t;
+                *i++ = t;
+            }
+        }
 };
 
 unmarshall& operator>>(unmarshall &, bool &);
index d53776a..a5d5b1f 100644 (file)
@@ -128,7 +128,7 @@ int
 rpcc::bind(TO to)
 {
     int r;
-    int ret = call(rpc_const::bind, 0, r, to);
+    int ret = call_timeout(rpc_const::bind, to, r, 0);
     if(ret == 0){
         lock ml(m_);
         bind_done_ = true;
index 723121c..f7245ad 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -111,144 +111,40 @@ class rpcc : public chanmgr {
                template<class R>
                        int call_m(unsigned int proc, marshall &req, R & r, TO to);
 
-               template<class R>
-                       int call(unsigned int proc, R & r, TO to = to_max); 
-               template<class R, class A1>
-                       int call(unsigned int proc, const A1 & a1, R & r, TO to = to_max); 
-               template<class R, class A1, class A2>
-                       int call(unsigned int proc, const A1 & a1, const A2 & a2, R & r, 
-                                       TO to = to_max); 
-               template<class R, class A1, class A2, class A3>
-                       int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, 
-                                       R & r, TO to = to_max); 
-               template<class R, class A1, class A2, class A3, class A4>
-                       int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, 
-                                       const A4 & a4, R & r, TO to = to_max);
-               template<class R, class A1, class A2, class A3, class A4, class A5>
-                       int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, 
-                                       const A4 & a4, const A5 & a5, R & r, TO to = to_max); 
-               template<class R, class A1, class A2, class A3, class A4, class A5,
-                       class A6>
-                               int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, 
-                                               const A4 & a4, const A5 & a5, const A6 & a6,
-                                               R & r, TO to = to_max); 
-               template<class R, class A1, class A2, class A3, class A4, class A5, 
-                       class A6, class A7>
-                               int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, 
-                                               const A4 & a4, const A5 & a5, const A6 &a6, const A7 &a7,
-                                               R & r, TO to = to_max); 
+               template<class R, typename ...Args>
+                       inline int call(unsigned int proc, R & r, const Args&... args);
 
+               template<class R, typename ...Args>
+                       inline int call_timeout(unsigned int proc, TO to, R & r, const Args&... args);
 };
 
 template<class R> int 
 rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to) 
 {
-       unmarshall u;
-       int intret = call1(proc, req, u, to);
-       if (intret < 0) return intret;
-       u >> r;
-       if(u.okdone() != true) {
-                fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply."
-                       "You are probably calling RPC 0x%x with wrong return "
-                       "type.\n", proc);
-                VERIFY(0);
-               return rpc_const::unmarshal_reply_failure;
-        }
-       return intret;
-}
-
-template<class R> int
-rpcc::call(unsigned int proc, R & r, TO to) 
-{
-       marshall m;
-       return call_m(proc, m, r, to);
-}
-
-template<class R, class A1> int
-rpcc::call(unsigned int proc, const A1 & a1, R & r, TO to) 
-{
-       marshall m;
-       m << a1;
-       return call_m(proc, m, r, to);
-}
-
-template<class R, class A1, class A2> int
-rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
-               R & r, TO to) 
-{
-       marshall m;
-       m << a1;
-       m << a2;
-       return call_m(proc, m, r, to);
-}
-
-template<class R, class A1, class A2, class A3> int
-rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
-               const A3 & a3, R & r, TO to) 
-{
-       marshall m;
-       m << a1;
-       m << a2;
-       m << a3;
-       return call_m(proc, m, r, to);
+    unmarshall u;
+    int intret = call1(proc, req, u, to);
+    if (intret < 0) return intret;
+    u >> r;
+    if (u.okdone() != true) {
+        fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply."
+                "You are probably calling RPC 0x%x with wrong return "
+                "type.\n", proc);
+        VERIFY(0);
+        return rpc_const::unmarshal_reply_failure;
+    }
+    return intret;
 }
 
-template<class R, class A1, class A2, class A3, class A4> int
-rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
-               const A3 & a3, const A4 & a4, R & r, TO to) 
+template<class R, typename... Args> inline int
+rpcc::call(unsigned int proc, R & r, const Args&... args)
 {
-       marshall m;
-       m << a1;
-       m << a2;
-       m << a3;
-       m << a4;
-       return call_m(proc, m, r, to);
-}
-
-template<class R, class A1, class A2, class A3, class A4, class A5> int
-rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
-               const A3 & a3, const A4 & a4, const A5 & a5, R & r, TO to) 
-{
-       marshall m;
-       m << a1;
-       m << a2;
-       m << a3;
-       m << a4;
-       m << a5;
-       return call_m(proc, m, r, to);
-}
-
-template<class R, class A1, class A2, class A3, class A4, class A5,
-       class A6> int
-rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
-               const A3 & a3, const A4 & a4, const A5 & a5, 
-               const A6 & a6, R & r, TO to) 
-{
-       marshall m;
-       m << a1;
-       m << a2;
-       m << a3;
-       m << a4;
-       m << a5;
-       m << a6;
-       return call_m(proc, m, r, to);
+    return call_timeout(proc, rpcc::to_max, r, args...);
 }
 
-template<class R, class A1, class A2, class A3, class A4, class A5,
-       class A6, class A7> int
-rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2,
-               const A3 & a3, const A4 & a4, const A5 & a5, 
-               const A6 & a6, const A7 & a7,
-               R & r, TO to) 
+template<class R, typename... Args> inline int
+rpcc::call_timeout(unsigned int proc, const rpcc::TO to, R & r, const Args&... args)
 {
-       marshall m;
-       m << a1;
-       m << a2;
-       m << a3;
-       m << a4;
-       m << a5;
-       m << a6;
-       m << a7;
+       marshall m{args...};
        return call_m(proc, m, r, to);
 }
 
index 115f484..c43a9da 100644 (file)
@@ -119,7 +119,7 @@ client1(int cl)
        for(int i = 0; i < 100; i++){
                int arg = (random() % 2000);
                std::string rep;
-               int ret = clients[which_cl]->call(25, arg, rep);
+               int ret = clients[which_cl]->call(25, rep, arg);
                VERIFY(ret == 0);
                if ((int)rep.size()!=arg) {
                        printf("repsize wrong %d!=%d\n", (int)rep.size(), arg);
@@ -136,7 +136,7 @@ client1(int cl)
 
                auto start = std::chrono::steady_clock::now();
 
-               int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep);
+               int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
                auto end = std::chrono::steady_clock::now();
                int diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
                if (ret != 0)
@@ -157,7 +157,7 @@ client2(int cl)
        while(time(0) - t1 < 10){
                int arg = (random() % 2000);
                std::string rep;
-               int ret = clients[which_cl]->call(25, arg, rep);
+               int ret = clients[which_cl]->call(25, rep, arg);
                if ((int)rep.size()!=arg) {
                        printf("ask for %d reply got %d ret %d\n",
                                arg, (int)rep.size(), ret);
@@ -173,7 +173,7 @@ client3(void *xx)
 
        for(int i = 0; i < 4; i++){
                int rep;
-               int ret = c->call(24, i, rep, rpcc::to(3000));
+               int ret = c->call_timeout(24, rpcc::to(3000), rep, i);
                VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
        }
 }
@@ -188,20 +188,20 @@ simple_tests(rpcc *c)
        // to marshall the RPC call packet, and how to unmarshall
        // the reply packet.
        std::string rep;
-       int intret = c->call(22, (std::string)"hello", (std::string)" goodbye", rep);
+       int intret = c->call(22, rep, (std::string)"hello", (std::string)" goodbye");
        VERIFY(intret == 0); // this is what handle_22 returns
        VERIFY(rep == "hello goodbye");
        printf("   -- string concat RPC .. ok\n");
 
        // small request, big reply (perhaps req via UDP, reply via TCP)
-       intret = c->call(25, 70000, rep, rpcc::to(200000));
+       intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
        VERIFY(intret == 0);
        VERIFY(rep.size() == 70000);
        printf("   -- small request, big reply .. ok\n");
 
        // specify a timeout value to an RPC that should succeed (udp)
        int xx = 0;
-       intret = c->call(23, 77, xx, rpcc::to(3000));
+       intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
        VERIFY(intret == 0 && xx == 78);
        printf("   -- no suprious timeout .. ok\n");
 
@@ -209,14 +209,14 @@ simple_tests(rpcc *c)
        {
                std::string arg(1000, 'x');
                std::string rep;
-               c->call(22, arg, (std::string)"x", rep, rpcc::to(3000));
+               c->call_timeout(22, rpcc::to(3000), rep, arg, (std::string)"x");
                VERIFY(rep.size() == 1001);
                printf("   -- no suprious timeout .. ok\n");
        }
 
        // huge RPC
        std::string big(1000000, 'x');
-       intret = c->call(22, big, (std::string)"z", rep);
+       intret = c->call(22, rep, big, (std::string)"z");
        VERIFY(rep.size() == 1000001);
        printf("   -- huge 1M rpc request .. ok\n");
 
@@ -302,7 +302,7 @@ failure_test()
        startserver();
 
        std::string rep;
-       int intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
+       int intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye");
        VERIFY(intret == rpc_const::oldsrv_failure);
        printf("   -- call recovered server with old client .. failed ok\n");
 
@@ -312,7 +312,7 @@ failure_test()
        VERIFY (client->bind() >= 0);
        VERIFY (client->bind() < 0);
 
-       intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
+       intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye");
        VERIFY(intret == 0);
        VERIFY(rep == "hello goodbye");
 
diff --git a/rsm.cc b/rsm.cc
index b93c701..df2b2fc 100644 (file)
--- a/rsm.cc
+++ b/rsm.cc
@@ -239,8 +239,8 @@ bool rsm::statetransfer(std::string m)
         ml.unlock();
         cl = h.safebind();
         if (cl) {
-            ret = cl->call(rsm_protocol::transferreq, cfg->myaddr(),
-                    last_myvs, vid_insync, r, rpcc::to(1000));
+            ret = cl->call_timeout(rsm_protocol::transferreq, rpcc::to(1000),
+                    r, cfg->myaddr(), last_myvs, vid_insync);
         }
         ml.lock();
     }
@@ -266,7 +266,7 @@ bool rsm::statetransferdone(std::string m) {
     bool done = false;
     if (cl) {
         int r;
-        rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, cfg->myaddr(), vid_insync, r);
+        rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync);
         done = (ret == rsm_protocol::OK);
     }
     ml.lock();
@@ -287,8 +287,8 @@ bool rsm::join(std::string m) {
         ml.unlock();
         cl = h.safebind();
         if (cl != 0) {
-            ret = cl->call(rsm_protocol::joinreq, cfg->myaddr(), last_myvs,
-                    r, rpcc::to(120000));
+            ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(120000), r,
+                    cfg->myaddr(), last_myvs);
         }
         ml.lock();
     }
@@ -383,7 +383,7 @@ rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std:
                 return rsm_client_protocol::BUSY;
             rsm_protocol::status ret;
             int r;
-            ret = cl->call(rsm_protocol::invoke, procno, vs, req, r, rpcc::to(1000));
+            ret = cl->call_timeout(rsm_protocol::invoke, rpcc::to(1000), r, procno, vs, req);
             LOG("Invoke returned " << ret);
             if (ret != rsm_protocol::OK)
                 return rsm_client_protocol::BUSY;
index eed356f..7062cea 100644 (file)
@@ -38,7 +38,7 @@ rsm_protocol::status rsm_client::invoke(int proc, std::string req, std::string &
         ml.unlock();
         rpcc *cl = h.safebind();
         if (cl)
-            ret = cl->call(rsm_client_protocol::invoke, proc, req, rep, rpcc::to(5000));
+            ret = cl->call_timeout(rsm_client_protocol::invoke, rpcc::to(5000), rep, proc, req);
         ml.lock();
 
         if (!cl)
@@ -77,8 +77,7 @@ bool rsm_client::init_members() {
         ml.unlock();
         cl = h.safebind();
         if (cl) {
-            ret = cl->call(rsm_client_protocol::members, 0, known_mems,
-                    rpcc::to(1000));
+            ret = cl->call_timeout(rsm_client_protocol::members, rpcc::to(1000), known_mems, 0);
         }
         ml.lock();
     }
index e27e8e5..c61194e 100644 (file)
@@ -22,7 +22,7 @@ int
 rsmtest_client::net_repair(int heal)
 {
     int r;
-    int ret = cl->call(rsm_test_protocol::net_repair, heal, r);
+    int ret = cl->call(rsm_test_protocol::net_repair, r, heal);
     VERIFY (ret == rsm_test_protocol::OK);
     return r;
 }
@@ -31,7 +31,7 @@ int
 rsmtest_client::breakpoint(int b)
 {
     int r;
-    int ret = cl->call(rsm_test_protocol::breakpoint, b, r);
+    int ret = cl->call(rsm_test_protocol::breakpoint, r, b);
     VERIFY (ret == rsm_test_protocol::OK);
     return r;
 }