d90e494ad30d5d6468723e00463b8b0a1af972b8
[invirt/third/libt4.git] / rpc / rpctest.cc
1 // RPC test and pseudo-documentation.
2 // generates print statements on failures, but eventually says "rpctest OK"
3
4 #include "rpc.h"
5 #include <arpa/inet.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <getopt.h>
10 #include <sys/types.h>
11 #include <unistd.h>
12 #include "jsl_log.h"
13 #include "lang/verify.h"
14
15 #define NUM_CL 2
16
17 rpcs *server;  // server rpc object
18 rpcc *clients[NUM_CL];  // client rpc object
19 struct sockaddr_in dst; //server's ip address
20 int port;
21
22 // server-side handlers. they must be methods of some class
23 // to simplify rpcs::reg(). a server process can have handlers
24 // from multiple classes.
25 class srv {
26         public:
27                 int handle_22(const std::string a, const std::string b, std::string & r);
28                 int handle_fast(const int a, int &r);
29                 int handle_slow(const int a, int &r);
30                 int handle_bigrep(const int a, std::string &r);
31 };
32
33 // a handler. a and b are arguments, r is the result.
34 // there can be multiple arguments but only one result.
35 // the caller also gets to see the int return value
36 // as the return value from rpcc::call().
37 // rpcs::reg() decides how to unmarshall by looking
38 // at these argument types, so this function definition
39 // does what a .x file does in SunRPC.
40 int
41 srv::handle_22(const std::string a, std::string b, std::string &r)
42 {
43         r = a + b;
44         return 0;
45 }
46
47 int
48 srv::handle_fast(const int a, int &r)
49 {
50         r = a + 1;
51         return 0;
52 }
53
54 int
55 srv::handle_slow(const int a, int &r)
56 {
57         usleep(random() % 5000);
58         r = a + 2;
59         return 0;
60 }
61
62 int
63 srv::handle_bigrep(const int len, std::string &r)
64 {
65         r = std::string(len, 'x');
66         return 0;
67 }
68
69 srv service;
70
71 void startserver()
72 {
73         server = new rpcs(port);
74         server->reg(22, &service, &srv::handle_22);
75         server->reg(23, &service, &srv::handle_fast);
76         server->reg(24, &service, &srv::handle_slow);
77         server->reg(25, &service, &srv::handle_bigrep);
78 }
79
80 void
81 testmarshall()
82 {
83         marshall m;
84         req_header rh(1,2,3,4,5);
85         m.pack_req_header(rh);
86         VERIFY(m.size()==RPC_HEADER_SZ);
87         int i = 12345;
88         unsigned long long l = 1223344455L;
89         std::string s = std::string("hallo....");
90         m << i;
91         m << l;
92         m << s;
93
94         char *b;
95         int sz;
96         m.take_buf(&b,&sz);
97         VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)));
98
99         unmarshall un(b,sz);
100         req_header rh1;
101         un.unpack_req_header(&rh1);
102         VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
103         int i1;
104         unsigned long long l1;
105         std::string s1;
106         un >> i1;
107         un >> l1;
108         un >> s1;
109         VERIFY(un.okdone());
110         VERIFY(i1==i && l1==l && s1==s);
111 }
112
113 void
114 client1(int cl)
115 {
116         // test concurrency.
117         int which_cl = ((unsigned long) cl ) % NUM_CL;
118
119         for(int i = 0; i < 100; i++){
120                 int arg = (random() % 2000);
121                 std::string rep;
122                 int ret = clients[which_cl]->call(25, arg, rep);
123                 VERIFY(ret == 0);
124                 if ((int)rep.size()!=arg) {
125                         printf("repsize wrong %d!=%d\n", (int)rep.size(), arg);
126                 }
127                 VERIFY((int)rep.size() == arg);
128         }
129
130         // test rpc replies coming back not in the order of
131         // the original calls -- i.e. does xid reply dispatch work.
132         for(int i = 0; i < 100; i++){
133                 int which = (random() % 2);
134                 int arg = (random() % 1000);
135                 int rep;
136
137                 auto start = std::chrono::steady_clock::now();
138
139                 int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep);
140                 auto end = std::chrono::steady_clock::now();
141                 int diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
142                 if (ret != 0)
143                         printf("%d ms have elapsed!!!\n", diff);
144                 VERIFY(ret == 0);
145                 VERIFY(rep == (which ? arg+1 : arg+2));
146         }
147 }
148
149 void
150 client2(int cl)
151 {
152         int which_cl = ((unsigned long) cl ) % NUM_CL;
153
154         time_t t1;
155         time(&t1);
156
157         while(time(0) - t1 < 10){
158                 int arg = (random() % 2000);
159                 std::string rep;
160                 int ret = clients[which_cl]->call(25, arg, rep);
161                 if ((int)rep.size()!=arg) {
162                         printf("ask for %d reply got %d ret %d\n",
163                                arg, (int)rep.size(), ret);
164                 }
165                 VERIFY((int)rep.size() == arg);
166         }
167 }
168
169 void
170 client3(void *xx)
171 {
172         rpcc *c = (rpcc *) xx;
173
174         for(int i = 0; i < 4; i++){
175                 int rep;
176                 int ret = c->call(24, i, rep, rpcc::to(3000));
177                 VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
178         }
179 }
180
181
182 void
183 simple_tests(rpcc *c)
184 {
185         printf("simple_tests\n");
186         // an RPC call to procedure #22.
187         // rpcc::call() looks at the argument types to decide how
188         // to marshall the RPC call packet, and how to unmarshall
189         // the reply packet.
190         std::string rep;
191         int intret = c->call(22, (std::string)"hello", (std::string)" goodbye", rep);
192         VERIFY(intret == 0); // this is what handle_22 returns
193         VERIFY(rep == "hello goodbye");
194         printf("   -- string concat RPC .. ok\n");
195
196         // small request, big reply (perhaps req via UDP, reply via TCP)
197         intret = c->call(25, 70000, rep, rpcc::to(200000));
198         VERIFY(intret == 0);
199         VERIFY(rep.size() == 70000);
200         printf("   -- small request, big reply .. ok\n");
201
202 #if 0
203         // too few arguments
204         intret = c->call(22, (std::string)"just one", rep);
205         VERIFY(intret < 0);
206         printf("   -- too few arguments .. failed ok\n");
207
208         // too many arguments; proc #23 expects just one.
209         intret = c->call(23, 1001, 1002, rep);
210         VERIFY(intret < 0);
211         printf("   -- too many arguments .. failed ok\n");
212
213         // wrong return value size
214         int wrongrep;
215         intret = c->call(23, (std::string)"hello", (std::string)" goodbye", wrongrep);
216         VERIFY(intret < 0);
217         printf("   -- wrong ret value size .. failed ok\n");
218 #endif
219
220         // specify a timeout value to an RPC that should succeed (udp)
221         int xx = 0;
222         intret = c->call(23, 77, xx, rpcc::to(3000));
223         VERIFY(intret == 0 && xx == 78);
224         printf("   -- no suprious timeout .. ok\n");
225
226         // specify a timeout value to an RPC that should succeed (tcp)
227         {
228                 std::string arg(1000, 'x');
229                 std::string rep;
230                 c->call(22, arg, (std::string)"x", rep, rpcc::to(3000));
231                 VERIFY(rep.size() == 1001);
232                 printf("   -- no suprious timeout .. ok\n");
233         }
234
235         // huge RPC
236         std::string big(1000000, 'x');
237         intret = c->call(22, big, (std::string)"z", rep);
238         VERIFY(rep.size() == 1000001);
239         printf("   -- huge 1M rpc request .. ok\n");
240
241         // specify a timeout value to an RPC that should timeout (udp)
242         struct sockaddr_in non_existent;
243         memset(&non_existent, 0, sizeof(non_existent));
244         non_existent.sin_family = AF_INET;
245         non_existent.sin_addr.s_addr = inet_addr("127.0.0.1");
246         non_existent.sin_port = htons(7661);
247         rpcc *c1 = new rpcc(non_existent);
248         time_t t0 = time(0);
249         intret = c1->bind(rpcc::to(3000));
250         time_t t1 = time(0);
251         VERIFY(intret < 0 && (t1 - t0) <= 4);
252         printf("   -- rpc timeout .. ok\n");
253         printf("simple_tests OK\n");
254 }
255
256 void 
257 concurrent_test(int nt)
258 {
259         // create threads that make lots of calls in parallel,
260         // to test thread synchronization for concurrent calls
261         // and dispatches.
262         printf("start concurrent_test (%d threads) ...", nt);
263
264     std::vector<std::thread> th(nt);
265         for(int i = 0; i < nt; i++){
266         th[i] = std::thread(client1, i);
267         }
268
269         for(int i = 0; i < nt; i++){
270         th[i].join();
271         }
272         printf(" OK\n");
273 }
274
275 void 
276 lossy_test()
277 {
278         printf("start lossy_test ...");
279         VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
280
281         if (server) {
282                 delete server;
283                 startserver();
284         }
285
286         for (int i = 0; i < NUM_CL; i++) {
287                 delete clients[i];
288                 clients[i] = new rpcc(dst);
289                 VERIFY(clients[i]->bind()==0);
290         }
291
292         int nt = 1;
293     std::vector<std::thread> th(nt);
294         for(int i = 0; i < nt; i++){
295         th[i] = std::thread(client2, i);
296         }
297         for(int i = 0; i < nt; i++){
298         th[i].join();
299         }
300         printf(".. OK\n");
301         VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
302 }
303
304 void 
305 failure_test()
306 {
307         rpcc *client1;
308         rpcc *client = clients[0];
309
310         printf("failure_test\n");
311
312         delete server;
313
314         client1 = new rpcc(dst);
315         VERIFY (client1->bind(rpcc::to(3000)) < 0);
316         printf("   -- create new client and try to bind to failed server .. failed ok\n");
317
318         delete client1;
319
320         startserver();
321
322         std::string rep;
323         int intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
324         VERIFY(intret == rpc_const::oldsrv_failure);
325         printf("   -- call recovered server with old client .. failed ok\n");
326
327         delete client;
328
329         clients[0] = client = new rpcc(dst);
330         VERIFY (client->bind() >= 0);
331         VERIFY (client->bind() < 0);
332
333         intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
334         VERIFY(intret == 0);
335         VERIFY(rep == "hello goodbye");
336
337         printf("   -- delete existing rpc client, create replacement rpc client .. ok\n");
338
339
340         int nt = 10;
341         printf("   -- concurrent test on new rpc client w/ %d threads ..", nt);
342
343     std::vector<std::thread> th(nt);
344         for(int i = 0; i < nt; i++){
345         th[i] = std::thread(client3, client);
346         }
347
348         for(int i = 0; i < nt; i++){
349         th[i].join();
350         }
351         printf("ok\n");
352
353         delete server;
354         delete client;
355
356         startserver();
357         clients[0] = client = new rpcc(dst);
358         VERIFY (client->bind() >= 0);
359         printf("   -- delete existing rpc client and server, create replacements.. ok\n");
360
361         printf("   -- concurrent test on new client and server w/ %d threads ..", nt);
362         for(int i = 0; i < nt; i++){
363         th[i] = std::thread(client3, client);
364         }
365
366         for(int i = 0; i < nt; i++){
367         th[i].join();
368         }
369         printf("ok\n");
370
371         printf("failure_test OK\n");
372 }
373
374 int
375 main(int argc, char *argv[])
376 {
377
378         setvbuf(stdout, NULL, _IONBF, 0);
379         setvbuf(stderr, NULL, _IONBF, 0);
380         int debug_level = 0;
381
382         bool isclient = false;
383         bool isserver = false;
384
385         srandom(getpid());
386         port = 20000 + (getpid() % 10000);
387
388         char ch = 0;
389         while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
390                 switch (ch) {
391                         case 'c':
392                                 isclient = true;
393                                 break;
394                         case 's':
395                                 isserver = true;
396                                 break;
397                         case 'd':
398                                 debug_level = atoi(optarg);
399                                 break;
400                         case 'p':
401                                 port = atoi(optarg);
402                                 break;
403                         case 'l':
404                                 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
405                         default:
406                                 break;
407                 }
408         }
409
410         if (!isserver && !isclient)  {
411                 isserver = isclient = true;
412         }
413
414         if (debug_level > 0) {
415                 //__loginit.initNow();
416                 jsl_set_debug(debug_level);
417                 jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
418         }
419
420         testmarshall();
421
422         if (isserver) {
423                 printf("starting server on port %d RPC_HEADER_SZ %d\n", port, (int)RPC_HEADER_SZ);
424                 startserver();
425         }
426
427         if (isclient) {
428                 // server's address.
429                 memset(&dst, 0, sizeof(dst));
430                 dst.sin_family = AF_INET;
431                 dst.sin_addr.s_addr = inet_addr("127.0.0.1");
432                 dst.sin_port = htons(port);
433
434
435                 // start the client.  bind it to the server.
436                 // starts a thread to listen for replies and hand them to
437                 // the correct waiting caller thread. there should probably
438                 // be only one rpcc per process. you probably need one
439                 // rpcc per server.
440                 for (int i = 0; i < NUM_CL; i++) {
441                         clients[i] = new rpcc(dst);
442                         VERIFY (clients[i]->bind() == 0);
443                 }
444
445                 simple_tests(clients[0]);
446                 concurrent_test(10);
447                 lossy_test();
448                 if (isserver) {
449                         failure_test();
450                 }
451
452                 printf("rpctest OK\n");
453
454                 exit(0);
455         }
456
457         while (1) {
458                 sleep(1);
459         }
460 }