Clean-ups and fixes to compile with more warnings enabled and with g++.
[invirt/third/libt4.git] / paxos.cc
1 #include "paxos.h"
2 #include "handle.h"
3
4 paxos_change::~paxos_change() {}
5
6 bool isamember(const node_t & m, const nodes_t & nodes) {
7     return find(nodes.begin(), nodes.end(), m) != nodes.end();
8 }
9
10 // check if l2 contains a majority of the elements of l1
11 bool majority(const nodes_t & l1, const nodes_t & l2) {
12     auto overlap = (size_t)count_if(l1.begin(), l1.end(), std::bind(isamember, _1, l2));
13     return overlap >= (l1.size() >> 1) + 1;
14 }
15
16 // This module implements the proposer and acceptor of the Paxos
17 // distributed algorithm as described by Lamport's "Paxos Made
18 // Simple".  To kick off an instance of Paxos, the caller supplies a
19 // list of nodes, a proposed value, and invokes the proposer.  If the
20 // majority of the nodes agree on the proposed value after running
21 // this instance of Paxos, the acceptor invokes the upcall
22 // paxos_commit to inform higher layers of the agreed value for this
23 // instance.
24
25 proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
26         bool _first, const node_t & _me, const value_t & _value)
27     : delegate(_delegate), me (_me)
28 {
29     // at this point, the log has already been replayed
30     if (instance_h == 0 && _first) {
31         values[1] = _value;
32         l.loginstance(1, _value);
33         instance_h = 1;
34     }
35
36     pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
37     pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
38     pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
39 }
40
41 bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
42 {
43     lock ml(proposer_mutex);
44     LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable);
45     if (!stable) {  // already running proposer?
46         LOG("paxos proposer already running");
47         return false;
48     }
49     stable = false;
50     bool r = false;
51     proposal.n = max(promise.n, proposal.n) + 1;
52     nodes_t accepts;
53     value_t v;
54     if (prepare(instance, accepts, cur_nodes, v)) {
55
56         if (majority(cur_nodes, accepts)) {
57             LOG("received a majority of prepare responses");
58
59             if (!v.size())
60                 v = newv;
61
62             breakpoint1();
63
64             nodes_t nodes;
65             nodes.swap(accepts);
66             accept(instance, accepts, nodes, v);
67
68             if (majority(cur_nodes, accepts)) {
69                 LOG("received a majority of accept responses");
70
71                 breakpoint2();
72
73                 decide(instance, accepts, v);
74                 r = true;
75             } else {
76                 LOG("no majority of accept responses");
77             }
78         } else {
79             LOG("no majority of prepare responses");
80         }
81     } else {
82         LOG("prepare is rejected " << stable);
83     }
84     stable = true;
85     return r;
86 }
87
88 bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
89         const nodes_t & nodes, value_t & v) {
90     LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")");
91     prepareres res;
92     prop_t highest_n_a{0, ""};
93     for (auto i : nodes) {
94         handle h(i);
95         rpcc *r = h.safebind();
96         if (!r)
97             continue;
98         auto status = (paxos_protocol::status)r->call_timeout(
99                 paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
100         if (status == paxos_protocol::OK) {
101             if (res.oldinstance) {
102                 LOG("commiting old instance!");
103                 commit(instance, res.v_a);
104                 return false;
105             }
106             LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " <<
107                     "v_a=\"" << res.v_a << "\"");
108             if (res.accept) {
109                 accepts.push_back(i);
110                 if (res.n_a >= highest_n_a) {
111                     LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")");
112                     v = res.v_a;
113                     highest_n_a = res.n_a;
114                 }
115             }
116         }
117     }
118     return true;
119 }
120
121 void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
122         const nodes_t & nodes, const value_t & v) {
123     for (auto i : nodes) {
124         handle h(i);
125         rpcc *r = h.safebind();
126         if (!r)
127             continue;
128         bool accept = false;
129         int status = r->call_timeout(
130                 paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v);
131         if (status == paxos_protocol::OK && accept)
132             accepts.push_back(i);
133     }
134 }
135
136 void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
137     for (auto i : accepts) {
138         handle h(i);
139         rpcc *r = h.safebind();
140         if (!r)
141             continue;
142         int res = 0;
143         r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
144     }
145 }
146
147 paxos_protocol::status
148 proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
149     LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")");
150     lock ml(acceptor_mutex);
151     r.oldinstance = false;
152     r.accept = false;
153     r.n_a = accepted;
154     r.v_a = accepted_value;
155     if (instance <= instance_h) {
156         LOG("old instance " << instance << " has value " << values[instance]);
157         r.oldinstance = true;
158         r.v_a = values[instance];
159     } else if (n > promise) {
160         LOG("looks good to me");
161         promise = n;
162         l.logprop(promise);
163         r.accept = true;
164     } else {
165         LOG("I totally rejected this request.  Ha.");
166     }
167     LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " <<
168         "v_a=\"" << r.v_a << "\"");
169     return paxos_protocol::OK;
170 }
171
172 paxos_protocol::status
173 proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
174     lock ml(acceptor_mutex);
175     r = false;
176     if (instance == instance_h + 1) {
177         if (n >= promise) {
178             accepted = n;
179             accepted_value = v;
180             l.logaccept(accepted, accepted_value);
181             r = true;
182         }
183         return paxos_protocol::OK;
184     } else {
185         return paxos_protocol::ERR;
186     }
187 }
188
189 paxos_protocol::status
190 proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
191     lock ml(acceptor_mutex);
192     LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value);
193     if (instance == instance_h + 1) {
194         VERIFY(accepted_value == v);
195         commit(instance, accepted_value, ml);
196     } else if (instance <= instance_h) {
197         // we are ahead; ignore.
198     } else {
199         // we are behind.
200         VERIFY(0);
201     }
202     return paxos_protocol::OK;
203 }
204
205 void proposer_acceptor::commit(unsigned instance, const value_t & value) {
206     lock ml(acceptor_mutex);
207     commit(instance, value, ml);
208 }
209
210 void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
211     LOG("instance=" << instance << " has v=" << value);
212     if (instance > instance_h) {
213         LOG("highestacceptedinstance = " << instance);
214         values[instance] = value;
215         l.loginstance(instance, value);
216         instance_h = instance;
217         accepted = promise = {0, me};
218         string v = value; // gaaahhh aliasing of value and accepted_value
219         accepted_value.clear(); // this wipes out "value", too
220         if (delegate) {
221             pxs_mutex_lock.unlock();
222             delegate->paxos_commit(instance, v);
223             pxs_mutex_lock.lock();
224         }
225     }
226 }
227
228 // For testing purposes
229 void proposer_acceptor::breakpoint1() {
230     if (break1) {
231         LOG("Dying at breakpoint 1!");
232         exit(1);
233     }
234 }
235
236 void proposer_acceptor::breakpoint2() {
237     if (break2) {
238         LOG("Dying at breakpoint 2!");
239         exit(1);
240     }
241 }
242
243 void proposer_acceptor::breakpoint(int b) {
244     if (b == 3) {
245         LOG("breakpoint 1");
246         break1 = true;
247     } else if (b == 4) {
248         LOG("breakpoint 2");
249         break2 = true;
250     }
251 }