So many changes. Broken.
[invirt/third/libt4.git] / rsm_client.cc
index 69fec20..b24f056 100644 (file)
@@ -1,95 +1,75 @@
-#include "rsm_client.h"
-#include <vector>
+#include "include/rsm_client.h"
 #include <arpa/inet.h>
-#include <stdio.h>
-#include <handle.h>
 #include <unistd.h>
-#include "lang/verify.h"
 
+using namespace std::chrono;
 
-rsm_client::rsm_client(std::string dst) {
-    printf("create rsm_client\n");
-    std::vector<std::string> mems;
-
-    pthread_mutex_init(&rsm_client_mutex, NULL);
-    sockaddr_in dstsock;
-    make_sockaddr(dst.c_str(), &dstsock);
-    primary = dst;
-
-    {
-        ScopedLock ml(&rsm_client_mutex);
-        VERIFY (init_members());
-    }
-    printf("rsm_client: done\n");
+rsm_client::rsm_client(string dst) : primary(dst) {
+    LOG << "create rsm_client";
+    lock ml(rsm_client_mutex);
+    VERIFY (init_members(ml));
+    LOG << "done";
 }
 
-// Assumes caller holds rsm_client_mutex
-void rsm_client::primary_failure() {
-    primary = known_mems.back();
-    known_mems.pop_back();
-}
-
-rsm_protocol::status rsm_client::invoke(int proc, std::string req, std::string &rep) {
-    int ret;
-    ScopedLock ml(&rsm_client_mutex);
+rsm_protocol::status rsm_client::invoke(rpc_protocol::proc_id_t proc, string & rep, const string & req) {
+    lock ml(rsm_client_mutex);
     while (1) {
-        printf("rsm_client::invoke proc %x primary %s\n", proc, primary.c_str());
-        handle h(primary);
+        LOG << "proc " << std::hex << proc << " primary " << primary;
+        string prim = primary;
 
-        VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0);
-        rpcc *cl = h.safebind();
+        ml.unlock();
+        auto cl = rpcc::bind_cached(prim);
+        auto ret = rsm_client_protocol::OK;
         if (cl)
-            ret = cl->call(rsm_client_protocol::invoke, proc, req, rep, rpcc::to(5000));
-        VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0);
+            ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, 500ms, rep, proc, req);
+        ml.lock();
 
         if (!cl)
             goto prim_fail;
 
-        printf("rsm_client::invoke proc %x primary %s ret %d\n", proc,
-                primary.c_str(), ret);
+        LOG << "proc " << std::hex << proc << " primary " << prim << " ret " << std::dec << ret;
         if (ret == rsm_client_protocol::OK)
-            break;
+            return rsm_protocol::OK;
         if (ret == rsm_client_protocol::BUSY) {
-            printf("rsm is busy %s\n", primary.c_str());
-            sleep(3);
+            LOG << "rsm is busy " << prim;
+            std::this_thread::sleep_for(300ms);
             continue;
         }
         if (ret == rsm_client_protocol::NOTPRIMARY) {
-            printf("primary %s isn't the primary--let's get a complete list of mems\n",
-                    primary.c_str());
-            if (init_members())
+            LOG << "primary " << prim << " isn't the primary--let's get a complete list of mems";
+            if (init_members(ml))
                 continue;
         }
 prim_fail:
-        printf("primary %s failed ret %d\n", primary.c_str(), ret);
-        primary_failure();
-        printf ("rsm_client::invoke: retry new primary %s\n", primary.c_str());
+        LOG << "primary " << prim << " failed ret " << std::dec << ret;
+        primary = known_mems.back();
+        known_mems.pop_back();
+        LOG << "retry new primary " << prim;
     }
-    return ret;
 }
 
-bool rsm_client::init_members() {
-    printf("rsm_client::init_members get members!\n");
-    handle h(primary);
-    VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0);
-    int ret;
-    rpcc *cl = h.safebind();
-    if (cl) {
-        ret = cl->call(rsm_client_protocol::members, 0, known_mems,
-                rpcc::to(1000));
+bool rsm_client::init_members(lock & rsm_client_mutex_lock) {
+    LOG << "get members!";
+    string prim = primary;
+    int ret = rsm_client_protocol::ERR;
+    shared_ptr<rpcc> cl;
+    {
+        rsm_client_mutex_lock.unlock();
+        if ((cl = rpcc::bind_cached(prim)))
+            ret = cl->call_timeout(rsm_client_protocol::members, 100ms, known_mems, 0);
+        rsm_client_mutex_lock.lock();
     }
-    VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0);
-    if (cl == 0 || ret != rsm_protocol::OK)
+    if (ret != rsm_protocol::OK)
         return false;
-    if (known_mems.size() < 1) {
-        printf("rsm_client::init_members do not know any members!\n");
+    if (!known_mems.size()) {
+        LOG << "do not know any members!";
         VERIFY(0);
     }
 
     primary = known_mems.back();
     known_mems.pop_back();
 
-    printf("rsm_client::init_members: primary %s\n", primary.c_str());
+    LOG << "primary " << primary;
 
     return true;
 }