So many changes. Broken.
[invirt/third/libt4.git] / rsm_client.cc
1 #include "include/rsm_client.h"
2 #include <arpa/inet.h>
3 #include <unistd.h>
4
5 using namespace std::chrono;
6
7 rsm_client::rsm_client(string dst) : primary(dst) {
8     LOG << "create rsm_client";
9     lock ml(rsm_client_mutex);
10     VERIFY (init_members(ml));
11     LOG << "done";
12 }
13
14 rsm_protocol::status rsm_client::invoke(rpc_protocol::proc_id_t proc, string & rep, const string & req) {
15     lock ml(rsm_client_mutex);
16     while (1) {
17         LOG << "proc " << std::hex << proc << " primary " << primary;
18         string prim = primary;
19
20         ml.unlock();
21         auto cl = rpcc::bind_cached(prim);
22         auto ret = rsm_client_protocol::OK;
23         if (cl)
24             ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, 500ms, rep, proc, req);
25         ml.lock();
26
27         if (!cl)
28             goto prim_fail;
29
30         LOG << "proc " << std::hex << proc << " primary " << prim << " ret " << std::dec << ret;
31         if (ret == rsm_client_protocol::OK)
32             return rsm_protocol::OK;
33         if (ret == rsm_client_protocol::BUSY) {
34             LOG << "rsm is busy " << prim;
35             std::this_thread::sleep_for(300ms);
36             continue;
37         }
38         if (ret == rsm_client_protocol::NOTPRIMARY) {
39             LOG << "primary " << prim << " isn't the primary--let's get a complete list of mems";
40             if (init_members(ml))
41                 continue;
42         }
43 prim_fail:
44         LOG << "primary " << prim << " failed ret " << std::dec << ret;
45         primary = known_mems.back();
46         known_mems.pop_back();
47         LOG << "retry new primary " << prim;
48     }
49 }
50
51 bool rsm_client::init_members(lock & rsm_client_mutex_lock) {
52     LOG << "get members!";
53     string prim = primary;
54     int ret = rsm_client_protocol::ERR;
55     shared_ptr<rpcc> cl;
56     {
57         rsm_client_mutex_lock.unlock();
58         if ((cl = rpcc::bind_cached(prim)))
59             ret = cl->call_timeout(rsm_client_protocol::members, 100ms, known_mems, 0);
60         rsm_client_mutex_lock.lock();
61     }
62     if (ret != rsm_protocol::OK)
63         return false;
64     if (!known_mems.size()) {
65         LOG << "do not know any members!";
66         VERIFY(0);
67     }
68
69     primary = known_mems.back();
70     known_mems.pop_back();
71
72     LOG << "primary " << primary;
73
74     return true;
75 }