More logging clean-ups. Static type-checking for RPC calls and
[invirt/third/libt4.git] / config.cc
1 #include "config.h"
2 #include "handle.h"
3
4 // The config module maintains views. As a node joins or leaves a
5 // view, the next view will be the same as previous view, except with
6 // the new node added or removed. The first view contains only node
7 // 1. If node 2 joins after the first node (it will download the views
8 // from node 1), it will learn about view 1 with the first node as the
9 // only member.  It will then invoke Paxos to create the next view.
10 // It will tell Paxos to ask the nodes in view 1 to agree on the value
11 // {1, 2}.  If Paxos returns success, then it moves to view 2 with
12 // {1,2} as the members. When node 3 joins, the config module runs
13 // Paxos with the nodes in view 2 and the proposed value to be
14 // {1,2,3}. And so on.  When a node discovers that some node of the
15 // current view is not responding, it kicks off Paxos to propose a new
16 // value (the current view minus the node that isn't responding). The
17 // config module uses Paxos to create a total order of views, and it
18 // is ensured that the majority of the previous view agrees to the
19 // next view.  The Paxos log contains all the values (i.e., views)
20 // agreed on.
21 //
22 // The RSM module informs config to add nodes. The config module
23 // runs a heartbeater thread that checks in with nodes.  If a node
24 // doesn't respond, the config module will invoke Paxos's proposer to
25 // remove the node.  Higher layers will learn about this change when a
26 // Paxos acceptor accepts the new proposed value through
27 // paxos_commit().
28 //
29 // To be able to bring other nodes up to date to the latest formed
30 // view, each node will have a complete history of all view numbers
31 // and their values that it knows about. At any time a node can reboot
32 // and when it re-joins, it may be many views behind; by remembering
33 // all views, the other nodes can bring this re-joined node up to
34 // date.
35
36 config::config(const string &_first, const string &_me, config_view_change *_vc)
37     : my_view_id(0), first(_first), me(_me), vc(_vc),
38       paxos(this, me == _first, me, me)
39 {
40     get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
41     lock cfg_mutex_lock(cfg_mutex);
42     reconstruct(cfg_mutex_lock);
43     thread(&config::heartbeater, this).detach();
44 }
45
46 void config::restore(const string &s) {
47     lock cfg_mutex_lock(cfg_mutex);
48     paxos.restore(s);
49     reconstruct(cfg_mutex_lock);
50 }
51
52 void config::get_view(unsigned instance, vector<string> &m) {
53     lock cfg_mutex_lock(cfg_mutex);
54     get_view(instance, m, cfg_mutex_lock);
55 }
56
57 void config::get_view(unsigned instance, vector<string> &m, lock &) {
58     string value = paxos.value(instance);
59     LOG("get_view(" << instance << "): returns " << value);
60     m = members(value);
61 }
62
63 vector<string> config::members(const string &value) const {
64     return explode(value);
65 }
66
67 string config::value(const vector<string> &members) const {
68     return implode(members);
69 }
70
71 void config::reconstruct(lock &cfg_mutex_lock) {
72     VERIFY(cfg_mutex_lock);
73     my_view_id = paxos.instance();
74     if (my_view_id > 0) {
75         get_view(my_view_id, mems, cfg_mutex_lock);
76         LOG("view " << my_view_id << " " << mems);
77     }
78 }
79
80 // Called by Paxos's acceptor.
81 void config::paxos_commit(unsigned instance, const string &value) {
82     lock cfg_mutex_lock(cfg_mutex);
83
84     vector<string> newmem = members(value);
85     LOG("instance " << instance << ": " << newmem);
86
87     for (auto mem : mems) {
88         LOG("is " << mem << " still a member?");
89         if (!isamember(mem, newmem) && me != mem) {
90             LOG("delete " << mem);
91             invalidate_handle(mem);
92             //handle(mem).invalidate();
93         }
94     }
95
96     mems = newmem;
97     my_view_id = instance;
98     if (vc) {
99         cfg_mutex_lock.unlock();
100         vc->commit_change(instance);
101         cfg_mutex_lock.lock();
102     }
103 }
104
105 bool config::ismember(const string &m, unsigned vid) {
106     lock cfg_mutex_lock(cfg_mutex);
107     vector<string> v;
108     get_view(vid, v, cfg_mutex_lock);
109     return isamember(m, v);
110 }
111
112 bool config::add(const string &new_m, unsigned vid) {
113     lock cfg_mutex_lock(cfg_mutex);
114     LOG("adding " << new_m << " to " << vid);
115     if (vid != my_view_id) {
116         LOG("that's not my view id, " << my_view_id << "!");
117         return false;
118     }
119     LOG("calling down to paxos layer");
120     vector<string> m(mems), cmems(mems);
121     m.push_back(new_m);
122     LOG("old mems " << cmems << " " << value(cmems));
123     LOG("new mems " << m << " " << value(m));
124     unsigned nextvid = my_view_id + 1;
125     cfg_mutex_lock.unlock();
126     bool r = paxos.run(nextvid, cmems, value(m));
127     cfg_mutex_lock.lock();
128     LOG("paxos proposer returned " << (r ? "success" : "failure"));
129     return r;
130 }
131
132 // caller should hold cfg_mutex
133 bool config::remove(const string &m, lock &cfg_mutex_lock) {
134     LOG("my_view_id " << my_view_id << " remove? " << m);
135     vector<string> n;
136     for (auto mem : mems) {
137         if (mem != m)
138             n.push_back(mem);
139     }
140     vector<string> cmems = mems;
141     unsigned nextvid = my_view_id + 1;
142     cfg_mutex_lock.unlock();
143     bool r = paxos.run(nextvid, cmems, value(n));
144     cfg_mutex_lock.lock();
145     LOG("proposer returned " << (r ? "success" : "failure"));
146     return r;
147 }
148
149 void config::heartbeater() [[noreturn]] {
150     lock cfg_mutex_lock(cfg_mutex);
151
152     while (1) {
153         auto next_timeout = steady_clock::now() + milliseconds(300);
154         LOG("go to sleep");
155         config_cond.wait_until(cfg_mutex_lock, next_timeout);
156
157         unsigned vid = my_view_id;
158         vector<string> cmems;
159         get_view(vid, cmems, cfg_mutex_lock);
160         LOG("current membership " << cmems);
161
162         if (!isamember(me, cmems)) {
163             LOG("not member yet; skip hearbeat");
164             continue;
165         }
166
167         // who has the smallest ID?
168         string m = min(me, *min_element(cmems.begin(), cmems.end()));
169
170         if (m == me) {
171             // ping the other nodes
172             for (string mem : cmems) {
173                 if (mem == me || doheartbeat(mem, cfg_mutex_lock) == OK)
174                     continue;
175                 if (vid == my_view_id)
176                     remove(mem, cfg_mutex_lock);
177                 break;
178             }
179         } else {
180             // ping the node with the smallest ID
181             if (doheartbeat(m, cfg_mutex_lock) != OK && vid == my_view_id)
182                 remove(m, cfg_mutex_lock);
183         }
184     }
185 }
186
187 paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) {
188     lock cfg_mutex_lock(cfg_mutex);
189     r = (int) my_view_id;
190     LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id);
191     if (vid == my_view_id)
192         return paxos_protocol::OK;
193     else if (paxos.isrunning()) {
194         VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
195         return paxos_protocol::OK;
196     }
197     return paxos_protocol::ERR;
198 }
199
200 config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
201     unsigned vid = my_view_id;
202     LOG("heartbeat to " << m << " (" << vid << ")");
203     handle h(m);
204
205     cfg_mutex_lock.unlock();
206     int r = 0, ret = rpc_protocol::bind_failure;
207     if (rpcc *cl = h.safebind())
208         ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
209     cfg_mutex_lock.lock();
210
211     heartbeat_t res = OK;
212     switch (ret) {
213         case paxos_protocol::OK:
214             break;
215         case rpc_protocol::atmostonce_failure:
216         case rpc_protocol::oldsrv_failure:
217             invalidate_handle(m);
218             //h.invalidate();
219             break;
220         default:
221             LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
222             res = (ret < 0) ? FAILURE : VIEWERR;
223     }
224     LOG("done " << res);
225     return res;
226 }