Cleanups
[invirt/third/libt4.git] / config.cc
1 #include <thread>
2 #include <sstream>
3 #include <iostream>
4 #include <stdio.h>
5 #include "config.h"
6 #include "paxos.h"
7 #include "handle.h"
8 #include "tprintf.h"
9 #include "lang/verify.h"
10
11 using namespace std::chrono;
12 using std::string;
13 using std::vector;
14 using std::thread;
15 using std::ostringstream;
16 using std::istringstream;
17
18 // The config module maintains views. As a node joins or leaves a
19 // view, the next view will be the same as previous view, except with
20 // the new node added or removed. The first view contains only node
21 // 1. If node 2 joins after the first node (it will download the views
22 // from node 1), it will learn about view 1 with the first node as the
23 // only member.  It will then invoke Paxos to create the next view.
24 // It will tell Paxos to ask the nodes in view 1 to agree on the value
25 // {1, 2}.  If Paxos returns success, then it moves to view 2 with
26 // {1,2} as the members. When node 3 joins, the config module runs
27 // Paxos with the nodes in view 2 and the proposed value to be
28 // {1,2,3}. And so on.  When a node discovers that some node of the
29 // current view is not responding, it kicks off Paxos to propose a new
30 // value (the current view minus the node that isn't responding). The
31 // config module uses Paxos to create a total order of views, and it
32 // is ensured that the majority of the previous view agrees to the
33 // next view.  The Paxos log contains all the values (i.e., views)
34 // agreed on.
35 //
36 // The RSM module informs config to add nodes. The config module
37 // runs a heartbeater thread that checks in with nodes.  If a node
38 // doesn't respond, the config module will invoke Paxos's proposer to
39 // remove the node.  Higher layers will learn about this change when a
40 // Paxos acceptor accepts the new proposed value through
41 // paxos_commit().
42 //
43 // To be able to bring other nodes up to date to the latest formed
44 // view, each node will have a complete history of all view numbers
45 // and their values that it knows about. At any time a node can reboot
46 // and when it re-joins, it may be many views behind; by remembering
47 // all views, the other nodes can bring this re-joined node up to
48 // date.
49
50 config::config(
51         const string &_first,
52         const string &_me,
53         config_view_change *_vc)
54     : my_view_id(0), first(_first), me(_me), vc(_vc)
55 {
56     paxos_acceptor = new acceptor(this, me == _first, me, me);
57     paxos_proposer = new proposer(this, paxos_acceptor, me);
58
59     // XXX hack; maybe should have its own port number
60     paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
61
62     {
63         lock ml(cfg_mutex);
64         reconstruct(ml);
65         thread(&config::heartbeater, this).detach();
66     }
67 }
68
69 void
70 config::restore(const string &s)
71 {
72     lock ml(cfg_mutex);
73     paxos_acceptor->restore(s);
74     reconstruct(ml);
75 }
76
77 void
78 config::get_view(unsigned instance, vector<string> &m)
79 {
80     lock ml(cfg_mutex);
81     get_view(instance, m, ml);
82 }
83
84 // caller should hold cfg_mutex
85 void
86 config::get_view(unsigned instance, vector<string> &m, lock &)
87 {
88     string value = paxos_acceptor->value(instance);
89     tprintf("get_view(%d): returns %s\n", instance, value.c_str());
90     members(value, m);
91 }
92
93 void
94 config::members(const string &value, vector<string> &view) const
95 {
96     istringstream ist(value);
97     string m;
98     view.clear();
99     while (ist >> m)
100         view.push_back(m);
101 }
102
103 string
104 config::value(const vector<string> &m) const
105 {
106     ostringstream ost;
107     for (unsigned i = 0; i < m.size(); i++)  {
108         ost << m[i];
109         ost << " ";
110     }
111     return ost.str();
112 }
113
114 void
115 config::reconstruct(lock &cfg_mutex_lock)
116 {
117     VERIFY(cfg_mutex_lock);
118     if (paxos_acceptor->instance() > 0) {
119         my_view_id = paxos_acceptor->instance();
120         get_view(my_view_id, mems, cfg_mutex_lock);
121         tprintf("config::reconstruct: %d %s\n",
122                 my_view_id, print_members(mems).c_str());
123     }
124 }
125
126 // Called by Paxos's acceptor.
127 void
128 config::paxos_commit(unsigned instance, const string &value)
129 {
130     vector<string> newmem;
131     lock ml(cfg_mutex);
132
133     members(value, newmem);
134     tprintf("config::paxos_commit: %d: %s\n", instance,
135                  print_members(newmem).c_str());
136
137     for (unsigned i = 0; i < mems.size(); i++) {
138         tprintf("config::paxos_commit: is %s still a member?\n",
139                 mems[i].c_str());
140         if (!isamember(mems[i], newmem) && me != mems[i]) {
141             tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
142             mgr.delete_handle(mems[i]);
143         }
144     }
145
146     mems = newmem;
147     my_view_id = instance;
148     if (vc) {
149         ml.unlock();
150         vc->commit_change(instance);
151         ml.lock();
152     }
153 }
154
155 bool
156 config::ismember(const string &m, unsigned vid)
157 {
158     lock ml(cfg_mutex);
159     vector<string> v;
160     get_view(vid, v, ml);
161     return isamember(m, v);
162 }
163
164 bool
165 config::add(const string &new_m, unsigned vid)
166 {
167     vector<string> m;
168     vector<string> curm;
169     lock ml(cfg_mutex);
170     if (vid != my_view_id)
171         return false;
172     tprintf("config::add %s\n", new_m.c_str());
173     m = mems;
174     m.push_back(new_m);
175     curm = mems;
176     string v = value(m);
177     unsigned nextvid = my_view_id + 1;
178     bool r;
179     {
180         ml.unlock();
181         r = paxos_proposer->run(nextvid, curm, v);
182         ml.lock();
183     }
184     tprintf("config::add: proposer returned %s\n",
185             r ? "success" : "failure");
186     return r;
187 }
188
189 // caller should hold cfg_mutex
190 bool
191 config::remove(const string &m)
192 {
193     adopt_lock ml(cfg_mutex);
194     tprintf("config::remove: my_view_id %d remove? %s\n",
195             my_view_id, m.c_str());
196     vector<string> n;
197     for (unsigned i = 0; i < mems.size(); i++) {
198         if (mems[i] != m)
199             n.push_back(mems[i]);
200     }
201     string v = value(n);
202     vector<string> cmems = mems;
203     unsigned nextvid = my_view_id + 1;
204     bool r;
205     {
206         ml.unlock();
207         r = paxos_proposer->run(nextvid, cmems, v);
208         ml.lock();
209     }
210     tprintf("config::remove: proposer returned %s\n",
211             r ? "success" : "failure");
212     return r;
213 }
214
215 void
216 config::heartbeater() [[noreturn]]
217 {
218     string m;
219     heartbeat_t h;
220     bool stable;
221     unsigned vid;
222     vector<string> cmems;
223     lock ml(cfg_mutex);
224
225     while (1) {
226         auto next_timeout = steady_clock::now() + seconds(3);
227         tprintf("heartbeater: go to sleep\n");
228         config_cond.wait_until(ml, next_timeout);
229
230         stable = true;
231         vid = my_view_id;
232         get_view(vid, cmems, ml);
233         tprintf("heartbeater: current membership %s\n",
234                 print_members(cmems).c_str());
235
236         if (!isamember(me, cmems)) {
237             tprintf("heartbeater: not member yet; skip hearbeat\n");
238             continue;
239         }
240
241         // who has the smallest ID?
242         m = me;
243         for (unsigned i = 0; i < cmems.size(); i++) {
244             if (m > cmems[i])
245                 m = cmems[i];
246         }
247
248         if (m == me) {
249             // ping the other nodes
250             for (unsigned i = 0; i < cmems.size(); i++) {
251                 if (cmems[i] != me) {
252                     if ((h = doheartbeat(cmems[i])) != OK) {
253                         stable = false;
254                         m = cmems[i];
255                         break;
256                     }
257                 }
258             }
259         } else {
260             // ping the node with the smallest ID
261             if ((h = doheartbeat(m)) != OK)
262                 stable = false;
263         }
264
265         if (!stable && vid == my_view_id) {
266             remove(m);
267         }
268     }
269 }
270
271 paxos_protocol::status
272 config::heartbeat(int &r, string m, unsigned vid)
273 {
274     lock ml(cfg_mutex);
275     int ret = paxos_protocol::ERR;
276     r = (int) my_view_id;
277     tprintf("heartbeat from %s(%d) my_view_id %d\n",
278             m.c_str(), vid, my_view_id);
279     if (vid == my_view_id) {
280         ret = paxos_protocol::OK;
281     } else if (paxos_proposer->isrunning()) {
282         VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
283         ret = paxos_protocol::OK;
284     } else {
285         ret = paxos_protocol::ERR;
286     }
287     return ret;
288 }
289
290 config::heartbeat_t
291 config::doheartbeat(const string &m)
292 {
293     adopt_lock ml(cfg_mutex);
294     int ret = rpc_const::timeout_failure;
295     int r = 0;
296     unsigned vid = my_view_id;
297     heartbeat_t res = OK;
298
299     tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
300     handle h(m);
301     {
302         ml.unlock();
303         rpcc *cl = h.safebind();
304         if (cl) {
305             ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid);
306         }
307         ml.lock();
308     }
309     if (ret != paxos_protocol::OK) {
310         if (ret == rpc_const::atmostonce_failure ||
311             ret == rpc_const::oldsrv_failure) {
312             mgr.delete_handle(m);
313         } else {
314             tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n",
315                          m.c_str(), ret, vid, r);
316             if (ret < 0) res = FAILURE;
317             else res = VIEWERR;
318         }
319     }
320     tprintf("doheartbeat done %d\n", res);
321     return res;
322 }
323