Clean-ups
[invirt/third/libt4.git] / paxos.cc
1 #include "paxos.h"
2
3 using namespace std::placeholders;
4
5 paxos_change::~paxos_change() {}
6
7 bool isamember(const node_t & m, const nodes_t & nodes) {
8     return std::find(nodes.begin(), nodes.end(), m) != nodes.end();
9 }
10
11 // check if l2 contains a majority of the elements of l1
12 bool majority(const nodes_t & l1, const nodes_t & l2) {
13     auto overlap = (size_t)std::count_if(
14             l1.begin(), l1.end(), std::bind(isamember, _1, l2));
15     return overlap >= (l1.size() >> 1) + 1;
16 }
17
18 // This module implements the proposer and acceptor of the Paxos
19 // distributed algorithm as described by Lamport's "Paxos Made
20 // Simple".  To kick off an instance of Paxos, the caller supplies a
21 // list of nodes, a proposed value, and invokes the proposer.  If the
22 // majority of the nodes agree on the proposed value after running
23 // this instance of Paxos, the acceptor invokes the upcall
24 // paxos_commit to inform higher layers of the agreed value for this
25 // instance.
26
27 proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
28         bool _first, const node_t & _me, const value_t & _value)
29     : delegate(_delegate), me (_me)
30 {
31     // at this point, the log has already been replayed
32     if (instance_h == 0 && _first) {
33         values[1] = _value;
34         l.loginstance(1, _value);
35         instance_h = 1;
36     }
37
38     pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
39     pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
40     pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
41 }
42
43 bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
44 {
45     lock ml(proposer_mutex);
46     LOG << "initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable;
47     if (!stable) {  // already running proposer?
48         LOG << "paxos proposer already running";
49         return false;
50     }
51     stable = false;
52     bool r = false;
53     proposal.n = std::max(promise.n, proposal.n) + 1;
54     nodes_t accepts;
55     value_t v;
56     if (prepare(instance, accepts, cur_nodes, v)) {
57
58         if (majority(cur_nodes, accepts)) {
59             LOG << "received a majority of prepare responses";
60
61             if (!v.size())
62                 v = newv;
63
64             breakpoint1();
65
66             nodes_t nodes;
67             nodes.swap(accepts);
68             accept(instance, accepts, nodes, v);
69
70             if (majority(cur_nodes, accepts)) {
71                 LOG << "received a majority of accept responses";
72
73                 breakpoint2();
74
75                 decide(instance, accepts, v);
76                 r = true;
77             } else {
78                 LOG << "no majority of accept responses";
79             }
80         } else {
81             LOG << "no majority of prepare responses";
82         }
83     } else {
84         LOG << "prepare is rejected " << stable;
85     }
86     stable = true;
87     return r;
88 }
89
90 bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
91         const nodes_t & nodes, value_t & v) {
92     LOG << "sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")";
93     prepareres res;
94     prop_t highest_n_a{0, ""};
95     for (auto i : nodes) {
96         auto r = rpcc::bind_cached(i);
97         if (!r)
98             continue;
99         auto status = (paxos_protocol::status)r->call_timeout(
100                 paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
101         if (status == paxos_protocol::OK) {
102             if (res.oldinstance) {
103                 LOG << "commiting old instance!";
104                 commit(instance, res.v_a);
105                 return false;
106             }
107             LOG << "preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") "
108                 << "v_a=\"" << res.v_a << "\"";
109             if (res.accept) {
110                 accepts.push_back(i);
111                 if (res.n_a >= highest_n_a) {
112                     LOG << "found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")";
113                     v = res.v_a;
114                     highest_n_a = res.n_a;
115                 }
116             }
117         }
118     }
119     return true;
120 }
121
122 void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
123         const nodes_t & nodes, const value_t & v) {
124     for (auto i : nodes) {
125         auto r = rpcc::bind_cached(i);
126         if (!r)
127             continue;
128         bool accept = false;
129         int status = r->call_timeout(
130                 paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v);
131         if (status == paxos_protocol::OK && accept)
132             accepts.push_back(i);
133     }
134 }
135
136 void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
137     for (auto i : accepts) {
138         auto r = rpcc::bind_cached(i);
139         if (!r)
140             continue;
141         int res = 0;
142         r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
143     }
144 }
145
146 paxos_protocol::status
147 proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
148     LOG << "instance " << instance << " proposal (" << n.n << ", " << n.m << ")";
149     lock ml(acceptor_mutex);
150     r.oldinstance = false;
151     r.accept = false;
152     r.n_a = accepted;
153     r.v_a = accepted_value;
154     if (instance <= instance_h) {
155         LOG << "old instance " << instance << " has value " << values[instance];
156         r.oldinstance = true;
157         r.v_a = values[instance];
158     } else if (n > promise) {
159         LOG << "looks good to me";
160         promise = n;
161         l.logprop(promise);
162         r.accept = true;
163     } else {
164         LOG << "I totally rejected this request.  Ha.";
165     }
166     LOG << "preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") "
167         << "v_a=\"" << r.v_a << "\"";
168     return paxos_protocol::OK;
169 }
170
171 paxos_protocol::status
172 proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
173     lock ml(acceptor_mutex);
174     r = false;
175     if (instance == instance_h + 1) {
176         if (n >= promise) {
177             accepted = n;
178             accepted_value = v;
179             l.logaccept(accepted, accepted_value);
180             r = true;
181         }
182         return paxos_protocol::OK;
183     } else {
184         return paxos_protocol::ERR;
185     }
186 }
187
188 paxos_protocol::status
189 proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
190     lock ml(acceptor_mutex);
191     LOG << "decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value;
192     if (instance == instance_h + 1) {
193         VERIFY(accepted_value == v);
194         commit(instance, accepted_value, ml);
195     } else if (instance <= instance_h) {
196         // we are ahead; ignore.
197     } else {
198         // we are behind.
199         VERIFY(0);
200     }
201     return paxos_protocol::OK;
202 }
203
204 void proposer_acceptor::commit(unsigned instance, const value_t & value) {
205     lock ml(acceptor_mutex);
206     commit(instance, value, ml);
207 }
208
209 void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
210     LOG << "instance=" << instance << " has v=" << value;
211     if (instance > instance_h) {
212         LOG << "highestacceptedinstance = " << instance;
213         values[instance] = value;
214         l.loginstance(instance, value);
215         instance_h = instance;
216         accepted = promise = {0, me};
217         string v = value; // gaaahhh aliasing of value and accepted_value
218         accepted_value.clear(); // this wipes out "value", too
219         if (delegate) {
220             pxs_mutex_lock.unlock();
221             delegate->paxos_commit(instance, v);
222             pxs_mutex_lock.lock();
223         }
224     }
225 }
226
227 // For testing purposes
228 void proposer_acceptor::breakpoint1() {
229     if (break1) {
230         LOG << "Dying at breakpoint 1!";
231         exit(1);
232     }
233 }
234
235 void proposer_acceptor::breakpoint2() {
236     if (break2) {
237         LOG << "Dying at breakpoint 2!";
238         exit(1);
239     }
240 }
241
242 void proposer_acceptor::breakpoint(int b) {
243     if (b == 3) {
244         LOG << "breakpoint 1";
245         break1 = true;
246     } else if (b == 4) {
247         LOG << "breakpoint 2";
248         break2 = true;
249     }
250 }