Split out marshall code into a new file
[invirt/third/libt4.git] / rpc / rpctest.cc
index 115f484..7217b25 100644 (file)
@@ -1,33 +1,31 @@
 // RPC test and pseudo-documentation.
 // generates print statements on failures, but eventually says "rpctest OK"
 
+#include "types.h"
 #include "rpc.h"
 #include <arpa/inet.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
 #include <getopt.h>
 #include <sys/types.h>
 #include <unistd.h>
-#include "jsl_log.h"
-#include "lang/verify.h"
 
 #define NUM_CL 2
 
+char log_thread_prefix = 'r';
+
 rpcs *server;  // server rpc object
 rpcc *clients[NUM_CL];  // client rpc object
-struct sockaddr_in dst; //server's ip address
+string dst; //server's ip address
 int port;
 
 // server-side handlers. they must be methods of some class
 // to simplify rpcs::reg(). a server process can have handlers
 // from multiple classes.
 class srv {
-       public:
-               int handle_22(const std::string a, const std::string b, std::string & r);
-               int handle_fast(const int a, int &r);
-               int handle_slow(const int a, int &r);
-               int handle_bigrep(const int a, std::string &r);
+    public:
+        int handle_22(string & r, const string a, const string b);
+        int handle_fast(int &r, const int a);
+        int handle_slow(int &r, const int a);
+        int handle_bigrep(string &r, const size_t a);
 };
 
 // a handler. a and b are arguments, r is the result.
@@ -38,404 +36,397 @@ class srv {
 // at these argument types, so this function definition
 // does what a .x file does in SunRPC.
 int
-srv::handle_22(const std::string a, std::string b, std::string &r)
+srv::handle_22(string &r, const string a, string b)
 {
-       r = a + b;
-       return 0;
+    r = a + b;
+    return 0;
 }
 
 int
-srv::handle_fast(const int a, int &r)
+srv::handle_fast(int &r, const int a)
 {
-       r = a + 1;
-       return 0;
+    r = a + 1;
+    return 0;
 }
 
 int
-srv::handle_slow(const int a, int &r)
+srv::handle_slow(int &r, const int a)
 {
-       usleep(random() % 5000);
-       r = a + 2;
-       return 0;
+    usleep(random() % 5000);
+    r = a + 2;
+    return 0;
 }
 
 int
-srv::handle_bigrep(const int len, std::string &r)
+srv::handle_bigrep(string &r, const size_t len)
 {
-       r = std::string(len, 'x');
-       return 0;
+    r = string((size_t)len, 'x');
+    return 0;
 }
 
 srv service;
 
 void startserver()
 {
-       server = new rpcs(port);
-       server->reg(22, &service, &srv::handle_22);
-       server->reg(23, &service, &srv::handle_fast);
-       server->reg(24, &service, &srv::handle_slow);
-       server->reg(25, &service, &srv::handle_bigrep);
+    server = new rpcs((unsigned int)port);
+    server->reg(22, &srv::handle_22, &service);
+    server->reg(23, &srv::handle_fast, &service);
+    server->reg(24, &srv::handle_slow, &service);
+    server->reg(25, &srv::handle_bigrep, &service);
 }
 
 void
 testmarshall()
 {
-       marshall m;
-       req_header rh(1,2,3,4,5);
-       m.pack_req_header(rh);
-       VERIFY(m.size()==RPC_HEADER_SZ);
-       int i = 12345;
-       unsigned long long l = 1223344455L;
-       std::string s = std::string("hallo....");
-       m << i;
-       m << l;
-       m << s;
-
-       char *b;
-       int sz;
-       m.take_buf(&b,&sz);
-       VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)));
-
-       unmarshall un(b,sz);
-       req_header rh1;
-       un.unpack_req_header(&rh1);
-       VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
-       int i1;
-       unsigned long long l1;
-       std::string s1;
-       un >> i1;
-       un >> l1;
-       un >> s1;
-       VERIFY(un.okdone());
-       VERIFY(i1==i && l1==l && s1==s);
+    marshall m;
+    request_header rh{1,2,3,4,5};
+    m.pack_req_header(rh);
+    VERIFY(m.size()==RPC_HEADER_SZ);
+    int i = 12345;
+    unsigned long long l = 1223344455L;
+    string s = "hallo....";
+    m << i;
+    m << l;
+    m << s;
+
+    char *b;
+    size_t sz;
+    m.take_buf(&b,&sz);
+    VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
+
+    unmarshall un(b,sz);
+    request_header rh1;
+    un.unpack_req_header(&rh1);
+    VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
+    int i1;
+    unsigned long long l1;
+    string s1;
+    un >> i1;
+    un >> l1;
+    un >> s1;
+    VERIFY(un.okdone());
+    VERIFY(i1==i && l1==l && s1==s);
 }
 
 void
-client1(int cl)
+client1(size_t cl)
 {
-       // test concurrency.
-       int which_cl = ((unsigned long) cl ) % NUM_CL;
-
-       for(int i = 0; i < 100; i++){
-               int arg = (random() % 2000);
-               std::string rep;
-               int ret = clients[which_cl]->call(25, arg, rep);
-               VERIFY(ret == 0);
-               if ((int)rep.size()!=arg) {
-                       printf("repsize wrong %d!=%d\n", (int)rep.size(), arg);
-               }
-               VERIFY((int)rep.size() == arg);
-       }
-
-       // test rpc replies coming back not in the order of
-       // the original calls -- i.e. does xid reply dispatch work.
-       for(int i = 0; i < 100; i++){
-               int which = (random() % 2);
-               int arg = (random() % 1000);
-               int rep;
-
-               auto start = std::chrono::steady_clock::now();
-
-               int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep);
-               auto end = std::chrono::steady_clock::now();
-               int diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
-               if (ret != 0)
-                       printf("%d ms have elapsed!!!\n", diff);
-               VERIFY(ret == 0);
-               VERIFY(rep == (which ? arg+1 : arg+2));
-       }
+    // test concurrency.
+    size_t which_cl = cl % NUM_CL;
+
+    for(int i = 0; i < 100; i++){
+        int arg = (random() % 2000);
+        string rep;
+        int ret = clients[which_cl]->call(25, rep, arg);
+        VERIFY(ret == 0);
+        if ((int)rep.size()!=arg)
+            cout << "repsize wrong " << rep.size() << "!=" << arg << endl;
+        VERIFY((int)rep.size() == arg);
+    }
+
+    // test rpc replies coming back not in the order of
+    // the original calls -- i.e. does xid reply dispatch work.
+    for(int i = 0; i < 100; i++){
+        int which = (random() % 2);
+        int arg = (random() % 1000);
+        int rep;
+
+        auto start = std::chrono::steady_clock::now();
+
+        int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg);
+        auto end = std::chrono::steady_clock::now();
+        auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
+        if (ret != 0)
+            cout << diff << " ms have elapsed!!!" << endl;
+        VERIFY(ret == 0);
+        VERIFY(rep == (which ? arg+1 : arg+2));
+    }
 }
 
 void
-client2(int cl)
+client2(size_t cl)
 {
-       int which_cl = ((unsigned long) cl ) % NUM_CL;
-
-       time_t t1;
-       time(&t1);
-
-       while(time(0) - t1 < 10){
-               int arg = (random() % 2000);
-               std::string rep;
-               int ret = clients[which_cl]->call(25, arg, rep);
-               if ((int)rep.size()!=arg) {
-                       printf("ask for %d reply got %d ret %d\n",
-                               arg, (int)rep.size(), ret);
-               }
-               VERIFY((int)rep.size() == arg);
-       }
+    size_t which_cl = cl % NUM_CL;
+
+    time_t t1;
+    time(&t1);
+
+    while(time(0) - t1 < 10){
+        int arg = (random() % 2000);
+        string rep;
+        int ret = clients[which_cl]->call(25, rep, arg);
+        if ((int)rep.size()!=arg)
+            cout << "ask for " << arg << " reply got " << rep.size() << " ret " << ret << endl;
+        VERIFY((int)rep.size() == arg);
+    }
 }
 
 void
 client3(void *xx)
 {
-       rpcc *c = (rpcc *) xx;
+    rpcc *c = (rpcc *) xx;
 
-       for(int i = 0; i < 4; i++){
-               int rep;
-               int ret = c->call(24, i, rep, rpcc::to(3000));
-               VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
-       }
+    for(int i = 0; i < 4; i++){
+        int rep = 0;
+        int ret = c->call_timeout(24, rpcc::to(3000), rep, i);
+        VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
+    }
 }
 
 
 void
 simple_tests(rpcc *c)
 {
-       printf("simple_tests\n");
-       // an RPC call to procedure #22.
-       // rpcc::call() looks at the argument types to decide how
-       // to marshall the RPC call packet, and how to unmarshall
-       // the reply packet.
-       std::string rep;
-       int intret = c->call(22, (std::string)"hello", (std::string)" goodbye", rep);
-       VERIFY(intret == 0); // this is what handle_22 returns
-       VERIFY(rep == "hello goodbye");
-       printf("   -- string concat RPC .. ok\n");
-
-       // small request, big reply (perhaps req via UDP, reply via TCP)
-       intret = c->call(25, 70000, rep, rpcc::to(200000));
-       VERIFY(intret == 0);
-       VERIFY(rep.size() == 70000);
-       printf("   -- small request, big reply .. ok\n");
-
-       // specify a timeout value to an RPC that should succeed (udp)
-       int xx = 0;
-       intret = c->call(23, 77, xx, rpcc::to(3000));
-       VERIFY(intret == 0 && xx == 78);
-       printf("   -- no suprious timeout .. ok\n");
-
-       // specify a timeout value to an RPC that should succeed (tcp)
-       {
-               std::string arg(1000, 'x');
-               std::string rep;
-               c->call(22, arg, (std::string)"x", rep, rpcc::to(3000));
-               VERIFY(rep.size() == 1001);
-               printf("   -- no suprious timeout .. ok\n");
-       }
-
-       // huge RPC
-       std::string big(1000000, 'x');
-       intret = c->call(22, big, (std::string)"z", rep);
-       VERIFY(rep.size() == 1000001);
-       printf("   -- huge 1M rpc request .. ok\n");
-
-       // specify a timeout value to an RPC that should timeout (udp)
-       struct sockaddr_in non_existent;
-       memset(&non_existent, 0, sizeof(non_existent));
-       non_existent.sin_family = AF_INET;
-       non_existent.sin_addr.s_addr = inet_addr("127.0.0.1");
-       non_existent.sin_port = htons(7661);
-       rpcc *c1 = new rpcc(non_existent);
-       time_t t0 = time(0);
-       intret = c1->bind(rpcc::to(3000));
-       time_t t1 = time(0);
-       VERIFY(intret < 0 && (t1 - t0) <= 4);
-       printf("   -- rpc timeout .. ok\n");
-       printf("simple_tests OK\n");
+    cout << "simple_tests" << endl;
+    // an RPC call to procedure #22.
+    // rpcc::call() looks at the argument types to decide how
+    // to marshall the RPC call packet, and how to unmarshall
+    // the reply packet.
+    string rep;
+    int intret = c->call(22, rep, (string)"hello", (string)" goodbye");
+    VERIFY(intret == 0); // this is what handle_22 returns
+    VERIFY(rep == "hello goodbye");
+    cout << "   -- string concat RPC .. ok" << endl;
+
+    // small request, big reply (perhaps req via UDP, reply via TCP)
+    intret = c->call_timeout(25, rpcc::to(200000), rep, 70000);
+    VERIFY(intret == 0);
+    VERIFY(rep.size() == 70000);
+    cout << "   -- small request, big reply .. ok" << endl;
+
+    // specify a timeout value to an RPC that should succeed (udp)
+    int xx = 0;
+    intret = c->call_timeout(23, rpcc::to(3000), xx, 77);
+    VERIFY(intret == 0 && xx == 78);
+    cout << "   -- no spurious timeout .. ok" << endl;
+
+    // specify a timeout value to an RPC that should succeed (tcp)
+    {
+        string arg(1000, 'x');
+        string rep2;
+        c->call_timeout(22, rpcc::to(3000), rep2, arg, (string)"x");
+        VERIFY(rep2.size() == 1001);
+        cout << "   -- no spurious timeout .. ok" << endl;
+    }
+
+    // huge RPC
+    string big(1000000, 'x');
+    intret = c->call(22, rep, big, (string)"z");
+    VERIFY(rep.size() == 1000001);
+    cout << "   -- huge 1M rpc request .. ok" << endl;
+
+    // specify a timeout value to an RPC that should timeout (udp)
+    string non_existent = "127.0.0.1:7661";
+    rpcc *c1 = new rpcc(non_existent);
+    time_t t0 = time(0);
+    intret = c1->bind(rpcc::to(3000));
+    time_t t1 = time(0);
+    VERIFY(intret < 0 && (t1 - t0) <= 4);
+    cout << "   -- rpc timeout .. ok" << endl;
+    cout << "simple_tests OK" << endl;
 }
 
 void 
-concurrent_test(int nt)
+concurrent_test(size_t nt)
 {
-       // create threads that make lots of calls in parallel,
-       // to test thread synchronization for concurrent calls
-       // and dispatches.
-       printf("start concurrent_test (%d threads) ...", nt);
+    // create threads that make lots of calls in parallel,
+    // to test thread synchronization for concurrent calls
+    // and dispatches.
+    cout << "start concurrent_test (" << nt << " threads) ...";
+
+    vector<thread> th(nt);
 
-    std::vector<std::thread> th(nt);
-       for(int i = 0; i < nt; i++){
-        th[i] = std::thread(client1, i);
-       }
+    for(size_t i = 0; i < nt; i++)
+        th[i] = thread(client1, i);
 
-       for(int i = 0; i < nt; i++){
+    for(size_t i = 0; i < nt; i++)
         th[i].join();
-       }
-       printf(" OK\n");
+
+    cout << " OK" << endl;
 }
 
 void 
 lossy_test()
 {
-       printf("start lossy_test ...");
-       VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
-
-       if (server) {
-               delete server;
-               startserver();
-       }
-
-       for (int i = 0; i < NUM_CL; i++) {
-               delete clients[i];
-               clients[i] = new rpcc(dst);
-               VERIFY(clients[i]->bind()==0);
-       }
-
-       int nt = 1;
-    std::vector<std::thread> th(nt);
-       for(int i = 0; i < nt; i++){
-        th[i] = std::thread(client2, i);
-       }
-       for(int i = 0; i < nt; i++){
+    cout << "start lossy_test ...";
+    VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+
+    if (server) {
+        delete server;
+        startserver();
+    }
+
+    for (int i = 0; i < NUM_CL; i++) {
+        delete clients[i];
+        clients[i] = new rpcc(dst);
+        VERIFY(clients[i]->bind()==0);
+    }
+
+    size_t nt = 1;
+
+    vector<thread> th(nt);
+
+    for(size_t i = 0; i < nt; i++)
+        th[i] = thread(client2, i);
+
+    for(size_t i = 0; i < nt; i++)
         th[i].join();
-       }
-       printf(".. OK\n");
-       VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
+
+    cout << ".. OK" << endl;
+    VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
 }
 
 void 
 failure_test()
 {
-       rpcc *client1;
-       rpcc *client = clients[0];
+    rpcc *client1;
+    rpcc *client = clients[0];
+
+    cout << "failure_test" << endl;
 
-       printf("failure_test\n");
+    delete server;
 
-       delete server;
+    client1 = new rpcc(dst);
+    VERIFY (client1->bind(rpcc::to(3000)) < 0);
+    cout << "   -- create new client and try to bind to failed server .. failed ok" << endl;
 
-       client1 = new rpcc(dst);
-       VERIFY (client1->bind(rpcc::to(3000)) < 0);
-       printf("   -- create new client and try to bind to failed server .. failed ok\n");
+    delete client1;
 
-       delete client1;
+    startserver();
 
-       startserver();
+    string rep;
+    int intret = client->call(22, rep, (string)"hello", (string)" goodbye");
+    VERIFY(intret == rpc_const::oldsrv_failure);
+    cout << "   -- call recovered server with old client .. failed ok" << endl;
 
-       std::string rep;
-       int intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
-       VERIFY(intret == rpc_const::oldsrv_failure);
-       printf("   -- call recovered server with old client .. failed ok\n");
+    delete client;
 
-       delete client;
+    clients[0] = client = new rpcc(dst);
+    VERIFY (client->bind() >= 0);
+    VERIFY (client->bind() < 0);
 
-       clients[0] = client = new rpcc(dst);
-       VERIFY (client->bind() >= 0);
-       VERIFY (client->bind() < 0);
+    intret = client->call(22, rep, (string)"hello", (string)" goodbye");
+    VERIFY(intret == 0);
+    VERIFY(rep == "hello goodbye");
 
-       intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
-       VERIFY(intret == 0);
-       VERIFY(rep == "hello goodbye");
+    cout << "   -- delete existing rpc client, create replacement rpc client .. ok" << endl;
 
-       printf("   -- delete existing rpc client, create replacement rpc client .. ok\n");
 
+    size_t nt = 10;
+    cout << "   -- concurrent test on new rpc client w/ " << nt << " threads ..";
 
-       int nt = 10;
-       printf("   -- concurrent test on new rpc client w/ %d threads ..", nt);
+    vector<thread> th(nt);
 
-    std::vector<std::thread> th(nt);
-       for(int i = 0; i < nt; i++){
-        th[i] = std::thread(client3, client);
-       }
+    for(size_t i = 0; i < nt; i++)
+        th[i] = thread(client3, client);
 
-       for(int i = 0; i < nt; i++){
+    for(size_t i = 0; i < nt; i++)
         th[i].join();
-       }
-       printf("ok\n");
 
-       delete server;
-       delete client;
+    cout << "ok" << endl;
 
-       startserver();
-       clients[0] = client = new rpcc(dst);
-       VERIFY (client->bind() >= 0);
-       printf("   -- delete existing rpc client and server, create replacements.. ok\n");
+    delete server;
+    delete client;
 
-       printf("   -- concurrent test on new client and server w/ %d threads ..", nt);
-       for(int i = 0; i < nt; i++){
-        th[i] = std::thread(client3, client);
-       }
+    startserver();
+    clients[0] = client = new rpcc(dst);
+    VERIFY (client->bind() >= 0);
+    cout << "   -- delete existing rpc client and server, create replacements.. ok" << endl;
 
-       for(int i = 0; i < nt; i++){
+    cout << "   -- concurrent test on new client and server w/ " << nt << " threads ..";
+
+    for(size_t i = 0; i < nt; i++)
+        th[i] = thread(client3, client);
+
+    for(size_t i = 0; i < nt; i++)
         th[i].join();
-       }
-       printf("ok\n");
 
-       printf("failure_test OK\n");
+    cout << "ok" << endl;
+
+    cout << "failure_test OK" << endl;
 }
 
 int
 main(int argc, char *argv[])
 {
 
-       setvbuf(stdout, NULL, _IONBF, 0);
-       setvbuf(stderr, NULL, _IONBF, 0);
-       int debug_level = 0;
-
-       bool isclient = false;
-       bool isserver = false;
-
-       srandom(getpid());
-       port = 20000 + (getpid() % 10000);
-
-       char ch = 0;
-       while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
-               switch (ch) {
-                       case 'c':
-                               isclient = true;
-                               break;
-                       case 's':
-                               isserver = true;
-                               break;
-                       case 'd':
-                               debug_level = atoi(optarg);
-                               break;
-                       case 'p':
-                               port = atoi(optarg);
-                               break;
-                       case 'l':
-                               VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
-                       default:
-                               break;
-               }
-       }
-
-       if (!isserver && !isclient)  {
-               isserver = isclient = true;
-       }
-
-       if (debug_level > 0) {
-               JSL_DEBUG_LEVEL = debug_level;
-               jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
-       }
-
-       testmarshall();
-
-       if (isserver) {
-               printf("starting server on port %d RPC_HEADER_SZ %d\n", port, (int)RPC_HEADER_SZ);
-               startserver();
-       }
-
-       if (isclient) {
-               // server's address.
-               memset(&dst, 0, sizeof(dst));
-               dst.sin_family = AF_INET;
-               dst.sin_addr.s_addr = inet_addr("127.0.0.1");
-               dst.sin_port = htons(port);
-
-
-               // start the client.  bind it to the server.
-               // starts a thread to listen for replies and hand them to
-               // the correct waiting caller thread. there should probably
-               // be only one rpcc per process. you probably need one
-               // rpcc per server.
-               for (int i = 0; i < NUM_CL; i++) {
-                       clients[i] = new rpcc(dst);
-                       VERIFY (clients[i]->bind() == 0);
-               }
-
-               simple_tests(clients[0]);
-               concurrent_test(10);
-               lossy_test();
-               if (isserver) {
-                       failure_test();
-               }
-
-               printf("rpctest OK\n");
-
-               exit(0);
-       }
-
-       while (1) {
-               sleep(1);
-       }
+    setvbuf(stdout, NULL, _IONBF, 0);
+    setvbuf(stderr, NULL, _IONBF, 0);
+    int debug_level = 0;
+
+    bool isclient = false;
+    bool isserver = false;
+
+    srandom((uint32_t)getpid());
+    port = 20000 + (getpid() % 10000);
+
+    int ch = 0;
+    while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
+        switch (ch) {
+            case 'c':
+                isclient = true;
+                break;
+            case 's':
+                isserver = true;
+                break;
+            case 'd':
+                debug_level = atoi(optarg);
+                break;
+            case 'p':
+                port = atoi(optarg);
+                break;
+            case 'l':
+                VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+                break;
+            default:
+                break;
+        }
+    }
+
+    if (!isserver && !isclient)  {
+        isserver = isclient = true;
+    }
+
+    if (debug_level > 0) {
+        DEBUG_LEVEL = debug_level;
+        IF_LEVEL(1) LOG_NONMEMBER("DEBUG LEVEL: " << debug_level);
+    }
+
+    testmarshall();
+
+    if (isserver) {
+        cout << "starting server on port " << port << " RPC_HEADER_SZ " << (int)RPC_HEADER_SZ << endl;
+        startserver();
+    }
+
+    if (isclient) {
+        // server's address.
+        dst = "127.0.0.1:" + std::to_string(port);
+
+
+        // start the client.  bind it to the server.
+        // starts a thread to listen for replies and hand them to
+        // the correct waiting caller thread. there should probably
+        // be only one rpcc per process. you probably need one
+        // rpcc per server.
+        for (int i = 0; i < NUM_CL; i++) {
+            clients[i] = new rpcc(dst);
+            VERIFY (clients[i]->bind() == 0);
+        }
+
+        simple_tests(clients[0]);
+        concurrent_test(10);
+        lossy_test();
+        if (isserver) {
+            failure_test();
+        }
+
+        cout << "rpctest OK" << endl;
+
+        exit(0);
+    }
+
+    while (1) {
+        sleep(1);
+    }
 }