Build on wheezy, and presumably precise
[invirt/third/libt4.git] / config.cc
1 #include <sstream>
2 #include <iostream>
3 #include <stdio.h>
4 #include "config.h"
5 #include "paxos.h"
6 #include "handle.h"
7 #include "tprintf.h"
8 #include "lang/verify.h"
9
10 // The config module maintains views. As a node joins or leaves a
11 // view, the next view will be the same as previous view, except with
12 // the new node added or removed. The first view contains only node
13 // 1. If node 2 joins after the first node (it will download the views
14 // from node 1), it will learn about view 1 with the first node as the
15 // only member.  It will then invoke Paxos to create the next view.
16 // It will tell Paxos to ask the nodes in view 1 to agree on the value
17 // {1, 2}.  If Paxos returns success, then it moves to view 2 with
18 // {1,2} as the members. When node 3 joins, the config module runs
19 // Paxos with the nodes in view 2 and the proposed value to be
20 // {1,2,3}. And so on.  When a node discovers that some node of the
21 // current view is not responding, it kicks off Paxos to propose a new
22 // value (the current view minus the node that isn't responding). The
23 // config module uses Paxos to create a total order of views, and it
24 // is ensured that the majority of the previous view agrees to the
25 // next view.  The Paxos log contains all the values (i.e., views)
26 // agreed on.
27 //
28 // The RSM module informs config to add nodes. The config module
29 // runs a heartbeater thread that checks in with nodes.  If a node
30 // doesn't respond, the config module will invoke Paxos's proposer to
31 // remove the node.  Higher layers will learn about this change when a
32 // Paxos acceptor accepts the new proposed value through
33 // paxos_commit().
34 //
35 // To be able to bring other nodes up to date to the latest formed
36 // view, each node will have a complete history of all view numbers
37 // and their values that it knows about. At any time a node can reboot
38 // and when it re-joins, it may be many views behind; by remembering
39 // all views, the other nodes can bring this re-joined node up to
40 // date.
41
42 static void *
43 heartbeatthread(void *x)
44 {
45   config *r = (config *) x;
46   r->heartbeater();
47   return 0;
48 }
49
50 config::config(std::string _first, std::string _me, config_view_change *_vc) 
51   : myvid (0), first (_first), me (_me), vc (_vc)
52 {
53   VERIFY (pthread_mutex_init(&cfg_mutex, NULL) == 0);
54   VERIFY(pthread_cond_init(&config_cond, NULL) == 0);  
55
56   std::ostringstream ost;
57   ost << me;
58
59   acc = new acceptor(this, me == _first, me, ost.str());
60   pro = new proposer(this, acc, me);
61
62   // XXX hack; maybe should have its own port number
63   pxsrpc = acc->get_rpcs();
64   pxsrpc->reg(paxos_protocol::heartbeat, this, &config::heartbeat);
65
66   {
67       ScopedLock ml(&cfg_mutex);
68
69       reconstruct();
70
71       pthread_t th;
72       VERIFY (pthread_create(&th, NULL, &heartbeatthread, (void *) this) == 0);
73   }
74 }
75
76 void
77 config::restore(std::string s)
78 {
79   ScopedLock ml(&cfg_mutex);
80   acc->restore(s);
81   reconstruct();
82 }
83
84 std::vector<std::string>
85 config::get_view(unsigned instance)
86 {
87   ScopedLock ml(&cfg_mutex);
88   return get_view_wo(instance);
89 }
90
91 // caller should hold cfg_mutex
92 std::vector<std::string>
93 config::get_view_wo(unsigned instance)
94 {
95   std::string value = acc->value(instance);
96   tprintf("get_view(%d): returns %s\n", instance, value.c_str());
97   return members(value);
98 }
99
100 std::vector<std::string>
101 config::members(std::string value)
102 {
103   std::istringstream ist(value);
104   std::string m;
105   std::vector<std::string> view;
106   while (ist >> m) {
107     view.push_back(m);
108   }
109   return view;
110 }
111
112 std::string
113 config::value(std::vector<std::string> m)
114 {
115   std::ostringstream ost;
116   for (unsigned i = 0; i < m.size(); i++)  {
117     ost << m[i];
118     ost << " ";
119   }
120   return ost.str();
121 }
122
123 // caller should hold cfg_mutex
124 void
125 config::reconstruct()
126 {
127   if (acc->instance() > 0) {
128     std::string m;
129     myvid = acc->instance();
130     mems = get_view_wo(myvid);
131     tprintf("config::reconstruct: %d %s\n", myvid, print_members(mems).c_str());
132   }
133 }
134
135 // Called by Paxos's acceptor.
136 void
137 config::paxos_commit(unsigned instance, std::string value)
138 {
139   std::string m;
140   std::vector<std::string> newmem;
141   ScopedLock ml(&cfg_mutex);
142
143   newmem = members(value);
144   tprintf("config::paxos_commit: %d: %s\n", instance, 
145          print_members(newmem).c_str());
146
147   for (unsigned i = 0; i < mems.size(); i++) {
148     tprintf("config::paxos_commit: is %s still a member?\n", mems[i].c_str());
149     if (!isamember(mems[i], newmem) && me != mems[i]) {
150       tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
151       mgr.delete_handle(mems[i]);
152     }
153   }
154
155   mems = newmem;
156   myvid = instance;
157   if (vc) {
158     unsigned vid = myvid;
159     VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
160     vc->commit_change(vid);
161     VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
162   }
163 }
164
165 bool
166 config::ismember(std::string m, unsigned vid)
167 {
168   bool r;
169   ScopedLock ml(&cfg_mutex);
170   std::vector<std::string> v = get_view_wo(vid);
171   r = isamember(m, v);
172   return r;
173 }
174
175 bool
176 config::add(std::string new_m, unsigned vid)
177 {
178   std::vector<std::string> m;
179   std::vector<std::string> curm;
180   ScopedLock ml(&cfg_mutex);
181   if (vid != myvid)
182     return false;
183   tprintf("config::add %s\n", new_m.c_str());
184   m = mems;
185   m.push_back(new_m);
186   curm = mems;
187   std::string v = value(m);
188   int nextvid = myvid + 1;
189   VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
190   bool r = pro->run(nextvid, curm, v);
191   VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
192   if (r) {
193     tprintf("config::add: proposer returned success\n");
194   } else {
195     tprintf("config::add: proposer returned failure\n");
196   }
197   return r;
198 }
199
200 // caller should hold cfg_mutex
201 bool
202 config::remove_wo(std::string m)
203 {
204   tprintf("config::remove: myvid %d remove? %s\n", myvid, m.c_str());
205   std::vector<std::string> n;
206   for (unsigned i = 0; i < mems.size(); i++) {
207     if (mems[i] != m) n.push_back(mems[i]);
208   }
209   std::string v = value(n);
210   std::vector<std::string> cmems = mems;
211   int nextvid = myvid + 1;
212   VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
213   bool r = pro->run(nextvid, cmems, v);
214   VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
215   if (r) {
216     tprintf("config::remove: proposer returned success\n");
217   } else {
218     tprintf("config::remove: proposer returned failure\n");
219   }
220   return r;
221 }
222
223 void
224 config::heartbeater()
225 {
226   struct timeval now;
227   struct timespec next_timeout;
228   std::string m;
229   heartbeat_t h;
230   bool stable;
231   unsigned vid;
232   std::vector<std::string> cmems;
233   ScopedLock ml(&cfg_mutex);
234   
235   while (1) {
236
237     gettimeofday(&now, NULL);
238     next_timeout.tv_sec = now.tv_sec + 3;
239     next_timeout.tv_nsec = 0;
240     tprintf("heartbeater: go to sleep\n");
241     pthread_cond_timedwait(&config_cond, &cfg_mutex, &next_timeout);
242
243     stable = true;
244     vid = myvid;
245     cmems = get_view_wo(vid);
246     tprintf("heartbeater: current membership %s\n", print_members(cmems).c_str());
247
248     if (!isamember(me, cmems)) {
249       tprintf("heartbeater: not member yet; skip hearbeat\n");
250       continue;
251     }
252
253     //find the node with the smallest id
254     m = me;
255     for (unsigned i = 0; i < cmems.size(); i++) {
256       if (m > cmems[i])
257         m = cmems[i];
258     }
259
260     if (m == me) {
261       //if i am the one with smallest id, ping the rest of the nodes
262       for (unsigned i = 0; i < cmems.size(); i++) {
263         if (cmems[i] != me) {
264           if ((h = doheartbeat(cmems[i])) != OK) {
265             stable = false;
266             m = cmems[i];
267             break;
268           }
269         }
270       }
271     } else {
272       //the rest of the nodes ping the one with smallest id
273         if ((h = doheartbeat(m)) != OK) 
274             stable = false;
275     }
276
277     if (!stable && vid == myvid) {
278       remove_wo(m);
279     }
280   }
281 }
282
283 paxos_protocol::status
284 config::heartbeat(std::string m, unsigned vid, int &r)
285 {
286   ScopedLock ml(&cfg_mutex);
287   int ret = paxos_protocol::ERR;
288   r = (int) myvid;
289   tprintf("heartbeat from %s(%d) myvid %d\n", m.c_str(), vid, myvid);
290   if (vid == myvid) {
291     ret = paxos_protocol::OK;
292   } else if (pro->isrunning()) {
293     VERIFY (vid == myvid + 1 || vid + 1 == myvid);
294     ret = paxos_protocol::OK;
295   } else {
296     ret = paxos_protocol::ERR;
297   }
298   return ret;
299 }
300
301 config::heartbeat_t
302 config::doheartbeat(std::string m)
303 {
304   int ret = rpc_const::timeout_failure;
305   int r;
306   unsigned vid = myvid;
307   heartbeat_t res = OK;
308
309   tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
310   handle h(m);
311   VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
312   rpcc *cl = h.safebind();
313   if (cl) {
314     ret = cl->call(paxos_protocol::heartbeat, me, vid, r, 
315                    rpcc::to(1000));
316   } 
317   VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
318   if (ret != paxos_protocol::OK) {
319     if (ret == rpc_const::atmostonce_failure || 
320         ret == rpc_const::oldsrv_failure) {
321       mgr.delete_handle(m);
322     } else {
323       tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n", 
324              m.c_str(), ret, vid, r);
325       if (ret < 0) res = FAILURE;
326       else res = VIEWERR;
327     }
328   }
329   tprintf("doheartbeat done %d\n", res);
330   return res;
331 }
332