3 using namespace std::placeholders;
5 paxos_change::~paxos_change() {}
7 bool isamember(const node_t & m, const nodes_t & nodes) {
8 return std::find(nodes.begin(), nodes.end(), m) != nodes.end();
11 // check if l2 contains a majority of the elements of l1
12 bool majority(const nodes_t & l1, const nodes_t & l2) {
13 auto overlap = (size_t)std::count_if(
14 l1.begin(), l1.end(), std::bind(isamember, _1, l2));
15 return overlap >= (l1.size() >> 1) + 1;
18 // This module implements the proposer and acceptor of the Paxos
19 // distributed algorithm as described by Lamport's "Paxos Made
20 // Simple". To kick off an instance of Paxos, the caller supplies a
21 // list of nodes, a proposed value, and invokes the proposer. If the
22 // majority of the nodes agree on the proposed value after running
23 // this instance of Paxos, the acceptor invokes the upcall
24 // paxos_commit to inform higher layers of the agreed value for this
27 proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
28 bool _first, const node_t & _me, const value_t & _value)
29 : delegate(_delegate), me (_me)
31 // at this point, the log has already been replayed
32 if (instance_h == 0 && _first) {
34 l.loginstance(1, _value);
38 pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
39 pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
40 pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
43 bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
45 lock ml(proposer_mutex);
46 LOG << "initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable;
47 if (!stable) { // already running proposer?
48 LOG << "paxos proposer already running";
53 proposal.n = std::max(promise.n, proposal.n) + 1;
56 if (prepare(instance, accepts, cur_nodes, v)) {
58 if (majority(cur_nodes, accepts)) {
59 LOG << "received a majority of prepare responses";
68 accept(instance, accepts, nodes, v);
70 if (majority(cur_nodes, accepts)) {
71 LOG << "received a majority of accept responses";
75 decide(instance, accepts, v);
78 LOG << "no majority of accept responses";
81 LOG << "no majority of prepare responses";
84 LOG << "prepare is rejected " << stable;
90 bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
91 const nodes_t & nodes, value_t & v) {
92 LOG << "sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")";
94 prop_t highest_n_a{0, ""};
95 for (auto i : nodes) {
96 auto r = rpcc::bind_cached(i);
99 auto status = (paxos_protocol::status)r->call_timeout(
100 paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
101 if (status == paxos_protocol::OK) {
102 if (res.oldinstance) {
103 LOG << "commiting old instance!";
104 commit(instance, res.v_a);
107 LOG << "preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") "
108 << "v_a=\"" << res.v_a << "\"";
110 accepts.push_back(i);
111 if (res.n_a >= highest_n_a) {
112 LOG << "found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")";
114 highest_n_a = res.n_a;
122 void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
123 const nodes_t & nodes, const value_t & v) {
124 for (auto i : nodes) {
125 auto r = rpcc::bind_cached(i);
129 int status = r->call_timeout(
130 paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v);
131 if (status == paxos_protocol::OK && accept)
132 accepts.push_back(i);
136 void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
137 for (auto i : accepts) {
138 auto r = rpcc::bind_cached(i);
142 r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
146 paxos_protocol::status
147 proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
148 LOG << "instance " << instance << " proposal (" << n.n << ", " << n.m << ")";
149 lock ml(acceptor_mutex);
150 r.oldinstance = false;
153 r.v_a = accepted_value;
154 if (instance <= instance_h) {
155 LOG << "old instance " << instance << " has value " << values[instance];
156 r.oldinstance = true;
157 r.v_a = values[instance];
158 } else if (n > promise) {
159 LOG << "looks good to me";
164 LOG << "I totally rejected this request. Ha.";
166 LOG << "preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") "
167 << "v_a=\"" << r.v_a << "\"";
168 return paxos_protocol::OK;
171 paxos_protocol::status
172 proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
173 lock ml(acceptor_mutex);
175 if (instance == instance_h + 1) {
179 l.logaccept(accepted, accepted_value);
182 return paxos_protocol::OK;
184 return paxos_protocol::ERR;
188 paxos_protocol::status
189 proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
190 lock ml(acceptor_mutex);
191 LOG << "decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value;
192 if (instance == instance_h + 1) {
193 VERIFY(accepted_value == v);
194 commit(instance, accepted_value, ml);
195 } else if (instance <= instance_h) {
196 // we are ahead; ignore.
201 return paxos_protocol::OK;
204 void proposer_acceptor::commit(unsigned instance, const value_t & value) {
205 lock ml(acceptor_mutex);
206 commit(instance, value, ml);
209 void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
210 LOG << "instance=" << instance << " has v=" << value;
211 if (instance > instance_h) {
212 LOG << "highestacceptedinstance = " << instance;
213 values[instance] = value;
214 l.loginstance(instance, value);
215 instance_h = instance;
216 accepted = promise = {0, me};
217 string v = value; // gaaahhh aliasing of value and accepted_value
218 accepted_value.clear(); // this wipes out "value", too
220 pxs_mutex_lock.unlock();
221 delegate->paxos_commit(instance, v);
222 pxs_mutex_lock.lock();
227 // For testing purposes
228 void proposer_acceptor::breakpoint1() {
230 LOG << "Dying at breakpoint 1!";
235 void proposer_acceptor::breakpoint2() {
237 LOG << "Dying at breakpoint 2!";
242 void proposer_acceptor::breakpoint(int b) {
244 LOG << "breakpoint 1";
247 LOG << "breakpoint 2";