-#include "rsm_client.h"
-#include <vector>
+#include "include/rsm_client.h"
#include <arpa/inet.h>
-#include <stdio.h>
-#include <handle.h>
#include <unistd.h>
-#include "lang/verify.h"
+using namespace std::chrono;
-rsm_client::rsm_client(std::string dst) {
- printf("create rsm_client\n");
- std::vector<std::string> mems;
-
- pthread_mutex_init(&rsm_client_mutex, NULL);
- sockaddr_in dstsock;
- make_sockaddr(dst.c_str(), &dstsock);
- primary = dst;
-
- {
- ScopedLock ml(&rsm_client_mutex);
- VERIFY (init_members());
- }
- printf("rsm_client: done\n");
+rsm_client::rsm_client(string dst) : primary(dst) {
+ LOG << "create rsm_client";
+ lock ml(rsm_client_mutex);
+ VERIFY (init_members(ml));
+ LOG << "done";
}
-// Assumes caller holds rsm_client_mutex
-void rsm_client::primary_failure() {
- primary = known_mems.back();
- known_mems.pop_back();
-}
-
-rsm_protocol::status rsm_client::invoke(int proc, std::string req, std::string &rep) {
- int ret;
- ScopedLock ml(&rsm_client_mutex);
+rsm_protocol::status rsm_client::invoke(rpc_protocol::proc_id_t proc, string & rep, const string & req) {
+ lock ml(rsm_client_mutex);
while (1) {
- printf("rsm_client::invoke proc %x primary %s\n", proc, primary.c_str());
- handle h(primary);
+ LOG << "proc " << std::hex << proc << " primary " << primary;
+ string prim = primary;
- VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0);
- rpcc *cl = h.safebind();
+ ml.unlock();
+ auto cl = rpcc::bind_cached(prim);
+ auto ret = rsm_client_protocol::OK;
if (cl)
- ret = cl->call(rsm_client_protocol::invoke, proc, req, rep, rpcc::to(5000));
- VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0);
+ ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, 500ms, rep, proc, req);
+ ml.lock();
if (!cl)
goto prim_fail;
- printf("rsm_client::invoke proc %x primary %s ret %d\n", proc,
- primary.c_str(), ret);
+ LOG << "proc " << std::hex << proc << " primary " << prim << " ret " << std::dec << ret;
if (ret == rsm_client_protocol::OK)
- break;
+ return rsm_protocol::OK;
if (ret == rsm_client_protocol::BUSY) {
- printf("rsm is busy %s\n", primary.c_str());
- sleep(3);
+ LOG << "rsm is busy " << prim;
+ std::this_thread::sleep_for(300ms);
continue;
}
if (ret == rsm_client_protocol::NOTPRIMARY) {
- printf("primary %s isn't the primary--let's get a complete list of mems\n",
- primary.c_str());
- if (init_members())
+ LOG << "primary " << prim << " isn't the primary--let's get a complete list of mems";
+ if (init_members(ml))
continue;
}
prim_fail:
- printf("primary %s failed ret %d\n", primary.c_str(), ret);
- primary_failure();
- printf ("rsm_client::invoke: retry new primary %s\n", primary.c_str());
+ LOG << "primary " << prim << " failed ret " << std::dec << ret;
+ primary = known_mems.back();
+ known_mems.pop_back();
+ LOG << "retry new primary " << prim;
}
- return ret;
}
-bool rsm_client::init_members() {
- printf("rsm_client::init_members get members!\n");
- handle h(primary);
- VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0);
- int ret;
- rpcc *cl = h.safebind();
- if (cl) {
- ret = cl->call(rsm_client_protocol::members, 0, known_mems,
- rpcc::to(1000));
+bool rsm_client::init_members(lock & rsm_client_mutex_lock) {
+ LOG << "get members!";
+ string prim = primary;
+ int ret = rsm_client_protocol::ERR;
+ shared_ptr<rpcc> cl;
+ {
+ rsm_client_mutex_lock.unlock();
+ if ((cl = rpcc::bind_cached(prim)))
+ ret = cl->call_timeout(rsm_client_protocol::members, 100ms, known_mems, 0);
+ rsm_client_mutex_lock.lock();
}
- VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0);
- if (cl == 0 || ret != rsm_protocol::OK)
+ if (ret != rsm_protocol::OK)
return false;
- if (known_mems.size() < 1) {
- printf("rsm_client::init_members do not know any members!\n");
+ if (!known_mems.size()) {
+ LOG << "do not know any members!";
VERIFY(0);
}
primary = known_mems.back();
known_mems.pop_back();
- printf("rsm_client::init_members: primary %s\n", primary.c_str());
+ LOG << "primary " << primary;
return true;
}