X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/blobdiff_plain/24bebc0ecf83446c7371eff69042322aab34976a..ab6c1548ac2b1907bca92c8ce43e919c1a649a6f:/rpc/rpctest.cc diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index 7217b25..0435ab1 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -5,17 +5,21 @@ #include "rpc.h" #include #include -#include #include +#include +#include "threaded_log.h" #define NUM_CL 2 -char log_thread_prefix = 'r'; +static rpcs *server; // server rpc object +static rpcc *clients[NUM_CL]; // client rpc object +static string * dst; //server's ip address +static in_port_t port; -rpcs *server; // server rpc object -rpcc *clients[NUM_CL]; // client rpc object -string dst; //server's ip address -int port; +using std::cout; +using std::endl; +using namespace std::chrono; +using std::vector; // server-side handlers. they must be methods of some class // to simplify rpcs::reg(). a server process can have handlers @@ -23,11 +27,20 @@ int port; class srv { public: int handle_22(string & r, const string a, const string b); - int handle_fast(int &r, const int a); - int handle_slow(int &r, const int a); - int handle_bigrep(string &r, const size_t a); + int handle_fast(int & r, const int a); + int handle_slow(int & r, const int a); + int handle_bigrep(string & r, const size_t a); }; +namespace srv_protocol { + using status = rpc_protocol::status; + REMOTE_PROCEDURE_BASE(0); + REMOTE_PROCEDURE(22, _22, (string &, string, string)); + REMOTE_PROCEDURE(23, fast, (int &, int)); + REMOTE_PROCEDURE(24, slow, (int &, int)); + REMOTE_PROCEDURE(25, bigrep, (string &, size_t)); +} + // a handler. a and b are arguments, r is the result. // there can be multiple arguments but only one result. // the caller also gets to see the int return value @@ -35,107 +48,102 @@ class srv { // rpcs::reg() decides how to unmarshall by looking // at these argument types, so this function definition // does what a .x file does in SunRPC. -int -srv::handle_22(string &r, const string a, string b) -{ +int srv::handle_22(string & r, const string a, string b) { r = a + b; return 0; } -int -srv::handle_fast(int &r, const int a) -{ +int srv::handle_fast(int & r, const int a) { r = a + 1; return 0; } -int -srv::handle_slow(int &r, const int a) -{ - usleep(random() % 5000); +int srv::handle_slow(int & r, const int a) { + int us = std::uniform_int_distribution<>(0,500)(global->random_generator); + std::this_thread::sleep_for(microseconds(us)); r = a + 2; return 0; } -int -srv::handle_bigrep(string &r, const size_t len) -{ - r = string((size_t)len, 'x'); +int srv::handle_bigrep(string & r, const size_t len) { + r = string(len, 'x'); return 0; } -srv service; +static srv service; -void startserver() -{ - server = new rpcs((unsigned int)port); - server->reg(22, &srv::handle_22, &service); - server->reg(23, &srv::handle_fast, &service); - server->reg(24, &srv::handle_slow, &service); - server->reg(25, &srv::handle_bigrep, &service); +static void startserver() { + server = new rpcs(port); + server->reg(srv_protocol::_22, &srv::handle_22, &service); + server->reg(srv_protocol::fast, &srv::handle_fast, &service); + server->reg(srv_protocol::slow, &srv::handle_slow, &service); + server->reg(srv_protocol::bigrep, &srv::handle_bigrep, &service); + server->start(); } -void -testmarshall() -{ +static void testmarshall() { marshall m; - request_header rh{1,2,3,4,5}; - m.pack_req_header(rh); - VERIFY(m.size()==RPC_HEADER_SZ); + rpc_protocol::request_header rh{1,2,3,4,5}; + m.write_header(rh); + VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ); int i = 12345; unsigned long long l = 1223344455L; + size_t sz = 101010101; string s = "hallo...."; + string bin("\x00\x00\x00\x00\x00\x00\x00\x40\x00\x00\x7f\xe5", 12); m << i; m << l; m << s; + m << sz; + m << bin; - char *b; - size_t sz; - m.take_buf(&b,&sz); - VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)); + string b = m; + VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+sizeof(uint32_t)+s.size()+sizeof(uint32_t)+sizeof(uint32_t)+bin.size()); - unmarshall un(b,sz); - request_header rh1; - un.unpack_req_header(&rh1); + unmarshall un(b, true); + rpc_protocol::request_header rh1; + un.read_header(rh1); VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0); int i1; unsigned long long l1; string s1; + string bin1; + size_t sz1; un >> i1; un >> l1; un >> s1; + un >> sz1; + un >> bin1; VERIFY(un.okdone()); - VERIFY(i1==i && l1==l && s1==s); + VERIFY(i1==i && l1==l && s1==s && sz1==sz && bin1==bin); } -void -client1(size_t cl) -{ +static void client1(size_t cl) { // test concurrency. size_t which_cl = cl % NUM_CL; for(int i = 0; i < 100; i++){ - int arg = (random() % 2000); + auto arg = std::uniform_int_distribution(0,2000)(global->random_generator); string rep; - int ret = clients[which_cl]->call(25, rep, arg); + int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg); VERIFY(ret == 0); - if ((int)rep.size()!=arg) + if ((unsigned long)rep.size()!=arg) cout << "repsize wrong " << rep.size() << "!=" << arg << endl; - VERIFY((int)rep.size() == arg); + VERIFY((unsigned long)rep.size() == arg); } // test rpc replies coming back not in the order of // the original calls -- i.e. does xid reply dispatch work. for(int i = 0; i < 100; i++){ - int which = (random() % 2); - int arg = (random() % 1000); - int rep; + bool which = std::bernoulli_distribution()(global->random_generator); + int arg = std::uniform_int_distribution<>(0,1000)(global->random_generator); + int rep = -1; - auto start = std::chrono::steady_clock::now(); + auto start = steady_clock::now(); - int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg); - auto end = std::chrono::steady_clock::now(); - auto diff = std::chrono::duration_cast(end - start).count(); + int ret = clients[which_cl]->call(which ? srv_protocol::fast : srv_protocol::slow, rep, arg); + auto end = steady_clock::now(); + auto diff = duration_cast(end - start).count(); if (ret != 0) cout << diff << " ms have elapsed!!!" << endl; VERIFY(ret == 0); @@ -143,60 +151,53 @@ client1(size_t cl) } } -void -client2(size_t cl) -{ +static void client2(size_t cl) { size_t which_cl = cl % NUM_CL; time_t t1; time(&t1); while(time(0) - t1 < 10){ - int arg = (random() % 2000); + auto arg = std::uniform_int_distribution(0,2000)(global->random_generator); string rep; - int ret = clients[which_cl]->call(25, rep, arg); - if ((int)rep.size()!=arg) + int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg); + if ((unsigned long)rep.size()!=arg) cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl; - VERIFY((int)rep.size() == arg); + VERIFY((unsigned long)rep.size() == arg); } } -void -client3(void *xx) -{ +static void client3(void *xx) { rpcc *c = (rpcc *) xx; for(int i = 0; i < 4; i++){ int rep = 0; - int ret = c->call_timeout(24, rpcc::to(3000), rep, i); - VERIFY(ret == rpc_const::timeout_failure || rep == i+2); + int ret = c->call_timeout(srv_protocol::slow, milliseconds(300), rep, i); + VERIFY(ret == rpc_protocol::timeout_failure || rep == i+2); } } - -void -simple_tests(rpcc *c) -{ +static void simple_tests(rpcc *c) { cout << "simple_tests" << endl; // an RPC call to procedure #22. // rpcc::call() looks at the argument types to decide how // to marshall the RPC call packet, and how to unmarshall // the reply packet. string rep; - int intret = c->call(22, rep, (string)"hello", (string)" goodbye"); + int intret = c->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye"); VERIFY(intret == 0); // this is what handle_22 returns VERIFY(rep == "hello goodbye"); cout << " -- string concat RPC .. ok" << endl; // small request, big reply (perhaps req via UDP, reply via TCP) - intret = c->call_timeout(25, rpcc::to(200000), rep, 70000); + intret = c->call_timeout(srv_protocol::bigrep, milliseconds(20000), rep, 70000ul); VERIFY(intret == 0); VERIFY(rep.size() == 70000); cout << " -- small request, big reply .. ok" << endl; // specify a timeout value to an RPC that should succeed (udp) int xx = 0; - intret = c->call_timeout(23, rpcc::to(3000), xx, 77); + intret = c->call_timeout(srv_protocol::fast, milliseconds(300), xx, 77); VERIFY(intret == 0 && xx == 78); cout << " -- no spurious timeout .. ok" << endl; @@ -204,14 +205,15 @@ simple_tests(rpcc *c) { string arg(1000, 'x'); string rep2; - c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x"); + c->call_timeout(srv_protocol::_22, milliseconds(300), rep2, arg, (string)"x"); VERIFY(rep2.size() == 1001); cout << " -- no spurious timeout .. ok" << endl; } // huge RPC string big(1000000, 'x'); - intret = c->call(22, rep, big, (string)"z"); + intret = c->call(srv_protocol::_22, rep, big, (string)"z"); + VERIFY(intret == 0); VERIFY(rep.size() == 1000001); cout << " -- huge 1M rpc request .. ok" << endl; @@ -219,16 +221,14 @@ simple_tests(rpcc *c) string non_existent = "127.0.0.1:7661"; rpcc *c1 = new rpcc(non_existent); time_t t0 = time(0); - intret = c1->bind(rpcc::to(3000)); + intret = c1->bind(milliseconds(300)); time_t t1 = time(0); VERIFY(intret < 0 && (t1 - t0) <= 4); cout << " -- rpc timeout .. ok" << endl; cout << "simple_tests OK" << endl; } -void -concurrent_test(size_t nt) -{ +static void concurrent_test(size_t nt) { // create threads that make lots of calls in parallel, // to test thread synchronization for concurrent calls // and dispatches. @@ -245,9 +245,7 @@ concurrent_test(size_t nt) cout << " OK" << endl; } -void -lossy_test() -{ +static void lossy_test() { cout << "start lossy_test ..."; VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); @@ -258,7 +256,7 @@ lossy_test() for (int i = 0; i < NUM_CL; i++) { delete clients[i]; - clients[i] = new rpcc(dst); + clients[i] = new rpcc(*dst); VERIFY(clients[i]->bind()==0); } @@ -276,9 +274,7 @@ lossy_test() VERIFY(setenv("RPC_LOSSY", "0", 1) == 0); } -void -failure_test() -{ +static void failure_test() { rpcc *client1; rpcc *client = clients[0]; @@ -286,8 +282,8 @@ failure_test() delete server; - client1 = new rpcc(dst); - VERIFY (client1->bind(rpcc::to(3000)) < 0); + client1 = new rpcc(*dst); + VERIFY (client1->bind(milliseconds(3000)) < 0); cout << " -- create new client and try to bind to failed server .. failed ok" << endl; delete client1; @@ -295,17 +291,17 @@ failure_test() startserver(); string rep; - int intret = client->call(22, rep, (string)"hello", (string)" goodbye"); - VERIFY(intret == rpc_const::oldsrv_failure); + int intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye"); + VERIFY(intret == rpc_protocol::oldsrv_failure); cout << " -- call recovered server with old client .. failed ok" << endl; delete client; - clients[0] = client = new rpcc(dst); + clients[0] = client = new rpcc(*dst); VERIFY (client->bind() >= 0); VERIFY (client->bind() < 0); - intret = client->call(22, rep, (string)"hello", (string)" goodbye"); + intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye"); VERIFY(intret == 0); VERIFY(rep == "hello goodbye"); @@ -329,7 +325,7 @@ failure_test() delete client; startserver(); - clients[0] = client = new rpcc(dst); + clients[0] = client = new rpcc(*dst); VERIFY (client->bind() >= 0); cout << " -- delete existing rpc client and server, create replacements.. ok" << endl; @@ -346,9 +342,8 @@ failure_test() cout << "failure_test OK" << endl; } -int -main(int argc, char *argv[]) -{ +int main(int argc, char *argv[]) { + global = new t4_state('r'); setvbuf(stdout, NULL, _IONBF, 0); setvbuf(stderr, NULL, _IONBF, 0); @@ -357,7 +352,6 @@ main(int argc, char *argv[]) bool isclient = false; bool isserver = false; - srandom((uint32_t)getpid()); port = 20000 + (getpid() % 10000); int ch = 0; @@ -373,7 +367,7 @@ main(int argc, char *argv[]) debug_level = atoi(optarg); break; case 'p': - port = atoi(optarg); + port = (in_port_t)atoi(optarg); break; case 'l': VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); @@ -388,20 +382,20 @@ main(int argc, char *argv[]) } if (debug_level > 0) { - DEBUG_LEVEL = debug_level; - IF_LEVEL(1) LOG_NONMEMBER("DEBUG LEVEL: " << debug_level); + global->DEBUG_LEVEL = debug_level; + IF_LEVEL(1) LOG_NONMEMBER << "DEBUG LEVEL: " << debug_level; } testmarshall(); if (isserver) { - cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl; + cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)rpc_protocol::RPC_HEADER_SZ << endl; startserver(); } if (isclient) { // server's address. - dst = "127.0.0.1:" + std::to_string(port); + dst = new string("127.0.0.1:" + std::to_string(port)); // start the client. bind it to the server. @@ -410,7 +404,7 @@ main(int argc, char *argv[]) // be only one rpcc per process. you probably need one // rpcc per server. for (int i = 0; i < NUM_CL; i++) { - clients[i] = new rpcc(dst); + clients[i] = new rpcc(*dst); VERIFY (clients[i]->bind() == 0); } @@ -426,7 +420,6 @@ main(int argc, char *argv[]) exit(0); } - while (1) { - sleep(1); - } + while (1) + std::this_thread::sleep_for(milliseconds(100)); }