+#include "types.h"
#include "rsm_client.h"
-#include <vector>
#include <arpa/inet.h>
-#include <stdio.h>
#include <handle.h>
#include <unistd.h>
-#include "lang/verify.h"
-#include "lock.h"
-#include "tprintf.h"
-rsm_client::rsm_client(std::string dst) {
+rsm_client::rsm_client(string dst) : primary(dst) {
LOG("create rsm_client");
- std::vector<std::string> mems;
-
- sockaddr_in dstsock;
- make_sockaddr(dst.c_str(), &dstsock);
- primary = dst;
-
- {
- lock ml(rsm_client_mutex);
- VERIFY (init_members());
- }
+ lock ml(rsm_client_mutex);
+ VERIFY (init_members(ml));
LOG("rsm_client: done");
}
-// Assumes caller holds rsm_client_mutex
-void rsm_client::primary_failure() {
+void rsm_client::primary_failure(lock &) {
primary = known_mems.back();
known_mems.pop_back();
}
-rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, const std::string &req) {
- int ret = 0;
+rsm_protocol::status rsm_client::invoke(unsigned int proc, string &rep, const string &req) {
lock ml(rsm_client_mutex);
while (1) {
- LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary);
+ LOG("proc " << hex << proc << " primary " << primary);
handle h(primary);
ml.unlock();
rpcc *cl = h.safebind();
+ auto ret = rsm_client_protocol::OK;
if (cl)
- ret = cl->call_timeout(rsm_client_protocol::invoke, rpcc::to(5000), rep, proc, req);
+ ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, milliseconds(500), rep, proc, req);
ml.lock();
if (!cl)
goto prim_fail;
- LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary << " ret " << std::dec << ret);
+ LOG("proc " << hex << proc << " primary " << primary << " ret " << dec << ret);
if (ret == rsm_client_protocol::OK)
- break;
+ return rsm_protocol::OK;
if (ret == rsm_client_protocol::BUSY) {
LOG("rsm is busy " << primary);
- sleep(3);
+ usleep(300000);
continue;
}
if (ret == rsm_client_protocol::NOTPRIMARY) {
LOG("primary " << primary << " isn't the primary--let's get a complete list of mems");
- if (init_members())
+ if (init_members(ml))
continue;
}
prim_fail:
- LOG("primary " << primary << " failed ret " << std::dec << ret);
- primary_failure();
- LOG("rsm_client::invoke: retry new primary " << primary);
+ LOG("primary " << primary << " failed ret " << dec << ret);
+ primary_failure(ml);
+ LOG("retry new primary " << primary);
}
- return ret;
}
-bool rsm_client::init_members() {
- LOG("rsm_client::init_members get members!");
+bool rsm_client::init_members(lock & rsm_client_mutex_lock) {
+ LOG("get members!");
handle h(primary);
int ret = rsm_client_protocol::ERR;
rpcc *cl;
{
- adopt_lock ml(rsm_client_mutex);
- ml.unlock();
+ rsm_client_mutex_lock.unlock();
cl = h.safebind();
if (cl)
- ret = cl->call_timeout(rsm_client_protocol::members, rpcc::to(1000), known_mems, 0);
- ml.lock();
+ ret = cl->call_timeout(rsm_client_protocol::members, milliseconds(100), known_mems, 0);
+ rsm_client_mutex_lock.lock();
}
if (cl == 0 || ret != rsm_protocol::OK)
return false;
if (known_mems.size() < 1) {
- LOG("rsm_client::init_members do not know any members!");
+ LOG("do not know any members!");
VERIFY(0);
}
primary = known_mems.back();
known_mems.pop_back();
- LOG("rsm_client::init_members: primary " << primary);
+ LOG("primary " << primary);
return true;
}