#include "tprintf.h"
#include "lang/verify.h"
+using namespace std::chrono;
+using std::string;
+using std::vector;
+using std::thread;
+using std::ostringstream;
+using std::istringstream;
+
// The config module maintains views. As a node joins or leaves a
// view, the next view will be the same as previous view, except with
// the new node added or removed. The first view contains only the
// node `first`.
config::config(
- const std::string &_first,
- const std::string &_me,
+ const string &_first,
+ const string &_me,
config_view_change *_vc)
: my_view_id(0), first(_first), me(_me), vc(_vc)
{
{
lock ml(cfg_mutex);
- reconstruct();
- std::thread(&config::heartbeater, this).detach();
+ reconstruct(ml);
+ thread(&config::heartbeater, this).detach();
}
}
void
-config::restore(const std::string &s)
+config::restore(const string &s)
{
lock ml(cfg_mutex);
paxos_acceptor->restore(s);
- reconstruct();
+ reconstruct(ml);
}
void
-config::get_view(unsigned instance, std::vector<std::string> &m)
+config::get_view(unsigned instance, vector<string> &m)
{
lock ml(cfg_mutex);
- get_view_wo(instance, m);
+ get_view(instance, m, ml);
}
// caller should hold cfg_mutex
void
-config::get_view_wo(unsigned instance, std::vector<std::string> &m)
+config::get_view(unsigned instance, vector<string> &m, lock &)
{
- std::string value = paxos_acceptor->value(instance);
+ string value = paxos_acceptor->value(instance);
tprintf("get_view(%d): returns %s\n", instance, value.c_str());
members(value, m);
}
void
-config::members(const std::string &value, std::vector<std::string> &view) const
+config::members(const string &value, vector<string> &view) const
{
- std::istringstream ist(value);
- std::string m;
+ istringstream ist(value);
+ string m;
view.clear();
- while (ist >> m) {
+ while (ist >> m)
view.push_back(m);
- }
}
-std::string
-config::value(const std::vector<std::string> &m) const
+string
+config::value(const vector<string> &m) const
{
- std::ostringstream ost;
+ ostringstream ost;
for (unsigned i = 0; i < m.size(); i++) {
ost << m[i];
ost << " ";
return ost.str();
}
-// caller should hold cfg_mutex
void
-config::reconstruct()
+config::reconstruct(lock &cfg_mutex_lock)
{
+ VERIFY(cfg_mutex_lock);
if (paxos_acceptor->instance() > 0) {
- std::string m;
my_view_id = paxos_acceptor->instance();
- get_view_wo(my_view_id, mems);
+ get_view(my_view_id, mems, cfg_mutex_lock);
tprintf("config::reconstruct: %d %s\n",
my_view_id, print_members(mems).c_str());
}
// Called by Paxos's acceptor.
void
-config::paxos_commit(unsigned instance, const std::string &value)
+config::paxos_commit(unsigned instance, const string &value)
{
- std::string m;
- std::vector<std::string> newmem;
+ vector<string> newmem;
lock ml(cfg_mutex);
members(value, newmem);
}
bool
-config::ismember(const std::string &m, unsigned vid)
+config::ismember(const string &m, unsigned vid)
{
lock ml(cfg_mutex);
- std::vector<std::string> v;
- get_view_wo(vid, v);
+ vector<string> v;
+ get_view(vid, v, ml);
return isamember(m, v);
}
bool
-config::add(const std::string &new_m, unsigned vid)
+config::add(const string &new_m, unsigned vid)
{
- std::vector<std::string> m;
- std::vector<std::string> curm;
+ vector<string> m;
+ vector<string> curm;
lock ml(cfg_mutex);
if (vid != my_view_id)
return false;
m = mems;
m.push_back(new_m);
curm = mems;
- std::string v = value(m);
- int nextvid = my_view_id + 1;
+ string v = value(m);
+ unsigned nextvid = my_view_id + 1;
bool r;
{
ml.unlock();
// caller should hold cfg_mutex
bool
-config::remove(const std::string &m)
+config::remove(const string &m)
{
adopt_lock ml(cfg_mutex);
tprintf("config::remove: my_view_id %d remove? %s\n",
my_view_id, m.c_str());
- std::vector<std::string> n;
+ vector<string> n;
for (unsigned i = 0; i < mems.size(); i++) {
if (mems[i] != m)
n.push_back(mems[i]);
}
- std::string v = value(n);
- std::vector<std::string> cmems = mems;
- int nextvid = my_view_id + 1;
+ string v = value(n);
+ vector<string> cmems = mems;
+ unsigned nextvid = my_view_id + 1;
bool r;
{
ml.unlock();
}
void
-config::heartbeater()
+config::heartbeater() [[noreturn]]
{
- std::string m;
+ string m;
heartbeat_t h;
bool stable;
unsigned vid;
- std::vector<std::string> cmems;
+ vector<string> cmems;
lock ml(cfg_mutex);
while (1) {
- auto next_timeout = std::chrono::steady_clock::now() + std::chrono::seconds(3);
+ auto next_timeout = steady_clock::now() + seconds(3);
tprintf("heartbeater: go to sleep\n");
config_cond.wait_until(ml, next_timeout);
stable = true;
vid = my_view_id;
- get_view_wo(vid, cmems);
+ get_view(vid, cmems, ml);
tprintf("heartbeater: current membership %s\n",
print_members(cmems).c_str());
}
paxos_protocol::status
-config::heartbeat(int &r, std::string m, unsigned vid)
+config::heartbeat(int &r, string m, unsigned vid)
{
lock ml(cfg_mutex);
int ret = paxos_protocol::ERR;
}
config::heartbeat_t
-config::doheartbeat(const std::string &m)
+config::doheartbeat(const string &m)
{
adopt_lock ml(cfg_mutex);
int ret = rpc_const::timeout_failure;
- int r;
+ int r = 0;
unsigned vid = my_view_id;
heartbeat_t res = OK;
class config_view_change {
public:
virtual void commit_change(unsigned view_id) = 0;
- virtual ~config_view_change() {};
+ virtual ~config_view_change() {}
};
class config : public paxos_change {
paxos_protocol::status heartbeat(int &r, std::string m, unsigned instance);
std::string value(const std::vector<std::string> &mems) const;
void members(const std::string &v, std::vector<std::string> &m) const;
- void get_view_wo(unsigned instance, std::vector<std::string> &m);
+ void get_view(unsigned instance, std::vector<std::string> &m, lock &cfg_mutex_lock);
bool remove(const std::string &);
- void reconstruct();
+ void reconstruct(lock &cfg_mutex_lock);
typedef enum {
OK, // response and same view #
VIEWERR, // response but different view #
const std::string &_me,
config_view_change *_vc);
unsigned view_id() { return my_view_id; }
- const std::string &myaddr() const { return me; };
- std::string dump() { return paxos_acceptor->dump(); };
+ const std::string &myaddr() const { return me; }
+ std::string dump() { return paxos_acceptor->dump(); }
void get_view(unsigned instance, std::vector<std::string> &m);
void restore(const std::string &s);
bool add(const std::string &, unsigned view_id);
#include "paxos.h"
#include <fstream>
#include <iostream>
+#include "tprintf.h"
// Paxos must maintain some durable state (i.e., that survives power
// failures) to run Paxos correctly. This module implements a log with
unsigned instance;
from.open(name.c_str());
- printf ("logread\n");
+ LOG("logread");
while (from >> type) {
if (type == "done") {
std::string v;
getline(from, v);
pxs->values[instance] = v;
pxs->instance_h = instance;
- printf ("logread: instance: %d w. v = %s\n", instance,
- pxs->values[instance].c_str());
+ LOG("logread: instance: " << instance << " w. v = " <<
+ pxs->values[instance]);
pxs->v_a.clear();
pxs->n_h.n = 0;
pxs->n_a.n = 0;
} else if (type == "propseen") {
from >> pxs->n_h.n;
from >> pxs->n_h.m;
- printf("logread: high update: %d(%s)\n", pxs->n_h.n, pxs->n_h.m.c_str());
+ LOG("logread: high update: " << pxs->n_h.n << "(" << pxs->n_h.m << ")");
} else if (type == "accepted") {
std::string v;
from >> pxs->n_a.n;
from.get();
getline(from, v);
pxs->v_a = v;
- printf("logread: prop update %d(%s) with v = %s\n", pxs->n_a.n,
- pxs->n_a.m.c_str(), pxs->v_a.c_str());
+ LOG("logread: prop update " << pxs->n_a.n << "(" << pxs->n_a.m << ") with v = " << pxs->v_a);
} else {
- printf("logread: unknown log record\n");
+ LOG("logread: unknown log record");
VERIFY(0);
}
}
log::restore(std::string s)
{
std::ofstream f;
- printf("restore: %s\n", s.c_str());
+ LOG("restore: " << s);
f.open(name.c_str(), std::ios::trunc);
f << s;
f.close();
}
bool
-connection::send(char *b, int sz)
+connection::send(char *b, size_t sz)
{
lock ml(m_);
waiters_++;
} else {
//should be rare to need to explicitly add write callback
PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
- while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) {
+ while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) {
send_complete_.wait(ml);
}
}
PollMgr::Instance()->del_callback(fd_, CB_RDWR);
dead_ = true;
} else {
- VERIFY(wpdu_.solong >= 0);
+ VERIFY(wpdu_.solong != size_t_max);
if (wpdu_.solong < wpdu_.sz) {
return;
}
bool
connection::writepdu()
{
- VERIFY(wpdu_.solong >= 0);
+ VERIFY(wpdu_.solong != size_t_max);
if (wpdu_.solong == wpdu_.sz)
return true;
if (wpdu_.solong == 0) {
- int sz = htonl(wpdu_.sz);
+ uint32_t sz = htonl((uint32_t)wpdu_.sz);
bcopy(&sz,wpdu_.buf,sizeof(sz));
}
- int n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong));
+ ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong));
if (n < 0) {
if (errno != EAGAIN) {
jsl_log(JSL_DBG_1, "connection::writepdu fd_ %d failure errno=%d\n", fd_, errno);
- wpdu_.solong = -1;
+ wpdu_.solong = size_t_max;
wpdu_.sz = 0;
}
return (errno == EAGAIN);
}
- wpdu_.solong += n;
+ wpdu_.solong += (size_t)n;
return true;
}
connection::readpdu()
{
if (!rpdu_.sz) {
- int sz, sz1;
- int n = read(fd_, &sz1, sizeof(sz1));
+ uint32_t sz1;
+ ssize_t n = read(fd_, &sz1, sizeof(sz1));
if (n == 0) {
return false;
return false;
}
- if (n >0 && n!= sizeof(sz)) {
+ if (n > 0 && n != sizeof(sz1)) {
jsl_log(JSL_DBG_OFF, "connection::readpdu short read of sz\n");
return false;
}
- sz = ntohl(sz1);
+ size_t sz = ntohl(sz1);
if (sz > MAX_PDU) {
char *tmpb = (char *)&sz1;
- jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz,
+ jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %lu network order=%x %x %x %x %x\n", sz,
sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]);
return false;
}
rpdu_.sz = sz;
VERIFY(rpdu_.buf == NULL);
- rpdu_.buf = (char *)malloc(sz+sizeof(sz));
+ rpdu_.buf = (char *)malloc(sz+sizeof(sz1));
VERIFY(rpdu_.buf);
- bcopy(&sz1,rpdu_.buf,sizeof(sz));
- rpdu_.solong = sizeof(sz);
+ bcopy(&sz1,rpdu_.buf,sizeof(sz1));
+ rpdu_.solong = sizeof(sz1);
}
- int n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong);
+ ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong);
if (n <= 0) {
if (errno == EAGAIN)
return true;
rpdu_.sz = rpdu_.solong = 0;
return (errno == EAGAIN);
}
- rpdu_.solong += n;
+ rpdu_.solong += (size_t)n;
return true;
}
-tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest)
+tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest)
: mgr_(m1), lossy_(lossytest)
{
struct sockaddr_in sin;
#include <thread>
#include <map>
+#include <limits>
#include "pollmgr.h"
+constexpr size_t size_t_max = std::numeric_limits<size_t>::max();
+
// Marker exception type with no members of its own.
// NOTE(review): presumably thrown to unwind and terminate a thread (inferred
// from the name only) -- confirm against the throw sites.
// std::exception is a *private* base here (the default for `class`), so a
// handler written as `catch (std::exception &)` will not match this type;
// it has to be caught by its own name.
class thread_exit_exception : private std::exception {};
class chanmgr {
public:
- virtual bool got_pdu(connection *c, char *b, int sz) = 0;
+ virtual bool got_pdu(connection *c, char *b, size_t sz) = 0;
virtual ~chanmgr() {}
};
public:
struct charbuf {
charbuf(): buf(NULL), sz(0), solong(0) {}
- charbuf (char *b, int s) : buf(b), sz(s), solong(0){}
+ charbuf (char *b, size_t s) : buf(b), sz(s), solong(0){}
char *buf;
- int sz;
- int solong; //amount of bytes written or read so far
+ size_t sz;
+ size_t solong; // number of bytes written or read so far
};
connection(chanmgr *m1, int f1, int lossytest=0);
bool isdead();
void closeconn();
- bool send(char *b, int sz);
+ bool send(char *b, size_t sz);
void write_cb(int s);
void read_cb(int s);
class tcpsconn {
public:
- tcpsconn(chanmgr *m1, int port, int lossytest=0);
+ tcpsconn(chanmgr *m1, unsigned int port, int lossytest=0);
~tcpsconn();
- inline int port() { return port_; }
+ inline unsigned int port() { return port_; }
void accept_conn();
private:
- int port_;
+ unsigned int port_;
std::mutex m_;
std::thread th_;
int pipe_[2];
}
};
-unmarshall& operator>>(unmarshall &, bool &);
-unmarshall& operator>>(unmarshall &, unsigned char &);
-unmarshall& operator>>(unmarshall &, char &);
-unmarshall& operator>>(unmarshall &, unsigned short &);
-unmarshall& operator>>(unmarshall &, short &);
-unmarshall& operator>>(unmarshall &, unsigned int &);
-unmarshall& operator>>(unmarshall &, int &);
-unmarshall& operator>>(unmarshall &, unsigned long long &);
-unmarshall& operator>>(unmarshall &, std::string &);
-
template <class A> unmarshall & operator>>(unmarshall &u, A &x) {
unsigned n = u.grab<unsigned>();
x.clear();
//
// this function keeps no reference for connection *c
bool
-rpcc::got_pdu(connection *c, char *b, int sz)
+rpcc::got_pdu(connection *c, char *b, size_t sz)
{
unmarshall rep(b, sz);
reply_header h;
}
bool
-rpcs::got_pdu(connection *c, char *b, int sz)
+rpcs::got_pdu(connection *c, char *b, size_t sz)
{
if(!reachable_){
jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n");
int call1(unsigned int proc,
marshall &req, unmarshall &rep, TO to);
- bool got_pdu(connection *c, char *b, int sz);
+ bool got_pdu(connection *c, char *b, size_t sz);
template<class R>
void set_reachable(bool r) { reachable_ = r; }
- bool got_pdu(connection *c, char *b, int sz);
+ bool got_pdu(connection *c, char *b, size_t sz);
template<class F, class C=void> void reg(unsigned int proc, F f, C *c=nullptr);
};
rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
// tester must be on different port, otherwise it may partition itself
- testsvr = new rpcs(atoi(_me.c_str()) + 1);
+ testsvr = new rpcs((uint32_t)std::stoi(_me) + 1);
testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
}
// The recovery thread runs this function
-void rsm::recovery() {
+void rsm::recovery() [[noreturn]] {
bool r = true;
lock ml(rsm_mutex);
// XXX iannucci 2013/09/15 -- I don't understand whether accessing
// cfg->view_id in this manner involves a race. I suspect not.
if (join(primary)) {
- tprintf("recovery: joined\n");
+ LOG("recovery: joined");
commit_change_wo(cfg->view_id());
} else {
ml.unlock();
}
}
vid_insync = vid_commit;
- tprintf("recovery: sync vid_insync %d\n", vid_insync);
+ LOG("recovery: sync vid_insync " << vid_insync);
if (primary == cfg->myaddr()) {
r = sync_with_backups();
} else {
r = sync_with_primary();
}
- tprintf("recovery: sync done\n");
+ LOG("recovery: sync done");
// If there was a committed viewchange during the synchronization, restart
// the recovery
myvs.seqno = 1;
inviewchange = false;
}
- tprintf("recovery: go to sleep %d %d\n", insync, inviewchange);
+ LOG("recovery: go to sleep " << insync << " " << inviewchange);
recovery_cond.wait(ml);
}
}
// Make sure that the state of lock_server is stable during
// synchronization; otherwise, the primary's state may be more recent
// than replicas after the synchronization.
- lock ml(invoke_mutex);
+ lock ml2(invoke_mutex);
// By acquiring and releasing the invoke_mutex once, we make sure that
// the state of lock_server will not be changed until all
// replicas are synchronized. The reason is that client_invoke arrives
int ret = 0;
rsm_protocol::joinres r;
- tprintf("rsm::join: %s mylast (%d,%d)\n", m.c_str(), last_myvs.vid,
- last_myvs.seqno);
+ LOG("rsm::join: " << m << " mylast (" << last_myvs.vid << "," << last_myvs.seqno << ")");
rpcc *cl;
{
adopt_lock ml(rsm_mutex);
}
if (cl == 0 || ret != rsm_protocol::OK) {
- tprintf("rsm::join: couldn't reach %s %p %d\n", m.c_str(),
- cl, ret);
+ LOG("rsm::join: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret);
return false;
}
- tprintf("rsm::join: succeeded %s\n", r.log.c_str());
+ LOG("rsm::join: succeeded " << r.log);
cfg->restore(r.log);
return true;
}
std::string myaddr;
viewstamp vs;
{
- lock ml(rsm_mutex);
+ lock ml2(rsm_mutex);
LOG("Checking for inviewchange");
if (inviewchange)
return rsm_client_protocol::BUSY;
if (!cl)
return rsm_client_protocol::BUSY;
rsm_protocol::status ret;
- int r;
- ret = cl->call_timeout(rsm_protocol::invoke, rpcc::to(1000), r, procno, vs, req);
+ int ignored_rval;
+ ret = cl->call_timeout(rsm_protocol::invoke, rpcc::to(1000), ignored_rval, procno, vs, req);
LOG("Invoke returned " << ret);
if (ret != rsm_protocol::OK)
return rsm_client_protocol::BUSY;
std::vector<std::string> m;
std::string myaddr;
{
- lock ml(rsm_mutex);
+ lock ml2(rsm_mutex);
// check if !inviewchange
LOG("Checking for view change");
if (inviewchange)
* so the client can switch to a different primary
* when its existing primary fails
*/
-rsm_client_protocol::status rsm::client_members(std::vector<std::string> &r, int i) {
+rsm_client_protocol::status rsm::client_members(std::vector<std::string> &r, int) {
std::vector<std::string> m;
lock ml(rsm_mutex);
cfg->get_view(vid_commit, m);
void commit_change_wo(unsigned vid);
public:
rsm (std::string _first, std::string _me);
- ~rsm() {};
+ ~rsm() {}
bool amiprimary();
- void set_state_transfer(rsm_state_transfer *_stf) { stf = _stf; };
+ void set_state_transfer(rsm_state_transfer *_stf) { stf = _stf; }
void recovery();
void commit_change(unsigned vid);
#include <unistd.h>
#include "lang/verify.h"
#include "lock.h"
+#include "tprintf.h"
rsm_client::rsm_client(std::string dst) {
- printf("create rsm_client\n");
+ LOG("create rsm_client");
std::vector<std::string> mems;
sockaddr_in dstsock;
lock ml(rsm_client_mutex);
VERIFY (init_members());
}
- printf("rsm_client: done\n");
+ LOG("rsm_client: done");
}
// Assumes caller holds rsm_client_mutex
known_mems.pop_back();
}
-rsm_protocol::status rsm_client::invoke(int proc, std::string &rep, const std::string &req) {
+rsm_protocol::status rsm_client::invoke(unsigned int proc, std::string &rep, const std::string &req) {
int ret = 0;
lock ml(rsm_client_mutex);
while (1) {
- printf("rsm_client::invoke proc %x primary %s\n", proc, primary.c_str());
+ LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary);
handle h(primary);
ml.unlock();
if (!cl)
goto prim_fail;
- printf("rsm_client::invoke proc %x primary %s ret %d\n", proc,
- primary.c_str(), ret);
+ LOG("rsm_client::invoke proc " << std::hex << proc << " primary " << primary << " ret " << std::dec << ret);
if (ret == rsm_client_protocol::OK)
break;
if (ret == rsm_client_protocol::BUSY) {
- printf("rsm is busy %s\n", primary.c_str());
+ LOG("rsm is busy " << primary);
sleep(3);
continue;
}
if (ret == rsm_client_protocol::NOTPRIMARY) {
- printf("primary %s isn't the primary--let's get a complete list of mems\n",
- primary.c_str());
+ LOG("primary " << primary << " isn't the primary--let's get a complete list of mems");
if (init_members())
continue;
}
prim_fail:
- printf("primary %s failed ret %d\n", primary.c_str(), ret);
+ LOG("primary " << primary << " failed ret " << std::dec << ret);
primary_failure();
- printf ("rsm_client::invoke: retry new primary %s\n", primary.c_str());
+ LOG("rsm_client::invoke: retry new primary " << primary);
}
return ret;
}
bool rsm_client::init_members() {
- printf("rsm_client::init_members get members!\n");
+ LOG("rsm_client::init_members get members!");
handle h(primary);
- int ret;
+ int ret = rsm_client_protocol::ERR;
rpcc *cl;
{
adopt_lock ml(rsm_client_mutex);
ml.unlock();
cl = h.safebind();
- if (cl) {
+ if (cl)
ret = cl->call_timeout(rsm_client_protocol::members, rpcc::to(1000), known_mems, 0);
- }
ml.lock();
}
if (cl == 0 || ret != rsm_protocol::OK)
return false;
if (known_mems.size() < 1) {
- printf("rsm_client::init_members do not know any members!\n");
+ LOG("rsm_client::init_members do not know any members!");
VERIFY(0);
}
primary = known_mems.back();
known_mems.pop_back();
- printf("rsm_client::init_members: primary %s\n", primary.c_str());
+ LOG("rsm_client::init_members: primary " << primary);
return true;
}
bool init_members();
public:
rsm_client(std::string dst);
- rsm_protocol::status invoke(int proc, std::string &rep, const std::string &req);
+ rsm_protocol::status invoke(unsigned int proc, std::string &rep, const std::string &req);
template<class R, class ...Args>
int call(unsigned int proc, R & r, const Args & ...a1);
struct viewstamp {
- viewstamp (unsigned int _vid = 0, unsigned int _seqno = 0) {
- vid = _vid;
- seqno = _seqno;
- };
+ viewstamp (unsigned int _vid = 0, unsigned int _seqno = 0) :
+ vid(_vid), seqno(_seqno) {}
unsigned int vid;
unsigned int seqno;
- inline void operator++(int) {
- seqno++;
- };
+ inline void operator++(int) { seqno++; }
};
class rsm_protocol {
#include <stdio.h>
#include <string>
+char tprintf_thread_prefix = 't';
+
int
main(int argc, char *argv[])
{
rpcc *cl;
public:
rsmtest_client(std::string d);
- virtual ~rsmtest_client() {};
+ virtual ~rsmtest_client() {}
virtual rsm_test_protocol::status net_repair(int heal);
virtual rsm_test_protocol::status breakpoint(int b);
};