1 // RPC test and pseudo-documentation.
2 // generates print statements on failures, but eventually says "rpctest OK"
12 #include "lang/verify.h"
// Process-wide test state used by the server setup code and the client test threads.
16 rpcs *server; // server-side rpc object; assigned with `new rpcs(port)` during server setup
17 rpcc *clients[NUM_CL]; // client rpc objects (NUM_CL of them); test threads select one by index
18 struct sockaddr_in dst; // server's address; main() fills it with 127.0.0.1 and the chosen port
22 // server-side handlers. they must be methods of some class
23 // to simplify rpcs::reg(). a server process can have handlers
24 // from multiple classes.
// Each handler takes its arguments first and the reply (r) last, by reference;
// the int return value is what the caller sees as the result of rpcc::call().
// Registered as procedure 22: two string arguments, one string reply.
27 int handle_22(const std::string a, const std::string b, std::string & r);
// Registered as procedure 23: the client tests in this file expect r == a + 1.
28 int handle_fast(const int a, int &r);
// Registered as procedure 24: sleeps a random short time; the client tests expect r == a + 2.
29 int handle_slow(const int a, int &r);
// Registered as procedure 25: the reply is a string of `a` characters.
30 int handle_bigrep(const int a, std::string &r);
33 // a handler. a and b are arguments, r is the result.
34 // there can be multiple arguments but only one result.
35 // the caller also gets to see the int return value
36 // as the return value from rpcc::call().
37 // rpcs::reg() decides how to unmarshall by looking
38 // at these argument types, so this function definition
39 // does what a .x file does in SunRPC.
// NOTE(review): the body is not visible in this excerpt. The caller in this file
// passes "hello" and " goodbye" to procedure 22 and expects r == "hello goodbye"
// with a return value of 0, so this presumably stores a followed by b in r.
// The declaration spells `const std::string b`; the missing top-level const here
// does not change the function's signature.
41 srv::handle_22(const std::string a, std::string b, std::string &r)
// Handler registered as procedure 23. Body not visible in this excerpt;
// the client tests in this file expect the reply to be a + 1.
48 srv::handle_fast(const int a, int &r)
// Handler registered as procedure 24. The client tests in this file expect
// the reply to be a + 2 (the assignment itself is not visible in this excerpt).
55 srv::handle_slow(const int a, int &r)
// sleep for a random 0..4999 microseconds, so calls to this handler take a varying time
57 usleep(random() % 5000);
// Handler registered as procedure 25: replies with a string of `len` 'x' characters.
63 srv::handle_bigrep(const int len, std::string &r)
// NOTE(review): a negative len would convert to a huge size_t in the std::string
// constructor; the callers visible in this file only pass non-negative lengths.
65 r = std::string(len, 'x');
// Create the server rpc object on `port`, then register each srv handler
// under the procedure number that clients pass as the first argument to call().
73 server = new rpcs(port);
// 22: two strings in, one string out
74 server->reg(22, &service, &srv::handle_22);
// 23: int in, int out
75 server->reg(23, &service, &srv::handle_fast);
// 24: int in, int out; the handler sleeps a random short time
76 server->reg(24, &service, &srv::handle_slow);
// 25: int length in, string of that many characters out
77 server->reg(25, &service, &srv::handle_bigrep);
// Marshalling round-trip check: pack a request header plus a few values,
// then unpack them and verify that everything comes back unchanged.
84 req_header rh(1,2,3,4,5);
85 m.pack_req_header(rh);
// a packed header alone must occupy exactly RPC_HEADER_SZ bytes
86 VERIFY(m.size()==RPC_HEADER_SZ);
88 unsigned long long l = 1223344455L;
89 std::string s = std::string("hallo....");
// expected size: header + i + l + the string's bytes + one extra int
// (presumably the string's length prefix -- confirm against the marshall code)
97 VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)));
101 un.unpack_req_header(&rh1);
// NOTE(review): memcmp also compares any padding bytes inside req_header;
// this relies on the struct having no padding or on padding being initialised identically.
102 VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
104 unsigned long long l1;
// the unpacked values must equal the values that were packed
110 VERIFY(i1==i && l1==l && s1==s);
// Thread body fragment. xx is the opaque thread argument; it is interpreted as an
// integer and reduced modulo NUM_CL to choose which shared client this thread uses.
118 int which_cl = ((unsigned long) xx ) % NUM_CL;
// phase 1: 100 calls to procedure 25 with a random length in 0..1999;
// the reply must be exactly that many characters long
120 for(int i = 0; i < 100; i++){
121 int arg = (random() % 2000);
123 int ret = clients[which_cl]->call(25, arg, rep);
125 if ((int)rep.size()!=arg) {
126 printf("repsize wrong %d!=%d\n", (int)rep.size(), arg);
128 VERIFY((int)rep.size() == arg);
131 // test rpc replies coming back not in the order of
132 // the original calls -- i.e. does xid reply dispatch work.
// phase 2: 100 calls that randomly pick procedure 23 or procedure 24
133 for(int i = 0; i < 100; i++){
134 int which = (random() % 2);
135 int arg = (random() % 1000);
138 struct timespec start,end;
// NOTE(review): CLOCK_REALTIME can jump if the system clock is adjusted;
// CLOCK_MONOTONIC is the usual choice for measuring elapsed time.
139 clock_gettime(CLOCK_REALTIME, &start);
141 int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep);
142 clock_gettime(CLOCK_REALTIME, &end);
143 int diff = diff_timespec(end, start);
// the condition guarding this message is not visible in this excerpt
145 printf("%d ms have elapsed!!!\n", diff);
// procedure 23 must reply arg+1, procedure 24 must reply arg+2
147 VERIFY(rep == (which ? arg+1 : arg+2));
// Thread body fragment. The thread argument xx, taken modulo NUM_CL, selects the shared client.
156 int which_cl = ((unsigned long) xx ) % NUM_CL;
// keep issuing calls until 10 seconds have passed since t1
// (t1 is set on a line not visible here -- presumably the start time)
161 while(time(0) - t1 < 10){
162 int arg = (random() % 2000);
// procedure 25 must reply with a string of exactly `arg` characters
164 int ret = clients[which_cl]->call(25, arg, rep);
165 if ((int)rep.size()!=arg) {
166 printf("ask for %d reply got %d ret %d\n",
167 arg, (int)rep.size(), ret);
169 VERIFY((int)rep.size() == arg);
// Thread body fragment. Here the thread argument is the rpcc client itself, not an index.
177 rpcc *c = (rpcc *) xx;
179 for(int i = 0; i < 4; i++){
// call procedure 24 with an explicit timeout of rpcc::to(3000)
// (units presumably milliseconds -- confirm against rpcc::to)
181 int ret = c->call(24, i, rep, rpcc::to(3000));
// either outcome is accepted: the call timed out, or the reply is the expected i+2
182 VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
// Single-threaded sanity checks run against one client `c`.
// Each check prints a " -- ... ok" line after its VERIFYs.
189 simple_tests(rpcc *c)
191 printf("simple_tests\n");
192 // an RPC call to procedure #22.
193 // rpcc::call() looks at the argument types to decide how
194 // to marshall the RPC call packet, and how to unmarshall
197 int intret = c->call(22, (std::string)"hello", (std::string)" goodbye", rep);
198 VERIFY(intret == 0); // this is what handle_22 returns
199 VERIFY(rep == "hello goodbye");
200 printf(" -- string concat RPC .. ok\n");
202 // small request, big reply (perhaps req via UDP, reply via TCP)
// asks procedure 25 for a 70000-character reply
203 intret = c->call(25, 70000, rep, rpcc::to(200000));
205 VERIFY(rep.size() == 70000);
206 printf(" -- small request, big reply .. ok\n");
// procedure 22 is called with one string although its handler takes two;
// the check on intret is on a line not visible in this excerpt
210 intret = c->call(22, (std::string)"just one", rep);
212 printf(" -- too few arguments .. failed ok\n");
214 // too many arguments; proc #23 expects just one.
215 intret = c->call(23, 1001, 1002, rep);
217 printf(" -- too many arguments .. failed ok\n");
219 // wrong return value size
// (wrongrep is declared on a line not visible in this excerpt)
221 intret = c->call(23, (std::string)"hello", (std::string)" goodbye", wrongrep);
223 printf(" -- wrong ret value size .. failed ok\n");
226 // specify a timeout value to an RPC that should succeed (udp)
// procedure 23 must reply 77+1 and the call must return 0 despite the timeout argument
228 intret = c->call(23, 77, xx, rpcc::to(3000));
229 VERIFY(intret == 0 && xx == 78);
// NOTE(review): "suprious" in the message below is a typo for "spurious" (runtime string, left unchanged)
230 printf(" -- no suprious timeout .. ok\n");
232 // specify a timeout value to an RPC that should succeed (tcp)
234 std::string arg(1000, 'x');
// NOTE(review): the return value of this call is discarded; only the reply size is checked
236 c->call(22, arg, (std::string)"x", rep, rpcc::to(3000));
// 1000 characters of arg plus the one-character second argument
237 VERIFY(rep.size() == 1001);
// NOTE(review): same "suprious" typo as above
238 printf(" -- no suprious timeout .. ok\n");
// one-million-character request; the reply must be one character longer
242 std::string big(1000000, 'x');
243 intret = c->call(22, big, (std::string)"z", rep);
244 VERIFY(rep.size() == 1000001);
245 printf(" -- huge 1M rpc request .. ok\n");
247 // specify a timeout value to an RPC that should timeout (udp)
// target 127.0.0.1:7661, where the test assumes no server is listening
248 struct sockaddr_in non_existent;
249 memset(&non_existent, 0, sizeof(non_existent));
250 non_existent.sin_family = AF_INET;
251 non_existent.sin_addr.s_addr = inet_addr("127.0.0.1");
252 non_existent.sin_port = htons(7661);
253 rpcc *c1 = new rpcc(non_existent);
255 intret = c1->bind(rpcc::to(3000));
// bind must fail, and t1 - t0 must be at most 4
// (t0/t1 are set on lines not visible here -- presumably time(0) values in seconds)
257 VERIFY(intret < 0 && (t1 - t0) <= 4);
258 printf(" -- rpc timeout .. ok\n");
259 printf("simple_tests OK\n");
// Starts nt threads running client1 and waits for all of them to finish.
263 concurrent_test(int nt)
265 // create threads that make lots of calls in parallel,
266 // to test thread synchronization for concurrent calls
270 printf("start concurrent_test (%d threads) ...", nt);
// each thread receives its loop index as its argument, smuggled through the void* parameter
273 for(int i = 0; i < nt; i++){
274 ret = pthread_create(&th[i], &attr, client1, (void *) (uintptr_t)i);
// every thread must join successfully
278 for(int i = 0; i < nt; i++){
279 VERIFY(pthread_join(th[i], NULL) == 0);
// Lossy-network test fragment: sets RPC_LOSSY=5 in the environment, creates fresh
// clients, runs client2 threads, then resets RPC_LOSSY to 0.
// NOTE(review): how the rpc library interprets RPC_LOSSY is not visible in this file.
289 printf("start lossy_test ...");
290 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
// replace every slot in clients[] with a newly created, bound client
// (any cleanup of the previous clients is not visible in this excerpt)
297 for (int i = 0; i < NUM_CL; i++) {
299 clients[i] = new rpcc(dst);
300 VERIFY(clients[i]->bind()==0);
// each thread receives its loop index as its argument
305 for(int i = 0; i < nt; i++){
306 ret = pthread_create(&th[i], &attr, client2, (void *) (uintptr_t)i);
309 for(int i = 0; i < nt; i++){
310 VERIFY(pthread_join(th[i], NULL) == 0);
// set RPC_LOSSY back to 0 for whatever runs afterwards
313 VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
// Failure-handling test fragment. The steps that stop and restart the server
// are on lines not visible in this excerpt; the printed messages describe each stage.
320 rpcc *client = clients[0];
322 printf("failure_test\n");
// a brand-new client must fail to bind (the message below says the server is failed at this point)
326 client1 = new rpcc(dst);
327 VERIFY (client1->bind(rpcc::to(3000)) < 0);
328 printf(" -- create new client and try to bind to failed server .. failed ok\n");
// the pre-existing client calling the recovered server must get oldsrv_failure
335 int intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
336 VERIFY(intret == rpc_const::oldsrv_failure);
337 printf(" -- call recovered server with old client .. failed ok\n");
// a replacement client binds successfully once; a second bind() on the same client must fail
341 clients[0] = client = new rpcc(dst);
342 VERIFY (client->bind() >= 0);
343 VERIFY (client->bind() < 0);
345 intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
347 VERIFY(rep == "hello goodbye");
349 printf(" -- delete existing rpc client, create replacement rpc client .. ok\n");
354 printf(" -- concurrent test on new rpc client w/ %d threads ..", nt);
// all threads share the same client object, passed as the thread argument
357 for(int i = 0; i < nt; i++){
358 ret = pthread_create(&th[i], &attr, client3, (void *) client);
362 for(int i = 0; i < nt; i++){
363 VERIFY(pthread_join(th[i], NULL) == 0);
// per the message below, both the client and the server have been replaced by this point
371 clients[0] = client = new rpcc(dst);
372 VERIFY (client->bind() >= 0);
373 printf(" -- delete existing rpc client and server, create replacements.. ok\n");
375 printf(" -- concurrent test on new client and server w/ %d threads ..", nt);
376 for(int i = 0; i < nt; i++){
377 ret = pthread_create(&th[i], &attr, client3, (void *)client);
381 for(int i = 0; i < nt; i++){
382 VERIFY(pthread_join(th[i], NULL) == 0);
386 printf("failure_test OK\n");
// Entry point: parses options, optionally starts the server and/or the clients,
// runs the tests, and prints "rpctest OK" at the end.
390 main(int argc, char *argv[])
// make stdout and stderr unbuffered
393 setvbuf(stdout, NULL, _IONBF, 0);
394 setvbuf(stderr, NULL, _IONBF, 0);
397 bool isclient = false;
398 bool isserver = false;
// default port is derived from the pid, in the range 20000..29999
401 port = 20000 + (getpid() % 10000);
// options: c, s and l take no argument; d and p each take one
404 while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
// NOTE(review): atoi() yields 0 for non-numeric input, with no error reported
413 debug_level = atoi(optarg);
419 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
// with neither role selected, run both server and client in this process
425 if (!isserver && !isclient) {
426 isserver = isclient = true;
429 if (debug_level > 0) {
430 //__loginit.initNow();
431 jsl_set_debug(debug_level);
432 jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
437 pthread_attr_init(&attr);
438 // set stack size to 32K, so we don't run out of memory
// NOTE(review): the return value is ignored; pthread_attr_setstacksize fails with
// EINVAL if the size is below PTHREAD_STACK_MIN, in which case the default stack size stays in effect.
439 pthread_attr_setstacksize(&attr, 32*1024);
442 printf("starting server on port %d RPC_HEADER_SZ %d\n", port, RPC_HEADER_SZ);
// the server address used by every client: loopback plus the chosen port
448 memset(&dst, 0, sizeof(dst));
449 dst.sin_family = AF_INET;
450 dst.sin_addr.s_addr = inet_addr("127.0.0.1");
451 dst.sin_port = htons(port);
454 // start the client. bind it to the server.
455 // starts a thread to listen for replies and hand them to
456 // the correct waiting caller thread. there should probably
457 // be only one rpcc per process. you probably need one
// create and bind all NUM_CL shared clients; each bind must succeed
459 for (int i = 0; i < NUM_CL; i++) {
460 clients[i] = new rpcc(dst);
461 VERIFY (clients[i]->bind() == 0);
464 simple_tests(clients[0]);
471 printf("rpctest OK\n");