X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/61809b48ade4c21b1b01931d520aa2abc7507032..eeab3e6cade87c1fe0a5f3d93522e12ccb9ec2ab:/rpc/rpctest.cc diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index 4c66e43..c381745 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -10,7 +10,6 @@ #include #include #include "jsl_log.h" -#include "gettime.h" #include "lang/verify.h" #define NUM_CL 2 @@ -19,17 +18,16 @@ rpcs *server; // server rpc object rpcc *clients[NUM_CL]; // client rpc object struct sockaddr_in dst; //server's ip address int port; -pthread_attr_t attr; // server-side handlers. they must be methods of some class // to simplify rpcs::reg(). a server process can have handlers // from multiple classes. class srv { public: - int handle_22(const std::string a, const std::string b, std::string & r); - int handle_fast(const int a, int &r); - int handle_slow(const int a, int &r); - int handle_bigrep(const int a, std::string &r); + int handle_22(std::string & r, const std::string a, const std::string b); + int handle_fast(int &r, const int a); + int handle_slow(int &r, const int a); + int handle_bigrep(std::string &r, const int a); }; // a handler. a and b are arguments, r is the result. @@ -40,21 +38,21 @@ class srv { // at these argument types, so this function definition // does what a .x file does in SunRPC. int -srv::handle_22(const std::string a, std::string b, std::string &r) +srv::handle_22(std::string &r, const std::string a, std::string b) { r = a + b; return 0; } int -srv::handle_fast(const int a, int &r) +srv::handle_fast(int &r, const int a) { r = a + 1; return 0; } int -srv::handle_slow(const int a, int &r) +srv::handle_slow(int &r, const int a) { usleep(random() % 5000); r = a + 2; @@ -62,7 +60,7 @@ srv::handle_slow(const int a, int &r) } int -srv::handle_bigrep(const int len, std::string &r) +srv::handle_bigrep(std::string &r, const int len) { r = std::string(len, 'x'); return 0; @@ -73,17 +71,17 @@ srv service; void startserver() { server = new rpcs(port); - server->reg(22, &service, &srv::handle_22); - server->reg(23, &service, &srv::handle_fast); - server->reg(24, &service, &srv::handle_slow); - server->reg(25, &service, &srv::handle_bigrep); + server->reg(22, &srv::handle_22, &service); + server->reg(23, &srv::handle_fast, &service); + server->reg(24, &srv::handle_slow, &service); + server->reg(25, &srv::handle_bigrep, &service); } void testmarshall() { marshall m; - req_header rh(1,2,3,4,5); + request_header rh{1,2,3,4,5}; m.pack_req_header(rh); VERIFY(m.size()==RPC_HEADER_SZ); int i = 12345; @@ -99,7 +97,7 @@ testmarshall() VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int))); unmarshall un(b,sz); - req_header rh1; + request_header rh1; un.unpack_req_header(&rh1); VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0); int i1; @@ -112,17 +110,16 @@ testmarshall() VERIFY(i1==i && l1==l && s1==s); } -void * -client1(void *xx) +void +client1(int cl) { - // test concurrency. - int which_cl = ((unsigned long) xx ) % NUM_CL; + int which_cl = ((unsigned long) cl ) % NUM_CL; for(int i = 0; i < 100; i++){ int arg = (random() % 2000); std::string rep; - int ret = clients[which_cl]->call(25, arg, rep); + int ret = clients[which_cl]->call(25, rep, arg); VERIFY(ret == 0); if ((int)rep.size()!=arg) { printf("repsize wrong %d!=%d\n", (int)rep.size(), arg); @@ -137,25 +134,22 @@ client1(void *xx) int arg = (random() % 1000); int rep; - struct timespec start,end; - clock_gettime(CLOCK_REALTIME, &start); + auto start = std::chrono::steady_clock::now(); - int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep); - clock_gettime(CLOCK_REALTIME, &end); - int diff = diff_timespec(end, start); + int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg); + auto end = std::chrono::steady_clock::now(); + int diff = std::chrono::duration_cast(end - start).count(); if (ret != 0) printf("%d ms have elapsed!!!\n", diff); VERIFY(ret == 0); VERIFY(rep == (which ? arg+1 : arg+2)); } - - return 0; } -void * -client2(void *xx) +void +client2(int cl) { - int which_cl = ((unsigned long) xx ) % NUM_CL; + int which_cl = ((unsigned long) cl ) % NUM_CL; time_t t1; time(&t1); @@ -163,27 +157,25 @@ client2(void *xx) while(time(0) - t1 < 10){ int arg = (random() % 2000); std::string rep; - int ret = clients[which_cl]->call(25, arg, rep); + int ret = clients[which_cl]->call(25, rep, arg); if ((int)rep.size()!=arg) { printf("ask for %d reply got %d ret %d\n", arg, (int)rep.size(), ret); } VERIFY((int)rep.size() == arg); } - return 0; } -void * +void client3(void *xx) { rpcc *c = (rpcc *) xx; for(int i = 0; i < 4; i++){ int rep; - int ret = c->call(24, i, rep, rpcc::to(3000)); + int ret = c->call_timeout(24, rpcc::to(3000), rep, i); VERIFY(ret == rpc_const::timeout_failure || rep == i+2); } - return 0; } @@ -196,53 +188,35 @@ simple_tests(rpcc *c) // to marshall the RPC call packet, and how to unmarshall // the reply packet. std::string rep; - int intret = c->call(22, (std::string)"hello", (std::string)" goodbye", rep); + int intret = c->call(22, rep, (std::string)"hello", (std::string)" goodbye"); VERIFY(intret == 0); // this is what handle_22 returns VERIFY(rep == "hello goodbye"); printf(" -- string concat RPC .. ok\n"); // small request, big reply (perhaps req via UDP, reply via TCP) - intret = c->call(25, 70000, rep, rpcc::to(200000)); + intret = c->call_timeout(25, rpcc::to(200000), rep, 70000); VERIFY(intret == 0); VERIFY(rep.size() == 70000); printf(" -- small request, big reply .. ok\n"); -#if 0 - // too few arguments - intret = c->call(22, (std::string)"just one", rep); - VERIFY(intret < 0); - printf(" -- too few arguments .. failed ok\n"); - - // too many arguments; proc #23 expects just one. - intret = c->call(23, 1001, 1002, rep); - VERIFY(intret < 0); - printf(" -- too many arguments .. failed ok\n"); - - // wrong return value size - int wrongrep; - intret = c->call(23, (std::string)"hello", (std::string)" goodbye", wrongrep); - VERIFY(intret < 0); - printf(" -- wrong ret value size .. failed ok\n"); -#endif - // specify a timeout value to an RPC that should succeed (udp) int xx = 0; - intret = c->call(23, 77, xx, rpcc::to(3000)); + intret = c->call_timeout(23, rpcc::to(3000), xx, 77); VERIFY(intret == 0 && xx == 78); - printf(" -- no suprious timeout .. ok\n"); + printf(" -- no spurious timeout .. ok\n"); // specify a timeout value to an RPC that should succeed (tcp) { std::string arg(1000, 'x'); std::string rep; - c->call(22, arg, (std::string)"x", rep, rpcc::to(3000)); + c->call_timeout(22, rpcc::to(3000), rep, arg, (std::string)"x"); VERIFY(rep.size() == 1001); - printf(" -- no suprious timeout .. ok\n"); + printf(" -- no spurious timeout .. ok\n"); } // huge RPC std::string big(1000000, 'x'); - intret = c->call(22, big, (std::string)"z", rep); + intret = c->call(22, rep, big, (std::string)"z"); VERIFY(rep.size() == 1000001); printf(" -- huge 1M rpc request .. ok\n"); @@ -267,18 +241,15 @@ concurrent_test(int nt) // create threads that make lots of calls in parallel, // to test thread synchronization for concurrent calls // and dispatches. - int ret; - printf("start concurrent_test (%d threads) ...", nt); - pthread_t th[nt]; + std::vector th(nt); for(int i = 0; i < nt; i++){ - ret = pthread_create(&th[i], &attr, client1, (void *) (uintptr_t)i); - VERIFY(ret == 0); + th[i] = std::thread(client1, i); } for(int i = 0; i < nt; i++){ - VERIFY(pthread_join(th[i], NULL) == 0); + th[i].join(); } printf(" OK\n"); } @@ -286,8 +257,6 @@ concurrent_test(int nt) void lossy_test() { - int ret; - printf("start lossy_test ..."); VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); @@ -303,13 +272,12 @@ lossy_test() } int nt = 1; - pthread_t th[nt]; + std::vector th(nt); for(int i = 0; i < nt; i++){ - ret = pthread_create(&th[i], &attr, client2, (void *) (uintptr_t)i); - VERIFY(ret == 0); + th[i] = std::thread(client2, i); } for(int i = 0; i < nt; i++){ - VERIFY(pthread_join(th[i], NULL) == 0); + th[i].join(); } printf(".. OK\n"); VERIFY(setenv("RPC_LOSSY", "0", 1) == 0); @@ -334,7 +302,7 @@ failure_test() startserver(); std::string rep; - int intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep); + int intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye"); VERIFY(intret == rpc_const::oldsrv_failure); printf(" -- call recovered server with old client .. failed ok\n"); @@ -344,7 +312,7 @@ failure_test() VERIFY (client->bind() >= 0); VERIFY (client->bind() < 0); - intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep); + intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye"); VERIFY(intret == 0); VERIFY(rep == "hello goodbye"); @@ -352,17 +320,15 @@ failure_test() int nt = 10; - int ret; printf(" -- concurrent test on new rpc client w/ %d threads ..", nt); - pthread_t th[nt]; + std::vector th(nt); for(int i = 0; i < nt; i++){ - ret = pthread_create(&th[i], &attr, client3, (void *) client); - VERIFY(ret == 0); + th[i] = std::thread(client3, client); } for(int i = 0; i < nt; i++){ - VERIFY(pthread_join(th[i], NULL) == 0); + th[i].join(); } printf("ok\n"); @@ -376,12 +342,11 @@ failure_test() printf(" -- concurrent test on new client and server w/ %d threads ..", nt); for(int i = 0; i < nt; i++){ - ret = pthread_create(&th[i], &attr, client3, (void *)client); - VERIFY(ret == 0); + th[i] = std::thread(client3, client); } for(int i = 0; i < nt; i++){ - VERIFY(pthread_join(th[i], NULL) == 0); + th[i].join(); } printf("ok\n"); @@ -429,19 +394,14 @@ main(int argc, char *argv[]) } if (debug_level > 0) { - //__loginit.initNow(); - jsl_set_debug(debug_level); + JSL_DEBUG_LEVEL = debug_level; jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level); } testmarshall(); - pthread_attr_init(&attr); - // set stack size to 32K, so we don't run out of memory - pthread_attr_setstacksize(&attr, 32*1024); - if (isserver) { - printf("starting server on port %d RPC_HEADER_SZ %d\n", port, RPC_HEADER_SZ); + printf("starting server on port %d RPC_HEADER_SZ %d\n", port, (int)RPC_HEADER_SZ); startserver(); }