1 // RPC test and pseudo-documentation.
2 // generates print statements on failures, but eventually says "rpctest OK"
10 #include <sys/types.h>
13 #include "lang/verify.h"
17 char tprintf_thread_prefix = 'r';
19 rpcs *server; // server rpc object
20 rpcc *clients[NUM_CL]; // client rpc object
21 struct sockaddr_in dst; //server's ip address
24 // server-side handlers. they must be methods of some class
25 // to simplify rpcs::reg(). a server process can have handlers
26 // from multiple classes.
29 int handle_22(std::string & r, const std::string a, const std::string b);
30 int handle_fast(int &r, const int a);
31 int handle_slow(int &r, const int a);
32 int handle_bigrep(std::string &r, const size_t a);
35 // a handler. a and b are arguments, r is the result.
36 // there can be multiple arguments but only one result.
37 // the caller also gets to see the int return value
38 // as the return value from rpcc::call().
39 // rpcs::reg() decides how to unmarshall by looking
40 // at these argument types, so this function definition
41 // does what a .x file does in SunRPC.
43 srv::handle_22(std::string &r, const std::string a, std::string b)
50 srv::handle_fast(int &r, const int a)
57 srv::handle_slow(int &r, const int a)
59 usleep(random() % 5000);
65 srv::handle_bigrep(std::string &r, const size_t len)
67 r = std::string((size_t)len, 'x');
75 server = new rpcs((unsigned int)port);
76 server->reg(22, &srv::handle_22, &service);
77 server->reg(23, &srv::handle_fast, &service);
78 server->reg(24, &srv::handle_slow, &service);
79 server->reg(25, &srv::handle_bigrep, &service);
86 request_header rh{1,2,3,4,5};
87 m.pack_req_header(rh);
88 VERIFY(m.size()==RPC_HEADER_SZ);
90 unsigned long long l = 1223344455L;
91 std::string s = std::string("hallo....");
99 VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
103 un.unpack_req_header(&rh1);
104 VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
106 unsigned long long l1;
112 VERIFY(i1==i && l1==l && s1==s);
119 size_t which_cl = cl % NUM_CL;
121 for(int i = 0; i < 100; i++){
122 int arg = (random() % 2000);
124 int ret = clients[which_cl]->call(25, rep, arg);
126 if ((int)rep.size()!=arg) {
127 printf("repsize wrong %d!=%d\n", (int)rep.size(), arg);
129 VERIFY((int)rep.size() == arg);
132 // test rpc replies coming back not in the order of
133 // the original calls -- i.e. does xid reply dispatch work.
134 for(int i = 0; i < 100; i++){
135 int which = (random() % 2);
136 int arg = (random() % 1000);
139 auto start = std::chrono::steady_clock::now();
141 int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
142 auto end = std::chrono::steady_clock::now();
143 auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
145 printf("%d ms have elapsed!!!\n", (int)diff);
147 VERIFY(rep == (which ? arg+1 : arg+2));
154 size_t which_cl = cl % NUM_CL;
159 while(time(0) - t1 < 10){
160 int arg = (random() % 2000);
162 int ret = clients[which_cl]->call(25, rep, arg);
163 if ((int)rep.size()!=arg) {
164 printf("ask for %d reply got %d ret %d\n",
165 arg, (int)rep.size(), ret);
167 VERIFY((int)rep.size() == arg);
174 rpcc *c = (rpcc *) xx;
176 for(int i = 0; i < 4; i++){
178 int ret = c->call_timeout(24, rpcc::to(3000), rep, i);
179 VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
185 simple_tests(rpcc *c)
187 printf("simple_tests\n");
188 // an RPC call to procedure #22.
189 // rpcc::call() looks at the argument types to decide how
190 // to marshall the RPC call packet, and how to unmarshall
193 int intret = c->call(22, rep, (std::string)"hello", (std::string)" goodbye");
194 VERIFY(intret == 0); // this is what handle_22 returns
195 VERIFY(rep == "hello goodbye");
196 printf(" -- string concat RPC .. ok\n");
198 // small request, big reply (perhaps req via UDP, reply via TCP)
199 intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
201 VERIFY(rep.size() == 70000);
202 printf(" -- small request, big reply .. ok\n");
204 // specify a timeout value to an RPC that should succeed (udp)
206 intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
207 VERIFY(intret == 0 && xx == 78);
208 printf(" -- no spurious timeout .. ok\n");
210 // specify a timeout value to an RPC that should succeed (tcp)
212 std::string arg(1000, 'x');
214 c->call_timeout(22, rpcc::to(3000), rep2, arg, (std::string)"x");
215 VERIFY(rep2.size() == 1001);
216 printf(" -- no spurious timeout .. ok\n");
220 std::string big(1000000, 'x');
221 intret = c->call(22, rep, big, (std::string)"z");
222 VERIFY(rep.size() == 1000001);
223 printf(" -- huge 1M rpc request .. ok\n");
225 // specify a timeout value to an RPC that should timeout (udp)
226 struct sockaddr_in non_existent;
227 memset(&non_existent, 0, sizeof(non_existent));
228 non_existent.sin_family = AF_INET;
229 non_existent.sin_addr.s_addr = inet_addr("127.0.0.1");
230 non_existent.sin_port = htons(7661);
231 rpcc *c1 = new rpcc(non_existent);
233 intret = c1->bind(rpcc::to(3000));
235 VERIFY(intret < 0 && (t1 - t0) <= 4);
236 printf(" -- rpc timeout .. ok\n");
237 printf("simple_tests OK\n");
241 concurrent_test(size_t nt)
243 // create threads that make lots of calls in parallel,
244 // to test thread synchronization for concurrent calls
246 printf("start concurrent_test (%lu threads) ...", nt);
248 std::vector<std::thread> th(nt);
250 for(size_t i = 0; i < nt; i++)
251 th[i] = std::thread(client1, i);
253 for(size_t i = 0; i < nt; i++)
262 printf("start lossy_test ...");
263 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
270 for (int i = 0; i < NUM_CL; i++) {
272 clients[i] = new rpcc(dst);
273 VERIFY(clients[i]->bind()==0);
278 std::vector<std::thread> th(nt);
280 for(size_t i = 0; i < nt; i++)
281 th[i] = std::thread(client2, i);
283 for(size_t i = 0; i < nt; i++)
287 VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
294 rpcc *client = clients[0];
296 printf("failure_test\n");
300 client1 = new rpcc(dst);
301 VERIFY (client1->bind(rpcc::to(3000)) < 0);
302 printf(" -- create new client and try to bind to failed server .. failed ok\n");
309 int intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye");
310 VERIFY(intret == rpc_const::oldsrv_failure);
311 printf(" -- call recovered server with old client .. failed ok\n");
315 clients[0] = client = new rpcc(dst);
316 VERIFY (client->bind() >= 0);
317 VERIFY (client->bind() < 0);
319 intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye");
321 VERIFY(rep == "hello goodbye");
323 printf(" -- delete existing rpc client, create replacement rpc client .. ok\n");
327 printf(" -- concurrent test on new rpc client w/ %lu threads ..", nt);
329 std::vector<std::thread> th(nt);
331 for(size_t i = 0; i < nt; i++)
332 th[i] = std::thread(client3, client);
334 for(size_t i = 0; i < nt; i++)
343 clients[0] = client = new rpcc(dst);
344 VERIFY (client->bind() >= 0);
345 printf(" -- delete existing rpc client and server, create replacements.. ok\n");
347 printf(" -- concurrent test on new client and server w/ %lu threads ..", nt);
349 for(size_t i = 0; i < nt; i++)
350 th[i] = std::thread(client3, client);
352 for(size_t i = 0; i < nt; i++)
357 printf("failure_test OK\n");
361 main(int argc, char *argv[])
364 setvbuf(stdout, NULL, _IONBF, 0);
365 setvbuf(stderr, NULL, _IONBF, 0);
368 bool isclient = false;
369 bool isserver = false;
371 srandom((uint32_t)getpid());
372 port = 20000 + (getpid() % 10000);
375 while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
384 debug_level = atoi(optarg);
390 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
397 if (!isserver && !isclient) {
398 isserver = isclient = true;
401 if (debug_level > 0) {
402 JSL_DEBUG_LEVEL = debug_level;
403 jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
409 printf("starting server on port %d RPC_HEADER_SZ %d\n", port, (int)RPC_HEADER_SZ);
415 memset(&dst, 0, sizeof(dst));
416 dst.sin_family = AF_INET;
417 dst.sin_addr.s_addr = inet_addr("127.0.0.1");
418 dst.sin_port = htons(port);
421 // start the client. bind it to the server.
422 // starts a thread to listen for replies and hand them to
423 // the correct waiting caller thread. there should probably
424 // be only one rpcc per process. you probably need one
426 for (int i = 0; i < NUM_CL; i++) {
427 clients[i] = new rpcc(dst);
428 VERIFY (clients[i]->bind() == 0);
431 simple_tests(clients[0]);
438 printf("rpctest OK\n");