83bf4f1f39973a60f4005a3f5d6aaec2539e783d
[invirt/third/libt4.git] / paxos.cc
1 #include "paxos.h"
2 #include "handle.h"
3 #include <stdio.h>
4 #include "tprintf.h"
5 #include "lang/verify.h"
6 #include "lock.h"
7
8 // This module implements the proposer and acceptor of the Paxos
9 // distributed algorithm as described by Lamport's "Paxos Made
10 // Simple".  To kick off an instance of Paxos, the caller supplies a
11 // list of nodes, a proposed value, and invokes the proposer.  If the
12 // majority of the nodes agree on the proposed value after running
13 // this instance of Paxos, the acceptor invokes the upcall
14 // paxos_commit to inform higher layers of the agreed value for this
15 // instance.
16
17 bool operator> (const prop_t &a, const prop_t &b) {
18     return (a.n > b.n || (a.n == b.n && a.m > b.m));
19 }
20
21 bool operator>= (const prop_t &a, const prop_t &b) {
22     return (a.n > b.n || (a.n == b.n && a.m >= b.m));
23 }
24
25 std::string
26 print_members(const std::vector<std::string> &nodes) {
27     std::string s;
28     s.clear();
29     for (unsigned i = 0; i < nodes.size(); i++) {
30         s += nodes[i];
31         if (i < (nodes.size()-1))
32             s += ",";
33     }
34     return s;
35 }
36
37
38 bool isamember(const std::string & m, const std::vector<std::string> & nodes) {
39     for (auto n : nodes) {
40         if (n == m)
41             return 1;
42     }
43     return 0;
44 }
45
46 bool proposer::isrunning() {
47     bool r;
48     lock ml(pxs_mutex);
49     r = !stable;
50     return r;
51 }
52
53 // check if the servers in l2 contains a majority of servers in l1
54 bool proposer::majority(const std::vector<std::string> &l1,
55                 const std::vector<std::string> &l2) {
56     unsigned n = 0;
57
58     for (unsigned i = 0; i < l1.size(); i++) {
59         if (isamember(l1[i], l2))
60             n++;
61     }
62     return n >= (l1.size() >> 1) + 1;
63 }
64
65 proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
66         const std::string &_me)
67   : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
68     stable (true)
69 {
70     my_n.n = 0;
71     my_n.m = me;
72 }
73
74 void proposer::setn()
75 {
76     my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
77 }
78
79 bool proposer::run(unsigned instance, const std::vector<std::string> & cur_nodes,
80         const std::string & newv)
81 {
82     std::vector<std::string> accepts;
83     std::vector<std::string> nodes;
84     std::string v;
85     bool r = false;
86
87     lock ml(pxs_mutex);
88     tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
89             print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
90     if (!stable) {  // already running proposer?
91         tprintf("proposer::run: already running\n");
92         return false;
93     }
94     stable = false;
95     setn();
96     accepts.clear();
97     v.clear();
98     if (prepare(instance, accepts, cur_nodes, v)) {
99
100         if (majority(cur_nodes, accepts)) {
101             tprintf("paxos::manager: received a majority of prepare responses\n");
102
103             if (v.size() == 0)
104                 v = newv;
105
106             breakpoint1();
107
108             nodes = accepts;
109             accepts.clear();
110             accept(instance, accepts, nodes, v);
111
112             if (majority(cur_nodes, accepts)) {
113                 tprintf("paxos::manager: received a majority of accept responses\n");
114
115                 breakpoint2();
116
117                 decide(instance, accepts, v);
118                 r = true;
119             } else {
120                 tprintf("paxos::manager: no majority of accept responses\n");
121             }
122         } else {
123             tprintf("paxos::manager: no majority of prepare responses\n");
124         }
125     } else {
126         tprintf("paxos::manager: prepare is rejected %d\n", stable);
127     }
128     stable = true;
129     return r;
130 }
131
132 // proposer::run() calls prepare to send prepare RPCs to nodes
133 // and collect responses. if one of those nodes
134 // replies with an oldinstance, return false.
135 // otherwise fill in accepts with set of nodes that accepted,
136 // set v to the v_a with the highest n_a, and return true.
137 bool
138 proposer::prepare(unsigned instance, std::vector<std::string> & accepts,
139         const std::vector<std::string> & nodes,
140         std::string & v)
141 {
142     struct paxos_protocol::preparearg arg = { instance, my_n };
143     struct paxos_protocol::prepareres res;
144     prop_t n_a = { 0, "" };
145     rpcc *r;
146     for (auto i : nodes) {
147         handle h(i);
148         if (!(r = h.safebind()))
149             continue;
150         int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
151         if (status == paxos_protocol::OK) {
152             if (res.oldinstance) {
153                 tprintf("commiting old instance!\n");
154                 acc->commit(instance, res.v_a);
155                 return false;
156             }
157             if (res.accept) {
158                 accepts.push_back(i);
159                 if (res.n_a >= n_a) {
160                     tprintf("found a newer accepted proposal\n");
161                     v = res.v_a;
162                     n_a = res.n_a;
163                 }
164             }
165         }
166     }
167     return true;
168 }
169
170 // run() calls this to send out accept RPCs to accepts.
171 // fill in accepts with list of nodes that accepted.
172 void
173 proposer::accept(unsigned instance, std::vector<std::string> & accepts,
174         const std::vector<std::string> & nodes, const std::string & v)
175 {
176     struct paxos_protocol::acceptarg arg = { instance, my_n, v };
177     rpcc *r;
178     for (auto i : nodes) {
179         handle h(i);
180         if (!(r = h.safebind()))
181             continue;
182         bool accept = false;
183         int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg);
184         if (status == paxos_protocol::OK && accept)
185             accepts.push_back(i);
186     }
187 }
188
189 void
190 proposer::decide(unsigned instance, const std::vector<std::string> & accepts,
191         const std::string & v)
192 {
193     struct paxos_protocol::decidearg arg = { instance, v };
194     rpcc *r;
195     for (auto i : accepts) {
196         handle h(i);
197         if (!(r = h.safebind()))
198             continue;
199         int res = 0;
200         r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, arg);
201     }
202 }
203
204 acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _me,
205         const std::string & _value)
206   : cfg(_cfg), me (_me), instance_h(0)
207 {
208     n_h.n = 0;
209     n_h.m = me;
210     n_a.n = 0;
211     n_a.m = me;
212     v_a.clear();
213
214     l = new log (this, me);
215
216     if (instance_h == 0 && _first) {
217         values[1] = _value;
218         l->loginstance(1, _value);
219         instance_h = 1;
220     }
221
222     pxs = new rpcs((uint32_t)std::stoi(_me));
223     pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
224     pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
225     pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
226 }
227
228 paxos_protocol::status
229 acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &,
230         paxos_protocol::preparearg a)
231 {
232     lock ml(pxs_mutex);
233     r.oldinstance = false;
234     r.accept = false;
235     r.n_a = n_a;
236     r.v_a = v_a;
237     if (a.instance <= instance_h) {
238         r.oldinstance = true;
239         r.v_a = values[a.instance];
240     } else if (a.n > n_h) {
241         n_h = a.n;
242         l->logprop(n_h);
243         r.accept = true;
244     } else {
245         tprintf("I totally rejected this request.  Ha.\n");
246     }
247     return paxos_protocol::OK;
248 }
249
250 paxos_protocol::status
251 acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a)
252 {
253     lock ml(pxs_mutex);
254     r = false;
255     if (a.n >= n_h) {
256         n_a = a.n;
257         v_a = a.v;
258         l->logaccept(n_a, v_a);
259         r = true;
260     }
261     return paxos_protocol::OK;
262 }
263
264 // the src argument is only for debugging
265 paxos_protocol::status
266 acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a)
267 {
268     lock ml(pxs_mutex);
269     tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
270             a.instance, instance_h, v_a.c_str());
271     if (a.instance == instance_h + 1) {
272         VERIFY(v_a == a.v);
273         commit(a.instance, v_a, ml);
274     } else if (a.instance <= instance_h) {
275         // we are ahead ignore.
276     } else {
277         // we are behind
278         VERIFY(0);
279     }
280     return paxos_protocol::OK;
281 }
282
283 void
284 acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_lock)
285 {
286     tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
287     if (instance > instance_h) {
288         tprintf("commit: highestaccepteinstance = %d\n", instance);
289         values[instance] = value;
290         l->loginstance(instance, value);
291         instance_h = instance;
292         n_h.n = 0;
293         n_h.m = me;
294         n_a.n = 0;
295         n_a.m = me;
296         v_a.clear();
297         if (cfg) {
298             pxs_mutex_lock.unlock();
299             cfg->paxos_commit(instance, value);
300             pxs_mutex_lock.lock();
301         }
302     }
303 }
304
305 void
306 acceptor::commit(unsigned instance, const std::string & value)
307 {
308     lock ml(pxs_mutex);
309     commit(instance, value, ml);
310 }
311
312 std::string
313 acceptor::dump()
314 {
315     return l->dump();
316 }
317
318 void
319 acceptor::restore(const std::string & s)
320 {
321     l->restore(s);
322     l->logread();
323 }
324
325
326
327 // For testing purposes
328
329 // Call this from your code between phases prepare and accept of proposer
330 void
331 proposer::breakpoint1()
332 {
333     if (break1) {
334         tprintf("Dying at breakpoint 1!\n");
335         exit(1);
336     }
337 }
338
339 // Call this from your code between phases accept and decide of proposer
340 void
341 proposer::breakpoint2()
342 {
343     if (break2) {
344         tprintf("Dying at breakpoint 2!\n");
345         exit(1);
346     }
347 }
348
349 void
350 proposer::breakpoint(int b)
351 {
352     if (b == 3) {
353         tprintf("Proposer: breakpoint 1\n");
354         break1 = true;
355     } else if (b == 4) {
356         tprintf("Proposer: breakpoint 2\n");
357         break2 = true;
358     }
359 }