Imported from 6.824 labs
[invirt/third/libt4.git] / rsm_client.cc
1 #include "rsm_client.h"
2 #include <vector>
3 #include <arpa/inet.h>
4 #include <stdio.h>
5 #include <handle.h>
6 #include "lang/verify.h"
7
8
9 rsm_client::rsm_client(std::string dst) {
10     printf("create rsm_client\n");
11     std::vector<std::string> mems;
12
13     pthread_mutex_init(&rsm_client_mutex, NULL);
14     sockaddr_in dstsock;
15     make_sockaddr(dst.c_str(), &dstsock);
16     primary = dst;
17
18     {
19         ScopedLock ml(&rsm_client_mutex);
20         VERIFY (init_members());
21     }
22     printf("rsm_client: done\n");
23 }
24
25 // Assumes caller holds rsm_client_mutex
26 void rsm_client::primary_failure() {
27     primary = known_mems.back();
28     known_mems.pop_back();
29 }
30
31 rsm_protocol::status rsm_client::invoke(int proc, std::string req, std::string &rep) {
32     int ret;
33     ScopedLock ml(&rsm_client_mutex);
34     while (1) {
35         printf("rsm_client::invoke proc %x primary %s\n", proc, primary.c_str());
36         handle h(primary);
37
38         VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0);
39         rpcc *cl = h.safebind();
40         if (cl)
41             ret = cl->call(rsm_client_protocol::invoke, proc, req, rep, rpcc::to(5000));
42         VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0);
43
44         if (!cl)
45             goto prim_fail;
46
47         printf("rsm_client::invoke proc %x primary %s ret %d\n", proc,
48                 primary.c_str(), ret);
49         if (ret == rsm_client_protocol::OK)
50             break;
51         if (ret == rsm_client_protocol::BUSY) {
52             printf("rsm is busy %s\n", primary.c_str());
53             sleep(3);
54             continue;
55         }
56         if (ret == rsm_client_protocol::NOTPRIMARY) {
57             printf("primary %s isn't the primary--let's get a complete list of mems\n",
58                     primary.c_str());
59             if (init_members())
60                 continue;
61         }
62 prim_fail:
63         printf("primary %s failed ret %d\n", primary.c_str(), ret);
64         primary_failure();
65         printf ("rsm_client::invoke: retry new primary %s\n", primary.c_str());
66     }
67     return ret;
68 }
69
70 bool rsm_client::init_members() {
71     printf("rsm_client::init_members get members!\n");
72     handle h(primary);
73     VERIFY(pthread_mutex_unlock(&rsm_client_mutex)==0);
74     int ret;
75     rpcc *cl = h.safebind();
76     if (cl) {
77         ret = cl->call(rsm_client_protocol::members, 0, known_mems,
78                 rpcc::to(1000));
79     }
80     VERIFY(pthread_mutex_lock(&rsm_client_mutex)==0);
81     if (cl == 0 || ret != rsm_protocol::OK)
82         return false;
83     if (known_mems.size() < 1) {
84         printf("rsm_client::init_members do not know any members!\n");
85         VERIFY(0);
86     }
87
88     primary = known_mems.back();
89     known_mems.pop_back();
90
91     printf("rsm_client::init_members: primary %s\n", primary.c_str());
92
93     return true;
94 }