Build on wheezy, and presumably precise
[invirt/third/libt4.git] / rpc / rpctest.cc
1 // RPC test and pseudo-documentation.
2 // generates print statements on failures, but eventually says "rpctest OK"
3
4 #include "rpc.h"
5 #include <arpa/inet.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <getopt.h>
10 #include <sys/types.h>
11 #include <unistd.h>
12 #include "jsl_log.h"
13 #include "gettime.h"
14 #include "lang/verify.h"
15
16 #define NUM_CL 2
17
18 rpcs *server;  // server rpc object
19 rpcc *clients[NUM_CL];  // client rpc object
20 struct sockaddr_in dst; //server's ip address
21 int port;
22 pthread_attr_t attr;
23
24 // server-side handlers. they must be methods of some class
25 // to simplify rpcs::reg(). a server process can have handlers
26 // from multiple classes.
27 class srv {
28         public:
29                 int handle_22(const std::string a, const std::string b, std::string & r);
30                 int handle_fast(const int a, int &r);
31                 int handle_slow(const int a, int &r);
32                 int handle_bigrep(const int a, std::string &r);
33 };
34
35 // a handler. a and b are arguments, r is the result.
36 // there can be multiple arguments but only one result.
37 // the caller also gets to see the int return value
38 // as the return value from rpcc::call().
39 // rpcs::reg() decides how to unmarshall by looking
40 // at these argument types, so this function definition
41 // does what a .x file does in SunRPC.
42 int
43 srv::handle_22(const std::string a, std::string b, std::string &r)
44 {
45         r = a + b;
46         return 0;
47 }
48
49 int
50 srv::handle_fast(const int a, int &r)
51 {
52         r = a + 1;
53         return 0;
54 }
55
56 int
57 srv::handle_slow(const int a, int &r)
58 {
59         usleep(random() % 5000);
60         r = a + 2;
61         return 0;
62 }
63
64 int
65 srv::handle_bigrep(const int len, std::string &r)
66 {
67         r = std::string(len, 'x');
68         return 0;
69 }
70
71 srv service;
72
73 void startserver()
74 {
75         server = new rpcs(port);
76         server->reg(22, &service, &srv::handle_22);
77         server->reg(23, &service, &srv::handle_fast);
78         server->reg(24, &service, &srv::handle_slow);
79         server->reg(25, &service, &srv::handle_bigrep);
80 }
81
82 void
83 testmarshall()
84 {
85         marshall m;
86         req_header rh(1,2,3,4,5);
87         m.pack_req_header(rh);
88         VERIFY(m.size()==RPC_HEADER_SZ);
89         int i = 12345;
90         unsigned long long l = 1223344455L;
91         std::string s = std::string("hallo....");
92         m << i;
93         m << l;
94         m << s;
95
96         char *b;
97         int sz;
98         m.take_buf(&b,&sz);
99         VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)));
100
101         unmarshall un(b,sz);
102         req_header rh1;
103         un.unpack_req_header(&rh1);
104         VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
105         int i1;
106         unsigned long long l1;
107         std::string s1;
108         un >> i1;
109         un >> l1;
110         un >> s1;
111         VERIFY(un.okdone());
112         VERIFY(i1==i && l1==l && s1==s);
113 }
114
115 void *
116 client1(void *xx)
117 {
118
119         // test concurrency.
120         int which_cl = ((unsigned long) xx ) % NUM_CL;
121
122         for(int i = 0; i < 100; i++){
123                 int arg = (random() % 2000);
124                 std::string rep;
125                 int ret = clients[which_cl]->call(25, arg, rep);
126                 VERIFY(ret == 0);
127                 if ((int)rep.size()!=arg) {
128                         printf("repsize wrong %d!=%d\n", (int)rep.size(), arg);
129                 }
130                 VERIFY((int)rep.size() == arg);
131         }
132
133         // test rpc replies coming back not in the order of
134         // the original calls -- i.e. does xid reply dispatch work.
135         for(int i = 0; i < 100; i++){
136                 int which = (random() % 2);
137                 int arg = (random() % 1000);
138                 int rep;
139
140                 struct timespec start,end;
141                 clock_gettime(CLOCK_REALTIME, &start);
142
143                 int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep);
144                 clock_gettime(CLOCK_REALTIME, &end);
145                 int diff = diff_timespec(end, start);
146                 if (ret != 0)
147                         printf("%d ms have elapsed!!!\n", diff);
148                 VERIFY(ret == 0);
149                 VERIFY(rep == (which ? arg+1 : arg+2));
150         }
151
152         return 0;
153 }
154
155 void *
156 client2(void *xx)
157 {
158         int which_cl = ((unsigned long) xx ) % NUM_CL;
159
160         time_t t1;
161         time(&t1);
162
163         while(time(0) - t1 < 10){
164                 int arg = (random() % 2000);
165                 std::string rep;
166                 int ret = clients[which_cl]->call(25, arg, rep);
167                 if ((int)rep.size()!=arg) {
168                         printf("ask for %d reply got %d ret %d\n",
169                                arg, (int)rep.size(), ret);
170                 }
171                 VERIFY((int)rep.size() == arg);
172         }
173         return 0;
174 }
175
176 void *
177 client3(void *xx)
178 {
179         rpcc *c = (rpcc *) xx;
180
181         for(int i = 0; i < 4; i++){
182                 int rep;
183                 int ret = c->call(24, i, rep, rpcc::to(3000));
184                 VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
185         }
186         return 0;
187 }
188
189
190 void
191 simple_tests(rpcc *c)
192 {
193         printf("simple_tests\n");
194         // an RPC call to procedure #22.
195         // rpcc::call() looks at the argument types to decide how
196         // to marshall the RPC call packet, and how to unmarshall
197         // the reply packet.
198         std::string rep;
199         int intret = c->call(22, (std::string)"hello", (std::string)" goodbye", rep);
200         VERIFY(intret == 0); // this is what handle_22 returns
201         VERIFY(rep == "hello goodbye");
202         printf("   -- string concat RPC .. ok\n");
203
204         // small request, big reply (perhaps req via UDP, reply via TCP)
205         intret = c->call(25, 70000, rep, rpcc::to(200000));
206         VERIFY(intret == 0);
207         VERIFY(rep.size() == 70000);
208         printf("   -- small request, big reply .. ok\n");
209
210 #if 0
211         // too few arguments
212         intret = c->call(22, (std::string)"just one", rep);
213         VERIFY(intret < 0);
214         printf("   -- too few arguments .. failed ok\n");
215
216         // too many arguments; proc #23 expects just one.
217         intret = c->call(23, 1001, 1002, rep);
218         VERIFY(intret < 0);
219         printf("   -- too many arguments .. failed ok\n");
220
221         // wrong return value size
222         int wrongrep;
223         intret = c->call(23, (std::string)"hello", (std::string)" goodbye", wrongrep);
224         VERIFY(intret < 0);
225         printf("   -- wrong ret value size .. failed ok\n");
226 #endif
227
228         // specify a timeout value to an RPC that should succeed (udp)
229         int xx = 0;
230         intret = c->call(23, 77, xx, rpcc::to(3000));
231         VERIFY(intret == 0 && xx == 78);
232         printf("   -- no suprious timeout .. ok\n");
233
234         // specify a timeout value to an RPC that should succeed (tcp)
235         {
236                 std::string arg(1000, 'x');
237                 std::string rep;
238                 c->call(22, arg, (std::string)"x", rep, rpcc::to(3000));
239                 VERIFY(rep.size() == 1001);
240                 printf("   -- no suprious timeout .. ok\n");
241         }
242
243         // huge RPC
244         std::string big(1000000, 'x');
245         intret = c->call(22, big, (std::string)"z", rep);
246         VERIFY(rep.size() == 1000001);
247         printf("   -- huge 1M rpc request .. ok\n");
248
249         // specify a timeout value to an RPC that should timeout (udp)
250         struct sockaddr_in non_existent;
251         memset(&non_existent, 0, sizeof(non_existent));
252         non_existent.sin_family = AF_INET;
253         non_existent.sin_addr.s_addr = inet_addr("127.0.0.1");
254         non_existent.sin_port = htons(7661);
255         rpcc *c1 = new rpcc(non_existent);
256         time_t t0 = time(0);
257         intret = c1->bind(rpcc::to(3000));
258         time_t t1 = time(0);
259         VERIFY(intret < 0 && (t1 - t0) <= 4);
260         printf("   -- rpc timeout .. ok\n");
261         printf("simple_tests OK\n");
262 }
263
264 void 
265 concurrent_test(int nt)
266 {
267         // create threads that make lots of calls in parallel,
268         // to test thread synchronization for concurrent calls
269         // and dispatches.
270         int ret;
271
272         printf("start concurrent_test (%d threads) ...", nt);
273
274         pthread_t th[nt];
275         for(int i = 0; i < nt; i++){
276                 ret = pthread_create(&th[i], &attr, client1, (void *) (uintptr_t)i);
277                 VERIFY(ret == 0);
278         }
279
280         for(int i = 0; i < nt; i++){
281                 VERIFY(pthread_join(th[i], NULL) == 0);
282         }
283         printf(" OK\n");
284 }
285
286 void 
287 lossy_test()
288 {
289         int ret;
290
291         printf("start lossy_test ...");
292         VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
293
294         if (server) {
295                 delete server;
296                 startserver();
297         }
298
299         for (int i = 0; i < NUM_CL; i++) {
300                 delete clients[i];
301                 clients[i] = new rpcc(dst);
302                 VERIFY(clients[i]->bind()==0);
303         }
304
305         int nt = 1;
306         pthread_t th[nt];
307         for(int i = 0; i < nt; i++){
308                 ret = pthread_create(&th[i], &attr, client2, (void *) (uintptr_t)i);
309                 VERIFY(ret == 0);
310         }
311         for(int i = 0; i < nt; i++){
312                 VERIFY(pthread_join(th[i], NULL) == 0);
313         }
314         printf(".. OK\n");
315         VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
316 }
317
318 void 
319 failure_test()
320 {
321         rpcc *client1;
322         rpcc *client = clients[0];
323
324         printf("failure_test\n");
325
326         delete server;
327
328         client1 = new rpcc(dst);
329         VERIFY (client1->bind(rpcc::to(3000)) < 0);
330         printf("   -- create new client and try to bind to failed server .. failed ok\n");
331
332         delete client1;
333
334         startserver();
335
336         std::string rep;
337         int intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
338         VERIFY(intret == rpc_const::oldsrv_failure);
339         printf("   -- call recovered server with old client .. failed ok\n");
340
341         delete client;
342
343         clients[0] = client = new rpcc(dst);
344         VERIFY (client->bind() >= 0);
345         VERIFY (client->bind() < 0);
346
347         intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
348         VERIFY(intret == 0);
349         VERIFY(rep == "hello goodbye");
350
351         printf("   -- delete existing rpc client, create replacement rpc client .. ok\n");
352
353
354         int nt = 10;
355         int ret;
356         printf("   -- concurrent test on new rpc client w/ %d threads ..", nt);
357
358         pthread_t th[nt];
359         for(int i = 0; i < nt; i++){
360                 ret = pthread_create(&th[i], &attr, client3, (void *) client);
361                 VERIFY(ret == 0);
362         }
363
364         for(int i = 0; i < nt; i++){
365                 VERIFY(pthread_join(th[i], NULL) == 0);
366         }
367         printf("ok\n");
368
369         delete server;
370         delete client;
371
372         startserver();
373         clients[0] = client = new rpcc(dst);
374         VERIFY (client->bind() >= 0);
375         printf("   -- delete existing rpc client and server, create replacements.. ok\n");
376
377         printf("   -- concurrent test on new client and server w/ %d threads ..", nt);
378         for(int i = 0; i < nt; i++){
379                 ret = pthread_create(&th[i], &attr, client3, (void *)client);
380                 VERIFY(ret == 0);
381         }
382
383         for(int i = 0; i < nt; i++){
384                 VERIFY(pthread_join(th[i], NULL) == 0);
385         }
386         printf("ok\n");
387
388         printf("failure_test OK\n");
389 }
390
391 int
392 main(int argc, char *argv[])
393 {
394
395         setvbuf(stdout, NULL, _IONBF, 0);
396         setvbuf(stderr, NULL, _IONBF, 0);
397         int debug_level = 0;
398
399         bool isclient = false;
400         bool isserver = false;
401
402         srandom(getpid());
403         port = 20000 + (getpid() % 10000);
404
405         char ch = 0;
406         while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
407                 switch (ch) {
408                         case 'c':
409                                 isclient = true;
410                                 break;
411                         case 's':
412                                 isserver = true;
413                                 break;
414                         case 'd':
415                                 debug_level = atoi(optarg);
416                                 break;
417                         case 'p':
418                                 port = atoi(optarg);
419                                 break;
420                         case 'l':
421                                 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
422                         default:
423                                 break;
424                 }
425         }
426
427         if (!isserver && !isclient)  {
428                 isserver = isclient = true;
429         }
430
431         if (debug_level > 0) {
432                 //__loginit.initNow();
433                 jsl_set_debug(debug_level);
434                 jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
435         }
436
437         testmarshall();
438
439         pthread_attr_init(&attr);
440         // set stack size to 32K, so we don't run out of memory
441         pthread_attr_setstacksize(&attr, 32*1024);
442
443         if (isserver) {
444                 printf("starting server on port %d RPC_HEADER_SZ %d\n", port, RPC_HEADER_SZ);
445                 startserver();
446         }
447
448         if (isclient) {
449                 // server's address.
450                 memset(&dst, 0, sizeof(dst));
451                 dst.sin_family = AF_INET;
452                 dst.sin_addr.s_addr = inet_addr("127.0.0.1");
453                 dst.sin_port = htons(port);
454
455
456                 // start the client.  bind it to the server.
457                 // starts a thread to listen for replies and hand them to
458                 // the correct waiting caller thread. there should probably
459                 // be only one rpcc per process. you probably need one
460                 // rpcc per server.
461                 for (int i = 0; i < NUM_CL; i++) {
462                         clients[i] = new rpcc(dst);
463                         VERIFY (clients[i]->bind() == 0);
464                 }
465
466                 simple_tests(clients[0]);
467                 concurrent_test(10);
468                 lossy_test();
469                 if (isserver) {
470                         failure_test();
471                 }
472
473                 printf("rpctest OK\n");
474
475                 exit(0);
476         }
477
478         while (1) {
479                 sleep(1);
480         }
481 }