9 #include "lang/verify.h"
11 // The config module maintains views. As a node joins or leaves a
12 // view, the next view will be the same as the previous view, except with
13 // the new node added or removed. The first view contains only node
14 // 1. If node 2 joins after the first node (it will download the views
15 // from node 1), it will learn about view 1 with the first node as the
16 // only member. It will then invoke Paxos to create the next view.
17 // It will tell Paxos to ask the nodes in view 1 to agree on the value
18 // {1, 2}. If Paxos returns success, then it moves to view 2 with
19 // {1,2} as the members. When node 3 joins, the config module runs
20 // Paxos with the nodes in view 2 and the proposed value to be
21 // {1,2,3}. And so on. When a node discovers that some node of the
22 // current view is not responding, it kicks off Paxos to propose a new
23 // value (the current view minus the node that isn't responding). The
24 // config module uses Paxos to create a total order of views, and it
25 // is ensured that the majority of the previous view agrees to the
26 // next view. The Paxos log contains all the values (i.e., views) agreed on.
29 // The RSM module informs config to add nodes. The config module
30 // runs a heartbeater thread that checks in with nodes. If a node
31 // doesn't respond, the config module will invoke Paxos's proposer to
32 // remove the node. Higher layers will learn about this change when a
33 // Paxos acceptor accepts the new proposed value through paxos_commit().
36 // To be able to bring other nodes up to date to the latest formed
37 // view, each node will have a complete history of all view numbers
38 // and their values that it knows about. At any time a node can reboot
39 // and when it re-joins, it may be many views behind; by remembering
40 // all views, the other nodes can bring this re-joined node up to date.
44 const std::string &_first,
45 const std::string &_me,
46 config_view_change *_vc)
// Starts at view id 0 and stores the three arguments in first, me and vc.
47 : my_view_id(0), first(_first), me(_me), vc(_vc)
// The acceptor's second argument is true only when this node's id equals
// _first. NOTE(review): presumably that flag marks the node that bootstraps
// the first view -- confirm against acceptor's constructor.
49 paxos_acceptor = new acceptor(this, me == _first, me, me);
// NOTE(review): both Paxos objects are raw `new` allocations; no matching
// delete is visible in this listing.
50 paxos_proposer = new proposer(this, paxos_acceptor, me);
52 // XXX hack; maybe should have its own port number
// Registers config::heartbeat as the handler for paxos_protocol::heartbeat on
// the object returned by the acceptor's get_rpcs().
53 paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, this, &config::heartbeat);
// The heartbeater runs on a detached thread that is handed `this`; it is
// never joined, so this object must outlive that thread.
58 std::thread(&config::heartbeater, this).detach();
// Forwards the state string `s` to the Paxos acceptor's restore().
// NOTE(review): what `s` encodes is defined by acceptor::restore, which is
// not visible here; any follow-up work after the restore is also not shown.
63 config::restore(const std::string &s)
66 paxos_acceptor->restore(s);
// Fills `m` for Paxos instance `instance` by delegating to get_view_wo().
// NOTE(review): get_view_wo() expects cfg_mutex to be held; presumably this
// wrapper acquires it on a line not shown -- confirm.
71 config::get_view(unsigned instance, std::vector<std::string> &m)
74 get_view_wo(instance, m);
77 // caller should hold cfg_mutex
// Fetches the value the acceptor holds for `instance` and logs it.
// NOTE(review): presumably the value is then parsed into `m`; that step is
// not visible here.
79 config::get_view_wo(unsigned instance, std::vector<std::string> &m)
81 std::string value = paxos_acceptor->value(instance);
// NOTE(review): `instance` is unsigned but is formatted with %d.
82 tprintf("get_view(%d): returns %s\n", instance, value.c_str());
// Reads the serialized view string `value` through an input string stream.
// NOTE(review): presumably each token read is appended to `view`, and the
// format matches what value() produces -- the parsing loop is not shown.
87 config::members(const std::string &value, std::vector<std::string> &view) const
89 std::istringstream ist(value);
// Builds a string from the member list `m` using an output string stream,
// visiting the members in index order.
// NOTE(review): the separator written between members and the returned
// expression are not visible here; presumably the stream's contents.
98 config::value(const std::vector<std::string> &m) const
100 std::ostringstream ost;
101 for (unsigned i = 0; i < m.size(); i++) {
108 // caller should hold cfg_mutex
// Re-derives the local view state from the acceptor: my_view_id is set to the
// acceptor's instance number and mems is reloaded for that view, then logged.
110 config::reconstruct()
// An instance number of 0 is skipped.
112 if (paxos_acceptor->instance() > 0) {
114 my_view_id = paxos_acceptor->instance();
115 get_view_wo(my_view_id, mems);
116 tprintf("config::reconstruct: %d %s\n",
117 my_view_id, print_members(mems).c_str());
121 // Called by Paxos's acceptor.
// Handles the value committed for `instance`: parses `value` into a member
// list, deletes the handles of nodes that are no longer members, records
// `instance` as my_view_id and notifies vc.
123 config::paxos_commit(unsigned instance, const std::string &value)
126 std::vector<std::string> newmem;
129 members(value, newmem);
130 tprintf("config::paxos_commit: %d: %s\n", instance,
131 print_members(newmem).c_str());
// Walk the previous membership (mems) and compare it with the new one.
133 for (unsigned i = 0; i < mems.size(); i++) {
134 tprintf("config::paxos_commit: is %s still a member?\n",
// A node missing from newmem has its handle deleted, unless that node is us.
136 if (!isamember(mems[i], newmem) && me != mems[i]) {
137 tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
138 mgr.delete_handle(mems[i]);
143 my_view_id = instance;
// NOTE(review): vc is dereferenced on this line; any null check would have to
// sit on a line not shown.
146 vc->commit_change(instance);
// Returns isamember(m, v).
// NOTE(review): `v` is presumably filled with the members of view `vid` on a
// line not shown, making this "is node m in view vid?" -- confirm.
152 config::ismember(const std::string &m, unsigned vid)
155 std::vector<std::string> v;
157 return isamember(m, v);
// Runs the Paxos proposer for instance my_view_id + 1 with `curm` and the
// serialized list `m` as the proposed value, then logs the outcome.
// NOTE(review): how `m` and `curm` are populated is not visible; presumably
// curm is the current membership and m is that plus `new_m`.
161 config::add(const std::string &new_m, unsigned vid)
163 std::vector<std::string> m;
164 std::vector<std::string> curm;
// A caller-supplied view id that differs from ours takes a separate branch
// (its body is not shown here).
166 if (vid != my_view_id)
168 tprintf("config::add %s\n", new_m.c_str());
172 std::string v = value(m);
173 int nextvid = my_view_id + 1;
177 r = paxos_proposer->run(nextvid, curm, v);
180 tprintf("config::add: proposer returned %s\n",
181 r ? "success" : "failure");
185 // caller should hold cfg_mutex
// Runs the Paxos proposer for instance my_view_id + 1 with a membership list
// `n` derived from mems, then logs the outcome.
187 config::remove(const std::string &m)
// NOTE(review): adopt_lock presumably takes over the cfg_mutex the caller
// already holds instead of locking it again -- confirm against its definition.
189 adopt_lock ml(cfg_mutex);
190 tprintf("config::remove: my_view_id %d remove? %s\n",
191 my_view_id, m.c_str());
192 std::vector<std::string> n;
// NOTE(review): the condition guarding push_back is not shown; presumably it
// skips the entry equal to `m`.
193 for (unsigned i = 0; i < mems.size(); i++) {
195 n.push_back(mems[i]);
197 std::string v = value(n);
// Copy of the current membership, handed to the proposer alongside the
// proposed value.
198 std::vector<std::string> cmems = mems;
199 int nextvid = my_view_id + 1;
203 r = paxos_proposer->run(nextvid, cmems, v);
206 tprintf("config::remove: proposer returned %s\n",
207 r ? "success" : "failure");
// Body of the thread started by the constructor. Waits on config_cond with a
// 3-second deadline, loads the membership for `vid`, and pings other nodes
// through doheartbeat().
212 config::heartbeater()
218 std::vector<std::string> cmems;
// Deadline is 3 seconds from now on the steady clock.
222 auto next_timeout = std::chrono::steady_clock::now() + std::chrono::seconds(3);
223 tprintf("heartbeater: go to sleep\n");
224 config_cond.wait_until(ml, next_timeout);
// NOTE(review): `vid` is presumably a copy of my_view_id taken after waking
// (the assignment is not shown).
228 get_view_wo(vid, cmems);
229 tprintf("heartbeater: current membership %s\n",
230 print_members(cmems).c_str());
// This node is absent from the loaded membership: log and skip the heartbeat.
232 if (!isamember(me, cmems)) {
233 tprintf("heartbeater: not member yet; skip hearbeat\n");
237 // who has the smallest ID?
239 for (unsigned i = 0; i < cmems.size(); i++) {
// NOTE(review): the condition choosing between the two ping strategies below
// is not shown; presumably it depends on whether `me` has the smallest ID.
245 // ping the other nodes
246 for (unsigned i = 0; i < cmems.size(); i++) {
247 if (cmems[i] != me) {
248 if ((h = doheartbeat(cmems[i])) != OK) {
256 // ping the node with the smallest ID
257 if ((h = doheartbeat(m)) != OK)
// Acts only when `stable` is false and the view id still equals `vid`.
// NOTE(review): presumably `stable` is cleared when a ping fails, and the
// body (not shown) removes the unresponsive node -- confirm.
261 if (!stable && vid == my_view_id) {
// Handler registered by the constructor for paxos_protocol::heartbeat.
// `m` is the sender's id and `vid` the sender's view id; this node's view id
// is written to `r`.
267 paxos_protocol::status
268 config::heartbeat(std::string m, unsigned vid, int &r)
271 int ret = paxos_protocol::ERR;
272 r = (int) my_view_id;
273 tprintf("heartbeat from %s(%d) my_view_id %d\n",
274 m.c_str(), vid, my_view_id);
// Sender is in the same view as us: OK.
275 if (vid == my_view_id) {
276 ret = paxos_protocol::OK;
// View ids differ while a proposer run is in progress: also OK.
277 } else if (paxos_proposer->isrunning()) {
// NOTE(review): `vid` comes from the peer; VERIFY presumably aborts when the
// two view ids differ by more than one -- confirm that is intended for
// remote input.
278 VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
279 ret = paxos_protocol::OK;
// NOTE(review): presumably the final else branch (its opening line is not
// shown).
281 ret = paxos_protocol::ERR;
// Sends one paxos_protocol::heartbeat RPC to node `m`, passing our id and
// view id, and tracks the outcome in `res` (initially OK).
287 config::doheartbeat(const std::string &m)
// NOTE(review): adopt_lock presumably takes over the cfg_mutex the caller
// already holds -- confirm against its definition.
289 adopt_lock ml(cfg_mutex);
// ret starts as timeout_failure, so that is its value if the call below is
// never made.
290 int ret = rpc_const::timeout_failure;
292 unsigned vid = my_view_id;
293 heartbeat_t res = OK;
295 tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
// NOTE(review): `cl` is dereferenced on the next visible line; a null check
// may sit on a line not shown.
299 rpcc *cl = h.safebind();
// The heartbeat handler writes the peer's view id into `r`. The timeout is
// rpcc::to(1000) (presumably milliseconds).
301 ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid);
305 if (ret != paxos_protocol::OK) {
// For these two failure codes, delete the handle mgr holds for `m`.
306 if (ret == rpc_const::atmostonce_failure ||
307 ret == rpc_const::oldsrv_failure) {
308 mgr.delete_handle(m);
310 tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n",
311 m.c_str(), ret, vid, r);
// Only negative return codes set res to FAILURE; a non-OK, non-negative code
// leaves res unchanged here.
312 if (ret < 0) res = FAILURE;
316 tprintf("doheartbeat done %d\n", res);