3 #include "threaded_log.h"
4 #include "lang/verify.h"
9 // This module implements the proposer and acceptor of the Paxos
10 // distributed algorithm as described by Lamport's "Paxos Made
11 // Simple". To kick off an instance of Paxos, the caller supplies a
12 // list of nodes, a proposed value, and invokes the proposer. If the
13 // majority of the nodes agree on the proposed value after running
14 // this instance of Paxos, the acceptor invokes the upcall
15 // paxos_commit to inform higher layers of the agreed value for this
18 bool operator> (const prop_t &a, const prop_t &b) {
19 return (a.n > b.n || (a.n == b.n && a.m > b.m));
22 bool operator>= (const prop_t &a, const prop_t &b) {
23 return (a.n > b.n || (a.n == b.n && a.m >= b.m));
27 print_members(const vector<string> &nodes) {
30 for (unsigned i = 0; i < nodes.size(); i++) {
32 if (i < (nodes.size()-1))
39 bool isamember(const string & m, const vector<string> & nodes) {
40 for (auto n : nodes) {
47 bool proposer::isrunning() {
54 // check if the servers in l2 contains a majority of servers in l1
55 bool proposer::majority(const vector<string> &l1, const vector<string> &l2) {
58 for (unsigned i = 0; i < l1.size(); i++) {
59 if (isamember(l1[i], l2))
62 return n >= (l1.size() >> 1) + 1;
65 proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, const string &_me)
66 : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
75 my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
78 bool proposer::run(unsigned instance, const vector<string> & cur_nodes, const string & newv)
80 vector<string> accepts;
86 LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable);
87 if (!stable) { // already running proposer?
88 LOG("proposer::run: already running");
95 if (prepare(instance, accepts, cur_nodes, v)) {
97 if (majority(cur_nodes, accepts)) {
98 LOG("paxos::manager: received a majority of prepare responses");
107 accept(instance, accepts, nodes, v);
109 if (majority(cur_nodes, accepts)) {
110 LOG("paxos::manager: received a majority of accept responses");
114 decide(instance, accepts, v);
117 LOG("paxos::manager: no majority of accept responses");
120 LOG("paxos::manager: no majority of prepare responses");
123 LOG("paxos::manager: prepare is rejected " << stable);
129 // proposer::run() calls prepare to send prepare RPCs to nodes
130 // and collect responses. if one of those nodes
131 // replies with an oldinstance, return false.
132 // otherwise fill in accepts with set of nodes that accepted,
133 // set v to the v_a with the highest n_a, and return true.
135 proposer::prepare(unsigned instance, vector<string> & accepts,
136 const vector<string> & nodes,
139 struct paxos_protocol::preparearg arg = { instance, my_n };
140 struct paxos_protocol::prepareres res;
141 prop_t n_a = { 0, "" };
143 for (auto i : nodes) {
145 if (!(r = h.safebind()))
147 int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
148 if (status == paxos_protocol::OK) {
149 if (res.oldinstance) {
150 LOG("commiting old instance!");
151 acc->commit(instance, res.v_a);
155 accepts.push_back(i);
156 if (res.n_a >= n_a) {
157 LOG("found a newer accepted proposal");
167 // run() calls this to send out accept RPCs to accepts.
168 // fill in accepts with list of nodes that accepted.
170 proposer::accept(unsigned instance, vector<string> & accepts,
171 const vector<string> & nodes, const string & v)
173 struct paxos_protocol::acceptarg arg = { instance, my_n, v };
175 for (auto i : nodes) {
177 if (!(r = h.safebind()))
180 int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg);
181 if (status == paxos_protocol::OK && accept)
182 accepts.push_back(i);
187 proposer::decide(unsigned instance, const vector<string> & accepts,
190 struct paxos_protocol::decidearg arg = { instance, v };
192 for (auto i : accepts) {
194 if (!(r = h.safebind()))
197 r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, arg);
201 acceptor::acceptor(class paxos_change *_cfg, bool _first, const string & _me,
202 const string & _value)
203 : cfg(_cfg), me (_me), instance_h(0)
211 l = new log (this, me);
213 if (instance_h == 0 && _first) {
215 l->loginstance(1, _value);
219 pxs = new rpcs((uint32_t)stoi(_me));
220 pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
221 pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
222 pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
225 paxos_protocol::status
226 acceptor::preparereq(paxos_protocol::prepareres & r, const string &,
227 paxos_protocol::preparearg a)
230 r.oldinstance = false;
234 if (a.instance <= instance_h) {
235 r.oldinstance = true;
236 r.v_a = values[a.instance];
237 } else if (a.n > n_h) {
242 LOG("I totally rejected this request. Ha.");
244 return paxos_protocol::OK;
247 paxos_protocol::status
248 acceptor::acceptreq(bool & r, const string &, paxos_protocol::acceptarg a)
255 l->logaccept(n_a, v_a);
258 return paxos_protocol::OK;
261 // the src argument is only for debugging
262 paxos_protocol::status
263 acceptor::decidereq(int &, const string &, paxos_protocol::decidearg a)
266 LOG("decidereq for accepted instance " << a.instance << " (my instance " << instance_h << ") v=" << v_a);
267 if (a.instance == instance_h + 1) {
269 commit(a.instance, v_a, ml);
270 } else if (a.instance <= instance_h) {
271 // we are ahead ignore.
276 return paxos_protocol::OK;
280 acceptor::commit(unsigned instance, const string & value, lock & pxs_mutex_lock)
282 LOG("acceptor::commit: instance=" << instance << " has v=" << value);
283 if (instance > instance_h) {
284 LOG("commit: highestaccepteinstance = " << instance);
285 values[instance] = value;
286 l->loginstance(instance, value);
287 instance_h = instance;
294 pxs_mutex_lock.unlock();
295 cfg->paxos_commit(instance, value);
296 pxs_mutex_lock.lock();
302 acceptor::commit(unsigned instance, const string & value)
305 commit(instance, value, ml);
315 acceptor::restore(const string & s)
323 // For testing purposes
325 // Call this from your code between phases prepare and accept of proposer
327 proposer::breakpoint1()
330 LOG("Dying at breakpoint 1!");
335 // Call this from your code between phases accept and decide of proposer
337 proposer::breakpoint2()
340 LOG("Dying at breakpoint 2!");
346 proposer::breakpoint(int b)
349 LOG("Proposer: breakpoint 1");
352 LOG("Proposer: breakpoint 2");