Lots more clean-ups
[invirt/third/libt4.git] / config.cc
index 5127cb2..35654d8 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -1,12 +1,5 @@
-#include <thread>
-#include <sstream>
-#include <iostream>
-#include <stdio.h>
 #include "config.h"
-#include "paxos.h"
 #include "handle.h"
-#include "tprintf.h"
-#include "lang/verify.h"
 
 // The config module maintains views. As a node joins or leaves a
 // view, the next view will be the same as previous view, except with
 // all views, the other nodes can bring this re-joined node up to
 // date.
 
-config::config(
-        const std::string &_first,
-        const std::string &_me,
-        config_view_change *_vc)
-    : my_view_id(0), first(_first), me(_me), vc(_vc)
+config::config(const string &_first, const string &_me, config_view_change *_vc)
+    : my_view_id(0), first(_first), me(_me), vc(_vc),
+      paxos(this, me == _first, me, me)
 {
-    paxos_acceptor = new acceptor(this, me == _first, me, me);
-    paxos_proposer = new proposer(this, paxos_acceptor, me);
-
-    // XXX hack; maybe should have its own port number
-    paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, this, &config::heartbeat);
-
-    {
-        lock ml(cfg_mutex);
-        reconstruct();
-        std::thread(&config::heartbeater, this).detach();
-    }
+    get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
+    lock cfg_mutex_lock(cfg_mutex);
+    reconstruct(cfg_mutex_lock);
+    thread(&config::heartbeater, this).detach();
 }
 
-void
-config::restore(const std::string &s)
-{
-    lock ml(cfg_mutex);
-    paxos_acceptor->restore(s);
-    reconstruct();
+void config::restore(const string &s) {
+    lock cfg_mutex_lock(cfg_mutex);
+    paxos.restore(s);
+    reconstruct(cfg_mutex_lock);
 }
 
-void
-config::get_view(unsigned instance, std::vector<std::string> &m)
-{
-    lock ml(cfg_mutex);
-    get_view_wo(instance, m);
+void config::get_view(unsigned instance, vector<string> &m) {
+    lock cfg_mutex_lock(cfg_mutex);
+    get_view(instance, m, cfg_mutex_lock);
 }
 
-// caller should hold cfg_mutex
-void
-config::get_view_wo(unsigned instance, std::vector<std::string> &m)
-{
-    std::string value = paxos_acceptor->value(instance);
-    tprintf("get_view(%d): returns %s\n", instance, value.c_str());
-    members(value, m);
+void config::get_view(unsigned instance, vector<string> &m, lock &) {
+    string value = paxos.value(instance);
+    LOG("get_view(" << instance << "): returns " << value);
+    m = members(value);
 }
 
-void
-config::members(const std::string &value, std::vector<std::string> &view) const
-{
-    std::istringstream ist(value);
-    std::string m;
-    view.clear();
-    while (ist >> m) {
-        view.push_back(m);
-    }
+vector<string> config::members(const string &value) const {
+    return explode(value);
 }
 
-std::string
-config::value(const std::vector<std::string> &m) const
-{
-    std::ostringstream ost;
-    for (unsigned i = 0; i < m.size(); i++)  {
-        ost << m[i];
-        ost << " ";
-    }
-    return ost.str();
+string config::value(const vector<string> &members) const {
+    return implode(members);
 }
 
-// caller should hold cfg_mutex
-void
-config::reconstruct()
-{
-    if (paxos_acceptor->instance() > 0) {
-        std::string m;
-        my_view_id = paxos_acceptor->instance();
-        get_view_wo(my_view_id, mems);
-        tprintf("config::reconstruct: %d %s\n",
-                my_view_id, print_members(mems).c_str());
+void config::reconstruct(lock &cfg_mutex_lock) {
+    VERIFY(cfg_mutex_lock);
+    my_view_id = paxos.instance();
+    if (my_view_id > 0) {
+        get_view(my_view_id, mems, cfg_mutex_lock);
+        LOG("view " << my_view_id << " " << mems);
     }
 }
 
 // Called by Paxos's acceptor.
-void
-config::paxos_commit(unsigned instance, const std::string &value)
-{
-    std::string m;
-    std::vector<std::string> newmem;
-    lock ml(cfg_mutex);
+void config::paxos_commit(unsigned instance, const string &value) {
+    lock cfg_mutex_lock(cfg_mutex);
 
-    members(value, newmem);
-    tprintf("config::paxos_commit: %d: %s\n", instance,
-                 print_members(newmem).c_str());
+    vector<string> newmem = members(value);
+    LOG("instance " << instance << ": " << newmem);
 
-    for (unsigned i = 0; i < mems.size(); i++) {
-        tprintf("config::paxos_commit: is %s still a member?\n",
-                mems[i].c_str());
-        if (!isamember(mems[i], newmem) && me != mems[i]) {
-            tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
-            mgr.delete_handle(mems[i]);
+    for (auto mem : mems) {
+        LOG("is " << mem << " still a member?");
+        if (!isamember(mem, newmem) && me != mem) {
+            LOG("delete " << mem);
+            invalidate_handle(mem);
         }
     }
 
     mems = newmem;
     my_view_id = instance;
     if (vc) {
-        ml.unlock();
+        cfg_mutex_lock.unlock();
         vc->commit_change(instance);
-        ml.lock();
+        cfg_mutex_lock.lock();
     }
 }
 
-bool
-config::ismember(const std::string &m, unsigned vid)
-{
-    lock ml(cfg_mutex);
-    std::vector<std::string> v;
-    get_view_wo(vid, v);
+bool config::ismember(const string &m, unsigned vid) {
+    lock cfg_mutex_lock(cfg_mutex);
+    vector<string> v;
+    get_view(vid, v, cfg_mutex_lock);
     return isamember(m, v);
 }
 
-bool
-config::add(const std::string &new_m, unsigned vid)
-{
-    std::vector<std::string> m;
-    std::vector<std::string> curm;
-    lock ml(cfg_mutex);
-    if (vid != my_view_id)
+bool config::add(const string &new_m, unsigned vid) {
+    lock cfg_mutex_lock(cfg_mutex);
+    LOG("adding " << new_m << " to " << vid);
+    if (vid != my_view_id) {
+        LOG("that's not my view id, " << my_view_id << "!");
         return false;
-    tprintf("config::add %s\n", new_m.c_str());
-    m = mems;
-    m.push_back(new_m);
-    curm = mems;
-    std::string v = value(m);
-    int nextvid = my_view_id + 1;
-    bool r;
-    {
-        ml.unlock();
-        r = paxos_proposer->run(nextvid, curm, v);
-        ml.lock();
     }
-    tprintf("config::add: proposer returned %s\n",
-            r ? "success" : "failure");
+    LOG("calling down to paxos layer");
+    vector<string> m(mems), cmems(mems);
+    m.push_back(new_m);
+    LOG("old mems " << cmems << " " << value(cmems));
+    LOG("new mems " << m << " " << value(m));
+    unsigned nextvid = my_view_id + 1;
+    cfg_mutex_lock.unlock();
+    bool r = paxos.run(nextvid, cmems, value(m));
+    cfg_mutex_lock.lock();
+    LOG("paxos proposer returned " << (r ? "success" : "failure"));
     return r;
 }
 
 // caller should hold cfg_mutex
-bool
-config::remove(const std::string &m)
-{
-    adopt_lock ml(cfg_mutex);
-    tprintf("config::remove: my_view_id %d remove? %s\n",
-            my_view_id, m.c_str());
-    std::vector<std::string> n;
-    for (unsigned i = 0; i < mems.size(); i++) {
-        if (mems[i] != m)
-            n.push_back(mems[i]);
-    }
-    std::string v = value(n);
-    std::vector<std::string> cmems = mems;
-    int nextvid = my_view_id + 1;
-    bool r;
-    {
-        ml.unlock();
-        r = paxos_proposer->run(nextvid, cmems, v);
-        ml.lock();
+bool config::remove(const string &m, lock &cfg_mutex_lock) {
+    LOG("my_view_id " << my_view_id << " remove? " << m);
+    vector<string> n;
+    for (auto mem : mems) {
+        if (mem != m)
+            n.push_back(mem);
     }
-    tprintf("config::remove: proposer returned %s\n",
-            r ? "success" : "failure");
+    vector<string> cmems = mems;
+    unsigned nextvid = my_view_id + 1;
+    cfg_mutex_lock.unlock();
+    bool r = paxos.run(nextvid, cmems, value(n));
+    cfg_mutex_lock.lock();
+    LOG("proposer returned " << (r ? "success" : "failure"));
     return r;
 }
 
-void
-config::heartbeater()
-{
-    std::string m;
-    heartbeat_t h;
-    bool stable;
-    unsigned vid;
-    std::vector<std::string> cmems;
-    lock ml(cfg_mutex);
+void config::heartbeater() [[noreturn]] {
+    lock cfg_mutex_lock(cfg_mutex);
 
     while (1) {
-        auto next_timeout = std::chrono::steady_clock::now() + std::chrono::seconds(3);
-        tprintf("heartbeater: go to sleep\n");
-        config_cond.wait_until(ml, next_timeout);
+        auto next_timeout = steady_clock::now() + milliseconds(300);
+        LOG("go to sleep");
+        config_cond.wait_until(cfg_mutex_lock, next_timeout);
 
-        stable = true;
-        vid = my_view_id;
-        get_view_wo(vid, cmems);
-        tprintf("heartbeater: current membership %s\n",
-                print_members(cmems).c_str());
+        unsigned vid = my_view_id;
+        vector<string> cmems;
+        get_view(vid, cmems, cfg_mutex_lock);
+        LOG("current membership " << cmems);
 
         if (!isamember(me, cmems)) {
-            tprintf("heartbeater: not member yet; skip hearbeat\n");
+            LOG("not member yet; skip hearbeat");
             continue;
         }
 
         // who has the smallest ID?
-        m = me;
-        for (unsigned i = 0; i < cmems.size(); i++) {
-            if (m > cmems[i])
-                m = cmems[i];
-        }
+        string m = min(me, *min_element(cmems.begin(), cmems.end()));
 
         if (m == me) {
             // ping the other nodes
-            for (unsigned i = 0; i < cmems.size(); i++) {
-                if (cmems[i] != me) {
-                    if ((h = doheartbeat(cmems[i])) != OK) {
-                        stable = false;
-                        m = cmems[i];
-                        break;
-                    }
-                }
+            for (string mem : cmems) {
+                if (mem == me || doheartbeat(mem, cfg_mutex_lock) == OK)
+                    continue;
+                if (vid == my_view_id)
+                    remove(mem, cfg_mutex_lock);
+                break;
             }
         } else {
             // ping the node with the smallest ID
-            if ((h = doheartbeat(m)) != OK)
-                stable = false;
-        }
-
-        if (!stable && vid == my_view_id) {
-            remove(m);
+            if (doheartbeat(m, cfg_mutex_lock) != OK && vid == my_view_id)
+                remove(m, cfg_mutex_lock);
         }
     }
 }
 
-paxos_protocol::status
-config::heartbeat(std::string m, unsigned vid, int &r)
-{
-    lock ml(cfg_mutex);
-    int ret = paxos_protocol::ERR;
+paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) {
+    lock cfg_mutex_lock(cfg_mutex);
     r = (int) my_view_id;
-    tprintf("heartbeat from %s(%d) my_view_id %d\n",
-            m.c_str(), vid, my_view_id);
-    if (vid == my_view_id) {
-        ret = paxos_protocol::OK;
-    } else if (paxos_proposer->isrunning()) {
+    LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id);
+    if (vid == my_view_id)
+        return paxos_protocol::OK;
+    else if (paxos.isrunning()) {
         VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
-        ret = paxos_protocol::OK;
-    } else {
-        ret = paxos_protocol::ERR;
+        return paxos_protocol::OK;
     }
-    return ret;
+    return paxos_protocol::ERR;
 }
 
-config::heartbeat_t
-config::doheartbeat(const std::string &m)
-{
-    adopt_lock ml(cfg_mutex);
-    int ret = rpc_const::timeout_failure;
-    int r;
+config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
     unsigned vid = my_view_id;
-    heartbeat_t res = OK;
-
-    tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
+    LOG("heartbeat to " << m << " (" << vid << ")");
     handle h(m);
-    {
-        ml.unlock();
-        rpcc *cl = h.safebind();
-        if (cl) {
-            ret = cl->call(paxos_protocol::heartbeat, me, vid, r,
-                                         rpcc::to(1000));
-        }
-        ml.lock();
-    }
-    if (ret != paxos_protocol::OK) {
-        if (ret == rpc_const::atmostonce_failure ||
-            ret == rpc_const::oldsrv_failure) {
-            mgr.delete_handle(m);
-        } else {
-            tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n",
-                         m.c_str(), ret, vid, r);
-            if (ret < 0) res = FAILURE;
-            else res = VIEWERR;
-        }
+
+    cfg_mutex_lock.unlock();
+    int r = 0, ret = rpc_const::bind_failure;
+    if (rpcc *cl = h.safebind())
+        ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
+    cfg_mutex_lock.lock();
+
+    heartbeat_t res = OK;
+    switch (ret) {
+        case paxos_protocol::OK:
+            break;
+        case rpc_const::atmostonce_failure:
+        case rpc_const::oldsrv_failure:
+            invalidate_handle(m);
+            break;
+        default:
+            LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
+            res = (ret < 0) ? FAILURE : VIEWERR;
     }
-    tprintf("doheartbeat done %d\n", res);
+    LOG("done " << res);
     return res;
 }
-