051e564a0cc46d98ea98160009807eb42ddce9b1
[invirt/third/libt4.git] / rpc / rpctest.cc
1 // RPC test and pseudo-documentation.
2 // generates print statements on failures, but eventually says "rpctest OK"
3
4 #include "types.h"
5 #include "rpc.h"
6 #include <arpa/inet.h>
7 #include <getopt.h>
8 #include <unistd.h>
9 #include <string.h>
10 #include "threaded_log.h"
11
12 #define NUM_CL 2
13
14 static rpcs *server;  // server rpc object
15 static rpcc *clients[NUM_CL];  // client rpc object
16 static string * dst; //server's ip address
17 static in_port_t port;
18
19 using std::cout;
20 using std::endl;
21 using namespace std::chrono;
22 using std::vector;
23
24 // server-side handlers. they must be methods of some class
25 // to simplify rpcs::reg(). a server process can have handlers
26 // from multiple classes.
27 class srv {
28     public:
29         int handle_22(string & r, const string a, const string b);
30         int handle_fast(int & r, const int a);
31         int handle_slow(int & r, const int a);
32         int handle_bigrep(string & r, const size_t a);
33 };
34
35 namespace srv_protocol {
36     using status = rpc_protocol::status;
37     REMOTE_PROCEDURE_BASE(0);
38     REMOTE_PROCEDURE(22, _22, (string &, string, string));
39     REMOTE_PROCEDURE(23, fast, (int &, int));
40     REMOTE_PROCEDURE(24, slow, (int &, int));
41     REMOTE_PROCEDURE(25, bigrep, (string &, size_t));
42 }
43
44 // a handler. a and b are arguments, r is the result.
45 // there can be multiple arguments but only one result.
46 // the caller also gets to see the int return value
47 // as the return value from rpcc::call().
48 // rpcs::reg() decides how to unmarshall by looking
49 // at these argument types, so this function definition
50 // does what a .x file does in SunRPC.
51 int srv::handle_22(string & r, const string a, string b) {
52     r = a + b;
53     return 0;
54 }
55
56 int srv::handle_fast(int & r, const int a) {
57     r = a + 1;
58     return 0;
59 }
60
61 int srv::handle_slow(int & r, const int a) {
62     usleep(std::uniform_int_distribution<useconds_t>(0,500)(global->random_generator));
63     r = a + 2;
64     return 0;
65 }
66
67 int srv::handle_bigrep(string & r, const size_t len) {
68     r = string(len, 'x');
69     return 0;
70 }
71
72 static srv service;
73
74 static void startserver() {
75     server = new rpcs(port);
76     server->reg(srv_protocol::_22, &srv::handle_22, &service);
77     server->reg(srv_protocol::fast, &srv::handle_fast, &service);
78     server->reg(srv_protocol::slow, &srv::handle_slow, &service);
79     server->reg(srv_protocol::bigrep, &srv::handle_bigrep, &service);
80     server->start();
81 }
82
83 static void testmarshall() {
84     marshall m;
85     rpc_protocol::request_header rh{1,2,3,4,5};
86     m.write_header(rh);
87     VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ);
88     int i = 12345;
89     unsigned long long l = 1223344455L;
90     size_t sz = 101010101;
91     string s = "hallo....";
92     string bin("\x00\x00\x00\x00\x00\x00\x00\x40\x00\x00\x7f\xe5", 12);
93     m << i;
94     m << l;
95     m << s;
96     m << sz;
97     m << bin;
98
99     string b = m;
100     VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+sizeof(uint32_t)+s.size()+sizeof(uint32_t)+sizeof(uint32_t)+bin.size());
101
102     unmarshall un(b, true);
103     rpc_protocol::request_header rh1;
104     un.read_header(rh1);
105     VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
106     int i1;
107     unsigned long long l1;
108     string s1;
109     string bin1;
110     size_t sz1;
111     un >> i1;
112     un >> l1;
113     un >> s1;
114     un >> sz1;
115     un >> bin1;
116     VERIFY(un.okdone());
117     VERIFY(i1==i && l1==l && s1==s && sz1==sz && bin1==bin);
118 }
119
120 static void client1(size_t cl) {
121     // test concurrency.
122     size_t which_cl = cl % NUM_CL;
123
124     for(int i = 0; i < 100; i++){
125         auto arg = std::uniform_int_distribution<unsigned long>(0,2000)(global->random_generator);
126         string rep;
127         int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg);
128         VERIFY(ret == 0);
129         if ((unsigned long)rep.size()!=arg)
130             cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
131         VERIFY((unsigned long)rep.size() == arg);
132     }
133
134     // test rpc replies coming back not in the order of
135     // the original calls -- i.e. does xid reply dispatch work.
136     for(int i = 0; i < 100; i++){
137         bool which = std::bernoulli_distribution()(global->random_generator);
138         int arg = std::uniform_int_distribution<>(0,1000)(global->random_generator);
139         int rep = -1;
140
141         auto start = steady_clock::now();
142
143         int ret = clients[which_cl]->call(which ? srv_protocol::fast : srv_protocol::slow, rep, arg);
144         auto end = steady_clock::now();
145         auto diff = duration_cast<milliseconds>(end - start).count();
146         if (ret != 0)
147             cout << diff << " ms have elapsed!!!" << endl;
148         VERIFY(ret == 0);
149         VERIFY(rep == (which ? arg+1 : arg+2));
150     }
151 }
152
153 static void client2(size_t cl) {
154     size_t which_cl = cl % NUM_CL;
155
156     time_t t1;
157     time(&t1);
158
159     while(time(0) - t1 < 10){
160         auto arg = std::uniform_int_distribution<unsigned long>(0,2000)(global->random_generator);
161         string rep;
162         int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg);
163         if ((unsigned long)rep.size()!=arg)
164             cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
165         VERIFY((unsigned long)rep.size() == arg);
166     }
167 }
168
169 static void client3(void *xx) {
170     rpcc *c = (rpcc *) xx;
171
172     for(int i = 0; i < 4; i++){
173         int rep = 0;
174         int ret = c->call_timeout(srv_protocol::slow, milliseconds(300), rep, i);
175         VERIFY(ret == rpc_protocol::timeout_failure || rep == i+2);
176     }
177 }
178
179 static void simple_tests(rpcc *c) {
180     cout << "simple_tests" << endl;
181     // an RPC call to procedure #22.
182     // rpcc::call() looks at the argument types to decide how
183     // to marshall the RPC call packet, and how to unmarshall
184     // the reply packet.
185     string rep;
186     int intret = c->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
187     VERIFY(intret == 0); // this is what handle_22 returns
188     VERIFY(rep == "hello goodbye");
189     cout << "   -- string concat RPC .. ok" << endl;
190
191     // small request, big reply (perhaps req via UDP, reply via TCP)
192     intret = c->call_timeout(srv_protocol::bigrep, milliseconds(20000), rep, 70000ul);
193     VERIFY(intret == 0);
194     VERIFY(rep.size() == 70000);
195     cout << "   -- small request, big reply .. ok" << endl;
196
197     // specify a timeout value to an RPC that should succeed (udp)
198     int xx = 0;
199     intret = c->call_timeout(srv_protocol::fast, milliseconds(300), xx, 77);
200     VERIFY(intret == 0 && xx == 78);
201     cout << "   -- no spurious timeout .. ok" << endl;
202
203     // specify a timeout value to an RPC that should succeed (tcp)
204     {
205         string arg(1000, 'x');
206         string rep2;
207         c->call_timeout(srv_protocol::_22, milliseconds(300), rep2, arg, (string)"x");
208         VERIFY(rep2.size() == 1001);
209         cout << "   -- no spurious timeout .. ok" << endl;
210     }
211
212     // huge RPC
213     string big(1000000, 'x');
214     intret = c->call(srv_protocol::_22, rep, big, (string)"z");
215     VERIFY(intret == 0);
216     VERIFY(rep.size() == 1000001);
217     cout << "   -- huge 1M rpc request .. ok" << endl;
218
219     // specify a timeout value to an RPC that should timeout (udp)
220     string non_existent = "127.0.0.1:7661";
221     rpcc *c1 = new rpcc(non_existent);
222     time_t t0 = time(0);
223     intret = c1->bind(milliseconds(300));
224     time_t t1 = time(0);
225     VERIFY(intret < 0 && (t1 - t0) <= 4);
226     cout << "   -- rpc timeout .. ok" << endl;
227     cout << "simple_tests OK" << endl;
228 }
229
230 static void concurrent_test(size_t nt) {
231     // create threads that make lots of calls in parallel,
232     // to test thread synchronization for concurrent calls
233     // and dispatches.
234     cout << "start concurrent_test (" << nt << " threads) ...";
235
236     vector<thread> th(nt);
237
238     for(size_t i = 0; i < nt; i++)
239         th[i] = thread(client1, i);
240
241     for(size_t i = 0; i < nt; i++)
242         th[i].join();
243
244     cout << " OK" << endl;
245 }
246
247 static void lossy_test() {
248     cout << "start lossy_test ...";
249     VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
250
251     if (server) {
252         delete server;
253         startserver();
254     }
255
256     for (int i = 0; i < NUM_CL; i++) {
257         delete clients[i];
258         clients[i] = new rpcc(*dst);
259         VERIFY(clients[i]->bind()==0);
260     }
261
262     size_t nt = 1;
263
264     vector<thread> th(nt);
265
266     for(size_t i = 0; i < nt; i++)
267         th[i] = thread(client2, i);
268
269     for(size_t i = 0; i < nt; i++)
270         th[i].join();
271
272     cout << ".. OK" << endl;
273     VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
274 }
275
276 static void failure_test() {
277     rpcc *client1;
278     rpcc *client = clients[0];
279
280     cout << "failure_test" << endl;
281
282     delete server;
283
284     client1 = new rpcc(*dst);
285     VERIFY (client1->bind(milliseconds(3000)) < 0);
286     cout << "   -- create new client and try to bind to failed server .. failed ok" << endl;
287
288     delete client1;
289
290     startserver();
291
292     string rep;
293     int intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
294     VERIFY(intret == rpc_protocol::oldsrv_failure);
295     cout << "   -- call recovered server with old client .. failed ok" << endl;
296
297     delete client;
298
299     clients[0] = client = new rpcc(*dst);
300     VERIFY (client->bind() >= 0);
301     VERIFY (client->bind() < 0);
302
303     intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
304     VERIFY(intret == 0);
305     VERIFY(rep == "hello goodbye");
306
307     cout << "   -- delete existing rpc client, create replacement rpc client .. ok" << endl;
308
309
310     size_t nt = 10;
311     cout << "   -- concurrent test on new rpc client w/ " << nt << " threads ..";
312
313     vector<thread> th(nt);
314
315     for(size_t i = 0; i < nt; i++)
316         th[i] = thread(client3, client);
317
318     for(size_t i = 0; i < nt; i++)
319         th[i].join();
320
321     cout << "ok" << endl;
322
323     delete server;
324     delete client;
325
326     startserver();
327     clients[0] = client = new rpcc(*dst);
328     VERIFY (client->bind() >= 0);
329     cout << "   -- delete existing rpc client and server, create replacements.. ok" << endl;
330
331     cout << "   -- concurrent test on new client and server w/ " << nt << " threads ..";
332
333     for(size_t i = 0; i < nt; i++)
334         th[i] = thread(client3, client);
335
336     for(size_t i = 0; i < nt; i++)
337         th[i].join();
338
339     cout << "ok" << endl;
340
341     cout << "failure_test OK" << endl;
342 }
343
344 int main(int argc, char *argv[]) {
345     global = new t4_state('r');
346
347     setvbuf(stdout, NULL, _IONBF, 0);
348     setvbuf(stderr, NULL, _IONBF, 0);
349     int debug_level = 0;
350
351     bool isclient = false;
352     bool isserver = false;
353
354     port = 20000 + (getpid() % 10000);
355
356     int ch = 0;
357     while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
358         switch (ch) {
359             case 'c':
360                 isclient = true;
361                 break;
362             case 's':
363                 isserver = true;
364                 break;
365             case 'd':
366                 debug_level = atoi(optarg);
367                 break;
368             case 'p':
369                 port = (in_port_t)atoi(optarg);
370                 break;
371             case 'l':
372                 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
373                 break;
374             default:
375                 break;
376         }
377     }
378
379     if (!isserver && !isclient)  {
380         isserver = isclient = true;
381     }
382
383     if (debug_level > 0) {
384         global->DEBUG_LEVEL = debug_level;
385         IF_LEVEL(1) LOG_NONMEMBER << "DEBUG LEVEL: " << debug_level;
386     }
387
388     testmarshall();
389
390     if (isserver) {
391         cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)rpc_protocol::RPC_HEADER_SZ << endl;
392         startserver();
393     }
394
395     if (isclient) {
396         // server's address.
397         dst = new string("127.0.0.1:" + std::to_string(port));
398
399
400         // start the client.  bind it to the server.
401         // starts a thread to listen for replies and hand them to
402         // the correct waiting caller thread. there should probably
403         // be only one rpcc per process. you probably need one
404         // rpcc per server.
405         for (int i = 0; i < NUM_CL; i++) {
406             clients[i] = new rpcc(*dst);
407             VERIFY (clients[i]->bind() == 0);
408         }
409
410         simple_tests(clients[0]);
411         concurrent_test(10);
412         lossy_test();
413         if (isserver) {
414             failure_test();
415         }
416
417         cout << "rpctest OK" << endl;
418
419         exit(0);
420     }
421
422     while (1)
423         usleep(100000);
424 }