X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/b86bce0900f88c530f23dd602a8f2ef9ce008f8a..603bac8fcb3697f283e6537d81b4a92e457ebbad:/rpc/rpctest.cc diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index c381745..2f58e5d 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -1,33 +1,31 @@ // RPC test and pseudo-documentation. // generates print statements on failures, but eventually says "rpctest OK" +#include "types.h" #include "rpc.h" #include -#include -#include -#include #include #include #include -#include "jsl_log.h" -#include "lang/verify.h" #define NUM_CL 2 +char log_thread_prefix = 'r'; + rpcs *server; // server rpc object rpcc *clients[NUM_CL]; // client rpc object -struct sockaddr_in dst; //server's ip address -int port; +string dst; //server's ip address +in_port_t port; // server-side handlers. they must be methods of some class // to simplify rpcs::reg(). a server process can have handlers // from multiple classes. class srv { - public: - int handle_22(std::string & r, const std::string a, const std::string b); - int handle_fast(int &r, const int a); - int handle_slow(int &r, const int a); - int handle_bigrep(std::string &r, const int a); + public: + int handle_22(string & r, const string a, const string b); + int handle_fast(int &r, const int a); + int handle_slow(int &r, const int a); + int handle_bigrep(string &r, const size_t a); }; // a handler. a and b are arguments, r is the result. @@ -38,404 +36,395 @@ class srv { // at these argument types, so this function definition // does what a .x file does in SunRPC. int -srv::handle_22(std::string &r, const std::string a, std::string b) +srv::handle_22(string &r, const string a, string b) { - r = a + b; - return 0; + r = a + b; + return 0; } int srv::handle_fast(int &r, const int a) { - r = a + 1; - return 0; + r = a + 1; + return 0; } int srv::handle_slow(int &r, const int a) { - usleep(random() % 5000); - r = a + 2; - return 0; + usleep(random() % 500); + r = a + 2; + return 0; } int -srv::handle_bigrep(std::string &r, const int len) +srv::handle_bigrep(string &r, const size_t len) { - r = std::string(len, 'x'); - return 0; + r = string((size_t)len, 'x'); + return 0; } srv service; void startserver() { - server = new rpcs(port); - server->reg(22, &srv::handle_22, &service); - server->reg(23, &srv::handle_fast, &service); - server->reg(24, &srv::handle_slow, &service); - server->reg(25, &srv::handle_bigrep, &service); + server = new rpcs(port); + server->reg(22, &srv::handle_22, &service); + server->reg(23, &srv::handle_fast, &service); + server->reg(24, &srv::handle_slow, &service); + server->reg(25, &srv::handle_bigrep, &service); } void testmarshall() { - marshall m; - request_header rh{1,2,3,4,5}; - m.pack_req_header(rh); - VERIFY(m.size()==RPC_HEADER_SZ); - int i = 12345; - unsigned long long l = 1223344455L; - std::string s = std::string("hallo...."); - m << i; - m << l; - m << s; - - char *b; - int sz; - m.take_buf(&b,&sz); - VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int))); - - unmarshall un(b,sz); - request_header rh1; - un.unpack_req_header(&rh1); - VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0); - int i1; - unsigned long long l1; - std::string s1; - un >> i1; - un >> l1; - un >> s1; - VERIFY(un.okdone()); - VERIFY(i1==i && l1==l && s1==s); + marshall m; + request_header rh{1,2,3,4,5}; + m.pack_header(rh); + VERIFY(((string)m).size()==RPC_HEADER_SZ); + int i = 12345; + unsigned long long l = 1223344455L; + string s = "hallo...."; + m << i; + m << l; + m << s; + + string b = m; + VERIFY(b.size() == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)); + + unmarshall un(b, true); + request_header rh1; + un.unpack_header(rh1); + VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0); + int i1; + unsigned long long l1; + string s1; + un >> i1; + un >> l1; + un >> s1; + VERIFY(un.okdone()); + VERIFY(i1==i && l1==l && s1==s); } void -client1(int cl) +client1(size_t cl) { - // test concurrency. - int which_cl = ((unsigned long) cl ) % NUM_CL; - - for(int i = 0; i < 100; i++){ - int arg = (random() % 2000); - std::string rep; - int ret = clients[which_cl]->call(25, rep, arg); - VERIFY(ret == 0); - if ((int)rep.size()!=arg) { - printf("repsize wrong %d!=%d\n", (int)rep.size(), arg); - } - VERIFY((int)rep.size() == arg); - } - - // test rpc replies coming back not in the order of - // the original calls -- i.e. does xid reply dispatch work. - for(int i = 0; i < 100; i++){ - int which = (random() % 2); - int arg = (random() % 1000); - int rep; - - auto start = std::chrono::steady_clock::now(); - - int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg); - auto end = std::chrono::steady_clock::now(); - int diff = std::chrono::duration_cast(end - start).count(); - if (ret != 0) - printf("%d ms have elapsed!!!\n", diff); - VERIFY(ret == 0); - VERIFY(rep == (which ? arg+1 : arg+2)); - } + // test concurrency. + size_t which_cl = cl % NUM_CL; + + for(int i = 0; i < 100; i++){ + int arg = (random() % 2000); + string rep; + int ret = clients[which_cl]->call(25, rep, arg); + VERIFY(ret == 0); + if ((int)rep.size()!=arg) + cout << "repsize wrong " << rep.size() << "!=" << arg << endl; + VERIFY((int)rep.size() == arg); + } + + // test rpc replies coming back not in the order of + // the original calls -- i.e. does xid reply dispatch work. + for(int i = 0; i < 100; i++){ + int which = (random() % 2); + int arg = (random() % 1000); + int rep; + + auto start = steady_clock::now(); + + int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg); + auto end = steady_clock::now(); + auto diff = duration_cast(end - start).count(); + if (ret != 0) + cout << diff << " ms have elapsed!!!" << endl; + VERIFY(ret == 0); + VERIFY(rep == (which ? arg+1 : arg+2)); + } } void -client2(int cl) +client2(size_t cl) { - int which_cl = ((unsigned long) cl ) % NUM_CL; - - time_t t1; - time(&t1); - - while(time(0) - t1 < 10){ - int arg = (random() % 2000); - std::string rep; - int ret = clients[which_cl]->call(25, rep, arg); - if ((int)rep.size()!=arg) { - printf("ask for %d reply got %d ret %d\n", - arg, (int)rep.size(), ret); - } - VERIFY((int)rep.size() == arg); - } + size_t which_cl = cl % NUM_CL; + + time_t t1; + time(&t1); + + while(time(0) - t1 < 10){ + int arg = (random() % 2000); + string rep; + int ret = clients[which_cl]->call(25, rep, arg); + if ((int)rep.size()!=arg) + cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl; + VERIFY((int)rep.size() == arg); + } } void client3(void *xx) { - rpcc *c = (rpcc *) xx; + rpcc *c = (rpcc *) xx; - for(int i = 0; i < 4; i++){ - int rep; - int ret = c->call_timeout(24, rpcc::to(3000), rep, i); - VERIFY(ret == rpc_const::timeout_failure || rep == i+2); - } + for(int i = 0; i < 4; i++){ + int rep = 0; + int ret = c->call_timeout(24, rpcc::to(300), rep, i); + VERIFY(ret == rpc_const::timeout_failure || rep == i+2); + } } void simple_tests(rpcc *c) { - printf("simple_tests\n"); - // an RPC call to procedure #22. - // rpcc::call() looks at the argument types to decide how - // to marshall the RPC call packet, and how to unmarshall - // the reply packet. - std::string rep; - int intret = c->call(22, rep, (std::string)"hello", (std::string)" goodbye"); - VERIFY(intret == 0); // this is what handle_22 returns - VERIFY(rep == "hello goodbye"); - printf(" -- string concat RPC .. ok\n"); - - // small request, big reply (perhaps req via UDP, reply via TCP) - intret = c->call_timeout(25, rpcc::to(200000), rep, 70000); - VERIFY(intret == 0); - VERIFY(rep.size() == 70000); - printf(" -- small request, big reply .. ok\n"); - - // specify a timeout value to an RPC that should succeed (udp) - int xx = 0; - intret = c->call_timeout(23, rpcc::to(3000), xx, 77); - VERIFY(intret == 0 && xx == 78); - printf(" -- no spurious timeout .. ok\n"); - - // specify a timeout value to an RPC that should succeed (tcp) - { - std::string arg(1000, 'x'); - std::string rep; - c->call_timeout(22, rpcc::to(3000), rep, arg, (std::string)"x"); - VERIFY(rep.size() == 1001); - printf(" -- no spurious timeout .. ok\n"); - } - - // huge RPC - std::string big(1000000, 'x'); - intret = c->call(22, rep, big, (std::string)"z"); - VERIFY(rep.size() == 1000001); - printf(" -- huge 1M rpc request .. ok\n"); - - // specify a timeout value to an RPC that should timeout (udp) - struct sockaddr_in non_existent; - memset(&non_existent, 0, sizeof(non_existent)); - non_existent.sin_family = AF_INET; - non_existent.sin_addr.s_addr = inet_addr("127.0.0.1"); - non_existent.sin_port = htons(7661); - rpcc *c1 = new rpcc(non_existent); - time_t t0 = time(0); - intret = c1->bind(rpcc::to(3000)); - time_t t1 = time(0); - VERIFY(intret < 0 && (t1 - t0) <= 4); - printf(" -- rpc timeout .. ok\n"); - printf("simple_tests OK\n"); + cout << "simple_tests" << endl; + // an RPC call to procedure #22. + // rpcc::call() looks at the argument types to decide how + // to marshall the RPC call packet, and how to unmarshall + // the reply packet. + string rep; + int intret = c->call(22, rep, (string)"hello", (string)" goodbye"); + VERIFY(intret == 0); // this is what handle_22 returns + VERIFY(rep == "hello goodbye"); + cout << " -- string concat RPC .. ok" << endl; + + // small request, big reply (perhaps req via UDP, reply via TCP) + intret = c->call_timeout(25, rpcc::to(20000), rep, 70000); + VERIFY(intret == 0); + VERIFY(rep.size() == 70000); + cout << " -- small request, big reply .. ok" << endl; + + // specify a timeout value to an RPC that should succeed (udp) + int xx = 0; + intret = c->call_timeout(23, rpcc::to(300), xx, 77); + VERIFY(intret == 0 && xx == 78); + cout << " -- no spurious timeout .. ok" << endl; + + // specify a timeout value to an RPC that should succeed (tcp) + { + string arg(1000, 'x'); + string rep2; + c->call_timeout(22, rpcc::to(300), rep2, arg, (string)"x"); + VERIFY(rep2.size() == 1001); + cout << " -- no spurious timeout .. ok" << endl; + } + + // huge RPC + string big(1000000, 'x'); + intret = c->call(22, rep, big, (string)"z"); + VERIFY(rep.size() == 1000001); + cout << " -- huge 1M rpc request .. ok" << endl; + + // specify a timeout value to an RPC that should timeout (udp) + string non_existent = "127.0.0.1:7661"; + rpcc *c1 = new rpcc(non_existent); + time_t t0 = time(0); + intret = c1->bind(rpcc::to(300)); + time_t t1 = time(0); + VERIFY(intret < 0 && (t1 - t0) <= 4); + cout << " -- rpc timeout .. ok" << endl; + cout << "simple_tests OK" << endl; } void -concurrent_test(int nt) +concurrent_test(size_t nt) { - // create threads that make lots of calls in parallel, - // to test thread synchronization for concurrent calls - // and dispatches. - printf("start concurrent_test (%d threads) ...", nt); + // create threads that make lots of calls in parallel, + // to test thread synchronization for concurrent calls + // and dispatches. + cout << "start concurrent_test (" << nt << " threads) ..."; + + vector th(nt); - std::vector th(nt); - for(int i = 0; i < nt; i++){ - th[i] = std::thread(client1, i); - } + for(size_t i = 0; i < nt; i++) + th[i] = thread(client1, i); - for(int i = 0; i < nt; i++){ + for(size_t i = 0; i < nt; i++) th[i].join(); - } - printf(" OK\n"); + + cout << " OK" << endl; } void lossy_test() { - printf("start lossy_test ..."); - VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); - - if (server) { - delete server; - startserver(); - } - - for (int i = 0; i < NUM_CL; i++) { - delete clients[i]; - clients[i] = new rpcc(dst); - VERIFY(clients[i]->bind()==0); - } - - int nt = 1; - std::vector th(nt); - for(int i = 0; i < nt; i++){ - th[i] = std::thread(client2, i); - } - for(int i = 0; i < nt; i++){ + cout << "start lossy_test ..."; + VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); + + if (server) { + delete server; + startserver(); + } + + for (int i = 0; i < NUM_CL; i++) { + delete clients[i]; + clients[i] = new rpcc(dst); + VERIFY(clients[i]->bind()==0); + } + + size_t nt = 1; + + vector th(nt); + + for(size_t i = 0; i < nt; i++) + th[i] = thread(client2, i); + + for(size_t i = 0; i < nt; i++) th[i].join(); - } - printf(".. OK\n"); - VERIFY(setenv("RPC_LOSSY", "0", 1) == 0); + + cout << ".. OK" << endl; + VERIFY(setenv("RPC_LOSSY", "0", 1) == 0); } void failure_test() { - rpcc *client1; - rpcc *client = clients[0]; + rpcc *client1; + rpcc *client = clients[0]; + + cout << "failure_test" << endl; - printf("failure_test\n"); + delete server; - delete server; + client1 = new rpcc(dst); + VERIFY (client1->bind(rpcc::to(3000)) < 0); + cout << " -- create new client and try to bind to failed server .. failed ok" << endl; - client1 = new rpcc(dst); - VERIFY (client1->bind(rpcc::to(3000)) < 0); - printf(" -- create new client and try to bind to failed server .. failed ok\n"); + delete client1; - delete client1; + startserver(); - startserver(); + string rep; + int intret = client->call(22, rep, (string)"hello", (string)" goodbye"); + VERIFY(intret == rpc_const::oldsrv_failure); + cout << " -- call recovered server with old client .. failed ok" << endl; - std::string rep; - int intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye"); - VERIFY(intret == rpc_const::oldsrv_failure); - printf(" -- call recovered server with old client .. failed ok\n"); + delete client; - delete client; + clients[0] = client = new rpcc(dst); + VERIFY (client->bind() >= 0); + VERIFY (client->bind() < 0); - clients[0] = client = new rpcc(dst); - VERIFY (client->bind() >= 0); - VERIFY (client->bind() < 0); + intret = client->call(22, rep, (string)"hello", (string)" goodbye"); + VERIFY(intret == 0); + VERIFY(rep == "hello goodbye"); - intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye"); - VERIFY(intret == 0); - VERIFY(rep == "hello goodbye"); + cout << " -- delete existing rpc client, create replacement rpc client .. ok" << endl; - printf(" -- delete existing rpc client, create replacement rpc client .. ok\n"); + size_t nt = 10; + cout << " -- concurrent test on new rpc client w/ " << nt << " threads .."; - int nt = 10; - printf(" -- concurrent test on new rpc client w/ %d threads ..", nt); + vector th(nt); - std::vector th(nt); - for(int i = 0; i < nt; i++){ - th[i] = std::thread(client3, client); - } + for(size_t i = 0; i < nt; i++) + th[i] = thread(client3, client); - for(int i = 0; i < nt; i++){ + for(size_t i = 0; i < nt; i++) th[i].join(); - } - printf("ok\n"); - delete server; - delete client; + cout << "ok" << endl; - startserver(); - clients[0] = client = new rpcc(dst); - VERIFY (client->bind() >= 0); - printf(" -- delete existing rpc client and server, create replacements.. ok\n"); + delete server; + delete client; - printf(" -- concurrent test on new client and server w/ %d threads ..", nt); - for(int i = 0; i < nt; i++){ - th[i] = std::thread(client3, client); - } + startserver(); + clients[0] = client = new rpcc(dst); + VERIFY (client->bind() >= 0); + cout << " -- delete existing rpc client and server, create replacements.. ok" << endl; - for(int i = 0; i < nt; i++){ + cout << " -- concurrent test on new client and server w/ " << nt << " threads .."; + + for(size_t i = 0; i < nt; i++) + th[i] = thread(client3, client); + + for(size_t i = 0; i < nt; i++) th[i].join(); - } - printf("ok\n"); - printf("failure_test OK\n"); + cout << "ok" << endl; + + cout << "failure_test OK" << endl; } int main(int argc, char *argv[]) { - setvbuf(stdout, NULL, _IONBF, 0); - setvbuf(stderr, NULL, _IONBF, 0); - int debug_level = 0; - - bool isclient = false; - bool isserver = false; - - srandom(getpid()); - port = 20000 + (getpid() % 10000); - - char ch = 0; - while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) { - switch (ch) { - case 'c': - isclient = true; - break; - case 's': - isserver = true; - break; - case 'd': - debug_level = atoi(optarg); - break; - case 'p': - port = atoi(optarg); - break; - case 'l': - VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); - default: - break; - } - } - - if (!isserver && !isclient) { - isserver = isclient = true; - } - - if (debug_level > 0) { - JSL_DEBUG_LEVEL = debug_level; - jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level); - } - - testmarshall(); - - if (isserver) { - printf("starting server on port %d RPC_HEADER_SZ %d\n", port, (int)RPC_HEADER_SZ); - startserver(); - } - - if (isclient) { - // server's address. - memset(&dst, 0, sizeof(dst)); - dst.sin_family = AF_INET; - dst.sin_addr.s_addr = inet_addr("127.0.0.1"); - dst.sin_port = htons(port); - - - // start the client. bind it to the server. - // starts a thread to listen for replies and hand them to - // the correct waiting caller thread. there should probably - // be only one rpcc per process. you probably need one - // rpcc per server. - for (int i = 0; i < NUM_CL; i++) { - clients[i] = new rpcc(dst); - VERIFY (clients[i]->bind() == 0); - } - - simple_tests(clients[0]); - concurrent_test(10); - lossy_test(); - if (isserver) { - failure_test(); - } - - printf("rpctest OK\n"); - - exit(0); - } - - while (1) { - sleep(1); - } + setvbuf(stdout, NULL, _IONBF, 0); + setvbuf(stderr, NULL, _IONBF, 0); + int debug_level = 0; + + bool isclient = false; + bool isserver = false; + + srandom((uint32_t)getpid()); + port = 20000 + (getpid() % 10000); + + int ch = 0; + while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) { + switch (ch) { + case 'c': + isclient = true; + break; + case 's': + isserver = true; + break; + case 'd': + debug_level = atoi(optarg); + break; + case 'p': + port = (in_port_t)atoi(optarg); + break; + case 'l': + VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); + break; + default: + break; + } + } + + if (!isserver && !isclient) { + isserver = isclient = true; + } + + if (debug_level > 0) { + DEBUG_LEVEL = debug_level; + IF_LEVEL(1) LOG_NONMEMBER("DEBUG LEVEL: " << debug_level); + } + + testmarshall(); + + if (isserver) { + cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl; + startserver(); + } + + if (isclient) { + // server's address. + dst = "127.0.0.1:" + to_string(port); + + + // start the client. bind it to the server. + // starts a thread to listen for replies and hand them to + // the correct waiting caller thread. there should probably + // be only one rpcc per process. you probably need one + // rpcc per server. + for (int i = 0; i < NUM_CL; i++) { + clients[i] = new rpcc(dst); + VERIFY (clients[i]->bind() == 0); + } + + simple_tests(clients[0]); + concurrent_test(10); + lossy_test(); + if (isserver) { + failure_test(); + } + + cout << "rpctest OK" << endl; + + exit(0); + } + + while (1) { + usleep(100000); + } }