Variadic templates for RPCs
[invirt/third/libt4.git] / config.cc
1 #include <thread>
2 #include <sstream>
3 #include <iostream>
4 #include <stdio.h>
5 #include "config.h"
6 #include "paxos.h"
7 #include "handle.h"
8 #include "tprintf.h"
9 #include "lang/verify.h"
10
11 // The config module maintains views. As a node joins or leaves a
12 // view, the next view will be the same as previous view, except with
13 // the new node added or removed. The first view contains only node
14 // 1. If node 2 joins after the first node (it will download the views
15 // from node 1), it will learn about view 1 with the first node as the
16 // only member.  It will then invoke Paxos to create the next view.
17 // It will tell Paxos to ask the nodes in view 1 to agree on the value
18 // {1, 2}.  If Paxos returns success, then it moves to view 2 with
19 // {1,2} as the members. When node 3 joins, the config module runs
20 // Paxos with the nodes in view 2 and the proposed value to be
21 // {1,2,3}. And so on.  When a node discovers that some node of the
22 // current view is not responding, it kicks off Paxos to propose a new
23 // value (the current view minus the node that isn't responding). The
24 // config module uses Paxos to create a total order of views, and it
25 // is ensured that the majority of the previous view agrees to the
26 // next view.  The Paxos log contains all the values (i.e., views)
27 // agreed on.
28 //
29 // The RSM module informs config to add nodes. The config module
30 // runs a heartbeater thread that checks in with nodes.  If a node
31 // doesn't respond, the config module will invoke Paxos's proposer to
32 // remove the node.  Higher layers will learn about this change when a
33 // Paxos acceptor accepts the new proposed value through
34 // paxos_commit().
35 //
36 // To be able to bring other nodes up to date to the latest formed
37 // view, each node will have a complete history of all view numbers
38 // and their values that it knows about. At any time a node can reboot
39 // and when it re-joins, it may be many views behind; by remembering
40 // all views, the other nodes can bring this re-joined node up to
41 // date.
42
43 config::config(
44         const std::string &_first,
45         const std::string &_me,
46         config_view_change *_vc)
47     : my_view_id(0), first(_first), me(_me), vc(_vc)
48 {
49     paxos_acceptor = new acceptor(this, me == _first, me, me);
50     paxos_proposer = new proposer(this, paxos_acceptor, me);
51
52     // XXX hack; maybe should have its own port number
53     paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, this, &config::heartbeat);
54
55     {
56         lock ml(cfg_mutex);
57         reconstruct();
58         std::thread(&config::heartbeater, this).detach();
59     }
60 }
61
62 void
63 config::restore(const std::string &s)
64 {
65     lock ml(cfg_mutex);
66     paxos_acceptor->restore(s);
67     reconstruct();
68 }
69
70 void
71 config::get_view(unsigned instance, std::vector<std::string> &m)
72 {
73     lock ml(cfg_mutex);
74     get_view_wo(instance, m);
75 }
76
77 // caller should hold cfg_mutex
78 void
79 config::get_view_wo(unsigned instance, std::vector<std::string> &m)
80 {
81     std::string value = paxos_acceptor->value(instance);
82     tprintf("get_view(%d): returns %s\n", instance, value.c_str());
83     members(value, m);
84 }
85
86 void
87 config::members(const std::string &value, std::vector<std::string> &view) const
88 {
89     std::istringstream ist(value);
90     std::string m;
91     view.clear();
92     while (ist >> m) {
93         view.push_back(m);
94     }
95 }
96
97 std::string
98 config::value(const std::vector<std::string> &m) const
99 {
100     std::ostringstream ost;
101     for (unsigned i = 0; i < m.size(); i++)  {
102         ost << m[i];
103         ost << " ";
104     }
105     return ost.str();
106 }
107
108 // caller should hold cfg_mutex
109 void
110 config::reconstruct()
111 {
112     if (paxos_acceptor->instance() > 0) {
113         std::string m;
114         my_view_id = paxos_acceptor->instance();
115         get_view_wo(my_view_id, mems);
116         tprintf("config::reconstruct: %d %s\n",
117                 my_view_id, print_members(mems).c_str());
118     }
119 }
120
121 // Called by Paxos's acceptor.
122 void
123 config::paxos_commit(unsigned instance, const std::string &value)
124 {
125     std::string m;
126     std::vector<std::string> newmem;
127     lock ml(cfg_mutex);
128
129     members(value, newmem);
130     tprintf("config::paxos_commit: %d: %s\n", instance,
131                  print_members(newmem).c_str());
132
133     for (unsigned i = 0; i < mems.size(); i++) {
134         tprintf("config::paxos_commit: is %s still a member?\n",
135                 mems[i].c_str());
136         if (!isamember(mems[i], newmem) && me != mems[i]) {
137             tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
138             mgr.delete_handle(mems[i]);
139         }
140     }
141
142     mems = newmem;
143     my_view_id = instance;
144     if (vc) {
145         ml.unlock();
146         vc->commit_change(instance);
147         ml.lock();
148     }
149 }
150
151 bool
152 config::ismember(const std::string &m, unsigned vid)
153 {
154     lock ml(cfg_mutex);
155     std::vector<std::string> v;
156     get_view_wo(vid, v);
157     return isamember(m, v);
158 }
159
160 bool
161 config::add(const std::string &new_m, unsigned vid)
162 {
163     std::vector<std::string> m;
164     std::vector<std::string> curm;
165     lock ml(cfg_mutex);
166     if (vid != my_view_id)
167         return false;
168     tprintf("config::add %s\n", new_m.c_str());
169     m = mems;
170     m.push_back(new_m);
171     curm = mems;
172     std::string v = value(m);
173     int nextvid = my_view_id + 1;
174     bool r;
175     {
176         ml.unlock();
177         r = paxos_proposer->run(nextvid, curm, v);
178         ml.lock();
179     }
180     tprintf("config::add: proposer returned %s\n",
181             r ? "success" : "failure");
182     return r;
183 }
184
185 // caller should hold cfg_mutex
186 bool
187 config::remove(const std::string &m)
188 {
189     adopt_lock ml(cfg_mutex);
190     tprintf("config::remove: my_view_id %d remove? %s\n",
191             my_view_id, m.c_str());
192     std::vector<std::string> n;
193     for (unsigned i = 0; i < mems.size(); i++) {
194         if (mems[i] != m)
195             n.push_back(mems[i]);
196     }
197     std::string v = value(n);
198     std::vector<std::string> cmems = mems;
199     int nextvid = my_view_id + 1;
200     bool r;
201     {
202         ml.unlock();
203         r = paxos_proposer->run(nextvid, cmems, v);
204         ml.lock();
205     }
206     tprintf("config::remove: proposer returned %s\n",
207             r ? "success" : "failure");
208     return r;
209 }
210
211 void
212 config::heartbeater()
213 {
214     std::string m;
215     heartbeat_t h;
216     bool stable;
217     unsigned vid;
218     std::vector<std::string> cmems;
219     lock ml(cfg_mutex);
220
221     while (1) {
222         auto next_timeout = std::chrono::steady_clock::now() + std::chrono::seconds(3);
223         tprintf("heartbeater: go to sleep\n");
224         config_cond.wait_until(ml, next_timeout);
225
226         stable = true;
227         vid = my_view_id;
228         get_view_wo(vid, cmems);
229         tprintf("heartbeater: current membership %s\n",
230                 print_members(cmems).c_str());
231
232         if (!isamember(me, cmems)) {
233             tprintf("heartbeater: not member yet; skip hearbeat\n");
234             continue;
235         }
236
237         // who has the smallest ID?
238         m = me;
239         for (unsigned i = 0; i < cmems.size(); i++) {
240             if (m > cmems[i])
241                 m = cmems[i];
242         }
243
244         if (m == me) {
245             // ping the other nodes
246             for (unsigned i = 0; i < cmems.size(); i++) {
247                 if (cmems[i] != me) {
248                     if ((h = doheartbeat(cmems[i])) != OK) {
249                         stable = false;
250                         m = cmems[i];
251                         break;
252                     }
253                 }
254             }
255         } else {
256             // ping the node with the smallest ID
257             if ((h = doheartbeat(m)) != OK)
258                 stable = false;
259         }
260
261         if (!stable && vid == my_view_id) {
262             remove(m);
263         }
264     }
265 }
266
267 paxos_protocol::status
268 config::heartbeat(std::string m, unsigned vid, int &r)
269 {
270     lock ml(cfg_mutex);
271     int ret = paxos_protocol::ERR;
272     r = (int) my_view_id;
273     tprintf("heartbeat from %s(%d) my_view_id %d\n",
274             m.c_str(), vid, my_view_id);
275     if (vid == my_view_id) {
276         ret = paxos_protocol::OK;
277     } else if (paxos_proposer->isrunning()) {
278         VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
279         ret = paxos_protocol::OK;
280     } else {
281         ret = paxos_protocol::ERR;
282     }
283     return ret;
284 }
285
286 config::heartbeat_t
287 config::doheartbeat(const std::string &m)
288 {
289     adopt_lock ml(cfg_mutex);
290     int ret = rpc_const::timeout_failure;
291     int r;
292     unsigned vid = my_view_id;
293     heartbeat_t res = OK;
294
295     tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
296     handle h(m);
297     {
298         ml.unlock();
299         rpcc *cl = h.safebind();
300         if (cl) {
301             ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid);
302         }
303         ml.lock();
304     }
305     if (ret != paxos_protocol::OK) {
306         if (ret == rpc_const::atmostonce_failure ||
307             ret == rpc_const::oldsrv_failure) {
308             mgr.delete_handle(m);
309         } else {
310             tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n",
311                          m.c_str(), ret, vid, r);
312             if (ret < 0) res = FAILURE;
313             else res = VIEWERR;
314         }
315     }
316     tprintf("doheartbeat done %d\n", res);
317     return res;
318 }
319