1 // RPC test and pseudo-documentation.
2 // generates print statements on failures, but eventually says "rpctest OK"
10 #include <sys/types.h>
14 #include "lang/verify.h"
18 rpcs *server; // server-side RPC object (created with new rpcs(port))
19 rpcc *clients[NUM_CL]; // client-side RPC objects, NUM_CL of them
20 struct sockaddr_in dst; // server's address (IP address and port)
24 // server-side handlers. they must be methods of some class
25 // to simplify rpcs::reg(). a server process can have handlers
26 // from multiple classes.
// registered as procedure 22; the tests expect r to be a followed by b.
29 int handle_22(const std::string a, const std::string b, std::string & r);
// registered as procedure 23; the tests expect r == a + 1.
30 int handle_fast(const int a, int &r);
// registered as procedure 24; the tests expect r == a + 2.
// the definition sleeps for a short random time.
31 int handle_slow(const int a, int &r);
// registered as procedure 25; r becomes a string of a 'x' characters.
32 int handle_bigrep(const int a, std::string &r);
35 // a handler. a and b are arguments, r is the result.
36 // there can be multiple arguments but only one result.
37 // the caller also gets to see the int return value
38 // as the return value from rpcc::call().
39 // rpcs::reg() decides how to unmarshall by looking
40 // at these argument types, so this function definition
41 // does what a .x file does in SunRPC.
// note: b is const in the class declaration but not here. top-level const
// on a by-value parameter is not part of the function's type, so the
// declaration and this definition still refer to the same function.
43 srv::handle_22(const std::string a, std::string b, std::string &r)
50 srv::handle_fast(const int a, int &r)
57 srv::handle_slow(const int a, int &r)
// delay the reply by a random 0..4999 microseconds.
59 usleep(random() % 5000);
// the parameter is named len here and a in the class declaration.
65 srv::handle_bigrep(const int len, std::string &r)
// the reply is len copies of 'x'.
67 r = std::string(len, 'x');
// create the server on `port`, then register the four srv handlers
// (all invoked on the `service` object) as procedures 22 through 25.
75 server = new rpcs(port);
76 server->reg(22, &service, &srv::handle_22);
77 server->reg(23, &service, &srv::handle_fast);
78 server->reg(24, &service, &srv::handle_slow);
79 server->reg(25, &service, &srv::handle_bigrep);
// a packed request header alone must occupy exactly RPC_HEADER_SZ bytes.
86 req_header rh(1,2,3,4,5);
87 m.pack_req_header(rh);
88 VERIFY(m.size()==RPC_HEADER_SZ);
90 unsigned long long l = 1223344455L;
91 std::string s = std::string("hallo....");
// expected size: header + i + l + the string's bytes + one extra int
// (presumably the string's length prefix -- confirm against the marshall code).
99 VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)));
103 un.unpack_req_header(&rh1);
// the unpacked header must be bytewise identical to the packed one.
// NOTE(review): memcmp also compares any padding bytes inside req_header;
// confirm the struct has no padding or that it is zero-initialised.
104 VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
106 unsigned long long l1;
// the round trip must preserve all three values.
112 VERIFY(i1==i && l1==l && s1==s);
// map the thread argument xx onto one of the NUM_CL shared clients.
120 int which_cl = ((unsigned long) xx ) % NUM_CL;
// 100 calls to procedure 25 with a random length in 0..1999;
// each reply must be exactly that long.
122 for(int i = 0; i < 100; i++){
123 int arg = (random() % 2000);
125 int ret = clients[which_cl]->call(25, arg, rep);
127 if ((int)rep.size()!=arg) {
128 printf("repsize wrong %d!=%d\n", (int)rep.size(), arg);
130 VERIFY((int)rep.size() == arg);
133 // test rpc replies coming back not in the order of
134 // the original calls -- i.e. does xid reply dispatch work.
135 for(int i = 0; i < 100; i++){
// pick procedure 23 (which != 0) or 24 (which == 0) at random.
136 int which = (random() % 2);
137 int arg = (random() % 1000);
// NOTE(review): CLOCK_REALTIME can jump when the wall clock is adjusted;
// CLOCK_MONOTONIC is the usual choice for measuring elapsed time.
140 struct timespec start,end;
141 clock_gettime(CLOCK_REALTIME, &start);
143 int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep);
144 clock_gettime(CLOCK_REALTIME, &end);
145 int diff = diff_timespec(end, start);
147 printf("%d ms have elapsed!!!\n", diff);
// procedure 23 must reply arg+1, procedure 24 must reply arg+2.
149 VERIFY(rep == (which ? arg+1 : arg+2));
// map the thread argument xx onto one of the NUM_CL shared clients.
158 int which_cl = ((unsigned long) xx ) % NUM_CL;
// keep issuing calls until 10 seconds have passed since t1
// (t1 is presumably time(0) taken just before the loop -- confirm).
163 while(time(0) - t1 < 10){
164 int arg = (random() % 2000);
166 int ret = clients[which_cl]->call(25, arg, rep);
// report the requested size, the received size and the call's return
// value before the VERIFY below fails.
167 if ((int)rep.size()!=arg) {
168 printf("ask for %d reply got %d ret %d\n",
169 arg, (int)rep.size(), ret);
171 VERIFY((int)rep.size() == arg);
// the thread argument is the rpcc client to use.
179 rpcc *c = (rpcc *) xx;
181 for(int i = 0; i < 4; i++){
// call procedure 24 with a timeout of rpcc::to(3000)
// (presumably milliseconds -- confirm the unit in rpcc).
183 int ret = c->call(24, i, rep, rpcc::to(3000));
// each call must either time out or return i+2.
184 VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
191 simple_tests(rpcc *c)
193 printf("simple_tests\n");
194 // an RPC call to procedure #22.
195 // rpcc::call() looks at the argument types to decide how
196 // to marshall the RPC call packet, and how to unmarshall
199 int intret = c->call(22, (std::string)"hello", (std::string)" goodbye", rep);
200 VERIFY(intret == 0); // this is what handle_22 returns
201 VERIFY(rep == "hello goodbye");
202 printf(" -- string concat RPC .. ok\n");
204 // small request, big reply (perhaps req via UDP, reply via TCP)
205 intret = c->call(25, 70000, rep, rpcc::to(200000));
207 VERIFY(rep.size() == 70000);
208 printf(" -- small request, big reply .. ok\n");
// procedure 22 is called with a single string argument here.
212 intret = c->call(22, (std::string)"just one", rep);
214 printf(" -- too few arguments .. failed ok\n");
216 // too many arguments; proc #23 expects just one.
217 intret = c->call(23, 1001, 1002, rep);
219 printf(" -- too many arguments .. failed ok\n");
221 // wrong return value size
223 intret = c->call(23, (std::string)"hello", (std::string)" goodbye", wrongrep);
225 printf(" -- wrong ret value size .. failed ok\n");
228 // specify a timeout value to an RPC that should succeed (udp)
230 intret = c->call(23, 77, xx, rpcc::to(3000));
// procedure 23 replies with its argument plus one.
231 VERIFY(intret == 0 && xx == 78);
232 printf(" -- no suprious timeout .. ok\n");
234 // specify a timeout value to an RPC that should succeed (tcp)
236 std::string arg(1000, 'x');
// the return value of this call is not captured on this line;
// only the reply size (1000 + 1) is checked.
238 c->call(22, arg, (std::string)"x", rep, rpcc::to(3000));
239 VERIFY(rep.size() == 1001);
240 printf(" -- no suprious timeout .. ok\n");
// a 1,000,000-byte request; the reply is the request plus one character.
244 std::string big(1000000, 'x');
245 intret = c->call(22, big, (std::string)"z", rep);
246 VERIFY(rep.size() == 1000001);
247 printf(" -- huge 1M rpc request .. ok\n");
249 // specify a timeout value to an RPC that should timeout (udp)
// 127.0.0.1:7661 is used as an address where no server is expected to listen.
250 struct sockaddr_in non_existent;
251 memset(&non_existent, 0, sizeof(non_existent));
252 non_existent.sin_family = AF_INET;
253 non_existent.sin_addr.s_addr = inet_addr("127.0.0.1");
254 non_existent.sin_port = htons(7661);
255 rpcc *c1 = new rpcc(non_existent);
257 intret = c1->bind(rpcc::to(3000));
// bind must fail, and t1 - t0 must be at most 4
// (t0/t1 are presumably time(0) taken around the bind -- confirm).
259 VERIFY(intret < 0 && (t1 - t0) <= 4);
260 printf(" -- rpc timeout .. ok\n");
261 printf("simple_tests OK\n");
265 concurrent_test(int nt)
267 // create threads that make lots of calls in parallel,
268 // to test thread synchronization for concurrent calls
272 printf("start concurrent_test (%d threads) ...", nt);
// start nt threads running client1; each receives its index i as argument.
275 for(int i = 0; i < nt; i++){
276 ret = pthread_create(&th[i], &attr, client1, (void *) (uintptr_t)i);
// wait for every thread to finish.
280 for(int i = 0; i < nt; i++){
281 VERIFY(pthread_join(th[i], NULL) == 0);
291 printf("start lossy_test ...");
// set RPC_LOSSY=5 in the environment (presumably read by the RPC library
// to drop a share of the traffic -- confirm where it is consumed).
292 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
// point every clients[] slot at a new client bound to dst.
299 for (int i = 0; i < NUM_CL; i++) {
301 clients[i] = new rpcc(dst);
302 VERIFY(clients[i]->bind()==0);
// start nt threads running client2; each receives its index i as argument.
307 for(int i = 0; i < nt; i++){
308 ret = pthread_create(&th[i], &attr, client2, (void *) (uintptr_t)i);
// wait for every thread to finish.
311 for(int i = 0; i < nt; i++){
312 VERIFY(pthread_join(th[i], NULL) == 0);
// set RPC_LOSSY back to 0 for whatever runs afterwards.
315 VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
322 rpcc *client = clients[0];
324 printf("failure_test\n");
// a new client must fail to bind within the rpcc::to(3000) timeout.
328 client1 = new rpcc(dst);
329 VERIFY (client1->bind(rpcc::to(3000)) < 0);
330 printf(" -- create new client and try to bind to failed server .. failed ok\n");
// the old client must get oldsrv_failure when it calls the recovered server.
337 int intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
338 VERIFY(intret == rpc_const::oldsrv_failure);
339 printf(" -- call recovered server with old client .. failed ok\n");
// a replacement client: the first bind must succeed, and a second bind
// on the same client must fail.
343 clients[0] = client = new rpcc(dst);
344 VERIFY (client->bind() >= 0);
345 VERIFY (client->bind() < 0);
347 intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
349 VERIFY(rep == "hello goodbye");
351 printf(" -- delete existing rpc client, create replacement rpc client .. ok\n");
356 printf(" -- concurrent test on new rpc client w/ %d threads ..", nt);
// all nt threads run client3 against the same replacement client.
359 for(int i = 0; i < nt; i++){
360 ret = pthread_create(&th[i], &attr, client3, (void *) client);
364 for(int i = 0; i < nt; i++){
365 VERIFY(pthread_join(th[i], NULL) == 0);
// once more with a newly created client.
373 clients[0] = client = new rpcc(dst);
374 VERIFY (client->bind() >= 0);
375 printf(" -- delete existing rpc client and server, create replacements.. ok\n");
377 printf(" -- concurrent test on new client and server w/ %d threads ..", nt);
378 for(int i = 0; i < nt; i++){
379 ret = pthread_create(&th[i], &attr, client3, (void *)client);
383 for(int i = 0; i < nt; i++){
384 VERIFY(pthread_join(th[i], NULL) == 0);
388 printf("failure_test OK\n");
392 main(int argc, char *argv[])
// make stdout and stderr unbuffered.
395 setvbuf(stdout, NULL, _IONBF, 0);
396 setvbuf(stderr, NULL, _IONBF, 0);
399 bool isclient = false;
400 bool isserver = false;
// default port derived from the process id: 20000..29999.
403 port = 20000 + (getpid() % 10000);
// accepted options: -c, -s, -d <arg>, -p <arg>, -l.
406 while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
// NOTE(review): atoi reports no error on non-numeric input.
415 debug_level = atoi(optarg);
421 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
// with neither role selected, act as both server and client.
427 if (!isserver && !isclient) {
428 isserver = isclient = true;
431 if (debug_level > 0) {
432 //__loginit.initNow();
433 jsl_set_debug(debug_level);
434 jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
439 pthread_attr_init(&attr);
440 // set stack size to 32K, so we don't run out of memory
441 pthread_attr_setstacksize(&attr, 32*1024);
444 printf("starting server on port %d RPC_HEADER_SZ %d\n", port, RPC_HEADER_SZ);
// the server address used by every client: 127.0.0.1 on `port`.
450 memset(&dst, 0, sizeof(dst));
451 dst.sin_family = AF_INET;
452 dst.sin_addr.s_addr = inet_addr("127.0.0.1");
453 dst.sin_port = htons(port);
456 // start the client. bind it to the server.
457 // starts a thread to listen for replies and hand them to
458 // the correct waiting caller thread. there should probably
459 // be only one rpcc per process. you probably need one
// create and bind all NUM_CL clients; every bind must return 0.
461 for (int i = 0; i < NUM_CL; i++) {
462 clients[i] = new rpcc(dst);
463 VERIFY (clients[i]->bind() == 0);
466 simple_tests(clients[0]);
473 printf("rpctest OK\n");