095d56afd17ff21e29d59f2c645d7c239c6a8482
[invirt/third/libt4.git] / paxos.cc
1 #include "paxos.h"
2 #include "handle.h"
3
4 string print_members(const nodes_t &nodes) {
5     ostringstream ost;
6     copy(nodes.begin(), nodes.end(), ostream_iterator<string>(ost, ", "));
7     return ost.str();
8 }
9
10 bool isamember(const node_t & m, const nodes_t & nodes) {
11     return find(nodes.begin(), nodes.end(), m) != nodes.end();
12 }
13
14 // check if l2 contains a majority of the elements of l1
15 bool majority(const nodes_t &l1, const nodes_t &l2) {
16     auto overlap = (size_t)count_if(l1.begin(), l1.end(), bind(isamember, _1, l2));
17     return overlap >= (l1.size() >> 1) + 1;
18 }
19
20 // This module implements the proposer and acceptor of the Paxos
21 // distributed algorithm as described by Lamport's "Paxos Made
22 // Simple".  To kick off an instance of Paxos, the caller supplies a
23 // list of nodes, a proposed value, and invokes the proposer.  If the
24 // majority of the nodes agree on the proposed value after running
25 // this instance of Paxos, the acceptor invokes the upcall
26 // paxos_commit to inform higher layers of the agreed value for this
27 // instance.
28
29 proposer_acceptor::proposer_acceptor(class paxos_change *_delegate,
30         bool _first, const node_t & _me, const value_t & _value)
31     : delegate(_delegate), me (_me)
32 {
33     // at this point, the log has already been replayed
34     if (instance_h == 0 && _first) {
35         values[1] = _value;
36         l.loginstance(1, _value);
37         instance_h = 1;
38     }
39
40     pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
41     pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
42     pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
43 }
44
45 bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
46 {
47     lock ml(proposer_mutex);
48     LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable);
49     if (!stable) {  // already running proposer?
50         LOG("proposer::run: already running");
51         return false;
52     }
53     stable = false;
54     bool r = false;
55     my_n.n = std::max(n_h.n, my_n.n) + 1;
56     nodes_t accepts;
57     value_t v = newv;
58     if (prepare(instance, accepts, cur_nodes, v)) {
59
60         if (majority(cur_nodes, accepts)) {
61             LOG("paxos::run: received a majority of prepare responses");
62
63             breakpoint1();
64
65             nodes_t nodes;
66             nodes.swap(accepts);
67             accept(instance, accepts, nodes, v);
68
69             if (majority(cur_nodes, accepts)) {
70                 LOG("paxos::run: received a majority of accept responses");
71
72                 breakpoint2();
73
74                 decide(instance, accepts, v);
75                 r = true;
76             } else {
77                 LOG("paxos::run: no majority of accept responses");
78             }
79         } else {
80             LOG("paxos::run: no majority of prepare responses");
81         }
82     } else {
83         LOG("paxos::run: prepare is rejected " << stable);
84     }
85     stable = true;
86     return r;
87 }
88
89 bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
90         const nodes_t & nodes, value_t & v) {
91     prepareres res;
92     prop_t highest_n_a{0, ""};
93     for (auto i : nodes) {
94         handle h(i);
95         rpcc *r = h.safebind();
96         if (!r)
97             continue;
98         auto status = (paxos_protocol::status)r->call_timeout(
99                 paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, my_n);
100         if (status == paxos_protocol::OK) {
101             if (res.oldinstance) {
102                 LOG("commiting old instance!");
103                 commit(instance, res.v_a);
104                 return false;
105             }
106             if (res.accept) {
107                 accepts.push_back(i);
108                 if (res.n_a >= highest_n_a) {
109                     LOG("found a newer accepted proposal");
110                     v = res.v_a;
111                     highest_n_a = res.n_a;
112                 }
113             }
114         }
115     }
116     return true;
117 }
118
119 void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
120         const nodes_t & nodes, const value_t & v) {
121     for (auto i : nodes) {
122         handle h(i);
123         rpcc *r = h.safebind();
124         if (!r)
125             continue;
126         bool accept = false;
127         int status = r->call_timeout(
128                 paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, my_n, v);
129         if (status == paxos_protocol::OK && accept)
130             accepts.push_back(i);
131     }
132 }
133
134 void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
135     for (auto i : accepts) {
136         handle h(i);
137         rpcc *r = h.safebind();
138         if (!r)
139             continue;
140         int res = 0;
141         r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, instance, v);
142     }
143 }
144
145 paxos_protocol::status
146 proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
147     lock ml(acceptor_mutex);
148     r.oldinstance = false;
149     r.accept = false;
150     r.n_a = n_a;
151     r.v_a = v_a;
152     if (instance <= instance_h) {
153         r.oldinstance = true;
154         r.v_a = values[instance];
155     } else if (n > n_h) {
156         n_h = n;
157         l.logprop(n_h);
158         r.accept = true;
159     } else {
160         LOG("I totally rejected this request.  Ha.");
161     }
162     return paxos_protocol::OK;
163 }
164
165 paxos_protocol::status
166 proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
167     lock ml(acceptor_mutex);
168     r = false;
169     if (instance == instance_h + 1) {
170         if (n >= n_h) {
171             n_a = n;
172             v_a = v;
173             l.logaccept(n_a, v_a);
174             r = true;
175         }
176         return paxos_protocol::OK;
177     } else {
178         return paxos_protocol::ERR;
179     }
180 }
181
182 paxos_protocol::status
183 proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
184     lock ml(acceptor_mutex);
185     LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << v_a);
186     if (instance == instance_h + 1) {
187         VERIFY(v_a == v);
188         commit(instance, v_a, ml);
189     } else if (instance <= instance_h) {
190         // we are ahead; ignore.
191     } else {
192         // we are behind.
193         VERIFY(0);
194     }
195     return paxos_protocol::OK;
196 }
197
198 void proposer_acceptor::commit(unsigned instance, const value_t & value) {
199     lock ml(acceptor_mutex);
200     commit(instance, value, ml);
201 }
202
203 void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
204     LOG("acceptor::commit: instance=" << instance << " has v=" << value);
205     if (instance > instance_h) {
206         LOG("commit: highestacceptedinstance = " << instance);
207         values[instance] = value;
208         l.loginstance(instance, value);
209         instance_h = instance;
210         n_a = n_h = {0, me};
211         v_a.clear();
212         if (delegate) {
213             pxs_mutex_lock.unlock();
214             delegate->paxos_commit(instance, value);
215             pxs_mutex_lock.lock();
216         }
217     }
218 }
219
220 // For testing purposes
221 void proposer_acceptor::breakpoint1() {
222     if (break1) {
223         LOG("Dying at breakpoint 1!");
224         exit(1);
225     }
226 }
227
228 void proposer_acceptor::breakpoint2() {
229     if (break2) {
230         LOG("Dying at breakpoint 2!");
231         exit(1);
232     }
233 }
234
235 void proposer_acceptor::breakpoint(int b) {
236     if (b == 3) {
237         LOG("Proposer: breakpoint 1");
238         break1 = true;
239     } else if (b == 4) {
240         LOG("Proposer: breakpoint 2");
241         break2 = true;
242     }
243 }