Lots more clean-ups
[invirt/third/libt4.git] / rpc / rpctest.cc
1 // RPC test and pseudo-documentation.
2 // generates print statements on failures, but eventually says "rpctest OK"
3
4 #include "types.h"
5 #include "rpc.h"
6 #include <arpa/inet.h>
7 #include <getopt.h>
8 #include <sys/types.h>
9 #include <unistd.h>
10
11 #define NUM_CL 2
12
13 char log_thread_prefix = 'r';
14
15 rpcs *server;  // server rpc object
16 rpcc *clients[NUM_CL];  // client rpc object
17 string dst; //server's ip address
18 in_port_t port;
19
20 // server-side handlers. they must be methods of some class
21 // to simplify rpcs::reg(). a server process can have handlers
22 // from multiple classes.
23 class srv {
24     public:
25         int handle_22(string & r, const string a, const string b);
26         int handle_fast(int &r, const int a);
27         int handle_slow(int &r, const int a);
28         int handle_bigrep(string &r, const size_t a);
29 };
30
31 // a handler. a and b are arguments, r is the result.
32 // there can be multiple arguments but only one result.
33 // the caller also gets to see the int return value
34 // as the return value from rpcc::call().
35 // rpcs::reg() decides how to unmarshall by looking
36 // at these argument types, so this function definition
37 // does what a .x file does in SunRPC.
38 int
39 srv::handle_22(string &r, const string a, string b)
40 {
41     r = a + b;
42     return 0;
43 }
44
45 int
46 srv::handle_fast(int &r, const int a)
47 {
48     r = a + 1;
49     return 0;
50 }
51
52 int
53 srv::handle_slow(int &r, const int a)
54 {
55     usleep(random() % 500);
56     r = a + 2;
57     return 0;
58 }
59
60 int
61 srv::handle_bigrep(string &r, const size_t len)
62 {
63     r = string((size_t)len, 'x');
64     return 0;
65 }
66
67 srv service;
68
69 void startserver()
70 {
71     server = new rpcs(port);
72     server->reg(22, &srv::handle_22, &service);
73     server->reg(23, &srv::handle_fast, &service);
74     server->reg(24, &srv::handle_slow, &service);
75     server->reg(25, &srv::handle_bigrep, &service);
76 }
77
78 void
79 testmarshall()
80 {
81     marshall m;
82     request_header rh{1,2,3,4,5};
83     m.pack_header(rh);
84     VERIFY(((string)m).size()==RPC_HEADER_SZ);
85     int i = 12345;
86     unsigned long long l = 1223344455L;
87     string s = "hallo....";
88     m << i;
89     m << l;
90     m << s;
91
92     string b = m;
93     VERIFY(b.size() == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
94
95     unmarshall un(b, true);
96     request_header rh1;
97     un.unpack_header(rh1);
98     VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
99     int i1;
100     unsigned long long l1;
101     string s1;
102     un >> i1;
103     un >> l1;
104     un >> s1;
105     VERIFY(un.okdone());
106     VERIFY(i1==i && l1==l && s1==s);
107 }
108
109 void
110 client1(size_t cl)
111 {
112     // test concurrency.
113     size_t which_cl = cl % NUM_CL;
114
115     for(int i = 0; i < 100; i++){
116         int arg = (random() % 2000);
117         string rep;
118         int ret = clients[which_cl]->call(25, rep, arg);
119         VERIFY(ret == 0);
120         if ((int)rep.size()!=arg)
121             cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
122         VERIFY((int)rep.size() == arg);
123     }
124
125     // test rpc replies coming back not in the order of
126     // the original calls -- i.e. does xid reply dispatch work.
127     for(int i = 0; i < 100; i++){
128         int which = (random() % 2);
129         int arg = (random() % 1000);
130         int rep;
131
132         auto start = steady_clock::now();
133
134         int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
135         auto end = steady_clock::now();
136         auto diff = duration_cast<milliseconds>(end - start).count();
137         if (ret != 0)
138             cout << diff << " ms have elapsed!!!" << endl;
139         VERIFY(ret == 0);
140         VERIFY(rep == (which ? arg+1 : arg+2));
141     }
142 }
143
144 void
145 client2(size_t cl)
146 {
147     size_t which_cl = cl % NUM_CL;
148
149     time_t t1;
150     time(&t1);
151
152     while(time(0) - t1 < 10){
153         int arg = (random() % 2000);
154         string rep;
155         int ret = clients[which_cl]->call(25, rep, arg);
156         if ((int)rep.size()!=arg)
157             cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
158         VERIFY((int)rep.size() == arg);
159     }
160 }
161
162 void
163 client3(void *xx)
164 {
165     rpcc *c = (rpcc *) xx;
166
167     for(int i = 0; i < 4; i++){
168         int rep = 0;
169         int ret = c->call_timeout(24, milliseconds(300), rep, i);
170         VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
171     }
172 }
173
174
175 void
176 simple_tests(rpcc *c)
177 {
178     cout << "simple_tests" << endl;
179     // an RPC call to procedure #22.
180     // rpcc::call() looks at the argument types to decide how
181     // to marshall the RPC call packet, and how to unmarshall
182     // the reply packet.
183     string rep;
184     int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
185     VERIFY(intret == 0); // this is what handle_22 returns
186     VERIFY(rep == "hello goodbye");
187     cout << "   -- string concat RPC .. ok" << endl;
188
189     // small request, big reply (perhaps req via UDP, reply via TCP)
190     intret = c->call_timeout(25, milliseconds(20000), rep, 70000);
191     VERIFY(intret == 0);
192     VERIFY(rep.size() == 70000);
193     cout << "   -- small request, big reply .. ok" << endl;
194
195     // specify a timeout value to an RPC that should succeed (udp)
196     int xx = 0;
197     intret = c->call_timeout(23, milliseconds(300), xx, 77);
198     VERIFY(intret == 0 && xx == 78);
199     cout << "   -- no spurious timeout .. ok" << endl;
200
201     // specify a timeout value to an RPC that should succeed (tcp)
202     {
203         string arg(1000, 'x');
204         string rep2;
205         c->call_timeout(22, milliseconds(300), rep2, arg, (string)"x");
206         VERIFY(rep2.size() == 1001);
207         cout << "   -- no spurious timeout .. ok" << endl;
208     }
209
210     // huge RPC
211     string big(1000000, 'x');
212     intret = c->call(22, rep, big, (string)"z");
213     VERIFY(rep.size() == 1000001);
214     cout << "   -- huge 1M rpc request .. ok" << endl;
215
216     // specify a timeout value to an RPC that should timeout (udp)
217     string non_existent = "127.0.0.1:7661";
218     rpcc *c1 = new rpcc(non_existent);
219     time_t t0 = time(0);
220     intret = c1->bind(milliseconds(300));
221     time_t t1 = time(0);
222     VERIFY(intret < 0 && (t1 - t0) <= 4);
223     cout << "   -- rpc timeout .. ok" << endl;
224     cout << "simple_tests OK" << endl;
225 }
226
227 void 
228 concurrent_test(size_t nt)
229 {
230     // create threads that make lots of calls in parallel,
231     // to test thread synchronization for concurrent calls
232     // and dispatches.
233     cout << "start concurrent_test (" << nt << " threads) ...";
234
235     vector<thread> th(nt);
236
237     for(size_t i = 0; i < nt; i++)
238         th[i] = thread(client1, i);
239
240     for(size_t i = 0; i < nt; i++)
241         th[i].join();
242
243     cout << " OK" << endl;
244 }
245
246 void 
247 lossy_test()
248 {
249     cout << "start lossy_test ...";
250     VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
251
252     if (server) {
253         delete server;
254         startserver();
255     }
256
257     for (int i = 0; i < NUM_CL; i++) {
258         delete clients[i];
259         clients[i] = new rpcc(dst);
260         VERIFY(clients[i]->bind()==0);
261     }
262
263     size_t nt = 1;
264
265     vector<thread> th(nt);
266
267     for(size_t i = 0; i < nt; i++)
268         th[i] = thread(client2, i);
269
270     for(size_t i = 0; i < nt; i++)
271         th[i].join();
272
273     cout << ".. OK" << endl;
274     VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
275 }
276
277 void 
278 failure_test()
279 {
280     rpcc *client1;
281     rpcc *client = clients[0];
282
283     cout << "failure_test" << endl;
284
285     delete server;
286
287     client1 = new rpcc(dst);
288     VERIFY (client1->bind(milliseconds(3000)) < 0);
289     cout << "   -- create new client and try to bind to failed server .. failed ok" << endl;
290
291     delete client1;
292
293     startserver();
294
295     string rep;
296     int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
297     VERIFY(intret == rpc_const::oldsrv_failure);
298     cout << "   -- call recovered server with old client .. failed ok" << endl;
299
300     delete client;
301
302     clients[0] = client = new rpcc(dst);
303     VERIFY (client->bind() >= 0);
304     VERIFY (client->bind() < 0);
305
306     intret = client->call(22, rep, (string)"hello", (string)" goodbye");
307     VERIFY(intret == 0);
308     VERIFY(rep == "hello goodbye");
309
310     cout << "   -- delete existing rpc client, create replacement rpc client .. ok" << endl;
311
312
313     size_t nt = 10;
314     cout << "   -- concurrent test on new rpc client w/ " << nt << " threads ..";
315
316     vector<thread> th(nt);
317
318     for(size_t i = 0; i < nt; i++)
319         th[i] = thread(client3, client);
320
321     for(size_t i = 0; i < nt; i++)
322         th[i].join();
323
324     cout << "ok" << endl;
325
326     delete server;
327     delete client;
328
329     startserver();
330     clients[0] = client = new rpcc(dst);
331     VERIFY (client->bind() >= 0);
332     cout << "   -- delete existing rpc client and server, create replacements.. ok" << endl;
333
334     cout << "   -- concurrent test on new client and server w/ " << nt << " threads ..";
335
336     for(size_t i = 0; i < nt; i++)
337         th[i] = thread(client3, client);
338
339     for(size_t i = 0; i < nt; i++)
340         th[i].join();
341
342     cout << "ok" << endl;
343
344     cout << "failure_test OK" << endl;
345 }
346
347 int
348 main(int argc, char *argv[])
349 {
350
351     setvbuf(stdout, NULL, _IONBF, 0);
352     setvbuf(stderr, NULL, _IONBF, 0);
353     int debug_level = 0;
354
355     bool isclient = false;
356     bool isserver = false;
357
358     srandom((uint32_t)getpid());
359     port = 20000 + (getpid() % 10000);
360
361     int ch = 0;
362     while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
363         switch (ch) {
364             case 'c':
365                 isclient = true;
366                 break;
367             case 's':
368                 isserver = true;
369                 break;
370             case 'd':
371                 debug_level = atoi(optarg);
372                 break;
373             case 'p':
374                 port = (in_port_t)atoi(optarg);
375                 break;
376             case 'l':
377                 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
378                 break;
379             default:
380                 break;
381         }
382     }
383
384     if (!isserver && !isclient)  {
385         isserver = isclient = true;
386     }
387
388     if (debug_level > 0) {
389         DEBUG_LEVEL = debug_level;
390         IF_LEVEL(1) LOG_NONMEMBER("DEBUG LEVEL: " << debug_level);
391     }
392
393     testmarshall();
394
395     if (isserver) {
396         cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
397         startserver();
398     }
399
400     if (isclient) {
401         // server's address.
402         dst = "127.0.0.1:" + to_string(port);
403
404
405         // start the client.  bind it to the server.
406         // starts a thread to listen for replies and hand them to
407         // the correct waiting caller thread. there should probably
408         // be only one rpcc per process. you probably need one
409         // rpcc per server.
410         for (int i = 0; i < NUM_CL; i++) {
411             clients[i] = new rpcc(dst);
412             VERIFY (clients[i]->bind() == 0);
413         }
414
415         simple_tests(clients[0]);
416         concurrent_test(10);
417         lossy_test();
418         if (isserver) {
419             failure_test();
420         }
421
422         cout << "rpctest OK" << endl;
423
424         exit(0);
425     }
426
427     while (1) {
428         usleep(100000);
429     }
430 }