Imported from 6.824 labs
[invirt/third/libt4.git] / rpc / rpctest.cc
1 // RPC test and pseudo-documentation.
2 // generates print statements on failures, but eventually says "rpctest OK"
3
4 #include "rpc.h"
5 #include <arpa/inet.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <getopt.h>
10 #include "jsl_log.h"
11 #include "gettime.h"
12 #include "lang/verify.h"
13
14 #define NUM_CL 2
15
16 rpcs *server;  // server rpc object
17 rpcc *clients[NUM_CL];  // client rpc object
18 struct sockaddr_in dst; //server's ip address
19 int port;
20 pthread_attr_t attr;
21
22 // server-side handlers. they must be methods of some class
23 // to simplify rpcs::reg(). a server process can have handlers
24 // from multiple classes.
25 class srv {
26         public:
27                 int handle_22(const std::string a, const std::string b, std::string & r);
28                 int handle_fast(const int a, int &r);
29                 int handle_slow(const int a, int &r);
30                 int handle_bigrep(const int a, std::string &r);
31 };
32
33 // a handler. a and b are arguments, r is the result.
34 // there can be multiple arguments but only one result.
35 // the caller also gets to see the int return value
36 // as the return value from rpcc::call().
37 // rpcs::reg() decides how to unmarshall by looking
38 // at these argument types, so this function definition
39 // does what a .x file does in SunRPC.
40 int
41 srv::handle_22(const std::string a, std::string b, std::string &r)
42 {
43         r = a + b;
44         return 0;
45 }
46
47 int
48 srv::handle_fast(const int a, int &r)
49 {
50         r = a + 1;
51         return 0;
52 }
53
54 int
55 srv::handle_slow(const int a, int &r)
56 {
57         usleep(random() % 5000);
58         r = a + 2;
59         return 0;
60 }
61
62 int
63 srv::handle_bigrep(const int len, std::string &r)
64 {
65         r = std::string(len, 'x');
66         return 0;
67 }
68
69 srv service;
70
71 void startserver()
72 {
73         server = new rpcs(port);
74         server->reg(22, &service, &srv::handle_22);
75         server->reg(23, &service, &srv::handle_fast);
76         server->reg(24, &service, &srv::handle_slow);
77         server->reg(25, &service, &srv::handle_bigrep);
78 }
79
80 void
81 testmarshall()
82 {
83         marshall m;
84         req_header rh(1,2,3,4,5);
85         m.pack_req_header(rh);
86         VERIFY(m.size()==RPC_HEADER_SZ);
87         int i = 12345;
88         unsigned long long l = 1223344455L;
89         std::string s = std::string("hallo....");
90         m << i;
91         m << l;
92         m << s;
93
94         char *b;
95         int sz;
96         m.take_buf(&b,&sz);
97         VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)));
98
99         unmarshall un(b,sz);
100         req_header rh1;
101         un.unpack_req_header(&rh1);
102         VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
103         int i1;
104         unsigned long long l1;
105         std::string s1;
106         un >> i1;
107         un >> l1;
108         un >> s1;
109         VERIFY(un.okdone());
110         VERIFY(i1==i && l1==l && s1==s);
111 }
112
113 void *
114 client1(void *xx)
115 {
116
117         // test concurrency.
118         int which_cl = ((unsigned long) xx ) % NUM_CL;
119
120         for(int i = 0; i < 100; i++){
121                 int arg = (random() % 2000);
122                 std::string rep;
123                 int ret = clients[which_cl]->call(25, arg, rep);
124                 VERIFY(ret == 0);
125                 if ((int)rep.size()!=arg) {
126                         printf("repsize wrong %d!=%d\n", (int)rep.size(), arg);
127                 }
128                 VERIFY((int)rep.size() == arg);
129         }
130
131         // test rpc replies coming back not in the order of
132         // the original calls -- i.e. does xid reply dispatch work.
133         for(int i = 0; i < 100; i++){
134                 int which = (random() % 2);
135                 int arg = (random() % 1000);
136                 int rep;
137
138                 struct timespec start,end;
139                 clock_gettime(CLOCK_REALTIME, &start);
140
141                 int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep);
142                 clock_gettime(CLOCK_REALTIME, &end);
143                 int diff = diff_timespec(end, start);
144                 if (ret != 0)
145                         printf("%d ms have elapsed!!!\n", diff);
146                 VERIFY(ret == 0);
147                 VERIFY(rep == (which ? arg+1 : arg+2));
148         }
149
150         return 0;
151 }
152
153 void *
154 client2(void *xx)
155 {
156         int which_cl = ((unsigned long) xx ) % NUM_CL;
157
158         time_t t1;
159         time(&t1);
160
161         while(time(0) - t1 < 10){
162                 int arg = (random() % 2000);
163                 std::string rep;
164                 int ret = clients[which_cl]->call(25, arg, rep);
165                 if ((int)rep.size()!=arg) {
166                         printf("ask for %d reply got %d ret %d\n",
167                                arg, (int)rep.size(), ret);
168                 }
169                 VERIFY((int)rep.size() == arg);
170         }
171         return 0;
172 }
173
174 void *
175 client3(void *xx)
176 {
177         rpcc *c = (rpcc *) xx;
178
179         for(int i = 0; i < 4; i++){
180                 int rep;
181                 int ret = c->call(24, i, rep, rpcc::to(3000));
182                 VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
183         }
184         return 0;
185 }
186
187
188 void
189 simple_tests(rpcc *c)
190 {
191         printf("simple_tests\n");
192         // an RPC call to procedure #22.
193         // rpcc::call() looks at the argument types to decide how
194         // to marshall the RPC call packet, and how to unmarshall
195         // the reply packet.
196         std::string rep;
197         int intret = c->call(22, (std::string)"hello", (std::string)" goodbye", rep);
198         VERIFY(intret == 0); // this is what handle_22 returns
199         VERIFY(rep == "hello goodbye");
200         printf("   -- string concat RPC .. ok\n");
201
202         // small request, big reply (perhaps req via UDP, reply via TCP)
203         intret = c->call(25, 70000, rep, rpcc::to(200000));
204         VERIFY(intret == 0);
205         VERIFY(rep.size() == 70000);
206         printf("   -- small request, big reply .. ok\n");
207
208 #if 0
209         // too few arguments
210         intret = c->call(22, (std::string)"just one", rep);
211         VERIFY(intret < 0);
212         printf("   -- too few arguments .. failed ok\n");
213
214         // too many arguments; proc #23 expects just one.
215         intret = c->call(23, 1001, 1002, rep);
216         VERIFY(intret < 0);
217         printf("   -- too many arguments .. failed ok\n");
218
219         // wrong return value size
220         int wrongrep;
221         intret = c->call(23, (std::string)"hello", (std::string)" goodbye", wrongrep);
222         VERIFY(intret < 0);
223         printf("   -- wrong ret value size .. failed ok\n");
224 #endif
225
226         // specify a timeout value to an RPC that should succeed (udp)
227         int xx = 0;
228         intret = c->call(23, 77, xx, rpcc::to(3000));
229         VERIFY(intret == 0 && xx == 78);
230         printf("   -- no suprious timeout .. ok\n");
231
232         // specify a timeout value to an RPC that should succeed (tcp)
233         {
234                 std::string arg(1000, 'x');
235                 std::string rep;
236                 c->call(22, arg, (std::string)"x", rep, rpcc::to(3000));
237                 VERIFY(rep.size() == 1001);
238                 printf("   -- no suprious timeout .. ok\n");
239         }
240
241         // huge RPC
242         std::string big(1000000, 'x');
243         intret = c->call(22, big, (std::string)"z", rep);
244         VERIFY(rep.size() == 1000001);
245         printf("   -- huge 1M rpc request .. ok\n");
246
247         // specify a timeout value to an RPC that should timeout (udp)
248         struct sockaddr_in non_existent;
249         memset(&non_existent, 0, sizeof(non_existent));
250         non_existent.sin_family = AF_INET;
251         non_existent.sin_addr.s_addr = inet_addr("127.0.0.1");
252         non_existent.sin_port = htons(7661);
253         rpcc *c1 = new rpcc(non_existent);
254         time_t t0 = time(0);
255         intret = c1->bind(rpcc::to(3000));
256         time_t t1 = time(0);
257         VERIFY(intret < 0 && (t1 - t0) <= 4);
258         printf("   -- rpc timeout .. ok\n");
259         printf("simple_tests OK\n");
260 }
261
262 void 
263 concurrent_test(int nt)
264 {
265         // create threads that make lots of calls in parallel,
266         // to test thread synchronization for concurrent calls
267         // and dispatches.
268         int ret;
269
270         printf("start concurrent_test (%d threads) ...", nt);
271
272         pthread_t th[nt];
273         for(int i = 0; i < nt; i++){
274                 ret = pthread_create(&th[i], &attr, client1, (void *) (uintptr_t)i);
275                 VERIFY(ret == 0);
276         }
277
278         for(int i = 0; i < nt; i++){
279                 VERIFY(pthread_join(th[i], NULL) == 0);
280         }
281         printf(" OK\n");
282 }
283
284 void 
285 lossy_test()
286 {
287         int ret;
288
289         printf("start lossy_test ...");
290         VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
291
292         if (server) {
293                 delete server;
294                 startserver();
295         }
296
297         for (int i = 0; i < NUM_CL; i++) {
298                 delete clients[i];
299                 clients[i] = new rpcc(dst);
300                 VERIFY(clients[i]->bind()==0);
301         }
302
303         int nt = 1;
304         pthread_t th[nt];
305         for(int i = 0; i < nt; i++){
306                 ret = pthread_create(&th[i], &attr, client2, (void *) (uintptr_t)i);
307                 VERIFY(ret == 0);
308         }
309         for(int i = 0; i < nt; i++){
310                 VERIFY(pthread_join(th[i], NULL) == 0);
311         }
312         printf(".. OK\n");
313         VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
314 }
315
316 void 
317 failure_test()
318 {
319         rpcc *client1;
320         rpcc *client = clients[0];
321
322         printf("failure_test\n");
323
324         delete server;
325
326         client1 = new rpcc(dst);
327         VERIFY (client1->bind(rpcc::to(3000)) < 0);
328         printf("   -- create new client and try to bind to failed server .. failed ok\n");
329
330         delete client1;
331
332         startserver();
333
334         std::string rep;
335         int intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
336         VERIFY(intret == rpc_const::oldsrv_failure);
337         printf("   -- call recovered server with old client .. failed ok\n");
338
339         delete client;
340
341         clients[0] = client = new rpcc(dst);
342         VERIFY (client->bind() >= 0);
343         VERIFY (client->bind() < 0);
344
345         intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
346         VERIFY(intret == 0);
347         VERIFY(rep == "hello goodbye");
348
349         printf("   -- delete existing rpc client, create replacement rpc client .. ok\n");
350
351
352         int nt = 10;
353         int ret;
354         printf("   -- concurrent test on new rpc client w/ %d threads ..", nt);
355
356         pthread_t th[nt];
357         for(int i = 0; i < nt; i++){
358                 ret = pthread_create(&th[i], &attr, client3, (void *) client);
359                 VERIFY(ret == 0);
360         }
361
362         for(int i = 0; i < nt; i++){
363                 VERIFY(pthread_join(th[i], NULL) == 0);
364         }
365         printf("ok\n");
366
367         delete server;
368         delete client;
369
370         startserver();
371         clients[0] = client = new rpcc(dst);
372         VERIFY (client->bind() >= 0);
373         printf("   -- delete existing rpc client and server, create replacements.. ok\n");
374
375         printf("   -- concurrent test on new client and server w/ %d threads ..", nt);
376         for(int i = 0; i < nt; i++){
377                 ret = pthread_create(&th[i], &attr, client3, (void *)client);
378                 VERIFY(ret == 0);
379         }
380
381         for(int i = 0; i < nt; i++){
382                 VERIFY(pthread_join(th[i], NULL) == 0);
383         }
384         printf("ok\n");
385
386         printf("failure_test OK\n");
387 }
388
389 int
390 main(int argc, char *argv[])
391 {
392
393         setvbuf(stdout, NULL, _IONBF, 0);
394         setvbuf(stderr, NULL, _IONBF, 0);
395         int debug_level = 0;
396
397         bool isclient = false;
398         bool isserver = false;
399
400         srandom(getpid());
401         port = 20000 + (getpid() % 10000);
402
403         char ch = 0;
404         while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
405                 switch (ch) {
406                         case 'c':
407                                 isclient = true;
408                                 break;
409                         case 's':
410                                 isserver = true;
411                                 break;
412                         case 'd':
413                                 debug_level = atoi(optarg);
414                                 break;
415                         case 'p':
416                                 port = atoi(optarg);
417                                 break;
418                         case 'l':
419                                 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
420                         default:
421                                 break;
422                 }
423         }
424
425         if (!isserver && !isclient)  {
426                 isserver = isclient = true;
427         }
428
429         if (debug_level > 0) {
430                 //__loginit.initNow();
431                 jsl_set_debug(debug_level);
432                 jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
433         }
434
435         testmarshall();
436
437         pthread_attr_init(&attr);
438         // set stack size to 32K, so we don't run out of memory
439         pthread_attr_setstacksize(&attr, 32*1024);
440
441         if (isserver) {
442                 printf("starting server on port %d RPC_HEADER_SZ %d\n", port, RPC_HEADER_SZ);
443                 startserver();
444         }
445
446         if (isclient) {
447                 // server's address.
448                 memset(&dst, 0, sizeof(dst));
449                 dst.sin_family = AF_INET;
450                 dst.sin_addr.s_addr = inet_addr("127.0.0.1");
451                 dst.sin_port = htons(port);
452
453
454                 // start the client.  bind it to the server.
455                 // starts a thread to listen for replies and hand them to
456                 // the correct waiting caller thread. there should probably
457                 // be only one rpcc per process. you probably need one
458                 // rpcc per server.
459                 for (int i = 0; i < NUM_CL; i++) {
460                         clients[i] = new rpcc(dst);
461                         VERIFY (clients[i]->bind() == 0);
462                 }
463
464                 simple_tests(clients[0]);
465                 concurrent_test(10);
466                 lossy_test();
467                 if (isserver) {
468                         failure_test();
469                 }
470
471                 printf("rpctest OK\n");
472
473                 exit(0);
474         }
475
476         while (1) {
477                 sleep(1);
478         }
479 }