Lots of clean-ups and simplifications
[invirt/third/libt4.git] / rpc / rpctest.cc
1 // RPC test and pseudo-documentation.
2 // generates print statements on failures, but eventually says "rpctest OK"
3
4 #include "rpc.h"
5 #include <arpa/inet.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <getopt.h>
10 #include <sys/types.h>
11 #include <unistd.h>
12 #include "jsl_log.h"
13 #include "lang/verify.h"
14
15 #define NUM_CL 2
16
17 rpcs *server;  // server rpc object
18 rpcc *clients[NUM_CL];  // client rpc object
19 struct sockaddr_in dst; //server's ip address
20 int port;
21
22 // server-side handlers. they must be methods of some class
23 // to simplify rpcs::reg(). a server process can have handlers
24 // from multiple classes.
25 class srv {
26         public:
27                 int handle_22(const std::string a, const std::string b, std::string & r);
28                 int handle_fast(const int a, int &r);
29                 int handle_slow(const int a, int &r);
30                 int handle_bigrep(const int a, std::string &r);
31 };
32
33 // a handler. a and b are arguments, r is the result.
34 // there can be multiple arguments but only one result.
35 // the caller also gets to see the int return value
36 // as the return value from rpcc::call().
37 // rpcs::reg() decides how to unmarshall by looking
38 // at these argument types, so this function definition
39 // does what a .x file does in SunRPC.
40 int
41 srv::handle_22(const std::string a, std::string b, std::string &r)
42 {
43         r = a + b;
44         return 0;
45 }
46
47 int
48 srv::handle_fast(const int a, int &r)
49 {
50         r = a + 1;
51         return 0;
52 }
53
54 int
55 srv::handle_slow(const int a, int &r)
56 {
57         usleep(random() % 5000);
58         r = a + 2;
59         return 0;
60 }
61
62 int
63 srv::handle_bigrep(const int len, std::string &r)
64 {
65         r = std::string(len, 'x');
66         return 0;
67 }
68
69 srv service;
70
71 void startserver()
72 {
73         server = new rpcs(port);
74         server->reg(22, &service, &srv::handle_22);
75         server->reg(23, &service, &srv::handle_fast);
76         server->reg(24, &service, &srv::handle_slow);
77         server->reg(25, &service, &srv::handle_bigrep);
78 }
79
80 void
81 testmarshall()
82 {
83         marshall m;
84         req_header rh(1,2,3,4,5);
85         m.pack_req_header(rh);
86         VERIFY(m.size()==RPC_HEADER_SZ);
87         int i = 12345;
88         unsigned long long l = 1223344455L;
89         std::string s = std::string("hallo....");
90         m << i;
91         m << l;
92         m << s;
93
94         char *b;
95         int sz;
96         m.take_buf(&b,&sz);
97         VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)));
98
99         unmarshall un(b,sz);
100         req_header rh1;
101         un.unpack_req_header(&rh1);
102         VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
103         int i1;
104         unsigned long long l1;
105         std::string s1;
106         un >> i1;
107         un >> l1;
108         un >> s1;
109         VERIFY(un.okdone());
110         VERIFY(i1==i && l1==l && s1==s);
111 }
112
113 void
114 client1(int cl)
115 {
116         // test concurrency.
117         int which_cl = ((unsigned long) cl ) % NUM_CL;
118
119         for(int i = 0; i < 100; i++){
120                 int arg = (random() % 2000);
121                 std::string rep;
122                 int ret = clients[which_cl]->call(25, arg, rep);
123                 VERIFY(ret == 0);
124                 if ((int)rep.size()!=arg) {
125                         printf("repsize wrong %d!=%d\n", (int)rep.size(), arg);
126                 }
127                 VERIFY((int)rep.size() == arg);
128         }
129
130         // test rpc replies coming back not in the order of
131         // the original calls -- i.e. does xid reply dispatch work.
132         for(int i = 0; i < 100; i++){
133                 int which = (random() % 2);
134                 int arg = (random() % 1000);
135                 int rep;
136
137                 auto start = std::chrono::steady_clock::now();
138
139                 int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep);
140                 auto end = std::chrono::steady_clock::now();
141                 int diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
142                 if (ret != 0)
143                         printf("%d ms have elapsed!!!\n", diff);
144                 VERIFY(ret == 0);
145                 VERIFY(rep == (which ? arg+1 : arg+2));
146         }
147 }
148
149 void
150 client2(int cl)
151 {
152         int which_cl = ((unsigned long) cl ) % NUM_CL;
153
154         time_t t1;
155         time(&t1);
156
157         while(time(0) - t1 < 10){
158                 int arg = (random() % 2000);
159                 std::string rep;
160                 int ret = clients[which_cl]->call(25, arg, rep);
161                 if ((int)rep.size()!=arg) {
162                         printf("ask for %d reply got %d ret %d\n",
163                                arg, (int)rep.size(), ret);
164                 }
165                 VERIFY((int)rep.size() == arg);
166         }
167 }
168
169 void
170 client3(void *xx)
171 {
172         rpcc *c = (rpcc *) xx;
173
174         for(int i = 0; i < 4; i++){
175                 int rep;
176                 int ret = c->call(24, i, rep, rpcc::to(3000));
177                 VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
178         }
179 }
180
181
182 void
183 simple_tests(rpcc *c)
184 {
185         printf("simple_tests\n");
186         // an RPC call to procedure #22.
187         // rpcc::call() looks at the argument types to decide how
188         // to marshall the RPC call packet, and how to unmarshall
189         // the reply packet.
190         std::string rep;
191         int intret = c->call(22, (std::string)"hello", (std::string)" goodbye", rep);
192         VERIFY(intret == 0); // this is what handle_22 returns
193         VERIFY(rep == "hello goodbye");
194         printf("   -- string concat RPC .. ok\n");
195
196         // small request, big reply (perhaps req via UDP, reply via TCP)
197         intret = c->call(25, 70000, rep, rpcc::to(200000));
198         VERIFY(intret == 0);
199         VERIFY(rep.size() == 70000);
200         printf("   -- small request, big reply .. ok\n");
201
202         // specify a timeout value to an RPC that should succeed (udp)
203         int xx = 0;
204         intret = c->call(23, 77, xx, rpcc::to(3000));
205         VERIFY(intret == 0 && xx == 78);
206         printf("   -- no suprious timeout .. ok\n");
207
208         // specify a timeout value to an RPC that should succeed (tcp)
209         {
210                 std::string arg(1000, 'x');
211                 std::string rep;
212                 c->call(22, arg, (std::string)"x", rep, rpcc::to(3000));
213                 VERIFY(rep.size() == 1001);
214                 printf("   -- no suprious timeout .. ok\n");
215         }
216
217         // huge RPC
218         std::string big(1000000, 'x');
219         intret = c->call(22, big, (std::string)"z", rep);
220         VERIFY(rep.size() == 1000001);
221         printf("   -- huge 1M rpc request .. ok\n");
222
223         // specify a timeout value to an RPC that should timeout (udp)
224         struct sockaddr_in non_existent;
225         memset(&non_existent, 0, sizeof(non_existent));
226         non_existent.sin_family = AF_INET;
227         non_existent.sin_addr.s_addr = inet_addr("127.0.0.1");
228         non_existent.sin_port = htons(7661);
229         rpcc *c1 = new rpcc(non_existent);
230         time_t t0 = time(0);
231         intret = c1->bind(rpcc::to(3000));
232         time_t t1 = time(0);
233         VERIFY(intret < 0 && (t1 - t0) <= 4);
234         printf("   -- rpc timeout .. ok\n");
235         printf("simple_tests OK\n");
236 }
237
238 void 
239 concurrent_test(int nt)
240 {
241         // create threads that make lots of calls in parallel,
242         // to test thread synchronization for concurrent calls
243         // and dispatches.
244         printf("start concurrent_test (%d threads) ...", nt);
245
246     std::vector<std::thread> th(nt);
247         for(int i = 0; i < nt; i++){
248         th[i] = std::thread(client1, i);
249         }
250
251         for(int i = 0; i < nt; i++){
252         th[i].join();
253         }
254         printf(" OK\n");
255 }
256
257 void 
258 lossy_test()
259 {
260         printf("start lossy_test ...");
261         VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
262
263         if (server) {
264                 delete server;
265                 startserver();
266         }
267
268         for (int i = 0; i < NUM_CL; i++) {
269                 delete clients[i];
270                 clients[i] = new rpcc(dst);
271                 VERIFY(clients[i]->bind()==0);
272         }
273
274         int nt = 1;
275     std::vector<std::thread> th(nt);
276         for(int i = 0; i < nt; i++){
277         th[i] = std::thread(client2, i);
278         }
279         for(int i = 0; i < nt; i++){
280         th[i].join();
281         }
282         printf(".. OK\n");
283         VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
284 }
285
286 void 
287 failure_test()
288 {
289         rpcc *client1;
290         rpcc *client = clients[0];
291
292         printf("failure_test\n");
293
294         delete server;
295
296         client1 = new rpcc(dst);
297         VERIFY (client1->bind(rpcc::to(3000)) < 0);
298         printf("   -- create new client and try to bind to failed server .. failed ok\n");
299
300         delete client1;
301
302         startserver();
303
304         std::string rep;
305         int intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
306         VERIFY(intret == rpc_const::oldsrv_failure);
307         printf("   -- call recovered server with old client .. failed ok\n");
308
309         delete client;
310
311         clients[0] = client = new rpcc(dst);
312         VERIFY (client->bind() >= 0);
313         VERIFY (client->bind() < 0);
314
315         intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
316         VERIFY(intret == 0);
317         VERIFY(rep == "hello goodbye");
318
319         printf("   -- delete existing rpc client, create replacement rpc client .. ok\n");
320
321
322         int nt = 10;
323         printf("   -- concurrent test on new rpc client w/ %d threads ..", nt);
324
325     std::vector<std::thread> th(nt);
326         for(int i = 0; i < nt; i++){
327         th[i] = std::thread(client3, client);
328         }
329
330         for(int i = 0; i < nt; i++){
331         th[i].join();
332         }
333         printf("ok\n");
334
335         delete server;
336         delete client;
337
338         startserver();
339         clients[0] = client = new rpcc(dst);
340         VERIFY (client->bind() >= 0);
341         printf("   -- delete existing rpc client and server, create replacements.. ok\n");
342
343         printf("   -- concurrent test on new client and server w/ %d threads ..", nt);
344         for(int i = 0; i < nt; i++){
345         th[i] = std::thread(client3, client);
346         }
347
348         for(int i = 0; i < nt; i++){
349         th[i].join();
350         }
351         printf("ok\n");
352
353         printf("failure_test OK\n");
354 }
355
356 int
357 main(int argc, char *argv[])
358 {
359
360         setvbuf(stdout, NULL, _IONBF, 0);
361         setvbuf(stderr, NULL, _IONBF, 0);
362         int debug_level = 0;
363
364         bool isclient = false;
365         bool isserver = false;
366
367         srandom(getpid());
368         port = 20000 + (getpid() % 10000);
369
370         char ch = 0;
371         while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
372                 switch (ch) {
373                         case 'c':
374                                 isclient = true;
375                                 break;
376                         case 's':
377                                 isserver = true;
378                                 break;
379                         case 'd':
380                                 debug_level = atoi(optarg);
381                                 break;
382                         case 'p':
383                                 port = atoi(optarg);
384                                 break;
385                         case 'l':
386                                 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
387                         default:
388                                 break;
389                 }
390         }
391
392         if (!isserver && !isclient)  {
393                 isserver = isclient = true;
394         }
395
396         if (debug_level > 0) {
397                 JSL_DEBUG_LEVEL = debug_level;
398                 jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
399         }
400
401         testmarshall();
402
403         if (isserver) {
404                 printf("starting server on port %d RPC_HEADER_SZ %d\n", port, (int)RPC_HEADER_SZ);
405                 startserver();
406         }
407
408         if (isclient) {
409                 // server's address.
410                 memset(&dst, 0, sizeof(dst));
411                 dst.sin_family = AF_INET;
412                 dst.sin_addr.s_addr = inet_addr("127.0.0.1");
413                 dst.sin_port = htons(port);
414
415
416                 // start the client.  bind it to the server.
417                 // starts a thread to listen for replies and hand them to
418                 // the correct waiting caller thread. there should probably
419                 // be only one rpcc per process. you probably need one
420                 // rpcc per server.
421                 for (int i = 0; i < NUM_CL; i++) {
422                         clients[i] = new rpcc(dst);
423                         VERIFY (clients[i]->bind() == 0);
424                 }
425
426                 simple_tests(clients[0]);
427                 concurrent_test(10);
428                 lossy_test();
429                 if (isserver) {
430                         failure_test();
431                 }
432
433                 printf("rpctest OK\n");
434
435                 exit(0);
436         }
437
438         while (1) {
439                 sleep(1);
440         }
441 }