Actually, you know, build.
[invirt/third/libt4.git] / config.cc
index 0f9ab4c..04c869e 100644 (file)
--- a/config.cc
+++ b/config.cc
@@ -1,3 +1,4 @@
+#include <thread>
 #include <sstream>
 #include <iostream>
 #include <stdio.h>
 #include <sstream>
 #include <iostream>
 #include <stdio.h>
 // all views, the other nodes can bring this re-joined node up to
 // date.
 
 // all views, the other nodes can bring this re-joined node up to
 // date.
 
-static void *
-heartbeatthread(void *x)
+config::config(
+        const std::string &_first,
+        const std::string &_me,
+        config_view_change *_vc)
+    : my_view_id(0), first(_first), me(_me), vc(_vc)
 {
 {
-  config *r = (config *) x;
-  r->heartbeater();
-  return 0;
-}
-
-config::config(std::string _first, std::string _me, config_view_change *_vc) 
-  : myvid (0), first (_first), me (_me), vc (_vc)
-{
-  VERIFY (pthread_mutex_init(&cfg_mutex, NULL) == 0);
-  VERIFY(pthread_cond_init(&config_cond, NULL) == 0);  
-
-  std::ostringstream ost;
-  ost << me;
-
-  acc = new acceptor(this, me == _first, me, ost.str());
-  pro = new proposer(this, acc, me);
+    paxos_acceptor = new acceptor(this, me == _first, me, me);
+    paxos_proposer = new proposer(this, paxos_acceptor, me);
 
 
-  // XXX hack; maybe should have its own port number
-  pxsrpc = acc->get_rpcs();
-  pxsrpc->reg(paxos_protocol::heartbeat, this, &config::heartbeat);
+    // XXX hack; maybe should have its own port number
+    paxos_acceptor->get_rpcs()->reg(paxos_protocol::heartbeat, &config::heartbeat, this);
 
 
-  {
-      ScopedLock ml(&cfg_mutex);
-
-      reconstruct();
-
-      pthread_t th;
-      VERIFY (pthread_create(&th, NULL, &heartbeatthread, (void *) this) == 0);
-  }
+    {
+        lock ml(cfg_mutex);
+        reconstruct();
+        std::thread(&config::heartbeater, this).detach();
+    }
 }
 
 void
 }
 
 void
-config::restore(std::string s)
+config::restore(const std::string &s)
 {
 {
-  ScopedLock ml(&cfg_mutex);
-  acc->restore(s);
-  reconstruct();
+    lock ml(cfg_mutex);
+    paxos_acceptor->restore(s);
+    reconstruct();
 }
 
 }
 
-std::vector<std::string>
-config::get_view(unsigned instance)
+void
+config::get_view(unsigned instance, std::vector<std::string> &m)
 {
 {
-  ScopedLock ml(&cfg_mutex);
-  return get_view_wo(instance);
+    lock ml(cfg_mutex);
+    get_view_wo(instance, m);
 }
 
 // caller should hold cfg_mutex
 }
 
 // caller should hold cfg_mutex
-std::vector<std::string>
-config::get_view_wo(unsigned instance)
+void
+config::get_view_wo(unsigned instance, std::vector<std::string> &m)
 {
 {
-  std::string value = acc->value(instance);
-  tprintf("get_view(%d): returns %s\n", instance, value.c_str());
-  return members(value);
+    std::string value = paxos_acceptor->value(instance);
+    tprintf("get_view(%d): returns %s\n", instance, value.c_str());
+    members(value, m);
 }
 
 }
 
-std::vector<std::string>
-config::members(std::string value)
+void
+config::members(const std::string &value, std::vector<std::string> &view) const
 {
 {
-  std::istringstream ist(value);
-  std::string m;
-  std::vector<std::string> view;
-  while (ist >> m) {
-    view.push_back(m);
-  }
-  return view;
+    std::istringstream ist(value);
+    std::string m;
+    view.clear();
+    while (ist >> m) {
+        view.push_back(m);
+    }
 }
 
 std::string
 }
 
 std::string
-config::value(std::vector<std::string> m)
+config::value(const std::vector<std::string> &m) const
 {
 {
-  std::ostringstream ost;
-  for (unsigned i = 0; i < m.size(); i++)  {
-    ost << m[i];
-    ost << " ";
-  }
-  return ost.str();
+    std::ostringstream ost;
+    for (unsigned i = 0; i < m.size(); i++)  {
+        ost << m[i];
+        ost << " ";
+    }
+    return ost.str();
 }
 
 // caller should hold cfg_mutex
 void
 config::reconstruct()
 {
 }
 
 // caller should hold cfg_mutex
 void
 config::reconstruct()
 {
-  if (acc->instance() > 0) {
-    std::string m;
-    myvid = acc->instance();
-    mems = get_view_wo(myvid);
-    tprintf("config::reconstruct: %d %s\n", myvid, print_members(mems).c_str());
-  }
+    if (paxos_acceptor->instance() > 0) {
+        std::string m;
+        my_view_id = paxos_acceptor->instance();
+        get_view_wo(my_view_id, mems);
+        tprintf("config::reconstruct: %d %s\n",
+                my_view_id, print_members(mems).c_str());
+    }
 }
 
 // Called by Paxos's acceptor.
 void
 }
 
 // Called by Paxos's acceptor.
 void
-config::paxos_commit(unsigned instance, std::string value)
+config::paxos_commit(unsigned instance, const std::string &value)
 {
 {
-  std::string m;
-  std::vector<std::string> newmem;
-  ScopedLock ml(&cfg_mutex);
-
-  newmem = members(value);
-  tprintf("config::paxos_commit: %d: %s\n", instance, 
-        print_members(newmem).c_str());
-
-  for (unsigned i = 0; i < mems.size(); i++) {
-    tprintf("config::paxos_commit: is %s still a member?\n", mems[i].c_str());
-    if (!isamember(mems[i], newmem) && me != mems[i]) {
-      tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
-      mgr.delete_handle(mems[i]);
+    std::string m;
+    std::vector<std::string> newmem;
+    lock ml(cfg_mutex);
+
+    members(value, newmem);
+    tprintf("config::paxos_commit: %d: %s\n", instance,
+                 print_members(newmem).c_str());
+
+    for (unsigned i = 0; i < mems.size(); i++) {
+        tprintf("config::paxos_commit: is %s still a member?\n",
+                mems[i].c_str());
+        if (!isamember(mems[i], newmem) && me != mems[i]) {
+            tprintf("config::paxos_commit: delete %s\n", mems[i].c_str());
+            mgr.delete_handle(mems[i]);
+        }
     }
     }
-  }
 
 
-  mems = newmem;
-  myvid = instance;
-  if (vc) {
-    unsigned vid = myvid;
-    VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
-    vc->commit_change(vid);
-    VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
-  }
+    mems = newmem;
+    my_view_id = instance;
+    if (vc) {
+        ml.unlock();
+        vc->commit_change(instance);
+        ml.lock();
+    }
 }
 
 bool
 }
 
 bool
-config::ismember(std::string m, unsigned vid)
+config::ismember(const std::string &m, unsigned vid)
 {
 {
-  bool r;
-  ScopedLock ml(&cfg_mutex);
-  std::vector<std::string> v = get_view_wo(vid);
-  r = isamember(m, v);
-  return r;
+    lock ml(cfg_mutex);
+    std::vector<std::string> v;
+    get_view_wo(vid, v);
+    return isamember(m, v);
 }
 
 bool
 }
 
 bool
-config::add(std::string new_m, unsigned vid)
+config::add(const std::string &new_m, unsigned vid)
 {
 {
-  std::vector<std::string> m;
-  std::vector<std::string> curm;
-  ScopedLock ml(&cfg_mutex);
-  if (vid != myvid)
-    return false;
-  tprintf("config::add %s\n", new_m.c_str());
-  m = mems;
-  m.push_back(new_m);
-  curm = mems;
-  std::string v = value(m);
-  int nextvid = myvid + 1;
-  VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
-  bool r = pro->run(nextvid, curm, v);
-  VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
-  if (r) {
-    tprintf("config::add: proposer returned success\n");
-  } else {
-    tprintf("config::add: proposer returned failure\n");
-  }
-  return r;
+    std::vector<std::string> m;
+    std::vector<std::string> curm;
+    lock ml(cfg_mutex);
+    if (vid != my_view_id)
+        return false;
+    tprintf("config::add %s\n", new_m.c_str());
+    m = mems;
+    m.push_back(new_m);
+    curm = mems;
+    std::string v = value(m);
+    int nextvid = my_view_id + 1;
+    bool r;
+    {
+        ml.unlock();
+        r = paxos_proposer->run(nextvid, curm, v);
+        ml.lock();
+    }
+    tprintf("config::add: proposer returned %s\n",
+            r ? "success" : "failure");
+    return r;
 }
 
 // caller should hold cfg_mutex
 bool
 }
 
 // caller should hold cfg_mutex
 bool
-config::remove_wo(std::string m)
+config::remove(const std::string &m)
 {
 {
-  tprintf("config::remove: myvid %d remove? %s\n", myvid, m.c_str());
-  std::vector<std::string> n;
-  for (unsigned i = 0; i < mems.size(); i++) {
-    if (mems[i] != m) n.push_back(mems[i]);
-  }
-  std::string v = value(n);
-  std::vector<std::string> cmems = mems;
-  int nextvid = myvid + 1;
-  VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
-  bool r = pro->run(nextvid, cmems, v);
-  VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
-  if (r) {
-    tprintf("config::remove: proposer returned success\n");
-  } else {
-    tprintf("config::remove: proposer returned failure\n");
-  }
-  return r;
+    adopt_lock ml(cfg_mutex);
+    tprintf("config::remove: my_view_id %d remove? %s\n",
+            my_view_id, m.c_str());
+    std::vector<std::string> n;
+    for (unsigned i = 0; i < mems.size(); i++) {
+        if (mems[i] != m)
+            n.push_back(mems[i]);
+    }
+    std::string v = value(n);
+    std::vector<std::string> cmems = mems;
+    int nextvid = my_view_id + 1;
+    bool r;
+    {
+        ml.unlock();
+        r = paxos_proposer->run(nextvid, cmems, v);
+        ml.lock();
+    }
+    tprintf("config::remove: proposer returned %s\n",
+            r ? "success" : "failure");
+    return r;
 }
 
 void
 config::heartbeater()
 {
 }
 
 void
 config::heartbeater()
 {
-  struct timeval now;
-  struct timespec next_timeout;
-  std::string m;
-  heartbeat_t h;
-  bool stable;
-  unsigned vid;
-  std::vector<std::string> cmems;
-  ScopedLock ml(&cfg_mutex);
-  
-  while (1) {
-
-    gettimeofday(&now, NULL);
-    next_timeout.tv_sec = now.tv_sec + 3;
-    next_timeout.tv_nsec = 0;
-    tprintf("heartbeater: go to sleep\n");
-    pthread_cond_timedwait(&config_cond, &cfg_mutex, &next_timeout);
-
-    stable = true;
-    vid = myvid;
-    cmems = get_view_wo(vid);
-    tprintf("heartbeater: current membership %s\n", print_members(cmems).c_str());
-
-    if (!isamember(me, cmems)) {
-      tprintf("heartbeater: not member yet; skip hearbeat\n");
-      continue;
-    }
-
-    //find the node with the smallest id
-    m = me;
-    for (unsigned i = 0; i < cmems.size(); i++) {
-      if (m > cmems[i])
-       m = cmems[i];
-    }
-
-    if (m == me) {
-      //if i am the one with smallest id, ping the rest of the nodes
-      for (unsigned i = 0; i < cmems.size(); i++) {
-       if (cmems[i] != me) {
-         if ((h = doheartbeat(cmems[i])) != OK) {
-           stable = false;
-           m = cmems[i];
-           break;
-         }
-       }
-      }
-    } else {
-      //the rest of the nodes ping the one with smallest id
-       if ((h = doheartbeat(m)) != OK) 
-           stable = false;
-    }
-
-    if (!stable && vid == myvid) {
-      remove_wo(m);
+    std::string m;
+    heartbeat_t h;
+    bool stable;
+    unsigned vid;
+    std::vector<std::string> cmems;
+    lock ml(cfg_mutex);
+
+    while (1) {
+        auto next_timeout = std::chrono::steady_clock::now() + std::chrono::seconds(3);
+        tprintf("heartbeater: go to sleep\n");
+        config_cond.wait_until(ml, next_timeout);
+
+        stable = true;
+        vid = my_view_id;
+        get_view_wo(vid, cmems);
+        tprintf("heartbeater: current membership %s\n",
+                print_members(cmems).c_str());
+
+        if (!isamember(me, cmems)) {
+            tprintf("heartbeater: not member yet; skip hearbeat\n");
+            continue;
+        }
+
+        // who has the smallest ID?
+        m = me;
+        for (unsigned i = 0; i < cmems.size(); i++) {
+            if (m > cmems[i])
+                m = cmems[i];
+        }
+
+        if (m == me) {
+            // ping the other nodes
+            for (unsigned i = 0; i < cmems.size(); i++) {
+                if (cmems[i] != me) {
+                    if ((h = doheartbeat(cmems[i])) != OK) {
+                        stable = false;
+                        m = cmems[i];
+                        break;
+                    }
+                }
+            }
+        } else {
+            // ping the node with the smallest ID
+            if ((h = doheartbeat(m)) != OK)
+                stable = false;
+        }
+
+        if (!stable && vid == my_view_id) {
+            remove(m);
+        }
     }
     }
-  }
 }
 
 paxos_protocol::status
 }
 
 paxos_protocol::status
-config::heartbeat(std::string m, unsigned vid, int &r)
+config::heartbeat(int &r, std::string m, unsigned vid)
 {
 {
-  ScopedLock ml(&cfg_mutex);
-  int ret = paxos_protocol::ERR;
-  r = (int) myvid;
-  tprintf("heartbeat from %s(%d) myvid %d\n", m.c_str(), vid, myvid);
-  if (vid == myvid) {
-    ret = paxos_protocol::OK;
-  } else if (pro->isrunning()) {
-    VERIFY (vid == myvid + 1 || vid + 1 == myvid);
-    ret = paxos_protocol::OK;
-  } else {
-    ret = paxos_protocol::ERR;
-  }
-  return ret;
+    lock ml(cfg_mutex);
+    int ret = paxos_protocol::ERR;
+    r = (int) my_view_id;
+    tprintf("heartbeat from %s(%d) my_view_id %d\n",
+            m.c_str(), vid, my_view_id);
+    if (vid == my_view_id) {
+        ret = paxos_protocol::OK;
+    } else if (paxos_proposer->isrunning()) {
+        VERIFY (vid == my_view_id + 1 || vid + 1 == my_view_id);
+        ret = paxos_protocol::OK;
+    } else {
+        ret = paxos_protocol::ERR;
+    }
+    return ret;
 }
 
 config::heartbeat_t
 }
 
 config::heartbeat_t
-config::doheartbeat(std::string m)
+config::doheartbeat(const std::string &m)
 {
 {
-  int ret = rpc_const::timeout_failure;
-  int r;
-  unsigned vid = myvid;
-  heartbeat_t res = OK;
-
-  tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
-  handle h(m);
-  VERIFY(pthread_mutex_unlock(&cfg_mutex)==0);
-  rpcc *cl = h.safebind();
-  if (cl) {
-    ret = cl->call(paxos_protocol::heartbeat, me, vid, r, 
-                  rpcc::to(1000));
-  } 
-  VERIFY(pthread_mutex_lock(&cfg_mutex)==0);
-  if (ret != paxos_protocol::OK) {
-    if (ret == rpc_const::atmostonce_failure || 
-       ret == rpc_const::oldsrv_failure) {
-      mgr.delete_handle(m);
-    } else {
-      tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n", 
-            m.c_str(), ret, vid, r);
-      if (ret < 0) res = FAILURE;
-      else res = VIEWERR;
+    adopt_lock ml(cfg_mutex);
+    int ret = rpc_const::timeout_failure;
+    int r;
+    unsigned vid = my_view_id;
+    heartbeat_t res = OK;
+
+    tprintf("doheartbeater to %s (%d)\n", m.c_str(), vid);
+    handle h(m);
+    {
+        ml.unlock();
+        rpcc *cl = h.safebind();
+        if (cl) {
+            ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(1000), r, me, vid);
+        }
+        ml.lock();
+    }
+    if (ret != paxos_protocol::OK) {
+        if (ret == rpc_const::atmostonce_failure ||
+            ret == rpc_const::oldsrv_failure) {
+            mgr.delete_handle(m);
+        } else {
+            tprintf("doheartbeat: problem with %s (%d) my vid %d his vid %d\n",
+                         m.c_str(), ret, vid, r);
+            if (ret < 0) res = FAILURE;
+            else res = VIEWERR;
+        }
     }
     }
-  }
-  tprintf("doheartbeat done %d\n", res);
-  return res;
+    tprintf("doheartbeat done %d\n", res);
+    return res;
 }
 
 }