1 // RPC test and pseudo-documentation.
2 // prints diagnostics when a check fails; a fully successful run ends with "rpctest OK"
// Process-wide state shared by the test routines in this file.
// NOTE(review): presumably read by the logging layer to tag this program's output -- confirm.
14 char log_thread_prefix = 'r';
16 rpcs *server; // server rpc object
17 rpcc *clients[NUM_CL]; // client rpc objects, NUM_CL of them
18 string dst; // server address as "host:port"; main() sets it to "127.0.0.1:<port>"
21 // server-side handlers. they must be methods of some class
22 // to simplify rpcs::reg(). a server process can have handlers
23 // from multiple classes.
// Every handler has the same shape: the first parameter is the reply,
// passed by reference; the remaining parameters are the call arguments;
// the int return value is the status handed back to the caller.
26 int handle_22(string & r, const string a, const string b);
27 int handle_fast(int &r, const int a);
28 int handle_slow(int &r, const int a);
29 int handle_bigrep(string &r, const size_t a);
32 // a handler. a and b are arguments, r is the result.
33 // there can be multiple arguments but only one result.
34 // the caller also gets to see the int return value
35 // as the return value from rpcc::call().
36 // rpcs::reg() decides how to unmarshall by looking
37 // at these argument types, so this function definition
38 // does what a .x file does in SunRPC.
// Registered as procedure 22. The tests in this file expect r to be a
// followed by b (e.g. "hello" + " goodbye") and the return value to be 0.
// NOTE(review): the in-class declaration spells the last parameter
// `const string b`; top-level const is not part of the signature, so the
// two still match, but the spelling is inconsistent.
40 srv::handle_22(string &r, const string a, string b)
// Registered as procedure 23. Callers in this file expect r == a + 1
// and a return value of 0.
47 srv::handle_fast(int &r, const int a)
// Registered as procedure 24. Callers in this file expect r == a + 2.
54 srv::handle_slow(int &r, const int a)
// sleep a random 0..4999 microseconds before answering; the out-of-order
// reply test relies on this handler being slower than handle_fast
56 usleep(random() % 5000);
// Registered as procedure 25: replies with a string of `len` 'x' characters.
// NOTE(review): the in-class declaration names this parameter `a`; the
// definition uses `len`.
62 srv::handle_bigrep(string &r, const size_t len)
// the cast is a no-op: len is already a size_t
64 r = string((size_t)len, 'x');
// Create the RPC server on `port` and register the four srv handlers under
// procedure numbers 22-25. All of them are dispatched on the `service` object.
72 server = new rpcs((unsigned int)port);
73 server->reg(22, &srv::handle_22, &service);
74 server->reg(23, &srv::handle_fast, &service);
75 server->reg(24, &srv::handle_slow, &service);
76 server->reg(25, &srv::handle_bigrep, &service);
// Marshalling round trip: checks the marshalled size, then checks that the
// request header and the values i, l and s come back unchanged.
83 request_header rh{1,2,3,4,5};
84 m.pack_req_header(rh);
// a message holding only the header must be exactly RPC_HEADER_SZ bytes
85 VERIFY(m.size()==RPC_HEADER_SZ);
87 unsigned long long l = 1223344455L;
88 string s = "hallo....";
// expected size: header + sizeof(i) + sizeof(l) + the string's bytes, plus
// one extra int -- presumably the string's length prefix (confirm).
96 VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
100 un.unpack_req_header(&rh1);
// NOTE(review): memcmp also compares any padding bytes inside
// request_header; this check is only reliable if the struct has no
// padding -- confirm its layout.
101 VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
103 unsigned long long l1;
// the unpacked values must equal what was packed
109 VERIFY(i1==i && l1==l && s1==s);
// Per-thread client workload. `cl` selects which of the shared clients to
// use (wrapped into range with % NUM_CL).
116 size_t which_cl = cl % NUM_CL;
// Phase 1: 100 calls to procedure 25, each asking for a reply of 0..1999
// bytes; the reply length must equal the requested length.
118 for(int i = 0; i < 100; i++){
119 int arg = (random() % 2000);
121 int ret = clients[which_cl]->call(25, rep, arg);
// log the mismatch before the VERIFY below
123 if ((int)rep.size()!=arg)
124 cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
125 VERIFY((int)rep.size() == arg);
128 // test rpc replies coming back not in the order of
129 // the original calls -- i.e. does xid reply dispatch work.
// Phase 2: 100 calls; each one randomly picks procedure 23 (which != 0)
// or procedure 24 (which == 0) with an argument of 0..999.
130 for(int i = 0; i < 100; i++){
131 int which = (random() % 2);
132 int arg = (random() % 1000);
// the call is timed with std::chrono::steady_clock; diff is in milliseconds
135 auto start = std::chrono::steady_clock::now();
137 int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
138 auto end = std::chrono::steady_clock::now();
139 auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
141 cout << diff << " ms have elapsed!!!" << endl;
// procedure 23 must reply arg+1, procedure 24 must reply arg+2
143 VERIFY(rep == (which ? arg+1 : arg+2));
// Per-thread client workload: keep issuing procedure-25 calls until 10
// seconds have passed since t1, checking the length of every reply.
150 size_t which_cl = cl % NUM_CL;
// NOTE(review): t1 is presumably sampled with time(0) just before this
// loop -- confirm.
155 while(time(0) - t1 < 10){
156 int arg = (random() % 2000);
158 int ret = clients[which_cl]->call(25, rep, arg);
// on a mismatch, log the request size, the reply size and the call's
// return code before the VERIFY below
159 if ((int)rep.size()!=arg)
160 cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
161 VERIFY((int)rep.size() == arg);
// Per-thread workload driven through an explicitly supplied client:
// xx is the rpcc to use.
168 rpcc *c = (rpcc *) xx;
// four calls to procedure 24, each with a timeout of rpcc::to(3000)
170 for(int i = 0; i < 4; i++){
172 int ret = c->call_timeout(24, rpcc::to(3000), rep, i);
// a timeout is tolerated; otherwise the reply must be i+2
173 VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
// Sequential smoke tests run through the single client `c`: string
// concatenation, a large reply, two calls with timeouts that must not fire,
// a 1,000,000-byte request, and a bind to a non-existent server that must
// fail.
179 simple_tests(rpcc *c)
181 cout << "simple_tests" << endl;
182 // an RPC call to procedure #22.
183 // rpcc::call() looks at the argument types to decide how
184 // to marshall the RPC call packet, and how to unmarshall
187 int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
188 VERIFY(intret == 0); // this is what handle_22 returns
189 VERIFY(rep == "hello goodbye");
190 cout << " -- string concat RPC .. ok" << endl;
192 // small request, big reply (perhaps req via UDP, reply via TCP)
// asks procedure 25 for a 70000-byte reply
193 intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
195 VERIFY(rep.size() == 70000);
196 cout << " -- small request, big reply .. ok" << endl;
198 // specify a timeout value to an RPC that should succeed (udp)
// procedure 23 with argument 77 must return 0 and reply 78
200 intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
201 VERIFY(intret == 0 && xx == 78);
202 cout << " -- no spurious timeout .. ok" << endl;
204 // specify a timeout value to an RPC that should succeed (tcp)
206 string arg(1000, 'x');
// NOTE(review): unlike the other calls here, this call's return value is
// not checked; only the reply size is.
208 c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x");
// 1000 bytes of arg plus the one-byte "x"
209 VERIFY(rep2.size() == 1001);
// NOTE(review): this prints the same message as the previous test, so the
// two are indistinguishable in the output.
210 cout << " -- no spurious timeout .. ok" << endl;
// 1,000,000-byte request; the reply is the request plus the one-byte "z"
214 string big(1000000, 'x');
215 intret = c->call(22, rep, big, (string)"z");
216 VERIFY(rep.size() == 1000001);
217 cout << " -- huge 1M rpc request .. ok" << endl;
219 // specify a timeout value to an RPC that should timeout (udp)
// presumably nothing listens on this address, so bind cannot succeed
220 string non_existent = "127.0.0.1:7661";
221 rpcc *c1 = new rpcc(non_existent);
223 intret = c1->bind(rpcc::to(3000));
// bind must fail (negative return) and t1 - t0 must not exceed 4 --
// presumably seconds measured around the bind call; confirm.
225 VERIFY(intret < 0 && (t1 - t0) <= 4);
226 cout << " -- rpc timeout .. ok" << endl;
227 cout << "simple_tests OK" << endl;
// Run client1 on nt threads at once. nt is the number of worker threads.
231 concurrent_test(size_t nt)
233 // create threads that make lots of calls in parallel,
234 // to test thread synchronization for concurrent calls
236 cout << "start concurrent_test (" << nt << " threads) ...";
238 vector<thread> th(nt);
// thread i runs client1(i)
240 for(size_t i = 0; i < nt; i++)
241 th[i] = thread(client1, i);
243 for(size_t i = 0; i < nt; i++)
246 cout << " OK" << endl;
// Lossy-mode test: set RPC_LOSSY=5 in the environment, re-create the
// clients, and run client2 on nt threads.
252 cout << "start lossy_test ...";
253 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
// re-create and bind every client after RPC_LOSSY has been set
// NOTE(review): presumably the rpc layer reads RPC_LOSSY when a client or
// connection is created -- confirm.
260 for (int i = 0; i < NUM_CL; i++) {
262 clients[i] = new rpcc(dst);
263 VERIFY(clients[i]->bind()==0);
268 vector<thread> th(nt);
// thread i runs client2(i)
270 for(size_t i = 0; i < nt; i++)
271 th[i] = thread(client2, i);
273 for(size_t i = 0; i < nt; i++)
276 cout << ".. OK" << endl;
// set RPC_LOSSY back to 0 for whatever runs next
277 VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
// Failure test: checks client behaviour against a failed server, a
// recovered server, and replacement clients (per the progress messages).
284 rpcc *client = clients[0];
286 cout << "failure_test" << endl;
// a freshly created client must fail to bind (negative return)
290 client1 = new rpcc(dst);
291 VERIFY (client1->bind(rpcc::to(3000)) < 0);
292 cout << " -- create new client and try to bind to failed server .. failed ok" << endl;
// a call through the old client must be rejected with oldsrv_failure
299 int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
300 VERIFY(intret == rpc_const::oldsrv_failure);
301 cout << " -- call recovered server with old client .. failed ok" << endl;
// replace clients[0] with a new client
305 clients[0] = client = new rpcc(dst);
306 VERIFY (client->bind() >= 0);
// binding the same client a second time is expected to fail
307 VERIFY (client->bind() < 0);
309 intret = client->call(22, rep, (string)"hello", (string)" goodbye");
311 VERIFY(rep == "hello goodbye");
313 cout << " -- delete existing rpc client, create replacement rpc client .. ok" << endl;
317 cout << " -- concurrent test on new rpc client w/ " << nt << " threads ..";
319 vector<thread> th(nt);
// every thread runs client3 against the same replacement client
321 for(size_t i = 0; i < nt; i++)
322 th[i] = thread(client3, client);
324 for(size_t i = 0; i < nt; i++)
327 cout << "ok" << endl;
// replace clients[0] once more
333 clients[0] = client = new rpcc(dst);
334 VERIFY (client->bind() >= 0);
335 cout << " -- delete existing rpc client and server, create replacements.. ok" << endl;
337 cout << " -- concurrent test on new client and server w/ " << nt << " threads ..";
// the thread vector th is reused for this second round
339 for(size_t i = 0; i < nt; i++)
340 th[i] = thread(client3, client);
342 for(size_t i = 0; i < nt; i++)
345 cout << "ok" << endl;
347 cout << "failure_test OK" << endl;
// Entry point. Parses the command-line options, then acts as server,
// client, or both (the default when neither role is selected).
351 main(int argc, char *argv[])
// make stdout and stderr unbuffered
354 setvbuf(stdout, NULL, _IONBF, 0);
355 setvbuf(stderr, NULL, _IONBF, 0);
358 bool isclient = false;
359 bool isserver = false;
// seed the RNG with the pid and derive the default port (20000..29999) from it
361 srandom((uint32_t)getpid());
362 port = 20000 + (getpid() % 10000);
// option string: c, s and l are flags; d and p each take an argument
365 while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
374 debug_level = atoi(optarg);
380 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
// with neither role selected, run both in this process
387 if (!isserver && !isclient) {
388 isserver = isclient = true;
// only touch the logging level when a positive debug level was requested
391 if (debug_level > 0) {
392 JSL_DEBUG_LEVEL = debug_level;
393 jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
399 cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
// the clients connect to the server over loopback
405 dst = "127.0.0.1:" + std::to_string(port);
408 // start the client. bind it to the server.
409 // starts a thread to listen for replies and hand them to
410 // the correct waiting caller thread. there should probably
411 // be only one rpcc per process. you probably need one
// create and bind all NUM_CL clients; every bind must return 0
413 for (int i = 0; i < NUM_CL; i++) {
414 clients[i] = new rpcc(dst);
415 VERIFY (clients[i]->bind() == 0);
418 simple_tests(clients[0]);
425 cout << "rpctest OK" << endl;