MOAR TEMPLATE MAGIC
[invirt/third/libt4.git] / rpc / rpctest.cc
1 // RPC test and pseudo-documentation.
2 // generates print statements on failures, but eventually says "rpctest OK"
3
4 #include "types.h"
5 #include "rpc.h"
6 #include <arpa/inet.h>
7 #include <getopt.h>
8 #include <sys/types.h>
9 #include <unistd.h>
10
11 #define NUM_CL 2
12
13 char log_thread_prefix = 'r';
14
15 rpcs *server;  // server rpc object
16 rpcc *clients[NUM_CL];  // client rpc object
17 string dst; //server's ip address
18 int port;
19
20 // server-side handlers. they must be methods of some class
21 // to simplify rpcs::reg(). a server process can have handlers
22 // from multiple classes.
23 class srv {
24     public:
25         int handle_22(string & r, const string a, const string b);
26         int handle_fast(int &r, const int a);
27         int handle_slow(int &r, const int a);
28         int handle_bigrep(string &r, const size_t a);
29 };
30
31 // a handler. a and b are arguments, r is the result.
32 // there can be multiple arguments but only one result.
33 // the caller also gets to see the int return value
34 // as the return value from rpcc::call().
35 // rpcs::reg() decides how to unmarshall by looking
36 // at these argument types, so this function definition
37 // does what a .x file does in SunRPC.
38 int
39 srv::handle_22(string &r, const string a, string b)
40 {
41     r = a + b;
42     return 0;
43 }
44
45 int
46 srv::handle_fast(int &r, const int a)
47 {
48     r = a + 1;
49     return 0;
50 }
51
52 int
53 srv::handle_slow(int &r, const int a)
54 {
55     usleep(random() % 5000);
56     r = a + 2;
57     return 0;
58 }
59
60 int
61 srv::handle_bigrep(string &r, const size_t len)
62 {
63     r = string((size_t)len, 'x');
64     return 0;
65 }
66
67 srv service;
68
69 void startserver()
70 {
71     server = new rpcs((unsigned int)port);
72     server->reg(22, &srv::handle_22, &service);
73     server->reg(23, &srv::handle_fast, &service);
74     server->reg(24, &srv::handle_slow, &service);
75     server->reg(25, &srv::handle_bigrep, &service);
76 }
77
78 void
79 testmarshall()
80 {
81     marshall m;
82     request_header rh{1,2,3,4,5};
83     m.pack_req_header(rh);
84     VERIFY(m.size()==RPC_HEADER_SZ);
85     int i = 12345;
86     unsigned long long l = 1223344455L;
87     string s = "hallo....";
88     m << i;
89     m << l;
90     m << s;
91
92     char *b;
93     size_t sz;
94     m.take_buf(&b,&sz);
95     VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
96
97     unmarshall un(b,sz);
98     request_header rh1;
99     un.unpack_req_header(&rh1);
100     VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
101     int i1;
102     unsigned long long l1;
103     string s1;
104     un >> i1;
105     un >> l1;
106     un >> s1;
107     VERIFY(un.okdone());
108     VERIFY(i1==i && l1==l && s1==s);
109 }
110
111 void
112 client1(size_t cl)
113 {
114     // test concurrency.
115     size_t which_cl = cl % NUM_CL;
116
117     for(int i = 0; i < 100; i++){
118         int arg = (random() % 2000);
119         string rep;
120         int ret = clients[which_cl]->call(25, rep, arg);
121         VERIFY(ret == 0);
122         if ((int)rep.size()!=arg)
123             cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
124         VERIFY((int)rep.size() == arg);
125     }
126
127     // test rpc replies coming back not in the order of
128     // the original calls -- i.e. does xid reply dispatch work.
129     for(int i = 0; i < 100; i++){
130         int which = (random() % 2);
131         int arg = (random() % 1000);
132         int rep;
133
134         auto start = std::chrono::steady_clock::now();
135
136         int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
137         auto end = std::chrono::steady_clock::now();
138         auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
139         if (ret != 0)
140             cout << diff << " ms have elapsed!!!" << endl;
141         VERIFY(ret == 0);
142         VERIFY(rep == (which ? arg+1 : arg+2));
143     }
144 }
145
146 void
147 client2(size_t cl)
148 {
149     size_t which_cl = cl % NUM_CL;
150
151     time_t t1;
152     time(&t1);
153
154     while(time(0) - t1 < 10){
155         int arg = (random() % 2000);
156         string rep;
157         int ret = clients[which_cl]->call(25, rep, arg);
158         if ((int)rep.size()!=arg)
159             cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
160         VERIFY((int)rep.size() == arg);
161     }
162 }
163
164 void
165 client3(void *xx)
166 {
167     rpcc *c = (rpcc *) xx;
168
169     for(int i = 0; i < 4; i++){
170         int rep = 0;
171         int ret = c->call_timeout(24, rpcc::to(3000), rep, i);
172         VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
173     }
174 }
175
176
177 void
178 simple_tests(rpcc *c)
179 {
180     cout << "simple_tests" << endl;
181     // an RPC call to procedure #22.
182     // rpcc::call() looks at the argument types to decide how
183     // to marshall the RPC call packet, and how to unmarshall
184     // the reply packet.
185     string rep;
186     int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
187     VERIFY(intret == 0); // this is what handle_22 returns
188     VERIFY(rep == "hello goodbye");
189     cout << "   -- string concat RPC .. ok" << endl;
190
191     // small request, big reply (perhaps req via UDP, reply via TCP)
192     intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
193     VERIFY(intret == 0);
194     VERIFY(rep.size() == 70000);
195     cout << "   -- small request, big reply .. ok" << endl;
196
197     // specify a timeout value to an RPC that should succeed (udp)
198     int xx = 0;
199     intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
200     VERIFY(intret == 0 && xx == 78);
201     cout << "   -- no spurious timeout .. ok" << endl;
202
203     // specify a timeout value to an RPC that should succeed (tcp)
204     {
205         string arg(1000, 'x');
206         string rep2;
207         c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x");
208         VERIFY(rep2.size() == 1001);
209         cout << "   -- no spurious timeout .. ok" << endl;
210     }
211
212     // huge RPC
213     string big(1000000, 'x');
214     intret = c->call(22, rep, big, (string)"z");
215     VERIFY(rep.size() == 1000001);
216     cout << "   -- huge 1M rpc request .. ok" << endl;
217
218     // specify a timeout value to an RPC that should timeout (udp)
219     string non_existent = "127.0.0.1:7661";
220     rpcc *c1 = new rpcc(non_existent);
221     time_t t0 = time(0);
222     intret = c1->bind(rpcc::to(3000));
223     time_t t1 = time(0);
224     VERIFY(intret < 0 && (t1 - t0) <= 4);
225     cout << "   -- rpc timeout .. ok" << endl;
226     cout << "simple_tests OK" << endl;
227 }
228
229 void 
230 concurrent_test(size_t nt)
231 {
232     // create threads that make lots of calls in parallel,
233     // to test thread synchronization for concurrent calls
234     // and dispatches.
235     cout << "start concurrent_test (" << nt << " threads) ...";
236
237     vector<thread> th(nt);
238
239     for(size_t i = 0; i < nt; i++)
240         th[i] = thread(client1, i);
241
242     for(size_t i = 0; i < nt; i++)
243         th[i].join();
244
245     cout << " OK" << endl;
246 }
247
248 void 
249 lossy_test()
250 {
251     cout << "start lossy_test ...";
252     VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
253
254     if (server) {
255         delete server;
256         startserver();
257     }
258
259     for (int i = 0; i < NUM_CL; i++) {
260         delete clients[i];
261         clients[i] = new rpcc(dst);
262         VERIFY(clients[i]->bind()==0);
263     }
264
265     size_t nt = 1;
266
267     vector<thread> th(nt);
268
269     for(size_t i = 0; i < nt; i++)
270         th[i] = thread(client2, i);
271
272     for(size_t i = 0; i < nt; i++)
273         th[i].join();
274
275     cout << ".. OK" << endl;
276     VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
277 }
278
279 void 
280 failure_test()
281 {
282     rpcc *client1;
283     rpcc *client = clients[0];
284
285     cout << "failure_test" << endl;
286
287     delete server;
288
289     client1 = new rpcc(dst);
290     VERIFY (client1->bind(rpcc::to(3000)) < 0);
291     cout << "   -- create new client and try to bind to failed server .. failed ok" << endl;
292
293     delete client1;
294
295     startserver();
296
297     string rep;
298     int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
299     VERIFY(intret == rpc_const::oldsrv_failure);
300     cout << "   -- call recovered server with old client .. failed ok" << endl;
301
302     delete client;
303
304     clients[0] = client = new rpcc(dst);
305     VERIFY (client->bind() >= 0);
306     VERIFY (client->bind() < 0);
307
308     intret = client->call(22, rep, (string)"hello", (string)" goodbye");
309     VERIFY(intret == 0);
310     VERIFY(rep == "hello goodbye");
311
312     cout << "   -- delete existing rpc client, create replacement rpc client .. ok" << endl;
313
314
315     size_t nt = 10;
316     cout << "   -- concurrent test on new rpc client w/ " << nt << " threads ..";
317
318     vector<thread> th(nt);
319
320     for(size_t i = 0; i < nt; i++)
321         th[i] = thread(client3, client);
322
323     for(size_t i = 0; i < nt; i++)
324         th[i].join();
325
326     cout << "ok" << endl;
327
328     delete server;
329     delete client;
330
331     startserver();
332     clients[0] = client = new rpcc(dst);
333     VERIFY (client->bind() >= 0);
334     cout << "   -- delete existing rpc client and server, create replacements.. ok" << endl;
335
336     cout << "   -- concurrent test on new client and server w/ " << nt << " threads ..";
337
338     for(size_t i = 0; i < nt; i++)
339         th[i] = thread(client3, client);
340
341     for(size_t i = 0; i < nt; i++)
342         th[i].join();
343
344     cout << "ok" << endl;
345
346     cout << "failure_test OK" << endl;
347 }
348
349 int
350 main(int argc, char *argv[])
351 {
352
353     setvbuf(stdout, NULL, _IONBF, 0);
354     setvbuf(stderr, NULL, _IONBF, 0);
355     int debug_level = 0;
356
357     bool isclient = false;
358     bool isserver = false;
359
360     srandom((uint32_t)getpid());
361     port = 20000 + (getpid() % 10000);
362
363     int ch = 0;
364     while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
365         switch (ch) {
366             case 'c':
367                 isclient = true;
368                 break;
369             case 's':
370                 isserver = true;
371                 break;
372             case 'd':
373                 debug_level = atoi(optarg);
374                 break;
375             case 'p':
376                 port = atoi(optarg);
377                 break;
378             case 'l':
379                 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
380                 break;
381             default:
382                 break;
383         }
384     }
385
386     if (!isserver && !isclient)  {
387         isserver = isclient = true;
388     }
389
390     if (debug_level > 0) {
391         DEBUG_LEVEL = debug_level;
392         IF_LEVEL(1) LOG_NONMEMBER("DEBUG LEVEL: " << debug_level);
393     }
394
395     testmarshall();
396
397     if (isserver) {
398         cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
399         startserver();
400     }
401
402     if (isclient) {
403         // server's address.
404         dst = "127.0.0.1:" + std::to_string(port);
405
406
407         // start the client.  bind it to the server.
408         // starts a thread to listen for replies and hand them to
409         // the correct waiting caller thread. there should probably
410         // be only one rpcc per process. you probably need one
411         // rpcc per server.
412         for (int i = 0; i < NUM_CL; i++) {
413             clients[i] = new rpcc(dst);
414             VERIFY (clients[i]->bind() == 0);
415         }
416
417         simple_tests(clients[0]);
418         concurrent_test(10);
419         lossy_test();
420         if (isserver) {
421             failure_test();
422         }
423
424         cout << "rpctest OK" << endl;
425
426         exit(0);
427     }
428
429     while (1) {
430         sleep(1);
431     }
432 }