4 paxos_change::~paxos_change() {}
6 bool isamember(const node_t & m, const nodes_t & nodes) {
7 return find(nodes.begin(), nodes.end(), m) != nodes.end();
10 // check if l2 contains a majority of the elements of l1
11 bool majority(const nodes_t & l1, const nodes_t & l2) {
12 auto overlap = (size_t)count_if(l1.begin(), l1.end(), std::bind(isamember, _1, l2));
13 return overlap >= (l1.size() >> 1) + 1;
// This module implements the proposer and acceptor of the Paxos
// distributed algorithm as described in Lamport's "Paxos Made
// Simple". To kick off an instance of Paxos, the caller supplies a
// list of nodes and a proposed value, and invokes the proposer. If a
// majority of the nodes agree on the proposed value after running
// this instance of Paxos, the acceptor invokes the upcall
// paxos_commit to inform higher layers of the agreed value for this
// instance.
// Constructs the proposer/acceptor for node _me and registers its Paxos RPC
// handlers.
//   _delegate - stored in `delegate`; commit() invokes paxos_commit on it
//   _first    - together with instance_h == 0, enables the instance-1
//               bootstrap below
//   _me       - stored in `me`
//   _value    - value written to the log for instance 1 in the bootstrap case
25 proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
26 bool _first, const node_t & _me, const value_t & _value)
27 : delegate(_delegate), me (_me)
29 // at this point, the log has already been replayed
// instance_h is still 0 after replay and the caller marked this node as the
// first one: record _value in the log as the outcome of instance 1.
30 if (instance_h == 0 && _first) {
32 l.loginstance(1, _value);
// Route the three protocol RPCs to the acceptor-side handlers of this object.
36 pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
37 pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
38 pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
// Proposer entry point: tries to get newv chosen for `instance` among
// cur_nodes. Holds proposer_mutex for the scope of the call. The phases are
// prepare -> majority check -> accept -> majority check -> decide, and each
// failure path emits its own log line.
// NOTE(review): confirm what the bool result signifies (presumably whether a
// value was decided in this round).
41 bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
43 lock ml(proposer_mutex);
44 LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable);
// A false `stable` is treated as "a proposer is already running".
45 if (!stable) { // already running proposer?
46 LOG("paxos proposer already running");
// Choose a proposal number strictly greater than both the promise number
// recorded by the acceptor side and our own previous proposal number.
51 proposal.n = max(promise.n, proposal.n) + 1;
// Phase 1. prepare() appends responders to `accepts`; v is passed by
// non-const reference (NOTE(review): presumably so a previously accepted
// value can replace it - confirm).
54 if (prepare(instance, accepts, cur_nodes, v)) {
// The quorum is always measured against the full cur_nodes list.
56 if (majority(cur_nodes, accepts)) {
57 LOG("received a majority of prepare responses");
// Phase 2. accept() appends each node that accepted the proposal to `accepts`.
// NOTE(review): `nodes` is a different variable from cur_nodes; presumably the
// set of prepare responders - confirm.
66 accept(instance, accepts, nodes, v);
68 if (majority(cur_nodes, accepts)) {
69 LOG("received a majority of accept responses");
// Phase 3. Inform the nodes that accepted about the decided value.
73 decide(instance, accepts, v);
76 LOG("no majority of accept responses");
79 LOG("no majority of prepare responses");
// Reached when prepare() returned false.
82 LOG("prepare is rejected " << stable);
// Phase 1, proposer side: sends a preparereq carrying the current `proposal`
// for `instance` to every node in `nodes`.
//   instance - Paxos instance being proposed
//   accepts  - out: nodes are appended as their replies are processed
//   nodes    - nodes to contact
//   v        - in/out value (NOTE(review): presumably replaced by the
//              highest-numbered accepted value reported by a peer - confirm)
// run() treats a false result as the prepare having been rejected.
88 bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
89 const nodes_t & nodes, value_t & v) {
90 LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")");
// Highest accepted proposal number seen in the replies so far; starts at {0, ""}.
92 prop_t highest_n_a{0, ""};
93 for (auto i : nodes) {
// h.safebind() yields the RPC client used for this node.
95 rpcc *r = h.safebind();
// Each call waits at most 100 ms; the result is interpreted as a
// paxos_protocol::status.
98 auto status = (paxos_protocol::status)r->call_timeout(
99 paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
100 if (status == paxos_protocol::OK) {
// The peer already has a value for this instance (preparereq sets oldinstance
// and returns the stored value in v_a): adopt that value locally.
101 if (res.oldinstance) {
102 LOG("commiting old instance!");
103 commit(instance, res.v_a);
106 LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " <<
107 "v_a=\"" << res.v_a << "\"");
109 accepts.push_back(i);
// NOTE(review): the comparison is >=, so a reply whose n_a merely equals the
// current highest also takes this branch even though the log says "newer".
110 if (res.n_a >= highest_n_a) {
111 LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")");
113 highest_n_a = res.n_a;
// Phase 2, proposer side: sends acceptreq(instance, proposal, v) to every node
// in `nodes` and appends to `accepts` each node whose call returned OK with a
// true reply.
121 void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
122 const nodes_t & nodes, const value_t & v) {
123 for (auto i : nodes) {
// h.safebind() yields the RPC client used for this node.
125 rpcc *r = h.safebind();
// Within this loop `accept` names the RPC reply value that is tested below,
// not this member function. Each call waits at most 100 ms.
129 int status = r->call_timeout(
130 paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v);
// Count the node only if the RPC succeeded and the acceptor said yes.
131 if (status == paxos_protocol::OK && accept)
132 accepts.push_back(i);
// Phase 3, proposer side: sends decidereq(instance, v) to each node in
// `accepts`, waiting at most 100 ms per call.
136 void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
137 for (auto i : accepts) {
// h.safebind() yields the RPC client used for this node.
139 rpcc *r = h.safebind();
// The status returned by call_timeout is discarded.
143 r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
// RPC handler for preparereq (acceptor side): fills the reply `r` for proposal
// number n on `instance`. The sender's node id (second parameter) is unused.
// Returns paxos_protocol::OK; the outcome is conveyed through the fields of r.
147 paxos_protocol::status
148 proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
149 LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")");
// Acceptor state is read and updated under acceptor_mutex.
150 lock ml(acceptor_mutex);
151 r.oldinstance = false;
// By default the reply carries the value this acceptor currently has accepted.
154 r.v_a = accepted_value;
// The instance is not beyond instance_h: flag it as old and return the value
// recorded for it instead.
155 if (instance <= instance_h) {
156 LOG("old instance " << instance << " has value " << values[instance]);
157 r.oldinstance = true;
158 r.v_a = values[instance];
// The proposal number exceeds the current promise.
159 } else if (n > promise) {
160 LOG("looks good to me");
// Neither an old instance nor a proposal number above the current promise.
165 LOG("I totally rejected this request. Ha.");
167 LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " <<
168 "v_a=\"" << r.v_a << "\"");
169 return paxos_protocol::OK;
// RPC handler for acceptreq (acceptor side) for proposal n carrying value v on
// `instance`. r is the bool reply; the sender's node id (second parameter) is
// unused.
172 paxos_protocol::status
173 proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
// Acceptor state is read and updated under acceptor_mutex.
174 lock ml(acceptor_mutex);
// Only the instance immediately following instance_h is handled here.
176 if (instance == instance_h + 1) {
// Record the accepted proposal number and value in the log `l`.
180 l.logaccept(accepted, accepted_value);
183 return paxos_protocol::OK;
// NOTE(review): ERR appears to be the result for any instance other than
// instance_h + 1 - confirm.
185 return paxos_protocol::ERR;
// RPC handler for decidereq (acceptor side): a proposer reports that v was
// chosen for `instance`. The int reply and the sender's node id are unused.
189 paxos_protocol::status
190 proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
// Acceptor state is read and updated under acceptor_mutex.
191 lock ml(acceptor_mutex);
192 LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value);
// The decision is for the next instance: the decided value must equal the one
// this acceptor accepted (VERIFY), then it is committed. The held lock is
// passed along so commit() can release it around the delegate upcall.
193 if (instance == instance_h + 1) {
194 VERIFY(accepted_value == v);
195 commit(instance, accepted_value, ml);
196 } else if (instance <= instance_h) {
197 // we are ahead; ignore.
202 return paxos_protocol::OK;
205 void proposer_acceptor::commit(unsigned instance, const value_t & value) {
206 lock ml(acceptor_mutex);
207 commit(instance, value, ml);
// Records `value` as the outcome of `instance` and notifies the delegate.
//   instance       - instance being committed
//   value          - agreed value; may be a reference to accepted_value
//   pxs_mutex_lock - lock object supplied by the caller; it is released around
//                    the delegate upcall and re-acquired afterwards
// The state changes below happen only when instance > instance_h.
210 void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
211 LOG("instance=" << instance << " has v=" << value);
212 if (instance > instance_h) {
213 LOG("highestacceptedinstance = " << instance);
// Remember the value, write it to the log `l`, and advance instance_h.
214 values[instance] = value;
215 l.loginstance(instance, value);
216 instance_h = instance;
// Reset both the accepted and the promised proposal number to {0, me}.
217 accepted = promise = {0, me};
// Copy before clearing: decidereq passes accepted_value itself as `value`, so
// clearing accepted_value would also empty the reference.
218 string v = value; // gaaahhh aliasing of value and accepted_value
219 accepted_value.clear(); // this wipes out "value", too
// Release the lock for the duration of the delegate upcall so that
// paxos_commit does not run with it held; re-acquire it afterwards.
221 pxs_mutex_lock.unlock();
222 delegate->paxos_commit(instance, v);
223 pxs_mutex_lock.lock();
228 // For testing purposes
// Test hook 1: logs "Dying at breakpoint 1!".
// NOTE(review): presumably guarded by a test flag and followed by process
// termination - confirm.
229 void proposer_acceptor::breakpoint1() {
231 LOG("Dying at breakpoint 1!");
// Test hook 2: logs "Dying at breakpoint 2!".
// NOTE(review): presumably guarded by a test flag and followed by process
// termination - confirm.
236 void proposer_acceptor::breakpoint2() {
238 LOG("Dying at breakpoint 2!");
243 void proposer_acceptor::breakpoint(int b) {