Imported from 6.824 labs
[invirt/third/libt4.git] / paxos.cc
1 #include "paxos.h"
2 #include "handle.h"
3 // #include <signal.h>
4 #include <stdio.h>
5 #include "tprintf.h"
6 #include "lang/verify.h"
7
8 // This module implements the proposer and acceptor of the Paxos
9 // distributed algorithm as described by Lamport's "Paxos Made
10 // Simple".  To kick off an instance of Paxos, the caller supplies a
11 // list of nodes, a proposed value, and invokes the proposer.  If the
12 // majority of the nodes agree on the proposed value after running
13 // this instance of Paxos, the acceptor invokes the upcall
14 // paxos_commit to inform higher layers of the agreed value for this
15 // instance.
16
17
18 bool
19 operator> (const prop_t &a, const prop_t &b)
20 {
21   return (a.n > b.n || (a.n == b.n && a.m > b.m));
22 }
23
24 bool
25 operator>= (const prop_t &a, const prop_t &b)
26 {
27   return (a.n > b.n || (a.n == b.n && a.m >= b.m));
28 }
29
30 std::string
31 print_members(const std::vector<std::string> &nodes)
32 {
33   std::string s;
34   s.clear();
35   for (unsigned i = 0; i < nodes.size(); i++) {
36     s += nodes[i];
37     if (i < (nodes.size()-1))
38       s += ",";
39   }
40   return s;
41 }
42
43 bool isamember(std::string m, const std::vector<std::string> &nodes)
44 {
45   for (unsigned i = 0; i < nodes.size(); i++) {
46     if (nodes[i] == m) return 1;
47   }
48   return 0;
49 }
50
51 bool
52 proposer::isrunning()
53 {
54   bool r;
55   ScopedLock ml(pxs_mutex);
56   r = !stable;
57   return r;
58 }
59
60 // check if the servers in l2 contains a majority of servers in l1
61 bool
62 proposer::majority(const std::vector<std::string> &l1, 
63                 const std::vector<std::string> &l2)
64 {
65   unsigned n = 0;
66
67   for (unsigned i = 0; i < l1.size(); i++) {
68     if (isamember(l1[i], l2))
69       n++;
70   }
71   return n >= (l1.size() >> 1) + 1;
72 }
73
74 proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, 
75                    std::string _me)
76   : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false), 
77     stable (true)
78 {
79   my_n.n = 0;
80   my_n.m = me;
81 }
82
83 void
84 proposer::setn()
85 {
86   my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
87 }
88
89 bool
90 proposer::run(int instance, std::vector<std::string> cur_nodes, std::string newv)
91 {
92   std::vector<std::string> accepts;
93   std::vector<std::string> nodes;
94   std::string v;
95   bool r = false;
96
97   ScopedLock ml(pxs_mutex);
98   tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
99          print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
100   if (!stable) {  // already running proposer?
101     tprintf("proposer::run: already running\n");
102     return false;
103   }
104   stable = false;
105   setn();
106   accepts.clear();
107   v.clear();
108   if (prepare(instance, accepts, cur_nodes, v)) {
109
110     if (majority(cur_nodes, accepts)) {
111       tprintf("paxos::manager: received a majority of prepare responses\n");
112
113       if (v.size() == 0)
114         v = newv;
115
116       breakpoint1();
117
118       nodes = accepts;
119       accepts.clear();
120       accept(instance, accepts, nodes, v);
121
122       if (majority(cur_nodes, accepts)) {
123         tprintf("paxos::manager: received a majority of accept responses\n");
124
125         breakpoint2();
126
127         decide(instance, accepts, v);
128         r = true;
129       } else {
130         tprintf("paxos::manager: no majority of accept responses\n");
131       }
132     } else {
133       tprintf("paxos::manager: no majority of prepare responses\n");
134     }
135   } else {
136     tprintf("paxos::manager: prepare is rejected %d\n", stable);
137   }
138   stable = true;
139   return r;
140 }
141
142 // proposer::run() calls prepare to send prepare RPCs to nodes
143 // and collect responses. if one of those nodes
144 // replies with an oldinstance, return false.
145 // otherwise fill in accepts with set of nodes that accepted,
146 // set v to the v_a with the highest n_a, and return true.
147 bool
148 proposer::prepare(unsigned instance, std::vector<std::string> &accepts, 
149          std::vector<std::string> nodes,
150          std::string &v)
151 {
152     struct paxos_protocol::preparearg arg = { instance, my_n };
153     struct paxos_protocol::prepareres res;
154     prop_t n_a = { 0, "" };
155     rpcc *r;
156     for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
157         handle h(*i);
158         if (!(r = h.safebind()))
159             continue;
160         int status = r->call(paxos_protocol::preparereq, me, arg, res, rpcc::to(1000));
161         if (status == paxos_protocol::OK) {
162             if (res.oldinstance) {
163                 tprintf("commiting old instance!\n");
164                 acc->commit(instance, res.v_a);
165                 return false;
166             }
167             if (res.accept) {
168                 accepts.push_back(*i);
169                 if (res.n_a >= n_a) {
170                     tprintf("found a newer accepted proposal\n");
171                     v = res.v_a;
172                     n_a = res.n_a;
173                 }
174             }
175         }
176     }
177     return true;
178 }
179
180 // run() calls this to send out accept RPCs to accepts.
181 // fill in accepts with list of nodes that accepted.
182 void
183 proposer::accept(unsigned instance, std::vector<std::string> &accepts,
184         std::vector<std::string> nodes, std::string v)
185 {
186     struct paxos_protocol::acceptarg arg = { instance, my_n, v };
187     rpcc *r;
188     for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
189         handle h(*i);
190         if (!(r = h.safebind()))
191             continue;
192         bool accept = false;
193         int status = r->call(paxos_protocol::acceptreq, me, arg, accept, rpcc::to(1000));
194         if (status == paxos_protocol::OK) {
195             if (accept)
196                 accepts.push_back(*i);
197         }
198     }
199 }
200
201 void
202 proposer::decide(unsigned instance, std::vector<std::string> accepts, 
203               std::string v)
204 {
205     struct paxos_protocol::decidearg arg = { instance, v };
206     rpcc *r;
207     for (std::vector<std::string>::iterator i=accepts.begin(); i!=accepts.end(); i++) {
208         handle h(*i);
209         if (!(r = h.safebind()))
210             continue;
211         int res = 0;
212         r->call(paxos_protocol::decidereq, me, arg, res, rpcc::to(1000));
213     }
214 }
215
216 acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me, 
217              std::string _value)
218   : cfg(_cfg), me (_me), instance_h(0)
219 {
220   n_h.n = 0;
221   n_h.m = me;
222   n_a.n = 0;
223   n_a.m = me;
224   v_a.clear();
225
226   l = new log (this, me);
227
228   if (instance_h == 0 && _first) {
229     values[1] = _value;
230     l->loginstance(1, _value);
231     instance_h = 1;
232   }
233
234   pxs = new rpcs(atoi(_me.c_str()));
235   pxs->reg(paxos_protocol::preparereq, this, &acceptor::preparereq);
236   pxs->reg(paxos_protocol::acceptreq, this, &acceptor::acceptreq);
237   pxs->reg(paxos_protocol::decidereq, this, &acceptor::decidereq);
238 }
239
240 paxos_protocol::status
241 acceptor::preparereq(std::string src, paxos_protocol::preparearg a,
242     paxos_protocol::prepareres &r)
243 {
244     ScopedLock ml(pxs_mutex);
245     r.oldinstance = false;
246     r.accept = false;
247     r.n_a = n_a;
248     r.v_a = v_a;
249     if (a.instance <= instance_h) {
250         r.oldinstance = true;
251         r.v_a = values[a.instance];
252     } else if (a.n > n_h) {
253         n_h = a.n;
254         l->logprop(n_h);
255         r.accept = true;
256     } else {
257         tprintf("I totally rejected this request.  Ha.\n");
258     }
259     return paxos_protocol::OK;
260 }
261
262 paxos_protocol::status
263 acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r)
264 {
265     ScopedLock ml(pxs_mutex);
266     r = false;
267     if (a.n >= n_h) {
268         n_a = a.n;
269         v_a = a.v;
270         l->logaccept(n_a, v_a);
271         r = true;
272     }
273     return paxos_protocol::OK;
274 }
275
276 // the src argument is only for debug purpose
277 paxos_protocol::status
278 acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r)
279 {
280   ScopedLock ml(pxs_mutex);
281   tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", 
282          a.instance, instance_h, v_a.c_str());
283   if (a.instance == instance_h + 1) {
284     VERIFY(v_a == a.v);
285     commit_wo(a.instance, v_a);
286   } else if (a.instance <= instance_h) {
287     // we are ahead ignore.
288   } else {
289     // we are behind
290     VERIFY(0);
291   }
292   return paxos_protocol::OK;
293 }
294
295 void
296 acceptor::commit_wo(unsigned instance, std::string value)
297 {
298   //assume pxs_mutex is held
299   tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
300   if (instance > instance_h) {
301     tprintf("commit: highestaccepteinstance = %d\n", instance);
302     values[instance] = value;
303     l->loginstance(instance, value);
304     instance_h = instance;
305     n_h.n = 0;
306     n_h.m = me;
307     n_a.n = 0;
308     n_a.m = me;
309     v_a.clear();
310     if (cfg) {
311       pxs_mutex.release();
312       cfg->paxos_commit(instance, value);
313       pxs_mutex.acquire();
314     }
315   }
316 }
317
318 void
319 acceptor::commit(unsigned instance, std::string value)
320 {
321   ScopedLock ml(pxs_mutex);
322   commit_wo(instance, value);
323 }
324
325 std::string
326 acceptor::dump()
327 {
328   return l->dump();
329 }
330
331 void
332 acceptor::restore(std::string s)
333 {
334   l->restore(s);
335   l->logread();
336 }
337
338
339
340 // For testing purposes
341
342 // Call this from your code between phases prepare and accept of proposer
343 void
344 proposer::breakpoint1()
345 {
346   if (break1) {
347     tprintf("Dying at breakpoint 1!\n");
348     exit(1);
349   }
350 }
351
352 // Call this from your code between phases accept and decide of proposer
353 void
354 proposer::breakpoint2()
355 {
356   if (break2) {
357     tprintf("Dying at breakpoint 2!\n");
358     exit(1);
359   }
360 }
361
362 void
363 proposer::breakpoint(int b)
364 {
365   if (b == 3) {
366     tprintf("Proposer: breakpoint 1\n");
367     break1 = true;
368   } else if (b == 4) {
369     tprintf("Proposer: breakpoint 2\n");
370     break2 = true;
371   }
372 }