4931a9f73dfc71369dafaed1d7395cf2e2b1b181
[invirt/third/libt4.git] / config.cc
1 #include "config.h"
2 #include "handle.h"
3
4 using std::vector;
5
6 // The config module maintains views. As a node joins or leaves a
7 // view, the next view will be the same as previous view, except with
8 // the new node added or removed. The first view contains only node
9 // 1. If node 2 joins after the first node (it will download the views
10 // from node 1), it will learn about view 1 with the first node as the
11 // only member.  It will then invoke Paxos to create the next view.
12 // It will tell Paxos to ask the nodes in view 1 to agree on the value
13 // {1, 2}.  If Paxos returns success, then it moves to view 2 with
14 // {1,2} as the members. When node 3 joins, the config module runs
15 // Paxos with the nodes in view 2 and the proposed value to be
16 // {1,2,3}. And so on.  When a node discovers that some node of the
17 // current view is not responding, it kicks off Paxos to propose a new
18 // value (the current view minus the node that isn't responding). The
19 // config module uses Paxos to create a total order of views, and it
20 // is ensured that the majority of the previous view agrees to the
21 // next view.  The Paxos log contains all the values (i.e., views)
22 // agreed on.
23 //
24 // The RSM module informs config to add nodes. The config module
25 // runs a heartbeater thread that checks in with nodes.  If a node
26 // doesn't respond, the config module will invoke Paxos's proposer to
27 // remove the node.  Higher layers will learn about this change when a
28 // Paxos acceptor accepts the new proposed value through
29 // paxos_commit().
30 //
31 // To be able to bring other nodes up to date to the latest formed
32 // view, each node will have a complete history of all view numbers
33 // and their values that it knows about. At any time a node can reboot
34 // and when it re-joins, it may be many views behind; by remembering
35 // all views, the other nodes can bring this re-joined node up to
36 // date.
37
38 config_view_change::~config_view_change() {}
39
40 config::config(const string & _first, const string & _me, config_view_change *_vc)
41     : my_view_id(0), first(_first), me(_me), vc(_vc),
42       paxos(this, me == _first, me, me)
43 {
44     get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
45     lock cfg_mutex_lock(cfg_mutex);
46     reconstruct(cfg_mutex_lock);
47     thread(&config::heartbeater, this).detach();
48 }
49
50 void config::restore(const string & s) {
51     lock cfg_mutex_lock(cfg_mutex);
52     paxos.restore(s);
53     reconstruct(cfg_mutex_lock);
54 }
55
56 void config::get_view(unsigned instance, vector<string> & m) {
57     lock cfg_mutex_lock(cfg_mutex);
58     get_view(instance, m, cfg_mutex_lock);
59 }
60
61 void config::get_view(unsigned instance, vector<string> & m, lock & cfg_mutex_lock) {
62     VERIFY(cfg_mutex_lock);
63     string value = paxos.value(instance);
64     LOG << "get_view(" << instance << "): returns " << value;
65     m = explode(value);
66 }
67
68 void config::reconstruct(lock & cfg_mutex_lock) {
69     VERIFY(cfg_mutex_lock);
70     my_view_id = paxos.instance();
71     if (my_view_id > 0) {
72         get_view(my_view_id, mems, cfg_mutex_lock);
73         LOG << "view " << my_view_id << " " << mems;
74     }
75 }
76
77 // Called by Paxos's acceptor.
78 void config::paxos_commit(unsigned instance, const string & value) {
79     lock cfg_mutex_lock(cfg_mutex);
80
81     vector<string> newmem = explode(value);
82     LOG << "instance " << instance << ": " << newmem;
83
84     for (auto mem : mems) {
85         LOG << "is " << mem << " still a member?";
86         if (!isamember(mem, newmem) && me != mem) {
87             LOG << "delete " << mem;
88             handle(mem).invalidate();
89         }
90     }
91
92     mems = newmem;
93     my_view_id = instance;
94     if (vc) {
95         cfg_mutex_lock.unlock();
96         vc->commit_change(instance);
97         cfg_mutex_lock.lock();
98     }
99 }
100
101 bool config::ismember(const string & m, unsigned vid) {
102     lock cfg_mutex_lock(cfg_mutex);
103     vector<string> v;
104     get_view(vid, v, cfg_mutex_lock);
105     return isamember(m, v);
106 }
107
108 bool config::add(const string & new_m, unsigned vid) {
109     lock cfg_mutex_lock(cfg_mutex);
110     LOG << "adding " << new_m << " to " << vid;
111     if (vid != my_view_id) {
112         LOG << "that's not my view id, " << my_view_id << "!";
113         return false;
114     }
115     LOG << "calling down to paxos layer";
116     vector<string> m(mems), cmems(mems);
117     m.push_back(new_m);
118     LOG << "old mems " << cmems << " " << implode(cmems);
119     LOG << "new mems " << m << " " << implode(m);
120     unsigned nextvid = my_view_id + 1;
121     cfg_mutex_lock.unlock();
122     bool r = paxos.run(nextvid, cmems, implode(m));
123     cfg_mutex_lock.lock();
124     LOG << "paxos proposer returned " << (r ? "success" : "failure");
125     return r;
126 }
127
128 // caller should hold cfg_mutex
129 bool config::remove(const string & m, lock & cfg_mutex_lock) {
130     VERIFY(cfg_mutex_lock);
131     LOG << "my_view_id " << my_view_id << " remove? " << m;
132     vector<string> n;
133     for (auto mem : mems) {
134         if (mem != m)
135             n.push_back(mem);
136     }
137     vector<string> cmems = mems;
138     unsigned nextvid = my_view_id + 1;
139     cfg_mutex_lock.unlock();
140     bool r = paxos.run(nextvid, cmems, implode(n));
141     cfg_mutex_lock.lock();
142     LOG << "proposer returned " << (r ? "success" : "failure");
143     return r;
144 }
145
146 void config::heartbeater() {
147     lock cfg_mutex_lock(cfg_mutex);
148
149     while (1) {
150         auto next_timeout = steady_clock::now() + milliseconds(300);
151         LOG << "go to sleep";
152         config_cond.wait_until(cfg_mutex_lock, next_timeout);
153
154         unsigned vid = my_view_id;
155         vector<string> cmems;
156         get_view(vid, cmems, cfg_mutex_lock);
157         LOG << "current membership " << cmems;
158
159         if (!isamember(me, cmems)) {
160             LOG << "not member yet; skip hearbeat";
161             continue;
162         }
163
164         // who has the smallest ID?
165         string m = std::min(me, *std::min_element(cmems.begin(), cmems.end()));
166
167         if (m == me) {
168             // ping the other nodes
169             for (string mem : cmems) {
170                 if (mem == me || doheartbeat(mem, cfg_mutex_lock) == OK)
171                     continue;
172                 if (vid == my_view_id)
173                     remove(mem, cfg_mutex_lock);
174                 break;
175             }
176         } else {
177             // ping the node with the smallest ID
178             if (doheartbeat(m, cfg_mutex_lock) != OK && vid == my_view_id)
179                 remove(m, cfg_mutex_lock);
180         }
181     }
182 }
183
184 paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) {
185     lock cfg_mutex_lock(cfg_mutex);
186     r = (int) my_view_id;
187     LOG<< "heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id;
188     if (vid == my_view_id)
189         return paxos_protocol::OK;
190     else if (paxos.isrunning()) {
191         VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
192         return paxos_protocol::OK;
193     }
194     return paxos_protocol::ERR;
195 }
196
197 config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) {
198     VERIFY(cfg_mutex_lock);
199     unsigned vid = my_view_id;
200     LOG << "heartbeat to " << m << " (" << vid << ")";
201     handle h(m);
202
203     cfg_mutex_lock.unlock();
204     int r = 0, ret = rpc_protocol::bind_failure;
205     if (rpcc *cl = h.safebind())
206         ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
207     cfg_mutex_lock.lock();
208
209     heartbeat_t res = OK;
210     switch (ret) {
211         case paxos_protocol::OK:
212             break;
213         case rpc_protocol::atmostonce_failure:
214         case rpc_protocol::oldsrv_failure:
215             h.invalidate();
216             break;
217         default:
218             LOG << "problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r;
219             res = (ret < 0) ? FAILURE : VIEWERR;
220     }
221     LOG << "done " << res;
222     return res;
223 }