Partially fixed a bug in the test suite that led to test runs randomly failing.
[invirt/third/libt4.git] / config.cc
index 7bac4a9..727277e 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -1,10 +1,5 @@
-#include <thread>
-#include <sstream>
 #include "config.h"
-#include "paxos.h"
 #include "handle.h"
-#include "threaded_log.h"
-#include "lang/verify.h"
 
 // The config module maintains views. As a node joins or leaves a
 // view, the next view will be the same as previous view, except with
@@ -40,8 +35,7 @@
 
 config::config(const string &_first, const string &_me, config_view_change *_vc)
     : my_view_id(0), first(_first), me(_me), vc(_vc),
-      paxos_acceptor(this, me == _first, me, me),
-      paxos_proposer(this, &paxos_acceptor, me)
+      paxos(this, me == _first, me, me)
 {
     get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
     lock cfg_mutex_lock(cfg_mutex);
@@ -51,7 +45,7 @@ config::config(const string &_first, const string &_me, config_view_change *_vc)
 
 void config::restore(const string &s) {
     lock cfg_mutex_lock(cfg_mutex);
-    paxos_acceptor.restore(s);
+    paxos.restore(s);
     reconstruct(cfg_mutex_lock);
 }
 
@@ -61,29 +55,25 @@ void config::get_view(unsigned instance, vector<string> &m) {
 }
 
 void config::get_view(unsigned instance, vector<string> &m, lock &) {
-    string value = paxos_acceptor.value(instance);
+    string value = paxos.value(instance);
     LOG("get_view(" << instance << "): returns " << value);
     m = members(value);
 }
 
 vector<string> config::members(const string &value) const {
-    istringstream ist(value);
-    using it = istream_iterator<string>;
-    return {it(ist), it()};
+    return explode(value);
 }
 
-string config::value(const vector<string> &m) const {
-    ostringstream ost;
-    copy(m.begin(), m.end(), ostream_iterator<string>(ost, " "));
-    return ost.str();
+string config::value(const vector<string> &members) const {
+    return implode(members);
 }
 
 void config::reconstruct(lock &cfg_mutex_lock) {
     VERIFY(cfg_mutex_lock);
-    if (paxos_acceptor.instance() > 0) {
-        my_view_id = paxos_acceptor.instance();
+    my_view_id = paxos.instance();
+    if (my_view_id > 0) {
         get_view(my_view_id, mems, cfg_mutex_lock);
-        LOG("config::reconstruct: " << my_view_id << " " << print_members(mems));
+        LOG("view " << my_view_id << " " << mems);
     }
 }
 
@@ -92,12 +82,12 @@ void config::paxos_commit(unsigned instance, const string &value) {
     lock cfg_mutex_lock(cfg_mutex);
 
     vector<string> newmem = members(value);
-    LOG("config::paxos_commit: " << instance << ": " << print_members(newmem));
+    LOG("instance " << instance << ": " << newmem);
 
     for (auto mem : mems) {
-        LOG("config::paxos_commit: is " << mem << " still a member?");
+        LOG("is " << mem << " still a member?");
         if (!isamember(mem, newmem) && me != mem) {
-            LOG("config::paxos_commit: delete " << mem);
+            LOG("delete " << mem);
             invalidate_handle(mem);
         }
     }
@@ -120,23 +110,27 @@ bool config::ismember(const string &m, unsigned vid) {
 
 bool config::add(const string &new_m, unsigned vid) {
     lock cfg_mutex_lock(cfg_mutex);
-    if (vid != my_view_id)
+    LOG("adding " << new_m << " to " << vid);
+    if (vid != my_view_id) {
+        LOG("that's not my view id, " << my_view_id << "!");
         return false;
-    LOG("config::add " << new_m);
-    vector<string> m = mems;
+    }
+    LOG("calling down to paxos layer");
+    vector<string> m(mems), cmems(mems);
     m.push_back(new_m);
-    vector<string> cmems = mems;
+    LOG("old mems " << cmems << " " << value(cmems));
+    LOG("new mems " << m << " " << value(m));
     unsigned nextvid = my_view_id + 1;
     cfg_mutex_lock.unlock();
-    bool r = paxos_proposer.run(nextvid, cmems, value(m));
+    bool r = paxos.run(nextvid, cmems, value(m));
     cfg_mutex_lock.lock();
-    LOG("config::add: proposer returned " << (r ? "success" : "failure"));
+    LOG("paxos proposer returned " << (r ? "success" : "failure"));
     return r;
 }
 
 // caller should hold cfg_mutex
 bool config::remove(const string &m, lock &cfg_mutex_lock) {
-    LOG("config::remove: my_view_id " << my_view_id << " remove? " << m);
+    LOG("my_view_id " << my_view_id << " remove? " << m);
     vector<string> n;
     for (auto mem : mems) {
         if (mem != m)
@@ -145,9 +139,9 @@ bool config::remove(const string &m, lock &cfg_mutex_lock) {
     vector<string> cmems = mems;
     unsigned nextvid = my_view_id + 1;
     cfg_mutex_lock.unlock();
-    bool r = paxos_proposer.run(nextvid, cmems, value(n));
+    bool r = paxos.run(nextvid, cmems, value(n));
     cfg_mutex_lock.lock();
-    LOG("config::remove: proposer returned " << (r ? "success" : "failure"));
+    LOG("proposer returned " << (r ? "success" : "failure"));
     return r;
 }
 
@@ -156,16 +150,16 @@ void config::heartbeater() [[noreturn]] {
 
     while (1) {
         auto next_timeout = steady_clock::now() + seconds(3);
-        LOG("heartbeater: go to sleep");
+        LOG("go to sleep");
         config_cond.wait_until(cfg_mutex_lock, next_timeout);
 
         unsigned vid = my_view_id;
         vector<string> cmems;
         get_view(vid, cmems, cfg_mutex_lock);
-        LOG("heartbeater: current membership " << print_members(cmems));
+        LOG("current membership " << cmems);
 
         if (!isamember(me, cmems)) {
-            LOG("heartbeater: not member yet; skip hearbeat");
+            LOG("not member yet; skip hearbeat");
             continue;
         }
 
@@ -195,7 +189,7 @@ paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) {
     LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id);
     if (vid == my_view_id)
         return paxos_protocol::OK;
-    else if (paxos_proposer.isrunning()) {
+    else if (paxos.isrunning()) {
         VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
         return paxos_protocol::OK;
     }
@@ -204,7 +198,7 @@ paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) {
 
 config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
     unsigned vid = my_view_id;
-    LOG("doheartbeater to " << m << " (" << vid << ")");
+    LOG("heartbeat to " << m << " (" << vid << ")");
     handle h(m);
 
     cfg_mutex_lock.unlock();
@@ -222,9 +216,9 @@ config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
             invalidate_handle(m);
             break;
         default:
-            LOG("doheartbeat: problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
+            LOG("problem with " << m << " (" << ret << ") my vid " << vid << " his vid " << r);
             res = (ret < 0) ? FAILURE : VIEWERR;
     }
-    LOG("doheartbeat done " << res);
+    LOG("done " << res);
     return res;
 }