6 #include "lang/verify.h"
8 // This module implements the proposer and acceptor of the Paxos
9 // distributed algorithm as described by Lamport's "Paxos Made
10 // Simple". To kick off an instance of Paxos, the caller supplies a
11 // list of nodes, a proposed value, and invokes the proposer. If the
12 // majority of the nodes agree on the proposed value after running
13 // this instance of Paxos, the acceptor invokes the upcall
14 // paxos_commit to inform higher layers of the agreed value for this
// Strict "greater than" on proposal numbers: a outranks b when a.n is
// larger, or when the n fields are equal and a.m compares greater.
19 operator> (const prop_t &a, const prop_t &b)
21 return (a.n > b.n || (a.n == b.n && a.m > b.m));
// "Greater than or equal" on proposal numbers: a.n larger wins outright;
// when the n fields are equal the m fields are compared with >=.
25 operator>= (const prop_t &a, const prop_t &b)
27 return (a.n > b.n || (a.n == b.n && a.m >= b.m));
// Walks nodes by index; callers consume the result through c_str() when
// logging. Presumably returns the members joined into one string -- TODO confirm.
31 print_members(const std::vector<std::string> &nodes)
35 for (unsigned i = 0; i < nodes.size(); i++) {
// True for every element except the last one (presumably to put a
// separator between entries but not after the final one -- TODO confirm).
37 if (i < (nodes.size()-1))
// Linear scan of nodes: reports a match (returns 1) at the first element
// equal to m. m is taken by value, so each call copies the string.
43 bool isamember(std::string m, const std::vector<std::string> &nodes)
45 for (unsigned i = 0; i < nodes.size(); i++) {
46 if (nodes[i] == m) return 1;
// Guard the rest of this scope with pxs_mutex (presumably released when ml
// is destroyed -- confirm against ScopedLock).
55 ScopedLock ml(pxs_mutex);
60 // Check whether the servers in l2 contain a majority of the servers in l1.
62 proposer::majority(const std::vector<std::string> &l1,
63 const std::vector<std::string> &l2)
67 for (unsigned i = 0; i < l1.size(); i++) {
// Test each member of l1 for presence in l2 (isamember is itself a linear scan).
68 if (isamember(l1[i], l2))
// A majority is strictly more than half of l1: floor(l1.size() / 2) + 1.
71 return n >= (l1.size() >> 1) + 1;
// Constructor: stores _cfg, _acceptor and _me in cfg, acc and me, and starts
// with both break1 and break2 set to false.
74 proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor,
76 : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
// Raise my_n.n to one more than the larger of acc->get_n_h().n and the
// current my_n.n.
86 my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
// Drives one Paxos instance proposing newv among cur_nodes:
//   1. prepare() is sent to cur_nodes and the responders land in accepts;
//   2. if the responders form a majority of cur_nodes, accept() is invoked
//      with the local vector nodes and the value v;
//   3. if the accept responders again form a majority of cur_nodes,
//      decide() is sent to them.
// The remaining outcomes visible below are reported through tprintf.
90 proposer::run(int instance, std::vector<std::string> cur_nodes, std::string newv)
92 std::vector<std::string> accepts;
93 std::vector<std::string> nodes;
// The proposer state is inspected under pxs_mutex.
97 ScopedLock ml(pxs_mutex);
98 tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
99 print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
100 if (!stable) { // already running proposer?
101 tprintf("proposer::run: already running\n");
// Phase 1: prepare. accepts and v are filled in by prepare().
108 if (prepare(instance, accepts, cur_nodes, v)) {
110 if (majority(cur_nodes, accepts)) {
111 tprintf("paxos::manager: received a majority of prepare responses\n");
// Phase 2: accept. accepts is refilled with the nodes that accepted.
120 accept(instance, accepts, nodes, v);
122 if (majority(cur_nodes, accepts)) {
123 tprintf("paxos::manager: received a majority of accept responses\n");
// Phase 3: tell the accepting nodes that v has been chosen.
127 decide(instance, accepts, v);
130 tprintf("paxos::manager: no majority of accept responses\n");
133 tprintf("paxos::manager: no majority of prepare responses\n");
136 tprintf("paxos::manager: prepare is rejected %d\n", stable);
142 // proposer::run() calls prepare to send prepare RPCs to nodes
143 // and collect the responses. If one of those nodes
144 // replies with oldinstance set, return false.
145 // Otherwise fill in accepts with the set of nodes that accepted,
146 // set v to the v_a with the highest n_a, and return true.
148 proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
149 std::vector<std::string> nodes,
// Every node receives the same (instance, my_n) prepare argument.
152 struct paxos_protocol::preparearg arg = { instance, my_n };
153 struct paxos_protocol::prepareres res;
// Local n_a (distinct from res.n_a) is the comparison baseline for the
// replies; it starts at {0, ""}.
154 prop_t n_a = { 0, "" };
156 for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
158 if (!(r = h.safebind()))
// rpcc::to(1000) bounds the wait for this call -- presumably milliseconds, TODO confirm.
160 int status = r->call(paxos_protocol::preparereq, me, arg, res, rpcc::to(1000));
161 if (status == paxos_protocol::OK) {
// The peer reports that this instance is already decided: commit the value
// it returned through the local acceptor.
162 if (res.oldinstance) {
163 tprintf("commiting old instance!\n");
164 acc->commit(instance, res.v_a);
168 accepts.push_back(*i);
// Uses >=, so a reply that ties the current n_a also passes this test.
169 if (res.n_a >= n_a) {
170 tprintf("found a newer accepted proposal\n");
180 // run() calls this to send accept RPCs to every node in nodes.
181 // Fills in accepts with the list of nodes that accepted.
183 proposer::accept(unsigned instance, std::vector<std::string> &accepts,
184 std::vector<std::string> nodes, std::string v)
// All nodes get the same (instance, my_n, v) accept argument.
186 struct paxos_protocol::acceptarg arg = { instance, my_n, v };
188 for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
190 if (!(r = h.safebind()))
// The `accept` argument here is the RPC reply slot (presumably a local bool
// that shadows this method's name -- TODO confirm). rpcc::to(1000) bounds
// the wait, presumably in milliseconds.
193 int status = r->call(paxos_protocol::acceptreq, me, arg, accept, rpcc::to(1000));
194 if (status == paxos_protocol::OK) {
196 accepts.push_back(*i);
// Sends a decide RPC carrying (instance, v) to every node in accepts.
202 proposer::decide(unsigned instance, std::vector<std::string> accepts,
205 struct paxos_protocol::decidearg arg = { instance, v };
207 for (std::vector<std::string>::iterator i=accepts.begin(); i!=accepts.end(); i++) {
209 if (!(r = h.safebind()))
// The status returned by the call is discarded.
212 r->call(paxos_protocol::decidereq, me, arg, res, rpcc::to(1000));
// Constructor: stores _cfg and _me, starts instance_h at 0, creates the log
// object, and registers the three Paxos RPC handlers on a new rpcs server.
216 acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me,
218 : cfg(_cfg), me (_me), instance_h(0)
// The log receives a pointer to this acceptor (presumably so it can replay
// saved state into it -- TODO confirm).
226 l = new log (this, me);
// Only when instance_h is still 0 and _first is set: record _value as
// instance 1.
228 if (instance_h == 0 && _first) {
230 l->loginstance(1, _value);
// _me is parsed as an integer for the rpcs constructor (presumably a port
// number -- TODO confirm).
234 pxs = new rpcs(atoi(_me.c_str()));
235 pxs->reg(paxos_protocol::preparereq, this, &acceptor::preparereq);
236 pxs->reg(paxos_protocol::acceptreq, this, &acceptor::acceptreq);
237 pxs->reg(paxos_protocol::decidereq, this, &acceptor::decidereq);
// RPC handler for prepare requests; runs under pxs_mutex.
//  - a.instance <= instance_h: the reply is flagged oldinstance and carries
//    the value stored for that instance.
//  - otherwise, a.n > n_h: the proposal number beats n_h.
//  - presumably in the remaining case the request is rejected and a message
//    is logged.
240 paxos_protocol::status
241 acceptor::preparereq(std::string src, paxos_protocol::preparearg a,
242 paxos_protocol::prepareres &r)
244 ScopedLock ml(pxs_mutex);
// Default reply: not an old instance.
245 r.oldinstance = false;
249 if (a.instance <= instance_h) {
250 r.oldinstance = true;
251 r.v_a = values[a.instance];
252 } else if (a.n > n_h) {
257 tprintf("I totally rejected this request. Ha.\n");
259 return paxos_protocol::OK;
// RPC handler for accept requests; runs under pxs_mutex. r is the reply flag
// handed back to the proposer.
262 paxos_protocol::status
263 acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r)
265 ScopedLock ml(pxs_mutex);
// Record the acceptor's (n_a, v_a) pair through the log object.
270 l->logaccept(n_a, v_a);
273 return paxos_protocol::OK;
// RPC handler for decide requests; runs under pxs_mutex.
276 // The src argument is only for debugging purposes.
277 paxos_protocol::status
278 acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r)
280 ScopedLock ml(pxs_mutex);
281 tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n",
282 a.instance, instance_h, v_a.c_str());
// The decision is for the very next instance: commit it.
283 if (a.instance == instance_h + 1) {
// Commits the acceptor's own v_a rather than a field of a.
// NOTE(review): relies on v_a matching the decided value -- confirm.
285 commit_wo(a.instance, v_a);
286 } else if (a.instance <= instance_h) {
287 // We are ahead; ignore.
292 return paxos_protocol::OK;
// Commit variant that does not take the lock itself.
296 acceptor::commit_wo(unsigned instance, std::string value)
298 // Assumes pxs_mutex is held by the caller.
299 tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
// Only instances newer than instance_h are applied.
300 if (instance > instance_h) {
301 tprintf("commit: highestaccepteinstance = %d\n", instance);
// Remember the value, record it in the log, then advance instance_h.
302 values[instance] = value;
303 l->loginstance(instance, value);
304 instance_h = instance;
// Upcall: hand the agreed value for this instance to cfg.
312 cfg->paxos_commit(instance, value);
// Locking wrapper: takes pxs_mutex and delegates to commit_wo.
319 acceptor::commit(unsigned instance, std::string value)
321 ScopedLock ml(pxs_mutex);
322 commit_wo(instance, value);
// Takes a single string argument s, passed by value.
// TODO(review): document the expected contents of s.
332 acceptor::restore(std::string s)
340 // For testing purposes.
342 // Call this from your code between the prepare and accept phases of the proposer.
344 proposer::breakpoint1()
347 tprintf("Dying at breakpoint 1!\n");
352 // Call this from your code between the accept and decide phases of the proposer.
354 proposer::breakpoint2()
357 tprintf("Dying at breakpoint 2!\n");
// Logs which proposer breakpoint is involved; presumably b selects between
// breakpoint 1 and breakpoint 2 -- TODO confirm.
363 proposer::breakpoint(int b)
366 tprintf("Proposer: breakpoint 1\n");
369 tprintf("Proposer: breakpoint 2\n");