9 #include "lang/verify.h"
11 using namespace std::chrono;
15 using std::ostringstream;
16 using std::istringstream;
18 // The config module maintains views. As a node joins or leaves a
19 // view, the next view will be the same as the previous view, except with
20 // the new node added or removed. The first view contains only node
21 // 1. If node 2 joins after the first node (it will download the views
22 // from node 1), it will learn about view 1 with the first node as the
23 // only member. It will then invoke Paxos to create the next view.
24 // It will tell Paxos to ask the nodes in view 1 to agree on the value
25 // {1, 2}. If Paxos returns success, then it moves to view 2 with
26 // {1,2} as the members. When node 3 joins, the config module runs
27 // Paxos with the nodes in view 2 and the proposed value to be
28 // {1,2,3}. And so on. When a node discovers that some node of the
29 // current view is not responding, it kicks off Paxos to propose a new
30 // value (the current view minus the node that isn't responding). The
31 // config module uses Paxos to create a total order of views, and it
32 // is ensured that the majority of the previous view agrees to the
33 // next view. The Paxos log contains all the values (i.e., views)
36 // The RSM module informs config to add nodes. The config module
37 // runs a heartbeater thread that checks in with nodes. If a node
38 // doesn't respond, the config module will invoke Paxos's proposer to
39 // remove the node. Higher layers will learn about this change when a
40 // Paxos acceptor accepts the new proposed value through
43 // To be able to bring other nodes up to date to the latest formed
44 // view, each node will have a complete history of all view numbers
45 // and their values that it knows about. At any time a node can reboot
46 // and when it re-joins, it may be many views behind; by remembering
47 // all views, the other nodes can bring this re-joined node up to
53 config_view_change *_vc)
54 : my_view_id(0), first(_first), me(_me), vc(_vc)
// The view id starts at 0; _first, _me and _vc are stored as given.
// Create this node's Paxos acceptor and proposer. The acceptor's second
// argument is true only when this node is the first node (me == _first).
56 paxos_acceptor = new acceptor(this, me == _first, me, me);
57 paxos_proposer = new proposer(this, paxos_acceptor, me);
// Register config::heartbeat as the handler for paxos_protocol::heartbeat on
// whatever the acceptor's get_rpcs() returns, so heartbeats reuse the
// acceptor's RPC registration (hence the port-number remark below).
59 // XXX hack; maybe should have its own port number
60 paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
// Run heartbeater() on its own thread; it is detached, so it is never joined.
65 thread(&config::heartbeater, this).detach();
// Passes s unchanged to the Paxos acceptor's restore().
70 config::restore(const string &s)
73 paxos_acceptor->restore(s);
// Fetches the view for `instance` into m by delegating to the three-argument
// get_view overload, handing it the lock object ml.
// NOTE(review): ml is presumably a lock on cfg_mutex taken here — confirm.
78 config::get_view(unsigned instance, vector<string> &m)
81 get_view(instance, m, ml);
84 // caller should hold cfg_mutex
// Overload of get_view taking a lock reference. The lock parameter is
// unnamed, so the body cannot touch it.
// NOTE(review): it presumably exists so that callers must present the lock
// they hold — confirm.
86 config::get_view(unsigned instance, vector<string> &m, lock &)
// The view for `instance` is read from the value the Paxos acceptor reports
// for that instance; the raw value is logged.
88 string value = paxos_acceptor->value(instance);
89 tprintf("get_view(%d): returns %s\n", instance, value.c_str());
// Reads `value` through an input string stream; `view` is the output vector.
// NOTE(review): presumably `value` is a whitespace-separated list of member
// names that are appended to `view` — confirm.
94 config::members(const string &value, vector<string> &view) const
96 istringstream ist(value);
// Walks every element of the member list m in index order.
// NOTE(review): presumably serializes the members into a single string, the
// inverse of members() — confirm.
104 config::value(const vector<string> &m) const
107 for (unsigned i = 0; i < m.size(); i++) {
// Rebuilds my_view_id and mems from the Paxos acceptor's state.
// cfg_mutex_lock is checked with VERIFY and then passed on to get_view.
115 config::reconstruct(lock &cfg_mutex_lock)
117 VERIFY(cfg_mutex_lock);
// Only when the acceptor reports an instance greater than 0: adopt that
// instance as the current view id, load its members into mems and log them.
118 if (paxos_acceptor->instance() > 0) {
119 my_view_id = paxos_acceptor->instance();
120 get_view(my_view_id, mems, cfg_mutex_lock);
121 tprintf("config::reconstruct: %d %s\n",
122 my_view_id, print_members(mems).c_str());
126 // Called by Paxos's acceptor.
// Handles a committed value for `instance`: decodes `value` into newmem,
// deletes handles of members that left, records `instance` as the current
// view id and calls vc->commit_change(instance).
128 config::paxos_commit(unsigned instance, const string &value)
130 vector<string> newmem;
133 members(value, newmem);
134 tprintf("config::paxos_commit: %d: %s\n", instance,
135 print_members(newmem).c_str());
// For each member of the current view (mems) that is absent from newmem and
// is not this node, delete its handle from mgr.
137 for (unsigned i = 0; i < mems.size(); i++) {
138 tprintf("config::paxos_commit: is %s still a member?\n",
140 if (!isamember(mems[i], newmem) && me != mems[i]) {
141 tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
142 mgr.delete_handle(mems[i]);
147 my_view_id = instance;
// Inform the view-change callback object of the committed instance.
150 vc->commit_change(instance);
// Reports whether m belongs to view vid: the view is loaded into v with
// get_view(vid, v, ml) and the answer comes from isamember(m, v).
156 config::ismember(const string &m, unsigned vid)
160 get_view(vid, v, ml);
161 return isamember(m, v);
// Attempts to add new_m by running the Paxos proposer for the instance that
// follows the current view.
165 config::add(const string &new_m, unsigned vid)
// vid is compared against the current view id before anything else.
// NOTE(review): the mismatch branch presumably reports failure — confirm.
170 if (vid != my_view_id)
172 tprintf("config::add %s\n", new_m.c_str());
// The proposal targets the instance right after the current view.
177 unsigned nextvid = my_view_id + 1;
// NOTE(review): curm is presumably the current membership asked to agree and
// v the proposed value that includes new_m — confirm.
181 r = paxos_proposer->run(nextvid, curm, v);
184 tprintf("config::add: proposer returned %s\n",
185 r ? "success" : "failure");
189 // caller should hold cfg_mutex
// Runs the Paxos proposer for instance my_view_id + 1 on behalf of removing m.
191 config::remove(const string &m)
// NOTE(review): adopt_lock presumably takes over the cfg_mutex the caller
// already holds rather than locking it again (cf. std::adopt_lock) — confirm.
193 adopt_lock ml(cfg_mutex);
194 tprintf("config::remove: my_view_id %d remove? %s\n",
195 my_view_id, m.c_str());
// Build n from the current members.
// NOTE(review): presumably every member except m is copied — confirm.
197 for (unsigned i = 0; i < mems.size(); i++) {
199 n.push_back(mems[i]);
// Copy of the current members that is handed to the proposer.
202 vector<string> cmems = mems;
203 unsigned nextvid = my_view_id + 1;
207 r = paxos_proposer->run(nextvid, cmems, v);
210 tprintf("config::remove: proposer returned %s\n",
211 r ? "success" : "failure");
// Heartbeat thread body (started detached from the constructor). It sleeps on
// config_cond, loads the membership of view vid, and sends heartbeats with
// doheartbeat(). The final check fires only when `stable` is false and
// my_view_id still equals vid.
// NOTE(review): [[noreturn]] written after the parameter list applies to the
// function type rather than the function; compilers may ignore or reject it
// there — consider moving it to the front of the declaration.
216 config::heartbeater() [[noreturn]]
222 vector<string> cmems;
// Sleep on config_cond with a deadline 3 seconds from now.
226 auto next_timeout = steady_clock::now() + seconds(3);
227 tprintf("heartbeater: go to sleep\n");
228 config_cond.wait_until(ml, next_timeout);
// Load the membership of view vid into cmems and log it.
232 get_view(vid, cmems, ml);
233 tprintf("heartbeater: current membership %s\n",
234 print_members(cmems).c_str());
// A node that is not in the view only logs that it skips the heartbeat.
236 if (!isamember(me, cmems)) {
237 tprintf("heartbeater: not member yet; skip hearbeat\n");
241 // who has the smallest ID?
243 for (unsigned i = 0; i < cmems.size(); i++) {
249 // ping the other nodes
250 for (unsigned i = 0; i < cmems.size(); i++) {
251 if (cmems[i] != me) {
252 if ((h = doheartbeat(cmems[i])) != OK) {
260 // ping the node with the smallest ID
261 if ((h = doheartbeat(m)) != OK)
265 if (!stable && vid == my_view_id) {
// Handler registered for paxos_protocol::heartbeat. m and vid are the
// sender's id and view id as logged below; r is set to this node's
// my_view_id for the caller.
271 paxos_protocol::status
272 config::heartbeat(int &r, string m, unsigned vid)
// The status defaults to ERR; r is filled in before the view ids are compared.
275 int ret = paxos_protocol::ERR;
276 r = (int) my_view_id;
277 tprintf("heartbeat from %s(%d) my_view_id %d\n",
278 m.c_str(), vid, my_view_id);
// Same view id: OK. While the local proposer is running, a view id that is
// off by exactly one in either direction (checked with VERIFY) also gets OK.
// NOTE(review): the trailing ERR assignment presumably sits in a final else —
// confirm. If VERIFY aborts on failure, a peer more than one view away could
// take this node down while the proposer runs — confirm this is intended.
279 if (vid == my_view_id) {
280 ret = paxos_protocol::OK;
281 } else if (paxos_proposer->isrunning()) {
282 VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
283 ret = paxos_protocol::OK;
285 ret = paxos_protocol::ERR;
// Sends one heartbeat RPC to node m and tracks the outcome in res, which
// starts as OK and becomes FAILURE only when the RPC result is negative.
291 config::doheartbeat(const string &m)
// NOTE(review): adopt_lock presumably takes over a cfg_mutex the caller
// already holds (cf. std::adopt_lock) — confirm.
293 adopt_lock ml(cfg_mutex);
// Start from timeout_failure; the RPC call below overwrites it.
294 int ret = rpc_const::timeout_failure;
// Snapshot of the current view id, sent along with the heartbeat.
296 unsigned vid = my_view_id;
297 heartbeat_t res = OK;
299 tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
303 rpcc *cl = h.safebind();
// Heartbeat call carrying this node's id and view id; r receives the reply.
// NOTE(review): rpcc::to(1000) is presumably a 1000 ms timeout — confirm units.
305 ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid);
309 if (ret != paxos_protocol::OK) {
// These two failure codes cause the handle for m to be deleted from mgr.
310 if (ret == rpc_const::atmostonce_failure ||
311 ret == rpc_const::oldsrv_failure) {
312 mgr.delete_handle(m);
314 tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n",
315 m.c_str(), ret, vid, r);
// Only negative return codes count as a heartbeat FAILURE.
316 if (ret < 0) res = FAILURE;
320 tprintf("doheartbeat done %d\n", res);