More clean-ups and cool template stuff
[invirt/third/libt4.git] / rpc / rpctest.cc
1 // RPC test and pseudo-documentation.
2 // generates print statements on failures, but eventually says "rpctest OK"
3
4 #include "rpc.h"
5 #include <arpa/inet.h>
6 #include <iostream>
7 #include <vector>
8 #include <thread>
9 #include <stdlib.h>
10 #include <getopt.h>
11 #include <sys/types.h>
12 #include <unistd.h>
13 #include "jsl_log.h"
14 #include "lang/verify.h"
15
16 #define NUM_CL 2
17
18 char log_thread_prefix = 'r';
19
20 using std::string;
21 using std::cout;
22 using std::endl;
23 using std::vector;
24 using std::thread;
25
26 rpcs *server;  // server rpc object
27 rpcc *clients[NUM_CL];  // client rpc object
28 string dst; //server's ip address
29 int port;
30
31 // server-side handlers. they must be methods of some class
32 // to simplify rpcs::reg(). a server process can have handlers
33 // from multiple classes.
34 class srv {
35         public:
36                 int handle_22(string & r, const string a, const string b);
37                 int handle_fast(int &r, const int a);
38                 int handle_slow(int &r, const int a);
39                 int handle_bigrep(string &r, const size_t a);
40 };
41
42 // a handler. a and b are arguments, r is the result.
43 // there can be multiple arguments but only one result.
44 // the caller also gets to see the int return value
45 // as the return value from rpcc::call().
46 // rpcs::reg() decides how to unmarshall by looking
47 // at these argument types, so this function definition
48 // does what a .x file does in SunRPC.
49 int
50 srv::handle_22(string &r, const string a, string b)
51 {
52         r = a + b;
53         return 0;
54 }
55
56 int
57 srv::handle_fast(int &r, const int a)
58 {
59         r = a + 1;
60         return 0;
61 }
62
63 int
64 srv::handle_slow(int &r, const int a)
65 {
66         usleep(random() % 5000);
67         r = a + 2;
68         return 0;
69 }
70
71 int
72 srv::handle_bigrep(string &r, const size_t len)
73 {
74         r = string((size_t)len, 'x');
75         return 0;
76 }
77
78 srv service;
79
80 void startserver()
81 {
82         server = new rpcs((unsigned int)port);
83         server->reg(22, &srv::handle_22, &service);
84         server->reg(23, &srv::handle_fast, &service);
85         server->reg(24, &srv::handle_slow, &service);
86         server->reg(25, &srv::handle_bigrep, &service);
87 }
88
89 void
90 testmarshall()
91 {
92         marshall m;
93         request_header rh{1,2,3,4,5};
94         m.pack_req_header(rh);
95         VERIFY(m.size()==RPC_HEADER_SZ);
96         int i = 12345;
97         unsigned long long l = 1223344455L;
98         string s = "hallo....";
99         m << i;
100         m << l;
101         m << s;
102
103         char *b;
104         size_t sz;
105         m.take_buf(&b,&sz);
106         VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
107
108         unmarshall un(b,sz);
109         request_header rh1;
110         un.unpack_req_header(&rh1);
111         VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
112         int i1;
113         unsigned long long l1;
114         string s1;
115         un >> i1;
116         un >> l1;
117         un >> s1;
118         VERIFY(un.okdone());
119         VERIFY(i1==i && l1==l && s1==s);
120 }
121
122 void
123 client1(size_t cl)
124 {
125         // test concurrency.
126         size_t which_cl = cl % NUM_CL;
127
128         for(int i = 0; i < 100; i++){
129                 int arg = (random() % 2000);
130                 string rep;
131                 int ret = clients[which_cl]->call(25, rep, arg);
132                 VERIFY(ret == 0);
133                 if ((int)rep.size()!=arg)
134                         cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
135                 VERIFY((int)rep.size() == arg);
136         }
137
138         // test rpc replies coming back not in the order of
139         // the original calls -- i.e. does xid reply dispatch work.
140         for(int i = 0; i < 100; i++){
141                 int which = (random() % 2);
142                 int arg = (random() % 1000);
143                 int rep;
144
145                 auto start = std::chrono::steady_clock::now();
146
147                 int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
148                 auto end = std::chrono::steady_clock::now();
149                 auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
150                 if (ret != 0)
151                         cout << diff << " ms have elapsed!!!" << endl;
152                 VERIFY(ret == 0);
153                 VERIFY(rep == (which ? arg+1 : arg+2));
154         }
155 }
156
157 void
158 client2(size_t cl)
159 {
160         size_t which_cl = cl % NUM_CL;
161
162         time_t t1;
163         time(&t1);
164
165         while(time(0) - t1 < 10){
166                 int arg = (random() % 2000);
167                 string rep;
168                 int ret = clients[which_cl]->call(25, rep, arg);
169                 if ((int)rep.size()!=arg)
170                         cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
171                 VERIFY((int)rep.size() == arg);
172         }
173 }
174
175 void
176 client3(void *xx)
177 {
178         rpcc *c = (rpcc *) xx;
179
180         for(int i = 0; i < 4; i++){
181                 int rep;
182                 int ret = c->call_timeout(24, rpcc::to(3000), rep, i);
183                 VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
184         }
185 }
186
187
188 void
189 simple_tests(rpcc *c)
190 {
191         cout << "simple_tests" << endl;
192         // an RPC call to procedure #22.
193         // rpcc::call() looks at the argument types to decide how
194         // to marshall the RPC call packet, and how to unmarshall
195         // the reply packet.
196         string rep;
197         int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
198         VERIFY(intret == 0); // this is what handle_22 returns
199         VERIFY(rep == "hello goodbye");
200         cout << "   -- string concat RPC .. ok" << endl;
201
202         // small request, big reply (perhaps req via UDP, reply via TCP)
203         intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
204         VERIFY(intret == 0);
205         VERIFY(rep.size() == 70000);
206         cout << "   -- small request, big reply .. ok" << endl;
207
208         // specify a timeout value to an RPC that should succeed (udp)
209         int xx = 0;
210         intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
211         VERIFY(intret == 0 && xx == 78);
212         cout << "   -- no spurious timeout .. ok" << endl;
213
214         // specify a timeout value to an RPC that should succeed (tcp)
215         {
216                 string arg(1000, 'x');
217                 string rep2;
218                 c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x");
219                 VERIFY(rep2.size() == 1001);
220                 cout << "   -- no spurious timeout .. ok" << endl;
221         }
222
223         // huge RPC
224         string big(1000000, 'x');
225         intret = c->call(22, rep, big, (string)"z");
226         VERIFY(rep.size() == 1000001);
227         cout << "   -- huge 1M rpc request .. ok" << endl;
228
229         // specify a timeout value to an RPC that should timeout (udp)
230     string non_existent = "127.0.0.1:7661";
231         rpcc *c1 = new rpcc(non_existent);
232         time_t t0 = time(0);
233         intret = c1->bind(rpcc::to(3000));
234         time_t t1 = time(0);
235         VERIFY(intret < 0 && (t1 - t0) <= 4);
236         cout << "   -- rpc timeout .. ok" << endl;
237         cout << "simple_tests OK" << endl;
238 }
239
240 void 
241 concurrent_test(size_t nt)
242 {
243         // create threads that make lots of calls in parallel,
244         // to test thread synchronization for concurrent calls
245         // and dispatches.
246         cout << "start concurrent_test (" << nt << " threads) ...";
247
248     vector<thread> th(nt);
249
250         for(size_t i = 0; i < nt; i++)
251         th[i] = thread(client1, i);
252
253         for(size_t i = 0; i < nt; i++)
254         th[i].join();
255
256         cout << " OK" << endl;
257 }
258
259 void 
260 lossy_test()
261 {
262         cout << "start lossy_test ...";
263         VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
264
265         if (server) {
266                 delete server;
267                 startserver();
268         }
269
270         for (int i = 0; i < NUM_CL; i++) {
271                 delete clients[i];
272                 clients[i] = new rpcc(dst);
273                 VERIFY(clients[i]->bind()==0);
274         }
275
276         size_t nt = 1;
277
278     vector<thread> th(nt);
279
280         for(size_t i = 0; i < nt; i++)
281         th[i] = thread(client2, i);
282
283         for(size_t i = 0; i < nt; i++)
284         th[i].join();
285
286         cout << ".. OK" << endl;
287         VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
288 }
289
290 void 
291 failure_test()
292 {
293         rpcc *client1;
294         rpcc *client = clients[0];
295
296         cout << "failure_test" << endl;
297
298         delete server;
299
300         client1 = new rpcc(dst);
301         VERIFY (client1->bind(rpcc::to(3000)) < 0);
302         cout << "   -- create new client and try to bind to failed server .. failed ok" << endl;
303
304         delete client1;
305
306         startserver();
307
308         string rep;
309         int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
310         VERIFY(intret == rpc_const::oldsrv_failure);
311         cout << "   -- call recovered server with old client .. failed ok" << endl;
312
313         delete client;
314
315         clients[0] = client = new rpcc(dst);
316         VERIFY (client->bind() >= 0);
317         VERIFY (client->bind() < 0);
318
319         intret = client->call(22, rep, (string)"hello", (string)" goodbye");
320         VERIFY(intret == 0);
321         VERIFY(rep == "hello goodbye");
322
323         cout << "   -- delete existing rpc client, create replacement rpc client .. ok" << endl;
324
325
326         size_t nt = 10;
327         cout << "   -- concurrent test on new rpc client w/ " << nt << " threads ..";
328
329     vector<thread> th(nt);
330
331         for(size_t i = 0; i < nt; i++)
332         th[i] = thread(client3, client);
333
334         for(size_t i = 0; i < nt; i++)
335         th[i].join();
336
337         cout << "ok" << endl;
338
339         delete server;
340         delete client;
341
342         startserver();
343         clients[0] = client = new rpcc(dst);
344         VERIFY (client->bind() >= 0);
345         cout << "   -- delete existing rpc client and server, create replacements.. ok" << endl;
346
347         cout << "   -- concurrent test on new client and server w/ " << nt << " threads ..";
348
349         for(size_t i = 0; i < nt; i++)
350         th[i] = thread(client3, client);
351
352         for(size_t i = 0; i < nt; i++)
353         th[i].join();
354
355         cout << "ok" << endl;
356
357         cout << "failure_test OK" << endl;
358 }
359
360 int
361 main(int argc, char *argv[])
362 {
363
364         setvbuf(stdout, NULL, _IONBF, 0);
365         setvbuf(stderr, NULL, _IONBF, 0);
366         int debug_level = 0;
367
368         bool isclient = false;
369         bool isserver = false;
370
371         srandom((uint32_t)getpid());
372         port = 20000 + (getpid() % 10000);
373
374         int ch = 0;
375         while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
376                 switch (ch) {
377                         case 'c':
378                                 isclient = true;
379                                 break;
380                         case 's':
381                                 isserver = true;
382                                 break;
383                         case 'd':
384                                 debug_level = atoi(optarg);
385                                 break;
386                         case 'p':
387                                 port = atoi(optarg);
388                                 break;
389                         case 'l':
390                                 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
391                 break;
392                         default:
393                                 break;
394                 }
395         }
396
397         if (!isserver && !isclient)  {
398                 isserver = isclient = true;
399         }
400
401         if (debug_level > 0) {
402                 JSL_DEBUG_LEVEL = debug_level;
403                 jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
404         }
405
406         testmarshall();
407
408         if (isserver) {
409                 cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
410                 startserver();
411         }
412
413         if (isclient) {
414                 // server's address.
415         dst = "127.0.0.1:" + std::to_string(port);
416
417
418                 // start the client.  bind it to the server.
419                 // starts a thread to listen for replies and hand them to
420                 // the correct waiting caller thread. there should probably
421                 // be only one rpcc per process. you probably need one
422                 // rpcc per server.
423                 for (int i = 0; i < NUM_CL; i++) {
424                         clients[i] = new rpcc(dst);
425                         VERIFY (clients[i]->bind() == 0);
426                 }
427
428                 simple_tests(clients[0]);
429                 concurrent_test(10);
430                 lossy_test();
431                 if (isserver) {
432                         failure_test();
433                 }
434
435                 cout << "rpctest OK" << endl;
436
437                 exit(0);
438         }
439
440         while (1) {
441                 sleep(1);
442         }
443 }