// RPC test and pseudo-documentation.
// generates print statements on failures, but eventually says "rpctest OK"
+#include "types.h"
#include "rpc.h"
#include <arpa/inet.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
#include <getopt.h>
#include <sys/types.h>
#include <unistd.h>
-#include "jsl_log.h"
-#include "lang/verify.h"
#define NUM_CL 2
-char tprintf_thread_prefix = 'r';
+char log_thread_prefix = 'r';
rpcs *server; // server rpc object
rpcc *clients[NUM_CL]; // client rpc object
-struct sockaddr_in dst; //server's ip address
+string dst; //server's ip address
int port;
// server-side handlers. they must be methods of some class
// to simplify rpcs::reg(). a server process can have handlers
// from multiple classes.
class srv {
- public:
- int handle_22(std::string & r, const std::string a, const std::string b);
- int handle_fast(int &r, const int a);
- int handle_slow(int &r, const int a);
- int handle_bigrep(std::string &r, const size_t a);
+ public:
+ int handle_22(string & r, const string a, const string b);
+ int handle_fast(int &r, const int a);
+ int handle_slow(int &r, const int a);
+ int handle_bigrep(string &r, const size_t a);
};
// a handler. a and b are arguments, r is the result.
// at these argument types, so this function definition
// does what a .x file does in SunRPC.
int
-srv::handle_22(std::string &r, const std::string a, std::string b)
+srv::handle_22(string &r, const string a, string b)
{
- r = a + b;
- return 0;
+ r = a + b;
+ return 0;
}
int
srv::handle_fast(int &r, const int a)
{
- r = a + 1;
- return 0;
+ r = a + 1;
+ return 0;
}
int
srv::handle_slow(int &r, const int a)
{
- usleep(random() % 5000);
- r = a + 2;
- return 0;
+ usleep(random() % 5000);
+ r = a + 2;
+ return 0;
}
int
-srv::handle_bigrep(std::string &r, const size_t len)
+srv::handle_bigrep(string &r, const size_t len)
{
- r = std::string((size_t)len, 'x');
- return 0;
+ r = string((size_t)len, 'x');
+ return 0;
}
srv service;
void startserver()
{
- server = new rpcs((unsigned int)port);
- server->reg(22, &srv::handle_22, &service);
- server->reg(23, &srv::handle_fast, &service);
- server->reg(24, &srv::handle_slow, &service);
- server->reg(25, &srv::handle_bigrep, &service);
+ server = new rpcs((unsigned int)port);
+ server->reg(22, &srv::handle_22, &service);
+ server->reg(23, &srv::handle_fast, &service);
+ server->reg(24, &srv::handle_slow, &service);
+ server->reg(25, &srv::handle_bigrep, &service);
}
void
testmarshall()
{
- marshall m;
- request_header rh{1,2,3,4,5};
- m.pack_req_header(rh);
- VERIFY(m.size()==RPC_HEADER_SZ);
- int i = 12345;
- unsigned long long l = 1223344455L;
- std::string s = std::string("hallo....");
- m << i;
- m << l;
- m << s;
-
- char *b;
- size_t sz;
- m.take_buf(&b,&sz);
- VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
-
- unmarshall un(b,sz);
- request_header rh1;
- un.unpack_req_header(&rh1);
- VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
- int i1;
- unsigned long long l1;
- std::string s1;
- un >> i1;
- un >> l1;
- un >> s1;
- VERIFY(un.okdone());
- VERIFY(i1==i && l1==l && s1==s);
+ marshall m;
+ request_header rh{1,2,3,4,5};
+ m.pack_req_header(rh);
+ VERIFY(m.size()==RPC_HEADER_SZ);
+ int i = 12345;
+ unsigned long long l = 1223344455L;
+ string s = "hallo....";
+ m << i;
+ m << l;
+ m << s;
+
+ char *b;
+ size_t sz;
+ m.take_buf(&b,&sz);
+ VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
+
+ unmarshall un(b,sz);
+ request_header rh1;
+ un.unpack_req_header(&rh1);
+ VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
+ int i1;
+ unsigned long long l1;
+ string s1;
+ un >> i1;
+ un >> l1;
+ un >> s1;
+ VERIFY(un.okdone());
+ VERIFY(i1==i && l1==l && s1==s);
}
void
client1(size_t cl)
{
- // test concurrency.
- size_t which_cl = cl % NUM_CL;
-
- for(int i = 0; i < 100; i++){
- int arg = (random() % 2000);
- std::string rep;
- int ret = clients[which_cl]->call(25, rep, arg);
- VERIFY(ret == 0);
- if ((int)rep.size()!=arg) {
- printf("repsize wrong %d!=%d\n", (int)rep.size(), arg);
- }
- VERIFY((int)rep.size() == arg);
- }
-
- // test rpc replies coming back not in the order of
- // the original calls -- i.e. does xid reply dispatch work.
- for(int i = 0; i < 100; i++){
- int which = (random() % 2);
- int arg = (random() % 1000);
- int rep;
-
- auto start = std::chrono::steady_clock::now();
-
- int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
- auto end = std::chrono::steady_clock::now();
- auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
- if (ret != 0)
- printf("%d ms have elapsed!!!\n", (int)diff);
- VERIFY(ret == 0);
- VERIFY(rep == (which ? arg+1 : arg+2));
- }
+ // test concurrency.
+ size_t which_cl = cl % NUM_CL;
+
+ for(int i = 0; i < 100; i++){
+ int arg = (random() % 2000);
+ string rep;
+ int ret = clients[which_cl]->call(25, rep, arg);
+ VERIFY(ret == 0);
+ if ((int)rep.size()!=arg)
+ cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
+ VERIFY((int)rep.size() == arg);
+ }
+
+ // test rpc replies coming back not in the order of
+ // the original calls -- i.e. does xid reply dispatch work.
+ for(int i = 0; i < 100; i++){
+ int which = (random() % 2);
+ int arg = (random() % 1000);
+ int rep;
+
+ auto start = std::chrono::steady_clock::now();
+
+ int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
+ auto end = std::chrono::steady_clock::now();
+ auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
+ if (ret != 0)
+ cout << diff << " ms have elapsed!!!" << endl;
+ VERIFY(ret == 0);
+ VERIFY(rep == (which ? arg+1 : arg+2));
+ }
}
void
client2(size_t cl)
{
- size_t which_cl = cl % NUM_CL;
-
- time_t t1;
- time(&t1);
-
- while(time(0) - t1 < 10){
- int arg = (random() % 2000);
- std::string rep;
- int ret = clients[which_cl]->call(25, rep, arg);
- if ((int)rep.size()!=arg) {
- printf("ask for %d reply got %d ret %d\n",
- arg, (int)rep.size(), ret);
- }
- VERIFY((int)rep.size() == arg);
- }
+ size_t which_cl = cl % NUM_CL;
+
+ time_t t1;
+ time(&t1);
+
+ while(time(0) - t1 < 10){
+ int arg = (random() % 2000);
+ string rep;
+ int ret = clients[which_cl]->call(25, rep, arg);
+ if ((int)rep.size()!=arg)
+ cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
+ VERIFY((int)rep.size() == arg);
+ }
}
void
client3(void *xx)
{
- rpcc *c = (rpcc *) xx;
+ rpcc *c = (rpcc *) xx;
- for(int i = 0; i < 4; i++){
- int rep;
- int ret = c->call_timeout(24, rpcc::to(3000), rep, i);
- VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
- }
+ for(int i = 0; i < 4; i++){
+ int rep = 0;
+ int ret = c->call_timeout(24, rpcc::to(3000), rep, i);
+ VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
+ }
}
void
simple_tests(rpcc *c)
{
- printf("simple_tests\n");
- // an RPC call to procedure #22.
- // rpcc::call() looks at the argument types to decide how
- // to marshall the RPC call packet, and how to unmarshall
- // the reply packet.
- std::string rep;
- int intret = c->call(22, rep, (std::string)"hello", (std::string)" goodbye");
- VERIFY(intret == 0); // this is what handle_22 returns
- VERIFY(rep == "hello goodbye");
- printf(" -- string concat RPC .. ok\n");
-
- // small request, big reply (perhaps req via UDP, reply via TCP)
- intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
- VERIFY(intret == 0);
- VERIFY(rep.size() == 70000);
- printf(" -- small request, big reply .. ok\n");
-
- // specify a timeout value to an RPC that should succeed (udp)
- int xx = 0;
- intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
- VERIFY(intret == 0 && xx == 78);
- printf(" -- no spurious timeout .. ok\n");
-
- // specify a timeout value to an RPC that should succeed (tcp)
- {
- std::string arg(1000, 'x');
- std::string rep2;
- c->call_timeout(22, rpcc::to(3000), rep2, arg, (std::string)"x");
- VERIFY(rep2.size() == 1001);
- printf(" -- no spurious timeout .. ok\n");
- }
-
- // huge RPC
- std::string big(1000000, 'x');
- intret = c->call(22, rep, big, (std::string)"z");
- VERIFY(rep.size() == 1000001);
- printf(" -- huge 1M rpc request .. ok\n");
-
- // specify a timeout value to an RPC that should timeout (udp)
- struct sockaddr_in non_existent;
- memset(&non_existent, 0, sizeof(non_existent));
- non_existent.sin_family = AF_INET;
- non_existent.sin_addr.s_addr = inet_addr("127.0.0.1");
- non_existent.sin_port = htons(7661);
- rpcc *c1 = new rpcc(non_existent);
- time_t t0 = time(0);
- intret = c1->bind(rpcc::to(3000));
- time_t t1 = time(0);
- VERIFY(intret < 0 && (t1 - t0) <= 4);
- printf(" -- rpc timeout .. ok\n");
- printf("simple_tests OK\n");
+ cout << "simple_tests" << endl;
+ // an RPC call to procedure #22.
+ // rpcc::call() looks at the argument types to decide how
+ // to marshall the RPC call packet, and how to unmarshall
+ // the reply packet.
+ string rep;
+ int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
+ VERIFY(intret == 0); // this is what handle_22 returns
+ VERIFY(rep == "hello goodbye");
+ cout << " -- string concat RPC .. ok" << endl;
+
+ // small request, big reply (perhaps req via UDP, reply via TCP)
+ intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
+ VERIFY(intret == 0);
+ VERIFY(rep.size() == 70000);
+ cout << " -- small request, big reply .. ok" << endl;
+
+ // specify a timeout value to an RPC that should succeed (udp)
+ int xx = 0;
+ intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
+ VERIFY(intret == 0 && xx == 78);
+ cout << " -- no spurious timeout .. ok" << endl;
+
+ // specify a timeout value to an RPC that should succeed (tcp)
+ {
+ string arg(1000, 'x');
+ string rep2;
+ c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x");
+ VERIFY(rep2.size() == 1001);
+ cout << " -- no spurious timeout .. ok" << endl;
+ }
+
+ // huge RPC
+ string big(1000000, 'x');
+ intret = c->call(22, rep, big, (string)"z");
+ VERIFY(rep.size() == 1000001);
+ cout << " -- huge 1M rpc request .. ok" << endl;
+
+ // specify a timeout value to an RPC that should timeout (udp)
+ string non_existent = "127.0.0.1:7661";
+ rpcc *c1 = new rpcc(non_existent);
+ time_t t0 = time(0);
+ intret = c1->bind(rpcc::to(3000));
+ time_t t1 = time(0);
+ VERIFY(intret < 0 && (t1 - t0) <= 4);
+ cout << " -- rpc timeout .. ok" << endl;
+ cout << "simple_tests OK" << endl;
}
void
concurrent_test(size_t nt)
{
- // create threads that make lots of calls in parallel,
- // to test thread synchronization for concurrent calls
- // and dispatches.
- printf("start concurrent_test (%lu threads) ...", nt);
+ // create threads that make lots of calls in parallel,
+ // to test thread synchronization for concurrent calls
+ // and dispatches.
+ cout << "start concurrent_test (" << nt << " threads) ...";
- std::vector<std::thread> th(nt);
+ vector<thread> th(nt);
- for(size_t i = 0; i < nt; i++)
- th[i] = std::thread(client1, i);
+ for(size_t i = 0; i < nt; i++)
+ th[i] = thread(client1, i);
- for(size_t i = 0; i < nt; i++)
+ for(size_t i = 0; i < nt; i++)
th[i].join();
- printf(" OK\n");
+ cout << " OK" << endl;
}
void
lossy_test()
{
- printf("start lossy_test ...");
- VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+ cout << "start lossy_test ...";
+ VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
- if (server) {
- delete server;
- startserver();
- }
+ if (server) {
+ delete server;
+ startserver();
+ }
- for (int i = 0; i < NUM_CL; i++) {
- delete clients[i];
- clients[i] = new rpcc(dst);
- VERIFY(clients[i]->bind()==0);
- }
+ for (int i = 0; i < NUM_CL; i++) {
+ delete clients[i];
+ clients[i] = new rpcc(dst);
+ VERIFY(clients[i]->bind()==0);
+ }
- size_t nt = 1;
+ size_t nt = 1;
- std::vector<std::thread> th(nt);
+ vector<thread> th(nt);
- for(size_t i = 0; i < nt; i++)
- th[i] = std::thread(client2, i);
+ for(size_t i = 0; i < nt; i++)
+ th[i] = thread(client2, i);
- for(size_t i = 0; i < nt; i++)
+ for(size_t i = 0; i < nt; i++)
th[i].join();
- printf(".. OK\n");
- VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
+ cout << ".. OK" << endl;
+ VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
}
void
failure_test()
{
- rpcc *client1;
- rpcc *client = clients[0];
+ rpcc *client1;
+ rpcc *client = clients[0];
- printf("failure_test\n");
+ cout << "failure_test" << endl;
- delete server;
+ delete server;
- client1 = new rpcc(dst);
- VERIFY (client1->bind(rpcc::to(3000)) < 0);
- printf(" -- create new client and try to bind to failed server .. failed ok\n");
+ client1 = new rpcc(dst);
+ VERIFY (client1->bind(rpcc::to(3000)) < 0);
+ cout << " -- create new client and try to bind to failed server .. failed ok" << endl;
- delete client1;
+ delete client1;
- startserver();
+ startserver();
- std::string rep;
- int intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye");
- VERIFY(intret == rpc_const::oldsrv_failure);
- printf(" -- call recovered server with old client .. failed ok\n");
+ string rep;
+ int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
+ VERIFY(intret == rpc_const::oldsrv_failure);
+ cout << " -- call recovered server with old client .. failed ok" << endl;
- delete client;
+ delete client;
- clients[0] = client = new rpcc(dst);
- VERIFY (client->bind() >= 0);
- VERIFY (client->bind() < 0);
+ clients[0] = client = new rpcc(dst);
+ VERIFY (client->bind() >= 0);
+ VERIFY (client->bind() < 0);
- intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye");
- VERIFY(intret == 0);
- VERIFY(rep == "hello goodbye");
+ intret = client->call(22, rep, (string)"hello", (string)" goodbye");
+ VERIFY(intret == 0);
+ VERIFY(rep == "hello goodbye");
- printf(" -- delete existing rpc client, create replacement rpc client .. ok\n");
+ cout << " -- delete existing rpc client, create replacement rpc client .. ok" << endl;
- size_t nt = 10;
- printf(" -- concurrent test on new rpc client w/ %lu threads ..", nt);
+ size_t nt = 10;
+ cout << " -- concurrent test on new rpc client w/ " << nt << " threads ..";
- std::vector<std::thread> th(nt);
+ vector<thread> th(nt);
- for(size_t i = 0; i < nt; i++)
- th[i] = std::thread(client3, client);
+ for(size_t i = 0; i < nt; i++)
+ th[i] = thread(client3, client);
- for(size_t i = 0; i < nt; i++)
+ for(size_t i = 0; i < nt; i++)
th[i].join();
- printf("ok\n");
+ cout << "ok" << endl;
- delete server;
- delete client;
+ delete server;
+ delete client;
- startserver();
- clients[0] = client = new rpcc(dst);
- VERIFY (client->bind() >= 0);
- printf(" -- delete existing rpc client and server, create replacements.. ok\n");
+ startserver();
+ clients[0] = client = new rpcc(dst);
+ VERIFY (client->bind() >= 0);
+ cout << " -- delete existing rpc client and server, create replacements.. ok" << endl;
- printf(" -- concurrent test on new client and server w/ %lu threads ..", nt);
+ cout << " -- concurrent test on new client and server w/ " << nt << " threads ..";
- for(size_t i = 0; i < nt; i++)
- th[i] = std::thread(client3, client);
+ for(size_t i = 0; i < nt; i++)
+ th[i] = thread(client3, client);
- for(size_t i = 0; i < nt; i++)
+ for(size_t i = 0; i < nt; i++)
th[i].join();
- printf("ok\n");
+ cout << "ok" << endl;
- printf("failure_test OK\n");
+ cout << "failure_test OK" << endl;
}
int
main(int argc, char *argv[])
{
- setvbuf(stdout, NULL, _IONBF, 0);
- setvbuf(stderr, NULL, _IONBF, 0);
- int debug_level = 0;
-
- bool isclient = false;
- bool isserver = false;
-
- srandom((uint32_t)getpid());
- port = 20000 + (getpid() % 10000);
-
- int ch = 0;
- while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
- switch (ch) {
- case 'c':
- isclient = true;
- break;
- case 's':
- isserver = true;
- break;
- case 'd':
- debug_level = atoi(optarg);
- break;
- case 'p':
- port = atoi(optarg);
- break;
- case 'l':
- VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+ setvbuf(stdout, NULL, _IONBF, 0);
+ setvbuf(stderr, NULL, _IONBF, 0);
+ int debug_level = 0;
+
+ bool isclient = false;
+ bool isserver = false;
+
+ srandom((uint32_t)getpid());
+ port = 20000 + (getpid() % 10000);
+
+ int ch = 0;
+ while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
+ switch (ch) {
+ case 'c':
+ isclient = true;
+ break;
+ case 's':
+ isserver = true;
+ break;
+ case 'd':
+ debug_level = atoi(optarg);
+ break;
+ case 'p':
+ port = atoi(optarg);
+ break;
+ case 'l':
+ VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+ break;
+ default:
break;
- default:
- break;
- }
- }
-
- if (!isserver && !isclient) {
- isserver = isclient = true;
- }
-
- if (debug_level > 0) {
- JSL_DEBUG_LEVEL = debug_level;
- jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
- }
-
- testmarshall();
-
- if (isserver) {
- printf("starting server on port %d RPC_HEADER_SZ %d\n", port, (int)RPC_HEADER_SZ);
- startserver();
- }
-
- if (isclient) {
- // server's address.
- memset(&dst, 0, sizeof(dst));
- dst.sin_family = AF_INET;
- dst.sin_addr.s_addr = inet_addr("127.0.0.1");
- dst.sin_port = htons(port);
-
-
- // start the client. bind it to the server.
- // starts a thread to listen for replies and hand them to
- // the correct waiting caller thread. there should probably
- // be only one rpcc per process. you probably need one
- // rpcc per server.
- for (int i = 0; i < NUM_CL; i++) {
- clients[i] = new rpcc(dst);
- VERIFY (clients[i]->bind() == 0);
- }
-
- simple_tests(clients[0]);
- concurrent_test(10);
- lossy_test();
- if (isserver) {
- failure_test();
- }
-
- printf("rpctest OK\n");
-
- exit(0);
- }
-
- while (1) {
- sleep(1);
- }
+ }
+ }
+
+ if (!isserver && !isclient) {
+ isserver = isclient = true;
+ }
+
+ if (debug_level > 0) {
+ DEBUG_LEVEL = debug_level;
+ IF_LEVEL(1) LOG_NONMEMBER("DEBUG LEVEL: " << debug_level);
+ }
+
+ testmarshall();
+
+ if (isserver) {
+ cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
+ startserver();
+ }
+
+ if (isclient) {
+ // server's address.
+ dst = "127.0.0.1:" + std::to_string(port);
+
+
+ // start the client. bind it to the server.
+ // starts a thread to listen for replies and hand them to
+ // the correct waiting caller thread. there should probably
+ // be only one rpcc per process. you probably need one
+ // rpcc per server.
+ for (int i = 0; i < NUM_CL; i++) {
+ clients[i] = new rpcc(dst);
+ VERIFY (clients[i]->bind() == 0);
+ }
+
+ simple_tests(clients[0]);
+ concurrent_test(10);
+ lossy_test();
+ if (isserver) {
+ failure_test();
+ }
+
+ cout << "rpctest OK" << endl;
+
+ exit(0);
+ }
+
+ while (1) {
+ sleep(1);
+ }
}