#include "tprintf.h"
#include "lang/verify.h"
+using namespace std::chrono;
+using std::string;
+using std::vector;
+using std::thread;
+using std::ostringstream;
+using std::istringstream;
+
// The config module maintains views. As a node joins or leaves a
// view, the next view will be the same as the previous view, except with
// the new node added or removed. The first view contains only the
// initial node (`first`).
config::config(
- const std::string &_first,
- const std::string &_me,
+ const string &_first,
+ const string &_me,
config_view_change *_vc)
: my_view_id(0), first(_first), me(_me), vc(_vc)
{
{
lock ml(cfg_mutex);
- reconstruct();
- std::thread(&config::heartbeater, this).detach();
+ reconstruct(ml);
+ thread(&config::heartbeater, this).detach();
}
}
void
-config::restore(const std::string &s)
+config::restore(const string &s)
{
lock ml(cfg_mutex);
paxos_acceptor->restore(s);
- reconstruct();
+ reconstruct(ml);
}
void
-config::get_view(unsigned instance, std::vector<std::string> &m)
+config::get_view(unsigned instance, vector<string> &m)
{
lock ml(cfg_mutex);
- get_view_wo(instance, m);
+ get_view(instance, m, ml);
}
// caller should hold cfg_mutex
void
-config::get_view_wo(unsigned instance, std::vector<std::string> &m)
+config::get_view(unsigned instance, vector<string> &m, lock &)
{
- std::string value = paxos_acceptor->value(instance);
+ string value = paxos_acceptor->value(instance);
tprintf("get_view(%d): returns %s\n", instance, value.c_str());
members(value, m);
}
void
-config::members(const std::string &value, std::vector<std::string> &view) const
+config::members(const string &value, vector<string> &view) const
{
- std::istringstream ist(value);
- std::string m;
+ istringstream ist(value);
+ string m;
view.clear();
- while (ist >> m) {
+ while (ist >> m)
view.push_back(m);
- }
}
-std::string
-config::value(const std::vector<std::string> &m) const
+string
+config::value(const vector<string> &m) const
{
- std::ostringstream ost;
+ ostringstream ost;
for (unsigned i = 0; i < m.size(); i++) {
ost << m[i];
ost << " ";
return ost.str();
}
-// caller should hold cfg_mutex
void
-config::reconstruct()
+config::reconstruct(lock &cfg_mutex_lock)
{
+ VERIFY(cfg_mutex_lock);
if (paxos_acceptor->instance() > 0) {
- std::string m;
my_view_id = paxos_acceptor->instance();
- get_view_wo(my_view_id, mems);
+ get_view(my_view_id, mems, cfg_mutex_lock);
tprintf("config::reconstruct: %d %s\n",
my_view_id, print_members(mems).c_str());
}
// Called by Paxos's acceptor.
void
-config::paxos_commit(unsigned instance, const std::string &value)
+config::paxos_commit(unsigned instance, const string &value)
{
- std::string m;
- std::vector<std::string> newmem;
+ vector<string> newmem;
lock ml(cfg_mutex);
members(value, newmem);
}
bool
-config::ismember(const std::string &m, unsigned vid)
+config::ismember(const string &m, unsigned vid)
{
lock ml(cfg_mutex);
- std::vector<std::string> v;
- get_view_wo(vid, v);
+ vector<string> v;
+ get_view(vid, v, ml);
return isamember(m, v);
}
bool
-config::add(const std::string &new_m, unsigned vid)
+config::add(const string &new_m, unsigned vid)
{
- std::vector<std::string> m;
- std::vector<std::string> curm;
+ vector<string> m;
+ vector<string> curm;
lock ml(cfg_mutex);
if (vid != my_view_id)
return false;
m = mems;
m.push_back(new_m);
curm = mems;
- std::string v = value(m);
- int nextvid = my_view_id + 1;
+ string v = value(m);
+ unsigned nextvid = my_view_id + 1;
bool r;
{
ml.unlock();
// caller should hold cfg_mutex
bool
-config::remove(const std::string &m)
+config::remove(const string &m)
{
adopt_lock ml(cfg_mutex);
tprintf("config::remove: my_view_id %d remove? %s\n",
my_view_id, m.c_str());
- std::vector<std::string> n;
+ vector<string> n;
for (unsigned i = 0; i < mems.size(); i++) {
if (mems[i] != m)
n.push_back(mems[i]);
}
- std::string v = value(n);
- std::vector<std::string> cmems = mems;
- int nextvid = my_view_id + 1;
+ string v = value(n);
+ vector<string> cmems = mems;
+ unsigned nextvid = my_view_id + 1;
bool r;
{
ml.unlock();
}
void
-config::heartbeater()
+config::heartbeater() [[noreturn]]
{
- std::string m;
+ string m;
heartbeat_t h;
bool stable;
unsigned vid;
- std::vector<std::string> cmems;
+ vector<string> cmems;
lock ml(cfg_mutex);
while (1) {
- auto next_timeout = std::chrono::steady_clock::now() + std::chrono::seconds(3);
+ auto next_timeout = steady_clock::now() + seconds(3);
tprintf("heartbeater: go to sleep\n");
config_cond.wait_until(ml, next_timeout);
stable = true;
vid = my_view_id;
- get_view_wo(vid, cmems);
+ get_view(vid, cmems, ml);
tprintf("heartbeater: current membership %s\n",
print_members(cmems).c_str());
}
paxos_protocol::status
-config::heartbeat(int &r, std::string m, unsigned vid)
+config::heartbeat(int &r, string m, unsigned vid)
{
lock ml(cfg_mutex);
int ret = paxos_protocol::ERR;
}
config::heartbeat_t
-config::doheartbeat(const std::string &m)
+config::doheartbeat(const string &m)
{
adopt_lock ml(cfg_mutex);
int ret = rpc_const::timeout_failure;
- int r;
+ int r = 0;
unsigned vid = my_view_id;
heartbeat_t res = OK;