4 bool isamember(const node_t & m, const nodes_t & nodes) {
5 return find(nodes.begin(), nodes.end(), m) != nodes.end();
8 // check if l2 contains a majority of the elements of l1
9 bool majority(const nodes_t & l1, const nodes_t & l2) {
10 auto overlap = (size_t)count_if(l1.begin(), l1.end(), std::bind(isamember, _1, l2));
11 return overlap >= (l1.size() >> 1) + 1;
// This module implements the proposer and acceptor of the Paxos
// distributed algorithm as described in Lamport's "Paxos Made
// Simple". To kick off an instance of Paxos, the caller supplies a
// list of nodes and a proposed value, and invokes the proposer. If a
// majority of the nodes agree on the proposed value after running
// this instance of Paxos, the acceptor invokes the upcall
// paxos_commit to inform higher layers of the agreed value for this
// instance.
// Constructs the combined Paxos proposer/acceptor.
//   _delegate - stored in `delegate`; receives the paxos_commit upcall
//   _first    - when set and no instance has been recorded yet
//               (instance_h == 0), _value is logged as instance 1
//   _me       - stored in `me`, this node's identity
//   _value    - initial value used for the bootstrap instance
// After the optional bootstrap, the three Paxos request handlers
// (preparereq, acceptreq, decidereq) are registered on `pxs`.
// NOTE(review): some statements of this body are not visible in this view;
// confirm the bootstrap branch against the full source.
23 proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
24 bool _first, const node_t & _me, const value_t & _value)
25 : delegate(_delegate), me (_me)
27 // at this point, the log has already been replayed
28 if (instance_h == 0 && _first) {
30 l.loginstance(1, _value);
// register the acceptor-side RPC handlers, bound to this object
34 pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
35 pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
36 pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
// Runs the proposer for one Paxos instance.
//   instance  - number of the instance to run
//   cur_nodes - nodes taking part; majorities are computed against this list
//   newv      - value this node wants to propose
// Holds proposer_mutex for the duration. The visible flow is:
// prepare -> majority check -> accept -> majority check -> decide, with a
// log message on each failure path.
// NOTE(review): the return statements and several other statements are not
// visible in this view, so the exact meaning of the bool result and the
// handling of `stable` should be confirmed against the full source.
39 bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
41 lock ml(proposer_mutex);
42 LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable);
43 if (!stable) { // already running proposer?
44 LOG("paxos proposer already running");
// pick a proposal number above both the promised number and our last proposal
49 proposal.n = max(promise.n, proposal.n) + 1;
// phase 1: collect prepare responses into `accepts`
52 if (prepare(instance, accepts, cur_nodes, v)) {
54 if (majority(cur_nodes, accepts)) {
55 LOG("received a majority of prepare responses");
// phase 2: ask `nodes` to accept v; acceptors are collected into `accepts`
64 accept(instance, accepts, nodes, v);
66 if (majority(cur_nodes, accepts)) {
67 LOG("received a majority of accept responses");
// phase 3: tell every node that accepted that v is decided
71 decide(instance, accepts, v);
74 LOG("no majority of accept responses");
77 LOG("no majority of prepare responses");
80 LOG("prepare is rejected " << stable);
// Phase 1 of the proposer: sends a preparereq carrying the current proposal
// to every node in `nodes`, waiting up to 100 ms for each call.
//   instance - Paxos instance being proposed
//   accepts  - out: responding nodes are appended here
//   nodes    - nodes to contact
//   v        - proposed value (non-const; presumably replaced by the value of
//              the highest accepted proposal reported - confirm)
// NOTE(review): several statements of this body, including every return
// statement, are not visible in this view; the exact condition for appending
// to `accepts` and the meaning of the bool result should be confirmed
// against the full source.
86 bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
87 const nodes_t & nodes, value_t & v) {
88 LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")");
// highest accepted proposal number reported by any responder so far
90 prop_t highest_n_a{0, ""};
91 for (auto i : nodes) {
93 rpcc *r = h.safebind();
96 auto status = (paxos_protocol::status)r->call_timeout(
97 paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
98 if (status == paxos_protocol::OK) {
// the peer already has a value for this instance: commit that value locally
99 if (res.oldinstance) {
100 LOG("commiting old instance!");
101 commit(instance, res.v_a);
104 LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " <<
105 "v_a=\"" << res.v_a << "\"");
107 accepts.push_back(i);
// remember the highest-numbered accepted proposal seen (ties also update)
108 if (res.n_a >= highest_n_a) {
109 LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")");
111 highest_n_a = res.n_a;
// Phase 2 of the proposer: sends an acceptreq with the current proposal and
// value v to every node in `nodes`, waiting up to 100 ms for each call.
//   instance - Paxos instance being proposed
//   accepts  - out: nodes whose call returned OK with a true reply flag
//   nodes    - nodes to contact
//   v        - value to be accepted
// NOTE(review): the statements that create the handle and declare the reply
// flag are not visible in this view.
119 void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
120 const nodes_t & nodes, const value_t & v) {
121 for (auto i : nodes) {
123 rpcc *r = h.safebind();
127 int status = r->call_timeout(
128 paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v);
// count the node only if the RPC succeeded and the reply flag is set
129 if (status == paxos_protocol::OK && accept)
130 accepts.push_back(i);
// Phase 3 of the proposer: sends a decidereq for `instance` with value v to
// every node in `accepts`, waiting up to 100 ms for each call.
// The result of each call is not inspected in the visible code.
// NOTE(review): the statements that create the handle and declare `res` are
// not visible in this view.
134 void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
135 for (auto i : accepts) {
137 rpcc *r = h.safebind();
141 r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
// Acceptor-side handler for a prepare request.
//   r        - out: reply (oldinstance, accept, n_a, v_a)
//   instance - instance the proposer is running
//   n        - proposal number offered by the proposer
// Runs under acceptor_mutex. Three outcomes are visible:
//   * instance <= instance_h: the instance is already known here; the reply
//     is marked oldinstance and carries values[instance];
//   * n > promise: the proposal is acceptable ("looks good to me");
//   * otherwise: the request is rejected.
// The visible return path yields paxos_protocol::OK.
// NOTE(review): some reply-field assignments and the body of the n > promise
// branch are not visible in this view; confirm against the full source.
145 paxos_protocol::status
146 proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
147 LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")");
148 lock ml(acceptor_mutex);
// default reply: not an old instance, report the currently accepted value
149 r.oldinstance = false;
152 r.v_a = accepted_value;
153 if (instance <= instance_h) {
154 LOG("old instance " << instance << " has value " << values[instance]);
155 r.oldinstance = true;
156 r.v_a = values[instance];
157 } else if (n > promise) {
158 LOG("looks good to me");
163 LOG("I totally rejected this request. Ha.");
165 LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " <<
166 "v_a=\"" << r.v_a << "\"");
167 return paxos_protocol::OK;
// Acceptor-side handler for an accept request.
//   r        - out: reply flag for the proposer
//   instance - instance the proposer is running
//   n        - proposal number
//   v        - value the proposer asks this node to accept
// Runs under acceptor_mutex. For the next expected instance
// (instance_h + 1) the visible code records the acceptance through
// l.logaccept and returns paxos_protocol::OK; a paxos_protocol::ERR return
// is also visible, presumably for any other instance - confirm.
// NOTE(review): the statements that set `r`, `accepted` and `accepted_value`
// are not visible in this view.
170 paxos_protocol::status
171 proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
172 lock ml(acceptor_mutex);
174 if (instance == instance_h + 1) {
178 l.logaccept(accepted, accepted_value);
181 return paxos_protocol::OK;
183 return paxos_protocol::ERR;
// Acceptor-side handler for a decide request.
//   instance - instance that has been decided
//   v        - decided value
// Runs under acceptor_mutex. When `instance` is the next expected one
// (instance_h + 1), the handler checks that the locally accepted value equals
// v and commits it, passing the held lock to commit. When this node is
// already at or past `instance`, the request is ignored.
// The visible return path yields paxos_protocol::OK.
// NOTE(review): the branch for instance > instance_h + 1 is not visible in
// this view; confirm against the full source.
187 paxos_protocol::status
188 proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
189 lock ml(acceptor_mutex);
190 LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value);
191 if (instance == instance_h + 1) {
// the decided value must match what this acceptor accepted
192 VERIFY(accepted_value == v);
193 commit(instance, accepted_value, ml);
194 } else if (instance <= instance_h) {
195 // we are ahead; ignore.
200 return paxos_protocol::OK;
203 void proposer_acceptor::commit(unsigned instance, const value_t & value) {
204 lock ml(acceptor_mutex);
205 commit(instance, value, ml);
// Records `value` as the outcome of `instance`; the caller passes in the lock
// it already holds as pxs_mutex_lock.
// Only acts when `instance` is newer than instance_h. In that case it stores
// the value in `values`, writes it to the log via l.loginstance, advances
// instance_h, resets `accepted` and `promise` to {0, me}, and clears
// accepted_value. The delegate's paxos_commit upcall is made with
// pxs_mutex_lock released, and the lock is re-taken afterwards.
// NOTE(review): at least one statement between the clear() and the unlock()
// is not visible in this view (possibly a guard around the upcall) - confirm.
208 void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
209 LOG("instance=" << instance << " has v=" << value);
210 if (instance > instance_h) {
211 LOG("highestacceptedinstance = " << instance);
212 values[instance] = value;
213 l.loginstance(instance, value);
214 instance_h = instance;
215 accepted = promise = {0, me};
// copy first: `value` may refer to accepted_value, which is cleared next
216 string v = value; // gaaahhh aliasing of value and accepted_value
217 accepted_value.clear(); // this wipes out "value", too
// drop the lock around the upcall, then take it again
219 pxs_mutex_lock.unlock();
220 delegate->paxos_commit(instance, v);
221 pxs_mutex_lock.lock();
226 // For testing purposes
// Test hook 1: logs "Dying at breakpoint 1!".
// NOTE(review): only the log statement is visible in this view; presumably
// the function is guarded by a test flag and terminates the process after
// logging - confirm against the full source.
227 void proposer_acceptor::breakpoint1() {
229 LOG("Dying at breakpoint 1!");
// Test hook 2: logs "Dying at breakpoint 2!".
// NOTE(review): only the log statement is visible in this view; presumably
// the function is guarded by a test flag and terminates the process after
// logging - confirm against the full source.
234 void proposer_acceptor::breakpoint2() {
236 LOG("Dying at breakpoint 2!");
241 void proposer_acceptor::breakpoint(int b) {