4 // The config module maintains views. As a node joins or leaves a
5 // view, the next view will be the same as the previous view, except with
6 // the new node added or removed. The first view contains only node
7 // 1. If node 2 joins after the first node (it will download the views
8 // from node 1), it will learn about view 1 with the first node as the
9 // only member. It will then invoke Paxos to create the next view.
10 // It will tell Paxos to ask the nodes in view 1 to agree on the value
11 // {1,2}. If Paxos returns success, then it moves to view 2 with
12 // {1,2} as the members. When node 3 joins, the config module runs
13 // Paxos with the nodes in view 2 and the proposed value
14 // {1,2,3}. And so on. When a node discovers that some node of the
15 // current view is not responding, it kicks off Paxos to propose a new
16 // value (the current view minus the node that isn't responding). The
17 // config module uses Paxos to create a total order of views, which
18 // ensures that a majority of the previous view agrees to the
19 // next view. The Paxos log contains all the values (i.e., views)
22 // The RSM module asks the config module to add nodes. The config module
23 // runs a heartbeater thread that checks in with nodes. If a node
24 // doesn't respond, the config module will invoke Paxos's proposer to
25 // remove the node. Higher layers will learn about this change when a
26 // Paxos acceptor accepts the new proposed value through
29 // To be able to bring other nodes up to date with the most recently
30 // formed view, each node keeps a complete history of all view numbers
31 // and their values that it knows about. A node can reboot at any time,
32 // and when it rejoins it may be many views behind; because every node
33 // remembers all views, the other nodes can bring the rejoined node up to
36 config::config(const string &_first, const string &_me, config_view_change *_vc)
37 : my_view_id(0), first(_first), me(_me), vc(_vc),
38 paxos(this, me == _first, me, me)
40 get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
41 lock cfg_mutex_lock(cfg_mutex);
42 reconstruct(cfg_mutex_lock);
43 thread(&config::heartbeater, this).detach();
46 void config::restore(const string &s) {
47 lock cfg_mutex_lock(cfg_mutex);
49 reconstruct(cfg_mutex_lock);
52 void config::get_view(unsigned instance, vector<string> &m) {
53 lock cfg_mutex_lock(cfg_mutex);
54 get_view(instance, m, cfg_mutex_lock);
57 void config::get_view(unsigned instance, vector<string> &m, lock &) {
58 string value = paxos.value(instance);
59 LOG("get_view(" << instance << "): returns " << value);
63 vector<string> config::members(const string &value) const {
64 return explode(value);
67 string config::value(const vector<string> &members) const {
68 return implode(members);
71 void config::reconstruct(lock &cfg_mutex_lock) {
72 VERIFY(cfg_mutex_lock);
73 my_view_id = paxos.instance();
75 get_view(my_view_id, mems, cfg_mutex_lock);
76 LOG("config::reconstruct: " << my_view_id << " " << mems);
80 // Called by Paxos's acceptor.
81 void config::paxos_commit(unsigned instance, const string &value) {
82 lock cfg_mutex_lock(cfg_mutex);
84 vector<string> newmem = members(value);
85 LOG("config::paxos_commit: " << instance << ": " << newmem);
87 for (auto mem : mems) {
88 LOG("config::paxos_commit: is " << mem << " still a member?");
89 if (!isamember(mem, newmem) && me != mem) {
90 LOG("config::paxos_commit: delete " << mem);
91 invalidate_handle(mem);
96 my_view_id = instance;
98 cfg_mutex_lock.unlock();
99 vc->commit_change(instance);
100 cfg_mutex_lock.lock();
104 bool config::ismember(const string &m, unsigned vid) {
105 lock cfg_mutex_lock(cfg_mutex);
107 get_view(vid, v, cfg_mutex_lock);
108 return isamember(m, v);
111 bool config::add(const string &new_m, unsigned vid) {
112 lock cfg_mutex_lock(cfg_mutex);
113 LOG("adding " << new_m << " to " << vid);
114 if (vid != my_view_id) {
115 LOG("that's not my view id, " << my_view_id << "!");
118 vector<string> m = mems;
120 vector<string> cmems = mems;
121 unsigned nextvid = my_view_id + 1;
122 LOG("calling down to paxos layer");
123 cfg_mutex_lock.unlock();
124 bool r = paxos.run(nextvid, cmems, value(m));
125 cfg_mutex_lock.lock();
126 LOG("paxos proposer returned " << (r ? "success" : "failure"));
130 // caller should hold cfg_mutex
131 bool config::remove(const string &m, lock &cfg_mutex_lock) {
132 LOG("config::remove: my_view_id " << my_view_id << " remove? " << m);
134 for (auto mem : mems) {
138 vector<string> cmems = mems;
139 unsigned nextvid = my_view_id + 1;
140 cfg_mutex_lock.unlock();
141 bool r = paxos.run(nextvid, cmems, value(n));
142 cfg_mutex_lock.lock();
143 LOG("config::remove: proposer returned " << (r ? "success" : "failure"));
147 void config::heartbeater() [[noreturn]] {
148 lock cfg_mutex_lock(cfg_mutex);
151 auto next_timeout = steady_clock::now() + seconds(3);
152 LOG("heartbeater: go to sleep");
153 config_cond.wait_until(cfg_mutex_lock, next_timeout);
155 unsigned vid = my_view_id;
156 vector<string> cmems;
157 get_view(vid, cmems, cfg_mutex_lock);
158 LOG("heartbeater: current membership " << cmems);
160 if (!isamember(me, cmems)) {
161 LOG("heartbeater: not member yet; skip hearbeat");
165 // who has the smallest ID?
166 string m = min(me, *min_element(cmems.begin(), cmems.end()));
169 // ping the other nodes
170 for (string mem : cmems) {
171 if (mem == me || doheartbeat(mem, cfg_mutex_lock) == OK)
173 if (vid == my_view_id)
174 remove(mem, cfg_mutex_lock);
178 // ping the node with the smallest ID
179 if (doheartbeat(m, cfg_mutex_lock) != OK && vid == my_view_id)
180 remove(m, cfg_mutex_lock);
185 paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) {
186 lock cfg_mutex_lock(cfg_mutex);
187 r = (int) my_view_id;
188 LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id);
189 if (vid == my_view_id)
190 return paxos_protocol::OK;
191 else if (paxos.isrunning()) {
192 VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
193 return paxos_protocol::OK;
195 return paxos_protocol::ERR;
198 config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
199 unsigned vid = my_view_id;
200 LOG("doheartbeater to " << m << " (" << vid << ")");
203 cfg_mutex_lock.unlock();
204 int r = 0, ret = rpc_const::bind_failure;
205 if (rpcc *cl = h.safebind())
206 ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid);
207 cfg_mutex_lock.lock();
209 heartbeat_t res = OK;
211 case paxos_protocol::OK:
213 case rpc_const::atmostonce_failure:
214 case rpc_const::oldsrv_failure:
215 invalidate_handle(m);
218 LOG("doheartbeat: problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
219 res = (ret < 0) ? FAILURE : VIEWERR;
221 LOG("doheartbeat done " << res);