- public:
- int handle_22(string & r, const string a, const string b);
- int handle_fast(int &r, const int a);
- int handle_slow(int &r, const int a);
- int handle_bigrep(string &r, const size_t a);
+ public: // RPC handlers; each one is registered under a procedure number with server->reg(...)
+ int handle_22(string & r, const string a, const string b); // the tests call this as procedure 22 and expect r == a followed by b, return value 0
+ int handle_fast(int &r, const int a); // the tests call this as procedure 23 and expect r == a+1
+ int handle_slow(int &r, const int a); // the tests call this as procedure 24 and expect r == a+2
+ int handle_bigrep(string &r, const size_t a); // the tests call this as procedure 25 and expect r.size() == a
- server = new rpcs((unsigned int)port);
- server->reg(22, &srv::handle_22, &service);
- server->reg(23, &srv::handle_fast, &service);
- server->reg(24, &srv::handle_slow, &service);
- server->reg(25, &srv::handle_bigrep, &service);
+ server = new rpcs(port); // NOTE(review): raw new; no matching delete is visible in this hunk
+ server->reg(22, &srv::handle_22, &service); // procedure 22 -> srv::handle_22 on `service`
+ server->reg(23, &srv::handle_fast, &service); // procedure 23 -> srv::handle_fast
+ server->reg(24, &srv::handle_slow, &service); // procedure 24 -> srv::handle_slow
+ server->reg(25, &srv::handle_bigrep, &service); // procedure 25 -> srv::handle_bigrep
- marshall m;
- request_header rh{1,2,3,4,5};
- m.pack_req_header(rh);
- VERIFY(m.size()==RPC_HEADER_SZ);
- int i = 12345;
- unsigned long long l = 1223344455L;
- string s = "hallo....";
- m << i;
- m << l;
- m << s;
-
- char *b;
- size_t sz;
- m.take_buf(&b,&sz);
- VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
-
- unmarshall un(b,sz);
- request_header rh1;
- un.unpack_req_header(&rh1);
- VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
- int i1;
- unsigned long long l1;
- string s1;
- un >> i1;
- un >> l1;
- un >> s1;
- VERIFY(un.okdone());
- VERIFY(i1==i && l1==l && s1==s);
+ marshall m; // round-trip check: pack a header plus three values, then unpack and compare
+ request_header rh{1,2,3,4,5};
+ m.pack_header(rh);
+ VERIFY(((string)m).size()==RPC_HEADER_SZ); // with only the header packed, the serialized form must be exactly RPC_HEADER_SZ long
+ int i = 12345;
+ unsigned long long l = 1223344455L;
+ string s = "hallo....";
+ m << i;
+ m << l;
+ m << s;
+
+ string b = m; // serialized form of everything packed so far
+ VERIFY(b.size() == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)); // the extra sizeof(int) is presumably the string's length prefix -- TODO confirm against marshall's operator<<
+
+ unmarshall un(b, true); // NOTE(review): the meaning of the `true` flag is not visible in this hunk -- confirm against unmarshall's constructor
+ request_header rh1;
+ un.unpack_header(rh1);
+ VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0); // byte-wise compare of the two headers; NOTE(review): this also compares padding bytes if request_header has any
+ int i1;
+ unsigned long long l1;
+ string s1;
+ un >> i1; // values are extracted in the same order they were packed
+ un >> l1;
+ un >> s1;
+ VERIFY(un.okdone());
+ VERIFY(i1==i && l1==l && s1==s); // every value must survive the round trip unchanged
- // test concurrency.
- size_t which_cl = cl % NUM_CL;
-
- for(int i = 0; i < 100; i++){
- int arg = (random() % 2000);
- string rep;
- int ret = clients[which_cl]->call(25, rep, arg);
- VERIFY(ret == 0);
- if ((int)rep.size()!=arg)
- cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
- VERIFY((int)rep.size() == arg);
- }
-
- // test rpc replies coming back not in the order of
- // the original calls -- i.e. does xid reply dispatch work.
- for(int i = 0; i < 100; i++){
- int which = (random() % 2);
- int arg = (random() % 1000);
- int rep;
-
- auto start = std::chrono::steady_clock::now();
-
- int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
- auto end = std::chrono::steady_clock::now();
- auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
- if (ret != 0)
- cout << diff << " ms have elapsed!!!" << endl;
- VERIFY(ret == 0);
- VERIFY(rep == (which ? arg+1 : arg+2));
- }
+ // Concurrency test body: issue calls through one of the shared clients.
+ size_t which_cl = cl % NUM_CL; // map cl onto an index into clients[]
+
+ for(int i = 0; i < 100; i++){
+ int arg = (random() % 2000); // requested reply size, 0..1999
+ string rep;
+ int ret = clients[which_cl]->call(25, rep, arg); // procedure 25: reply is a string whose length should equal arg
+ VERIFY(ret == 0);
+ if ((int)rep.size()!=arg)
+ cout << "repsize wrong " << rep.size() << "!=" << arg << endl; // report the mismatch before the VERIFY below fails
+ VERIFY((int)rep.size() == arg);
+ }
+
+ // Test RPC replies coming back in a different order than the
+ // original calls, i.e. check that xid-based reply dispatch works.
+ for(int i = 0; i < 100; i++){
+ int which = (random() % 2); // 1 selects procedure 23, 0 selects procedure 24
+ int arg = (random() % 1000);
+ int rep;
+
+ auto start = steady_clock::now(); // time the call so a failure can report how long it took
+
+ int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
+ auto end = steady_clock::now();
+ auto diff = duration_cast<milliseconds>(end - start).count();
+ if (ret != 0)
+ cout << diff << " ms have elapsed!!!" << endl; // elapsed time is printed only when the call failed
+ VERIFY(ret == 0);
+ VERIFY(rep == (which ? arg+1 : arg+2)); // procedure 23 must reply arg+1, procedure 24 must reply arg+2
+ }
- size_t which_cl = cl % NUM_CL;
-
- time_t t1;
- time(&t1);
-
- while(time(0) - t1 < 10){
- int arg = (random() % 2000);
- string rep;
- int ret = clients[which_cl]->call(25, rep, arg);
- if ((int)rep.size()!=arg)
- cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
- VERIFY((int)rep.size() == arg);
- }
+ size_t which_cl = cl % NUM_CL; // map cl onto an index into clients[]
+
+ time_t t1;
+ time(&t1); // record the start time
+
+ while(time(0) - t1 < 10){ // keep issuing calls until about 10 seconds of wall-clock time have passed
+ int arg = (random() % 2000); // requested reply size, 0..1999
+ string rep;
+ int ret = clients[which_cl]->call(25, rep, arg); // NOTE(review): ret is only printed below, never verified
+ if ((int)rep.size()!=arg)
+ cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
+ VERIFY((int)rep.size() == arg); // only the reply length is checked
+ }
- cout << "simple_tests" << endl;
- // an RPC call to procedure #22.
- // rpcc::call() looks at the argument types to decide how
- // to marshall the RPC call packet, and how to unmarshall
- // the reply packet.
- string rep;
- int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
- VERIFY(intret == 0); // this is what handle_22 returns
- VERIFY(rep == "hello goodbye");
- cout << " -- string concat RPC .. ok" << endl;
-
- // small request, big reply (perhaps req via UDP, reply via TCP)
- intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
- VERIFY(intret == 0);
- VERIFY(rep.size() == 70000);
- cout << " -- small request, big reply .. ok" << endl;
-
- // specify a timeout value to an RPC that should succeed (udp)
- int xx = 0;
- intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
- VERIFY(intret == 0 && xx == 78);
- cout << " -- no spurious timeout .. ok" << endl;
-
- // specify a timeout value to an RPC that should succeed (tcp)
- {
- string arg(1000, 'x');
- string rep2;
- c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x");
- VERIFY(rep2.size() == 1001);
- cout << " -- no spurious timeout .. ok" << endl;
- }
-
- // huge RPC
- string big(1000000, 'x');
- intret = c->call(22, rep, big, (string)"z");
- VERIFY(rep.size() == 1000001);
- cout << " -- huge 1M rpc request .. ok" << endl;
-
- // specify a timeout value to an RPC that should timeout (udp)
+ cout << "simple_tests" << endl;
+ // an RPC call to procedure #22.
+ // rpcc::call() looks at the argument types to decide how
+ // to marshall the RPC call packet, and how to unmarshall
+ // the reply packet.
+ string rep;
+ int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
+ VERIFY(intret == 0); // this is what handle_22 returns
+ VERIFY(rep == "hello goodbye"); // the reply is the two arguments concatenated
+ cout << " -- string concat RPC .. ok" << endl;
+
+ // small request, big reply (perhaps req via UDP, reply via TCP)
+ intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
+ VERIFY(intret == 0);
+ VERIFY(rep.size() == 70000); // reply length must equal the requested size
+ cout << " -- small request, big reply .. ok" << endl;
+
+ // specify a timeout value to an RPC that should succeed (udp)
+ int xx = 0;
+ intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
+ VERIFY(intret == 0 && xx == 78); // procedure 23 replies with its argument plus one
+ cout << " -- no spurious timeout .. ok" << endl;
+
+ // specify a timeout value to an RPC that should succeed (tcp)
+ {
+ string arg(1000, 'x');
+ string rep2;
+ c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x"); // NOTE(review): the return value is discarded here; only the reply length is checked
+ VERIFY(rep2.size() == 1001); // 1000 bytes of arg plus the one-character second argument
+ cout << " -- no spurious timeout .. ok" << endl;
+ }
+
+ // huge RPC
+ string big(1000000, 'x');
+ intret = c->call(22, rep, big, (string)"z"); // NOTE(review): intret is assigned but not verified after this call
+ VERIFY(rep.size() == 1000001); // 1,000,000 bytes of big plus the one-character second argument
+ cout << " -- huge 1M rpc request .. ok" << endl;
+ // specify a timeout value to an RPC that should timeout (udp)
- setvbuf(stdout, NULL, _IONBF, 0);
- setvbuf(stderr, NULL, _IONBF, 0);
- int debug_level = 0;
-
- bool isclient = false;
- bool isserver = false;
-
- srandom((uint32_t)getpid());
- port = 20000 + (getpid() % 10000);
-
- int ch = 0;
- while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
- switch (ch) {
- case 'c':
- isclient = true;
- break;
- case 's':
- isserver = true;
- break;
- case 'd':
- debug_level = atoi(optarg);
- break;
- case 'p':
- port = atoi(optarg);
- break;
- case 'l':
- VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+ setvbuf(stdout, NULL, _IONBF, 0); // make stdout unbuffered
+ setvbuf(stderr, NULL, _IONBF, 0); // make stderr unbuffered
+ int debug_level = 0;
+
+ bool isclient = false;
+ bool isserver = false;
+
+ srandom((uint32_t)getpid()); // seed random() with the process id
+ port = 20000 + (getpid() % 10000); // default port derived from the process id; -p overrides it
+
+ int ch = 0;
+ while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) { // -d and -p take an argument; -c, -s and -l are flags
+ switch (ch) {
+ case 'c':
+ isclient = true; // -c: run the client tests
+ break;
+ case 's':
+ isserver = true; // -s: start the server
+ break;
+ case 'd':
+ debug_level = atoi(optarg); // -d N: debug level
+ break;
+ case 'p':
+ port = (in_port_t)atoi(optarg); // -p N: explicit port; NOTE(review): atoi does no error checking and the cast narrows to in_port_t
+ break;
+ case 'l':
+ VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); // -l: set RPC_LOSSY=5 in the environment, overwriting any existing value
+ break;
+ default:
- default:
- break;
- }
- }
-
- if (!isserver && !isclient) {
- isserver = isclient = true;
- }
-
- if (debug_level > 0) {
- JSL_DEBUG_LEVEL = debug_level;
- jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
- }
-
- testmarshall();
-
- if (isserver) {
- cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
- startserver();
- }
-
- if (isclient) {
- // server's address.
- dst = "127.0.0.1:" + std::to_string(port);
-
-
- // start the client. bind it to the server.
- // starts a thread to listen for replies and hand them to
- // the correct waiting caller thread. there should probably
- // be only one rpcc per process. you probably need one
- // rpcc per server.
- for (int i = 0; i < NUM_CL; i++) {
- clients[i] = new rpcc(dst);
- VERIFY (clients[i]->bind() == 0);
- }
-
- simple_tests(clients[0]);
- concurrent_test(10);
- lossy_test();
- if (isserver) {
- failure_test();
- }
-
- cout << "rpctest OK" << endl;
-
- exit(0);
- }
-
- while (1) {
- sleep(1);
- }
+ }
+ }
+
+ if (!isserver && !isclient) { // neither -c nor -s given: act as both server and client
+ isserver = isclient = true;
+ }
+
+ if (debug_level > 0) {
+ DEBUG_LEVEL = debug_level;
+ IF_LEVEL(1) LOG_NONMEMBER("DEBUG LEVEL: " << debug_level);
+ }
+
+ testmarshall(); // the marshalling self-test runs in every mode
+
+ if (isserver) {
+ cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
+ startserver();
+ }
+
+ if (isclient) {
+ // server's address.
+ dst = "127.0.0.1:" + to_string(port); // the client always targets the loopback address on the chosen port
+
+
+ // start the client. bind it to the server.
+ // starts a thread to listen for replies and hand them to
+ // the correct waiting caller thread. there should probably
+ // be only one rpcc per process. you probably need one
+ // rpcc per server.
+ for (int i = 0; i < NUM_CL; i++) {
+ clients[i] = new rpcc(dst);
+ VERIFY (clients[i]->bind() == 0); // every client must bind successfully before any test runs
+ }
+
+ simple_tests(clients[0]);
+ concurrent_test(10);
+ lossy_test();
+ if (isserver) { // failure_test is run only when the server was started in this same process
+ failure_test();
+ }
+
+ cout << "rpctest OK" << endl;
+
+ exit(0); // client mode terminates the process once all tests have passed
+ }
+
+ while (1) { // reached only when not running as a client: sleep in a loop indefinitely
+ sleep(1);
+ }