5 #include "lang/verify.h"
8 // This module implements the proposer and acceptor of the Paxos
9 // distributed algorithm as described by Lamport's "Paxos Made
10 // Simple". To kick off an instance of Paxos, the caller supplies a
11 // list of nodes, a proposed value, and invokes the proposer. If the
12 // majority of the nodes agree on the proposed value after running
13 // this instance of Paxos, the acceptor invokes the upcall
14 // paxos_commit to inform higher layers of the agreed value for this
// Strict "greater than" on proposal numbers. The n field is compared first;
// when both n fields are equal, the m field breaks the tie.
17 bool operator> (const prop_t &a, const prop_t &b) {
18 return (a.n > b.n || (a.n == b.n && a.m > b.m));
// "Greater than or equal" on proposal numbers, using the same ordering as
// operator>: n decides first, and on equal n the m fields are compared with >=.
21 bool operator>= (const prop_t &a, const prop_t &b) {
22 return (a.n > b.n || (a.n == b.n && a.m >= b.m));
// Produces a printable form of the node list; run() consumes the result through
// .c_str(). The loop walks the entries by index, and the `i < nodes.size()-1`
// test is true for every entry except the last one.
// NOTE(review): the return type and the statements that append to the result
// are not visible in this excerpt — presumably the test guards a separator
// between entries; confirm. For an empty list the loop body never runs, so the
// unsigned `nodes.size()-1` is never evaluated.
26 print_members(const std::vector<std::string> &nodes) {
29 for (unsigned i = 0; i < nodes.size(); i++) {
31 if (i < (nodes.size()-1))
// Membership test of m against nodes (used by proposer::majority).
// NOTE(review): the comparison and return statements are not visible in this
// excerpt — presumably returns true when some element equals m; confirm.
// NOTE(review): the loop variable is declared `auto n`, i.e. by value, so each
// std::string element is copied on every iteration.
38 bool isamember(const std::string & m, const std::vector<std::string> & nodes) {
39 for (auto n : nodes) {
// Reports a boolean proposer state.
// NOTE(review): the body is not visible in this excerpt — presumably derived
// from the `stable` flag that run() tests before starting; confirm.
46 bool proposer::isrunning() {
53 // check whether the servers in l2 contain a majority of the servers in l1
// l1 is the reference membership: each l1[i] is looked up in l2 via isamember().
// The threshold is (l1.size() >> 1) + 1, i.e. floor(|l1| / 2) + 1 — strictly
// more than half of l1.
// NOTE(review): the declaration and update of the counter n are not visible in
// this excerpt — presumably it counts the l1 entries found in l2; confirm.
54 bool proposer::majority(const std::vector<std::string> &l1,
55 const std::vector<std::string> &l2) {
58 for (unsigned i = 0; i < l1.size(); i++) {
59 if (isamember(l1[i], l2))
62 return n >= (l1.size() >> 1) + 1;
// Constructs a proposer. Stores the paxos_change pointer (_cfg) in cfg, the
// acceptor pointer (_acceptor) in acc and the identifier string (_me) in me;
// break1 and break2 are both initialized to false.
// NOTE(review): the initializer list ends with a comma here, so further members
// are initialized on lines not visible in this excerpt.
65 proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
66 const std::string &_me)
67 : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
// Advance the proposal counter: pick the larger of acc->get_n_h().n and the
// current my_n.n, then add one, so the new my_n.n exceeds both.
// NOTE(review): the signature of the enclosing function is not visible in this
// excerpt.
76 my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
// Drives one Paxos instance as proposer, proposing newv to cur_nodes.
// Visible flow: prepare() -> majority check on the prepare replies -> accept()
// -> majority check on the accept replies -> decide(). The visible failure
// branches only log a message.
// Parameters:
//   instance  - instance number handed to prepare/accept/decide
//   cur_nodes - membership against which both majority checks are made
//   newv      - value proposed by the caller
// Returns bool. NOTE(review): the return statements, the declaration of v and
// the handling of `stable` are not visible in this excerpt.
// NOTE(review): `instance` is unsigned but is printed with %d below; if tprintf
// forwards to printf-style formatting, %u would match the argument type.
79 bool proposer::run(unsigned instance, const std::vector<std::string> & cur_nodes,
80 const std::string & newv)
// accepts collects the nodes that replied positively in the current phase;
// nodes is the target list later passed to accept().
82 std::vector<std::string> accepts;
83 std::vector<std::string> nodes;
88 tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
89 print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
90 if (!stable) { // already running proposer?
91 tprintf("proposer::run: already running\n");
// Phase 1: prepare() fills accepts and may update v.
98 if (prepare(instance, accepts, cur_nodes, v)) {
100 if (majority(cur_nodes, accepts)) {
101 tprintf("paxos::manager: received a majority of prepare responses\n");
// Phase 2: accept() sends to `nodes` and pushes the accepting nodes onto
// `accepts`. NOTE(review): the statements that populate `nodes` and reset
// `accepts` between the phases are not visible in this excerpt.
110 accept(instance, accepts, nodes, v);
// The majority is again judged against the full cur_nodes membership.
112 if (majority(cur_nodes, accepts)) {
113 tprintf("paxos::manager: received a majority of accept responses\n");
// Phase 3: tell the nodes that accepted about the decided value.
117 decide(instance, accepts, v);
120 tprintf("paxos::manager: no majority of accept responses\n");
123 tprintf("paxos::manager: no majority of prepare responses\n");
126 tprintf("paxos::manager: prepare is rejected %d\n", stable);
132 // proposer::run() calls prepare to send prepare RPCs to nodes
133 // and collect responses. if one of those nodes
134 // replies with an oldinstance, return false.
135 // otherwise fill in accepts with set of nodes that accepted,
136 // set v to the v_a with the highest n_a, and return true.
//
// Each node is reached through h.safebind() and called with
// call_timeout(paxos_protocol::preparereq, rpcc::to(1000), ...); only replies
// whose status is paxos_protocol::OK are examined. The request carries
// (instance, my_n); `me` is passed along as the sender identifier.
// NOTE(review): the loop variable is `auto i`, so each node name is copied per
// iteration. The return statements and the assignments to v / n_a are not
// visible in this excerpt.
138 proposer::prepare(unsigned instance, std::vector<std::string> & accepts,
139 const std::vector<std::string> & nodes,
142 struct paxos_protocol::preparearg arg = { instance, my_n };
143 struct paxos_protocol::prepareres res;
// Best accepted proposal seen so far; starts at the pair {0, ""}.
144 prop_t n_a = { 0, "" };
146 for (auto i : nodes) {
148 if (!(r = h.safebind()))
150 int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
151 if (status == paxos_protocol::OK) {
// The replying node reports this instance as an old one: record the value
// it returned (res.v_a) locally through the acceptor.
152 if (res.oldinstance) {
153 tprintf("commiting old instance!\n");
154 acc->commit(instance, res.v_a);
158 accepts.push_back(i);
// NOTE(review): the comparison is >=, so a reply whose n_a merely equals the
// current best (including the initial {0, ""}) also takes this branch, although
// the log message says "newer" — confirm this is intended.
159 if (res.n_a >= n_a) {
160 tprintf("found a newer accepted proposal\n");
170 // run() calls this to send out accept RPCs to nodes.
171 // fill in accepts with list of nodes that accepted.
//
// The request carries (instance, my_n, v). A node is appended to accepts only
// when the call status is paxos_protocol::OK and the boolean reply (`accept`)
// is true.
// NOTE(review): the declaration of the local `accept` reply variable (which
// shares its name with this method) is not visible in this excerpt. The loop
// variable is `auto i`, so each node name is copied per iteration.
173 proposer::accept(unsigned instance, std::vector<std::string> & accepts,
174 const std::vector<std::string> & nodes, const std::string & v)
176 struct paxos_protocol::acceptarg arg = { instance, my_n, v };
178 for (auto i : nodes) {
180 if (!(r = h.safebind()))
183 int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg);
184 if (status == paxos_protocol::OK && accept)
185 accepts.push_back(i);
// Sends a decide RPC carrying (instance, v) to every node in accepts.
// The status returned by call_timeout is not inspected at the visible call
// site, so a failed or timed-out decide is not reported to the caller.
// NOTE(review): the loop variable is `auto i`, so each node name is copied per
// iteration; the declarations of h, r and res are not visible in this excerpt.
190 proposer::decide(unsigned instance, const std::vector<std::string> & accepts,
191 const std::string & v)
193 struct paxos_protocol::decidearg arg = { instance, v };
195 for (auto i : accepts) {
197 if (!(r = h.safebind()))
200 r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, arg);
// Constructs an acceptor: stores _cfg and _me, starts instance_h at 0, creates
// the log object, optionally records the initial instance, and starts an RPC
// server with the three Paxos handlers registered.
// Parameters:
//   _cfg   - object that receives the paxos_commit upcall (see commit())
//   _first - when true and instance_h is 0 at that point, instance 1 is logged
//            with _value
//   _me    - this node's identifier; it is parsed with std::stoi below
//   _value - value logged for instance 1 in the _first case
204 acceptor::acceptor(class paxos_change *_cfg, bool _first, const std::string & _me,
205 const std::string & _value)
206 : cfg(_cfg), me (_me), instance_h(0)
// NOTE(review): allocated with raw new; where (or whether) it is released is
// not visible in this excerpt.
214 l = new log (this, me);
// instance_h is re-tested after the log is created — presumably the log
// constructor can restore earlier state into this object; confirm.
216 if (instance_h == 0 && _first) {
218 l->loginstance(1, _value);
// std::stoi throws std::invalid_argument / std::out_of_range when _me is not a
// parseable int, so _me has to be numeric. NOTE(review): the rpcs argument is
// presumably a port number — confirm.
222 pxs = new rpcs((uint32_t)std::stoi(_me));
223 pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
224 pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
225 pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
// RPC handler for prepare requests; r is the reply filled in for the proposer.
// Visible cases:
//   - a.instance <= instance_h: the instance is not newer than the highest one
//     committed here, so the reply is flagged oldinstance and carries the
//     stored value values[a.instance].
//   - a.n > n_h: NOTE(review): the body of this branch is not visible in this
//     excerpt.
//   - the remaining path logs the rejection message.
// The visible return reports paxos_protocol::OK.
228 paxos_protocol::status
229 acceptor::preparereq(paxos_protocol::prepareres & r, const std::string &,
230 paxos_protocol::preparearg a)
233 r.oldinstance = false;
237 if (a.instance <= instance_h) {
238 r.oldinstance = true;
239 r.v_a = values[a.instance];
240 } else if (a.n > n_h) {
245 tprintf("I totally rejected this request. Ha.\n");
247 return paxos_protocol::OK;
// RPC handler for accept requests. r is the boolean reply that the proposer
// reads to decide whether this node accepted (see proposer::accept).
// Visible behavior: the accepted pair (n_a, v_a) is written to the log via
// l->logaccept, and the visible return reports paxos_protocol::OK.
// NOTE(review): the acceptance condition and the assignments to r, n_a and v_a
// are not visible in this excerpt.
250 paxos_protocol::status
251 acceptor::acceptreq(bool & r, const std::string &, paxos_protocol::acceptarg a)
258 l->logaccept(n_a, v_a);
261 return paxos_protocol::OK;
// RPC handler for decide requests.
264 // the src argument (presumably the unnamed std::string parameter) is only for debugging
//   - a.instance == instance_h + 1: the decision is for the next instance, so it
//     is committed.
//   - a.instance <= instance_h: already committed here; nothing to do.
//   - NOTE(review): the remaining case is not visible in this excerpt.
// NOTE(review): the value committed is this acceptor's own v_a, not the value
// carried in the request — confirm that this is intended.
// NOTE(review): ml is presumably a lock taken on a line not visible here; it is
// passed to commit(), which releases it around the upcall.
265 paxos_protocol::status
266 acceptor::decidereq(int &, const std::string &, paxos_protocol::decidearg a)
269 tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
270 a.instance, instance_h, v_a.c_str());
271 if (a.instance == instance_h + 1) {
273 commit(a.instance, v_a, ml);
274 } else if (a.instance <= instance_h) {
275 // we are ahead ignore.
280 return paxos_protocol::OK;
// Records `value` as the outcome of `instance` and notifies the upper layer.
// Only an instance newer than instance_h is applied: the value is stored in
// values, written to the log, and instance_h is advanced to it.
// pxs_mutex_lock is the caller's lock object, passed by reference so it can be
// released around the upcall.
// NOTE(review): lines between the state update and the unlock are not visible
// in this excerpt, so whether the upcall sits inside the `if` cannot be told
// from here.
284 acceptor::commit(unsigned instance, const std::string & value, lock & pxs_mutex_lock)
286 tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
287 if (instance > instance_h) {
288 tprintf("commit: highestaccepteinstance = %d\n", instance);
289 values[instance] = value;
290 l->loginstance(instance, value);
291 instance_h = instance;
// Drop the lock for the duration of cfg->paxos_commit so the upcall runs
// without it held, then re-acquire it so the caller gets the lock back locked.
298 pxs_mutex_lock.unlock();
299 cfg->paxos_commit(instance, value);
300 pxs_mutex_lock.lock();
// Two-argument overload (called by proposer::prepare through acc->commit):
// forwards to the three-argument commit, passing ml.
// NOTE(review): ml is presumably a lock acquired on a line not visible in this
// excerpt; confirm.
306 acceptor::commit(unsigned instance, const std::string & value)
309 commit(instance, value, ml);
// Takes a string s by const reference.
// NOTE(review): the return type and body are not visible in this excerpt —
// presumably rebuilds acceptor state from s; confirm.
319 acceptor::restore(const std::string & s)
327 // For testing purposes
329 // Call this from your code between phases prepare and accept of proposer
// Logs "Dying at breakpoint 1!" when triggered.
// NOTE(review): the triggering condition and the action taken after the log
// line are not visible in this excerpt — presumably gated on break1; confirm.
331 proposer::breakpoint1()
334 tprintf("Dying at breakpoint 1!\n");
339 // Call this from your code between phases accept and decide of proposer
// Logs "Dying at breakpoint 2!" when triggered.
// NOTE(review): the triggering condition and the action taken after the log
// line are not visible in this excerpt — presumably gated on break2; confirm.
341 proposer::breakpoint2()
344 tprintf("Dying at breakpoint 2!\n");
// Selects a test breakpoint by number b; the two visible log lines correspond
// to breakpoint 1 and breakpoint 2.
// NOTE(review): the branching on b and the statements that presumably set
// break1 / break2 are not visible in this excerpt; confirm.
350 proposer::breakpoint(int b)
353 tprintf("Proposer: breakpoint 1\n");
356 tprintf("Proposer: breakpoint 2\n");