4 using namespace std::placeholders;
6 paxos_change::~paxos_change() {}
8 bool isamember(const node_t & m, const nodes_t & nodes) {
9 return std::find(nodes.begin(), nodes.end(), m) != nodes.end();
12 // check if l2 contains a majority of the elements of l1
13 bool majority(const nodes_t & l1, const nodes_t & l2) {
14 auto overlap = (size_t)std::count_if(
15 l1.begin(), l1.end(), std::bind(isamember, _1, l2));
16 return overlap >= (l1.size() >> 1) + 1;
19 // This module implements the proposer and acceptor of the Paxos
20 // distributed algorithm as described by Lamport's "Paxos Made
21 // Simple". To kick off an instance of Paxos, the caller supplies a
22 // list of nodes, a proposed value, and invokes the proposer. If the
23 // majority of the nodes agree on the proposed value after running
24 // this instance of Paxos, the acceptor invokes the upcall
25 // paxos_commit to inform higher layers of the agreed value for this instance.
// Construct the proposer/acceptor for node _me, keeping _delegate as the
// object to notify. When no instance has been recorded yet (instance_h == 0)
// and _first is set, instance 1 is passed to l.loginstance with _value.
// Finally the three protocol handlers (preparereq, acceptreq, decidereq) are
// registered with pxs.
// NOTE(review): several lines of this definition are not visible in this
// excerpt; these comments describe only the statements shown.
28 proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
29 bool _first, const node_t & _me, const value_t & _value)
30 : delegate(_delegate), me (_me)
32 // at this point, the log has already been replayed
33 if (instance_h == 0 && _first) {
// Nothing recorded so far and this node is flagged as first: record
// instance 1 with the caller-supplied value.
35 l.loginstance(1, _value);
// Associate each protocol RPC identifier with the matching member function
// of this object.
39 pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
40 pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
41 pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
// Proposer entry point: try to get a value agreed for `instance` among
// cur_nodes, starting from the caller's proposed value newv.
//
// Visible sequence: take proposer_mutex, bail out with a log message if a
// proposer is already active (!stable), pick a proposal number higher than
// both the last promise and the last proposal, then run the prepare phase,
// the accept phase and the decide phase, requiring majority(cur_nodes, ...)
// after prepare and after accept.
//
// NOTE(review): the declarations of `accepts`, `v` and `nodes`, the updates
// to `stable`, and every return statement are on lines not visible in this
// excerpt, so the exact return value for each path cannot be stated here.
44 bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
46 lock ml(proposer_mutex);
47 LOG << "initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable;
48 if (!stable) { // already running proposer?
49 LOG << "paxos proposer already running";
// Choose a proposal number strictly greater than anything promised or
// proposed by this node so far.
54 proposal.n = std::max(promise.n, proposal.n) + 1;
// Phase 1: prepare. `accepts` receives the nodes that responded.
57 if (prepare(instance, accepts, cur_nodes, v)) {
59 if (majority(cur_nodes, accepts)) {
60 LOG << "received a majority of prepare responses";
// Phase 2: accept. `nodes` is a local set up on lines not shown here
// (presumably the prepare responders -- TODO confirm).
69 accept(instance, accepts, nodes, v);
71 if (majority(cur_nodes, accepts)) {
72 LOG << "received a majority of accept responses";
// Phase 3: tell the nodes listed in `accepts` about the decided value.
76 decide(instance, accepts, v);
79 LOG << "no majority of accept responses";
82 LOG << "no majority of prepare responses";
85 LOG << "prepare is rejected " << stable;
// Prepare phase: send a preparereq carrying the current `proposal` for
// `instance` to every node in `nodes`.
//
//   accepts - out: nodes are appended here as responses come in
//   v       - in/out: the value being proposed (logged below; NOTE(review):
//             the assignment that replaces it with a previously accepted
//             value is presumably on a line not shown -- TODO confirm)
//
// If any node reports that `instance` is already old, the value it returns
// is committed locally via commit(instance, res.v_a).
// NOTE(review): the declarations of `h` and `res`, the handling of a failed
// safebind, and the return statements are not visible in this excerpt.
91 bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
92 const nodes_t & nodes, value_t & v) {
93 LOG << "sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")";
// Highest accepted proposal number seen in the responses so far.
95 prop_t highest_n_a{0, ""};
96 for (auto i : nodes) {
98 rpcc *r = h.safebind();
// Each RPC is given a 100 ms timeout.
101 auto status = (paxos_protocol::status)r->call_timeout(
102 paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
103 if (status == paxos_protocol::OK) {
104 if (res.oldinstance) {
// The peer has already decided this instance: adopt its value locally.
105 LOG << "commiting old instance!";
106 commit(instance, res.v_a);
109 LOG << "preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") "
110 << "v_a=\"" << res.v_a << "\"";
// NOTE(review): whether this push is guarded by res.accept depends on a line
// not shown here.
112 accepts.push_back(i);
// Track the highest accepted proposal number reported by any responder.
// Note the comparison is >=, so a response equal to the current highest also
// takes this branch.
113 if (res.n_a >= highest_n_a) {
114 LOG << "found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")";
116 highest_n_a = res.n_a;
// Accept phase: send an acceptreq carrying (`proposal`, v) for `instance` to
// every node in `nodes`, appending to `accepts` each node whose call returns
// paxos_protocol::OK with a true `accept` flag.
// NOTE(review): the declarations of `h` and the `accept` result variable, and
// the handling of a failed safebind, are on lines not visible in this excerpt.
124 void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
125 const nodes_t & nodes, const value_t & v) {
126 for (auto i : nodes) {
128 rpcc *r = h.safebind();
// Each RPC is given a 100 ms timeout.
132 int status = r->call_timeout(
133 paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v);
// Count the node only when the RPC succeeded AND the peer accepted.
134 if (status == paxos_protocol::OK && accept)
135 accepts.push_back(i);
// Decide phase: send a decidereq with value v for `instance` to every node in
// `accepts`.
// NOTE(review): the declarations of `h` and `res`, and the handling of a
// failed safebind, are on lines not visible in this excerpt.
139 void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
140 for (auto i : accepts) {
142 rpcc *r = h.safebind();
// 100 ms timeout; the status returned by the call is not inspected here.
146 r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
// Acceptor-side handler for a prepare request with proposal number n for
// `instance`. Fills in the reply r under acceptor_mutex and returns
// paxos_protocol::OK on the path shown.
//
// Visible cases:
//   - instance <= instance_h: the instance is already decided; reply with
//     oldinstance = true and the recorded value values[instance].
//   - n > promise: the request is acceptable ("looks good to me").
//   - otherwise the rejection message is logged.
// NOTE(review): the assignments to r.accept, r.n_a and promise are on lines
// not visible in this excerpt.
150 paxos_protocol::status
151 proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
152 LOG << "instance " << instance << " proposal (" << n.n << ", " << n.m << ")";
153 lock ml(acceptor_mutex);
// Defaults for the reply: not an old instance, value = currently accepted value.
154 r.oldinstance = false;
157 r.v_a = accepted_value;
158 if (instance <= instance_h) {
159 LOG << "old instance " << instance << " has value " << values[instance];
160 r.oldinstance = true;
// Override the default with the value recorded for that instance.
161 r.v_a = values[instance];
162 } else if (n > promise) {
163 LOG << "looks good to me";
168 LOG << "I totally rejected this request. Ha.";
170 LOG << "preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") "
171 << "v_a=\"" << r.v_a << "\"";
172 return paxos_protocol::OK;
// Acceptor-side handler for an accept request (proposal n, value v) for
// `instance`. Runs under acceptor_mutex. Only the instance immediately after
// the highest known one (instance_h + 1) is handled by the visible branch,
// which records the accepted proposal/value through l.logaccept; that path
// returns paxos_protocol::OK and the remaining path returns
// paxos_protocol::ERR.
// NOTE(review): the comparison of n against the promise, the assignments to
// r, accepted and accepted_value, and the exact branch layout are on lines
// not visible in this excerpt.
175 paxos_protocol::status
176 proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
177 lock ml(acceptor_mutex);
179 if (instance == instance_h + 1) {
// Record the accepted proposal number and value via l.logaccept.
183 l.logaccept(accepted, accepted_value);
186 return paxos_protocol::OK;
188 return paxos_protocol::ERR;
// Acceptor-side handler for a decide message carrying value v for `instance`.
// Runs under acceptor_mutex and returns paxos_protocol::OK on the path shown.
//
//   - instance == instance_h + 1: this is the next instance; the decided
//     value must equal the value this acceptor accepted (VERIFY), and it is
//     committed while passing along the held lock.
//   - instance <= instance_h: this node is already past that instance, so the
//     message is ignored.
// NOTE(review): the handling of instance > instance_h + 1 is on lines not
// visible in this excerpt.
192 paxos_protocol::status
193 proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
194 lock ml(acceptor_mutex);
195 LOG << "decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value;
196 if (instance == instance_h + 1) {
197 VERIFY(accepted_value == v);
// Pass the already-held lock so that commit can release it around the upcall.
198 commit(instance, accepted_value, ml);
199 } else if (instance <= instance_h) {
200 // we are ahead; ignore.
205 return paxos_protocol::OK;
208 void proposer_acceptor::commit(unsigned instance, const value_t & value) {
209 lock ml(acceptor_mutex);
210 commit(instance, value, ml);
// Record `value` as the decided value of `instance` and notify the delegate.
// pxs_mutex_lock is the caller's lock object; it is released around the
// delegate upcall and re-acquired afterwards.
//
// Nothing inside the visible `if` runs unless `instance` is newer than the
// highest instance known so far (instance_h). For a newer instance the value
// is stored in `values`, passed to l.loginstance, instance_h is advanced, and
// the per-instance acceptor state (accepted, promise, accepted_value) is
// reset.
// NOTE(review): at least one line between the reset and the unlock is not
// visible in this excerpt (possibly a guard on `delegate` -- TODO confirm).
213 void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
214 LOG << "instance=" << instance << " has v=" << value;
215 if (instance > instance_h) {
216 LOG << "highestacceptedinstance = " << instance;
217 values[instance] = value;
218 l.loginstance(instance, value);
219 instance_h = instance;
// Reset the accepted and promised proposal numbers to {0, me}.
220 accepted = promise = {0, me};
// Callers may pass accepted_value itself as `value` (decidereq does), so take
// a copy before clearing it.
221 string v = value; // gaaahhh aliasing of value and accepted_value
222 accepted_value.clear(); // this wipes out "value", too
// Release the lock while the delegate runs, then take it again.
224 pxs_mutex_lock.unlock();
225 delegate->paxos_commit(instance, v);
226 pxs_mutex_lock.lock();
231 // For testing purposes
// Test hook for breakpoint 1: logs that the process is dying at this point.
// NOTE(review): the condition that triggers it and the action taken after the
// log line are not visible in this excerpt.
232 void proposer_acceptor::breakpoint1() {
234 LOG << "Dying at breakpoint 1!";
// Test hook for breakpoint 2: logs that the process is dying at this point.
// NOTE(review): the condition that triggers it and the action taken after the
// log line are not visible in this excerpt.
239 void proposer_acceptor::breakpoint2() {
241 LOG << "Dying at breakpoint 2!";
// Test hook taking a breakpoint selector b; the visible statements log which
// breakpoint ("breakpoint 1" or "breakpoint 2") was handled.
// NOTE(review): how b selects between the two and what state is changed are
// on lines not visible in this excerpt -- presumably b == 1 / b == 2 arm the
// matching breakpointN() hook; confirm against the full file.
246 void proposer_acceptor::breakpoint(int b) {
248 LOG << "breakpoint 1";
251 LOG << "breakpoint 2";