46d9c1c1cc9bb5c0bcf946fb73872212114c83e7
[invirt/third/libt4.git] / paxos.cc
1 #include "paxos.h"
2 #include "handle.h"
3 #include "threaded_log.h"
4 #include "lang/verify.h"
5 #include "lock.h"
6
7 using std::stoi;
8
9 // This module implements the proposer and acceptor of the Paxos
10 // distributed algorithm as described by Lamport's "Paxos Made
11 // Simple".  To kick off an instance of Paxos, the caller supplies a
12 // list of nodes, a proposed value, and invokes the proposer.  If the
13 // majority of the nodes agree on the proposed value after running
14 // this instance of Paxos, the acceptor invokes the upcall
15 // paxos_commit to inform higher layers of the agreed value for this
16 // instance.
17
18 bool operator> (const prop_t &a, const prop_t &b) {
19     return (a.n > b.n || (a.n == b.n && a.m > b.m));
20 }
21
22 bool operator>= (const prop_t &a, const prop_t &b) {
23     return (a.n > b.n || (a.n == b.n && a.m >= b.m));
24 }
25
26 string
27 print_members(const vector<string> &nodes) {
28     string s;
29     s.clear();
30     for (unsigned i = 0; i < nodes.size(); i++) {
31         s += nodes[i];
32         if (i < (nodes.size()-1))
33             s += ",";
34     }
35     return s;
36 }
37
38
39 bool isamember(const string & m, const vector<string> & nodes) {
40     for (auto n : nodes) {
41         if (n == m)
42             return 1;
43     }
44     return 0;
45 }
46
47 bool proposer::isrunning() {
48     bool r;
49     lock ml(pxs_mutex);
50     r = !stable;
51     return r;
52 }
53
54 // check if the servers in l2 contains a majority of servers in l1
55 bool proposer::majority(const vector<string> &l1, const vector<string> &l2) {
56     unsigned n = 0;
57
58     for (unsigned i = 0; i < l1.size(); i++) {
59         if (isamember(l1[i], l2))
60             n++;
61     }
62     return n >= (l1.size() >> 1) + 1;
63 }
64
65 proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, const string &_me)
66   : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
67     stable (true)
68 {
69     my_n.n = 0;
70     my_n.m = me;
71 }
72
73 void proposer::setn()
74 {
75     my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
76 }
77
78 bool proposer::run(unsigned instance, const vector<string> & cur_nodes, const string & newv)
79 {
80     vector<string> accepts;
81     vector<string> nodes;
82     string v;
83     bool r = false;
84
85     lock ml(pxs_mutex);
86     LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable);
87     if (!stable) {  // already running proposer?
88         LOG("proposer::run: already running");
89         return false;
90     }
91     stable = false;
92     setn();
93     accepts.clear();
94     v.clear();
95     if (prepare(instance, accepts, cur_nodes, v)) {
96
97         if (majority(cur_nodes, accepts)) {
98             LOG("paxos::manager: received a majority of prepare responses");
99
100             if (v.size() == 0)
101                 v = newv;
102
103             breakpoint1();
104
105             nodes = accepts;
106             accepts.clear();
107             accept(instance, accepts, nodes, v);
108
109             if (majority(cur_nodes, accepts)) {
110                 LOG("paxos::manager: received a majority of accept responses");
111
112                 breakpoint2();
113
114                 decide(instance, accepts, v);
115                 r = true;
116             } else {
117                 LOG("paxos::manager: no majority of accept responses");
118             }
119         } else {
120             LOG("paxos::manager: no majority of prepare responses");
121         }
122     } else {
123         LOG("paxos::manager: prepare is rejected " << stable);
124     }
125     stable = true;
126     return r;
127 }
128
129 // proposer::run() calls prepare to send prepare RPCs to nodes
130 // and collect responses. if one of those nodes
131 // replies with an oldinstance, return false.
132 // otherwise fill in accepts with set of nodes that accepted,
133 // set v to the v_a with the highest n_a, and return true.
134 bool
135 proposer::prepare(unsigned instance, vector<string> & accepts,
136         const vector<string> & nodes,
137         string & v)
138 {
139     struct paxos_protocol::preparearg arg = { instance, my_n };
140     struct paxos_protocol::prepareres res;
141     prop_t n_a = { 0, "" };
142     rpcc *r;
143     for (auto i : nodes) {
144         handle h(i);
145         if (!(r = h.safebind()))
146             continue;
147         int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
148         if (status == paxos_protocol::OK) {
149             if (res.oldinstance) {
150                 LOG("commiting old instance!");
151                 acc->commit(instance, res.v_a);
152                 return false;
153             }
154             if (res.accept) {
155                 accepts.push_back(i);
156                 if (res.n_a >= n_a) {
157                     LOG("found a newer accepted proposal");
158                     v = res.v_a;
159                     n_a = res.n_a;
160                 }
161             }
162         }
163     }
164     return true;
165 }
166
167 // run() calls this to send out accept RPCs to accepts.
168 // fill in accepts with list of nodes that accepted.
169 void
170 proposer::accept(unsigned instance, vector<string> & accepts,
171         const vector<string> & nodes, const string & v)
172 {
173     struct paxos_protocol::acceptarg arg = { instance, my_n, v };
174     rpcc *r;
175     for (auto i : nodes) {
176         handle h(i);
177         if (!(r = h.safebind()))
178             continue;
179         bool accept = false;
180         int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg);
181         if (status == paxos_protocol::OK && accept)
182             accepts.push_back(i);
183     }
184 }
185
186 void
187 proposer::decide(unsigned instance, const vector<string> & accepts,
188         const string & v)
189 {
190     struct paxos_protocol::decidearg arg = { instance, v };
191     rpcc *r;
192     for (auto i : accepts) {
193         handle h(i);
194         if (!(r = h.safebind()))
195             continue;
196         int res = 0;
197         r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, arg);
198     }
199 }
200
201 acceptor::acceptor(class paxos_change *_cfg, bool _first, const string & _me,
202         const string & _value)
203   : cfg(_cfg), me (_me), instance_h(0)
204 {
205     n_h.n = 0;
206     n_h.m = me;
207     n_a.n = 0;
208     n_a.m = me;
209     v_a.clear();
210
211     l = new log (this, me);
212
213     if (instance_h == 0 && _first) {
214         values[1] = _value;
215         l->loginstance(1, _value);
216         instance_h = 1;
217     }
218
219     pxs = new rpcs((uint32_t)stoi(_me));
220     pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
221     pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
222     pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
223 }
224
225 paxos_protocol::status
226 acceptor::preparereq(paxos_protocol::prepareres & r, const string &,
227         paxos_protocol::preparearg a)
228 {
229     lock ml(pxs_mutex);
230     r.oldinstance = false;
231     r.accept = false;
232     r.n_a = n_a;
233     r.v_a = v_a;
234     if (a.instance <= instance_h) {
235         r.oldinstance = true;
236         r.v_a = values[a.instance];
237     } else if (a.n > n_h) {
238         n_h = a.n;
239         l->logprop(n_h);
240         r.accept = true;
241     } else {
242         LOG("I totally rejected this request.  Ha.");
243     }
244     return paxos_protocol::OK;
245 }
246
247 paxos_protocol::status
248 acceptor::acceptreq(bool & r, const string &, paxos_protocol::acceptarg a)
249 {
250     lock ml(pxs_mutex);
251     r = false;
252     if (a.n >= n_h) {
253         n_a = a.n;
254         v_a = a.v;
255         l->logaccept(n_a, v_a);
256         r = true;
257     }
258     return paxos_protocol::OK;
259 }
260
261 // the src argument is only for debugging
262 paxos_protocol::status
263 acceptor::decidereq(int &, const string &, paxos_protocol::decidearg a)
264 {
265     lock ml(pxs_mutex);
266     LOG("decidereq for accepted instance " << a.instance << " (my instance " << instance_h << ") v=" << v_a);
267     if (a.instance == instance_h + 1) {
268         VERIFY(v_a == a.v);
269         commit(a.instance, v_a, ml);
270     } else if (a.instance <= instance_h) {
271         // we are ahead ignore.
272     } else {
273         // we are behind
274         VERIFY(0);
275     }
276     return paxos_protocol::OK;
277 }
278
279 void
280 acceptor::commit(unsigned instance, const string & value, lock & pxs_mutex_lock)
281 {
282     LOG("acceptor::commit: instance=" << instance << " has v=" << value);
283     if (instance > instance_h) {
284         LOG("commit: highestaccepteinstance = " << instance);
285         values[instance] = value;
286         l->loginstance(instance, value);
287         instance_h = instance;
288         n_h.n = 0;
289         n_h.m = me;
290         n_a.n = 0;
291         n_a.m = me;
292         v_a.clear();
293         if (cfg) {
294             pxs_mutex_lock.unlock();
295             cfg->paxos_commit(instance, value);
296             pxs_mutex_lock.lock();
297         }
298     }
299 }
300
301 void
302 acceptor::commit(unsigned instance, const string & value)
303 {
304     lock ml(pxs_mutex);
305     commit(instance, value, ml);
306 }
307
308 string
309 acceptor::dump()
310 {
311     return l->dump();
312 }
313
314 void
315 acceptor::restore(const string & s)
316 {
317     l->restore(s);
318     l->logread();
319 }
320
321
322
323 // For testing purposes
324
325 // Call this from your code between phases prepare and accept of proposer
326 void
327 proposer::breakpoint1()
328 {
329     if (break1) {
330         LOG("Dying at breakpoint 1!");
331         exit(1);
332     }
333 }
334
335 // Call this from your code between phases accept and decide of proposer
336 void
337 proposer::breakpoint2()
338 {
339     if (break2) {
340         LOG("Dying at breakpoint 2!");
341         exit(1);
342     }
343 }
344
345 void
346 proposer::breakpoint(int b)
347 {
348     if (b == 3) {
349         LOG("Proposer: breakpoint 1");
350         break1 = true;
351     } else if (b == 4) {
352         LOG("Proposer: breakpoint 2");
353         break2 = true;
354     }
355 }