-Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes \
-Wmissing-declarations -Wno-weak-vtables -Wno-global-constructors \
-Wno-exit-time-destructors -pedantic -Wall -Wextra -Weffc++
-OPTFLAGS = -O0 -fno-omit-frame-pointer #-fsanitize=address ,thread,undefined -fsanitize-memory-track-origins
+OPTFLAGS = -O3 -fno-omit-frame-pointer #-fsanitize=address ,thread,undefined -fsanitize-memory-track-origins
STDLIB = -stdlib=libc++
#STDLIB =
CXX = clang++-mp-3.4
// all views, the other nodes can bring this re-joined node up to
// date.
-config::config(const string &_first, const string &_me, config_view_change *_vc)
+config::config(const string & _first, const string & _me, config_view_change *_vc)
: my_view_id(0), first(_first), me(_me), vc(_vc),
paxos(this, me == _first, me, me)
{
thread(&config::heartbeater, this).detach();
}
-void config::restore(const string &s) {
+void config::restore(const string & s) {
lock cfg_mutex_lock(cfg_mutex);
paxos.restore(s);
reconstruct(cfg_mutex_lock);
}
-void config::get_view(unsigned instance, vector<string> &m) {
+void config::get_view(unsigned instance, vector<string> & m) {
lock cfg_mutex_lock(cfg_mutex);
get_view(instance, m, cfg_mutex_lock);
}
-void config::get_view(unsigned instance, vector<string> &m, lock &) {
+void config::get_view(unsigned instance, vector<string> & m, lock & cfg_mutex_lock) {
+ VERIFY(cfg_mutex_lock);
string value = paxos.value(instance);
LOG("get_view(" << instance << "): returns " << value);
- m = members(value);
-}
-
-vector<string> config::members(const string &value) const {
- return explode(value);
+ m = explode(value);
}
-string config::value(const vector<string> &members) const {
- return implode(members);
-}
-
-void config::reconstruct(lock &cfg_mutex_lock) {
+void config::reconstruct(lock & cfg_mutex_lock) {
VERIFY(cfg_mutex_lock);
my_view_id = paxos.instance();
if (my_view_id > 0) {
}
// Called by Paxos's acceptor.
-void config::paxos_commit(unsigned instance, const string &value) {
+void config::paxos_commit(unsigned instance, const string & value) {
lock cfg_mutex_lock(cfg_mutex);
- vector<string> newmem = members(value);
+ vector<string> newmem = explode(value);
LOG("instance " << instance << ": " << newmem);
for (auto mem : mems) {
}
}
-bool config::ismember(const string &m, unsigned vid) {
+bool config::ismember(const string & m, unsigned vid) {
lock cfg_mutex_lock(cfg_mutex);
vector<string> v;
get_view(vid, v, cfg_mutex_lock);
return isamember(m, v);
}
-bool config::add(const string &new_m, unsigned vid) {
+bool config::add(const string & new_m, unsigned vid) {
lock cfg_mutex_lock(cfg_mutex);
LOG("adding " << new_m << " to " << vid);
if (vid != my_view_id) {
LOG("calling down to paxos layer");
vector<string> m(mems), cmems(mems);
m.push_back(new_m);
- LOG("old mems " << cmems << " " << value(cmems));
- LOG("new mems " << m << " " << value(m));
+ LOG("old mems " << cmems << " " << implode(cmems));
+ LOG("new mems " << m << " " << implode(m));
unsigned nextvid = my_view_id + 1;
cfg_mutex_lock.unlock();
- bool r = paxos.run(nextvid, cmems, value(m));
+ bool r = paxos.run(nextvid, cmems, implode(m));
cfg_mutex_lock.lock();
LOG("paxos proposer returned " << (r ? "success" : "failure"));
return r;
}
// caller should hold cfg_mutex
-bool config::remove(const string &m, lock &cfg_mutex_lock) {
+bool config::remove(const string & m, lock & cfg_mutex_lock) {
+ VERIFY(cfg_mutex_lock);
LOG("my_view_id " << my_view_id << " remove? " << m);
vector<string> n;
for (auto mem : mems) {
vector<string> cmems = mems;
unsigned nextvid = my_view_id + 1;
cfg_mutex_lock.unlock();
- bool r = paxos.run(nextvid, cmems, value(n));
+ bool r = paxos.run(nextvid, cmems, implode(n));
cfg_mutex_lock.lock();
LOG("proposer returned " << (r ? "success" : "failure"));
return r;
}
}
-paxos_protocol::status config::heartbeat(int &r, string m, unsigned vid) {
+paxos_protocol::status config::heartbeat(int & r, string m, unsigned vid) {
lock cfg_mutex_lock(cfg_mutex);
r = (int) my_view_id;
LOG("heartbeat from " << m << "(" << vid << ") my_view_id " << my_view_id);
return paxos_protocol::ERR;
}
-config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) {
+config::heartbeat_t config::doheartbeat(const string & m, lock & cfg_mutex_lock) {
+ VERIFY(cfg_mutex_lock);
unsigned vid = my_view_id;
LOG("heartbeat to " << m << " (" << vid << ")");
handle h(m);
vector<string> mems;
mutex cfg_mutex;
cond config_cond;
- paxos_protocol::status heartbeat(int &r, string m, unsigned instance);
- string value(const vector<string> &mems) const;
- vector<string> members(const string &v) const;
- void get_view(unsigned instance, vector<string> &m, lock &cfg_mutex_lock);
- bool remove(const string &, lock &cfg_mutex_lock);
- void reconstruct(lock &cfg_mutex_lock);
+ paxos_protocol::status heartbeat(int & r, string m, unsigned instance);
+ void get_view(unsigned instance, vector<string> & m, lock & cfg_mutex_lock);
+ bool remove(const string &, lock & cfg_mutex_lock);
+ void reconstruct(lock & cfg_mutex_lock);
typedef enum {
OK, // response and same view #
VIEWERR, // response but different view #
FAILURE, // no response
} heartbeat_t;
- heartbeat_t doheartbeat(const string &m, lock &cfg_mutex_lock);
+ heartbeat_t doheartbeat(const string & m, lock & cfg_mutex_lock);
public:
- config(const string &_first, const string &_me, config_view_change *_vc);
+ config(const string & _first, const string & _me, config_view_change *_vc);
unsigned view_id() { return my_view_id; }
- const string &myaddr() const { return me; }
+ const string & myaddr() const { return me; }
string dump() { return paxos.dump(); }
- void get_view(unsigned instance, vector<string> &m);
- void restore(const string &s);
+ void get_view(unsigned instance, vector<string> & m);
+ void restore(const string & s);
bool add(const string &, unsigned view_id);
- bool ismember(const string &m, unsigned view_id);
+ bool ismember(const string & m, unsigned view_id);
void heartbeater NORETURN ();
- void paxos_commit(unsigned instance, const string &v);
+ void paxos_commit(unsigned instance, const string & v);
rpcs *get_rpcs() { return paxos.get_rpcs(); }
void breakpoint(int b) { paxos.breakpoint(b); }
};
release_fifo.deq(&lid);
LOG("Releaser: " << lid);
- lock_state &st = get_lock_state(lid);
+ lock_state & st = get_lock_state(lid);
lock sl(st.m);
VERIFY(st.state == lock_state::locked && st.held_by == releaser_thread.get_id());
st.state = lock_state::releasing;
}
lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) {
- lock_state &st = get_lock_state(lid);
+ lock_state & st = get_lock_state(lid);
lock sl(st.m);
auto self = this_thread::get_id();
}
lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) {
- lock_state &st = get_lock_state(lid);
+ lock_state & st = get_lock_state(lid);
lock sl(st.m);
auto self = this_thread::get_id();
VERIFY(st.state == lock_state::locked && st.held_by == self);
rlock_protocol::status lock_client::revoke_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t xid) {
LOG("Revoke handler " << lid << " " << xid);
- lock_state &st = get_lock_state(lid);
+ lock_state & st = get_lock_state(lid);
lock sl(st.m);
if (st.state == lock_state::releasing || st.state == lock_state::none)
}
rlock_protocol::status lock_client::retry_handler(int &, lock_protocol::lockid_t lid, lock_protocol::xid_t) {
- lock_state &st = get_lock_state(lid);
+ lock_state & st = get_lock_state(lid);
lock sl(st.m);
VERIFY(st.state == lock_state::acquiring);
st.state = lock_state::retrying;
fifo<lock_protocol::lockid_t> release_fifo;
mutex lock_table_lock;
lock_map lock_table;
- lock_state &get_lock_state(lock_protocol::lockid_t lid);
+ lock_state & get_lock_state(lock_protocol::lockid_t lid);
public:
static in_port_t last_port;
lock_client(string xdst, lock_release_user *l = 0);
{
}
-lock_state::lock_state(const lock_state &other) {
+lock_state::lock_state(const lock_state & other) {
*this = other;
}
-lock_state& lock_state::operator=(const lock_state& o) {
+lock_state & lock_state::operator=(const lock_state & o) {
held = o.held;
held_by = o.held_by;
wanted_by = o.wanted_by;
return lock_table[lid];
}
-lock_server::lock_server(rsm *r) : rsm_ (r) {
+lock_server::lock_server(rsm & r) : rsm_ (&r) {
thread(&lock_server::revoker, this).detach();
thread(&lock_server::retryer, this).detach();
- rsm_->set_state_transfer(this);
+ r.set_state_transfer(this);
+
+ r.reg(lock_protocol::acquire, &lock_server::acquire, this);
+ r.reg(lock_protocol::release, &lock_server::release, this);
+ r.reg(lock_protocol::stat, &lock_server::stat, this);
}
void lock_server::revoker () {
if (rsm_ && !rsm_->amiprimary())
continue;
- lock_state &st = get_lock_state(lid);
+ lock_state & st = get_lock_state(lid);
holder_t held_by;
{
lock sl(st.m);
continue;
LOG("Sending retry for " << lid);
- lock_state &st = get_lock_state(lid);
+ lock_state & st = get_lock_state(lid);
holder_t front;
{
lock sl(st.m);
lock_protocol::status lock_server::acquire(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
LOG("lid=" << lid << " client=" << id << "," << xid);
holder_t h = holder_t(id, xid);
- lock_state &st = get_lock_state(lid);
+ lock_state & st = get_lock_state(lid);
lock sl(st.m);
// deal with duplicated requests
lock_protocol::status lock_server::release(int &, lock_protocol::lockid_t lid, const callback_t & id, lock_protocol::xid_t xid) {
LOG("lid=" << lid << " client=" << id << "," << xid);
- lock_state &st = get_lock_state(lid);
+ lock_state & st = get_lock_state(lid);
lock sl(st.m);
if (st.held && st.held_by == holder_t(id, xid)) {
st.held = false;
string lock_server::marshal_state() {
lock sl(lock_table_lock);
- marshall rep;
- rep << nacquire << lock_table;
- return rep.content();
+ return marshall(nacquire, lock_table).content();
}
void lock_server::unmarshal_state(const string & state) {
lock sl(lock_table_lock);
- unmarshall rep(state, false);
- rep >> nacquire >> lock_table;
+ unmarshall(state, false, nacquire, lock_table);
}
-lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid, const callback_t &) {
+lock_protocol::status lock_server::stat(int & r, lock_protocol::lockid_t lid, const callback_t &) {
LOG("stat request for " << lid);
VERIFY(0);
r = nacquire;
class lock_state {
public:
lock_state();
- lock_state(const lock_state &other);
+ lock_state(const lock_state & other);
bool held;
holder_t held_by;
list<holder_t> wanted_by;
map<callback_t, lock_protocol::xid_t> old_requests;
mutex m;
- lock_state& operator=(const lock_state&);
+ lock_state & operator=(const lock_state &);
MEMBERS(held, held_by, wanted_by)
};
typedef map<lock_protocol::lockid_t, lock_state> lock_map;
-class lock_server : public rsm_state_transfer {
+class lock_server : private rsm_state_transfer {
private:
int nacquire;
mutex lock_table_lock;
lock_map lock_table;
- lock_state &get_lock_state(lock_protocol::lockid_t lid);
+ lock_state & get_lock_state(lock_protocol::lockid_t lid);
fifo<lock_protocol::lockid_t> retry_fifo;
fifo<lock_protocol::lockid_t> revoke_fifo;
rsm *rsm_;
- public:
- lock_server(rsm *r = 0);
- void revoker NORETURN ();
- void retryer NORETURN ();
string marshal_state();
void unmarshal_state(const string & state);
+ void revoker NORETURN ();
+ void retryer NORETURN ();
+ public:
+ lock_server(rsm & r);
lock_protocol::status acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
lock_protocol::status release(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
lock_protocol::status stat(int &, lock_protocol::lockid_t, const callback_t & id);
}
rsm rsm(argv[1], argv[2]);
- lock_server ls(&rsm);
- rsm.set_state_transfer(&ls);
-
- rsm.reg(lock_protocol::acquire, &lock_server::acquire, &ls);
- rsm.reg(lock_protocol::release, &lock_server::release, &ls);
- rsm.reg(lock_protocol::stat, &lock_server::stat, &ls);
+ lock_server ls(rsm);
rsm.start();
}
// check if l2 contains a majority of the elements of l1
-bool majority(const nodes_t &l1, const nodes_t &l2) {
+bool majority(const nodes_t & l1, const nodes_t & l2) {
auto overlap = (size_t)count_if(l1.begin(), l1.end(), std::bind(isamember, _1, l2));
return overlap >= (l1.size() >> 1) + 1;
}
unsigned instance() { return instance_h; }
const value_t & value(unsigned instance) { return values[instance]; }
string dump() { return l.dump(); }
- void restore(const string &s) { l.restore(s); l.logread(); }
+ void restore(const string & s) { l.restore(s); l.logread(); }
rpcs *get_rpcs() { return &pxs; }
bool run(unsigned instance, const nodes_t & cnodes, const value_t & v);
VERIFY(!wpdu_.buf.size());
}
-shared_ptr<connection> connection::to_dst(const sockaddr_in &dst, connection_delegate * delegate, int lossy) {
+shared_ptr<connection> connection::to_dst(const sockaddr_in & dst, connection_delegate * delegate, int lossy) {
socket_t s = socket(AF_INET, SOCK_STREAM, 0);
s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
bool send(const string & b);
- static shared_ptr<connection> to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy=0);
+ static shared_ptr<connection> to_dst(const sockaddr_in & dst, connection_delegate *mgr, int lossy=0);
const time_point<steady_clock> create_time = steady_clock::now();
const file_t fd;
#include "types.h"
#include "rpc_protocol.h"
-class marshall;
-class unmarshall;
-
//
// Marshall and unmarshall objects
//
public:
template <typename... Args>
- marshall(const Args&... args) {
+ marshall(const Args & ... args) {
(void)pass{(*this << args)...};
}
inline string content() const { return buf_.substr(rpc_protocol::RPC_HEADER_SZ,index_-rpc_protocol::RPC_HEADER_SZ); }
// letting S be a defaulted template parameter forces the compiler to
- // delay looking up operator<<(marshall&, rpc_sz_t) until we define it
+ // delay looking up operator<<(marshall &, rpc_sz_t) until we define it
// (i.e. we define an operator for marshalling uint32_t)
template <class T, class S=rpc_protocol::rpc_sz_t> inline void
pack_header(const T & h) {
bool ok_ = false;
public:
- unmarshall(const string &s, bool has_header)
+ template <typename... Args>
+ unmarshall(const string & s, bool has_header, Args && ... args)
: buf_(s),index_(rpc_protocol::RPC_HEADER_SZ) {
if (!has_header)
buf_.insert(0, rpc_protocol::RPC_HEADER_SZ, 0);
ok_ = (buf_.size() >= rpc_protocol::RPC_HEADER_SZ);
+ (void)pass{(*this >> args)...};
}
bool ok() const { return ok_; }
//
#define MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _d_) \
-inline marshall & operator<<(marshall &m, _c_ x) { _d_ y = hton((_d_)x); m.rawbytes(&y, sizeof(_d_)); return m; } \
-inline unmarshall & operator>>(unmarshall &u, _c_ &x) { _d_ y; u.rawbytes(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; }
+inline marshall & operator<<(marshall & m, _c_ x) { _d_ y = hton((_d_)x); m.rawbytes(&y, sizeof(_d_)); return m; } \
+inline unmarshall & operator>>(unmarshall & u, _c_ & x) { _d_ y; u.rawbytes(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; }
#define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_)
}
template <class... Args, size_t... Indices> inline unmarshall &
-tuple_unmarshall_imp(unmarshall & u, tuple<Args &...> t, tuple_indices<Indices...>) {
+tuple_unmarshall_imp(unmarshall & u, tuple<Args & ...> t, tuple_indices<Indices...>) {
(void)pass{(u >> get<Indices>(t))...};
return u;
}
template <class... Args> unmarshall &
-operator>>(unmarshall & u, tuple<Args &...> && t) {
+operator>>(unmarshall & u, tuple<Args & ...> && t) {
using Indices = typename make_tuple_indices<sizeof...(Args)>::type;
return tuple_unmarshall_imp(u, t, Indices());
}
// Implements struct marshalling via tuple marshalling of members.
#define MARSHALLABLE_STRUCT(_c_) \
-inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_(); } \
-inline marshall & operator<<(marshall &m, const _c_ a) { return m << a._tuple_(); }
+inline unmarshall & operator>>(unmarshall & u, _c_ & a) { return u >> a._tuple_(); } \
+inline marshall & operator<<(marshall & m, const _c_ a) { return m << a._tuple_(); }
// our first two marshallable structs...
MARSHALLABLE_STRUCT(rpc_protocol::request_header)
// this overload is visible for type A only if A::cbegin and A::cend exist
template <class A> inline typename
enable_if<is_const_iterable<A>::value, marshall>::type &
-operator<<(marshall &m, const A &x) {
+operator<<(marshall & m, const A & x) {
m << (unsigned int)x.size();
- for (const auto &a : x)
+ for (const auto & a : x)
m << a;
return m;
}
// visible for type A if A::emplace_back(a) makes sense
template <class A> inline typename
enable_if<supports_emplace_back<A>::value, unmarshall>::type &
-operator>>(unmarshall &u, A &x) {
+operator>>(unmarshall & u, A & x) {
unsigned n = u._grab<unsigned>();
x.clear();
while (n--)
// std::pair<A, B>
template <class A, class B> inline marshall &
-operator<<(marshall &m, const pair<A,B> &d) {
+operator<<(marshall & m, const pair<A,B> & d) {
return m << d.first << d.second;
}
template <class A, class B> inline unmarshall &
-operator>>(unmarshall &u, pair<A,B> &d) {
+operator>>(unmarshall & u, pair<A,B> & d) {
return u >> d.first >> d.second;
}
// std::map<A, B>
template <class A, class B> inline unmarshall &
-operator>>(unmarshall &u, map<A,B> &x) {
+operator>>(unmarshall & u, map<A,B> & x) {
unsigned n = u._grab<unsigned>();
x.clear();
while (n--)
}
// std::string
-inline marshall & operator<<(marshall &m, const string &s) {
+inline marshall & operator<<(marshall & m, const string & s) {
m << (uint32_t)s.size();
m.rawbytes(s.data(), s.size());
return m;
}
-inline unmarshall & operator>>(unmarshall &u, string &s) {
+inline unmarshall & operator>>(unmarshall & u, string & s) {
uint32_t sz = u._grab<uint32_t>();
if (u.ok()) {
s.resize(sz);
//
template <class E> typename enable_if<is_enum<E>::value, marshall>::type &
-operator<<(marshall &m, E e) {
+operator<<(marshall & m, E e) {
return m << from_enum(e);
}
template <class E> typename enable_if<is_enum<E>::value, unmarshall>::type &
-operator>>(unmarshall &u, E &e) {
+operator>>(unmarshall & u, E & e) {
e = to_enum<E>(u._grab<enum_type_t<E>>());
return u;
}
// Recursive marshalling
//
-inline marshall & operator<<(marshall &m, marshall &n) {
+inline marshall & operator<<(marshall & m, marshall & n) {
return m << n.content();
}
-inline unmarshall & operator>>(unmarshall &u, unmarshall &v) {
+inline unmarshall & operator>>(unmarshall & u, unmarshall & v) {
v = unmarshall(u._grab<string>(), false);
return u;
}
#include "marshall.h"
-typedef function<int(unmarshall &, marshall &)> handler;
+typedef function<rpc_protocol::status(unmarshall &&, marshall &)> handler;
//
// Automatic marshalling wrappers for RPC handlers
// between various types of callable objects at this level of abstraction.
template <class F, class C, class ErrorHandler, class R, class RV, class... Args>
-struct marshalled_func_imp<F, C, RV(R&, Args...), ErrorHandler> {
+struct marshalled_func_imp<F, C, RV(R &, Args...), ErrorHandler> {
static inline handler *wrap(F f, C *c=nullptr) {
// This type definition corresponds to an empty struct with
// template parameters running from 0 up to (# args) - 1.
using ArgsStorage = tuple<typename decay<Args>::type...>;
// Allocate a handler (i.e. function) to hold the lambda
// which will unmarshall RPCs and call f.
- return new handler([=](unmarshall &u, marshall &m) -> RV {
+ return new handler([=](unmarshall && u, marshall & m) -> RV {
// Unmarshall each argument with the correct type and store the
// result in a tuple.
ArgsStorage t{u._grab<typename decay<Args>::type>()...};
srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
}
-static sockaddr_in make_sockaddr(const string &hostandport);
+static sockaddr_in make_sockaddr(const string & hostandport);
-rpcc::rpcc(const string & d, bool retrans) :
- dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0),
- retrans_(retrans), reachable_(true), chan_(), destroy_wait_ (false), xid_rep_done_(-1)
+rpcc::rpcc(const string & d) : dst_(make_sockaddr(d))
{
- if (retrans) {
- set_rand_seed();
- clt_nonce_ = (nonce_t)random();
- } else {
- // special client nonce 0 means this client does not
- // require at-most-once logic from the server
- // because it uses tcp and never retries a failed connection
- clt_nonce_ = 0;
- }
+ set_rand_seed();
+ clt_nonce_ = (nonce_t)random();
char *loss_env = getenv("RPC_LOSSY");
if (loss_env)
lossytest_ = atoi(loss_env);
- // xid starts with 1 and latest received reply starts with 0
- xid_rep_window_.push_back(0);
-
IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_);
}
int rpcc::bind(milliseconds to) {
nonce_t r;
- int ret = call_timeout(rpc_protocol::bind, to, r);
+ rpc_protocol::status ret = call_timeout(rpc_protocol::bind, to, r);
if (ret == 0) {
lock ml(m_);
bind_done_ = true;
lock ml(m_);
if (calls_.size()) {
LOG("force callers to fail");
- for (auto &p : calls_) {
+ for (auto & p : calls_) {
caller *ca = p.second;
IF_LEVEL(2) LOG("force caller to fail");
}
}
-int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) {
+int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) {
caller ca(0, &rep);
xid_t xid_rep;
while (1) {
if (transmit) {
- get_refconn(ch);
+ get_latest_connection(ch);
if (ch) {
if (reachable_) {
request forgot;
if (nextdeadline >= finaldeadline)
break;
- if (retrans_ && (!ch || ch->isdead())) {
- // since connection is dead, retransmit
- // on the new connection
+ // retransmit on new connection if connection is dead
+ if (!ch || ch->isdead())
transmit = true;
- }
}
{
// may need to update the xid again here, in case the
// packet times out before it's even sent by the channel.
// I don't think there's any harm in maybe doing it twice
- update_xid_rep(ca.xid);
+ update_xid_rep(ca.xid, ml);
if (destroy_wait_)
destroy_wait_c_.notify_one();
return (ca.done? ca.intret : rpc_protocol::timeout_failure);
}
-void rpcc::get_refconn(shared_ptr<connection> & ch) {
+void rpcc::get_latest_connection(shared_ptr<connection> & ch) {
lock ml(chan_m_);
if (!chan_ || chan_->isdead())
chan_ = connection::to_dst(dst_, this, lossytest_);
lock ml(m_);
- update_xid_rep(h.xid);
+ update_xid_rep(h.xid, ml);
if (calls_.find(h.xid) == calls_.end()) {
IF_LEVEL(2) LOG("xid " << h.xid << " no pending request");
return true;
}
-// assumes thread holds mutex m
-void
-rpcc::update_xid_rep(int xid)
-{
+void rpcc::update_xid_rep(xid_t xid, lock & m_lock) {
+ VERIFY(m_lock);
if (xid <= xid_rep_window_.front())
return;
}
}
-rpcs::rpcs(in_port_t p1)
- : port_(p1), reachable_ (true)
+rpcs::rpcs(in_port_t p1) : port_(p1)
{
set_rand_seed();
nonce_ = (nonce_t)random();
IF_LEVEL(2) LOG("created with nonce " << nonce_);
reg(rpc_protocol::bind, &rpcs::rpcbind, this);
- dispatchpool_ = unique_ptr<thread_pool>(new thread_pool(6, false));
}
void rpcs::start() {
// must delete listener before dispatchpool
listener_ = nullptr;
dispatchpool_ = nullptr;
- free_reply_window();
}
bool rpcs::got_pdu(const shared_ptr<connection> & c, const string & b) {
return dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, c, b));
}
-void rpcs::reg1(proc_id_t proc, handler *h) {
- lock pl(procs_m_);
- VERIFY(procs_.count(proc) == 0);
- procs_[proc] = h;
- VERIFY(procs_.count(proc) >= 1);
-}
-
void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
unmarshall req(buf, true);
f = procs_[proc];
}
- rpcs::rpcstate_t stat;
- string b1;
-
- if (h.clt_nonce) {
- // have i seen this client before?
- {
- lock rwl(reply_window_m_);
- // if we don't know about this clt_nonce, create a cleanup object
- if (reply_window_.find(h.clt_nonce) == reply_window_.end()) {
- VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
- reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
- IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
- " chan " << c->fd << ", total clients " << (reply_window_.size()-1));
- }
- }
-
- // save the latest good connection to the client
- {
- lock rwl(conns_m_);
- if (conns_.find(h.clt_nonce) == conns_.end())
- conns_[h.clt_nonce] = c;
- else if (conns_[h.clt_nonce]->create_time < c->create_time)
- conns_[h.clt_nonce] = c;
+ // have i seen this client before?
+ {
+ lock rwl(reply_window_m_);
+ // if we don't know about this clt_nonce, create a cleanup object
+ if (reply_window_.find(h.clt_nonce) == reply_window_.end()) {
+ VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
+ reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
+ IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
+ " chan " << c->fd << ", total clients " << (reply_window_.size()-1));
}
+ }
- stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1);
- } else {
- // this client does not require at most once logic
- stat = NEW;
+ // save the latest good connection to the client
+ {
+ lock rwl(conns_m_);
+ if (conns_.find(h.clt_nonce) == conns_.end())
+ conns_[h.clt_nonce] = c;
+ else if (conns_[h.clt_nonce]->create_time < c->create_time)
+ conns_[h.clt_nonce] = c;
}
- switch (stat) {
+ string b1;
+
+ switch (check_duplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1)) {
case NEW: // new request
- rh.ret = (*f)(req, rep);
- if (rh.ret == rpc_protocol::unmarshal_args_failure) {
+ rh.ret = (*f)(forward<unmarshall>(req), rep);
+ if (rh.ret == rpc_protocol::unmarshall_args_failure) {
LOG("failed to unmarshall the arguments. You are " <<
"probably calling RPC 0x" << hex << proc << " with the wrong " <<
"types of arguments.");
IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " <<
h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce);
- if (h.clt_nonce > 0) {
- // only record replies for clients that require at-most-once logic
- add_reply(h.clt_nonce, h.xid, b1);
- }
+ add_reply(h.clt_nonce, h.xid, b1);
// get the latest connection to the client
{
// DONE: seen this xid, previous reply returned in b.
// FORGOTTEN: might have seen this xid, but deleted previous reply.
rpcs::rpcstate_t
-rpcs::checkduplicate_and_update(nonce_t clt_nonce, xid_t xid,
+rpcs::check_duplicate_and_update(nonce_t clt_nonce, xid_t xid,
xid_t xid_rep, string & b)
{
lock rwl(reply_window_m_);
- list<reply_t> &l = reply_window_[clt_nonce];
+ list<reply_t> & l = reply_window_[clt_nonce];
VERIFY(l.size() > 0);
VERIFY(xid >= xid_rep);
// rpcs::dispatch calls add_reply when it is sending a reply to an RPC,
// and passes the return value in b.
// add_reply() should remember b.
-// free_reply_window() and checkduplicate_and_update are responsible for
-// cleaning up the remembered values.
void rpcs::add_reply(nonce_t clt_nonce, xid_t xid, const string & b) {
lock rwl(reply_window_m_);
// remember the RPC reply value
- list<reply_t> &l = reply_window_[clt_nonce];
+ list<reply_t> & l = reply_window_[clt_nonce];
list<reply_t>::iterator it = l.begin();
// skip to our place in the list
for (it++; it != l.end() && it->xid < xid; it++);
}
}
-void rpcs::free_reply_window(void) {
- lock rwl(reply_window_m_);
- reply_window_.clear();
-}
-
-int rpcs::rpcbind(nonce_t &r) {
+rpc_protocol::status rpcs::rpcbind(nonce_t & r) {
IF_LEVEL(2) LOG("called return nonce " << nonce_);
r = nonce_;
return 0;
}
-static sockaddr_in make_sockaddr(const string &hostandport) {
+static sockaddr_in make_sockaddr(const string & hostandport) {
string host = "127.0.0.1";
string port = hostandport;
auto colon = hostandport.find(':');
cond c;
};
- void get_refconn(shared_ptr<connection> & ch);
- void update_xid_rep(xid_t xid);
+ void get_latest_connection(shared_ptr<connection> & ch);
+ void update_xid_rep(xid_t xid, lock & m_lock);
sockaddr_in dst_;
nonce_t clt_nonce_;
- nonce_t srv_nonce_;
- bool bind_done_;
- xid_t xid_;
- int lossytest_;
- bool retrans_;
- bool reachable_;
+ nonce_t srv_nonce_ = 0;
+ bool bind_done_ = false;
+ int lossytest_ = 0;
+ bool reachable_ = true;
shared_ptr<connection> chan_;
mutex m_; // protect insert/delete to calls[]
mutex chan_m_;
- bool destroy_wait_;
+ bool destroy_wait_ = false;
cond destroy_wait_c_;
map<int, caller *> calls_;
- list<xid_t> xid_rep_window_;
+
+ // xid starts with 1 and latest received reply starts with 0
+ xid_t xid_ = 1;
+ list<xid_t> xid_rep_window_ = {0};
struct request {
void clear() { buf.clear(); xid = -1; }
xid_t xid = -1;
};
request dup_req_;
- int xid_rep_done_;
+ int xid_rep_done_ = -1;
- int call1(proc_id_t proc, marshall &req, string &rep, milliseconds to);
+ int call1(proc_id_t proc, milliseconds to, string & rep, marshall & req);
template<class R>
- int call_m(proc_id_t proc, marshall &req, R & r, milliseconds to) {
+ inline int call_m(proc_id_t proc, milliseconds to, R & r, marshall && req) {
string rep;
- int intret = call1(proc, req, rep, to);
- unmarshall u(rep, true);
+ int intret = call1(proc, to, rep, req);
if (intret < 0) return intret;
- u >> r;
+ unmarshall u(rep, true, r);
if (u.okdone() != true) {
LOG("rpcc::call_m: failed to unmarshall the reply. You are probably " <<
"calling RPC 0x" << hex << proc << " with the wrong return type.");
VERIFY(0);
- return rpc_protocol::unmarshal_reply_failure;
+ return rpc_protocol::unmarshall_reply_failure;
}
return intret;
}
public:
- rpcc(const string & d, bool retrans=true);
+ rpcc(const string & d);
~rpcc();
nonce_t id() { return clt_nonce_; }
void cancel();
template<class P, class R, typename ...Args>
- inline int call(proc_t<P> proc, R & r, const Args&... args) {
+ inline int call(proc_t<P> proc, R & r, const Args & ... args) {
return call_timeout(proc, rpc::to_max, r, args...);
}
template<class P, class R, typename ...Args>
- inline int call_timeout(proc_t<P> proc, milliseconds to, R & r, const Args&... args) {
+ inline int call_timeout(proc_t<P> proc, milliseconds to, R & r, const Args & ... args) {
static_assert(is_valid_call<P, R, Args...>::value, "RPC called with incorrect argument types");
- marshall m{args...};
- return call_m(proc.id, m, r, to);
+ return call_m(proc.id, to, r, forward<marshall>(marshall(args...)));
}
};
// indexed by client nonce.
map<nonce_t, list<reply_t>> reply_window_;
- void free_reply_window(void);
void add_reply(nonce_t clt_nonce, xid_t xid, const string & b);
- rpcstate_t checkduplicate_and_update(nonce_t clt_nonce, xid_t xid,
+ rpcstate_t check_duplicate_and_update(nonce_t clt_nonce, xid_t xid,
xid_t rep_xid, string & b);
// latest connection to the client
map<nonce_t, shared_ptr<connection>> conns_;
- bool reachable_;
+ bool reachable_ = true;
// map proc # to function
map<proc_id_t, handler *> procs_;
void dispatch(shared_ptr<connection> c, const string & buf);
- // internal handler registration
- void reg1(proc_id_t proc, handler *);
-
- unique_ptr<thread_pool> dispatchpool_;
+ unique_ptr<thread_pool> dispatchpool_{new thread_pool(6, false)};
unique_ptr<connection_listener> listener_;
// RPC handler for clients binding
- rpc_protocol::status rpcbind(nonce_t &r);
+ rpc_protocol::status rpcbind(nonce_t & r);
bool got_pdu(const shared_ptr<connection> & c, const string & b);
static_assert(is_valid_registration<P, F>::value, "RPC handler registered with incorrect argument types");
struct ReturnOnFailure {
static inline int unmarshall_args_failure() {
- return rpc_protocol::unmarshal_args_failure;
+ return rpc_protocol::unmarshall_args_failure;
}
};
- reg1(proc.id, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
+ lock pl(procs_m_);
+ VERIFY(procs_.count(proc.id) == 0);
+ procs_[proc.id] = marshalled_func<F, ReturnOnFailure>::wrap(f, c);
+ VERIFY(procs_.count(proc.id) >= 1);
}
void start();
enum : status {
timeout_failure = -1,
- unmarshal_args_failure = -2,
- unmarshal_reply_failure = -3,
+ unmarshall_args_failure = -2,
+ unmarshall_reply_failure = -3,
atmostonce_failure = -4,
oldsrv_failure = -5,
bind_failure = -6,
class srv {
public:
int handle_22(string & r, const string a, const string b);
- int handle_fast(int &r, const int a);
- int handle_slow(int &r, const int a);
- int handle_bigrep(string &r, const size_t a);
+ int handle_fast(int & r, const int a);
+ int handle_slow(int & r, const int a);
+ int handle_bigrep(string & r, const size_t a);
};
namespace srv_protocol {
// rpcs::reg() decides how to unmarshall by looking
// at these argument types, so this function definition
// does what a .x file does in SunRPC.
-int srv::handle_22(string &r, const string a, string b) {
+int srv::handle_22(string & r, const string a, string b) {
r = a + b;
return 0;
}
-int srv::handle_fast(int &r, const int a) {
+int srv::handle_fast(int & r, const int a) {
r = a + 1;
return 0;
}
-int srv::handle_slow(int &r, const int a) {
+int srv::handle_slow(int & r, const int a) {
usleep(random() % 500);
r = a + 2;
return 0;
}
-int srv::handle_bigrep(string &r, const size_t len) {
+int srv::handle_bigrep(string & r, const size_t len) {
r = string(len, 'x');
return 0;
}
th_[i].join();
}
-bool thread_pool::addJob(const job_t &j) {
+bool thread_pool::addJob(const job_t & j) {
return jobq_.enq(j,blockadd_);
}
thread_pool(size_t sz, bool blocking=true);
~thread_pool();
- bool addJob(const job_t &j);
+ bool addJob(const job_t & j);
private:
size_t nthreads_;
#include "rsm_client.h"
#include <unistd.h>
-rsm::rsm(const string & _first, const string & _me) :
- stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0),
- partitioned (false), dopartition(false), break1(false), break2(false)
+rsm::rsm(const string & _first, const string & _me) : primary(_first)
{
cfg = unique_ptr<config>(new config(_first, _me, this));
rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this);
// tester must be on different port, otherwise it may partition itself
- testsvr = unique_ptr<rpcs>(new rpcs((in_port_t)stoi(_me) + 1));
+ testsvr.reset(new rpcs((in_port_t)stoi(_me) + 1));
testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this);
testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this);
}
thread(&rsm::recovery, this).detach();
}
-void rsm::reg1(rpc_protocol::proc_id_t proc, handler *h) {
- lock ml(rsm_mutex);
- procs[proc] = h;
-}
-
// The recovery thread runs this function
void rsm::recovery() {
bool r = true;
lock ml(rsm_mutex);
commit_change(vid, ml);
if (cfg->ismember(cfg->myaddr(), vid_commit))
- breakpoint2();
+ breakpoint(2);
}
void rsm::commit_change(unsigned vid, lock &) {
recovery_cond.notify_one();
sync_cond.notify_one();
if (cfg->ismember(cfg->myaddr(), vid_commit))
- breakpoint2();
+ breakpoint(2);
}
LOG("execute");
handler *h = procs[procno];
VERIFY(h);
- unmarshall args(req, false);
marshall rep;
- auto ret = (rsm_protocol::status)(*h)(args, rep);
- r = marshall{ret, rep.content()}.content();
+ auto ret = (rsm_protocol::status)(*h)(unmarshall(req, false), rep);
+ r = marshall(ret, rep.content()).content();
}
//
LOG("Invoke returned " << ret);
if (ret != rsm_protocol::OK)
return rsm_client_protocol::BUSY;
- breakpoint1();
+ breakpoint(1);
lock rsm_mutex_lock(rsm_mutex);
partition1(rsm_mutex_lock);
}
string r;
execute(proc, req, r);
last_myvs = vs;
- breakpoint1();
+ breakpoint(1);
return rsm_protocol::OK;
}
//
// RPC handler: Send back the local node's state to the caller
//
-rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, const string & src,
+rsm_protocol::status rsm::transferreq(rsm_protocol::transferres & r, const string & src,
viewstamp last, unsigned vid) {
lock ml(rsm_mutex);
LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" <<
// RPC handler: Responds with the list of known nodes for fall-back on a
// primary failure
//
-rsm_client_protocol::status rsm::client_members(vector<string> &r, int) {
+rsm_client_protocol::status rsm::client_members(vector<string> & r, int) {
vector<string> m;
lock ml(rsm_mutex);
cfg->get_view(vid_commit, m);
// Test RPCs -- simulate partitions and failures
-void rsm::net_repair(bool heal, lock &/*rsm_mutex_lock*/) {
+void rsm::net_repair(bool heal, lock & rsm_mutex_lock) {
+ VERIFY(rsm_mutex_lock);
vector<string> m;
cfg->get_view(vid_commit, m);
for (unsigned i = 0; i < m.size(); i++) {
rsmrpc->set_reachable(heal);
}
-rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r, int heal) {
+rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status & r, int heal) {
lock ml(rsm_mutex);
LOG("heal " << heal << " (dopartition " <<
dopartition << ", partitioned " << partitioned << ")");
// simulate failure at breakpoint 1 and 2
-void rsm::breakpoint1() {
- if (break1) {
- LOG("Dying at breakpoint 1 in rsm!");
- exit(1);
- }
-}
-
-void rsm::breakpoint2() {
- if (break2) {
- LOG("Dying at breakpoint 2 in rsm!");
+void rsm::breakpoint(int b) {
+ if (breakpoints[b-1]) {
+ LOG("Dying at breakpoint " << b << " in rsm!");
exit(1);
}
}
}
}
-rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status &r, int b) {
+rsm_test_protocol::status rsm::breakpointreq(rsm_test_protocol::status & r, int b) {
r = rsm_test_protocol::OK;
lock ml(rsm_mutex);
LOG("breakpoint " << b);
- if (b == 1) break1 = true;
- else if (b == 2) break2 = true;
+ if (b == 1) breakpoints[1-1] = true;
+ else if (b == 2) breakpoints[2-1] = true;
else if (b == 3 || b == 4) cfg->breakpoint(b);
else r = rsm_test_protocol::ERR;
return r;
};
class rsm : public config_view_change {
- private:
- void reg1(rpc_protocol::proc_id_t proc, handler *);
protected:
map<rpc_protocol::proc_id_t, handler *> procs;
unique_ptr<config> cfg;
viewstamp last_myvs{0, 0}; // Viewstamp of the last executed request
viewstamp myvs{0, 1};
string primary;
- bool insync;
- bool inviewchange;
- unsigned vid_commit; // Latest view id that is known to rsm layer
+ bool insync = false;
+ bool inviewchange = true;
+ unsigned vid_commit = 0; // Latest view id that is known to rsm layer
unsigned vid_insync; // The view id that this node is synchronizing for
vector<string> backups; // A list of unsynchronized backups
// For testing purposes
unique_ptr<rpcs> testsvr;
- bool partitioned;
- bool dopartition;
- bool break1;
- bool break2;
+ bool partitioned = false;
+ bool dopartition = false;
+ bool breakpoints[2] = {};
- rsm_client_protocol::status client_members(vector<string> &r, int i);
+ rsm_client_protocol::status client_members(vector<string> & r, int i);
rsm_protocol::status invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & mreq);
- rsm_protocol::status transferreq(rsm_protocol::transferres &r, const string & src,
+ rsm_protocol::status transferreq(rsm_protocol::transferres & r, const string & src,
viewstamp last, unsigned vid);
rsm_protocol::status transferdonereq(int &, const string & m, unsigned vid);
rsm_protocol::status joinreq(string & log, const string & src, viewstamp last);
- rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status &r, int heal);
- rsm_test_protocol::status breakpointreq(rsm_test_protocol::status &r, int b);
+ rsm_test_protocol::status test_net_repairreq(rsm_test_protocol::status & r, int heal);
+ rsm_test_protocol::status breakpointreq(rsm_test_protocol::status & r, int b);
mutex rsm_mutex, invoke_mutex;
cond recovery_cond, sync_cond;
bool sync_with_backups(lock & rsm_mutex_lock);
bool sync_with_primary(lock & rsm_mutex_lock);
void net_repair(bool heal, lock & rsm_mutex_lock);
- void breakpoint1();
- void breakpoint2();
+ void breakpoint(int b);
void partition1(lock & rsm_mutex_lock);
void commit_change(unsigned vid, lock & rsm_mutex_lock);
+ void recovery NORETURN ();
public:
rsm (const string & _first, const string & _me);
bool amiprimary();
void set_state_transfer(rsm_state_transfer *_stf) { stf = _stf; }
- void recovery NORETURN ();
void commit_change(unsigned vid);
template<class P, class F, class C=void> void reg(rpc_protocol::proc_t<P> proc, F f, C *c=nullptr) {
static_assert(is_valid_registration<P, F>::value, "RSM handler registered with incorrect argument types");
- reg1(proc.id, marshalled_func<F>::wrap(f, c));
+ lock ml(rsm_mutex);
+ procs[proc.id] = marshalled_func<F>::wrap(f, c);
}
void start();
};
-#endif /* rsm_h */
+#endif
known_mems.pop_back();
}
-rsm_protocol::status rsm_client::invoke(unsigned int proc, string &rep, const string &req) {
+rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const string & req) {
lock ml(rsm_client_mutex);
while (1) {
LOG("proc " << hex << proc << " primary " << primary);
mutex rsm_client_mutex;
void primary_failure(lock & rsm_client_mutex_lock);
bool init_members(lock & rsm_client_mutex_lock);
- rsm_protocol::status invoke(unsigned int proc, string &rep, const string &req);
+ rsm_protocol::status invoke(unsigned int proc, string & rep, const string & req);
template<class R> int call_m(unsigned int proc, R & r, const marshall & req);
public:
rsm_client(string dst);
template<class P, class R, class ...Args>
int call(rpc_protocol::proc_t<P> proc, R & r, const Args & ...a1) {
static_assert(is_valid_call<P, R, Args...>::value, "RSM method invoked with incorrect argument types");
- return call_m(proc.id, r, marshall{a1...});
+ return call_m(proc.id, r, marshall(a1...));
}
};
template<class R>
int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) {
string rep;
- string res;
int intret = invoke(proc, rep, req.content());
VERIFY( intret == rsm_client_protocol::OK );
- unmarshall u(rep, false);
- u >> intret;
+ unmarshall u(rep, false, intret);
if (intret < 0) return intret;
+ string res;
u >> res;
if (!u.okdone()) {
LOG("failed to unmarshall the reply.");
"0x" << hex << proc << " with the wrong return type");
LOG("here's what I got: \"" << hexify(rep) << "\"");
VERIFY(0);
- return rpc_protocol::unmarshal_reply_failure;
+ return rpc_protocol::unmarshall_reply_failure;
}
- unmarshall u1(res, false);
- u1 >> r;
- if(!u1.okdone()) {
+ if(!unmarshall(res, false, r).okdone()) {
LOG("failed to unmarshall the reply.");
LOG("You are probably calling RPC 0x" << hex << proc <<
" with the wrong return type.");
LOG("here's what I got: \"" << hexify(res) << "\"");
VERIFY(0);
- return rpc_protocol::unmarshal_reply_failure;
+ return rpc_protocol::unmarshall_reply_failure;
}
return intret;
}
namespace std {
// Sticking this in std:: makes it possible for ostream_iterator to use it.
template <class A, class B>
- ostream & operator<<(ostream &o, const pair<A,B> &d) {
+ ostream & operator<<(ostream & o, const pair<A,B> & d) {
return o << "<" << d.first << "," << d.second << ">";
}
}
template <class A>
typename enable_if<is_const_iterable<A>::value && !is_same<A,string>::value, ostream>::type &
-operator<<(ostream &o, const A &a) {
+operator<<(ostream & o, const A & a) {
return o << "[" << implode(a, ", ") << "]";
}
template <class A, typename I=void> struct is_const_iterable : false_type {};
template<class A> struct is_const_iterable<A,
- decltype(declval<A&>().cbegin(), declval<A&>().cend(), void())
+ decltype(declval<A &>().cbegin(), declval<A &>().cend(), void())
> : true_type {};
template <class A, typename I=void> struct supports_emplace_back : false_type {};
template<class A> struct supports_emplace_back<A,
- decltype(declval<A&>().emplace_back(declval<typename A::value_type>()), void())
+ decltype(declval<A &>().emplace_back(declval<typename A::value_type>()), void())
> : true_type {};
template<typename E>
return oss.str();
}
-inline vector<string> explode(const string &s, string delim=" ") {
+inline vector<string> explode(const string & s, string delim=" ") {
vector<string> out;
size_t start = 0, end = 0;
while ((end = s.find(delim, start)) != string::npos) {
// LEXICOGRAPHIC_COMPARISON(foo)
#define LEXICOGRAPHIC_OPERATOR(_c_, _op_) \
-inline bool operator _op_(const _c_ &b) const { return _tuple_() _op_ b._tuple_(); }
+inline bool operator _op_(const _c_ & b) const { return _tuple_() _op_ b._tuple_(); }
#define LEXICOGRAPHIC_COMPARISON(_c_) \
LEXICOGRAPHIC_OPERATOR(_c_, <) LEXICOGRAPHIC_OPERATOR(_c_, <=) \
// Template parameter pack expansion is not allowed in certain contexts, but
// brace initializers (for instance, calls to constructors of empty structs)
// are fair game.
-struct pass { template <typename... Args> inline pass(Args&&...) {} };
+struct pass { template <typename... Args> inline pass(Args && ...) {} };
#include "endian.h"