1 // RPC test and pseudo-documentation.
2 // Prints diagnostics when a check fails; a fully successful run ends with "rpctest OK".
// Process-wide state shared by the server set-up code and the client tests.
// NOTE(review): log_thread_prefix is presumably a tag consumed by the logging code -- confirm.
12 char log_thread_prefix = 'r';
14 rpcs *server; // server rpc object
15 rpcc *clients[NUM_CL]; // client rpc objects; test threads pick a slot with (index % NUM_CL)
16 string dst; // server's address in "ip:port" form (main sets it to "127.0.0.1:" + port)
19 // Server-side handlers. They must be methods of some class
20 // to simplify rpcs::reg(). A server process can have handlers
21 // from multiple classes.
// Every handler writes its result through the reference r; the int it
// returns is what the caller sees as the return value of rpcc::call().
// string concat: the tests expect r to be a followed by b.
24 int handle_22(string & r, const string a, const string b);
// the tests expect r == a + 1.
25 int handle_fast(int &r, const int a);
// the tests expect r == a + 2; the definition also sleeps briefly first.
26 int handle_slow(int &r, const int a);
// replies with a string that is a characters long.
27 int handle_bigrep(string &r, const size_t a);
// Procedure identifiers for the test service. The names declared here
// (srv_protocol::_22, ::fast, ::slow, ::bigrep) are what this file passes
// to rpcs::reg() on the server side and to rpcc::call()/call_timeout() on
// the client side.
30 namespace srv_protocol {
31 using status = rpc_protocol::status;
// NOTE(review): presumably sets the base for the procedure numbers below -- confirm in the macro definition.
32 REMOTE_PROCEDURE_BASE(0);
// Each entry lists a procedure number, a name, and an argument list that
// mirrors the matching srv handler (result reference first, then inputs).
33 REMOTE_PROCEDURE(22, _22, (string &, string, string));
34 REMOTE_PROCEDURE(23, fast, (int &, int));
35 REMOTE_PROCEDURE(24, slow, (int &, int));
36 REMOTE_PROCEDURE(25, bigrep, (string &, size_t));
39 // A handler. a and b are arguments, r is the result.
40 // There can be multiple arguments but only one result.
41 // The caller also gets to see the int return value
42 // as the return value from rpcc::call().
43 // rpcs::reg() decides how to unmarshall by looking
44 // at these argument types, so this function definition
45 // does what a .x file does in SunRPC.
// simple_tests calls this with "hello" and " goodbye" and expects
// r == "hello goodbye" together with a return value of 0.
46 int srv::handle_22(string &r, const string a, string b) {
// Handler registered for srv_protocol::fast. Callers in this file expect
// r == a + 1 (for example 77 -> 78 in simple_tests).
51 int srv::handle_fast(int &r, const int a) {
// Handler registered for srv_protocol::slow. Callers in this file expect
// r == a + 2.
56 int srv::handle_slow(int &r, const int a) {
// sleep for a random 0..499 microseconds so that replies can complete out of call order
57 usleep(random() % 500);
// Handler registered for srv_protocol::bigrep: replies with a string of
// len 'x' characters, letting a small request produce a large reply.
62 int srv::handle_bigrep(string &r, const size_t len) {
63 r = string((size_t)len, 'x');
// Create the server object on the configured port and register one srv
// handler per srv_protocol procedure, all bound to the same service object.
70 server = new rpcs(port);
71 server->reg(srv_protocol::_22, &srv::handle_22, &service);
72 server->reg(srv_protocol::fast, &srv::handle_fast, &service);
73 server->reg(srv_protocol::slow, &srv::handle_slow, &service);
74 server->reg(srv_protocol::bigrep, &srv::handle_bigrep, &service);
// Marshalling round-trip check: a request header plus an integer i, a
// 64-bit value l and a string s are serialized, then read back and compared.
80 rpc_protocol::request_header rh{1,2,3,4,5};
// the serialized form of m must be exactly RPC_HEADER_SZ bytes at this point
82 VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ);
84 unsigned long long l = 1223344455L;
85 string s = "hallo....";
// expected size: header + i + l + the string's bytes + one extra int
// NOTE(review): the extra sizeof(int) is presumably the string's length prefix -- confirm against marshall.
91 VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
// decode the buffer again, header first
93 unmarshall un(b, true);
94 rpc_protocol::request_header rh1;
95 un.unpack_header(rh1);
// the decoded header must match the original byte for byte
96 VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
98 unsigned long long l1;
// the decoded values must equal what was packed
104 VERIFY(i1==i && l1==l && s1==s);
// Test-thread body used by concurrent_test. cl is the thread's index; it is
// mapped onto one of the NUM_CL shared clients. Runs two phases of 100 calls.
107 void client1(size_t cl) {
109 size_t which_cl = cl % NUM_CL;
// phase 1: request replies of a random length in [0, 1999] and check the
// length that comes back
111 for(int i = 0; i < 100; i++){
112 unsigned long arg = (random() % 2000);
114 int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg);
// print the mismatch before VERIFY fires so the sizes are visible
116 if ((unsigned long)rep.size()!=arg)
117 cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
118 VERIFY((unsigned long)rep.size() == arg);
121 // test rpc replies coming back not in the order of
122 // the original calls -- i.e. does xid reply dispatch work.
123 for(int i = 0; i < 100; i++){
// randomly choose fast (which != 0) or slow (which == 0) for this call
124 int which = (random() % 2);
125 int arg = (random() % 1000);
// time the call in milliseconds
128 auto start = steady_clock::now();
130 int ret = clients[which_cl]->call(which ? srv_protocol::fast : srv_protocol::slow, rep, arg);
131 auto end = steady_clock::now();
132 auto diff = duration_cast<milliseconds>(end - start).count();
134 cout << diff << " ms have elapsed!!!" << endl;
// fast replies arg+1, slow replies arg+2
136 VERIFY(rep == (which ? arg+1 : arg+2));
// Test-thread body used by the lossy test. cl is the thread's index, mapped
// onto one of the NUM_CL shared clients. Keeps issuing bigrep calls and
// checking the reply length until the time limit below expires.
140 void client2(size_t cl) {
141 size_t which_cl = cl % NUM_CL;
// keep going while fewer than 10 seconds have passed since t1
146 while(time(0) - t1 < 10){
147 unsigned long arg = (random() % 2000);
149 int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg);
// print the requested size, the received size and the call's return code
// before VERIFY fires
150 if ((unsigned long)rep.size()!=arg)
151 cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
152 VERIFY((unsigned long)rep.size() == arg);
// Test-thread body used by failure_test. xx is an rpcc* passed as void*.
// Makes four calls to the slow procedure, each with a 300 ms timeout.
156 void client3(void *xx) {
157 rpcc *c = (rpcc *) xx;
159 for(int i = 0; i < 4; i++){
161 int ret = c->call_timeout(srv_protocol::slow, milliseconds(300), rep, i);
// either outcome is acceptable: the call timed out, or the reply is correct (i + 2)
162 VERIFY(ret == rpc_protocol::timeout_failure || rep == i+2);
// Single-threaded checks against one bound client c: string concatenation,
// a large reply, calls that must not time out, a 1 MB request, and a bind
// that must fail. Prints one progress line per check.
166 void simple_tests(rpcc *c) {
167 cout << "simple_tests" << endl;
168 // an RPC call to procedure #22.
169 // rpcc::call() looks at the argument types to decide how
170 // to marshall the RPC call packet, and how to unmarshall
173 int intret = c->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
174 VERIFY(intret == 0); // this is what handle_22 returns
175 VERIFY(rep == "hello goodbye");
176 cout << " -- string concat RPC .. ok" << endl;
178 // small request, big reply (perhaps req via UDP, reply via TCP)
179 intret = c->call_timeout(srv_protocol::bigrep, milliseconds(20000), rep, 70000ul);
181 VERIFY(rep.size() == 70000);
182 cout << " -- small request, big reply .. ok" << endl;
184 // specify a timeout value to an RPC that should succeed (udp)
186 intret = c->call_timeout(srv_protocol::fast, milliseconds(300), xx, 77);
// fast replies with its argument plus one
187 VERIFY(intret == 0 && xx == 78);
188 cout << " -- no spurious timeout .. ok" << endl;
190 // specify a timeout value to an RPC that should succeed (tcp)
192 string arg(1000, 'x');
194 c->call_timeout(srv_protocol::_22, milliseconds(300), rep2, arg, (string)"x");
// 1000 characters concatenated with a 1-character string
195 VERIFY(rep2.size() == 1001);
196 cout << " -- no spurious timeout .. ok" << endl;
// 1,000,000-character request concatenated with a 1-character string
200 string big(1000000, 'x');
201 intret = c->call(srv_protocol::_22, rep, big, (string)"z");
202 VERIFY(rep.size() == 1000001);
203 cout << " -- huge 1M rpc request .. ok" << endl;
205 // specify a timeout value to an RPC that should timeout (udp)
206 string non_existent = "127.0.0.1:7661";
207 rpcc *c1 = new rpcc(non_existent);
209 intret = c1->bind(milliseconds(300));
// the bind must fail, and t1 - t0 must not exceed 4
// NOTE(review): t0/t1 are presumably time() readings in seconds taken around the bind -- confirm.
211 VERIFY(intret < 0 && (t1 - t0) <= 4);
212 cout << " -- rpc timeout .. ok" << endl;
213 cout << "simple_tests OK" << endl;
// Runs client1 on nt threads at once; thread i receives i as its index.
216 void concurrent_test(size_t nt) {
217 // create threads that make lots of calls in parallel,
218 // to test thread synchronization for concurrent calls
220 cout << "start concurrent_test (" << nt << " threads) ...";
// nt default-constructed thread objects, each assigned a running thread below
222 vector<thread> th(nt);
224 for(size_t i = 0; i < nt; i++)
225 th[i] = thread(client1, i);
// NOTE(review): second pass over the threads -- presumably joins each one; confirm.
227 for(size_t i = 0; i < nt; i++)
230 cout << " OK" << endl;
// Lossy-mode test: set RPC_LOSSY to "5", create and bind a fresh client in
// every slot, run client2 on nt threads, then set RPC_LOSSY back to "0".
// NOTE(review): RPC_LOSSY is presumably read by the RPC layer to inject loss -- confirm where it is consumed.
234 cout << "start lossy_test ...";
235 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
// NOTE(review): clients are re-created after the variable is set, presumably so that they pick up the setting -- confirm.
242 for (int i = 0; i < NUM_CL; i++) {
244 clients[i] = new rpcc(dst);
245 VERIFY(clients[i]->bind()==0);
250 vector<thread> th(nt);
252 for(size_t i = 0; i < nt; i++)
253 th[i] = thread(client2, i);
// NOTE(review): second pass over the threads -- presumably joins each one; confirm.
255 for(size_t i = 0; i < nt; i++)
258 cout << ".. OK" << endl;
259 VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
// Failure and recovery checks. Going by its progress messages it covers:
// binding to a failed server, calling a recovered server with an old
// client, replacing the client, and replacing both client and server, with
// a concurrent client3 run after each of the last two.
262 void failure_test() {
264 rpcc *client = clients[0];
266 cout << "failure_test" << endl;
// a brand-new client must fail to bind (3000 ms timeout)
270 client1 = new rpcc(dst);
271 VERIFY (client1->bind(milliseconds(3000)) < 0);
272 cout << " -- create new client and try to bind to failed server .. failed ok" << endl;
// the client taken from slot 0 at the top must now be refused with oldsrv_failure
279 int intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
280 VERIFY(intret == rpc_protocol::oldsrv_failure);
281 cout << " -- call recovered server with old client .. failed ok" << endl;
// install a replacement client in slot 0: its first bind must succeed and
// a repeated bind on the same object must fail
285 clients[0] = client = new rpcc(dst);
286 VERIFY (client->bind() >= 0);
287 VERIFY (client->bind() < 0);
289 intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
291 VERIFY(rep == "hello goodbye");
293 cout << " -- delete existing rpc client, create replacement rpc client .. ok" << endl;
297 cout << " -- concurrent test on new rpc client w/ " << nt << " threads ..";
// every thread runs client3 against the same client object
299 vector<thread> th(nt);
301 for(size_t i = 0; i < nt; i++)
302 th[i] = thread(client3, client);
// NOTE(review): second pass over the threads -- presumably joins each one; confirm.
304 for(size_t i = 0; i < nt; i++)
307 cout << "ok" << endl;
// install another replacement client in slot 0 and bind it
313 clients[0] = client = new rpcc(dst);
314 VERIFY (client->bind() >= 0);
315 cout << " -- delete existing rpc client and server, create replacements.. ok" << endl;
317 cout << " -- concurrent test on new client and server w/ " << nt << " threads ..";
// the th vector is reused for the second concurrent run
319 for(size_t i = 0; i < nt; i++)
320 th[i] = thread(client3, client);
// NOTE(review): second pass over the threads -- presumably joins each one; confirm.
322 for(size_t i = 0; i < nt; i++)
325 cout << "ok" << endl;
327 cout << "failure_test OK" << endl;
// Entry point. Parses command-line options, decides whether this process
// plays the server role, the client role or both, binds the clients, runs
// the tests and finally prints "rpctest OK".
330 int main(int argc, char *argv[]) {
// make stdout and stderr unbuffered so that output appears immediately
332 setvbuf(stdout, NULL, _IONBF, 0);
333 setvbuf(stderr, NULL, _IONBF, 0);
336 bool isclient = false;
337 bool isserver = false;
// seed the PRNG from the pid and derive a default port in [20000, 29999] from it
339 srandom((uint32_t)getpid());
340 port = 20000 + (getpid() % 10000);
// option string: c, s and l are flags; d and p each take an argument
// NOTE(review): presumably c/s select the client/server role, d sets debug_level, p sets port and l enables lossy mode -- confirm against the case labels.
343 while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
352 debug_level = atoi(optarg);
355 port = (in_port_t)atoi(optarg);
358 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
// with neither role selected, play both
365 if (!isserver && !isclient) {
366 isserver = isclient = true;
// apply the requested debug level only when it is positive
369 if (debug_level > 0) {
370 DEBUG_LEVEL = debug_level;
371 IF_LEVEL(1) LOG_NONMEMBER("DEBUG LEVEL: " << debug_level);
377 cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)rpc_protocol::RPC_HEADER_SZ << endl;
// clients always connect to the loopback address on the chosen port
383 dst = "127.0.0.1:" + to_string(port);
386 // start the client. bind it to the server.
387 // starts a thread to listen for replies and hand them to
388 // the correct waiting caller thread. there should probably
389 // be only one rpcc per process. you probably need one
// create and bind one client per slot; every bind must return 0
391 for (int i = 0; i < NUM_CL; i++) {
392 clients[i] = new rpcc(dst);
393 VERIFY (clients[i]->bind() == 0);
// the single-threaded checks use the first client only
396 simple_tests(clients[0]);
403 cout << "rpctest OK" << endl;