Fixed a race condition!
[invirt/third/libt4.git] / rpc / rpctest.cc
1 // RPC test and pseudo-documentation.
2 // generates print statements on failures, but eventually says "rpctest OK"
3
4 #include "types.h"
5 #include "rpc.h"
6 #include <arpa/inet.h>
7 #include <getopt.h>
8 #include <sys/types.h>
9 #include <unistd.h>
10
11 #define NUM_CL 2
12
13 char log_thread_prefix = 'r';
14
15 rpcs *server;  // server rpc object
16 rpcc *clients[NUM_CL];  // client rpc object
17 string dst; //server's ip address
18 in_port_t port;
19
20 // server-side handlers. they must be methods of some class
21 // to simplify rpcs::reg(). a server process can have handlers
22 // from multiple classes.
23 class srv {
24     public:
25         int handle_22(string & r, const string a, const string b);
26         int handle_fast(int &r, const int a);
27         int handle_slow(int &r, const int a);
28         int handle_bigrep(string &r, const size_t a);
29 };
30
31 // a handler. a and b are arguments, r is the result.
32 // there can be multiple arguments but only one result.
33 // the caller also gets to see the int return value
34 // as the return value from rpcc::call().
35 // rpcs::reg() decides how to unmarshall by looking
36 // at these argument types, so this function definition
37 // does what a .x file does in SunRPC.
38 int
39 srv::handle_22(string &r, const string a, string b)
40 {
41     r = a + b;
42     return 0;
43 }
44
45 int
46 srv::handle_fast(int &r, const int a)
47 {
48     r = a + 1;
49     return 0;
50 }
51
52 int
53 srv::handle_slow(int &r, const int a)
54 {
55     usleep(random() % 500);
56     r = a + 2;
57     return 0;
58 }
59
60 int
61 srv::handle_bigrep(string &r, const size_t len)
62 {
63     r = string((size_t)len, 'x');
64     return 0;
65 }
66
67 srv service;
68
69 void startserver()
70 {
71     server = new rpcs(port);
72     server->reg(22, &srv::handle_22, &service);
73     server->reg(23, &srv::handle_fast, &service);
74     server->reg(24, &srv::handle_slow, &service);
75     server->reg(25, &srv::handle_bigrep, &service);
76     server->start();
77 }
78
79 void
80 testmarshall()
81 {
82     marshall m;
83     request_header rh{1,2,3,4,5};
84     m.pack_header(rh);
85     VERIFY(((string)m).size()==RPC_HEADER_SZ);
86     int i = 12345;
87     unsigned long long l = 1223344455L;
88     string s = "hallo....";
89     m << i;
90     m << l;
91     m << s;
92
93     string b = m;
94     VERIFY(b.size() == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
95
96     unmarshall un(b, true);
97     request_header rh1;
98     un.unpack_header(rh1);
99     VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
100     int i1;
101     unsigned long long l1;
102     string s1;
103     un >> i1;
104     un >> l1;
105     un >> s1;
106     VERIFY(un.okdone());
107     VERIFY(i1==i && l1==l && s1==s);
108 }
109
110 void
111 client1(size_t cl)
112 {
113     // test concurrency.
114     size_t which_cl = cl % NUM_CL;
115
116     for(int i = 0; i < 100; i++){
117         int arg = (random() % 2000);
118         string rep;
119         int ret = clients[which_cl]->call(25, rep, arg);
120         VERIFY(ret == 0);
121         if ((int)rep.size()!=arg)
122             cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
123         VERIFY((int)rep.size() == arg);
124     }
125
126     // test rpc replies coming back not in the order of
127     // the original calls -- i.e. does xid reply dispatch work.
128     for(int i = 0; i < 100; i++){
129         int which = (random() % 2);
130         int arg = (random() % 1000);
131         int rep;
132
133         auto start = steady_clock::now();
134
135         int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
136         auto end = steady_clock::now();
137         auto diff = duration_cast<milliseconds>(end - start).count();
138         if (ret != 0)
139             cout << diff << " ms have elapsed!!!" << endl;
140         VERIFY(ret == 0);
141         VERIFY(rep == (which ? arg+1 : arg+2));
142     }
143 }
144
145 void
146 client2(size_t cl)
147 {
148     size_t which_cl = cl % NUM_CL;
149
150     time_t t1;
151     time(&t1);
152
153     while(time(0) - t1 < 10){
154         int arg = (random() % 2000);
155         string rep;
156         int ret = clients[which_cl]->call(25, rep, arg);
157         if ((int)rep.size()!=arg)
158             cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
159         VERIFY((int)rep.size() == arg);
160     }
161 }
162
163 void
164 client3(void *xx)
165 {
166     rpcc *c = (rpcc *) xx;
167
168     for(int i = 0; i < 4; i++){
169         int rep = 0;
170         int ret = c->call_timeout(24, milliseconds(300), rep, i);
171         VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
172     }
173 }
174
175
176 void
177 simple_tests(rpcc *c)
178 {
179     cout << "simple_tests" << endl;
180     // an RPC call to procedure #22.
181     // rpcc::call() looks at the argument types to decide how
182     // to marshall the RPC call packet, and how to unmarshall
183     // the reply packet.
184     string rep;
185     int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
186     VERIFY(intret == 0); // this is what handle_22 returns
187     VERIFY(rep == "hello goodbye");
188     cout << "   -- string concat RPC .. ok" << endl;
189
190     // small request, big reply (perhaps req via UDP, reply via TCP)
191     intret = c->call_timeout(25, milliseconds(20000), rep, 70000);
192     VERIFY(intret == 0);
193     VERIFY(rep.size() == 70000);
194     cout << "   -- small request, big reply .. ok" << endl;
195
196     // specify a timeout value to an RPC that should succeed (udp)
197     int xx = 0;
198     intret = c->call_timeout(23, milliseconds(300), xx, 77);
199     VERIFY(intret == 0 && xx == 78);
200     cout << "   -- no spurious timeout .. ok" << endl;
201
202     // specify a timeout value to an RPC that should succeed (tcp)
203     {
204         string arg(1000, 'x');
205         string rep2;
206         c->call_timeout(22, milliseconds(300), rep2, arg, (string)"x");
207         VERIFY(rep2.size() == 1001);
208         cout << "   -- no spurious timeout .. ok" << endl;
209     }
210
211     // huge RPC
212     string big(1000000, 'x');
213     intret = c->call(22, rep, big, (string)"z");
214     VERIFY(rep.size() == 1000001);
215     cout << "   -- huge 1M rpc request .. ok" << endl;
216
217     // specify a timeout value to an RPC that should timeout (udp)
218     string non_existent = "127.0.0.1:7661";
219     rpcc *c1 = new rpcc(non_existent);
220     time_t t0 = time(0);
221     intret = c1->bind(milliseconds(300));
222     time_t t1 = time(0);
223     VERIFY(intret < 0 && (t1 - t0) <= 4);
224     cout << "   -- rpc timeout .. ok" << endl;
225     cout << "simple_tests OK" << endl;
226 }
227
228 void 
229 concurrent_test(size_t nt)
230 {
231     // create threads that make lots of calls in parallel,
232     // to test thread synchronization for concurrent calls
233     // and dispatches.
234     cout << "start concurrent_test (" << nt << " threads) ...";
235
236     vector<thread> th(nt);
237
238     for(size_t i = 0; i < nt; i++)
239         th[i] = thread(client1, i);
240
241     for(size_t i = 0; i < nt; i++)
242         th[i].join();
243
244     cout << " OK" << endl;
245 }
246
247 void 
248 lossy_test()
249 {
250     cout << "start lossy_test ...";
251     VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
252
253     if (server) {
254         delete server;
255         startserver();
256     }
257
258     for (int i = 0; i < NUM_CL; i++) {
259         delete clients[i];
260         clients[i] = new rpcc(dst);
261         VERIFY(clients[i]->bind()==0);
262     }
263
264     size_t nt = 1;
265
266     vector<thread> th(nt);
267
268     for(size_t i = 0; i < nt; i++)
269         th[i] = thread(client2, i);
270
271     for(size_t i = 0; i < nt; i++)
272         th[i].join();
273
274     cout << ".. OK" << endl;
275     VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
276 }
277
278 void 
279 failure_test()
280 {
281     rpcc *client1;
282     rpcc *client = clients[0];
283
284     cout << "failure_test" << endl;
285
286     delete server;
287
288     client1 = new rpcc(dst);
289     VERIFY (client1->bind(milliseconds(3000)) < 0);
290     cout << "   -- create new client and try to bind to failed server .. failed ok" << endl;
291
292     delete client1;
293
294     startserver();
295
296     string rep;
297     int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
298     VERIFY(intret == rpc_const::oldsrv_failure);
299     cout << "   -- call recovered server with old client .. failed ok" << endl;
300
301     delete client;
302
303     clients[0] = client = new rpcc(dst);
304     VERIFY (client->bind() >= 0);
305     VERIFY (client->bind() < 0);
306
307     intret = client->call(22, rep, (string)"hello", (string)" goodbye");
308     VERIFY(intret == 0);
309     VERIFY(rep == "hello goodbye");
310
311     cout << "   -- delete existing rpc client, create replacement rpc client .. ok" << endl;
312
313
314     size_t nt = 10;
315     cout << "   -- concurrent test on new rpc client w/ " << nt << " threads ..";
316
317     vector<thread> th(nt);
318
319     for(size_t i = 0; i < nt; i++)
320         th[i] = thread(client3, client);
321
322     for(size_t i = 0; i < nt; i++)
323         th[i].join();
324
325     cout << "ok" << endl;
326
327     delete server;
328     delete client;
329
330     startserver();
331     clients[0] = client = new rpcc(dst);
332     VERIFY (client->bind() >= 0);
333     cout << "   -- delete existing rpc client and server, create replacements.. ok" << endl;
334
335     cout << "   -- concurrent test on new client and server w/ " << nt << " threads ..";
336
337     for(size_t i = 0; i < nt; i++)
338         th[i] = thread(client3, client);
339
340     for(size_t i = 0; i < nt; i++)
341         th[i].join();
342
343     cout << "ok" << endl;
344
345     cout << "failure_test OK" << endl;
346 }
347
348 int
349 main(int argc, char *argv[])
350 {
351
352     setvbuf(stdout, NULL, _IONBF, 0);
353     setvbuf(stderr, NULL, _IONBF, 0);
354     int debug_level = 0;
355
356     bool isclient = false;
357     bool isserver = false;
358
359     srandom((uint32_t)getpid());
360     port = 20000 + (getpid() % 10000);
361
362     int ch = 0;
363     while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
364         switch (ch) {
365             case 'c':
366                 isclient = true;
367                 break;
368             case 's':
369                 isserver = true;
370                 break;
371             case 'd':
372                 debug_level = atoi(optarg);
373                 break;
374             case 'p':
375                 port = (in_port_t)atoi(optarg);
376                 break;
377             case 'l':
378                 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
379                 break;
380             default:
381                 break;
382         }
383     }
384
385     if (!isserver && !isclient)  {
386         isserver = isclient = true;
387     }
388
389     if (debug_level > 0) {
390         DEBUG_LEVEL = debug_level;
391         IF_LEVEL(1) LOG_NONMEMBER("DEBUG LEVEL: " << debug_level);
392     }
393
394     testmarshall();
395
396     if (isserver) {
397         cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
398         startserver();
399     }
400
401     if (isclient) {
402         // server's address.
403         dst = "127.0.0.1:" + to_string(port);
404
405
406         // start the client.  bind it to the server.
407         // starts a thread to listen for replies and hand them to
408         // the correct waiting caller thread. there should probably
409         // be only one rpcc per process. you probably need one
410         // rpcc per server.
411         for (int i = 0; i < NUM_CL; i++) {
412             clients[i] = new rpcc(dst);
413             VERIFY (clients[i]->bind() == 0);
414         }
415
416         simple_tests(clients[0]);
417         concurrent_test(10);
418         lossy_test();
419         if (isserver) {
420             failure_test();
421         }
422
423         cout << "rpctest OK" << endl;
424
425         exit(0);
426     }
427
428     while (1) {
429         usleep(100000);
430     }
431 }