// Render a node list as text (used in log messages): every element of `nodes`
// is written to the stream `ost`, each one followed by ", " -- so a non-empty
// list ends with a trailing separator.
// NOTE(review): the declaration of `ost` and the return statement are not
// visible in this excerpt; presumably `ost` is a local ostringstream whose
// contents are returned -- confirm against the full file.
4 string print_members(const nodes_t &nodes) {
6 copy(nodes.begin(), nodes.end(), ostream_iterator<string>(ost, ", "));
10 bool isamember(const node_t & m, const nodes_t & nodes) {
11 return find(nodes.begin(), nodes.end(), m) != nodes.end();
14 // check if l2 contains a majority of the elements of l1
15 bool majority(const nodes_t &l1, const nodes_t &l2) {
16 auto overlap = (size_t)count_if(l1.begin(), l1.end(), bind(isamember, _1, l2));
17 return overlap >= (l1.size() >> 1) + 1;
20 // This module implements the proposer and acceptor of the Paxos
21 // distributed algorithm as described by Lamport's "Paxos Made
22 // Simple". To kick off an instance of Paxos, the caller supplies a
23 // list of nodes, a proposed value, and invokes the proposer. If the
24 // majority of the nodes agree on the proposed value after running
25 // this instance of Paxos, the acceptor invokes the upcall
26 // paxos_commit to inform higher layers of the agreed value for this
// instance.
// Constructor.
//   _delegate  stored in member `delegate`; it is the object whose
//              paxos_commit() is invoked from commit().
//   _first     when true and instance_h is still 0, `_value` is logged as
//              instance 1 through l.loginstance().
//   _me        stored in member `me`; passed as the sender in outgoing RPCs.
//   _value     initial value used for the instance-1 bootstrap described above.
// Finally registers preparereq/acceptreq/decidereq as the handlers for the
// corresponding paxos_protocol RPC numbers on `pxs`.
// NOTE(review): further member initialisers and statements (e.g. anything
// between the visible lines) are not shown in this excerpt.
29 proposer_acceptor::proposer_acceptor(class paxos_change *_delegate,
30 bool _first, const node_t & _me, const value_t & _value)
31 : delegate(_delegate), me (_me)
33 // at this point, the log has already been replayed
// instance_h == 0 presumably means no instance was recovered from the log --
// TODO confirm.
34 if (instance_h == 0 && _first) {
36 l.loginstance(1, _value);
// Hook the acceptor-side RPC handlers into the RPC server.
40 pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
41 pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
42 pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
// Proposer entry point: drive one Paxos round for `instance` among
// `cur_nodes`, proposing `newv`.
//
// Visible flow:
//   1. Take proposer_mutex for the duration of the call.
//   2. If `stable` is false a round is already in progress; this is logged.
//   3. Choose a proposal number larger than both n_h.n and my_n.n.
//   4. prepare(): collect the nodes that answered in `accepts`.
//   5. If `accepts` holds a majority of cur_nodes, send accept requests to
//      those nodes (`nodes`) and collect the acceptors again in `accepts`.
//   6. If that is again a majority of cur_nodes, broadcast the decision with
//      decide().
// Each failure path only emits a log message in the visible code.
// NOTE(review): the declarations of `accepts`, `v` and `nodes`, the updates of
// `stable`, the else-branches and the return statements are not visible in
// this excerpt, so the exact return value per path cannot be stated here.
45 bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
47 lock ml(proposer_mutex);
48 LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable);
49 if (!stable) { // already running proposer?
50 LOG("proposer::run: already running");
// Pick a proposal number strictly above anything seen (n_h) or used (my_n).
55 my_n.n = std::max(n_h.n, my_n.n) + 1;
// Phase 1: prepare. `v` may be modified by prepare() (non-const reference).
58 if (prepare(instance, accepts, cur_nodes, v)) {
60 if (majority(cur_nodes, accepts)) {
61 LOG("paxos::run: received a majority of prepare responses");
// Phase 2: accept requests go to `nodes`; the majority below is still
// measured against the full cur_nodes membership.
67 accept(instance, accepts, nodes, v);
69 if (majority(cur_nodes, accepts)) {
70 LOG("paxos::run: received a majority of accept responses");
// Phase 3: tell the nodes that accepted about the decided value.
74 decide(instance, accepts, v);
77 LOG("paxos::run: no majority of accept responses");
80 LOG("paxos::run: no majority of prepare responses");
83 LOG("paxos::run: prepare is rejected " << stable);
// Phase 1 of the proposer: send a preparereq RPC carrying (me, instance, my_n)
// to every node in `nodes`.
//   accepts  (out) nodes are appended here as their replies are processed.
//   v        (in/out) non-const reference; presumably replaced by the value of
//            the highest-numbered proposal already accepted by a responder --
//            the assignment itself is not visible in this excerpt.
// If a responder reports `oldinstance`, the value it returned (res.v_a) is
// committed locally for `instance`.
// NOTE(review): the declarations of `h` and `res`, any null check on `r`, the
// condition guarding accepts.push_back, and the return statements are not
// visible here.
89 bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
90 const nodes_t & nodes, value_t & v) {
// Highest accepted proposal number seen so far among the replies.
92 prop_t highest_n_a{0, ""};
93 for (auto i : nodes) {
95 rpcc *r = h.safebind();
// RPC with a timeout argument of rpcc::to(1000); the reply lands in `res`.
98 auto status = (paxos_protocol::status)r->call_timeout(
99 paxos_protocol::preparereq, rpcc::to(1000), res, me, instance, my_n);
100 if (status == paxos_protocol::OK) {
// The responder already knows a decision for this instance: adopt it.
101 if (res.oldinstance) {
102 LOG("commiting old instance!");
103 commit(instance, res.v_a);
107 accepts.push_back(i);
// NOTE(review): `>=` also matches a reply whose n_a equals the initial
// {0, ""}; if that denotes "nothing accepted yet", `>` may be intended --
// confirm against prop_t's comparison operators and the hidden lines.
108 if (res.n_a >= highest_n_a) {
109 LOG("found a newer accepted proposal");
111 highest_n_a = res.n_a;
// Phase 2 of the proposer: send an acceptreq RPC carrying
// (me, instance, my_n, v) to every node in `nodes`.
//   accepts  (out) receives each node whose call returned paxos_protocol::OK
//            and whose reply flag `accept` is true.
// NOTE(review): the declarations of `h` and of the reply flag `accept` (a
// local that shares this function's name), plus any null check on `r`, are
// not visible in this excerpt.
119 void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
120 const nodes_t & nodes, const value_t & v) {
121 for (auto i : nodes) {
123 rpcc *r = h.safebind();
// RPC with a timeout argument of rpcc::to(1000).
127 int status = r->call_timeout(
128 paxos_protocol::acceptreq, rpcc::to(1000), accept, me, instance, my_n, v);
// Count the node only if the RPC succeeded AND the acceptor said yes.
129 if (status == paxos_protocol::OK && accept)
130 accepts.push_back(i);
// Phase 3 of the proposer: send a decidereq RPC carrying (me, instance, v) to
// every node in `accepts`. The status returned by call_timeout is not
// inspected in the visible code, so delivery is best-effort.
// NOTE(review): the declarations of `h` and `res` and any null check on `r`
// are not visible in this excerpt.
134 void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
135 for (auto i : accepts) {
137 rpcc *r = h.safebind();
141 r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, instance, v);
// Acceptor-side handler for the prepare RPC. Runs under acceptor_mutex.
//   r         (out) reply structure; r.oldinstance starts as false.
//   instance  instance the proposer is trying to decide.
//   n         the proposer's proposal number.
// Visible behaviour:
//   - instance <= instance_h: reply with oldinstance = true and the stored
//     value values[instance].
//   - otherwise, n > n_h: handled by statements not visible in this excerpt
//     (presumably the promise path -- TODO confirm).
//   - a rejection message is logged at the end of the chain (presumably in a
//     final else branch -- not visible).
// The visible return is always paxos_protocol::OK; the outcome is conveyed
// through `r`.
145 paxos_protocol::status
146 proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
147 lock ml(acceptor_mutex);
148 r.oldinstance = false;
// The proposer is behind: hand back the value already decided.
152 if (instance <= instance_h) {
153 r.oldinstance = true;
154 r.v_a = values[instance];
155 } else if (n > n_h) {
160 LOG("I totally rejected this request. Ha.");
162 return paxos_protocol::OK;
// Acceptor-side handler for the accept RPC. Runs under acceptor_mutex.
//   r         (out) boolean reply; its assignment is not visible in this
//             excerpt.
//   instance  instance being decided; only instance_h + 1 (the next undecided
//             instance) enters the visible branch.
//   n, v      proposal number and value offered by the proposer.
// In the visible branch the accepted proposal (n_a, v_a) is written to the log
// via l.logaccept() and paxos_protocol::OK is returned; a return of
// paxos_protocol::ERR follows that branch.
// NOTE(review): the comparison of n against n_h and the updates of n_a / v_a
// are not visible here; the exact condition that leads to ERR cannot be
// confirmed from this excerpt.
165 paxos_protocol::status
166 proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
167 lock ml(acceptor_mutex);
169 if (instance == instance_h + 1) {
173 l.logaccept(n_a, v_a);
176 return paxos_protocol::OK;
178 return paxos_protocol::ERR;
// Acceptor-side handler for the decide RPC. Runs under acceptor_mutex.
//   instance  instance the proposer declares decided.
//   v         decided value as sent by the proposer.
// If `instance` is the next one (instance_h + 1) it is committed through the
// three-argument commit(), passing the held lock. If it is not newer than
// instance_h the request is ignored. The visible return is paxos_protocol::OK.
// NOTE(review): both the log line and the commit use the locally stored v_a
// rather than the parameter `v`; a check relating the two may sit on a line
// not visible in this excerpt -- confirm. The handling of
// instance > instance_h + 1 is also not visible.
182 paxos_protocol::status
183 proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
184 lock ml(acceptor_mutex);
185 LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << v_a);
186 if (instance == instance_h + 1) {
188 commit(instance, v_a, ml);
189 } else if (instance <= instance_h) {
190 // we are ahead; ignore.
195 return paxos_protocol::OK;
// Convenience overload for callers that do not already hold acceptor_mutex:
// acquires it and forwards to the three-argument commit(), handing over the
// lock object so that overload can release it around the upcall.
198 void proposer_acceptor::commit(unsigned instance, const value_t & value) {
199 lock ml(acceptor_mutex);
200 commit(instance, value, ml);
// Record `value` as the decision for `instance`.
//   pxs_mutex_lock  lock object passed in by the caller (both visible callers
//                   pass their lock on acceptor_mutex); it is unlocked for the
//                   duration of the delegate upcall and re-locked afterwards.
// When `instance` is newer than instance_h the value is stored in `values`,
// written to the log via l.loginstance(), and instance_h advances.
// NOTE(review): statements between the instance_h update and the unlock are
// not visible in this excerpt, so it cannot be confirmed here whether the
// upcall is guarded (e.g. by a null check on `delegate`) or nested inside the
// `if` above.
203 void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
204 LOG("acceptor::commit: instance=" << instance << " has v=" << value);
205 if (instance > instance_h) {
206 LOG("commit: highestacceptedinstance = " << instance);
207 values[instance] = value;
208 l.loginstance(instance, value);
209 instance_h = instance;
// Release the lock while calling into the higher layer, then take it back so
// the caller's lock object is in the locked state it expects on return.
213 pxs_mutex_lock.unlock();
214 delegate->paxos_commit(instance, value);
215 pxs_mutex_lock.lock();
220 // For testing purposes
// Test hook invoked by the proposer; logs "Dying at breakpoint 1!".
// NOTE(review): the triggering condition and the action taken after the log
// line are not visible in this excerpt (the message suggests the process
// terminates -- confirm).
221 void proposer_acceptor::breakpoint1() {
223 LOG("Dying at breakpoint 1!");
// Test hook; logs "Dying at breakpoint 2!".
// NOTE(review): the triggering condition and the action taken after the log
// line are not visible in this excerpt (the message suggests the process
// terminates -- confirm).
228 void proposer_acceptor::breakpoint2() {
230 LOG("Dying at breakpoint 2!");
// Test hook selecting a proposer breakpoint by number `b`; the visible code
// logs "Proposer: breakpoint 1" or "Proposer: breakpoint 2".
// NOTE(review): the conditions on `b` and any state changed alongside each log
// line are not visible in this excerpt (presumably they arm breakpoint1 /
// breakpoint2 -- confirm).
235 void proposer_acceptor::breakpoint(int b) {
237 LOG("Proposer: breakpoint 1");
240 LOG("Proposer: breakpoint 2");