ifeq "$(USE_CLANG)" "1"
PEDANTRY += \
- -Weverything -pedantic-errors -Werror -Wno-c++98-compat \
- -Wno-c++98-compat-pedantic -Wno-padded -Wno-global-constructors \
- -Wno-exit-time-destructors -pedantic -Wall -Wextra -Weffc++
+ -Weverything -pedantic-errors -Werror -Wno-c++98-compat-pedantic \
+ -Wno-padded -Wno-global-constructors -Wno-exit-time-destructors \
+ -pedantic -Wall -Wextra -Weffc++
STDLIB += -stdlib=libc++
CXX = clang++-mp-3.4
}
// who has the smallest ID?
- string m = min(me, *min_element(cmems.begin(), cmems.end()));
+ string m = std::min(me, *std::min_element(cmems.begin(), cmems.end()));
if (m == me) {
// ping the other nodes
template <class... Args> inline tuple<typename remove_reference<Args>::type...>
hton(tuple<Args...> && t) {
- using Indices = typename make_tuple_indices<sizeof...(Args)>::type;
- return tuple_hton_imp(forward<tuple<Args...>>(t), Indices());
+ return tuple_hton_imp(forward<tuple<Args...>>(t), TUPLE_INDICES(Args));
}
-#define ENDIAN_SWAPPABLE(_c_) \
-inline _c_ hton(_c_ && t) { \
- _c_ result; \
- result._tuple_() = hton(t._tuple_()); \
- return result; \
+template <class T> inline typename
+enable_if<is_tuple_convertible<T>::value, T>::type
+hton(T && t) {
+ T result;
+ result._tuple_() = hton(t._tuple_());
+ return result;
}
#endif
h->valid = false;
} else {
LOG << "bind succeeded " << h->destination;
- h->client = move(client);
+ h->client = std::move(client);
}
}
return h->client.get();
// check for reentrancy
VERIFY(st.state != lock_state::locked || st.held_by != self);
- VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end());
+ VERIFY(std::find(st.wanted_by.begin(), st.wanted_by.end(), self)
+ == st.wanted_by.end());
st.wanted_by.push_back(self);
#endif // C++
+#ifdef __cplusplus
extern "C" {
+#endif
struct _t4_lock_client;
typedef struct _t4_lock_client t4_lock_client;
t4_status t4_lock_client_release(t4_lock_client *, t4_lockid_t);
t4_status t4_lock_client_stat(t4_lock_client *, t4_lockid_t);
+#ifdef __cplusplus
}
+#endif
#endif
MEMBERS(held, held_by, wanted_by)
};
-MARSHALLABLE_STRUCT(lock_state)
-
typedef map<lock_protocol::lockid_t, lock_state> lock_map;
class lock_server : private rsm_state_transfer {
}
string log::dump() {
- ifstream from(name);
+ std::ifstream from(name);
string res;
string v;
- while (getline(from, v))
+ while (std::getline(from, v))
res += v + "\n";
from.close();
return res;
void log::restore(string s) {
LOG << "restore: " << s;
- ofstream f(name, ios::trunc);
+ std::ofstream f(name, std::ios::trunc);
f << s;
f.close();
}
// XXX should be an atomic operation
void log::loginstance(unsigned instance, string v) {
- ofstream f(name, ios::app);
+ std::ofstream f(name, std::ios::app);
f << "done " << instance << " " << v << "\n";
f.close();
}
// an acceptor should call logprop(promise) when it
// receives a prepare to which it responds prepare_ok().
void log::logprop(prop_t promise) {
- ofstream f(name, ios::app);
+ std::ofstream f(name, std::ios::app);
f << "propseen " << promise.n << " " << promise.m << "\n";
f.close();
}
// an acceptor should call logaccept(accepted, accepted_value) when it
// receives an accept RPC to which it replies accept_ok().
void log::logaccept(prop_t n, string v) {
- ofstream f(name, ios::app);
+ std::ofstream f(name, std::ios::app);
f << "accepted " << n.n << " " << n.m << " " << v << "\n";
f.close();
}
#include "paxos.h"
#include "handle.h"
+using namespace std::placeholders;
+
paxos_change::~paxos_change() {}
bool isamember(const node_t & m, const nodes_t & nodes) {
- return find(nodes.begin(), nodes.end(), m) != nodes.end();
+ return std::find(nodes.begin(), nodes.end(), m) != nodes.end();
}
// check if l2 contains a majority of the elements of l1
bool majority(const nodes_t & l1, const nodes_t & l2) {
- auto overlap = (size_t)count_if(l1.begin(), l1.end(), std::bind(isamember, _1, l2));
+ auto overlap = (size_t)std::count_if(
+ l1.begin(), l1.end(), std::bind(isamember, _1, l2));
return overlap >= (l1.size() >> 1) + 1;
}
}
stable = false;
bool r = false;
- proposal.n = max(promise.n, proposal.n) + 1;
+ proposal.n = std::max(promise.n, proposal.n) + 1;
nodes_t accepts;
value_t v;
if (prepare(instance, accepts, cur_nodes, v)) {
LEXICOGRAPHIC_COMPARISON(prop_t)
};
-MARSHALLABLE_STRUCT(prop_t)
-
namespace paxos_protocol {
enum status : rpc_protocol::status { OK, ERR };
struct prepareres {
REMOTE_PROCEDURE(4, heartbeat, (int &, string, unsigned));
}
-MARSHALLABLE_STRUCT(paxos_protocol::prepareres)
-
#endif
connection_delegate::~connection_delegate() {}
connection::connection(connection_delegate * delegate, socket_t && f1, int l1)
-: fd(move(f1)), delegate_(delegate), lossy_(l1)
+: fd(std::move(f1)), delegate_(delegate), lossy_(l1)
{
fd.flags() |= O_NONBLOCK;
return nullptr;
}
IF_LEVEL(2) LOG_NONMEMBER << "connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port);
- return make_shared<connection>(delegate, move(s), lossy);
+ return make_shared<connection>(delegate, std::move(s), lossy);
}
bool connection::send(const string & b) {
size_t sz = ntoh(sz1);
if (sz > rpc_protocol::MAX_PDU) {
- IF_LEVEL(2) LOG << "read pdu TOO BIG " << sz << " network order=" << hex << sz1;
+ IF_LEVEL(2) LOG << "read pdu TOO BIG " << sz << " network order=" << std::hex << sz1;
return false;
}
class fifo {
public:
fifo(size_t limit=0) : max_(limit) {}
- bool enq(T, bool blocking=true);
- void deq(T *);
+
+ bool enq(T e, bool blocking=true) {
+ lock ml(m_);
+ while (max_ && q_.size() >= max_) {
+ if (!blocking)
+ return false;
+ has_space_c_.wait(ml);
+ }
+ q_.push_back(e);
+ non_empty_c_.notify_one();
+ return true;
+ }
+
+ void deq(T * e) {
+ lock ml(m_);
+ while(q_.empty())
+ non_empty_c_.wait(ml);
+ *e = q_.front();
+ q_.pop_front();
+ if (max_ && q_.size() < max_)
+ has_space_c_.notify_one();
+ }
+
bool size() {
lock ml(m_);
return q_.size();
size_t max_; // maximum capacity of the queue, block enq threads if exceeds this limit
};
-template<class T> bool
-fifo<T>::enq(T e, bool blocking)
-{
- lock ml(m_);
- while (max_ && q_.size() >= max_) {
- if (!blocking)
- return false;
- has_space_c_.wait(ml);
- }
- q_.push_back(e);
- non_empty_c_.notify_one();
- return true;
-}
-
-template<class T> void
-fifo<T>::deq(T *e)
-{
- lock ml(m_);
- while(q_.empty())
- non_empty_c_.wait(ml);
- *e = q_.front();
- q_.pop_front();
- if (max_ && q_.size() < max_)
- has_space_c_.notify_one();
-}
-
#endif
public:
inline file_t(int fd=-1) : fd_(fd) {}
inline file_t(const file_t &) = delete;
- inline file_t(file_t && other) : fd_(-1) { swap(fd_, other.fd_); }
+ inline file_t(file_t && other) : fd_(-1) { std::swap(fd_, other.fd_); }
inline ~file_t() { if (fd_ != -1) ::close(fd_); }
static inline void pipe(file_t *ends) {
int fds[2];
class marshall {
private:
- string buf_ = string(rpc_protocol::DEFAULT_RPC_SZ, 0); // Raw bytes buffer
- size_t index_ = rpc_protocol::RPC_HEADER_SZ; // Read/write head position
+ string buf_ = string(rpc_protocol::DEFAULT_RPC_SZ, 0);
+ size_t index_ = rpc_protocol::RPC_HEADER_SZ;
public:
template <typename... Args>
(void)pass{(*this << args)...};
}
- void rawbytes(const void *p, size_t n) {
+ void write(const void *p, size_t n) {
if (index_+n > buf_.size())
buf_.resize(index_+n);
- copy((char *)p, (char *)p+n, &buf_[index_]);
+ std::copy((char *)p, (char *)p+n, &buf_[index_]);
index_ += n;
}
// delay looking up operator<<(marshall &, rpc_sz_t) until we define it
// (i.e. we define an operator for marshalling uint32_t)
template <class T, class S=rpc_protocol::rpc_sz_t> inline void
- pack_header(const T & h) {
+ write_header(const T & h) {
VERIFY(sizeof(T)+sizeof(S) <= rpc_protocol::RPC_HEADER_SZ);
size_t saved_sz = index_;
index_ = 0;
class unmarshall {
private:
string buf_;
- size_t index_ = 0;
+ size_t index_ = rpc_protocol::RPC_HEADER_SZ;
bool ok_ = false;
public:
template <typename... Args>
unmarshall(const string & s, bool has_header, Args && ... args)
- : buf_(s),index_(rpc_protocol::RPC_HEADER_SZ) {
+ : buf_(s) {
if (!has_header)
buf_.insert(0, rpc_protocol::RPC_HEADER_SZ, 0);
ok_ = (buf_.size() >= rpc_protocol::RPC_HEADER_SZ);
(void)pass{(*this >> args)...};
}
- bool ok() const { return ok_; }
- bool okdone() const { return ok_ && index_ == buf_.size(); }
+ inline bool ok() const { return ok_; }
+ inline bool okdone() const { return ok_ && index_ == buf_.size(); }
- void rawbytes(void * t, size_t n) {
+ void read(void * t, size_t n) {
if (index_+n > buf_.size())
ok_ = false;
- VERIFY(ok_);
- copy(&buf_[index_], &buf_[index_+n], (char *)t);
- index_ += n;
+ if (ok_) {
+ std::copy(&buf_[index_], &buf_[index_+n], (char *)t);
+ index_ += n;
+ }
}
template <class T> inline void
- unpack_header(T & h) {
+ read_header(T & h) {
VERIFY(sizeof(T)+sizeof(rpc_protocol::rpc_sz_t) <= rpc_protocol::RPC_HEADER_SZ);
// first 4 bytes hold length field
index_ = sizeof(rpc_protocol::rpc_sz_t);
//
#define MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _d_) \
-inline marshall & operator<<(marshall & m, _c_ x) { _d_ y = hton((_d_)x); m.rawbytes(&y, sizeof(_d_)); return m; } \
-inline unmarshall & operator>>(unmarshall & u, _c_ & x) { _d_ y; u.rawbytes(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; }
+inline marshall & operator<<(marshall & m, _c_ x) { _d_ y = hton((_d_)x); m.write(&y, sizeof(_d_)); return m; } \
+inline unmarshall & operator>>(unmarshall & u, _c_ & x) { _d_ y; u.read(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; }
#define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_)
return m;
}
-template <class... Args> marshall &
+template <class... Args> inline marshall &
operator<<(marshall & m, tuple<Args...> && t) {
- using Indices = typename make_tuple_indices<sizeof...(Args)>::type;
- return tuple_marshall_imp(m, t, Indices());
+ return tuple_marshall_imp(m, t, TUPLE_INDICES(Args));
}
template <class... Args, size_t... Indices> inline unmarshall &
return u;
}
-template <class... Args> unmarshall &
+template <class... Args> inline unmarshall &
operator>>(unmarshall & u, tuple<Args & ...> && t) {
- using Indices = typename make_tuple_indices<sizeof...(Args)>::type;
- return tuple_unmarshall_imp(u, t, Indices());
+ return tuple_unmarshall_imp(u, t, TUPLE_INDICES(Args));
}
//
//
// Implements struct marshalling via tuple marshalling of members.
-#define MARSHALLABLE_STRUCT(_c_) \
-inline unmarshall & operator>>(unmarshall & u, _c_ & a) { return u >> a._tuple_(); } \
-inline marshall & operator<<(marshall & m, const _c_ a) { return m << a._tuple_(); }
+template <class T> inline typename
+enable_if<is_tuple_convertible<T>::value, unmarshall>::type &
+operator>>(unmarshall & u, T & a) { return u >> a._tuple_(); }
-// our first two marshallable structs...
-MARSHALLABLE_STRUCT(rpc_protocol::request_header)
-MARSHALLABLE_STRUCT(rpc_protocol::reply_header)
+template <class T> inline typename
+enable_if<is_tuple_convertible<T>::value, marshall>::type &
+operator<<(marshall & m, const T a) { return m << a._tuple_(); }
//
// Marshalling for STL containers
template <class A> inline typename
enable_if<is_const_iterable<A>::value, marshall>::type &
operator<<(marshall & m, const A & x) {
- m << (unsigned int)x.size();
+ m << (uint32_t)x.size();
for (const auto & a : x)
m << a;
return m;
template <class A> inline typename
enable_if<supports_emplace_back<A>::value, unmarshall>::type &
operator>>(unmarshall & u, A & x) {
- unsigned n = u._grab<unsigned>();
+ uint32_t n = u._grab<uint32_t>();
x.clear();
while (n--)
x.emplace_back(u._grab<typename A::value_type>());
// std::map<A, B>
template <class A, class B> inline unmarshall &
operator>>(unmarshall & u, map<A,B> & x) {
- unsigned n = u._grab<unsigned>();
+ uint32_t n = u._grab<uint32_t>();
x.clear();
while (n--)
x.emplace(u._grab<pair<A,B>>());
// std::string
inline marshall & operator<<(marshall & m, const string & s) {
m << (uint32_t)s.size();
- m.rawbytes(s.data(), s.size());
+ m.write(s.data(), s.size());
return m;
}
uint32_t sz = u._grab<uint32_t>();
if (u.ok()) {
s.resize(sz);
- u.rawbytes(&s[0], sz);
+ u.read(&s[0], sz);
}
return u;
}
#include "marshall.h"
-typedef function<rpc_protocol::status(unmarshall &&, marshall &)> handler;
+typedef std::function<rpc_protocol::status(unmarshall &&, marshall &)> handler;
//
// Automatic marshalling wrappers for RPC handlers
template <class F, class R, class RV, class args_type, size_t... Indices>
typename enable_if<!is_member_function_pointer<F>::value, RV>::type inline
invoke(RV, F f, void *, R & r, args_type & t, tuple_indices<Indices...>) {
- return f(r, move(get<Indices>(t))...);
+ return f(r, get<Indices>(t)...);
}
// And one for pointers to member functions...
template <class F, class C, class RV, class R, class args_type, size_t... Indices>
typename enable_if<is_member_function_pointer<F>::value, RV>::type inline
invoke(RV, F f, C *c, R & r, args_type & t, tuple_indices<Indices...>) {
- return (c->*f)(r, move(get<Indices>(t))...);
+ return (c->*f)(r, get<Indices>(t)...);
}
// The class marshalled_func_imp uses partial template specialization to
template <class F, class C, class ErrorHandler, class R, class RV, class... Args>
struct marshalled_func_imp<F, C, RV(R &, Args...), ErrorHandler> {
static inline handler *wrap(F f, C *c=nullptr) {
- // This type definition corresponds to an empty struct with
- // template parameters running from 0 up to (# args) - 1.
- using Indices = typename make_tuple_indices<sizeof...(Args)>::type;
// This type definition represents storage for f's unmarshalled
// arguments. decay is (most notably) stripping off const
// qualifiers.
// Allocate space for the RPC response -- will be passed into the
// function as an lvalue reference.
R r;
- // Perform the invocation. Note that Indices() calls the default
- // constructor of the empty struct with the special template
- // parameters.
- RV b = invoke(RV(), f, c, r, t, Indices());
+ // Perform the invocation. Note that TUPLE_INDICES calls the
+ // default constructor of an empty struct with template parameters
+ // running from 0 up to (# args) - 1.
+ RV b = invoke(RV(), f, c, r, t, TUPLE_INDICES(Args));
// Marshall the response.
m << r;
// Make like a tree.
public marshalled_func_imp<F, C, RV(Args...), ErrorHandler> {};
template <class F, class ErrorHandler, class Signature>
-struct marshalled_func<F, ErrorHandler, function<Signature>> :
+struct marshalled_func<F, ErrorHandler, std::function<Signature>> :
public marshalled_func_imp<F, void, Signature, ErrorHandler> {};
#endif
}
int rpcc::bind(milliseconds to) {
- nonce_t r;
+ nonce_t r = 0;
rpc_protocol::status ret = call_timeout(rpc_protocol::bind, to, r);
if (ret == 0) {
lock ml(m_);
ca.xid = xid_++;
calls_[ca.xid] = &ca;
- req.pack_header(rpc_protocol::request_header{
+ req.write_header(rpc_protocol::request_header{
ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()
});
xid_rep = xid_rep_window_.front();
ch->send(req);
}
else IF_LEVEL(1) LOG << "not reachable";
- IF_LEVEL(2) LOG << clt_nonce_ << " just sent req proc " << hex << proc
- << " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_;
+ IF_LEVEL(2) LOG << clt_nonce_ << " just sent req proc " << std::hex << proc
+ << " xid " << std::dec << ca.xid << " clt_nonce " << clt_nonce_;
}
transmit = false; // only send once on a given channel
}
- auto nextdeadline = min(steady_clock::now() + curr_to, finaldeadline);
+ auto nextdeadline = std::min(steady_clock::now() + curr_to, finaldeadline);
curr_to *= 2;
{
lock cal(ca.m);
- IF_LEVEL(2) LOG << clt_nonce_ << " call done for req proc " << hex << proc
- << " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":"
+ IF_LEVEL(2) LOG << clt_nonce_ << " call done for req proc " << std::hex << proc
+ << " xid " << std::dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":"
<< ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret;
// destruction of req automatically frees its buffer
{
unmarshall rep(b, true);
rpc_protocol::reply_header h;
- rep.unpack_header(h);
+ rep.read_header(h);
if (!rep.ok()) {
IF_LEVEL(1) LOG << "unmarshall header failed!!!";
unmarshall req(buf, true);
rpc_protocol::request_header h;
- req.unpack_header(h);
+ req.read_header(h);
proc_id_t proc = h.proc;
if (!req.ok()) {
return;
}
- IF_LEVEL(2) LOG << "rpc " << h.xid << " (proc " << hex << proc << ", last_rep "
- << dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce;
+ IF_LEVEL(2) LOG << "rpc " << h.xid << " (proc " << std::hex << proc << ", last_rep "
+ << std::dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce;
marshall rep;
rpc_protocol::reply_header rh{h.xid,0};
// is client sending to an old instance of server?
if (h.srv_nonce != 0 && h.srv_nonce != nonce_) {
IF_LEVEL(2) LOG << "rpc for an old server instance " << h.srv_nonce
- << " (current " << nonce_ << ") proc " << hex << h.proc;
+ << " (current " << nonce_ << ") proc " << std::hex << h.proc;
rh.ret = rpc_protocol::oldsrv_failure;
- rep.pack_header(rh);
+ rep.write_header(rh);
c->send(rep);
return;
}
{
lock pl(procs_m_);
if (procs_.count(proc) < 1) {
- LOG << "unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_;
+ LOG << "unknown proc 0x" << std::hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_;
VERIFY(0);
return;
}
rh.ret = (*f)(forward<unmarshall>(req), rep);
if (rh.ret == rpc_protocol::unmarshall_args_failure) {
LOG << "failed to unmarshall the arguments. You are "
- << "probably calling RPC 0x" << hex << proc << " with the wrong "
+ << "probably calling RPC 0x" << std::hex << proc << " with the wrong "
<< "types of arguments.";
VERIFY(0);
}
VERIFY(rh.ret >= 0);
- rep.pack_header(rh);
+ rep.write_header(rh);
b1 = rep;
IF_LEVEL(2) LOG << "sending and saving reply of size " << b1.size() << " for rpc "
- << h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce;
+ << h.xid << ", proc " << std::hex << proc << " ret " << std::dec
+ << rh.ret << ", clt " << h.clt_nonce;
add_reply(h.clt_nonce, h.xid, b1);
case FORGOTTEN: // very old request and we don't have the response anymore
IF_LEVEL(2) LOG << "very old request " << h.xid << " from " << h.clt_nonce;
rh.ret = rpc_protocol::atmostonce_failure;
- rep.pack_header(rh);
+ rep.write_header(rh);
c->send(rep);
break;
}
if (intret < 0) return intret;
unmarshall u(rep, true, r);
if (u.okdone() != true) {
- LOG << "rpcc::call_m: failed to unmarshall the reply. You are probably " <<
- "calling RPC 0x" << hex << proc << " with the wrong return type.";
+ LOG << "rpcc::call_m: failed to unmarshall the reply. You are probably "
+ << "calling RPC 0x" << std::hex << proc << " with the wrong return type.";
VERIFY(0);
return rpc_protocol::unmarshall_reply_failure;
}
proc_id_t id;
};
- const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t);
+ union header_t { request_header req; reply_header rep; };
+ const size_t RPC_HEADER_SZ = sizeof(header_t) + sizeof(rpc_sz_t);
const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation
const size_t MAX_PDU = 10<<20; // maximum PDF is 10M
REMOTE_PROCEDURE(1, bind, (nonce_t &)); // handler number reserved for bind
}
-ENDIAN_SWAPPABLE(rpc_protocol::request_header)
-ENDIAN_SWAPPABLE(rpc_protocol::reply_header)
-
#endif
static string dst; //server's ip address
static in_port_t port;
+using std::cout;
+using std::endl;
+
// server-side handlers. they must be methods of some class
// to simplify rpcs::reg(). a server process can have handlers
// from multiple classes.
static void testmarshall() {
marshall m;
rpc_protocol::request_header rh{1,2,3,4,5};
- m.pack_header(rh);
+ m.write_header(rh);
VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ);
int i = 12345;
unsigned long long l = 1223344455L;
unmarshall un(b, true);
rpc_protocol::request_header rh1;
- un.unpack_header(rh1);
+ un.read_header(rh1);
VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
int i1;
unsigned long long l1;
for(int i = 0; i < 100; i++){
int which = (random() % 2);
int arg = (random() % 1000);
- int rep;
+ int rep = -1;
auto start = steady_clock::now();
#include "types.h"
#include "fifo.h"
-typedef function<void()> job_t;
+typedef std::function<void()> job_t;
class thread_pool {
public:
// Start accepting synchronization request (statetransferreq) now!
insync = true;
cfg->get_view(vid_insync, backups);
- backups.erase(find(backups.begin(), backups.end(), cfg->myaddr()));
+ backups.erase(std::find(backups.begin(), backups.end(), cfg->myaddr()));
LOG << "backups " << backups;
sync_cond.wait(rsm_mutex_lock);
insync = false;
rsm_mutex_lock.lock();
}
if (cl == 0 || ret != rsm_protocol::OK) {
- LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret;
+ LOG << "couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret;
return false;
}
if (stf && last_myvs != r.last) {
}
if (cl == 0 || ret != rsm_protocol::OK) {
- LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret;
+ LOG << "couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret;
return false;
}
LOG << "succeeded " << log;
r = marshall(ret, rep.content()).content();
}
+static void logHexString(locked_ostream && log, const string & s) {
+ log << std::setfill('0') << std::setw(2) << std::hex;
+ for (size_t i=0; i<s.size(); i++)
+ log << (unsigned int)(unsigned char)s[i];
+}
+
//
// Clients call client_invoke to invoke a procedure on the replicated state
// machine: the primary receives the request, assigns it a sequence
// machine.
//
rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id_t procno, const string & req) {
- LOG << "invoke procno 0x" << hex << procno;
+ LOG << "invoke procno 0x" << std::hex << procno;
lock ml(invoke_mutex);
vector<string> m;
string myaddr;
partition1(rsm_mutex_lock);
}
}
- {
- auto && log = LOG << setfill('0') << setw(2) << hex;
- for (size_t i=0; i<req.size(); i++)
- log << (unsigned int)(unsigned char)req[i];
- }
+ logHexString(LOG, req);
execute(procno, req, r);
- {
- auto && log = LOG << setfill('0') << setw(2) << hex;
- for (size_t i=0; i<r.size(); i++)
- log << (unsigned int)(unsigned char)r[i];
- }
+ logHexString(LOG, r);
last_myvs = vs;
return rsm_client_protocol::OK;
}
// according to requests' seqno
rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp vs, const string & req) {
- LOG << "invoke procno 0x" << hex << proc;
+ LOG << "invoke procno 0x" << std::hex << proc;
lock ml(invoke_mutex);
vector<string> m;
string myaddr;
if (primary == myaddr)
return rsm_protocol::ERR;
cfg->get_view(vid_commit, m);
- if (find(m.begin(), m.end(), myaddr) == m.end())
+ if (std::find(m.begin(), m.end(), myaddr) == m.end())
return rsm_protocol::ERR;
// check sequence number
LOG << "Checking sequence number";
lock ml(rsm_mutex);
if (!insync || vid != vid_insync)
return rsm_protocol::BUSY;
- backups.erase(find(backups.begin(), backups.end(), m));
+ backups.erase(std::find(backups.begin(), backups.end(), m));
if (backups.empty())
sync_cond.notify_one();
return rsm_protocol::OK;
rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const string & req) {
lock ml(rsm_client_mutex);
while (1) {
- LOG << "proc " << hex << proc << " primary " << primary;
+ LOG << "proc " << std::hex << proc << " primary " << primary;
handle h(primary);
ml.unlock();
if (!cl)
goto prim_fail;
- LOG << "proc " << hex << proc << " primary " << primary << " ret " << dec << ret;
+ LOG << "proc " << std::hex << proc << " primary " << primary << " ret " << std::dec << ret;
if (ret == rsm_client_protocol::OK)
return rsm_protocol::OK;
if (ret == rsm_client_protocol::BUSY) {
continue;
}
prim_fail:
- LOG << "primary " << primary << " failed ret " << dec << ret;
+ LOG << "primary " << primary << " failed ret " << std::dec << ret;
primary_failure(ml);
LOG << "retry new primary " << primary;
}
u >> res;
if (!u.okdone()) {
LOG << "failed to unmarshall the reply.";
- LOG << "You probably forgot to set the reply string in " <<
- "rsm::client_invoke, or you may have called RPC " <<
- "0x" << hex << proc << " with the wrong return type";
+ LOG << "You probably forgot to set the reply string in "
+ << "rsm::client_invoke, or you may have called RPC "
+ << "0x" << std::hex << proc << " with the wrong return type";
LOG << "here's what I got: \"" << hexify(rep) << "\"";
VERIFY(0);
return rpc_protocol::unmarshall_reply_failure;
}
if(!unmarshall(res, false, r).okdone()) {
LOG << "failed to unmarshall the reply.";
- LOG << "You are probably calling RPC 0x" << hex << proc <<
- " with the wrong return type.";
+ LOG << "You are probably calling RPC 0x" << std::hex << proc
+ << " with the wrong return type.";
LOG << "here's what I got: \"" << hexify(res) << "\"";
VERIFY(0);
return rpc_protocol::unmarshall_reply_failure;
LEXICOGRAPHIC_COMPARISON(viewstamp)
};
-MARSHALLABLE_STRUCT(viewstamp)
-
namespace rsm_protocol {
enum status : rpc_protocol::status { OK, ERR, BUSY};
REMOTE_PROCEDURE(4, joinreq, (string &, string, viewstamp));
}
-MARSHALLABLE_STRUCT(rsm_protocol::transferres)
-
namespace rsm_test_protocol {
enum status : rpc_protocol::status {OK, ERR};
REMOTE_PROCEDURE_BASE(0x12000);
rsmtest_client *lc = new rsmtest_client(argv[1]);
string command(argv[2]);
if (command == "partition") {
- cout << "net_repair returned " << lc->net_repair(stoi(argv[3]));
+ LOG_NONMEMBER << "net_repair returned " << lc->net_repair(stoi(argv[3]));
} else if (command == "breakpoint") {
int b = stoi(argv[3]);
- cout << "breakpoint " << b << " returned " << lc->breakpoint(b);
+ LOG_NONMEMBER << "breakpoint " << b << " returned " << lc->breakpoint(b);
} else {
LOG_NONMEMBER << "Unknown command " << argv[2];
}
rsmtest_client::rsmtest_client(string dst) : cl(dst) {
if (cl.bind() < 0)
- cout << "rsmtest_client: call bind" << endl;
+ LOG << "rsmtest_client: call bind";
}
rsm_test_protocol::status rsmtest_client::net_repair(int heal) {
- rsm_test_protocol::status r;
+ rsm_test_protocol::status r = rsm_test_protocol::ERR;
auto ret = (rsm_test_protocol::status)cl.call(rsm_test_protocol::net_repair, r, heal);
VERIFY (ret == rsm_test_protocol::OK);
return r;
}
rsm_test_protocol::status rsmtest_client::breakpoint(int b) {
- rsm_test_protocol::status r;
+ rsm_test_protocol::status r = rsm_test_protocol::ERR;
auto ret = (rsm_test_protocol::status)cl.call(rsm_test_protocol::breakpoint, r, b);
VERIFY (ret == rsm_test_protocol::OK);
return r;
#include "threaded_log.h"
-mutex cerr_mutex;
-map<thread::id, int> thread_name_map;
-int next_thread_num = 0;
-map<const void *, int> instance_name_map;
-int next_instance_num = 0;
+static mutex log_mutex;
+static map<thread::id, int> thread_name_map;
+static int next_thread_num = 0;
+static map<const void *, int> instance_name_map;
+static int next_instance_num = 0;
int DEBUG_LEVEL = 0;
locked_ostream && _log_prefix(locked_ostream && f, const string & file, const string & func) {
if (tid==0)
tid = thread_name_map[thread] = ++next_thread_num;
auto utime = duration_cast<microseconds>(system_clock::now().time_since_epoch()).count() % 1000000000;
- f << setfill('0') << dec << left << setw(9) << utime << " ";
- f << setfill(' ') << log_thread_prefix << left << setw(2) << tid;
- f << " " << setw(20) << file << " " << setw(18) << func;
- return move(f);
+ f << std::setfill('0') << std::dec << std::left << std::setw(9) << utime << " ";
+ f << std::setfill(' ') << log_thread_prefix << std::left << std::setw(2) << tid;
+ f << " " << std::setw(20) << file << " " << std::setw(18) << func;
+ return std::move(f);
}
locked_ostream && _log_member(locked_ostream && f, const void *ptr) {
int id = instance_name_map[ptr];
if (id == 0)
id = instance_name_map[ptr] = ++next_instance_num;
- f << "#" << left << setw(2) << id << " ";
- return move(f);
+ f << "#" << std::left << std::setw(2) << id << " ";
+ return std::move(f);
+}
+
+lock _log_lock() {
+ return lock(log_mutex);
}
#include "types.h"
-extern mutex cerr_mutex;
-extern map<thread::id, int> thread_name_map;
-extern int next_thread_num;
-extern map<const void *, int> instance_name_map;
-extern int next_instance_num;
extern char log_thread_prefix;
+extern int DEBUG_LEVEL;
struct locked_ostream {
- ostream & s;
+ std::ostream & s;
lock l;
- ~locked_ostream() { s << endl; }
+ locked_ostream(locked_ostream &&) = default;
+ ~locked_ostream() { s << std::endl; }
template <typename U>
locked_ostream & operator<<(U && u) { s << u; return *this; }
-
- typedef std::ostream& (*ostream_manipulator)(ostream&);
- locked_ostream & operator<<(ostream_manipulator manip) { s << manip; return *this; }
};
locked_ostream && _log_prefix(locked_ostream && f, const string & file, const string & func);
locked_ostream && _log_member(locked_ostream && f, const void *ptr);
-#define _log_nonmember(f, ptr) f
-
-#define _LOG(_context_) _context_(_log_prefix(locked_ostream{cerr, lock(cerr_mutex)}, __FILE__, __func__), (const void *)this)
+lock _log_lock();
-#define LOG_NONMEMBER _LOG(_log_nonmember)
-#define LOG _LOG(_log_member)
-
-extern int DEBUG_LEVEL;
+#define LOG_NONMEMBER _log_prefix(locked_ostream{std::cerr, _log_lock()}, __FILE__, __func__)
+#define LOG _log_member(LOG_NONMEMBER, (const void *)this)
#define IF_LEVEL(level) if(DEBUG_LEVEL >= abs(level))
#include <sys/types.h>
#include <algorithm>
-using std::copy;
-using std::count_if;
-using std::find;
-using std::max;
-using std::min;
-using std::min_element;
-using std::move;
-using std::swap;
#include <condition_variable>
using cond = std::condition_variable;
using std::chrono::time_point_cast;
#include <exception>
-using std::exception;
#include <fstream>
using std::ifstream;
using std::ofstream;
#include <functional>
-// std::bind conflicts with BIND(2)
-using std::function;
-using std::placeholders::_1;
#include <iomanip>
#include <iostream>
-using std::cout;
-using std::cerr;
-using std::endl;
-using std::dec;
-using std::hex;
-using std::left;
-using std::setw;
-using std::setfill;
-using std::setprecision;
-using std::ostream;
-using std::istream;
-using std::ios;
#include <limits>
using std::numeric_limits;
using std::runtime_error;
#include <sstream>
-using std::ostringstream;
-using std::istringstream;
#include <string>
using std::string;
template<typename E> constexpr inline enum_type_t<E> from_enum(E e) noexcept { return (enum_type_t<E>)e; }
template<typename E> constexpr inline E to_enum(enum_type_t<E> value) noexcept { return (E)value; }
+
+template <class A, typename I=void> struct is_tuple_convertible : false_type {};
+
+template<class A> struct is_tuple_convertible<A,
+ decltype(declval<A &>()._tuple_(), void())
+> : true_type {};
+
// string manipulation
template <class A, class B>
-ostream & operator<<(ostream & o, const pair<A,B> & d) {
+std::ostream & operator<<(std::ostream & o, const pair<A,B> & d) {
return o << "<" << d.first << "," << d.second << ">";
}
auto i=v.cbegin(), end=v.cend();
if (i == end)
return string();
- ostringstream oss;
+ std::ostringstream oss;
oss << *i++;
while (i != end)
oss << delim << *i++;
}
template <class A>
-typename enable_if<is_const_iterable<A>::value && !is_same<A,string>::value, ostream>::type &
-operator<<(ostream & o, const A & a) {
+typename enable_if<is_const_iterable<A>::value && !is_same<A,string>::value, std::ostream>::type &
+operator<<(std::ostream & o, const A & a) {
return o << "[" << implode(a, ", ") << "]";
}
#include "verify.h"
#include "threaded_log.h"
-// struct tuple adapter, useful for marshalling
-// used like
+// struct tuple adapter, useful for marshalling and endian swapping. usage:
+//
// struct foo {
// int a, b;
// MEMBERS(a, b)
inline auto _tuple_() -> decltype(tie(__VA_ARGS__)) { return tie(__VA_ARGS__); } \
inline auto _tuple_() const -> decltype(tie(__VA_ARGS__)) { return tie(__VA_ARGS__); }
-// struct ordering and comparison
-// used like
-// struct foo {
-// int a, b;
-// MEMBERS(a, b)
-// };
+// struct ordering and comparison operations; requires the use of MEMBERS.
+// usage:
+//
// LEXICOGRAPHIC_COMPARISON(foo)
#define LEXICOGRAPHIC_OPERATOR(_c_, _op_) \
typedef typename make_indices_imp<S, tuple_indices<>, E>::type type;
};
+#define TUPLE_INDICES(_ArgPack_) typename make_tuple_indices<sizeof...(_ArgPack_)>::type()
+
// Template parameter pack expansion is not allowed in certain contexts, but
// brace initializers (for instance, calls to constructors of empty structs)
// are fair game.