All sleep calls via std::this_thread
[invirt/third/libt4.git] / rpc / rpctest.cc
1 // RPC test and pseudo-documentation.
2 // generates print statements on failures, but eventually says "rpctest OK"
3
4 #include "types.h"
5 #include "rpc.h"
6 #include <arpa/inet.h>
7 #include <getopt.h>
8 #include <unistd.h>
9 #include <string.h>
10 #include "threaded_log.h"
11
12 #define NUM_CL 2
13
14 static rpcs *server;  // server rpc object
15 static rpcc *clients[NUM_CL];  // client rpc object
16 static string * dst; //server's ip address
17 static in_port_t port;
18
19 using std::cout;
20 using std::endl;
21 using namespace std::chrono;
22 using std::vector;
23
24 // server-side handlers. they must be methods of some class
25 // to simplify rpcs::reg(). a server process can have handlers
26 // from multiple classes.
27 class srv {
28     public:
29         int handle_22(string & r, const string a, const string b);
30         int handle_fast(int & r, const int a);
31         int handle_slow(int & r, const int a);
32         int handle_bigrep(string & r, const size_t a);
33 };
34
35 namespace srv_protocol {
36     using status = rpc_protocol::status;
37     REMOTE_PROCEDURE_BASE(0);
38     REMOTE_PROCEDURE(22, _22, (string &, string, string));
39     REMOTE_PROCEDURE(23, fast, (int &, int));
40     REMOTE_PROCEDURE(24, slow, (int &, int));
41     REMOTE_PROCEDURE(25, bigrep, (string &, size_t));
42 }
43
44 // a handler. a and b are arguments, r is the result.
45 // there can be multiple arguments but only one result.
46 // the caller also gets to see the int return value
47 // as the return value from rpcc::call().
48 // rpcs::reg() decides how to unmarshall by looking
49 // at these argument types, so this function definition
50 // does what a .x file does in SunRPC.
51 int srv::handle_22(string & r, const string a, string b) {
52     r = a + b;
53     return 0;
54 }
55
56 int srv::handle_fast(int & r, const int a) {
57     r = a + 1;
58     return 0;
59 }
60
61 int srv::handle_slow(int & r, const int a) {
62     int us = std::uniform_int_distribution<>(0,500)(global->random_generator);
63     std::this_thread::sleep_for(microseconds(us));
64     r = a + 2;
65     return 0;
66 }
67
68 int srv::handle_bigrep(string & r, const size_t len) {
69     r = string(len, 'x');
70     return 0;
71 }
72
73 static srv service;
74
75 static void startserver() {
76     server = new rpcs(port);
77     server->reg(srv_protocol::_22, &srv::handle_22, &service);
78     server->reg(srv_protocol::fast, &srv::handle_fast, &service);
79     server->reg(srv_protocol::slow, &srv::handle_slow, &service);
80     server->reg(srv_protocol::bigrep, &srv::handle_bigrep, &service);
81     server->start();
82 }
83
84 static void testmarshall() {
85     marshall m;
86     rpc_protocol::request_header rh{1,2,3,4,5};
87     m.write_header(rh);
88     VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ);
89     int i = 12345;
90     unsigned long long l = 1223344455L;
91     size_t sz = 101010101;
92     string s = "hallo....";
93     string bin("\x00\x00\x00\x00\x00\x00\x00\x40\x00\x00\x7f\xe5", 12);
94     m << i;
95     m << l;
96     m << s;
97     m << sz;
98     m << bin;
99
100     string b = m;
101     VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+sizeof(uint32_t)+s.size()+sizeof(uint32_t)+sizeof(uint32_t)+bin.size());
102
103     unmarshall un(b, true);
104     rpc_protocol::request_header rh1;
105     un.read_header(rh1);
106     VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
107     int i1;
108     unsigned long long l1;
109     string s1;
110     string bin1;
111     size_t sz1;
112     un >> i1;
113     un >> l1;
114     un >> s1;
115     un >> sz1;
116     un >> bin1;
117     VERIFY(un.okdone());
118     VERIFY(i1==i && l1==l && s1==s && sz1==sz && bin1==bin);
119 }
120
121 static void client1(size_t cl) {
122     // test concurrency.
123     size_t which_cl = cl % NUM_CL;
124
125     for(int i = 0; i < 100; i++){
126         auto arg = std::uniform_int_distribution<unsigned long>(0,2000)(global->random_generator);
127         string rep;
128         int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg);
129         VERIFY(ret == 0);
130         if ((unsigned long)rep.size()!=arg)
131             cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
132         VERIFY((unsigned long)rep.size() == arg);
133     }
134
135     // test rpc replies coming back not in the order of
136     // the original calls -- i.e. does xid reply dispatch work.
137     for(int i = 0; i < 100; i++){
138         bool which = std::bernoulli_distribution()(global->random_generator);
139         int arg = std::uniform_int_distribution<>(0,1000)(global->random_generator);
140         int rep = -1;
141
142         auto start = steady_clock::now();
143
144         int ret = clients[which_cl]->call(which ? srv_protocol::fast : srv_protocol::slow, rep, arg);
145         auto end = steady_clock::now();
146         auto diff = duration_cast<milliseconds>(end - start).count();
147         if (ret != 0)
148             cout << diff << " ms have elapsed!!!" << endl;
149         VERIFY(ret == 0);
150         VERIFY(rep == (which ? arg+1 : arg+2));
151     }
152 }
153
154 static void client2(size_t cl) {
155     size_t which_cl = cl % NUM_CL;
156
157     time_t t1;
158     time(&t1);
159
160     while(time(0) - t1 < 10){
161         auto arg = std::uniform_int_distribution<unsigned long>(0,2000)(global->random_generator);
162         string rep;
163         int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg);
164         if ((unsigned long)rep.size()!=arg)
165             cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
166         VERIFY((unsigned long)rep.size() == arg);
167     }
168 }
169
170 static void client3(void *xx) {
171     rpcc *c = (rpcc *) xx;
172
173     for(int i = 0; i < 4; i++){
174         int rep = 0;
175         int ret = c->call_timeout(srv_protocol::slow, milliseconds(300), rep, i);
176         VERIFY(ret == rpc_protocol::timeout_failure || rep == i+2);
177     }
178 }
179
180 static void simple_tests(rpcc *c) {
181     cout << "simple_tests" << endl;
182     // an RPC call to procedure #22.
183     // rpcc::call() looks at the argument types to decide how
184     // to marshall the RPC call packet, and how to unmarshall
185     // the reply packet.
186     string rep;
187     int intret = c->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
188     VERIFY(intret == 0); // this is what handle_22 returns
189     VERIFY(rep == "hello goodbye");
190     cout << "   -- string concat RPC .. ok" << endl;
191
192     // small request, big reply (perhaps req via UDP, reply via TCP)
193     intret = c->call_timeout(srv_protocol::bigrep, milliseconds(20000), rep, 70000ul);
194     VERIFY(intret == 0);
195     VERIFY(rep.size() == 70000);
196     cout << "   -- small request, big reply .. ok" << endl;
197
198     // specify a timeout value to an RPC that should succeed (udp)
199     int xx = 0;
200     intret = c->call_timeout(srv_protocol::fast, milliseconds(300), xx, 77);
201     VERIFY(intret == 0 && xx == 78);
202     cout << "   -- no spurious timeout .. ok" << endl;
203
204     // specify a timeout value to an RPC that should succeed (tcp)
205     {
206         string arg(1000, 'x');
207         string rep2;
208         c->call_timeout(srv_protocol::_22, milliseconds(300), rep2, arg, (string)"x");
209         VERIFY(rep2.size() == 1001);
210         cout << "   -- no spurious timeout .. ok" << endl;
211     }
212
213     // huge RPC
214     string big(1000000, 'x');
215     intret = c->call(srv_protocol::_22, rep, big, (string)"z");
216     VERIFY(intret == 0);
217     VERIFY(rep.size() == 1000001);
218     cout << "   -- huge 1M rpc request .. ok" << endl;
219
220     // specify a timeout value to an RPC that should timeout (udp)
221     string non_existent = "127.0.0.1:7661";
222     rpcc *c1 = new rpcc(non_existent);
223     time_t t0 = time(0);
224     intret = c1->bind(milliseconds(300));
225     time_t t1 = time(0);
226     VERIFY(intret < 0 && (t1 - t0) <= 4);
227     cout << "   -- rpc timeout .. ok" << endl;
228     cout << "simple_tests OK" << endl;
229 }
230
231 static void concurrent_test(size_t nt) {
232     // create threads that make lots of calls in parallel,
233     // to test thread synchronization for concurrent calls
234     // and dispatches.
235     cout << "start concurrent_test (" << nt << " threads) ...";
236
237     vector<thread> th(nt);
238
239     for(size_t i = 0; i < nt; i++)
240         th[i] = thread(client1, i);
241
242     for(size_t i = 0; i < nt; i++)
243         th[i].join();
244
245     cout << " OK" << endl;
246 }
247
248 static void lossy_test() {
249     cout << "start lossy_test ...";
250     VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
251
252     if (server) {
253         delete server;
254         startserver();
255     }
256
257     for (int i = 0; i < NUM_CL; i++) {
258         delete clients[i];
259         clients[i] = new rpcc(*dst);
260         VERIFY(clients[i]->bind()==0);
261     }
262
263     size_t nt = 1;
264
265     vector<thread> th(nt);
266
267     for(size_t i = 0; i < nt; i++)
268         th[i] = thread(client2, i);
269
270     for(size_t i = 0; i < nt; i++)
271         th[i].join();
272
273     cout << ".. OK" << endl;
274     VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
275 }
276
277 static void failure_test() {
278     rpcc *client1;
279     rpcc *client = clients[0];
280
281     cout << "failure_test" << endl;
282
283     delete server;
284
285     client1 = new rpcc(*dst);
286     VERIFY (client1->bind(milliseconds(3000)) < 0);
287     cout << "   -- create new client and try to bind to failed server .. failed ok" << endl;
288
289     delete client1;
290
291     startserver();
292
293     string rep;
294     int intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
295     VERIFY(intret == rpc_protocol::oldsrv_failure);
296     cout << "   -- call recovered server with old client .. failed ok" << endl;
297
298     delete client;
299
300     clients[0] = client = new rpcc(*dst);
301     VERIFY (client->bind() >= 0);
302     VERIFY (client->bind() < 0);
303
304     intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
305     VERIFY(intret == 0);
306     VERIFY(rep == "hello goodbye");
307
308     cout << "   -- delete existing rpc client, create replacement rpc client .. ok" << endl;
309
310
311     size_t nt = 10;
312     cout << "   -- concurrent test on new rpc client w/ " << nt << " threads ..";
313
314     vector<thread> th(nt);
315
316     for(size_t i = 0; i < nt; i++)
317         th[i] = thread(client3, client);
318
319     for(size_t i = 0; i < nt; i++)
320         th[i].join();
321
322     cout << "ok" << endl;
323
324     delete server;
325     delete client;
326
327     startserver();
328     clients[0] = client = new rpcc(*dst);
329     VERIFY (client->bind() >= 0);
330     cout << "   -- delete existing rpc client and server, create replacements.. ok" << endl;
331
332     cout << "   -- concurrent test on new client and server w/ " << nt << " threads ..";
333
334     for(size_t i = 0; i < nt; i++)
335         th[i] = thread(client3, client);
336
337     for(size_t i = 0; i < nt; i++)
338         th[i].join();
339
340     cout << "ok" << endl;
341
342     cout << "failure_test OK" << endl;
343 }
344
345 int main(int argc, char *argv[]) {
346     global = new t4_state('r');
347
348     setvbuf(stdout, NULL, _IONBF, 0);
349     setvbuf(stderr, NULL, _IONBF, 0);
350     int debug_level = 0;
351
352     bool isclient = false;
353     bool isserver = false;
354
355     port = 20000 + (getpid() % 10000);
356
357     int ch = 0;
358     while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
359         switch (ch) {
360             case 'c':
361                 isclient = true;
362                 break;
363             case 's':
364                 isserver = true;
365                 break;
366             case 'd':
367                 debug_level = atoi(optarg);
368                 break;
369             case 'p':
370                 port = (in_port_t)atoi(optarg);
371                 break;
372             case 'l':
373                 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
374                 break;
375             default:
376                 break;
377         }
378     }
379
380     if (!isserver && !isclient)  {
381         isserver = isclient = true;
382     }
383
384     if (debug_level > 0) {
385         global->DEBUG_LEVEL = debug_level;
386         IF_LEVEL(1) LOG_NONMEMBER << "DEBUG LEVEL: " << debug_level;
387     }
388
389     testmarshall();
390
391     if (isserver) {
392         cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)rpc_protocol::RPC_HEADER_SZ << endl;
393         startserver();
394     }
395
396     if (isclient) {
397         // server's address.
398         dst = new string("127.0.0.1:" + std::to_string(port));
399
400
401         // start the client.  bind it to the server.
402         // starts a thread to listen for replies and hand them to
403         // the correct waiting caller thread. there should probably
404         // be only one rpcc per process. you probably need one
405         // rpcc per server.
406         for (int i = 0; i < NUM_CL; i++) {
407             clients[i] = new rpcc(*dst);
408             VERIFY (clients[i]->bind() == 0);
409         }
410
411         simple_tests(clients[0]);
412         concurrent_test(10);
413         lossy_test();
414         if (isserver) {
415             failure_test();
416         }
417
418         cout << "rpctest OK" << endl;
419
420         exit(0);
421     }
422
423     while (1)
424         std::this_thread::sleep_for(milliseconds(100));
425 }