c69d317cd2707c1a707ecfd9381b56ad140cc2fe
[invirt/third/libt4.git] / rpc / rpctest.cc
1 // RPC test and pseudo-documentation.
2 // generates print statements on failures, but eventually says "rpctest OK"
3
4 #include "types.h"
5 #include "rpc.h"
6 #include <arpa/inet.h>
7 #include <getopt.h>
8 #include <sys/types.h>
9 #include <unistd.h>
10 #include "jsl_log.h"
11
12 #define NUM_CL 2
13
14 char log_thread_prefix = 'r';
15
16 rpcs *server;  // server rpc object
17 rpcc *clients[NUM_CL];  // client rpc object
18 string dst; //server's ip address
19 int port;
20
21 // server-side handlers. they must be methods of some class
22 // to simplify rpcs::reg(). a server process can have handlers
23 // from multiple classes.
24 class srv {
25         public:
26                 int handle_22(string & r, const string a, const string b);
27                 int handle_fast(int &r, const int a);
28                 int handle_slow(int &r, const int a);
29                 int handle_bigrep(string &r, const size_t a);
30 };
31
32 // a handler. a and b are arguments, r is the result.
33 // there can be multiple arguments but only one result.
34 // the caller also gets to see the int return value
35 // as the return value from rpcc::call().
36 // rpcs::reg() decides how to unmarshall by looking
37 // at these argument types, so this function definition
38 // does what a .x file does in SunRPC.
39 int
40 srv::handle_22(string &r, const string a, string b)
41 {
42         r = a + b;
43         return 0;
44 }
45
46 int
47 srv::handle_fast(int &r, const int a)
48 {
49         r = a + 1;
50         return 0;
51 }
52
53 int
54 srv::handle_slow(int &r, const int a)
55 {
56         usleep(random() % 5000);
57         r = a + 2;
58         return 0;
59 }
60
61 int
62 srv::handle_bigrep(string &r, const size_t len)
63 {
64         r = string((size_t)len, 'x');
65         return 0;
66 }
67
68 srv service;
69
70 void startserver()
71 {
72         server = new rpcs((unsigned int)port);
73         server->reg(22, &srv::handle_22, &service);
74         server->reg(23, &srv::handle_fast, &service);
75         server->reg(24, &srv::handle_slow, &service);
76         server->reg(25, &srv::handle_bigrep, &service);
77 }
78
79 void
80 testmarshall()
81 {
82         marshall m;
83         request_header rh{1,2,3,4,5};
84         m.pack_req_header(rh);
85         VERIFY(m.size()==RPC_HEADER_SZ);
86         int i = 12345;
87         unsigned long long l = 1223344455L;
88         string s = "hallo....";
89         m << i;
90         m << l;
91         m << s;
92
93         char *b;
94         size_t sz;
95         m.take_buf(&b,&sz);
96         VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
97
98         unmarshall un(b,sz);
99         request_header rh1;
100         un.unpack_req_header(&rh1);
101         VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
102         int i1;
103         unsigned long long l1;
104         string s1;
105         un >> i1;
106         un >> l1;
107         un >> s1;
108         VERIFY(un.okdone());
109         VERIFY(i1==i && l1==l && s1==s);
110 }
111
112 void
113 client1(size_t cl)
114 {
115         // test concurrency.
116         size_t which_cl = cl % NUM_CL;
117
118         for(int i = 0; i < 100; i++){
119                 int arg = (random() % 2000);
120                 string rep;
121                 int ret = clients[which_cl]->call(25, rep, arg);
122                 VERIFY(ret == 0);
123                 if ((int)rep.size()!=arg)
124                         cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
125                 VERIFY((int)rep.size() == arg);
126         }
127
128         // test rpc replies coming back not in the order of
129         // the original calls -- i.e. does xid reply dispatch work.
130         for(int i = 0; i < 100; i++){
131                 int which = (random() % 2);
132                 int arg = (random() % 1000);
133                 int rep;
134
135                 auto start = std::chrono::steady_clock::now();
136
137                 int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
138                 auto end = std::chrono::steady_clock::now();
139                 auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
140                 if (ret != 0)
141                         cout << diff << " ms have elapsed!!!" << endl;
142                 VERIFY(ret == 0);
143                 VERIFY(rep == (which ? arg+1 : arg+2));
144         }
145 }
146
147 void
148 client2(size_t cl)
149 {
150         size_t which_cl = cl % NUM_CL;
151
152         time_t t1;
153         time(&t1);
154
155         while(time(0) - t1 < 10){
156                 int arg = (random() % 2000);
157                 string rep;
158                 int ret = clients[which_cl]->call(25, rep, arg);
159                 if ((int)rep.size()!=arg)
160                         cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
161                 VERIFY((int)rep.size() == arg);
162         }
163 }
164
165 void
166 client3(void *xx)
167 {
168         rpcc *c = (rpcc *) xx;
169
170         for(int i = 0; i < 4; i++){
171                 int rep;
172                 int ret = c->call_timeout(24, rpcc::to(3000), rep, i);
173                 VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
174         }
175 }
176
177
178 void
179 simple_tests(rpcc *c)
180 {
181         cout << "simple_tests" << endl;
182         // an RPC call to procedure #22.
183         // rpcc::call() looks at the argument types to decide how
184         // to marshall the RPC call packet, and how to unmarshall
185         // the reply packet.
186         string rep;
187         int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
188         VERIFY(intret == 0); // this is what handle_22 returns
189         VERIFY(rep == "hello goodbye");
190         cout << "   -- string concat RPC .. ok" << endl;
191
192         // small request, big reply (perhaps req via UDP, reply via TCP)
193         intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
194         VERIFY(intret == 0);
195         VERIFY(rep.size() == 70000);
196         cout << "   -- small request, big reply .. ok" << endl;
197
198         // specify a timeout value to an RPC that should succeed (udp)
199         int xx = 0;
200         intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
201         VERIFY(intret == 0 && xx == 78);
202         cout << "   -- no spurious timeout .. ok" << endl;
203
204         // specify a timeout value to an RPC that should succeed (tcp)
205         {
206                 string arg(1000, 'x');
207                 string rep2;
208                 c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x");
209                 VERIFY(rep2.size() == 1001);
210                 cout << "   -- no spurious timeout .. ok" << endl;
211         }
212
213         // huge RPC
214         string big(1000000, 'x');
215         intret = c->call(22, rep, big, (string)"z");
216         VERIFY(rep.size() == 1000001);
217         cout << "   -- huge 1M rpc request .. ok" << endl;
218
219         // specify a timeout value to an RPC that should timeout (udp)
220     string non_existent = "127.0.0.1:7661";
221         rpcc *c1 = new rpcc(non_existent);
222         time_t t0 = time(0);
223         intret = c1->bind(rpcc::to(3000));
224         time_t t1 = time(0);
225         VERIFY(intret < 0 && (t1 - t0) <= 4);
226         cout << "   -- rpc timeout .. ok" << endl;
227         cout << "simple_tests OK" << endl;
228 }
229
230 void 
231 concurrent_test(size_t nt)
232 {
233         // create threads that make lots of calls in parallel,
234         // to test thread synchronization for concurrent calls
235         // and dispatches.
236         cout << "start concurrent_test (" << nt << " threads) ...";
237
238     vector<thread> th(nt);
239
240         for(size_t i = 0; i < nt; i++)
241         th[i] = thread(client1, i);
242
243         for(size_t i = 0; i < nt; i++)
244         th[i].join();
245
246         cout << " OK" << endl;
247 }
248
249 void 
250 lossy_test()
251 {
252         cout << "start lossy_test ...";
253         VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
254
255         if (server) {
256                 delete server;
257                 startserver();
258         }
259
260         for (int i = 0; i < NUM_CL; i++) {
261                 delete clients[i];
262                 clients[i] = new rpcc(dst);
263                 VERIFY(clients[i]->bind()==0);
264         }
265
266         size_t nt = 1;
267
268     vector<thread> th(nt);
269
270         for(size_t i = 0; i < nt; i++)
271         th[i] = thread(client2, i);
272
273         for(size_t i = 0; i < nt; i++)
274         th[i].join();
275
276         cout << ".. OK" << endl;
277         VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
278 }
279
280 void 
281 failure_test()
282 {
283         rpcc *client1;
284         rpcc *client = clients[0];
285
286         cout << "failure_test" << endl;
287
288         delete server;
289
290         client1 = new rpcc(dst);
291         VERIFY (client1->bind(rpcc::to(3000)) < 0);
292         cout << "   -- create new client and try to bind to failed server .. failed ok" << endl;
293
294         delete client1;
295
296         startserver();
297
298         string rep;
299         int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
300         VERIFY(intret == rpc_const::oldsrv_failure);
301         cout << "   -- call recovered server with old client .. failed ok" << endl;
302
303         delete client;
304
305         clients[0] = client = new rpcc(dst);
306         VERIFY (client->bind() >= 0);
307         VERIFY (client->bind() < 0);
308
309         intret = client->call(22, rep, (string)"hello", (string)" goodbye");
310         VERIFY(intret == 0);
311         VERIFY(rep == "hello goodbye");
312
313         cout << "   -- delete existing rpc client, create replacement rpc client .. ok" << endl;
314
315
316         size_t nt = 10;
317         cout << "   -- concurrent test on new rpc client w/ " << nt << " threads ..";
318
319     vector<thread> th(nt);
320
321         for(size_t i = 0; i < nt; i++)
322         th[i] = thread(client3, client);
323
324         for(size_t i = 0; i < nt; i++)
325         th[i].join();
326
327         cout << "ok" << endl;
328
329         delete server;
330         delete client;
331
332         startserver();
333         clients[0] = client = new rpcc(dst);
334         VERIFY (client->bind() >= 0);
335         cout << "   -- delete existing rpc client and server, create replacements.. ok" << endl;
336
337         cout << "   -- concurrent test on new client and server w/ " << nt << " threads ..";
338
339         for(size_t i = 0; i < nt; i++)
340         th[i] = thread(client3, client);
341
342         for(size_t i = 0; i < nt; i++)
343         th[i].join();
344
345         cout << "ok" << endl;
346
347         cout << "failure_test OK" << endl;
348 }
349
350 int
351 main(int argc, char *argv[])
352 {
353
354         setvbuf(stdout, NULL, _IONBF, 0);
355         setvbuf(stderr, NULL, _IONBF, 0);
356         int debug_level = 0;
357
358         bool isclient = false;
359         bool isserver = false;
360
361         srandom((uint32_t)getpid());
362         port = 20000 + (getpid() % 10000);
363
364         int ch = 0;
365         while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
366                 switch (ch) {
367                         case 'c':
368                                 isclient = true;
369                                 break;
370                         case 's':
371                                 isserver = true;
372                                 break;
373                         case 'd':
374                                 debug_level = atoi(optarg);
375                                 break;
376                         case 'p':
377                                 port = atoi(optarg);
378                                 break;
379                         case 'l':
380                                 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
381                 break;
382                         default:
383                                 break;
384                 }
385         }
386
387         if (!isserver && !isclient)  {
388                 isserver = isclient = true;
389         }
390
391         if (debug_level > 0) {
392                 JSL_DEBUG_LEVEL = debug_level;
393                 jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
394         }
395
396         testmarshall();
397
398         if (isserver) {
399                 cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
400                 startserver();
401         }
402
403         if (isclient) {
404                 // server's address.
405         dst = "127.0.0.1:" + std::to_string(port);
406
407
408                 // start the client.  bind it to the server.
409                 // starts a thread to listen for replies and hand them to
410                 // the correct waiting caller thread. there should probably
411                 // be only one rpcc per process. you probably need one
412                 // rpcc per server.
413                 for (int i = 0; i < NUM_CL; i++) {
414                         clients[i] = new rpcc(dst);
415                         VERIFY (clients[i]->bind() == 0);
416                 }
417
418                 simple_tests(clients[0]);
419                 concurrent_test(10);
420                 lossy_test();
421                 if (isserver) {
422                         failure_test();
423                 }
424
425                 cout << "rpctest OK" << endl;
426
427                 exit(0);
428         }
429
430         while (1) {
431                 sleep(1);
432         }
433 }