1963ada94e021a4c7d4e4bb854c81a66d644ac93
[invirt/third/libt4.git] / rpc / rpctest.cc
1 // RPC test and pseudo-documentation.
2 // generates print statements on failures, but eventually says "rpctest OK"
3
4 #include "types.h"
5 #include "rpc.h"
6 #include <arpa/inet.h>
7 #include <getopt.h>
8 #include <unistd.h>
9
10 #define NUM_CL 2
11
12 char log_thread_prefix = 'r';
13
14 rpcs *server;  // server rpc object
15 rpcc *clients[NUM_CL];  // client rpc object
16 string dst; //server's ip address
17 in_port_t port;
18
19 // server-side handlers. they must be methods of some class
20 // to simplify rpcs::reg(). a server process can have handlers
21 // from multiple classes.
22 class srv {
23     public:
24         int handle_22(string & r, const string a, const string b);
25         int handle_fast(int &r, const int a);
26         int handle_slow(int &r, const int a);
27         int handle_bigrep(string &r, const size_t a);
28 };
29
30 namespace srv_protocol {
31     using status = rpc_protocol::status;
32     REMOTE_PROCEDURE_BASE(0);
33     REMOTE_PROCEDURE(22, _22, (string &, string, string));
34     REMOTE_PROCEDURE(23, fast, (int &, int));
35     REMOTE_PROCEDURE(24, slow, (int &, int));
36     REMOTE_PROCEDURE(25, bigrep, (string &, size_t));
37 };
38
39 // a handler. a and b are arguments, r is the result.
40 // there can be multiple arguments but only one result.
41 // the caller also gets to see the int return value
42 // as the return value from rpcc::call().
43 // rpcs::reg() decides how to unmarshall by looking
44 // at these argument types, so this function definition
45 // does what a .x file does in SunRPC.
46 int srv::handle_22(string &r, const string a, string b) {
47     r = a + b;
48     return 0;
49 }
50
51 int srv::handle_fast(int &r, const int a) {
52     r = a + 1;
53     return 0;
54 }
55
56 int srv::handle_slow(int &r, const int a) {
57     usleep(random() % 500);
58     r = a + 2;
59     return 0;
60 }
61
62 int srv::handle_bigrep(string &r, const size_t len) {
63     r = string((size_t)len, 'x');
64     return 0;
65 }
66
67 srv service;
68
69 void startserver() {
70     server = new rpcs(port);
71     server->reg(srv_protocol::_22, &srv::handle_22, &service);
72     server->reg(srv_protocol::fast, &srv::handle_fast, &service);
73     server->reg(srv_protocol::slow, &srv::handle_slow, &service);
74     server->reg(srv_protocol::bigrep, &srv::handle_bigrep, &service);
75     server->start();
76 }
77
78 void testmarshall() {
79     marshall m;
80     rpc_protocol::request_header rh{1,2,3,4,5};
81     m.pack_header(rh);
82     VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ);
83     int i = 12345;
84     unsigned long long l = 1223344455L;
85     string s = "hallo....";
86     m << i;
87     m << l;
88     m << s;
89
90     string b = m;
91     VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
92
93     unmarshall un(b, true);
94     rpc_protocol::request_header rh1;
95     un.unpack_header(rh1);
96     VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
97     int i1;
98     unsigned long long l1;
99     string s1;
100     un >> i1;
101     un >> l1;
102     un >> s1;
103     VERIFY(un.okdone());
104     VERIFY(i1==i && l1==l && s1==s);
105 }
106
107 void client1(size_t cl) {
108     // test concurrency.
109     size_t which_cl = cl % NUM_CL;
110
111     for(int i = 0; i < 100; i++){
112         unsigned long arg = (random() % 2000);
113         string rep;
114         int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg);
115         VERIFY(ret == 0);
116         if ((unsigned long)rep.size()!=arg)
117             cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
118         VERIFY((unsigned long)rep.size() == arg);
119     }
120
121     // test rpc replies coming back not in the order of
122     // the original calls -- i.e. does xid reply dispatch work.
123     for(int i = 0; i < 100; i++){
124         int which = (random() % 2);
125         int arg = (random() % 1000);
126         int rep;
127
128         auto start = steady_clock::now();
129
130         int ret = clients[which_cl]->call(which ? srv_protocol::fast : srv_protocol::slow, rep, arg);
131         auto end = steady_clock::now();
132         auto diff = duration_cast<milliseconds>(end - start).count();
133         if (ret != 0)
134             cout << diff << " ms have elapsed!!!" << endl;
135         VERIFY(ret == 0);
136         VERIFY(rep == (which ? arg+1 : arg+2));
137     }
138 }
139
140 void client2(size_t cl) {
141     size_t which_cl = cl % NUM_CL;
142
143     time_t t1;
144     time(&t1);
145
146     while(time(0) - t1 < 10){
147         unsigned long arg = (random() % 2000);
148         string rep;
149         int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg);
150         if ((unsigned long)rep.size()!=arg)
151             cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
152         VERIFY((unsigned long)rep.size() == arg);
153     }
154 }
155
156 void client3(void *xx) {
157     rpcc *c = (rpcc *) xx;
158
159     for(int i = 0; i < 4; i++){
160         int rep = 0;
161         int ret = c->call_timeout(srv_protocol::slow, milliseconds(300), rep, i);
162         VERIFY(ret == rpc_protocol::timeout_failure || rep == i+2);
163     }
164 }
165
166 void simple_tests(rpcc *c) {
167     cout << "simple_tests" << endl;
168     // an RPC call to procedure #22.
169     // rpcc::call() looks at the argument types to decide how
170     // to marshall the RPC call packet, and how to unmarshall
171     // the reply packet.
172     string rep;
173     int intret = c->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
174     VERIFY(intret == 0); // this is what handle_22 returns
175     VERIFY(rep == "hello goodbye");
176     cout << "   -- string concat RPC .. ok" << endl;
177
178     // small request, big reply (perhaps req via UDP, reply via TCP)
179     intret = c->call_timeout(srv_protocol::bigrep, milliseconds(20000), rep, 70000ul);
180     VERIFY(intret == 0);
181     VERIFY(rep.size() == 70000);
182     cout << "   -- small request, big reply .. ok" << endl;
183
184     // specify a timeout value to an RPC that should succeed (udp)
185     int xx = 0;
186     intret = c->call_timeout(srv_protocol::fast, milliseconds(300), xx, 77);
187     VERIFY(intret == 0 && xx == 78);
188     cout << "   -- no spurious timeout .. ok" << endl;
189
190     // specify a timeout value to an RPC that should succeed (tcp)
191     {
192         string arg(1000, 'x');
193         string rep2;
194         c->call_timeout(srv_protocol::_22, milliseconds(300), rep2, arg, (string)"x");
195         VERIFY(rep2.size() == 1001);
196         cout << "   -- no spurious timeout .. ok" << endl;
197     }
198
199     // huge RPC
200     string big(1000000, 'x');
201     intret = c->call(srv_protocol::_22, rep, big, (string)"z");
202     VERIFY(rep.size() == 1000001);
203     cout << "   -- huge 1M rpc request .. ok" << endl;
204
205     // specify a timeout value to an RPC that should timeout (udp)
206     string non_existent = "127.0.0.1:7661";
207     rpcc *c1 = new rpcc(non_existent);
208     time_t t0 = time(0);
209     intret = c1->bind(milliseconds(300));
210     time_t t1 = time(0);
211     VERIFY(intret < 0 && (t1 - t0) <= 4);
212     cout << "   -- rpc timeout .. ok" << endl;
213     cout << "simple_tests OK" << endl;
214 }
215
216 void concurrent_test(size_t nt) {
217     // create threads that make lots of calls in parallel,
218     // to test thread synchronization for concurrent calls
219     // and dispatches.
220     cout << "start concurrent_test (" << nt << " threads) ...";
221
222     vector<thread> th(nt);
223
224     for(size_t i = 0; i < nt; i++)
225         th[i] = thread(client1, i);
226
227     for(size_t i = 0; i < nt; i++)
228         th[i].join();
229
230     cout << " OK" << endl;
231 }
232
233 void lossy_test() {
234     cout << "start lossy_test ...";
235     VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
236
237     if (server) {
238         delete server;
239         startserver();
240     }
241
242     for (int i = 0; i < NUM_CL; i++) {
243         delete clients[i];
244         clients[i] = new rpcc(dst);
245         VERIFY(clients[i]->bind()==0);
246     }
247
248     size_t nt = 1;
249
250     vector<thread> th(nt);
251
252     for(size_t i = 0; i < nt; i++)
253         th[i] = thread(client2, i);
254
255     for(size_t i = 0; i < nt; i++)
256         th[i].join();
257
258     cout << ".. OK" << endl;
259     VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
260 }
261
262 void failure_test() {
263     rpcc *client1;
264     rpcc *client = clients[0];
265
266     cout << "failure_test" << endl;
267
268     delete server;
269
270     client1 = new rpcc(dst);
271     VERIFY (client1->bind(milliseconds(3000)) < 0);
272     cout << "   -- create new client and try to bind to failed server .. failed ok" << endl;
273
274     delete client1;
275
276     startserver();
277
278     string rep;
279     int intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
280     VERIFY(intret == rpc_protocol::oldsrv_failure);
281     cout << "   -- call recovered server with old client .. failed ok" << endl;
282
283     delete client;
284
285     clients[0] = client = new rpcc(dst);
286     VERIFY (client->bind() >= 0);
287     VERIFY (client->bind() < 0);
288
289     intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
290     VERIFY(intret == 0);
291     VERIFY(rep == "hello goodbye");
292
293     cout << "   -- delete existing rpc client, create replacement rpc client .. ok" << endl;
294
295
296     size_t nt = 10;
297     cout << "   -- concurrent test on new rpc client w/ " << nt << " threads ..";
298
299     vector<thread> th(nt);
300
301     for(size_t i = 0; i < nt; i++)
302         th[i] = thread(client3, client);
303
304     for(size_t i = 0; i < nt; i++)
305         th[i].join();
306
307     cout << "ok" << endl;
308
309     delete server;
310     delete client;
311
312     startserver();
313     clients[0] = client = new rpcc(dst);
314     VERIFY (client->bind() >= 0);
315     cout << "   -- delete existing rpc client and server, create replacements.. ok" << endl;
316
317     cout << "   -- concurrent test on new client and server w/ " << nt << " threads ..";
318
319     for(size_t i = 0; i < nt; i++)
320         th[i] = thread(client3, client);
321
322     for(size_t i = 0; i < nt; i++)
323         th[i].join();
324
325     cout << "ok" << endl;
326
327     cout << "failure_test OK" << endl;
328 }
329
330 int main(int argc, char *argv[]) {
331
332     setvbuf(stdout, NULL, _IONBF, 0);
333     setvbuf(stderr, NULL, _IONBF, 0);
334     int debug_level = 0;
335
336     bool isclient = false;
337     bool isserver = false;
338
339     srandom((uint32_t)getpid());
340     port = 20000 + (getpid() % 10000);
341
342     int ch = 0;
343     while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
344         switch (ch) {
345             case 'c':
346                 isclient = true;
347                 break;
348             case 's':
349                 isserver = true;
350                 break;
351             case 'd':
352                 debug_level = atoi(optarg);
353                 break;
354             case 'p':
355                 port = (in_port_t)atoi(optarg);
356                 break;
357             case 'l':
358                 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
359                 break;
360             default:
361                 break;
362         }
363     }
364
365     if (!isserver && !isclient)  {
366         isserver = isclient = true;
367     }
368
369     if (debug_level > 0) {
370         DEBUG_LEVEL = debug_level;
371         IF_LEVEL(1) LOG_NONMEMBER("DEBUG LEVEL: " << debug_level);
372     }
373
374     testmarshall();
375
376     if (isserver) {
377         cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)rpc_protocol::RPC_HEADER_SZ << endl;
378         startserver();
379     }
380
381     if (isclient) {
382         // server's address.
383         dst = "127.0.0.1:" + to_string(port);
384
385
386         // start the client.  bind it to the server.
387         // starts a thread to listen for replies and hand them to
388         // the correct waiting caller thread. there should probably
389         // be only one rpcc per process. you probably need one
390         // rpcc per server.
391         for (int i = 0; i < NUM_CL; i++) {
392             clients[i] = new rpcc(dst);
393             VERIFY (clients[i]->bind() == 0);
394         }
395
396         simple_tests(clients[0]);
397         concurrent_test(10);
398         lossy_test();
399         if (isserver) {
400             failure_test();
401         }
402
403         cout << "rpctest OK" << endl;
404
405         exit(0);
406     }
407
408     while (1)
409         usleep(100000);
410 }