So many changes. Broken.
[invirt/third/libt4.git] / paxos.cc
1 #include "include/paxos.h"
2
3 using namespace std::placeholders;
4 using namespace std::chrono;
5
6 paxos_change::~paxos_change() {}
7
8 bool isamember(const node_t & m, const nodes_t & nodes) {
9     return std::find(nodes.begin(), nodes.end(), m) != nodes.end();
10 }
11
12 // check if l2 contains a majority of the elements of l1
13 bool majority(const nodes_t & l1, const nodes_t & l2) {
14     auto overlap = (size_t)std::count_if(
15             l1.begin(), l1.end(), std::bind(isamember, _1, l2));
16     return overlap >= (l1.size() >> 1) + 1;
17 }
18
19 // This module implements the proposer and acceptor of the Paxos
20 // distributed algorithm as described by Lamport's "Paxos Made
21 // Simple".  To kick off an instance of Paxos, the caller supplies a
22 // list of nodes, a proposed value, and invokes the proposer.  If the
23 // majority of the nodes agree on the proposed value after running
24 // this instance of Paxos, the acceptor invokes the upcall
25 // paxos_commit to inform higher layers of the agreed value for this
26 // instance.
27
28 proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
29         bool _first, const node_t & _me, const value_t & _value)
30     : delegate(_delegate), me (_me)
31 {
32     l.handler<log_instance>([this] (auto entry) {
33         instance_h = entry.number;
34         values[entry.number] = entry.value;
35         accepted = promise = {0, me};
36         accepted_value.clear();
37     });
38     l.handler<log_proposal>([this] (auto entry) {
39         promise = entry.promise;
40     });
41     l.handler<log_accept>([this] (auto entry) {
42         accepted = entry.number;
43         accepted_value = entry.value;
44     });
45
46     if (instance_h == 0 && _first)
47         l.append(log_instance{1, _value});
48
49     l.replay();
50
51     pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
52     pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
53     pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
54 }
55
56 bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
57 {
58     lock ml(proposer_mutex);
59     LOG << "initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable;
60     if (!stable) {  // already running proposer?
61         LOG << "paxos proposer already running";
62         return false;
63     }
64     stable = false;
65     bool r = false;
66     proposal.n = std::max(promise.n, proposal.n) + 1;
67     nodes_t accepts;
68     value_t v;
69     if (prepare(instance, accepts, cur_nodes, v)) {
70
71         if (majority(cur_nodes, accepts)) {
72             LOG << "received a majority of prepare responses";
73
74             if (!v.size())
75                 v = newv;
76
77             breakpoint1();
78
79             nodes_t nodes;
80             nodes.swap(accepts);
81             accept(instance, accepts, nodes, v);
82
83             if (majority(cur_nodes, accepts)) {
84                 LOG << "received a majority of accept responses";
85
86                 breakpoint2();
87
88                 decide(instance, accepts, v);
89                 r = true;
90             } else {
91                 LOG << "no majority of accept responses";
92             }
93         } else {
94             LOG << "no majority of prepare responses";
95         }
96     } else {
97         LOG << "prepare is rejected " << stable;
98     }
99     stable = true;
100     return r;
101 }
102
103 bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
104         const nodes_t & nodes, value_t & v) {
105     LOG << "sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")";
106     prepareres res;
107     prop_t highest_n_a{0, ""};
108     for (auto i : nodes) {
109         auto cl = rpcc::bind_cached(i);
110         if (!cl)
111             continue;
112         int status = cl->call_timeout(paxos_protocol::preparereq, 100ms,
113                 res, me, instance, proposal);
114         if (status == paxos_protocol::OK) {
115             LOG << "preparereq response type=" << res.type << " n_a=(" << res.n_a.n
116                 << ", " << res.n_a.m << ") " << "v_a=\"" << res.v_a << "\"";
117             if (res.type == prepareres::oldinstance) {
118                 LOG << "commiting old instance!";
119                 lock ml(acceptor_mutex);
120                 commit(instance, res.v_a, ml);
121                 return false;
122             } else if (res.type == prepareres::accept) {
123                 accepts.push_back(i);
124                 if (res.n_a >= highest_n_a) {
125                     LOG << "found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")";
126                     v = res.v_a;
127                     highest_n_a = res.n_a;
128                 }
129             }
130         }
131     }
132     return true;
133 }
134
135 void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
136         const nodes_t & nodes, const value_t & v) {
137     bool accept = false;
138     for (auto i : nodes) {
139         if (auto cl = rpcc::bind_cached(i)) {
140             int status = cl->call_timeout(paxos_protocol::acceptreq, 100ms,
141                     accept, me, instance, proposal, v);
142             if (status == paxos_protocol::OK && accept)
143                 accepts.push_back(i);
144         }
145     }
146 }
147
148 void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
149     int res = 0;
150     for (auto i : accepts)
151         if (auto cl = rpcc::bind_cached(i))
152             cl->call_timeout(paxos_protocol::decidereq, 100ms, res, me, instance, v);
153 }
154
155 paxos_protocol::status
156 proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
157     LOG << "instance " << instance << " proposal (" << n.n << ", " << n.m << ")";
158     lock ml(acceptor_mutex);
159     if (instance <= instance_h) {
160         LOG << "old instance " << instance << " has value " << values[instance];
161         r = prepareres{prepareres::oldinstance, accepted, values[instance]};
162     } else if (n > promise) {
163         LOG << "looks good to me";
164         promise = n;
165         l.append(log_proposal{n});
166         r = prepareres{prepareres::accept, accepted, accepted_value};
167     } else {
168         LOG << "I totally rejected this request.  Ha.";
169         r = prepareres{prepareres::reject, accepted, accepted_value};
170     }
171     LOG << "preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept
172         << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " << "v_a=\"" << r.v_a << "\"";
173     return paxos_protocol::OK;
174 }
175
176 paxos_protocol::status
177 proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
178     lock ml(acceptor_mutex);
179     r = false;
180     if (instance != instance_h + 1)
181         return paxos_protocol::ERR;
182     if (n >= promise) {
183         accepted = n;
184         accepted_value = v;
185         l.append(log_accept{n, v});
186         r = true;
187     }
188     return paxos_protocol::OK;
189 }
190
191 paxos_protocol::status
192 proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
193     lock ml(acceptor_mutex);
194     LOG << "decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value;
195     if (instance == instance_h + 1) {
196         VERIFY(accepted_value == v);
197         commit(instance, v, ml);
198     } else if (instance <= instance_h) {
199         // we are ahead; ignore.
200     } else {
201         // we are behind.
202         VERIFY(0);
203     }
204     return paxos_protocol::OK;
205 }
206
207 void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & acceptor_mutex_lock) {
208     VERIFY(&value != &accepted_value); // eited by aliasing?
209     VERIFY(acceptor_mutex_lock);
210     LOG << "instance=" << instance << " has v=" << value;
211     if (instance > instance_h) {
212         LOG << "highestacceptedinstance = " << instance;
213         l.append(log_instance{instance, value});
214         instance_h = instance;
215         values[instance] = value;
216         accepted = promise = {0, me};
217         accepted_value.clear();
218         if (delegate) {
219             acceptor_mutex_lock.unlock();
220             delegate->paxos_commit(instance, value);
221             acceptor_mutex_lock.lock();
222         }
223     }
224 }
225
226 // For testing purposes
227 void proposer_acceptor::breakpoint1() {
228     if (break1) {
229         LOG << "Dying at breakpoint 1!";
230         exit(1);
231     }
232 }
233
234 void proposer_acceptor::breakpoint2() {
235     if (break2) {
236         LOG << "Dying at breakpoint 2!";
237         exit(1);
238     }
239 }
240
241 void proposer_acceptor::breakpoint(int b) {
242     if (b == 3) {
243         LOG << "breakpoint 1";
244         break1 = true;
245     } else if (b == 4) {
246         LOG << "breakpoint 2";
247         break2 = true;
248     }
249 }