8 #include "lang/verify.h"
10 // The config module maintains views. As a node joins or leaves a
11 // view, the next view will be the same as previous view, except with
12 // the new node added or removed. The first view contains only node
13 // 1. If node 2 joins after the first node (it will download the views
14 // from node 1), it will learn about view 1 with the first node as the
15 // only member. It will then invoke Paxos to create the next view.
16 // It will tell Paxos to ask the nodes in view 1 to agree on the value
17 // {1, 2}. If Paxos returns success, then it moves to view 2 with
18 // {1,2} as the members. When node 3 joins, the config module runs
19 // Paxos with the nodes in view 2 and the proposed value to be
20 // {1,2,3}. And so on. When a node discovers that some node of the
21 // current view is not responding, it kicks off Paxos to propose a new
22 // value (the current view minus the node that isn't responding). The
23 // config module uses Paxos to create a total order of views, and Paxos
24 // ensures that a majority of the previous view agrees to the
25 // next view. The Paxos log contains all the values (i.e., views)
28 // The RSM module informs config to add nodes. The config module
29 // runs a heartbeater thread that checks in with nodes. If a node
30 // doesn't respond, the config module will invoke Paxos's proposer to
31 // remove the node. Higher layers will learn about this change when a
32 // Paxos acceptor accepts the new proposed value through
35 // To be able to bring other nodes up to date with the latest formed
36 // view, each node will have a complete history of all view numbers
37 // and their values that it knows about. At any time a node can reboot
38 // and when it re-joins, it may be many views behind; by remembering
39 // all views, the other nodes can bring this re-joined node up to
// Entry point for the heartbeat thread. The config constructor passes it
// to pthread_create with `this` as the argument, so x is the owning config
// instance. The rest of the body is not visible in this excerpt;
// presumably it runs config::heartbeater() — confirm.
43 heartbeatthread(void *x)
45 config *r = (config *) x;
// Constructs the configuration module for node _me.
//   _first: the node that forms the first view; the acceptor receives
//           `me == _first` as a flag (presumably "this is the first node").
//   _me:    this node's identity.
//   _vc:    upper layer notified via commit_change() when a view commits.
// Steps visible here:
//   - initializes cfg_mutex and config_cond;
//   - creates the Paxos acceptor and proposer;
//   - registers config::heartbeat as the handler for
//     paxos_protocol::heartbeat on the acceptor's RPC server;
//   - starts the heartbeat thread.
50 config::config(std::string _first, std::string _me, config_view_change *_vc)
51 : myvid (0), first (_first), me (_me), vc (_vc)
53 VERIFY (pthread_mutex_init(&cfg_mutex, NULL) == 0);
54 VERIFY(pthread_cond_init(&config_cond, NULL) == 0);
56 std::ostringstream ost;
// NOTE(review): the lines that fill `ost` are not visible in this excerpt;
// its contents are handed to the acceptor as a string argument.
59 acc = new acceptor(this, me == _first, me, ost.str());
60 pro = new proposer(this, acc, me);
62 // XXX hack; maybe should have its own port number
// Heartbeat RPCs share the acceptor's RPC server instead of a dedicated one.
63 pxsrpc = acc->get_rpcs();
64 pxsrpc->reg(paxos_protocol::heartbeat, this, &config::heartbeat);
67 ScopedLock ml(&cfg_mutex);
72 VERIFY (pthread_create(&th, NULL, &heartbeatthread, (void *) this) == 0);
// Presumably restores configuration state from the serialized string s
// (TODO confirm the expected format). Runs with cfg_mutex held; the rest
// of the body is not visible in this excerpt.
77 config::restore(std::string s)
79 ScopedLock ml(&cfg_mutex);
// Returns the member list of view `instance`.
// Acquires cfg_mutex and delegates to get_view_wo(), which expects the
// caller to already hold the lock.
84 std::vector<std::string>
85 config::get_view(unsigned instance)
87 ScopedLock ml(&cfg_mutex);
88 return get_view_wo(instance);
91 // caller should hold cfg_mutex
// Fetches the value the acceptor holds for Paxos instance `instance`,
// logs it, and parses it into a member list with members().
// NOTE(review): `instance` is unsigned but is logged with %d; %u is the
// matching conversion specifier.
92 std::vector<std::string>
93 config::get_view_wo(unsigned instance)
95 std::string value = acc->value(instance);
96 tprintf("get_view(%d): returns %s\n", instance, value.c_str());
97 return members(value);
// Parses a view value string into its list of member names, reading it
// through an istringstream. Presumably the inverse of config::value().
// NOTE(review): the extraction loop is not visible in this excerpt;
// members are presumably whitespace-separated — confirm.
100 std::vector<std::string>
101 config::members(std::string value)
103 std::istringstream ist(value);
105 std::vector<std::string> view;
// Serializes the member list `m` into a single string, writing each member
// into an ostringstream. Presumably the inverse of config::members().
// NOTE(review): the loop body, including the separator, is not visible
// in this excerpt — confirm it matches what members() parses.
113 config::value(std::vector<std::string> m)
115 std::ostringstream ost;
116 for (unsigned i = 0; i < m.size(); i++) {
123 // caller should hold cfg_mutex
// Rebuilds myvid and mems from the acceptor's state. If the acceptor has
// reached an instance greater than 0, that instance becomes the current
// view id and its value becomes the current membership.
// NOTE(review): myvid is logged with %d; if it is unsigned (it is assigned
// to `unsigned vid` elsewhere), %u is the matching specifier.
125 config::reconstruct()
127 if (acc->instance() > 0) {
129 myvid = acc->instance();
130 mems = get_view_wo(myvid);
131 tprintf("config::reconstruct: %d %s\n", myvid, print_members(mems).c_str());
135 // Called by Paxos's acceptor.
// Handles a committed Paxos value for `instance`:
//   1. Parses `value` into the new member list.
//   2. For every current member (other than this node) that is absent from
//      the new list, deletes its handle via mgr.delete_handle().
//   3. Notifies the upper layer with vc->commit_change().
// The updates of mems/myvid (if any) are in lines not visible here.
// cfg_mutex is released around commit_change() and reacquired afterwards,
// presumably so the upper layer can call back into config without
// deadlocking — confirm.
137 config::paxos_commit(unsigned instance, std::string value)
140 std::vector<std::string> newmem;
141 ScopedLock ml(&cfg_mutex);
143 newmem = members(value);
144 tprintf("config::paxos_commit: %d: %s\n", instance,
145 print_members(newmem).c_str());
147 for (unsigned i = 0; i < mems.size(); i++) {
148 tprintf("config::paxos_commit: is %s still a member?\n", mems[i].c_str());
// This node never deletes its own handle, even if it left the view.
149 if (!isamember(mems[i], newmem) && me != mems[i]) {
150 tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
151 mgr.delete_handle(mems[i]);
// Capture the view id while still holding the lock; myvid may change once
// the mutex is released below.
158 unsigned vid = myvid;
159 VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
160 vc->commit_change(vid);
161 VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
// Reports whether m belongs to view vid. Acquires cfg_mutex and fetches
// the view with get_view_wo(); the membership test itself is in lines not
// visible in this excerpt.
166 config::ismember(std::string m, unsigned vid)
169 ScopedLock ml(&cfg_mutex);
170 std::vector<std::string> v = get_view_wo(vid);
// Proposes a new view that adds new_m.
// Visible steps:
//   - serialize the proposed member list m with value() (m and curm are
//     filled in lines not visible here);
//   - run the proposer for instance myvid + 1, with curm as the acceptor set;
//   - log whether the proposer succeeded or failed.
// cfg_mutex is released while pro->run() executes and reacquired
// afterwards, so myvid/mems may have changed by the time it returns.
// NOTE(review): the `vid` parameter is not used in the visible lines —
// confirm whether hidden lines compare it with myvid.
// NOTE(review): nextvid is an int initialized from myvid + 1; an unsigned
// type would avoid the signed/unsigned mix.
176 config::add(std::string new_m, unsigned vid)
178 std::vector<std::string> m;
179 std::vector<std::string> curm;
180 ScopedLock ml(&cfg_mutex);
183 tprintf("config::add %s\n", new_m.c_str());
187 std::string v = value(m);
188 int nextvid = myvid + 1;
189 VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
190 bool r = pro->run(nextvid, curm, v);
191 VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
193 tprintf("config::add: proposer returned success\n");
195 tprintf("config::add: proposer returned failure\n");
200 // caller should hold cfg_mutex
// Proposes a new view equal to the current membership minus m, for
// instance myvid + 1. A snapshot of the current members is used as the
// acceptor set. cfg_mutex is temporarily released while pro->run()
// executes, so mems/myvid may differ once it returns; the outcome is only
// logged in the visible lines.
// NOTE(review): myvid is logged with %d; if it is unsigned (it is assigned
// to `unsigned vid` elsewhere), %u is the matching specifier.
202 config::remove_wo(std::string m)
204 tprintf("config::remove: myvid %d remove? %s\n", myvid, m.c_str());
205 std::vector<std::string> n;
206 for (unsigned i = 0; i < mems.size(); i++) {
207 if (mems[i] != m) n.push_back(mems[i]);
209 std::string v = value(n);
// Copy taken under the lock so the proposer sees a stable acceptor set.
210 std::vector<std::string> cmems = mems;
211 int nextvid = myvid + 1;
212 VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
213 bool r = pro->run(nextvid, cmems, v);
214 VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
216 tprintf("config::remove: proposer returned success\n");
218 tprintf("config::remove: proposer returned failure\n");
// Body of the heartbeat thread, run with cfg_mutex held except while waiting.
// Each round (the enclosing loop is not visible in this excerpt):
//   1. Wait on config_cond until an absolute deadline of now.tv_sec + 3.
//      Microseconds are dropped, so the timed wait lasts 2-3 s unless the
//      condition is signalled.
//   2. Read the membership of view `vid` (set in lines not visible here).
//   3. Skip the round if this node is not yet a member.
//   4. Otherwise heartbeat: the smallest-id member pings every other
//      member, and every other member pings the smallest-id member.
// If a heartbeat failed (!stable) and the view did not change during the
// round (vid == myvid), the hidden branch handles it — presumably by
// removing the unresponsive node via remove_wo(); confirm.
// NOTE(review): the return value of pthread_cond_timedwait is ignored; that
// is fine here, since a timeout is the expected outcome.
224 config::heartbeater()
227 struct timespec next_timeout;
232 std::vector<std::string> cmems;
233 ScopedLock ml(&cfg_mutex);
237 gettimeofday(&now, NULL);
238 next_timeout.tv_sec = now.tv_sec + 3;
239 next_timeout.tv_nsec = 0;
240 tprintf("heartbeater: go to sleep\n");
241 pthread_cond_timedwait(&config_cond, &cfg_mutex, &next_timeout);
245 cmems = get_view_wo(vid);
246 tprintf("heartbeater: current membership %s\n", print_members(cmems).c_str());
248 if (!isamember(me, cmems)) {
249 tprintf("heartbeater: not member yet; skip hearbeat\n");
253 //find the node with the smallest id
255 for (unsigned i = 0; i < cmems.size(); i++) {
261 //if i am the one with smallest id, ping the rest of the nodes
262 for (unsigned i = 0; i < cmems.size(); i++) {
263 if (cmems[i] != me) {
264 if ((h = doheartbeat(cmems[i])) != OK) {
272 //the rest of the nodes ping the one with smallest id
273 if ((h = doheartbeat(m)) != OK)
// Act on a failure only if no view change happened while the lock was
// dropped inside doheartbeat().
277 if (!stable && vid == myvid) {
// RPC handler for paxos_protocol::heartbeat (registered in the constructor).
//   m:   the calling node.
//   vid: the caller's view id.
//   r:   output parameter; its assignment is not visible here. The caller's
//        log line labels it "his vid", so presumably it is set to myvid.
// Outcomes visible here:
//   - OK when the first (hidden) condition holds;
//   - OK when a proposer run is in progress and vid is exactly one view
//     ahead of or behind myvid;
//   - ERR otherwise.
// NOTE(review): vid is logged with %d although it is unsigned; %u is the
// matching specifier.
283 paxos_protocol::status
284 config::heartbeat(std::string m, unsigned vid, int &r)
286 ScopedLock ml(&cfg_mutex);
287 int ret = paxos_protocol::ERR;
289 tprintf("heartbeat from %s(%d) myvid %d\n", m.c_str(), vid, myvid);
291 ret = paxos_protocol::OK;
292 } else if (pro->isrunning()) {
// NOTE(review): vid comes from the remote caller. If a heartbeat arrives
// during a proposer run from a node more than one view away, this VERIFY
// aborts the whole process. Consider returning paxos_protocol::ERR instead.
293 VERIFY (vid == myvid + 1 || vid + 1 == myvid);
294 ret = paxos_protocol::OK;
296 ret = paxos_protocol::ERR;
// Sends a heartbeat RPC carrying this node's current view id to node m.
// Must be called with cfg_mutex held: the mutex is released around binding
// and the RPC call, then reacquired before the result is examined, so
// myvid may have advanced in the meantime.
// Result handling:
//   - on atmostonce_failure or oldsrv_failure, the cached handle for m is
//     deleted;
//   - res starts as OK and becomes FAILURE when the call returned a non-OK,
//     negative status;
//   - other outcomes are set in lines not visible here.
// ret starts as rpc_const::timeout_failure, so that is the result if the
// call is never made (e.g. binding fails — the check is hidden; confirm).
// NOTE(review): vid is unsigned but is logged with %d; %u is the matching
// specifier.
302 config::doheartbeat(std::string m)
304 int ret = rpc_const::timeout_failure;
// Snapshot taken under the lock; this is the view id sent to m.
306 unsigned vid = myvid;
307 heartbeat_t res = OK;
309 tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
311 VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
312 rpcc *cl = h.safebind();
314 ret = cl->call(paxos_protocol::heartbeat, me, vid, r,
317 VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
318 if (ret != paxos_protocol::OK) {
319 if (ret == rpc_const::atmostonce_failure ||
320 ret == rpc_const::oldsrv_failure) {
321 mgr.delete_handle(m);
323 tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n",
324 m.c_str(), ret, vid, r);
325 if (ret < 0) res = FAILURE;
329 tprintf("doheartbeat done %d\n", res);