So many changes. Broken.
[invirt/third/libt4.git] / paxos.cc
index c3a445f..108dfad 100644 (file)
--- a/paxos.cc
+++ b/paxos.cc
@@ -1,9 +1,20 @@
-#include "paxos.h"
-#include "handle.h"
-// #include <signal.h>
-#include <stdio.h>
-#include "tprintf.h"
-#include "lang/verify.h"
+#include "include/paxos.h"
+
+using namespace std::placeholders;
+using namespace std::chrono;
+
+paxos_change::~paxos_change() {}
+
+bool isamember(const node_t & m, const nodes_t & nodes) {
+    return std::find(nodes.begin(), nodes.end(), m) != nodes.end();
+}
+
+// check if l2 contains a majority of the elements of l1
+bool majority(const nodes_t & l1, const nodes_t & l2) {
+    auto overlap = (size_t)std::count_if(
+            l1.begin(), l1.end(), std::bind(isamember, _1, l2));
+    return overlap >= (l1.size() >> 1) + 1;
+}
 
 // This module implements the proposer and acceptor of the Paxos
 // distributed algorithm as described by Lamport's "Paxos Made
 // paxos_commit to inform higher layers of the agreed value for this
 // instance.
 
+proposer_acceptor::proposer_acceptor(paxos_change *_delegate,
+        bool _first, const node_t & _me, const value_t & _value)
+    : delegate(_delegate), me (_me)
+{
+    l.handler<log_instance>([this] (auto entry) {
+        instance_h = entry.number;
+        values[entry.number] = entry.value;
+        accepted = promise = {0, me};
+        accepted_value.clear();
+    });
+    l.handler<log_proposal>([this] (auto entry) {
+        promise = entry.promise;
+    });
+    l.handler<log_accept>([this] (auto entry) {
+        accepted = entry.number;
+        accepted_value = entry.value;
+    });
+
+    if (instance_h == 0 && _first)
+        l.append(log_instance{1, _value});
+
+    l.replay();
+
+    pxs.reg(paxos_protocol::preparereq, &proposer_acceptor::preparereq, this);
+    pxs.reg(paxos_protocol::acceptreq, &proposer_acceptor::acceptreq, this);
+    pxs.reg(paxos_protocol::decidereq, &proposer_acceptor::decidereq, this);
+}
+
+bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const value_t & newv)
+{
+    lock ml(proposer_mutex);
+    LOG << "initiate paxos for " << cur_nodes << " w. i=" << instance << " v=\"" << newv << "\" stable=" << stable;
+    if (!stable) {  // already running proposer?
+        LOG << "paxos proposer already running";
+        return false;
+    }
+    stable = false;
+    bool r = false;
+    proposal.n = std::max(promise.n, proposal.n) + 1;
+    nodes_t accepts;
+    value_t v;
+    if (prepare(instance, accepts, cur_nodes, v)) {
 
-bool
-operator> (const prop_t &a, const prop_t &b)
-{
-  return (a.n > b.n || (a.n == b.n && a.m > b.m));
-}
-
-bool
-operator>= (const prop_t &a, const prop_t &b)
-{
-  return (a.n > b.n || (a.n == b.n && a.m >= b.m));
-}
-
-std::string
-print_members(const std::vector<std::string> &nodes)
-{
-  std::string s;
-  s.clear();
-  for (unsigned i = 0; i < nodes.size(); i++) {
-    s += nodes[i];
-    if (i < (nodes.size()-1))
-      s += ",";
-  }
-  return s;
-}
-
-bool isamember(std::string m, const std::vector<std::string> &nodes)
-{
-  for (unsigned i = 0; i < nodes.size(); i++) {
-    if (nodes[i] == m) return 1;
-  }
-  return 0;
-}
-
-bool
-proposer::isrunning()
-{
-  bool r;
-  ScopedLock ml(pxs_mutex);
-  r = !stable;
-  return r;
-}
-
-// check if the servers in l2 contains a majority of servers in l1
-bool
-proposer::majority(const std::vector<std::string> &l1, 
-               const std::vector<std::string> &l2)
-{
-  unsigned n = 0;
-
-  for (unsigned i = 0; i < l1.size(); i++) {
-    if (isamember(l1[i], l2))
-      n++;
-  }
-  return n >= (l1.size() >> 1) + 1;
-}
-
-proposer::proposer(class paxos_change *_cfg, class acceptor *_acceptor, 
-                  std::string _me)
-  : cfg(_cfg), acc (_acceptor), me (_me), break1 (false), break2 (false), 
-    stable (true)
-{
-  my_n.n = 0;
-  my_n.m = me;
-}
-
-void
-proposer::setn()
-{
-  my_n.n = acc->get_n_h().n + 1 > my_n.n + 1 ? acc->get_n_h().n + 1 : my_n.n + 1;
-}
-
-bool
-proposer::run(int instance, std::vector<std::string> cur_nodes, std::string newv)
-{
-  std::vector<std::string> accepts;
-  std::vector<std::string> nodes;
-  std::string v;
-  bool r = false;
-
-  ScopedLock ml(pxs_mutex);
-  tprintf("start: initiate paxos for %s w. i=%d v=%s stable=%d\n",
-        print_members(cur_nodes).c_str(), instance, newv.c_str(), stable);
-  if (!stable) {  // already running proposer?
-    tprintf("proposer::run: already running\n");
-    return false;
-  }
-  stable = false;
-  setn();
-  accepts.clear();
-  v.clear();
-  if (prepare(instance, accepts, cur_nodes, v)) {
-
-    if (majority(cur_nodes, accepts)) {
-      tprintf("paxos::manager: received a majority of prepare responses\n");
+        if (majority(cur_nodes, accepts)) {
+            LOG << "received a majority of prepare responses";
 
-      if (v.size() == 0)
-       v = newv;
+            if (!v.size())
+                v = newv;
 
-      breakpoint1();
+            breakpoint1();
 
-      nodes = accepts;
-      accepts.clear();
-      accept(instance, accepts, nodes, v);
+            nodes_t nodes;
+            nodes.swap(accepts);
+            accept(instance, accepts, nodes, v);
 
-      if (majority(cur_nodes, accepts)) {
-       tprintf("paxos::manager: received a majority of accept responses\n");
+            if (majority(cur_nodes, accepts)) {
+                LOG << "received a majority of accept responses";
 
-       breakpoint2();
+                breakpoint2();
 
-       decide(instance, accepts, v);
-       r = true;
-      } else {
-       tprintf("paxos::manager: no majority of accept responses\n");
-      }
+                decide(instance, accepts, v);
+                r = true;
+            } else {
+                LOG << "no majority of accept responses";
+            }
+        } else {
+            LOG << "no majority of prepare responses";
+        }
     } else {
-      tprintf("paxos::manager: no majority of prepare responses\n");
+        LOG << "prepare is rejected " << stable;
     }
-  } else {
-    tprintf("paxos::manager: prepare is rejected %d\n", stable);
-  }
-  stable = true;
-  return r;
-}
-
-// proposer::run() calls prepare to send prepare RPCs to nodes
-// and collect responses. if one of those nodes
-// replies with an oldinstance, return false.
-// otherwise fill in accepts with set of nodes that accepted,
-// set v to the v_a with the highest n_a, and return true.
-bool
-proposer::prepare(unsigned instance, std::vector<std::string> &accepts, 
-         std::vector<std::string> nodes,
-         std::string &v)
-{
-    struct paxos_protocol::preparearg arg = { instance, my_n };
-    struct paxos_protocol::prepareres res;
-    prop_t n_a = { 0, "" };
-    rpcc *r;
-    for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
-        handle h(*i);
-        if (!(r = h.safebind()))
+    stable = true;
+    return r;
+}
+
+bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts,
+        const nodes_t & nodes, value_t & v) {
+    LOG << "sending prepare messages (" << proposal.n << ", " << proposal.m << ", \"" << v << "\")";
+    prepareres res;
+    prop_t highest_n_a{0, ""};
+    for (auto i : nodes) {
+        auto cl = rpcc::bind_cached(i);
+        if (!cl)
             continue;
-        int status = r->call(paxos_protocol::preparereq, me, arg, res, rpcc::to(1000));
+        int status = cl->call_timeout(paxos_protocol::preparereq, 100ms,
+                res, me, instance, proposal);
         if (status == paxos_protocol::OK) {
-            if (res.oldinstance) {
-                tprintf("commiting old instance!\n");
-                acc->commit(instance, res.v_a);
+            LOG << "preparereq response type=" << res.type << " n_a=(" << res.n_a.n
+                << ", " << res.n_a.m << ") " << "v_a=\"" << res.v_a << "\"";
+            if (res.type == prepareres::oldinstance) {
+                LOG << "commiting old instance!";
+                lock ml(acceptor_mutex);
+                commit(instance, res.v_a, ml);
                 return false;
-            }
-            if (res.accept) {
-                accepts.push_back(*i);
-                if (res.n_a >= n_a) {
-                    tprintf("found a newer accepted proposal\n");
+            } else if (res.type == prepareres::accept) {
+                accepts.push_back(i);
+                if (res.n_a >= highest_n_a) {
+                    LOG << "found a newer accepted proposal, \"" << res.v_a << "\", with number (" << res.n_a.n << ", " << res.n_a.m << ")";
                     v = res.v_a;
-                    n_a = res.n_a;
+                    highest_n_a = res.n_a;
                 }
             }
         }
@@ -177,196 +132,118 @@ proposer::prepare(unsigned instance, std::vector<std::string> &accepts,
     return true;
 }
 
-// run() calls this to send out accept RPCs to accepts.
-// fill in accepts with list of nodes that accepted.
-void
-proposer::accept(unsigned instance, std::vector<std::string> &accepts,
-        std::vector<std::string> nodes, std::string v)
-{
-    struct paxos_protocol::acceptarg arg = { instance, my_n, v };
-    rpcc *r;
-    for (std::vector<std::string>::iterator i=nodes.begin(); i!=nodes.end(); i++) {
-        handle h(*i);
-        if (!(r = h.safebind()))
-            continue;
-        bool accept = false;
-        int status = r->call(paxos_protocol::acceptreq, me, arg, accept, rpcc::to(1000));
-        if (status == paxos_protocol::OK) {
-            if (accept)
-                accepts.push_back(*i);
+void proposer_acceptor::accept(unsigned instance, nodes_t & accepts,
+        const nodes_t & nodes, const value_t & v) {
+    bool accept = false;
+    for (auto i : nodes) {
+        if (auto cl = rpcc::bind_cached(i)) {
+            int status = cl->call_timeout(paxos_protocol::acceptreq, 100ms,
+                    accept, me, instance, proposal, v);
+            if (status == paxos_protocol::OK && accept)
+                accepts.push_back(i);
         }
     }
 }
 
-void
-proposer::decide(unsigned instance, std::vector<std::string> accepts, 
-             std::string v)
-{
-    struct paxos_protocol::decidearg arg = { instance, v };
-    rpcc *r;
-    for (std::vector<std::string>::iterator i=accepts.begin(); i!=accepts.end(); i++) {
-        handle h(*i);
-        if (!(r = h.safebind()))
-            continue;
-        int res = 0;
-        r->call(paxos_protocol::decidereq, me, arg, res, rpcc::to(1000));
-    }
-}
-
-acceptor::acceptor(class paxos_change *_cfg, bool _first, std::string _me, 
-            std::string _value)
-  : cfg(_cfg), me (_me), instance_h(0)
-{
-  n_h.n = 0;
-  n_h.m = me;
-  n_a.n = 0;
-  n_a.m = me;
-  v_a.clear();
-
-  l = new log (this, me);
-
-  if (instance_h == 0 && _first) {
-    values[1] = _value;
-    l->loginstance(1, _value);
-    instance_h = 1;
-  }
-
-  pxs = new rpcs(atoi(_me.c_str()));
-  pxs->reg(paxos_protocol::preparereq, this, &acceptor::preparereq);
-  pxs->reg(paxos_protocol::acceptreq, this, &acceptor::acceptreq);
-  pxs->reg(paxos_protocol::decidereq, this, &acceptor::decidereq);
+void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const value_t & v) {
+    int res = 0;
+    for (auto i : accepts)
+        if (auto cl = rpcc::bind_cached(i))
+            cl->call_timeout(paxos_protocol::decidereq, 100ms, res, me, instance, v);
 }
 
 paxos_protocol::status
-acceptor::preparereq(std::string src, paxos_protocol::preparearg a,
-    paxos_protocol::prepareres &r)
-{
-    ScopedLock ml(pxs_mutex);
-    r.oldinstance = false;
-    r.accept = false;
-    r.n_a = n_a;
-    r.v_a = v_a;
-    if (a.instance <= instance_h) {
-        r.oldinstance = true;
-        r.v_a = values[a.instance];
-    } else if (a.n > n_h) {
-        n_h = a.n;
-        l->logprop(n_h);
-        r.accept = true;
+proposer_acceptor::preparereq(prepareres & r, const node_t &, unsigned instance, prop_t n) {
+    LOG << "instance " << instance << " proposal (" << n.n << ", " << n.m << ")";
+    lock ml(acceptor_mutex);
+    if (instance <= instance_h) {
+        LOG << "old instance " << instance << " has value " << values[instance];
+        r = prepareres{prepareres::oldinstance, accepted, values[instance]};
+    } else if (n > promise) {
+        LOG << "looks good to me";
+        promise = n;
+        l.append(log_proposal{n});
+        r = prepareres{prepareres::accept, accepted, accepted_value};
     } else {
-        tprintf("I totally rejected this request.  Ha.\n");
+        LOG << "I totally rejected this request.  Ha.";
+        r = prepareres{prepareres::reject, accepted, accepted_value};
     }
+    LOG << "preparereq is responding with oldinstance=" << r.oldinstance << " accept=" << r.accept
+        << " n_a=(" << r.n_a.n << ", " << r.n_a.m << ") " << "v_a=\"" << r.v_a << "\"";
     return paxos_protocol::OK;
 }
 
 paxos_protocol::status
-acceptor::acceptreq(std::string src, paxos_protocol::acceptarg a, bool &r)
-{
-    ScopedLock ml(pxs_mutex);
+proposer_acceptor::acceptreq(bool & r, const node_t &, unsigned instance, prop_t n, const value_t & v) {
+    lock ml(acceptor_mutex);
     r = false;
-    if (a.n >= n_h) {
-        n_a = a.n;
-        v_a = a.v;
-        l->logaccept(n_a, v_a);
+    if (instance != instance_h + 1)
+        return paxos_protocol::ERR;
+    if (n >= promise) {
+        accepted = n;
+        accepted_value = v;
+        l.append(log_accept{n, v});
         r = true;
     }
     return paxos_protocol::OK;
 }
 
-// the src argument is only for debug purpose
 paxos_protocol::status
-acceptor::decidereq(std::string src, paxos_protocol::decidearg a, int &r)
-{
-  ScopedLock ml(pxs_mutex);
-  tprintf("decidereq for accepted instance %d (my instance %d) v=%s\n", 
-        a.instance, instance_h, v_a.c_str());
-  if (a.instance == instance_h + 1) {
-    VERIFY(v_a == a.v);
-    commit_wo(a.instance, v_a);
-  } else if (a.instance <= instance_h) {
-    // we are ahead ignore.
-  } else {
-    // we are behind
-    VERIFY(0);
-  }
-  return paxos_protocol::OK;
-}
-
-void
-acceptor::commit_wo(unsigned instance, std::string value)
-{
-  //assume pxs_mutex is held
-  tprintf("acceptor::commit: instance=%d has v= %s\n", instance, value.c_str());
-  if (instance > instance_h) {
-    tprintf("commit: highestaccepteinstance = %d\n", instance);
-    values[instance] = value;
-    l->loginstance(instance, value);
-    instance_h = instance;
-    n_h.n = 0;
-    n_h.m = me;
-    n_a.n = 0;
-    n_a.m = me;
-    v_a.clear();
-    if (cfg) {
-      pxs_mutex.release();
-      cfg->paxos_commit(instance, value);
-      pxs_mutex.acquire();
+proposer_acceptor::decidereq(int &, const node_t &, unsigned instance, const value_t & v) {
+    lock ml(acceptor_mutex);
+    LOG << "decidereq for accepted instance " << instance << " (my instance " << instance_h << ") v=" << accepted_value;
+    if (instance == instance_h + 1) {
+        VERIFY(accepted_value == v);
+        commit(instance, v, ml);
+    } else if (instance <= instance_h) {
+        // we are ahead; ignore.
+    } else {
+        // we are behind.
+        VERIFY(0);
     }
-  }
-}
-
-void
-acceptor::commit(unsigned instance, std::string value)
-{
-  ScopedLock ml(pxs_mutex);
-  commit_wo(instance, value);
-}
-
-std::string
-acceptor::dump()
-{
-  return l->dump();
+    return paxos_protocol::OK;
 }
 
-void
-acceptor::restore(std::string s)
-{
-  l->restore(s);
-  l->logread();
+void proposer_acceptor::commit(unsigned instance, const value_t & value, lock & acceptor_mutex_lock) {
+    VERIFY(&value != &accepted_value); // eited by aliasing?
+    VERIFY(acceptor_mutex_lock);
+    LOG << "instance=" << instance << " has v=" << value;
+    if (instance > instance_h) {
+        LOG << "highestacceptedinstance = " << instance;
+        l.append(log_instance{instance, value});
+        instance_h = instance;
+        values[instance] = value;
+        accepted = promise = {0, me};
+        accepted_value.clear();
+        if (delegate) {
+            acceptor_mutex_lock.unlock();
+            delegate->paxos_commit(instance, value);
+            acceptor_mutex_lock.lock();
+        }
+    }
 }
 
-
-
 // For testing purposes
-
-// Call this from your code between phases prepare and accept of proposer
-void
-proposer::breakpoint1()
-{
-  if (break1) {
-    tprintf("Dying at breakpoint 1!\n");
-    exit(1);
-  }
+void proposer_acceptor::breakpoint1() {
+    if (break1) {
+        LOG << "Dying at breakpoint 1!";
+        exit(1);
+    }
 }
 
-// Call this from your code between phases accept and decide of proposer
-void
-proposer::breakpoint2()
-{
-  if (break2) {
-    tprintf("Dying at breakpoint 2!\n");
-    exit(1);
-  }
+void proposer_acceptor::breakpoint2() {
+    if (break2) {
+        LOG << "Dying at breakpoint 2!";
+        exit(1);
+    }
 }
 
-void
-proposer::breakpoint(int b)
-{
-  if (b == 3) {
-    tprintf("Proposer: breakpoint 1\n");
-    break1 = true;
-  } else if (b == 4) {
-    tprintf("Proposer: breakpoint 2\n");
-    break2 = true;
-  }
+void proposer_acceptor::breakpoint(int b) {
+    if (b == 3) {
+        LOG << "breakpoint 1";
+        break1 = true;
+    } else if (b == 4) {
+        LOG << "breakpoint 2";
+        break2 = true;
+    }
 }