4 // The config module maintains views. As a node joins or leaves a
5 // view, the next view will be the same as previous view, except with
6 // the new node added or removed. The first view contains only node
7 // 1. If node 2 joins after the first node (it will download the views
8 // from node 1), it will learn about view 1 with the first node as the
9 // only member. It will then invoke Paxos to create the next view.
10 // It will tell Paxos to ask the nodes in view 1 to agree on the value
11 // {1, 2}. If Paxos returns success, then it moves to view 2 with
12 // {1,2} as the members. When node 3 joins, the config module runs
13 // Paxos with the nodes in view 2 and the proposed value to be
14 // {1,2,3}. And so on. When a node discovers that some node of the
15 // current view is not responding, it kicks off Paxos to propose a new
16 // value (the current view minus the node that isn't responding). The
17 // config module uses Paxos to create a total order of views, and it
18 // is ensured that the majority of the previous view agrees to the
19 // next view. The Paxos log contains all the values (i.e., views) agreed on.
22 // The RSM module informs config to add nodes. The config module
23 // runs a heartbeater thread that checks in with nodes. If a node
24 // doesn't respond, the config module will invoke Paxos's proposer to
25 // remove the node. Higher layers will learn about this change when a
26 // Paxos acceptor accepts the new proposed value through paxos_commit().
29 // To be able to bring other nodes up to date to the latest formed
30 // view, each node will have a complete history of all view numbers
31 // and their values that it knows about. At any time a node can reboot
32 // and when it re-joins, it may be many views behind; by remembering
33 // all views, the other nodes can bring this re-joined node up to date.
// Out-of-line definition of config_view_change's destructor; the body is empty.
36 config_view_change::~config_view_change() {}
// Constructs the config module.
//   _first: stored in `first`; it is also compared with `me` to produce the
//           boolean second argument handed to the `paxos` member.
//   _me:    stored in `me`, and passed twice to the `paxos` member.
//   _vc:    stored in `vc`; paxos_commit() calls vc->commit_change() on it.
38 config::config(const string & _first, const string & _me, config_view_change *_vc)
// my_view_id starts at 0; reconstruct() below overwrites it from paxos.
39 : my_view_id(0), first(_first), me(_me), vc(_vc),
// NOTE(review): this initializer reads the member `me` (not the parameter
// `_me`), so it relies on `me` being declared before `paxos` in the class —
// the class definition is not visible here; confirm.
40 paxos(this, me == _first, me, me)
// Register config::heartbeat as the handler for paxos_protocol::heartbeat.
42 get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
// reconstruct() VERIFYs the lock it is handed, so cfg_mutex is taken first.
43 lock cfg_mutex_lock(cfg_mutex);
44 reconstruct(cfg_mutex_lock);
// Run heartbeater() on a detached thread; it is never joined from here.
45 thread(&config::heartbeater, this).detach();
// Takes cfg_mutex and calls reconstruct() to refresh the local view state.
// NOTE(review): the statement that consumes `s` (listing line 50) is absent
// from this extract — presumably `s` is handed to paxos before reconstruct()
// runs; confirm against the full file.
48 void config::restore(const string & s) {
49 lock cfg_mutex_lock(cfg_mutex);
51 reconstruct(cfg_mutex_lock);
// Locking overload of get_view(): acquires cfg_mutex and forwards to the
// overload that takes the already-held lock.
//   instance: forwarded unchanged.
//   m:        forwarded by non-const reference; callers in this file read it
//             after the call, so it is presumably filled with the members of
//             view `instance`.
54 void config::get_view(unsigned instance, vector<string> & m) {
55 lock cfg_mutex_lock(cfg_mutex);
56 get_view(instance, m, cfg_mutex_lock);
// Lock-passing overload of get_view(). The caller must already hold
// cfg_mutex; this is checked with VERIFY.
// Fetches paxos.value(instance) as a string and logs it.
// NOTE(review): the code that fills `m` from `value` (listing lines 63-64) is
// absent from this extract — presumably `m` is assigned explode(value), as
// paxos_commit() does with its own value; confirm against the full file.
59 void config::get_view(unsigned instance, vector<string> & m, lock & cfg_mutex_lock) {
60 VERIFY(cfg_mutex_lock);
61 string value = paxos.value(instance);
62 LOG << "get_view(" << instance << "): returns " << value;
// Re-reads the local view state from paxos. The caller must hold cfg_mutex;
// this is checked with VERIFY.
66 void config::reconstruct(lock & cfg_mutex_lock) {
67 VERIFY(cfg_mutex_lock);
// Adopt the instance number reported by paxos as the current view id.
68 my_view_id = paxos.instance();
// NOTE(review): listing line 69 is absent from this extract; the two
// statements below may be guarded by a condition that is not visible here.
// Load `mems` through get_view() for the current view id and log it.
70 get_view(my_view_id, mems, cfg_mutex_lock);
71 LOG << "view " << my_view_id << " " << mems;
75 // Called by Paxos's acceptor.
// Applies a committed view: `value` is decoded with explode() into the member
// list for view number `instance`, stale peers are invalidated, my_view_id is
// advanced, and the view-change listener `vc` is notified.
// NOTE(review): several listing lines (78, 81, 87-90, 92, 96-98) are absent
// from this extract, so any assignment of `newmem` to `mems` and anything done
// after the lock is re-acquired are not visible here.
76 void config::paxos_commit(unsigned instance, const string & value) {
77 lock cfg_mutex_lock(cfg_mutex);
79 vector<string> newmem = explode(value);
80 LOG << "instance " << instance << ": " << newmem;
// For every entry of `mems` that is neither in `newmem` nor this node itself,
// invalidate its handle.
82 for (auto mem : mems) {
83 LOG << "is " << mem << " still a member?";
84 if (!isamember(mem, newmem) && me != mem) {
85 LOG << "delete " << mem;
86 handle(mem).invalidate();
91 my_view_id = instance;
// cfg_mutex is released around the commit_change() callback and re-taken
// afterwards — presumably so the callback can call back into config without
// deadlocking; confirm.
93 cfg_mutex_lock.unlock();
94 vc->commit_change(instance);
95 cfg_mutex_lock.lock();
// Returns isamember(m, v), where `v` is obtained from get_view() for view
// `vid`. cfg_mutex is held for the whole call.
// NOTE(review): the declaration of `v` (listing line 101) is absent from this
// extract — presumably a local vector<string>; confirm.
99 bool config::ismember(const string & m, unsigned vid) {
100 lock cfg_mutex_lock(cfg_mutex);
102 get_view(vid, v, cfg_mutex_lock);
103 return isamember(m, v);
// Runs paxos to propose view my_view_id + 1, with the current members
// (`cmems`) as participants and implode(m) as the proposed value.
//   new_m: the node named in the "adding" log line.
//   vid:   the view id the caller expects; it is compared with my_view_id.
// NOTE(review): listing lines 111-112, 115 and 123-124 are absent from this
// extract. Presumably the mismatch branch returns false, `new_m` is appended
// to `m` before the log lines, and the function returns `r`; confirm against
// the full file.
106 bool config::add(const string & new_m, unsigned vid) {
107 lock cfg_mutex_lock(cfg_mutex);
108 LOG << "adding " << new_m << " to " << vid;
// Only proceed when the caller's view id matches ours.
109 if (vid != my_view_id) {
110 LOG << "that's not my view id, " << my_view_id << "!";
113 LOG << "calling down to paxos layer";
// Both vectors start as copies of `mems`: `cmems` is passed to paxos.run() as
// the node list, `m` is encoded as the proposed value.
114 vector<string> m(mems), cmems(mems);
116 LOG << "old mems " << cmems << " " << implode(cmems);
117 LOG << "new mems " << m << " " << implode(m);
118 unsigned nextvid = my_view_id + 1;
// cfg_mutex is released for the duration of paxos.run() and re-acquired
// afterwards. (paxos_commit(), which the acceptor calls, takes cfg_mutex.)
119 cfg_mutex_lock.unlock();
120 bool r = paxos.run(nextvid, cmems, implode(m));
121 cfg_mutex_lock.lock();
122 LOG << "paxos proposer returned " << (r ? "success" : "failure");
126 // caller should hold cfg_mutex
// Runs paxos to propose view my_view_id + 1, with a copy of the current
// members (`cmems`) as participants and implode(n) as the proposed value.
// The held lock is checked with VERIFY.
// NOTE(review): listing lines 130, 132-134 and 141 are absent from this
// extract. Presumably `n` is a vector<string> built in the loop from every
// entry of `mems` other than `m`, and the function returns `r`; confirm
// against the full file.
127 bool config::remove(const string & m, lock & cfg_mutex_lock) {
128 VERIFY(cfg_mutex_lock);
129 LOG << "my_view_id " << my_view_id << " remove? " << m;
131 for (auto mem : mems) {
135 vector<string> cmems = mems;
136 unsigned nextvid = my_view_id + 1;
// cfg_mutex is released for the duration of paxos.run() and re-acquired
// afterwards, so the caller's view of shared state may be stale on return.
137 cfg_mutex_lock.unlock();
138 bool r = paxos.run(nextvid, cmems, implode(n));
139 cfg_mutex_lock.lock();
140 LOG << "proposer returned " << (r ? "success" : "failure");
// Liveness checker, run on the detached thread started by the constructor.
// It takes cfg_mutex on entry.
// NOTE(review): this extract is missing listing lines 146-147, 151, 156,
// 159-161, 164-165, 169, 172-174 and 178-181, so the enclosing loop, what
// happens when this node is not yet a member, and the condition that selects
// between the two pinging strategies below are not visible here — confirm
// against the full file before relying on this description.
144 void config::heartbeater() {
145 lock cfg_mutex_lock(cfg_mutex);
// Wait on config_cond with a deadline 300 ms from now.
148 auto next_timeout = steady_clock::now() + milliseconds(300);
149 LOG << "go to sleep";
150 config_cond.wait_until(cfg_mutex_lock, next_timeout);
// Remember the view id seen now: doheartbeat() and remove() release cfg_mutex
// internally, so my_view_id may change before the checks further down.
152 unsigned vid = my_view_id;
153 vector<string> cmems;
154 get_view(vid, cmems, cfg_mutex_lock);
155 LOG << "current membership " << cmems;
157 if (!isamember(me, cmems)) {
158 LOG << "not member yet; skip hearbeat";
162 // who has the smallest ID?
// `m` is the smallest (by operator<) of `me` and the entries of cmems.
163 string m = std::min(me, *std::min_element(cmems.begin(), cmems.end()));
166 // ping the other nodes
// Iterates over a by-value copy of each member name.
167 for (string mem : cmems) {
// This node itself and peers whose heartbeat returned OK are not removed; the
// statement guarded by this `if` (listing line 169) is absent from this
// extract — presumably `continue`.
168 if (mem == me || doheartbeat(mem, cfg_mutex_lock) == OK)
// Propose removal only if the view has not changed since `vid` was read.
170 if (vid == my_view_id)
171 remove(mem, cfg_mutex_lock);
175 // ping the node with the smallest ID
176 if (doheartbeat(m, cfg_mutex_lock) != OK && vid == my_view_id)
177 remove(m, cfg_mutex_lock);
// RPC handler registered for paxos_protocol::heartbeat in the constructor.
//   r:   out-parameter; set to this node's my_view_id (cast to int).
//   m:   sender's name; in the lines visible here it is only logged.
//   vid: the view id supplied by the sender.
// Returns OK when vid equals my_view_id. If they differ but paxos.isrunning()
// is true, VERIFY checks that the two ids differ by exactly one and OK is
// returned. The remaining visible return is ERR.
// NOTE(review): listing line 191 is absent from this extract, so the exact
// brace/else structure around the final return is not visible.
182 paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) {
183 lock cfg_mutex_lock(cfg_mutex);
184 r = (int) my_view_id;
185 LOG<< "heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id;
186 if (vid == my_view_id)
187 return paxos_protocol::OK;
188 else if (paxos.isrunning()) {
189 VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
190 return paxos_protocol::OK;
192 return paxos_protocol::ERR;
// Sends one heartbeat RPC to node `m` and classifies the outcome as a
// heartbeat_t. The caller must hold cfg_mutex (checked with VERIFY); the lock
// is released while the RPC is in flight and re-acquired afterwards.
// NOTE(review): listing lines 199-200, 206, 208, 210, 213-215, 218 and
// everything after 219 are absent from this extract, so the declaration of
// `h`, the switch header, the bodies of the case labels and the return
// statement are not visible here — presumably `h` is a handle for `m` and the
// function returns `res`; confirm against the full file.
195 config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) {
196 VERIFY(cfg_mutex_lock);
// Read the view id while the lock is still held; it is sent with the RPC.
197 unsigned vid = my_view_id;
198 LOG << "heartbeat to " << m << " (" << vid << ")";
201 cfg_mutex_lock.unlock();
// `ret` defaults to bind_failure, which is what remains if safebind() yields
// no client. `r` receives the reply value written by the remote handler.
202 int r = 0, ret = rpc_protocol::bind_failure;
203 if (rpcc *cl = h.safebind())
// The call is given a 100 ms timeout.
204 ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
205 cfg_mutex_lock.lock();
207 heartbeat_t res = OK;
209 case paxos_protocol::OK:
211 case rpc_protocol::atmostonce_failure:
212 case rpc_protocol::oldsrv_failure:
216 LOG << "problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r;
// A negative `ret` is classified as FAILURE, any other value as VIEWERR.
217 res = (ret < 0) ? FAILURE : VIEWERR;
219 LOG << "done " << res;