From 130f2d53438eb6193accb445aca52fa8e2fe4158 Mon Sep 17 00:00:00 2001 From: Peter Iannucci Date: Mon, 16 Sep 2013 21:57:08 -0400 Subject: [PATCH] Variadic templates for RPCs --- .gitignore | 1 + config.cc | 3 +- lock_client.cc | 6 +- lock_client_cache_rsm.cc | 4 +- lock_server_cache_rsm.cc | 4 +- paxos.cc | 6 +- rpc/marshall.h | 17 ++++++ rpc/rpc.cc | 2 +- rpc/rpc.h | 148 +++++++--------------------------------------- rpc/rpctest.cc | 22 +++---- rsm.cc | 12 ++-- rsm_client.cc | 5 +- rsmtest_client.cc | 4 +- 13 files changed, 73 insertions(+), 161 deletions(-) diff --git a/.gitignore b/.gitignore index 510791b..2dde284 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ lock_server *.a *.log rsm_tester +config diff --git a/config.cc b/config.cc index 5127cb2..96e6cd7 100644 --- a/config.cc +++ b/config.cc @@ -298,8 +298,7 @@ config::doheartbeat(const std::string &m) ml.unlock(); rpcc *cl = h.safebind(); if (cl) { - ret = cl->call(paxos_protocol::heartbeat, me, vid, r, - rpcc::to(1000)); + ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid); } ml.lock(); } diff --git a/lock_client.cc b/lock_client.cc index 11bc476..a71a206 100644 --- a/lock_client.cc +++ b/lock_client.cc @@ -22,7 +22,7 @@ int lock_client::stat(lock_protocol::lockid_t lid) { int r; - lock_protocol::status ret = cl->call(lock_protocol::stat, cl->id(), lid, r); + lock_protocol::status ret = cl->call(lock_protocol::stat, r, cl->id(), lid); VERIFY (ret == lock_protocol::OK); return r; } @@ -31,14 +31,14 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { int r; - return cl->call(lock_protocol::acquire, cl->id(), lid, r); + return cl->call(lock_protocol::acquire, r, cl->id(), lid); } lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) { int r; - return cl->call(lock_protocol::release, cl->id(), lid, r); + return cl->call(lock_protocol::release, r, cl->id(), lid); } t4_lock_client *t4_lock_client_new(const char *dst) { diff --git a/lock_client_cache_rsm.cc b/lock_client_cache_rsm.cc index 80bc87f..c0be985 100644 --- a/lock_client_cache_rsm.cc +++ b/lock_client_cache_rsm.cc @@ -82,7 +82,7 @@ void lock_client_cache_rsm::releaser() { { sl.unlock(); int r; - rsmc->call(lock_protocol::release, lid, id, st.xid, r); + rsmc->call(lock_protocol::release, r, lid, id, st.xid); sl.lock(); } st.state = lock_state::none; @@ -117,7 +117,7 @@ lock_protocol::status lock_client_cache_rsm::acquire(lock_protocol::lockid_t lid { sl.unlock(); int r; - result = rsmc->call(lock_protocol::acquire, lid, id, st.xid, r); + result = rsmc->call(lock_protocol::acquire, r, lid, id, st.xid); sl.lock(); } LOG("acquire returned " << result); diff --git a/lock_server_cache_rsm.cc b/lock_server_cache_rsm.cc index 0e43ec5..00d3f54 100644 --- a/lock_server_cache_rsm.cc +++ b/lock_server_cache_rsm.cc @@ -81,7 +81,7 @@ void lock_server_cache_rsm::revoker() { proxy = handle(held_by.first).safebind(); if (proxy) { int r; - rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, lid, held_by.second, r); + rlock_protocol::status ret = proxy->call(rlock_protocol::revoke, r, lid, held_by.second); LOG("Revoke returned " << ret); } } @@ -113,7 +113,7 @@ void lock_server_cache_rsm::retryer() { proxy = handle(front.first).safebind(); if (proxy) { int r; - ret = proxy->call(rlock_protocol::retry, lid, front.second, r); + ret = proxy->call(rlock_protocol::retry, r, lid, front.second); LOG("Retry returned " << ret); } } diff --git a/paxos.cc b/paxos.cc index 89f1714..4434788 100644 --- a/paxos.cc +++ b/paxos.cc @@ -157,7 +157,7 @@ proposer::prepare(unsigned instance, std::vector &accepts, handle h(*i); if (!(r = h.safebind())) continue; - int status = r->call(paxos_protocol::preparereq, me, arg, res, rpcc::to(1000)); + int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg); if (status == paxos_protocol::OK) { if (res.oldinstance) { tprintf("commiting old instance!\n"); @@ -190,7 +190,7 @@ proposer::accept(unsigned instance, std::vector &accepts, if (!(r = h.safebind())) continue; bool accept = false; - int status = r->call(paxos_protocol::acceptreq, me, arg, accept, rpcc::to(1000)); + int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg); if (status == paxos_protocol::OK) { if (accept) accepts.push_back(*i); @@ -209,7 +209,7 @@ proposer::decide(unsigned instance, std::vector accepts, if (!(r = h.safebind())) continue; int res = 0; - r->call(paxos_protocol::decidereq, me, arg, res, rpcc::to(1000)); + r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, arg); } } diff --git a/rpc/marshall.h b/rpc/marshall.h index fcb5bab..27cebbb 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -34,6 +34,10 @@ typedef int rpc_sz_t; #define DEFAULT_RPC_SZ 1024 #define RPC_HEADER_SZ (std::max(sizeof(req_header), sizeof(reply_header)) + sizeof(rpc_sz_t)) +struct pass { + template inline pass(Args&&...) {} +}; + class marshall { private: char *_buf; // Base of the raw bytes buffer (dynamically readjusted) @@ -48,6 +52,10 @@ class marshall { _ind = RPC_HEADER_SZ; } + template marshall(const Args&... args) : marshall() { + (void)pass{(*this << args)...}; + } + ~marshall() { if (_buf) free(_buf); @@ -210,6 +218,15 @@ class unmarshall { unpack(&h->ret); _ind = RPC_HEADER_SZ; } + + template + void iterate(OutputIterator i, int n) { + while (n--) { + typename OutputIterator::value_type t; + *this >> t; + *i++ = t; + } + } }; unmarshall& operator>>(unmarshall &, bool &); diff --git a/rpc/rpc.cc b/rpc/rpc.cc index d53776a..a5d5b1f 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -128,7 +128,7 @@ int rpcc::bind(TO to) { int r; - int ret = call(rpc_const::bind, 0, r, to); + int ret = call_timeout(rpc_const::bind, to, r, 0); if(ret == 0){ lock ml(m_); bind_done_ = true; diff --git a/rpc/rpc.h b/rpc/rpc.h index 723121c..f7245ad 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -111,144 +111,40 @@ class rpcc : public chanmgr { template int call_m(unsigned int proc, marshall &req, R & r, TO to); - template - int call(unsigned int proc, R & r, TO to = to_max); - template - int call(unsigned int proc, const A1 & a1, R & r, TO to = to_max); - template - int call(unsigned int proc, const A1 & a1, const A2 & a2, R & r, - TO to = to_max); - template - int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, - R & r, TO to = to_max); - template - int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, - const A4 & a4, R & r, TO to = to_max); - template - int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, - const A4 & a4, const A5 & a5, R & r, TO to = to_max); - template - int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, - const A4 & a4, const A5 & a5, const A6 & a6, - R & r, TO to = to_max); - template - int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, - const A4 & a4, const A5 & a5, const A6 &a6, const A7 &a7, - R & r, TO to = to_max); + template + inline int call(unsigned int proc, R & r, const Args&... args); + template + inline int call_timeout(unsigned int proc, TO to, R & r, const Args&... args); }; template int rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to) { - unmarshall u; - int intret = call1(proc, req, u, to); - if (intret < 0) return intret; - u >> r; - if(u.okdone() != true) { - fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply." - "You are probably calling RPC 0x%x with wrong return " - "type.\n", proc); - VERIFY(0); - return rpc_const::unmarshal_reply_failure; - } - return intret; -} - -template int -rpcc::call(unsigned int proc, R & r, TO to) -{ - marshall m; - return call_m(proc, m, r, to); -} - -template int -rpcc::call(unsigned int proc, const A1 & a1, R & r, TO to) -{ - marshall m; - m << a1; - return call_m(proc, m, r, to); -} - -template int -rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, - R & r, TO to) -{ - marshall m; - m << a1; - m << a2; - return call_m(proc, m, r, to); -} - -template int -rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, - const A3 & a3, R & r, TO to) -{ - marshall m; - m << a1; - m << a2; - m << a3; - return call_m(proc, m, r, to); + unmarshall u; + int intret = call1(proc, req, u, to); + if (intret < 0) return intret; + u >> r; + if (u.okdone() != true) { + fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply." + "You are probably calling RPC 0x%x with wrong return " + "type.\n", proc); + VERIFY(0); + return rpc_const::unmarshal_reply_failure; + } + return intret; } -template int -rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, - const A3 & a3, const A4 & a4, R & r, TO to) +template inline int +rpcc::call(unsigned int proc, R & r, const Args&... args) { - marshall m; - m << a1; - m << a2; - m << a3; - m << a4; - return call_m(proc, m, r, to); -} - -template int -rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, - const A3 & a3, const A4 & a4, const A5 & a5, R & r, TO to) -{ - marshall m; - m << a1; - m << a2; - m << a3; - m << a4; - m << a5; - return call_m(proc, m, r, to); -} - -template int -rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, - const A3 & a3, const A4 & a4, const A5 & a5, - const A6 & a6, R & r, TO to) -{ - marshall m; - m << a1; - m << a2; - m << a3; - m << a4; - m << a5; - m << a6; - return call_m(proc, m, r, to); + return call_timeout(proc, rpcc::to_max, r, args...); } -template int -rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, - const A3 & a3, const A4 & a4, const A5 & a5, - const A6 & a6, const A7 & a7, - R & r, TO to) +template inline int +rpcc::call_timeout(unsigned int proc, const rpcc::TO to, R & r, const Args&... args) { - marshall m; - m << a1; - m << a2; - m << a3; - m << a4; - m << a5; - m << a6; - m << a7; + marshall m{args...}; return call_m(proc, m, r, to); } diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index 115f484..c43a9da 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -119,7 +119,7 @@ client1(int cl) for(int i = 0; i < 100; i++){ int arg = (random() % 2000); std::string rep; - int ret = clients[which_cl]->call(25, arg, rep); + int ret = clients[which_cl]->call(25, rep, arg); VERIFY(ret == 0); if ((int)rep.size()!=arg) { printf("repsize wrong %d!=%d\n", (int)rep.size(), arg); @@ -136,7 +136,7 @@ client1(int cl) auto start = std::chrono::steady_clock::now(); - int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep); + int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg); auto end = std::chrono::steady_clock::now(); int diff = std::chrono::duration_cast(end - start).count(); if (ret != 0) @@ -157,7 +157,7 @@ client2(int cl) while(time(0) - t1 < 10){ int arg = (random() % 2000); std::string rep; - int ret = clients[which_cl]->call(25, arg, rep); + int ret = clients[which_cl]->call(25, rep, arg); if ((int)rep.size()!=arg) { printf("ask for %d reply got %d ret %d\n", arg, (int)rep.size(), ret); @@ -173,7 +173,7 @@ client3(void *xx) for(int i = 0; i < 4; i++){ int rep; - int ret = c->call(24, i, rep, rpcc::to(3000)); + int ret = c->call_timeout(24, rpcc::to(3000), rep, i); VERIFY(ret == rpc_const::timeout_failure || rep == i+2); } } @@ -188,20 +188,20 @@ simple_tests(rpcc *c) // to marshall the RPC call packet, and how to unmarshall // the reply packet. std::string rep; - int intret = c->call(22, (std::string)"hello", (std::string)" goodbye", rep); + int intret = c->call(22, rep, (std::string)"hello", (std::string)" goodbye"); VERIFY(intret == 0); // this is what handle_22 returns VERIFY(rep == "hello goodbye"); printf(" -- string concat RPC .. ok\n"); // small request, big reply (perhaps req via UDP, reply via TCP) - intret = c->call(25, 70000, rep, rpcc::to(200000)); + intret = c->call_timeout(25, rpcc::to(200000), rep, 70000); VERIFY(intret == 0); VERIFY(rep.size() == 70000); printf(" -- small request, big reply .. ok\n"); // specify a timeout value to an RPC that should succeed (udp) int xx = 0; - intret = c->call(23, 77, xx, rpcc::to(3000)); + intret = c->call_timeout(23, rpcc::to(3000), xx, 77); VERIFY(intret == 0 && xx == 78); printf(" -- no suprious timeout .. ok\n"); @@ -209,14 +209,14 @@ simple_tests(rpcc *c) { std::string arg(1000, 'x'); std::string rep; - c->call(22, arg, (std::string)"x", rep, rpcc::to(3000)); + c->call_timeout(22, rpcc::to(3000), rep, arg, (std::string)"x"); VERIFY(rep.size() == 1001); printf(" -- no suprious timeout .. ok\n"); } // huge RPC std::string big(1000000, 'x'); - intret = c->call(22, big, (std::string)"z", rep); + intret = c->call(22, rep, big, (std::string)"z"); VERIFY(rep.size() == 1000001); printf(" -- huge 1M rpc request .. ok\n"); @@ -302,7 +302,7 @@ failure_test() startserver(); std::string rep; - int intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep); + int intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye"); VERIFY(intret == rpc_const::oldsrv_failure); printf(" -- call recovered server with old client .. failed ok\n"); @@ -312,7 +312,7 @@ failure_test() VERIFY (client->bind() >= 0); VERIFY (client->bind() < 0); - intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep); + intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye"); VERIFY(intret == 0); VERIFY(rep == "hello goodbye"); diff --git a/rsm.cc b/rsm.cc index b93c701..df2b2fc 100644 --- a/rsm.cc +++ b/rsm.cc @@ -239,8 +239,8 @@ bool rsm::statetransfer(std::string m) ml.unlock(); cl = h.safebind(); if (cl) { - ret = cl->call(rsm_protocol::transferreq, cfg->myaddr(), - last_myvs, vid_insync, r, rpcc::to(1000)); + ret = cl->call_timeout(rsm_protocol::transferreq, rpcc::to(1000), + r, cfg->myaddr(), last_myvs, vid_insync); } ml.lock(); } @@ -266,7 +266,7 @@ bool rsm::statetransferdone(std::string m) { bool done = false; if (cl) { int r; - rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, cfg->myaddr(), vid_insync, r); + rsm_protocol::status ret = cl->call(rsm_protocol::transferdonereq, r, cfg->myaddr(), vid_insync); done = (ret == rsm_protocol::OK); } ml.lock(); @@ -287,8 +287,8 @@ bool rsm::join(std::string m) { ml.unlock(); cl = h.safebind(); if (cl != 0) { - ret = cl->call(rsm_protocol::joinreq, cfg->myaddr(), last_myvs, - r, rpcc::to(120000)); + ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(120000), r, + cfg->myaddr(), last_myvs); } ml.lock(); } @@ -383,7 +383,7 @@ rsm_client_protocol::status rsm::client_invoke(int procno, std::string req, std: return rsm_client_protocol::BUSY; rsm_protocol::status ret; int r; - ret = cl->call(rsm_protocol::invoke, procno, vs, req, r, rpcc::to(1000)); + ret = cl->call_timeout(rsm_protocol::invoke, rpcc::to(1000), r, procno, vs, req); LOG("Invoke returned " << ret); if (ret != rsm_protocol::OK) return rsm_client_protocol::BUSY; diff --git a/rsm_client.cc b/rsm_client.cc index eed356f..7062cea 100644 --- a/rsm_client.cc +++ b/rsm_client.cc @@ -38,7 +38,7 @@ rsm_protocol::status rsm_client::invoke(int proc, std::string req, std::string & ml.unlock(); rpcc *cl = h.safebind(); if (cl) - ret = cl->call(rsm_client_protocol::invoke, proc, req, rep, rpcc::to(5000)); + ret = cl->call_timeout(rsm_client_protocol::invoke, rpcc::to(5000), rep, proc, req); ml.lock(); if (!cl) @@ -77,8 +77,7 @@ bool rsm_client::init_members() { ml.unlock(); cl = h.safebind(); if (cl) { - ret = cl->call(rsm_client_protocol::members, 0, known_mems, - rpcc::to(1000)); + ret = cl->call_timeout(rsm_client_protocol::members, rpcc::to(1000), known_mems, 0); } ml.lock(); } diff --git a/rsmtest_client.cc b/rsmtest_client.cc index e27e8e5..c61194e 100644 --- a/rsmtest_client.cc +++ b/rsmtest_client.cc @@ -22,7 +22,7 @@ int rsmtest_client::net_repair(int heal) { int r; - int ret = cl->call(rsm_test_protocol::net_repair, heal, r); + int ret = cl->call(rsm_test_protocol::net_repair, r, heal); VERIFY (ret == rsm_test_protocol::OK); return r; } @@ -31,7 +31,7 @@ int rsmtest_client::breakpoint(int b) { int r; - int ret = cl->call(rsm_test_protocol::breakpoint, b, r); + int ret = cl->call(rsm_test_protocol::breakpoint, r, b); VERIFY (ret == rsm_test_protocol::OK); return r; } -- 1.7.9.5