a055c366b5bc9fbefd46679931a09b7257d5f094
[invirt/third/libt4.git] / paxos.cc
1 #include "paxos.h"
2 #include "handle.h"
3
4 using namespace std::placeholders;
5
6 paxos_change::~paxos_change() {}
7
8 bool isamember(const node_t & m, const nodes_t & nodes) {
9     return std::find(nodes.begin(), nodes.end(), m) != nodes.end();
10 }
11
12 // check if l2 contains a majority of the elements of l1
13 bool majority(const nodes_t & l1, const nodes_t & l2) {
14     auto overlap = (size_t)std::count_if(
15             l1.begin(), l1.end(), std::bind(isamember, _1, l2));
16     return overlap >= (l1.size() >> 1) + 1;
17 }
18
19 // This module implements the proposer and acceptor of the Paxos
20 // distributed algorithm as described by Lamport's "Paxos Made
21 // Simple".  To kick off an instance of Paxos, the caller supplies a
22 // list of nodes, a proposed value, and invokes the proposer.  If the
23 // majority of the nodes agree on the proposed value after running
24 // this instance of Paxos, the acceptor invokes the upcall
25 // paxos_commit to inform higher layers of the agreed value for this
26 // instance.
27
28 proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
29         bool _first, const node_t & _me, const value_t & _value)
30     : delegate(_delegate), me (_me)
31 {
32     // at this point, the log has already been replayed
33     if (instance_h == 0 && _first) {
34         values[1] = _value;
35         l.loginstance(1, _value);
36         instance_h = 1;
37     }
38
39     pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
40     pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
41     pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
42 }
43
44 bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
45 {
46     lock ml(proposer_mutex);
47     LOG << "initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable;
48     if (!stable) {  // already running proposer?
49         LOG << "paxos proposer already running";
50         return false;
51     }
52     stable = false;
53     bool r = false;
54     proposal.n = std::max(promise.n, proposal.n) + 1;
55     nodes_t accepts;
56     value_t v;
57     if (prepare(instance, accepts, cur_nodes, v)) {
58
59         if (majority(cur_nodes, accepts)) {
60             LOG << "received a majority of prepare responses";
61
62             if (!v.size())
63                 v = newv;
64
65             breakpoint1();
66
67             nodes_t nodes;
68             nodes.swap(accepts);
69             accept(instance, accepts, nodes, v);
70
71             if (majority(cur_nodes, accepts)) {
72                 LOG << "received a majority of accept responses";
73
74                 breakpoint2();
75
76                 decide(instance, accepts, v);
77                 r = true;
78             } else {
79                 LOG << "no majority of accept responses";
80             }
81         } else {
82             LOG << "no majority of prepare responses";
83         }
84     } else {
85         LOG << "prepare is rejected " << stable;
86     }
87     stable = true;
88     return r;
89 }
90
91 bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
92         const nodes_t & nodes, value_t & v) {
93     LOG << "sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")";
94     prepareres res;
95     prop_t highest_n_a{0, ""};
96     for (auto i : nodes) {
97         handle h(i);
98         rpcc *r = h.safebind();
99         if (!r)
100             continue;
101         auto status = (paxos_protocol::status)r->call_timeout(
102                 paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
103         if (status == paxos_protocol::OK) {
104             if (res.oldinstance) {
105                 LOG << "commiting old instance!";
106                 commit(instance, res.v_a);
107                 return false;
108             }
109             LOG << "preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") "
110                 << "v_a=\"" << res.v_a << "\"";
111             if (res.accept) {
112                 accepts.push_back(i);
113                 if (res.n_a >= highest_n_a) {
114                     LOG << "found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")";
115                     v = res.v_a;
116                     highest_n_a = res.n_a;
117                 }
118             }
119         }
120     }
121     return true;
122 }
123
124 void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
125         const nodes_t & nodes, const value_t & v) {
126     for (auto i : nodes) {
127         handle h(i);
128         rpcc *r = h.safebind();
129         if (!r)
130             continue;
131         bool accept = false;
132         int status = r->call_timeout(
133                 paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v);
134         if (status == paxos_protocol::OK && accept)
135             accepts.push_back(i);
136     }
137 }
138
139 void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
140     for (auto i : accepts) {
141         handle h(i);
142         rpcc *r = h.safebind();
143         if (!r)
144             continue;
145         int res = 0;
146         r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
147     }
148 }
149
150 paxos_protocol::status
151 proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
152     LOG << "instance " << instance << " proposal (" << n.n << ", " << n.m << ")";
153     lock ml(acceptor_mutex);
154     r.oldinstance = false;
155     r.accept = false;
156     r.n_a = accepted;
157     r.v_a = accepted_value;
158     if (instance <= instance_h) {
159         LOG << "old instance " << instance << " has value " << values[instance];
160         r.oldinstance = true;
161         r.v_a = values[instance];
162     } else if (n > promise) {
163         LOG << "looks good to me";
164         promise = n;
165         l.logprop(promise);
166         r.accept = true;
167     } else {
168         LOG << "I totally rejected this request.  Ha.";
169     }
170     LOG << "preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") "
171         << "v_a=\"" << r.v_a << "\"";
172     return paxos_protocol::OK;
173 }
174
175 paxos_protocol::status
176 proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
177     lock ml(acceptor_mutex);
178     r = false;
179     if (instance == instance_h + 1) {
180         if (n >= promise) {
181             accepted = n;
182             accepted_value = v;
183             l.logaccept(accepted, accepted_value);
184             r = true;
185         }
186         return paxos_protocol::OK;
187     } else {
188         return paxos_protocol::ERR;
189     }
190 }
191
192 paxos_protocol::status
193 proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
194     lock ml(acceptor_mutex);
195     LOG << "decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value;
196     if (instance == instance_h + 1) {
197         VERIFY(accepted_value == v);
198         commit(instance, accepted_value, ml);
199     } else if (instance <= instance_h) {
200         // we are ahead; ignore.
201     } else {
202         // we are behind.
203         VERIFY(0);
204     }
205     return paxos_protocol::OK;
206 }
207
208 void proposer_acceptor::commit(unsigned instance, const value_t & value) {
209     lock ml(acceptor_mutex);
210     commit(instance, value, ml);
211 }
212
213 void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
214     LOG << "instance=" << instance << " has v=" << value;
215     if (instance > instance_h) {
216         LOG << "highestacceptedinstance = " << instance;
217         values[instance] = value;
218         l.loginstance(instance, value);
219         instance_h = instance;
220         accepted = promise = {0, me};
221         string v = value; // gaaahhh aliasing of value and accepted_value
222         accepted_value.clear(); // this wipes out "value", too
223         if (delegate) {
224             pxs_mutex_lock.unlock();
225             delegate->paxos_commit(instance, v);
226             pxs_mutex_lock.lock();
227         }
228     }
229 }
230
231 // For testing purposes
232 void proposer_acceptor::breakpoint1() {
233     if (break1) {
234         LOG << "Dying at breakpoint 1!";
235         exit(1);
236     }
237 }
238
239 void proposer_acceptor::breakpoint2() {
240     if (break2) {
241         LOG << "Dying at breakpoint 2!";
242         exit(1);
243     }
244 }
245
246 void proposer_acceptor::breakpoint(int b) {
247     if (b == 3) {
248         LOG << "breakpoint 1";
249         break1 = true;
250     } else if (b == 4) {
251         LOG << "breakpoint 2";
252         break2 = true;
253     }
254 }