// Membership test: returns true when node m compares equal to some element
// of nodes, and false otherwise (including when nodes is empty).
4 bool isamember(const node_t & m, const nodes_t & nodes) {
// find yields nodes.end() when no element matches m
5 return find(nodes.begin(), nodes.end(), m) != nodes.end();
8 // check if l2 contains a majority of the elements of l1
// Counts the entries of l1 that are also present in l2 (using isamember) and
// returns true when that count is at least (l1.size() / 2) + 1, i.e. strictly
// more than half of l1. Consequences of the formula:
//   - an empty l1 never has a majority (0 >= 1 is false);
//   - duplicate entries in l1 are each counted separately.
9 bool majority(const nodes_t &l1, const nodes_t &l2) {
// overlap = number of l1 entries that isamember finds in l2
10 auto overlap = (size_t)count_if(l1.begin(), l1.end(), bind(isamember, _1, l2));
// (size >> 1) + 1 is the smallest count strictly greater than half of l1
11 return overlap >= (l1.size() >> 1) + 1;
14 // This module implements the proposer and acceptor of the Paxos
15 // distributed algorithm as described by Lamport's "Paxos Made
16 // Simple". To kick off an instance of Paxos, the caller supplies a
17 // list of nodes, a proposed value, and invokes the proposer. If the
18 // majority of the nodes agree on the proposed value after running
19 // this instance of Paxos, the acceptor invokes the upcall
20 // paxos_commit to inform higher layers of the agreed value for this
// Constructor. Stores _delegate in delegate and _me in me, optionally logs
// the very first instance, and registers the preparereq, acceptreq and
// decidereq member functions on pxs.
//   _delegate: object that receives the paxos_commit upcall made from commit()
//   _first:    when true and instance_h is still 0, _value is logged as
//              instance 1 through l.loginstance
//   _me:       stored as me; me is passed along with every outgoing RPC
//   _value:    value logged for instance 1 in the _first case
23 proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
24 bool _first, const node_t & _me, const value_t & _value)
25 : delegate(_delegate), me (_me)
27 // at this point, the log has already been replayed
// NOTE(review): instance_h == 0 presumably means the replayed log held no
// instance yet - confirm against the log implementation.
28 if (instance_h == 0 && _first) {
30 l.loginstance(1, _value);
// Register this object as the target for the three Paxos RPC numbers.
34 pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
35 pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
36 pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
// Proposer entry point: runs one round of Paxos for the given instance.
//   instance:  instance number passed on to prepare(), accept() and decide()
//   cur_nodes: nodes contacted by prepare(); majorities are computed against
//              this list
//   newv:      value supplied by the caller for this instance
// Flow, as far as the statements below show:
//   1. take proposer_mutex and log the request;
//   2. if stable is false, log that a proposer is already running;
//   3. raise proposal.n above both promise.n and the previous proposal.n;
//   4. prepare(), then require majority(cur_nodes, accepts);
//   5. accept(), then require majority(cur_nodes, accepts) again;
//   6. decide() for the nodes collected in accepts.
// Every failed step is logged.
// NOTE(review): presumably returns true only when decide() is reached, and
// stable is cleared/restored around the round - confirm both.
39 bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
41 lock ml(proposer_mutex);
42 LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable);
43 if (!stable) { // already running proposer?
44 LOG("paxos proposer already running");
// Choose a proposal number higher than anything promised or proposed so far.
49 proposal.n = max(promise.n, proposal.n) + 1;
// prepare() returning false is reported below as a rejected prepare.
52 if (prepare(instance, accepts, cur_nodes, v)) {
54 if (majority(cur_nodes, accepts)) {
55 LOG("received a majority of prepare responses");
// Accept requests go to nodes, while the majority test that follows is
// still made against cur_nodes.
61 accept(instance, accepts, nodes, v);
63 if (majority(cur_nodes, accepts)) {
64 LOG("received a majority of accept responses");
68 decide(instance, accepts, v);
71 LOG("no majority of accept responses");
74 LOG("no majority of prepare responses");
77 LOG("prepare is rejected " << stable);
// Prepare step of the proposer: sends a preparereq RPC carrying the current
// proposal for the given instance to each node in nodes, passing
// rpcc::to(1000) as the timeout argument.
//   instance: instance number being proposed
//   accepts:  output list; nodes are appended with push_back as replies are
//             processed
//   nodes:    nodes to contact
//   v:        in/out value; logged on entry
// When a reply has oldinstance set, the replied value res.v_a is committed
// locally for this instance through commit().
// highest_n_a starts at {0, ""} and is raised to res.n_a whenever a reply
// reports an accepted proposal number that is >= the current highest.
// NOTE(review): presumably v is overwritten with res.v_a in that case, the
// push_back is guarded by res.accept, and false is returned after committing
// an old instance - confirm all three.
83 bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
84 const nodes_t & nodes, value_t & v) {
85 LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")");
// Highest accepted proposal number reported by any node so far.
87 prop_t highest_n_a{0, ""};
88 for (auto i : nodes) {
90 rpcc *r = h.safebind();
93 auto status = (paxos_protocol::status)r->call_timeout(
94 paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, proposal);
// Replies whose status is not OK are not processed by this branch.
95 if (status == paxos_protocol::OK) {
// The remote node reports this instance as already decided: adopt its value.
96 if (res.oldinstance) {
97 LOG("commiting old instance!");
98 commit(instance, res.v_a);
101 LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " <<
102 "v_a=\"" << res.v_a << "\"");
104 accepts.push_back(i);
// >= (not >) means a later reply with an equal number also replaces the
// current highest.
105 if (res.n_a >= highest_n_a) {
106 LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")");
108 highest_n_a = res.n_a;
// Accept step of the proposer: sends an acceptreq RPC with the current
// proposal and value v for the given instance to each node in nodes, passing
// rpcc::to(1000) as the timeout argument.
//   instance: instance number being proposed
//   accepts:  output list; a node is appended only when its RPC returns OK
//             and the reply flag is true
//   nodes:    nodes to contact
//   v:        value sent with the proposal
116 void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
117 const nodes_t & nodes, const value_t & v) {
118 for (auto i : nodes) {
120 rpcc *r = h.safebind();
// Within this function, accept names the reply flag filled in by the RPC.
124 int status = r->call_timeout(
125 paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, proposal, v);
126 if (status == paxos_protocol::OK && accept)
127 accepts.push_back(i);
// Decide step of the proposer: sends a decidereq RPC with value v for the
// given instance to every node in accepts, passing rpcc::to(1000) as the
// timeout argument.
//   instance: instance number that was decided
//   accepts:  nodes to notify
//   v:        decided value
131 void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
132 for (auto i : accepts) {
134 rpcc *r = h.safebind();
// The status returned by call_timeout is not examined here.
138 r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, instance, v);
// Acceptor-side handler for the preparereq RPC (registered on pxs in the
// constructor). Runs under acceptor_mutex.
//   r:        reply; oldinstance and v_a are filled in below, accept and n_a
//             are logged before returning
//   (node_t): unnamed node argument, not used
//   instance: instance number the proposer is working on
//   n:        proposal number being prepared
// Cases:
//   - instance <= instance_h: already decided here; reply with
//     oldinstance = true and the stored value values[instance];
//   - otherwise, n > promise: the request is acceptable;
//   - otherwise: the request is rejected.
// The return statement shown yields paxos_protocol::OK; the outcome is
// carried in r.
// NOTE(review): presumably the acceptable branch records n as the new promise
// and sets r.accept / r.n_a - confirm.
142 paxos_protocol::status
143 proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
144 LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")");
145 lock ml(acceptor_mutex);
146 r.oldinstance = false;
// Default reply value is the currently accepted value; the old-instance
// branch below replaces it.
149 r.v_a = accepted_value;
150 if (instance <= instance_h) {
151 LOG("old instance " << instance << " has value " << values[instance]);
152 r.oldinstance = true;
153 r.v_a = values[instance];
154 } else if (n > promise) {
155 LOG("looks good to me");
160 LOG("I totally rejected this request. Ha.");
162 LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " <<
163 "v_a=\"" << r.v_a << "\"");
164 return paxos_protocol::OK;
// Acceptor-side handler for the acceptreq RPC (registered on pxs in the
// constructor). Runs under acceptor_mutex.
//   r:        reply flag read by the proposer in accept()
//   (node_t): unnamed node argument, not used
//   instance: instance number the proposer is working on
//   n:        proposal number
//   v:        proposed value
// Only the next undecided instance (instance_h + 1) is handled; on that path
// the accepted proposal and value are written with l.logaccept.
// Returns paxos_protocol::OK or paxos_protocol::ERR.
// NOTE(review): presumably ERR is the result for any other instance number,
// and r is set true only when n is not below the current promise - confirm.
167 paxos_protocol::status
168 proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
169 lock ml(acceptor_mutex);
171 if (instance == instance_h + 1) {
// Write the accepted proposal number and value to the log l.
175 l.logaccept(accepted, accepted_value);
178 return paxos_protocol::OK;
180 return paxos_protocol::ERR;
// Acceptor-side handler for the decidereq RPC (registered on pxs in the
// constructor). Runs under acceptor_mutex.
//   (int):    unnamed reply argument, not used
//   (node_t): unnamed node argument, not used
//   instance: instance number that was decided
//   v:        decided value
// For the next undecided instance (instance_h + 1) the decided value must
// equal the locally accepted value (checked with VERIFY), and it is then
// committed using the lock already held. Decisions for instances at or below
// instance_h are ignored.
// The return statement shown yields paxos_protocol::OK.
184 paxos_protocol::status
185 proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
186 lock ml(acceptor_mutex);
187 LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value);
188 if (instance == instance_h + 1) {
189 VERIFY(accepted_value == v);
// Pass ml along so commit() can release it around the upcall.
190 commit(instance, accepted_value, ml);
191 } else if (instance <= instance_h) {
192 // we are ahead; ignore.
197 return paxos_protocol::OK;
// Convenience overload for callers that do not already hold acceptor_mutex:
// takes the mutex and forwards to the three-argument commit().
//   instance: instance number being committed
//   value:    value agreed for that instance
200 void proposer_acceptor::commit(unsigned instance, const value_t & value) {
201 lock ml(acceptor_mutex);
202 commit(instance, value, ml);
// Records a decided value. Callers in this file pass a lock object that
// holds acceptor_mutex.
//   instance:       instance number being committed
//   value:          value agreed for that instance
//   pxs_mutex_lock: lock object supplied by the caller; released around the
//                   delegate upcall and re-acquired afterwards
// For an instance newer than instance_h this stores the value in values,
// writes it to the log l, advances instance_h, and resets the per-instance
// acceptor state (accepted, promise, accepted_value).
// The paxos_commit upcall on delegate runs with pxs_mutex_lock released.
// NOTE(review): releasing the lock is presumably meant to let the delegate
// call back into this object without deadlocking - confirm.
205 void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
206 LOG("acceptor::commit: instance=" << instance << " has v=" << value);
// Instances at or below instance_h do not enter this branch.
207 if (instance > instance_h) {
208 LOG("commit: highestacceptedinstance = " << instance);
209 values[instance] = value;
210 l.loginstance(instance, value);
211 instance_h = instance;
// Start the next instance with no promise and nothing accepted.
212 accepted = promise = {0, me};
213 accepted_value.clear();
// Order matters: unlock, make the upcall, then lock again.
215 pxs_mutex_lock.unlock();
216 delegate->paxos_commit(instance, value);
217 pxs_mutex_lock.lock();
222 // For testing purposes
// Test hook number 1: logs that the process is dying at breakpoint 1.
// NOTE(review): presumably guarded by a flag armed through breakpoint(int)
// and followed by process termination - confirm.
223 void proposer_acceptor::breakpoint1() {
225 LOG("Dying at breakpoint 1!");
// Test hook number 2: logs that the process is dying at breakpoint 2.
// NOTE(review): presumably guarded by a flag armed through breakpoint(int)
// and followed by process termination - confirm.
230 void proposer_acceptor::breakpoint2() {
232 LOG("Dying at breakpoint 2!");
// Testing helper that takes a breakpoint number b and logs which proposer
// breakpoint (1 or 2) was requested.
// NOTE(review): presumably b selects between the two messages and arms the
// matching breakpoint1() / breakpoint2() hook - confirm.
237 void proposer_acceptor::breakpoint(int b) {
239 LOG("Proposer: breakpoint 1");
242 LOG("Proposer: breakpoint 2");