1 // RPC test and pseudo-documentation.
2 // Generates print statements on failures, but eventually prints "rpctest OK".
10 #include "threaded_log.h"
// File-level state shared by the server start-up code and the client tests.
14 static rpcs *server; // server rpc object
15 static rpcc *clients[NUM_CL]; // client rpc objects, one per slot (NUM_CL slots)
16 static string * dst; // server's address; main() builds it as "127.0.0.1:<port>"
17 static in_port_t port; // port handed to the rpcs constructor in startserver()
21 using namespace std::chrono;
24 // Server-side handlers. They must be methods of some class
25 // to simplify rpcs::reg(). A server process can have handlers
26 // from multiple classes.
// Handler declarations. In each one r receives the reply and the remaining
// parameters are the call's arguments; the int return value is what the
// caller sees as the result of rpcc::call().
// The tests in this file expect: handle_22 -> r is a followed by b,
// handle_fast -> r == a + 1, handle_slow -> r == a + 2,
// handle_bigrep -> r.size() == a.
29 int handle_22(string & r, const string a, const string b);
30 int handle_fast(int & r, const int a);
31 int handle_slow(int & r, const int a);
32 int handle_bigrep(string & r, const size_t a);
// Procedure table for the test service. The names declared here
// (_22, fast, slow, bigrep) are the ones startserver() passes to rpcs::reg()
// and the clients pass to rpcc::call() / rpcc::call_timeout().
// NOTE(review): REMOTE_PROCEDURE_BASE and REMOTE_PROCEDURE are defined outside
// this view. The first argument looks like the procedure number and the last
// like the handler signature (reply reference first) -- confirm against the
// macro definitions.
35 namespace srv_protocol {
36 using status = rpc_protocol::status;
37 REMOTE_PROCEDURE_BASE(0);
38 REMOTE_PROCEDURE(22, _22, (string &, string, string));
39 REMOTE_PROCEDURE(23, fast, (int &, int));
40 REMOTE_PROCEDURE(24, slow, (int &, int));
41 REMOTE_PROCEDURE(25, bigrep, (string &, size_t));
44 // a handler. a and b are arguments, r is the result.
45 // there can be multiple arguments but only one result.
46 // the caller also gets to see the int return value
47 // as the return value from rpcc::call().
48 // rpcs::reg() decides how to unmarshall by looking
49 // at these argument types, so this function definition
50 // does what a .x file does in SunRPC.
//
// simple_tests() calls this procedure with "hello" and " goodbye" and expects
// r to be "hello goodbye" and the return value to be 0.
// The in-class declaration spells the last parameter `const string b`; the
// missing top-level const here does not change the function's type.
51 int srv::handle_22(string & r, const string a, string b) {
// Handler registered for srv_protocol::fast. Callers in this file expect the
// reply r to equal a + 1 (simple_tests() sends 77 and checks for 78).
56 int srv::handle_fast(int & r, const int a) {
// Handler registered for srv_protocol::slow. Callers in this file expect the
// reply r to equal a + 2. Used together with handle_fast to check that replies
// arriving out of call order are matched to the right caller.
61 int srv::handle_slow(int & r, const int a) {
// Delay for a random 0..500 microseconds (both bounds inclusive), drawn from
// the shared generator in `global`.
62 int us = std::uniform_int_distribution<>(0,500)(global->random_generator);
63 std::this_thread::sleep_for(microseconds(us));
// Handler registered for srv_protocol::bigrep. Callers in this file check that
// the reply string r has exactly len characters.
68 int srv::handle_bigrep(string & r, const size_t len) {
// Creates the global rpcs server on `port` and registers the four srv handlers
// under their srv_protocol procedure names.
// NOTE(review): `service` (the srv instance the handlers are bound to) is
// declared outside the lines visible here.
75 static void startserver() {
76 server = new rpcs(port);
77 server->reg(srv_protocol::_22, &srv::handle_22, &service);
78 server->reg(srv_protocol::fast, &srv::handle_fast, &service);
79 server->reg(srv_protocol::slow, &srv::handle_slow, &service);
80 server->reg(srv_protocol::bigrep, &srv::handle_bigrep, &service);
// Marshalling self-check: verifies encoded sizes, then decodes the buffer again
// and compares every value with what was put in.
// NOTE(review): the declarations of m, b, i, i1, s1, sz1 and bin1 and the actual
// marshal/unmarshal statements are not in the visible lines.
84 static void testmarshall() {
86 rpc_protocol::request_header rh{1,2,3,4,5};
// m converted to a string must be exactly RPC_HEADER_SZ bytes long
// (m is presumably the marshalled form of rh -- confirm).
88 VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ);
90 unsigned long long l = 1223344455L;
91 size_t sz = 101010101;
92 string s = "hallo....";
// Built with an explicit length of 12 so the embedded NUL bytes are kept.
93 string bin("\x00\x00\x00\x00\x00\x00\x00\x40\x00\x00\x7f\xe5", 12);
// Expected encoded size: header + i + l + 32 bits + s + 32 bits + 32 bits + bin.
// NOTE(review): presumably each string carries a 32-bit length prefix and sz is
// encoded as 32 bits -- confirm against the marshall implementation.
101 VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+sizeof(uint32_t)+s.size()+sizeof(uint32_t)+sizeof(uint32_t)+bin.size());
103 unmarshall un(b, true);
104 rpc_protocol::request_header rh1;
// The decoded header must be byte-for-byte equal to the original.
106 VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
108 unsigned long long l1;
// Every decoded value must equal the value that was marshalled.
118 VERIFY(i1==i && l1==l && s1==s && sz1==sz && bin1==bin);
// Worker run by concurrent_test(): issues calls on client slot cl % NUM_CL.
// NOTE(review): the declarations of `rep` and some closing braces are not in
// the visible lines.
121 static void client1(size_t cl) {
123 size_t which_cl = cl % NUM_CL;
// Phase 1: 100 bigrep calls with a random size in [0, 2000]; the reply must be
// exactly that long.
125 for(int i = 0; i < 100; i++){
126 auto arg = std::uniform_int_distribution<unsigned long>(0,2000)(global->random_generator);
128 int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg);
// Print the mismatch first so the sizes are visible before VERIFY runs.
130 if ((unsigned long)rep.size()!=arg)
131 cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
132 VERIFY((unsigned long)rep.size() == arg);
135 // test rpc replies coming back not in the order of
136 // the original calls -- i.e. does xid reply dispatch work.
// Phase 2: 100 calls, each randomly choosing the fast or the slow procedure.
137 for(int i = 0; i < 100; i++){
138 bool which = std::bernoulli_distribution()(global->random_generator);
139 int arg = std::uniform_int_distribution<>(0,1000)(global->random_generator);
// Time the call with a monotonic clock.
142 auto start = steady_clock::now();
144 int ret = clients[which_cl]->call(which ? srv_protocol::fast : srv_protocol::slow, rep, arg);
145 auto end = steady_clock::now();
146 auto diff = duration_cast<milliseconds>(end - start).count();
// NOTE(review): the condition guarding this message is not visible here.
148 cout << diff << " ms have elapsed!!!" << endl;
// fast replies with arg+1, slow with arg+2.
150 VERIFY(rep == (which ? arg+1 : arg+2));
// Worker run by lossy_test(): repeatedly issues bigrep calls on client slot
// cl % NUM_CL and checks the reply length.
154 static void client2(size_t cl) {
155 size_t which_cl = cl % NUM_CL;
// NOTE(review): t1 is set outside the visible lines, presumably to the start
// time, which would make this loop run for roughly 10 seconds -- confirm.
160 while(time(0) - t1 < 10){
161 auto arg = std::uniform_int_distribution<unsigned long>(0,2000)(global->random_generator);
163 int ret = clients[which_cl]->call(srv_protocol::bigrep, rep, arg);
// Report requested size, received size and return code before VERIFY runs.
164 if ((unsigned long)rep.size()!=arg)
165 cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
166 VERIFY((unsigned long)rep.size() == arg);
// Worker run by failure_test(): xx is the rpcc client to use, passed as void*.
// Makes four calls to the slow procedure with a 300 ms timeout.
170 static void client3(void *xx) {
171 rpcc *c = (rpcc *) xx;
173 for(int i = 0; i < 4; i++){
175 int ret = c->call_timeout(srv_protocol::slow, milliseconds(300), rep, i);
// A timeout is acceptable; otherwise the reply must be i+2.
176 VERIFY(ret == rpc_protocol::timeout_failure || rep == i+2);
// Single-threaded checks on client c: string concatenation, a large reply,
// calls with timeouts that must succeed, a 1M-character request, and a bind to
// an address where bind is expected to fail.
// NOTE(review): the declarations of rep, xx, rep2, t0 and t1 are not in the
// visible lines.
180 static void simple_tests(rpcc *c) {
181 cout << "simple_tests" << endl;
182 // an RPC call to procedure #22.
183 // rpcc::call() looks at the argument types to decide how
184 // to marshall the RPC call packet, and how to unmarshall
187 int intret = c->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
188 VERIFY(intret == 0); // this is what handle_22 returns
189 VERIFY(rep == "hello goodbye");
190 cout << " -- string concat RPC .. ok" << endl;
192 // small request, big reply (perhaps req via UDP, reply via TCP)
// Asks bigrep for 70000 characters with a 20 s timeout.
193 intret = c->call_timeout(srv_protocol::bigrep, milliseconds(20000), rep, 70000ul);
195 VERIFY(rep.size() == 70000);
196 cout << " -- small request, big reply .. ok" << endl;
198 // specify a timeout value to an RPC that should succeed (udp)
200 intret = c->call_timeout(srv_protocol::fast, milliseconds(300), xx, 77);
201 VERIFY(intret == 0 && xx == 78);
202 cout << " -- no spurious timeout .. ok" << endl;
204 // specify a timeout value to an RPC that should succeed (tcp)
206 string arg(1000, 'x');
// The return value of this call is not captured; only the reply length
// (1000 + 1 characters) is checked.
208 c->call_timeout(srv_protocol::_22, milliseconds(300), rep2, arg, (string)"x");
209 VERIFY(rep2.size() == 1001);
210 cout << " -- no spurious timeout .. ok" << endl;
// 1,000,000-character request; the reply is one character longer.
214 string big(1000000, 'x');
215 intret = c->call(srv_protocol::_22, rep, big, (string)"z");
217 VERIFY(rep.size() == 1000001);
218 cout << " -- huge 1M rpc request .. ok" << endl;
220 // specify a timeout value to an RPC that should timeout (udp)
221 string non_existent = "127.0.0.1:7661";
// NOTE(review): c1 is allocated with new and no matching delete appears in the
// visible lines.
222 rpcc *c1 = new rpcc(non_existent);
224 intret = c1->bind(milliseconds(300));
// bind must fail, and t1 - t0 must not exceed 4
// (t0/t1 are presumably time() readings in seconds -- confirm).
226 VERIFY(intret < 0 && (t1 - t0) <= 4);
227 cout << " -- rpc timeout .. ok" << endl;
228 cout << "simple_tests OK" << endl;
// Runs client1 on nt threads at once; thread i is passed i, which client1 maps
// to a client slot.
231 static void concurrent_test(size_t nt) {
232 // create threads that make lots of calls in parallel,
233 // to test thread synchronization for concurrent calls
235 cout << "start concurrent_test (" << nt << " threads) ...";
237 vector<thread> th(nt);
239 for(size_t i = 0; i < nt; i++)
240 th[i] = thread(client1, i);
// NOTE(review): the body of this loop is not visible; presumably it joins th[i].
242 for(size_t i = 0; i < nt; i++)
245 cout << " OK" << endl;
// Sets RPC_LOSSY=5 in the environment, creates and binds fresh clients in every
// slot, runs client2 on nt threads, then sets RPC_LOSSY back to 0.
// NOTE(review): the declaration of nt and the lines between the setenv call and
// the client loop are not visible; how RPC_LOSSY is interpreted is defined
// elsewhere.
248 static void lossy_test() {
249 cout << "start lossy_test ...";
250 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
257 for (int i = 0; i < NUM_CL; i++) {
259 clients[i] = new rpcc(*dst);
260 VERIFY(clients[i]->bind()==0);
265 vector<thread> th(nt);
267 for(size_t i = 0; i < nt; i++)
268 th[i] = thread(client2, i);
// NOTE(review): the body of this loop is not visible; presumably it joins th[i].
270 for(size_t i = 0; i < nt; i++)
273 cout << ".. OK" << endl;
274 VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
// Exercises client behaviour around server failure and replacement, as
// described by the progress messages printed below.
// NOTE(review): the statements that stop/restart the server and delete old
// clients, and the declarations of rep, nt and the local client1, are not in
// the visible lines.
277 static void failure_test() {
279 rpcc *client = clients[0];
281 cout << "failure_test" << endl;
// `client1` here is a local rpcc pointer, not the client1() worker function.
// Binding is expected to fail (3 s timeout).
285 client1 = new rpcc(*dst);
286 VERIFY (client1->bind(milliseconds(3000)) < 0);
287 cout << " -- create new client and try to bind to failed server .. failed ok" << endl;
// The pre-existing client must get oldsrv_failure from its call.
294 int intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
295 VERIFY(intret == rpc_protocol::oldsrv_failure);
296 cout << " -- call recovered server with old client .. failed ok" << endl;
// A replacement client binds successfully once; a second bind() on the same
// client is expected to return a negative value.
300 clients[0] = client = new rpcc(*dst);
301 VERIFY (client->bind() >= 0);
302 VERIFY (client->bind() < 0);
304 intret = client->call(srv_protocol::_22, rep, (string)"hello", (string)" goodbye");
306 VERIFY(rep == "hello goodbye");
308 cout << " -- delete existing rpc client, create replacement rpc client .. ok" << endl;
312 cout << " -- concurrent test on new rpc client w/ " << nt << " threads ..";
// All nt threads share the single replacement client.
314 vector<thread> th(nt);
316 for(size_t i = 0; i < nt; i++)
317 th[i] = thread(client3, client);
// NOTE(review): loop body not visible; presumably joins th[i].
319 for(size_t i = 0; i < nt; i++)
322 cout << "ok" << endl;
328 clients[0] = client = new rpcc(*dst);
329 VERIFY (client->bind() >= 0);
330 cout << " -- delete existing rpc client and server, create replacements.. ok" << endl;
332 cout << " -- concurrent test on new client and server w/ " << nt << " threads ..";
// The th vector is reused for the second round of client3 threads.
334 for(size_t i = 0; i < nt; i++)
335 th[i] = thread(client3, client);
// NOTE(review): loop body not visible; presumably joins th[i].
337 for(size_t i = 0; i < nt; i++)
340 cout << "ok" << endl;
342 cout << "failure_test OK" << endl;
// Entry point: parses options, optionally starts the server, creates and binds
// the clients, runs the tests and prints "rpctest OK".
// NOTE(review): the switch/case labels, the declarations of ch and debug_level,
// and several test invocations are not in the visible lines.
345 int main(int argc, char *argv[]) {
346 global = new t4_state('r');
// Make stdout and stderr unbuffered.
348 setvbuf(stdout, NULL, _IONBF, 0);
349 setvbuf(stderr, NULL, _IONBF, 0);
352 bool isclient = false;
353 bool isserver = false;
// Default port is derived from the process id, in the range 20000..29999.
355 port = 20000 + (getpid() % 10000);
// Accepted options: -c, -s, -d <arg>, -p <arg>, -l.
// NOTE(review): which case each statement below belongs to is not visible;
// presumably -d sets the debug level, -p the port and -l enables RPC_LOSSY.
358 while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
367 debug_level = atoi(optarg);
370 port = (in_port_t)atoi(optarg);
373 VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
// With neither role selected, run both server and client in this process.
380 if (!isserver && !isclient) {
381 isserver = isclient = true;
384 if (debug_level > 0) {
385 global->DEBUG_LEVEL = debug_level;
386 IF_LEVEL(1) LOG_NONMEMBER << "DEBUG LEVEL: " << debug_level;
392 cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)rpc_protocol::RPC_HEADER_SZ << endl;
// Clients always connect to the loopback address on the chosen port.
398 dst = new string("127.0.0.1:" + std::to_string(port));
401 // start the client. bind it to the server.
402 // starts a thread to listen for replies and hand them to
403 // the correct waiting caller thread. there should probably
404 // be only one rpcc per process. you probably need one
406 for (int i = 0; i < NUM_CL; i++) {
407 clients[i] = new rpcc(*dst);
408 VERIFY (clients[i]->bind() == 0);
411 simple_tests(clients[0]);
418 cout << "rpctest OK" << endl;
// NOTE(review): the enclosing construct is not visible; presumably an idle loop
// that keeps a server-only process alive -- confirm.
424 std::this_thread::sleep_for(milliseconds(100));