Cleanups
[invirt/third/libt4.git] / rpc / rpctest.cc
1 // RPC test and pseudo-documentation.
2 // generates print statements on failures, but eventually says "rpctest OK"
3
4 #include "rpc.h"
5 #include <arpa/inet.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <getopt.h>
10 #include <sys/types.h>
11 #include <unistd.h>
12 #include "jsl_log.h"
13 #include "lang/verify.h"
14
15 #define NUM_CL 2
16
17 char tprintf_thread_prefix = 'r';
18
19 rpcs *server;  // server rpc object
20 rpcc *clients[NUM_CL];  // client rpc object
21 struct sockaddr_in dst; //server's ip address
22 int port;
23
24 // server-side handlers. they must be methods of some class
25 // to simplify rpcs::reg(). a server process can have handlers
26 // from multiple classes.
27 class srv {
28         public:
29                 int handle_22(std::string & r, const std::string a, const std::string b);
30                 int handle_fast(int &r, const int a);
31                 int handle_slow(int &r, const int a);
32                 int handle_bigrep(std::string &r, const size_t a);
33 };
34
35 // a handler. a and b are arguments, r is the result.
36 // there can be multiple arguments but only one result.
37 // the caller also gets to see the int return value
38 // as the return value from rpcc::call().
39 // rpcs::reg() decides how to unmarshall by looking
40 // at these argument types, so this function definition
41 // does what a .x file does in SunRPC.
42 int
43 srv::handle_22(std::string &r, const std::string a, std::string b)
44 {
45         r = a + b;
46         return 0;
47 }
48
49 int
50 srv::handle_fast(int &r, const int a)
51 {
52         r = a + 1;
53         return 0;
54 }
55
56 int
57 srv::handle_slow(int &r, const int a)
58 {
59         usleep(random() % 5000);
60         r = a + 2;
61         return 0;
62 }
63
64 int
65 srv::handle_bigrep(std::string &r, const size_t len)
66 {
67         r = std::string((size_t)len, 'x');
68         return 0;
69 }
70
71 srv service;
72
73 void startserver()
74 {
75         server = new rpcs((unsigned int)port);
76         server->reg(22, &srv::handle_22, &service);
77         server->reg(23, &srv::handle_fast, &service);
78         server->reg(24, &srv::handle_slow, &service);
79         server->reg(25, &srv::handle_bigrep, &service);
80 }
81
82 void
83 testmarshall()
84 {
85         marshall m;
86         request_header rh{1,2,3,4,5};
87         m.pack_req_header(rh);
88         VERIFY(m.size()==RPC_HEADER_SZ);
89         int i = 12345;
90         unsigned long long l = 1223344455L;
91         std::string s = std::string("hallo....");
92         m << i;
93         m << l;
94         m << s;
95
96         char *b;
97         size_t sz;
98         m.take_buf(&b,&sz);
99         VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
100
101         unmarshall un(b,sz);
102         request_header rh1;
103         un.unpack_req_header(&rh1);
104         VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
105         int i1;
106         unsigned long long l1;
107         std::string s1;
108         un >> i1;
109         un >> l1;
110         un >> s1;
111         VERIFY(un.okdone());
112         VERIFY(i1==i && l1==l && s1==s);
113 }
114
115 void
116 client1(size_t cl)
117 {
118         // test concurrency.
119         size_t which_cl = cl % NUM_CL;
120
121         for(int i = 0; i < 100; i++){
122                 int arg = (random() % 2000);
123                 std::string rep;
124                 int ret = clients[which_cl]->call(25, rep, arg);
125                 VERIFY(ret == 0);
126                 if ((int)rep.size()!=arg) {
127                         printf("repsize wrong %d!=%d\n", (int)rep.size(), arg);
128                 }
129                 VERIFY((int)rep.size() == arg);
130         }
131
132         // test rpc replies coming back not in the order of
133         // the original calls -- i.e. does xid reply dispatch work.
134         for(int i = 0; i < 100; i++){
135                 int which = (random() % 2);
136                 int arg = (random() % 1000);
137                 int rep;
138
139                 auto start = std::chrono::steady_clock::now();
140
141                 int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
142                 auto end = std::chrono::steady_clock::now();
143                 auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
144                 if (ret != 0)
145                         printf("%d ms have elapsed!!!\n", (int)diff);
146                 VERIFY(ret == 0);
147                 VERIFY(rep == (which ? arg+1 : arg+2));
148         }
149 }
150
151 void
152 client2(size_t cl)
153 {
154         size_t which_cl = cl % NUM_CL;
155
156         time_t t1;
157         time(&t1);
158
159         while(time(0) - t1 < 10){
160                 int arg = (random() % 2000);
161                 std::string rep;
162                 int ret = clients[which_cl]->call(25, rep, arg);
163                 if ((int)rep.size()!=arg) {
164                         printf("ask for %d reply got %d ret %d\n",
165                                arg, (int)rep.size(), ret);
166                 }
167                 VERIFY((int)rep.size() == arg);
168         }
169 }
170
171 void
172 client3(void *xx)
173 {
174         rpcc *c = (rpcc *) xx;
175
176         for(int i = 0; i < 4; i++){
177                 int rep;
178                 int ret = c->call_timeout(24, rpcc::to(3000), rep, i);
179                 VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
180         }
181 }
182
183
184 void
185 simple_tests(rpcc *c)
186 {
187         printf("simple_tests\n");
188         // an RPC call to procedure #22.
189         // rpcc::call() looks at the argument types to decide how
190         // to marshall the RPC call packet, and how to unmarshall
191         // the reply packet.
192         std::string rep;
193         int intret = c->call(22, rep, (std::string)"hello", (std::string)" goodbye");
194         VERIFY(intret == 0); // this is what handle_22 returns
195         VERIFY(rep == "hello goodbye");
196         printf("   -- string concat RPC .. ok\n");
197
198         // small request, big reply (perhaps req via UDP, reply via TCP)
199         intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
200         VERIFY(intret == 0);
201         VERIFY(rep.size() == 70000);
202         printf("   -- small request, big reply .. ok\n");
203
204         // specify a timeout value to an RPC that should succeed (udp)
205         int xx = 0;
206         intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
207         VERIFY(intret == 0 && xx == 78);
208         printf("   -- no spurious timeout .. ok\n");
209
210         // specify a timeout value to an RPC that should succeed (tcp)
211         {
212                 std::string arg(1000, 'x');
213                 std::string rep2;
214                 c->call_timeout(22, rpcc::to(3000), rep2, arg, (std::string)"x");
215                 VERIFY(rep2.size() == 1001);
216                 printf("   -- no spurious timeout .. ok\n");
217         }
218
219         // huge RPC
220         std::string big(1000000, 'x');
221         intret = c->call(22, rep, big, (std::string)"z");
222         VERIFY(rep.size() == 1000001);
223         printf("   -- huge 1M rpc request .. ok\n");
224
225         // specify a timeout value to an RPC that should timeout (udp)
226         struct sockaddr_in non_existent;
227         memset(&non_existent, 0, sizeof(non_existent));
228         non_existent.sin_family = AF_INET;
229         non_existent.sin_addr.s_addr = inet_addr("127.0.0.1");
230         non_existent.sin_port = htons(7661);
231         rpcc *c1 = new rpcc(non_existent);
232         time_t t0 = time(0);
233         intret = c1->bind(rpcc::to(3000));
234         time_t t1 = time(0);
235         VERIFY(intret < 0 && (t1 - t0) <= 4);
236         printf("   -- rpc timeout .. ok\n");
237         printf("simple_tests OK\n");
238 }
239
240 void 
241 concurrent_test(size_t nt)
242 {
243         // create threads that make lots of calls in parallel,
244         // to test thread synchronization for concurrent calls
245         // and dispatches.
246         printf("start concurrent_test (%lu threads) ...", nt);
247
248     std::vector<std::thread> th(nt);
249
250         for(size_t i = 0; i < nt; i++)
251         th[i] = std::thread(client1, i);
252
253         for(size_t i = 0; i < nt; i++)
254         th[i].join();
255
256         printf(" OK\n");
257 }
258
259 void 
260 lossy_test()
261 {
262         printf("start lossy_test ...");
263         VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
264
265         if (server) {
266                 delete server;
267                 startserver();
268         }
269
270         for (int i = 0; i < NUM_CL; i++) {
271                 delete clients[i];
272                 clients[i] = new rpcc(dst);
273                 VERIFY(clients[i]->bind()==0);
274         }
275
276         size_t nt = 1;
277
278     std::vector<std::thread> th(nt);
279
280         for(size_t i = 0; i < nt; i++)
281         th[i] = std::thread(client2, i);
282
283         for(size_t i = 0; i < nt; i++)
284         th[i].join();
285
286         printf(".. OK\n");
287         VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
288 }
289
290 void 
291 failure_test()
292 {
293         rpcc *client1;
294         rpcc *client = clients[0];
295
296         printf("failure_test\n");
297
298         delete server;
299
300         client1 = new rpcc(dst);
301         VERIFY (client1->bind(rpcc::to(3000)) < 0);
302         printf("   -- create new client and try to bind to failed server .. failed ok\n");
303
304         delete client1;
305
306         startserver();
307
308         std::string rep;
309         int intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye");
310         VERIFY(intret == rpc_const::oldsrv_failure);
311         printf("   -- call recovered server with old client .. failed ok\n");
312
313         delete client;
314
315         clients[0] = client = new rpcc(dst);
316         VERIFY (client->bind() >= 0);
317         VERIFY (client->bind() < 0);
318
319         intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye");
320         VERIFY(intret == 0);
321         VERIFY(rep == "hello goodbye");
322
323         printf("   -- delete existing rpc client, create replacement rpc client .. ok\n");
324
325
326         size_t nt = 10;
327         printf("   -- concurrent test on new rpc client w/ %lu threads ..", nt);
328
329     std::vector<std::thread> th(nt);
330
331         for(size_t i = 0; i < nt; i++)
332         th[i] = std::thread(client3, client);
333
334         for(size_t i = 0; i < nt; i++)
335         th[i].join();
336
337         printf("ok\n");
338
339         delete server;
340         delete client;
341
342         startserver();
343         clients[0] = client = new rpcc(dst);
344         VERIFY (client->bind() >= 0);
345         printf("   -- delete existing rpc client and server, create replacements.. ok\n");
346
347         printf("   -- concurrent test on new client and server w/ %lu threads ..", nt);
348
349         for(size_t i = 0; i < nt; i++)
350         th[i] = std::thread(client3, client);
351
352         for(size_t i = 0; i < nt; i++)
353         th[i].join();
354
355         printf("ok\n");
356
357         printf("failure_test OK\n");
358 }
359
360 int
361 main(int argc, char *argv[])
362 {
363
364         setvbuf(stdout, NULL, _IONBF, 0);
365         setvbuf(stderr, NULL, _IONBF, 0);
366         int debug_level = 0;
367
368         bool isclient = false;
369         bool isserver = false;
370
371         srandom((uint32_t)getpid());
372         port = 20000 + (getpid() % 10000);
373
374         int ch = 0;
375         while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
376                 switch (ch) {
377                         case 'c':
378                                 isclient = true;
379                                 break;
380                         case 's':
381                                 isserver = true;
382                                 break;
383                         case 'd':
384                                 debug_level = atoi(optarg);
385                                 break;
386                         case 'p':
387                                 port = atoi(optarg);
388                                 break;
389                         case 'l':
390                                 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
391                 break;
392                         default:
393                                 break;
394                 }
395         }
396
397         if (!isserver && !isclient)  {
398                 isserver = isclient = true;
399         }
400
401         if (debug_level > 0) {
402                 JSL_DEBUG_LEVEL = debug_level;
403                 jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
404         }
405
406         testmarshall();
407
408         if (isserver) {
409                 printf("starting server on port %d RPC_HEADER_SZ %d\n", port, (int)RPC_HEADER_SZ);
410                 startserver();
411         }
412
413         if (isclient) {
414                 // server's address.
415                 memset(&dst, 0, sizeof(dst));
416                 dst.sin_family = AF_INET;
417                 dst.sin_addr.s_addr = inet_addr("127.0.0.1");
418                 dst.sin_port = htons(port);
419
420
421                 // start the client.  bind it to the server.
422                 // starts a thread to listen for replies and hand them to
423                 // the correct waiting caller thread. there should probably
424                 // be only one rpcc per process. you probably need one
425                 // rpcc per server.
426                 for (int i = 0; i < NUM_CL; i++) {
427                         clients[i] = new rpcc(dst);
428                         VERIFY (clients[i]->bind() == 0);
429                 }
430
431                 simple_tests(clients[0]);
432                 concurrent_test(10);
433                 lossy_test();
434                 if (isserver) {
435                         failure_test();
436                 }
437
438                 printf("rpctest OK\n");
439
440                 exit(0);
441         }
442
443         while (1) {
444                 sleep(1);
445         }
446 }