Cosmetic improvements.
[invirt/third/libt4.git] / paxos.cc
index 46d9c1c..dad5ecf 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -1,10 +1,15 @@
 #include "paxos.h"
 #include "handle.h"
-#include "threaded_log.h"
-#include "lang/verify.h"
-#include "lock.h"
 
-using std::stoi;
+bool isamember(const node_t & m, const nodes_t & nodes) {
+    return find(nodes.begin(), nodes.end(), m) != nodes.end();
+}
+
+// check if l2 contains a majority of the elements of l1
+bool majority(const nodes_t & l1, const nodes_t & l2) {
+    auto overlap = (size_t)count_if(l1.begin(), l1.end(), std::bind(isamember, _1, l2));
+    return overlap >= (l1.size() >> 1) + 1;
+}
 
 // This module implements the proposer and acceptor of the Paxos
 // distributed algorithm as described by Lamport's "Paxos Made
@@ -15,148 +20,95 @@ using std::stoi;
 // paxos_commit to inform higher layers of the agreed value for this
 // instance.
 
-bool operator> (const prop_t &a, const prop_t &b) {
-    return (a.n > b.n || (a.n == b.n && a.m > b.m));
-}
-
-bool operator>= (const prop_t &a, const prop_t &b) {
-    return (a.n > b.n || (a.n == b.n && a.m >= b.m));
-}
-
-string
-print_members(const vector<string> &nodes) {
-    string s;
-    s.clear();
-    for (unsigned i = 0; i < nodes.size(); i++) {
-        s += nodes[i];
-        if (i < (nodes.size()-1))
-            s += ",";
-    }
-    return s;
-}
-
-
-bool isamember(const string & m, const vector<string> & nodes) {
-    for (auto n : nodes) {
-        if (n == m)
-            return 1;
-    }
-    return 0;
-}
-
-bool proposer::isrunning() {
-    bool r;
-    lock ml(pxs_mutex);
-    r = !stable;
-    return r;
-}
-
-// check if the servers in l2 contains a majority of servers in l1
-bool proposer::majority(const vector<string> &l1, const vector<string> &l2) {
-    unsigned n = 0;
-
-    for (unsigned i = 0; i < l1.size(); i++) {
-        if (isamember(l1[i], l2))
-            n++;
-    }
-    return n >= (l1.size() >> 1) + 1;
-}
-
-proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, const string &_me)
-  : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false),
-    stable (true)
+proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
+        bool _first, const node_t & _me, const value_t & _value)
+    : delegate(_delegate), me (_me)
 {
-    my_n.n = 0;
-    my_n.m = me;
-}
+    // at this point, the log has already been replayed
+    if (instance_h == 0 && _first) {
+        values[1] = _value;
+        l.loginstance(1, _value);
+        instance_h = 1;
+    }
 
-void proposer::setn()
-{
-    my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
+    pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
+    pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
+    pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
 }
 
-bool proposer::run(unsigned instance, const vector<string> & cur_nodes, const string & newv)
+bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
 {
-    vector<string> accepts;
-    vector<string> nodes;
-    string v;
-    bool r = false;
-
-    lock ml(pxs_mutex);
-    LOG("start: initiate paxos for " << print_members(cur_nodes) << " w. i=" << instance << " v=" << newv << " stable=" << stable);
+    lock ml(proposer_mutex);
+    LOG("initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable);
     if (!stable) {  // already running proposer?
-        LOG("proposer::run: already running");
+        LOG("paxos proposer already running");
         return false;
     }
     stable = false;
-    setn();
-    accepts.clear();
-    v.clear();
+    bool r = false;
+    proposal.n = max(promise.n, proposal.n) + 1;
+    nodes_t accepts;
+    value_t v;
     if (prepare(instance, accepts, cur_nodes, v)) {
 
         if (majority(cur_nodes, accepts)) {
-            LOG("paxos::manager: received a majority of prepare responses");
+            LOG("received a majority of prepare responses");
 
-            if (v.size() == 0)
+            if (!v.size())
                 v = newv;
 
             breakpoint1();
 
-            nodes = accepts;
-            accepts.clear();
+            nodes_t nodes;
+            nodes.swap(accepts);
             accept(instance, accepts, nodes, v);
 
             if (majority(cur_nodes, accepts)) {
-                LOG("paxos::manager: received a majority of accept responses");
+                LOG("received a majority of accept responses");
 
                 breakpoint2();
 
                 decide(instance, accepts, v);
                 r = true;
             } else {
-                LOG("paxos::manager: no majority of accept responses");
+                LOG("no majority of accept responses");
             }
         } else {
-            LOG("paxos::manager: no majority of prepare responses");
+            LOG("no majority of prepare responses");
         }
     } else {
-        LOG("paxos::manager: prepare is rejected " << stable);
+        LOG("prepare is rejected " << stable);
     }
     stable = true;
     return r;
 }
 
-// proposer::run() calls prepare to send prepare RPCs to nodes
-// and collect responses. if one of those nodes
-// replies with an oldinstance, return false.
-// otherwise fill in accepts with set of nodes that accepted,
-// set v to the v_a with the highest n_a, and return true.
-bool
-proposer::prepare(unsigned instance, vector<string> & accepts,
-        const vector<string> & nodes,
-        string & v)
-{
-    struct paxos_protocol::preparearg arg = { instance, my_n };
-    struct paxos_protocol::prepareres res;
-    prop_t n_a = { 0, "" };
-    rpcc *r;
+bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
+        const nodes_t & nodes, value_t & v) {
+    LOG("sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")");
+    prepareres res;
+    prop_t highest_n_a{0, ""};
     for (auto i : nodes) {
         handle h(i);
-        if (!(r = h.safebind()))
+        rpcc *r = h.safebind();
+        if (!r)
             continue;
-        int status = r->call_timeout(paxos_protocol::preparereq, rpcc::to(1000), res, me, arg);
+        auto status = (paxos_protocol::status)r->call_timeout(
+                paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
         if (status == paxos_protocol::OK) {
             if (res.oldinstance) {
                 LOG("commiting old instance!");
-                acc->commit(instance, res.v_a);
+                commit(instance, res.v_a);
                 return false;
             }
+            LOG("preparereq responded with oldinstance=" << res.oldinstance << " accept=" << res.accept << " n_a=(" << res.n_a.n << ", " << res.n_a.m << ") " <<
+                    "v_a=\"" << res.v_a << "\"");
             if (res.accept) {
                 accepts.push_back(i);
-                if (res.n_a >= n_a) {
-                    LOG("found a newer accepted proposal");
+                if (res.n_a >= highest_n_a) {
+                    LOG("found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")");
                     v = res.v_a;
-                    n_a = res.n_a;
+                    highest_n_a = res.n_a;
                 }
             }
         }
@@ -164,192 +116,134 @@ proposer::prepare(unsigned instance, vector<string> & accepts,
     return true;
 }
 
-// run() calls this to send out accept RPCs to accepts.
-// fill in accepts with list of nodes that accepted.
-void
-proposer::accept(unsigned instance, vector<string> & accepts,
-        const vector<string> & nodes, const string & v)
-{
-    struct paxos_protocol::acceptarg arg = { instance, my_n, v };
-    rpcc *r;
+void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
+        const nodes_t & nodes, const value_t & v) {
     for (auto i : nodes) {
         handle h(i);
-        if (!(r = h.safebind()))
+        rpcc *r = h.safebind();
+        if (!r)
             continue;
         bool accept = false;
-        int status = r->call_timeout(paxos_protocol::acceptreq, rpcc::to(1000), accept, me, arg);
+        int status = r->call_timeout(
+                paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v);
         if (status == paxos_protocol::OK && accept)
             accepts.push_back(i);
     }
 }
 
-void
-proposer::decide(unsigned instance, const vector<string> & accepts,
-        const string & v)
-{
-    struct paxos_protocol::decidearg arg = { instance, v };
-    rpcc *r;
+void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
     for (auto i : accepts) {
         handle h(i);
-        if (!(r = h.safebind()))
+        rpcc *r = h.safebind();
+        if (!r)
             continue;
         int res = 0;
-        r->call_timeout(paxos_protocol::decidereq, rpcc::to(1000), res, me, arg);
+        r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
     }
 }
 
-acceptor::acceptor(class paxos_change *_cfg, bool _first, const string & _me,
-        const string & _value)
-  : cfg(_cfg), me (_me), instance_h(0)
-{
-    n_h.n = 0;
-    n_h.m = me;
-    n_a.n = 0;
-    n_a.m = me;
-    v_a.clear();
-
-    l = new log (this, me);
-
-    if (instance_h == 0 && _first) {
-        values[1] = _value;
-        l->loginstance(1, _value);
-        instance_h = 1;
-    }
-
-    pxs = new rpcs((uint32_t)stoi(_me));
-    pxs->reg(paxos_protocol::preparereq, &acceptor::preparereq, this);
-    pxs->reg(paxos_protocol::acceptreq, &acceptor::acceptreq, this);
-    pxs->reg(paxos_protocol::decidereq, &acceptor::decidereq, this);
-}
-
 paxos_protocol::status
-acceptor::preparereq(paxos_protocol::prepareres & r, const string &,
-        paxos_protocol::preparearg a)
-{
-    lock ml(pxs_mutex);
+proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
+    LOG("instance " << instance << " proposal (" << n.n << ", " << n.m << ")");
+    lock ml(acceptor_mutex);
     r.oldinstance = false;
     r.accept = false;
-    r.n_a = n_a;
-    r.v_a = v_a;
-    if (a.instance <= instance_h) {
+    r.n_a = accepted;
+    r.v_a = accepted_value;
+    if (instance <= instance_h) {
+        LOG("old instance " << instance << " has value " << values[instance]);
         r.oldinstance = true;
-        r.v_a = values[a.instance];
-    } else if (a.n > n_h) {
-        n_h = a.n;
-        l->logprop(n_h);
+        r.v_a = values[instance];
+    } else if (n > promise) {
+        LOG("looks good to me");
+        promise = n;
+        l.logprop(promise);
         r.accept = true;
     } else {
         LOG("I totally rejected this request.  Ha.");
     }
+    LOG("preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " <<
+        "v_a=\"" << r.v_a << "\"");
     return paxos_protocol::OK;
 }
 
 paxos_protocol::status
-acceptor::acceptreq(bool & r, const string &, paxos_protocol::acceptarg a)
-{
-    lock ml(pxs_mutex);
+proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
+    lock ml(acceptor_mutex);
     r = false;
-    if (a.n >= n_h) {
-        n_a = a.n;
-        v_a = a.v;
-        l->logaccept(n_a, v_a);
-        r = true;
+    if (instance == instance_h + 1) {
+        if (n >= promise) {
+            accepted = n;
+            accepted_value = v;
+            l.logaccept(accepted, accepted_value);
+            r = true;
+        }
+        return paxos_protocol::OK;
+    } else {
+        return paxos_protocol::ERR;
     }
-    return paxos_protocol::OK;
 }
 
-// the src argument is only for debugging
 paxos_protocol::status
-acceptor::decidereq(int &, const string &, paxos_protocol::decidearg a)
-{
-    lock ml(pxs_mutex);
-    LOG("decidereq for accepted instance " << a.instance << " (my instance " << instance_h << ") v=" << v_a);
-    if (a.instance == instance_h + 1) {
-        VERIFY(v_a == a.v);
-        commit(a.instance, v_a, ml);
-    } else if (a.instance <= instance_h) {
-        // we are ahead ignore.
+proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
+    lock ml(acceptor_mutex);
+    LOG("decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value);
+    if (instance == instance_h + 1) {
+        VERIFY(accepted_value == v);
+        commit(instance, accepted_value, ml);
+    } else if (instance <= instance_h) {
+        // we are ahead; ignore.
     } else {
-        // we are behind
+        // we are behind.
         VERIFY(0);
     }
     return paxos_protocol::OK;
 }
 
-void
-acceptor::commit(unsigned instance, const string & value, lock & pxs_mutex_lock)
-{
-    LOG("acceptor::commit: instance=" << instance << " has v=" << value);
+void proposer_acceptor::commit(unsigned instance, const value_t & value) {
+    lock ml(acceptor_mutex);
+    commit(instance, value, ml);
+}
+
+void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & pxs_mutex_lock) {
+    LOG("instance=" << instance << " has v=" << value);
     if (instance > instance_h) {
-        LOG("commit: highestaccepteinstance = " << instance);
+        LOG("highestacceptedinstance = " << instance);
         values[instance] = value;
-        l->loginstance(instance, value);
+        l.loginstance(instance, value);
         instance_h = instance;
-        n_h.n = 0;
-        n_h.m = me;
-        n_a.n = 0;
-        n_a.m = me;
-        v_a.clear();
-        if (cfg) {
+        accepted = promise = {0, me};
+        string v = value; // gaaahhh aliasing of value and accepted_value
+        accepted_value.clear(); // this wipes out "value", too
+        if (delegate) {
             pxs_mutex_lock.unlock();
-            cfg->paxos_commit(instance, value);
+            delegate->paxos_commit(instance, v);
             pxs_mutex_lock.lock();
         }
     }
 }
 
-void
-acceptor::commit(unsigned instance, const string & value)
-{
-    lock ml(pxs_mutex);
-    commit(instance, value, ml);
-}
-
-string
-acceptor::dump()
-{
-    return l->dump();
-}
-
-void
-acceptor::restore(const string & s)
-{
-    l->restore(s);
-    l->logread();
-}
-
-
-
 // For testing purposes
-
-// Call this from your code between phases prepare and accept of proposer
-void
-proposer::breakpoint1()
-{
+void proposer_acceptor::breakpoint1() {
     if (break1) {
         LOG("Dying at breakpoint 1!");
         exit(1);
     }
 }
 
-// Call this from your code between phases accept and decide of proposer
-void
-proposer::breakpoint2()
-{
+void proposer_acceptor::breakpoint2() {
     if (break2) {
         LOG("Dying at breakpoint 2!");
         exit(1);
     }
 }
 
-void
-proposer::breakpoint(int b)
-{
+void proposer_acceptor::breakpoint(int b) {
     if (b == 3) {
-        LOG("Proposer: breakpoint 1");
+        LOG("breakpoint 1");
         break1 = true;
     } else if (b == 4) {
-        LOG("Proposer: breakpoint 2");
+        LOG("breakpoint 2");
         break2 = true;
     }
 }