Cosmetic improvements.
[invirt/third/libt4.git] / paxos.cc
1 #include "paxos.h"
2 #include "handle.h"
3
4 bool isamember(const node_t & m, const nodes_t & nodes) {
5     return find(nodes.begin(), nodes.end(), m) != nodes.end();
6 }
7
8 // check if l2 contains a majority of the elements of l1
9 bool majority(const nodes_t & l1, const nodes_t & l2) {
10     auto overlap = (size_t)count_if(l1.begin(), l1.end(), std::bind(isamember, _1, l2));
11     return overlap >= (l1.size() >> 1) + 1;
12 }
13
14 // This module implements the proposer and acceptor of the Paxos
15 // distributed algorithm as described by Lamport's "Paxos Made
16 // Simple".  To kick off an instance of Paxos, the caller supplies a
17 // list of nodes, a proposed value, and invokes the proposer.  If the
18 // majority of the nodes agree on the proposed value after running
19 // this instance of Paxos, the acceptor invokes the upcall
20 // paxos_commit to inform higher layers of the agreed value for this
21 // instance.
22
23 proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
24         bool _first, const node_t & _me, const value_t & _value)
25     : delegate(_delegate), me (_me)
26 {
27     // at this point, the log has already been replayed
28     if (instance_h == 0 && _first) {
29         values[1] = _value;
30         l.loginstance(1, _value);
31         instance_h = 1;
32     }
33
34     pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
35     pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
36     pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
37 }
38
39 bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
40 {
41     lock ml(proposer_mutex);
42     LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable);
43     if (!stable) {  // already running proposer?
44         LOG("paxos proposer already running");
45         return false;
46     }
47     stable = false;
48     bool r = false;
49     proposal.n = max(promise.n, proposal.n) + 1;
50     nodes_t accepts;
51     value_t v;
52     if (prepare(instance, accepts, cur_nodes, v)) {
53
54         if (majority(cur_nodes, accepts)) {
55             LOG("received a majority of prepare responses");
56
57             if (!v.size())
58                 v = newv;
59
60             breakpoint1();
61
62             nodes_t nodes;
63             nodes.swap(accepts);
64             accept(instance, accepts, nodes, v);
65
66             if (majority(cur_nodes, accepts)) {
67                 LOG("received a majority of accept responses");
68
69                 breakpoint2();
70
71                 decide(instance, accepts, v);
72                 r = true;
73             } else {
74                 LOG("no majority of accept responses");
75             }
76         } else {
77             LOG("no majority of prepare responses");
78         }
79     } else {
80         LOG("prepare is rejected " << stable);
81     }
82     stable = true;
83     return r;
84 }
85
86 bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
87         const nodes_t & nodes, value_t & v) {
88     LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")");
89     prepareres res;
90     prop_t highest_n_a{0, ""};
91     for (auto i : nodes) {
92         handle h(i);
93         rpcc *r = h.safebind();
94         if (!r)
95             continue;
96         auto status = (paxos_protocol::status)r->call_timeout(
97                 paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
98         if (status == paxos_protocol::OK) {
99             if (res.oldinstance) {
100                 LOG("commiting old instance!");
101                 commit(instance, res.v_a);
102                 return false;
103             }
104             LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " <<
105                     "v_a=\"" << res.v_a << "\"");
106             if (res.accept) {
107                 accepts.push_back(i);
108                 if (res.n_a >= highest_n_a) {
109                     LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")");
110                     v = res.v_a;
111                     highest_n_a = res.n_a;
112                 }
113             }
114         }
115     }
116     return true;
117 }
118
119 void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
120         const nodes_t & nodes, const value_t & v) {
121     for (auto i : nodes) {
122         handle h(i);
123         rpcc *r = h.safebind();
124         if (!r)
125             continue;
126         bool accept = false;
127         int status = r->call_timeout(
128                 paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v);
129         if (status == paxos_protocol::OK && accept)
130             accepts.push_back(i);
131     }
132 }
133
134 void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
135     for (auto i : accepts) {
136         handle h(i);
137         rpcc *r = h.safebind();
138         if (!r)
139             continue;
140         int res = 0;
141         r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
142     }
143 }
144
145 paxos_protocol::status
146 proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
147     LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")");
148     lock ml(acceptor_mutex);
149     r.oldinstance = false;
150     r.accept = false;
151     r.n_a = accepted;
152     r.v_a = accepted_value;
153     if (instance <= instance_h) {
154         LOG("old instance " << instance << " has value " << values[instance]);
155         r.oldinstance = true;
156         r.v_a = values[instance];
157     } else if (n > promise) {
158         LOG("looks good to me");
159         promise = n;
160         l.logprop(promise);
161         r.accept = true;
162     } else {
163         LOG("I totally rejected this request.  Ha.");
164     }
165     LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " <<
166         "v_a=\"" << r.v_a << "\"");
167     return paxos_protocol::OK;
168 }
169
170 paxos_protocol::status
171 proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
172     lock ml(acceptor_mutex);
173     r = false;
174     if (instance == instance_h + 1) {
175         if (n >= promise) {
176             accepted = n;
177             accepted_value = v;
178             l.logaccept(accepted, accepted_value);
179             r = true;
180         }
181         return paxos_protocol::OK;
182     } else {
183         return paxos_protocol::ERR;
184     }
185 }
186
187 paxos_protocol::status
188 proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
189     lock ml(acceptor_mutex);
190     LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value);
191     if (instance == instance_h + 1) {
192         VERIFY(accepted_value == v);
193         commit(instance, accepted_value, ml);
194     } else if (instance <= instance_h) {
195         // we are ahead; ignore.
196     } else {
197         // we are behind.
198         VERIFY(0);
199     }
200     return paxos_protocol::OK;
201 }
202
203 void proposer_acceptor::commit(unsigned instance, const value_t & value) {
204     lock ml(acceptor_mutex);
205     commit(instance, value, ml);
206 }
207
208 void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
209     LOG("instance=" << instance << " has v=" << value);
210     if (instance > instance_h) {
211         LOG("highestacceptedinstance = " << instance);
212         values[instance] = value;
213         l.loginstance(instance, value);
214         instance_h = instance;
215         accepted = promise = {0, me};
216         string v = value; // gaaahhh aliasing of value and accepted_value
217         accepted_value.clear(); // this wipes out "value", too
218         if (delegate) {
219             pxs_mutex_lock.unlock();
220             delegate->paxos_commit(instance, v);
221             pxs_mutex_lock.lock();
222         }
223     }
224 }
225
226 // For testing purposes
227 void proposer_acceptor::breakpoint1() {
228     if (break1) {
229         LOG("Dying at breakpoint 1!");
230         exit(1);
231     }
232 }
233
234 void proposer_acceptor::breakpoint2() {
235     if (break2) {
236         LOG("Dying at breakpoint 2!");
237         exit(1);
238     }
239 }
240
241 void proposer_acceptor::breakpoint(int b) {
242     if (b == 3) {
243         LOG("breakpoint 1");
244         break1 = true;
245     } else if (b == 4) {
246         LOG("breakpoint 2");
247         break2 = true;
248     }
249 }