b68ef0cf5c440bd5f28c03bc67bd365ccad11c9e
[invirt/third/libt4.git] / rsm_client.cc
1 #include "rsm_client.h"
2 #include <vector>
3 #include <arpa/inet.h>
4 #include <stdio.h>
5 #include <handle.h>
6 #include <unistd.h>
7 #include "lang/verify.h"
8 #include "lock.h"
9 #include "tprintf.h"
10
11 rsm_client::rsm_client(std::string dst) {
12     LOG("create rsm_client");
13     std::vector<std::string> mems;
14
15     sockaddr_in dstsock;
16     make_sockaddr(dst.c_str(), &dstsock);
17     primary = dst;
18
19     {
20         lock ml(rsm_client_mutex);
21         VERIFY (init_members());
22     }
23     LOG("rsm_client: done");
24 }
25
26 // Assumes caller holds rsm_client_mutex
27 void rsm_client::primary_failure() {
28     primary = known_mems.back();
29     known_mems.pop_back();
30 }
31
32 rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, const std::string &req) {
33     int ret = 0;
34     lock ml(rsm_client_mutex);
35     while (1) {
36         LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary);
37         handle h(primary);
38
39         ml.unlock();
40         rpcc *cl = h.safebind();
41         if (cl)
42             ret = cl->call_timeout(rsm_client_protocol::invoke, rpcc::to(5000), rep, proc, req);
43         ml.lock();
44
45         if (!cl)
46             goto prim_fail;
47
48         LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary << " ret " << std::dec << ret);
49         if (ret == rsm_client_protocol::OK)
50             break;
51         if (ret == rsm_client_protocol::BUSY) {
52             LOG("rsm is busy " << primary);
53             sleep(3);
54             continue;
55         }
56         if (ret == rsm_client_protocol::NOTPRIMARY) {
57             LOG("primary " << primary << " isn't the primary--let's get a complete list of mems");
58             if (init_members())
59                 continue;
60         }
61 prim_fail:
62         LOG("primary " << primary << " failed ret " << std::dec << ret);
63         primary_failure();
64         LOG("rsm_client::invoke: retry new primary " << primary);
65     }
66     return ret;
67 }
68
69 bool rsm_client::init_members() {
70     LOG("rsm_client::init_members get members!");
71     handle h(primary);
72     int ret = rsm_client_protocol::ERR;
73     rpcc *cl;
74     {
75         adopt_lock ml(rsm_client_mutex);
76         ml.unlock();
77         cl = h.safebind();
78         if (cl)
79             ret = cl->call_timeout(rsm_client_protocol::members, rpcc::to(1000), known_mems, 0);
80         ml.lock();
81     }
82     if (cl == 0 || ret != rsm_protocol::OK)
83         return false;
84     if (known_mems.size() < 1) {
85         LOG("rsm_client::init_members do not know any members!");
86         VERIFY(0);
87     }
88
89     primary = known_mems.back();
90     known_mems.pop_back();
91
92     LOG("rsm_client::init_members: primary " << primary);
93
94     return true;
95 }