#include "rpc.h"
#include <arpa/inet.h>
-#include <stdio.h>
+#include <iostream>
+#include <vector>
+#include <thread>
#include <stdlib.h>
-#include <string.h>
#include <getopt.h>
#include <sys/types.h>
#include <unistd.h>
#define NUM_CL 2
+char log_thread_prefix = 'r';
+
+using std::string;
+using std::cout;
+using std::endl;
+using std::vector;
+using std::thread;
+
rpcs *server; // server rpc object
rpcc *clients[NUM_CL]; // client rpc object
-struct sockaddr_in dst; //server's ip address
+string dst; //server's ip address
int port;
// Server-side handlers. They must be methods of some class; handlers
// may come from multiple classes.
class srv {
public:
// Registered as procedure 22 in startserver(); concatenates a and b into r.
- int handle_22(std::string & r, const std::string a, const std::string b);
+ int handle_22(string & r, const string a, const string b);
// Registered as procedures 23 (fast) and 24 (slow) in startserver().
// Their bodies are not in the visible lines; callers in this file expect
// procedure 23 to reply a+1 and procedure 24 to reply a+2.
int handle_fast(int &r, const int a);
int handle_slow(int &r, const int a);
// Replies with a string whose length is the argument. The new version
// widens the length parameter from int to size_t.
- int handle_bigrep(std::string &r, const int a);
+ int handle_bigrep(string &r, const size_t a);
};
// A handler: a and b are the arguments, r is the result. The int return
// value is what the caller sees as the return value of rpcc::call()
// (simple_tests checks that it is 0 for procedure 22).
// The RPC layer decides how to (un)marshall by looking
// at these argument types, so this function definition
// does what a .x file does in SunRPC.
int
// b is declared const in the class but not here; top-level const on a
// by-value parameter is not part of the signature, so the two still match.
-srv::handle_22(std::string &r, const std::string a, std::string b)
+srv::handle_22(string &r, const string a, string b)
{
// The reply is simply the two arguments concatenated.
r = a + b;
return 0;
}
// Handler that fills r with len copies of 'x' and returns 0.
// The new version takes len as size_t instead of int.
int
-srv::handle_bigrep(std::string &r, const int len)
+srv::handle_bigrep(string &r, const size_t len)
{
// len is already size_t in the new signature, so the (size_t) cast on the
// '+' line below does not change the value or the type.
- r = std::string(len, 'x');
+ r = string((size_t)len, 'x');
return 0;
}
void startserver()
{
- server = new rpcs(port);
+ server = new rpcs((unsigned int)port);
server->reg(22, &srv::handle_22, &service);
server->reg(23, &srv::handle_fast, &service);
server->reg(24, &srv::handle_slow, &service);
VERIFY(m.size()==RPC_HEADER_SZ);
int i = 12345;
unsigned long long l = 1223344455L;
- std::string s = std::string("hallo....");
+ string s = "hallo....";
m << i;
m << l;
m << s;
char *b;
- int sz;
+ size_t sz;
m.take_buf(&b,&sz);
- VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)));
+ VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
unmarshall un(b,sz);
request_header rh1;
VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
int i1;
unsigned long long l1;
- std::string s1;
+ string s1;
un >> i1;
un >> l1;
un >> s1;
}
void
-client1(int cl)
+client1(size_t cl)
{
// test concurrency.
- int which_cl = ((unsigned long) cl ) % NUM_CL;
+ size_t which_cl = cl % NUM_CL;
for(int i = 0; i < 100; i++){
int arg = (random() % 2000);
- std::string rep;
+ string rep;
int ret = clients[which_cl]->call(25, rep, arg);
VERIFY(ret == 0);
- if ((int)rep.size()!=arg) {
- printf("repsize wrong %d!=%d\n", (int)rep.size(), arg);
- }
+ if ((int)rep.size()!=arg)
+ cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
VERIFY((int)rep.size() == arg);
}
int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
auto end = std::chrono::steady_clock::now();
- int diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
+ auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
if (ret != 0)
- printf("%d ms have elapsed!!!\n", diff);
+ cout << diff << " ms have elapsed!!!" << endl;
VERIFY(ret == 0);
VERIFY(rep == (which ? arg+1 : arg+2));
}
}
// Worker used by lossy_test: for about 10 seconds of wall-clock time,
// repeatedly asks procedure 25 for a reply of a random length and checks
// that the reply has exactly that length.
//
// cl selects which entry of clients[] to use (taken modulo NUM_CL).
void
-client2(int cl)
+client2(size_t cl)
{
// With cl now size_t the cast to unsigned long is no longer needed.
- int which_cl = ((unsigned long) cl ) % NUM_CL;
+ size_t which_cl = cl % NUM_CL;
time_t t1;
time(&t1);
// time() has one-second resolution, so the loop runs for roughly 10 s.
while(time(0) - t1 < 10){
// Requested reply length, in [0, 2000).
// NOTE(review): arg is an int, while srv::handle_bigrep now takes size_t.
// The registration of procedure 25 is not in the visible lines; if it is
// handle_bigrep, confirm the marshalled argument width still matches.
int arg = (random() % 2000);
- std::string rep;
+ string rep;
int ret = clients[which_cl]->call(25, rep, arg);
// Print the mismatch (and ret) before VERIFY aborts, to aid debugging.
- if ((int)rep.size()!=arg) {
- printf("ask for %d reply got %d ret %d\n",
- arg, (int)rep.size(), ret);
- }
+ if ((int)rep.size()!=arg)
+ cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
VERIFY((int)rep.size() == arg);
}
}
// Runs a fixed sequence of single-threaded RPC checks against client c:
// string concatenation (22), a big reply (25), calls with timeouts that
// must succeed (23 and 22), a 1 MB request (22), and finally a bind to an
// address where no server is expected, which must fail.
// Each step prints a progress line; any failed VERIFY stops the test.
void
simple_tests(rpcc *c)
{
- printf("simple_tests\n");
+ cout << "simple_tests" << endl;
// an RPC call to procedure #22.
// rpcc::call() looks at the argument types to decide how
// to marshall the RPC call packet, and how to unmarshall
// the reply packet.
- std::string rep;
- int intret = c->call(22, rep, (std::string)"hello", (std::string)" goodbye");
+ string rep;
+ int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
VERIFY(intret == 0); // this is what handle_22 returns
VERIFY(rep == "hello goodbye");
- printf(" -- string concat RPC .. ok\n");
+ cout << " -- string concat RPC .. ok" << endl;
// small request, big reply (perhaps req via UDP, reply via TCP)
// Asks procedure 25 for 70000 bytes and checks the reply length.
intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
VERIFY(intret == 0);
VERIFY(rep.size() == 70000);
- printf(" -- small request, big reply .. ok\n");
+ cout << " -- small request, big reply .. ok" << endl;
// specify a timeout value to an RPC that should succeed (udp)
// Procedure 23 is expected to reply with its argument plus one (77 -> 78).
int xx = 0;
intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
VERIFY(intret == 0 && xx == 78);
- printf(" -- no spurious timeout .. ok\n");
+ cout << " -- no spurious timeout .. ok" << endl;
// specify a timeout value to an RPC that should succeed (tcp)
{
// The new version names the inner reply rep2 so that it no longer shadows
// the outer rep.
// NOTE(review): the return value of this call_timeout is not checked;
// only the reply length (1000 + 1) is verified.
- std::string arg(1000, 'x');
- std::string rep;
- c->call_timeout(22, rpcc::to(3000), rep, arg, (std::string)"x");
- VERIFY(rep.size() == 1001);
- printf(" -- no spurious timeout .. ok\n");
+ string arg(1000, 'x');
+ string rep2;
+ c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x");
+ VERIFY(rep2.size() == 1001);
+ cout << " -- no spurious timeout .. ok" << endl;
}
// huge RPC
// NOTE(review): intret is assigned here but not verified; only the reply
// length (1000000 + 1) is checked.
- std::string big(1000000, 'x');
- intret = c->call(22, rep, big, (std::string)"z");
+ string big(1000000, 'x');
+ intret = c->call(22, rep, big, (string)"z");
VERIFY(rep.size() == 1000001);
- printf(" -- huge 1M rpc request .. ok\n");
+ cout << " -- huge 1M rpc request .. ok" << endl;
// specify a timeout value to an RPC that should timeout (udp)
// The new version describes the destination as a "host:port" string
// instead of filling in a sockaddr_in by hand.
- struct sockaddr_in non_existent;
- memset(&non_existent, 0, sizeof(non_existent));
- non_existent.sin_family = AF_INET;
- non_existent.sin_addr.s_addr = inet_addr("127.0.0.1");
- non_existent.sin_port = htons(7661);
+ string non_existent = "127.0.0.1:7661";
// NOTE(review): c1 is allocated with new and is not deleted in this function.
rpcc *c1 = new rpcc(non_existent);
// bind must fail, and must give up within 4 seconds as measured by time()
// (presumably rpcc::to(3000) is a 3000 ms timeout -- confirm).
time_t t0 = time(0);
intret = c1->bind(rpcc::to(3000));
time_t t1 = time(0);
VERIFY(intret < 0 && (t1 - t0) <= 4);
- printf(" -- rpc timeout .. ok\n");
- printf("simple_tests OK\n");
+ cout << " -- rpc timeout .. ok" << endl;
+ cout << "simple_tests OK" << endl;
}
// Starts nt threads that each run client1, then waits for all of them.
// nt becomes size_t in the new version, so it matches the vector size and
// the loop indices without signed/unsigned conversions.
void
-concurrent_test(int nt)
+concurrent_test(size_t nt)
{
// create threads that make lots of calls in parallel,
// to test thread synchronization for concurrent calls
// and dispatches.
- printf("start concurrent_test (%d threads) ...", nt);
+ cout << "start concurrent_test (" << nt << " threads) ...";
// th starts as nt default-constructed (not running) thread objects; each
// slot is then move-assigned a running thread. The index i is passed to
// client1, which uses it to pick an entry of clients[].
- std::vector<std::thread> th(nt);
- for(int i = 0; i < nt; i++){
- th[i] = std::thread(client1, i);
- }
+ vector<thread> th(nt);
+
+ for(size_t i = 0; i < nt; i++)
+ th[i] = thread(client1, i);
// Wait for every worker to finish before reporting success.
- for(int i = 0; i < nt; i++){
+ for(size_t i = 0; i < nt; i++)
th[i].join();
- }
- printf(" OK\n");
+
+ cout << " OK" << endl;
}
void
lossy_test()
{
- printf("start lossy_test ...");
+ cout << "start lossy_test ...";
VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
if (server) {
VERIFY(clients[i]->bind()==0);
}
- int nt = 1;
- std::vector<std::thread> th(nt);
- for(int i = 0; i < nt; i++){
- th[i] = std::thread(client2, i);
- }
- for(int i = 0; i < nt; i++){
+ size_t nt = 1;
+
+ vector<thread> th(nt);
+
+ for(size_t i = 0; i < nt; i++)
+ th[i] = thread(client2, i);
+
+ for(size_t i = 0; i < nt; i++)
th[i].join();
- }
- printf(".. OK\n");
+
+ cout << ".. OK" << endl;
VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
}
rpcc *client1;
rpcc *client = clients[0];
- printf("failure_test\n");
+ cout << "failure_test" << endl;
delete server;
client1 = new rpcc(dst);
VERIFY (client1->bind(rpcc::to(3000)) < 0);
- printf(" -- create new client and try to bind to failed server .. failed ok\n");
+ cout << " -- create new client and try to bind to failed server .. failed ok" << endl;
delete client1;
startserver();
- std::string rep;
- int intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye");
+ string rep;
+ int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
VERIFY(intret == rpc_const::oldsrv_failure);
- printf(" -- call recovered server with old client .. failed ok\n");
+ cout << " -- call recovered server with old client .. failed ok" << endl;
delete client;
VERIFY (client->bind() >= 0);
VERIFY (client->bind() < 0);
- intret = client->call(22, rep, (std::string)"hello", (std::string)" goodbye");
+ intret = client->call(22, rep, (string)"hello", (string)" goodbye");
VERIFY(intret == 0);
VERIFY(rep == "hello goodbye");
- printf(" -- delete existing rpc client, create replacement rpc client .. ok\n");
+ cout << " -- delete existing rpc client, create replacement rpc client .. ok" << endl;
- int nt = 10;
- printf(" -- concurrent test on new rpc client w/ %d threads ..", nt);
+ size_t nt = 10;
+ cout << " -- concurrent test on new rpc client w/ " << nt << " threads ..";
- std::vector<std::thread> th(nt);
- for(int i = 0; i < nt; i++){
- th[i] = std::thread(client3, client);
- }
+ vector<thread> th(nt);
+
+ for(size_t i = 0; i < nt; i++)
+ th[i] = thread(client3, client);
- for(int i = 0; i < nt; i++){
+ for(size_t i = 0; i < nt; i++)
th[i].join();
- }
- printf("ok\n");
+
+ cout << "ok" << endl;
delete server;
delete client;
startserver();
clients[0] = client = new rpcc(dst);
VERIFY (client->bind() >= 0);
- printf(" -- delete existing rpc client and server, create replacements.. ok\n");
+ cout << " -- delete existing rpc client and server, create replacements.. ok" << endl;
- printf(" -- concurrent test on new client and server w/ %d threads ..", nt);
- for(int i = 0; i < nt; i++){
- th[i] = std::thread(client3, client);
- }
+ cout << " -- concurrent test on new client and server w/ " << nt << " threads ..";
+
+ for(size_t i = 0; i < nt; i++)
+ th[i] = thread(client3, client);
- for(int i = 0; i < nt; i++){
+ for(size_t i = 0; i < nt; i++)
th[i].join();
- }
- printf("ok\n");
- printf("failure_test OK\n");
+ cout << "ok" << endl;
+
+ cout << "failure_test OK" << endl;
}
int
bool isclient = false;
bool isserver = false;
- srandom(getpid());
+ srandom((uint32_t)getpid());
port = 20000 + (getpid() % 10000);
- char ch = 0;
+ int ch = 0;
while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
switch (ch) {
case 'c':
break;
case 'l':
VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+ break;
default:
break;
}
testmarshall();
if (isserver) {
- printf("starting server on port %d RPC_HEADER_SZ %d\n", port, (int)RPC_HEADER_SZ);
+ cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
startserver();
}
if (isclient) {
// server's address.
- memset(&dst, 0, sizeof(dst));
- dst.sin_family = AF_INET;
- dst.sin_addr.s_addr = inet_addr("127.0.0.1");
- dst.sin_port = htons(port);
+ dst = "127.0.0.1:" + std::to_string(port);
// start the client. bind it to the server.
failure_test();
}
- printf("rpctest OK\n");
+ cout << "rpctest OK" << endl;
exit(0);
}