From 26ade07ab0e62b98b452fbbd18edba0450035e35 Mon Sep 17 00:00:00 2001 From: Peter Iannucci Date: Sat, 23 Nov 2013 22:13:52 -0500 Subject: [PATCH] Simplifications and clean-ups --- Makefile.osx | 6 ++--- config.cc | 2 +- endian.h | 14 +++++------ handle.cc | 2 +- lock_client.cc | 3 ++- lock_client.h | 4 ++++ lock_server.h | 2 -- log.cc | 12 +++++----- paxos.cc | 9 ++++--- paxos_protocol.h | 4 ---- rpc/connection.cc | 6 ++--- rpc/fifo.h | 51 ++++++++++++++++++---------------------- rpc/file.h | 2 +- rpc/marshall.h | 65 +++++++++++++++++++++++++-------------------------- rpc/marshall_wrap.h | 19 +++++++-------- rpc/rpc.cc | 37 +++++++++++++++-------------- rpc/rpc.h | 4 ++-- rpc/rpc_protocol.h | 6 ++--- rpc/rpctest.cc | 9 ++++--- rpc/thr_pool.h | 2 +- rsm.cc | 32 ++++++++++++------------- rsm_client.cc | 6 ++--- rsm_client.h | 10 ++++---- rsm_protocol.h | 4 ---- rsm_tester.cc | 4 ++-- rsmtest_client.cc | 6 ++--- threaded_log.cc | 26 ++++++++++++--------- threaded_log.h | 24 ++++++------------- types.h | 56 ++++++++++++++------------------------------ 29 files changed, 195 insertions(+), 232 deletions(-) diff --git a/Makefile.osx b/Makefile.osx index 25ea1af..ef15988 100644 --- a/Makefile.osx +++ b/Makefile.osx @@ -9,9 +9,9 @@ LDFLAGS = -std=c++11 $(STDLIB) $(OPTFLAGS) ifeq "$(USE_CLANG)" "1" PEDANTRY += \ - -Weverything -pedantic-errors -Werror -Wno-c++98-compat \ - -Wno-c++98-compat-pedantic -Wno-padded -Wno-global-constructors \ - -Wno-exit-time-destructors -pedantic -Wall -Wextra -Weffc++ + -Weverything -pedantic-errors -Werror -Wno-c++98-compat-pedantic \ + -Wno-padded -Wno-global-constructors -Wno-exit-time-destructors \ + -pedantic -Wall -Wextra -Weffc++ STDLIB += -stdlib=libc++ CXX = clang++-mp-3.4 diff --git a/config.cc b/config.cc index 374757f..e1b0963 100644 --- a/config.cc +++ b/config.cc @@ -160,7 +160,7 @@ void config::heartbeater() { } // who has the smallest ID? - string m = min(me, *min_element(cmems.begin(), cmems.end())); + string m = std::min(me, *std::min_element(cmems.begin(), cmems.end())); if (m == me) { // ping the other nodes diff --git a/endian.h b/endian.h index 7385406..f34d371 100644 --- a/endian.h +++ b/endian.h @@ -27,15 +27,15 @@ tuple_hton_imp(tuple && t, tuple_indices) { template inline tuple::type...> hton(tuple && t) { - using Indices = typename make_tuple_indices::type; - return tuple_hton_imp(forward>(t), Indices()); + return tuple_hton_imp(forward>(t), TUPLE_INDICES(Args)); } -#define ENDIAN_SWAPPABLE(_c_) \ -inline _c_ hton(_c_ && t) { \ - _c_ result; \ - result._tuple_() = hton(t._tuple_()); \ - return result; \ +template inline typename +enable_if::value, T>::type +hton(T && t) { + T result; + result._tuple_() = hton(t._tuple_()); + return result; } #endif diff --git a/handle.cc b/handle.cc index fa7495c..17d04af 100644 --- a/handle.cc +++ b/handle.cc @@ -34,7 +34,7 @@ rpcc * handle::safebind() { h->valid = false; } else { LOG << "bind succeeded " << h->destination; - h->client = move(client); + h->client = std::move(client); } } return h->client.get(); diff --git a/lock_client.cc b/lock_client.cc index 81c102e..7a44940 100644 --- a/lock_client.cc +++ b/lock_client.cc @@ -85,7 +85,8 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { // check for reentrancy VERIFY(st.state != lock_state::locked || st.held_by != self); - VERIFY(find(st.wanted_by.begin(), st.wanted_by.end(), self) == st.wanted_by.end()); + VERIFY(std::find(st.wanted_by.begin(), st.wanted_by.end(), self) + == st.wanted_by.end()); st.wanted_by.push_back(self); diff --git a/lock_client.h b/lock_client.h index 654cf4f..31bf905 100644 --- a/lock_client.h +++ b/lock_client.h @@ -70,7 +70,9 @@ class lock_client { #endif // C++ +#ifdef __cplusplus extern "C" { +#endif struct _t4_lock_client; typedef struct _t4_lock_client t4_lock_client; @@ -93,6 +95,8 @@ t4_status t4_lock_client_acquire(t4_lock_client *, t4_lockid_t); t4_status t4_lock_client_release(t4_lock_client *, t4_lockid_t); t4_status t4_lock_client_stat(t4_lock_client *, t4_lockid_t); +#ifdef __cplusplus } +#endif #endif diff --git a/lock_server.h b/lock_server.h index 1f30f87..da8b788 100644 --- a/lock_server.h +++ b/lock_server.h @@ -22,8 +22,6 @@ public: MEMBERS(held, held_by, wanted_by) }; -MARSHALLABLE_STRUCT(lock_state) - typedef map lock_map; class lock_server : private rsm_state_transfer { diff --git a/log.cc b/log.cc index c9af175..2f1d679 100644 --- a/log.cc +++ b/log.cc @@ -49,10 +49,10 @@ void log::logread(void) { } string log::dump() { - ifstream from(name); + std::ifstream from(name); string res; string v; - while (getline(from, v)) + while (std::getline(from, v)) res += v + "\n"; from.close(); return res; @@ -60,14 +60,14 @@ string log::dump() { void log::restore(string s) { LOG << "restore: " << s; - ofstream f(name, ios::trunc); + std::ofstream f(name, std::ios::trunc); f << s; f.close(); } // XXX should be an atomic operation void log::loginstance(unsigned instance, string v) { - ofstream f(name, ios::app); + std::ofstream f(name, std::ios::app); f << "done " << instance << " " << v << "\n"; f.close(); } @@ -75,7 +75,7 @@ void log::loginstance(unsigned instance, string v) { // an acceptor should call logprop(promise) when it // receives a prepare to which it responds prepare_ok(). void log::logprop(prop_t promise) { - ofstream f(name, ios::app); + std::ofstream f(name, std::ios::app); f << "propseen " << promise.n << " " << promise.m << "\n"; f.close(); } @@ -83,7 +83,7 @@ void log::logprop(prop_t promise) { // an acceptor should call logaccept(accepted, accepted_value) when it // receives an accept RPC to which it replies accept_ok(). void log::logaccept(prop_t n, string v) { - ofstream f(name, ios::app); + std::ofstream f(name, std::ios::app); f << "accepted " << n.n << " " << n.m << " " << v << "\n"; f.close(); } diff --git a/paxos.cc b/paxos.cc index cb32e36..a055c36 100644 --- a/paxos.cc +++ b/paxos.cc @@ -1,15 +1,18 @@ #include "paxos.h" #include "handle.h" +using namespace std::placeholders; + paxos_change::~paxos_change() {} bool isamember(const node_t & m, const nodes_t & nodes) { - return find(nodes.begin(), nodes.end(), m) != nodes.end(); + return std::find(nodes.begin(), nodes.end(), m) != nodes.end(); } // check if l2 contains a majority of the elements of l1 bool majority(const nodes_t & l1, const nodes_t & l2) { - auto overlap = (size_t)count_if(l1.begin(), l1.end(), std::bind(isamember, _1, l2)); + auto overlap = (size_t)std::count_if( + l1.begin(), l1.end(), std::bind(isamember, _1, l2)); return overlap >= (l1.size() >> 1) + 1; } @@ -48,7 +51,7 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const } stable = false; bool r = false; - proposal.n = max(promise.n, proposal.n) + 1; + proposal.n = std::max(promise.n, proposal.n) + 1; nodes_t accepts; value_t v; if (prepare(instance, accepts, cur_nodes, v)) { diff --git a/paxos_protocol.h b/paxos_protocol.h index 30a3d2c..1ce3be7 100644 --- a/paxos_protocol.h +++ b/paxos_protocol.h @@ -12,8 +12,6 @@ struct prop_t { LEXICOGRAPHIC_COMPARISON(prop_t) }; -MARSHALLABLE_STRUCT(prop_t) - namespace paxos_protocol { enum status : rpc_protocol::status { OK, ERR }; struct prepareres { @@ -35,6 +33,4 @@ namespace paxos_protocol { REMOTE_PROCEDURE(4, heartbeat, (int &, string, unsigned)); } -MARSHALLABLE_STRUCT(paxos_protocol::prepareres) - #endif diff --git a/rpc/connection.cc b/rpc/connection.cc index c2635ef..d118539 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -9,7 +9,7 @@ connection_delegate::~connection_delegate() {} connection::connection(connection_delegate * delegate, socket_t && f1, int l1) -: fd(move(f1)), delegate_(delegate), lossy_(l1) +: fd(std::move(f1)), delegate_(delegate), lossy_(l1) { fd.flags() |= O_NONBLOCK; @@ -42,7 +42,7 @@ shared_ptr connection::to_dst(const sockaddr_in & dst, connection_de return nullptr; } IF_LEVEL(2) LOG_NONMEMBER << "connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port); - return make_shared(delegate, move(s), lossy); + return make_shared(delegate, std::move(s), lossy); } bool connection::send(const string & b) { @@ -173,7 +173,7 @@ bool connection::readpdu() { size_t sz = ntoh(sz1); if (sz > rpc_protocol::MAX_PDU) { - IF_LEVEL(2) LOG << "read pdu TOO BIG " << sz << " network order=" << hex << sz1; + IF_LEVEL(2) LOG << "read pdu TOO BIG " << sz << " network order=" << std::hex << sz1; return false; } diff --git a/rpc/fifo.h b/rpc/fifo.h index e3f0c38..f8a7224 100644 --- a/rpc/fifo.h +++ b/rpc/fifo.h @@ -8,8 +8,29 @@ template class fifo { public: fifo(size_t limit=0) : max_(limit) {} - bool enq(T, bool blocking=true); - void deq(T *); + + bool enq(T e, bool blocking=true) { + lock ml(m_); + while (max_ && q_.size() >= max_) { + if (!blocking) + return false; + has_space_c_.wait(ml); + } + q_.push_back(e); + non_empty_c_.notify_one(); + return true; + } + + void deq(T * e) { + lock ml(m_); + while(q_.empty()) + non_empty_c_.wait(ml); + *e = q_.front(); + q_.pop_front(); + if (max_ && q_.size() < max_) + has_space_c_.notify_one(); + } + bool size() { lock ml(m_); return q_.size(); @@ -23,30 +44,4 @@ class fifo { size_t max_; // maximum capacity of the queue, block enq threads if exceeds this limit }; -template bool -fifo::enq(T e, bool blocking) -{ - lock ml(m_); - while (max_ && q_.size() >= max_) { - if (!blocking) - return false; - has_space_c_.wait(ml); - } - q_.push_back(e); - non_empty_c_.notify_one(); - return true; -} - -template void -fifo::deq(T *e) -{ - lock ml(m_); - while(q_.empty()) - non_empty_c_.wait(ml); - *e = q_.front(); - q_.pop_front(); - if (max_ && q_.size() < max_) - has_space_c_.notify_one(); -} - #endif diff --git a/rpc/file.h b/rpc/file.h index 393b98a..9ea313f 100644 --- a/rpc/file.h +++ b/rpc/file.h @@ -22,7 +22,7 @@ class file_t { public: inline file_t(int fd=-1) : fd_(fd) {} inline file_t(const file_t &) = delete; - inline file_t(file_t && other) : fd_(-1) { swap(fd_, other.fd_); } + inline file_t(file_t && other) : fd_(-1) { std::swap(fd_, other.fd_); } inline ~file_t() { if (fd_ != -1) ::close(fd_); } static inline void pipe(file_t *ends) { int fds[2]; diff --git a/rpc/marshall.h b/rpc/marshall.h index 7b716e2..8592e4b 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -10,8 +10,8 @@ class marshall { private: - string buf_ = string(rpc_protocol::DEFAULT_RPC_SZ, 0); // Raw bytes buffer - size_t index_ = rpc_protocol::RPC_HEADER_SZ; // Read/write head position + string buf_ = string(rpc_protocol::DEFAULT_RPC_SZ, 0); + size_t index_ = rpc_protocol::RPC_HEADER_SZ; public: template @@ -19,10 +19,10 @@ class marshall { (void)pass{(*this << args)...}; } - void rawbytes(const void *p, size_t n) { + void write(const void *p, size_t n) { if (index_+n > buf_.size()) buf_.resize(index_+n); - copy((char *)p, (char *)p+n, &buf_[index_]); + std::copy((char *)p, (char *)p+n, &buf_[index_]); index_ += n; } @@ -35,7 +35,7 @@ class marshall { // delay looking up operator<<(marshall &, rpc_sz_t) until we define it // (i.e. we define an operator for marshalling uint32_t) template inline void - pack_header(const T & h) { + write_header(const T & h) { VERIFY(sizeof(T)+sizeof(S) <= rpc_protocol::RPC_HEADER_SZ); size_t saved_sz = index_; index_ = 0; @@ -47,32 +47,33 @@ class marshall { class unmarshall { private: string buf_; - size_t index_ = 0; + size_t index_ = rpc_protocol::RPC_HEADER_SZ; bool ok_ = false; public: template unmarshall(const string & s, bool has_header, Args && ... args) - : buf_(s),index_(rpc_protocol::RPC_HEADER_SZ) { + : buf_(s) { if (!has_header) buf_.insert(0, rpc_protocol::RPC_HEADER_SZ, 0); ok_ = (buf_.size() >= rpc_protocol::RPC_HEADER_SZ); (void)pass{(*this >> args)...}; } - bool ok() const { return ok_; } - bool okdone() const { return ok_ && index_ == buf_.size(); } + inline bool ok() const { return ok_; } + inline bool okdone() const { return ok_ && index_ == buf_.size(); } - void rawbytes(void * t, size_t n) { + void read(void * t, size_t n) { if (index_+n > buf_.size()) ok_ = false; - VERIFY(ok_); - copy(&buf_[index_], &buf_[index_+n], (char *)t); - index_ += n; + if (ok_) { + std::copy(&buf_[index_], &buf_[index_+n], (char *)t); + index_ += n; + } } template inline void - unpack_header(T & h) { + read_header(T & h) { VERIFY(sizeof(T)+sizeof(rpc_protocol::rpc_sz_t) <= rpc_protocol::RPC_HEADER_SZ); // first 4 bytes hold length field index_ = sizeof(rpc_protocol::rpc_sz_t); @@ -88,8 +89,8 @@ class unmarshall { // #define MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _d_) \ -inline marshall & operator<<(marshall & m, _c_ x) { _d_ y = hton((_d_)x); m.rawbytes(&y, sizeof(_d_)); return m; } \ -inline unmarshall & operator>>(unmarshall & u, _c_ & x) { _d_ y; u.rawbytes(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; } +inline marshall & operator<<(marshall & m, _c_ x) { _d_ y = hton((_d_)x); m.write(&y, sizeof(_d_)); return m; } \ +inline unmarshall & operator>>(unmarshall & u, _c_ & x) { _d_ y; u.read(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; } #define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_) @@ -125,10 +126,9 @@ tuple_marshall_imp(marshall & m, tuple & t, tuple_indices) return m; } -template marshall & +template inline marshall & operator<<(marshall & m, tuple && t) { - using Indices = typename make_tuple_indices::type; - return tuple_marshall_imp(m, t, Indices()); + return tuple_marshall_imp(m, t, TUPLE_INDICES(Args)); } template inline unmarshall & @@ -137,10 +137,9 @@ tuple_unmarshall_imp(unmarshall & u, tuple t, tuple_indices unmarshall & +template inline unmarshall & operator>>(unmarshall & u, tuple && t) { - using Indices = typename make_tuple_indices::type; - return tuple_unmarshall_imp(u, t, Indices()); + return tuple_unmarshall_imp(u, t, TUPLE_INDICES(Args)); } // @@ -148,13 +147,13 @@ operator>>(unmarshall & u, tuple && t) { // // Implements struct marshalling via tuple marshalling of members. -#define MARSHALLABLE_STRUCT(_c_) \ -inline unmarshall & operator>>(unmarshall & u, _c_ & a) { return u >> a._tuple_(); } \ -inline marshall & operator<<(marshall & m, const _c_ a) { return m << a._tuple_(); } +template inline typename +enable_if::value, unmarshall>::type & +operator>>(unmarshall & u, T & a) { return u >> a._tuple_(); } -// our first two marshallable structs... -MARSHALLABLE_STRUCT(rpc_protocol::request_header) -MARSHALLABLE_STRUCT(rpc_protocol::reply_header) +template inline typename +enable_if::value, marshall>::type & +operator<<(marshall & m, const T a) { return m << a._tuple_(); } // // Marshalling for STL containers @@ -164,7 +163,7 @@ MARSHALLABLE_STRUCT(rpc_protocol::reply_header) template inline typename enable_if::value, marshall>::type & operator<<(marshall & m, const A & x) { - m << (unsigned int)x.size(); + m << (uint32_t)x.size(); for (const auto & a : x) m << a; return m; @@ -174,7 +173,7 @@ operator<<(marshall & m, const A & x) { template inline typename enable_if::value, unmarshall>::type & operator>>(unmarshall & u, A & x) { - unsigned n = u._grab(); + uint32_t n = u._grab(); x.clear(); while (n--) x.emplace_back(u._grab()); @@ -195,7 +194,7 @@ operator>>(unmarshall & u, pair & d) { // std::map template inline unmarshall & operator>>(unmarshall & u, map & x) { - unsigned n = u._grab(); + uint32_t n = u._grab(); x.clear(); while (n--) x.emplace(u._grab>()); @@ -205,7 +204,7 @@ operator>>(unmarshall & u, map & x) { // std::string inline marshall & operator<<(marshall & m, const string & s) { m << (uint32_t)s.size(); - m.rawbytes(s.data(), s.size()); + m.write(s.data(), s.size()); return m; } @@ -213,7 +212,7 @@ inline unmarshall & operator>>(unmarshall & u, string & s) { uint32_t sz = u._grab(); if (u.ok()) { s.resize(sz); - u.rawbytes(&s[0], sz); + u.read(&s[0], sz); } return u; } diff --git a/rpc/marshall_wrap.h b/rpc/marshall_wrap.h index 4ba7a20..b895696 100644 --- a/rpc/marshall_wrap.h +++ b/rpc/marshall_wrap.h @@ -3,7 +3,7 @@ #include "marshall.h" -typedef function handler; +typedef std::function handler; // // Automatic marshalling wrappers for RPC handlers @@ -45,7 +45,7 @@ struct VerifyOnFailure { template typename enable_if::value, RV>::type inline invoke(RV, F f, void *, R & r, args_type & t, tuple_indices) { - return f(r, move(get(t))...); + return f(r, get(t)...); } // And one for pointers to member functions... @@ -53,7 +53,7 @@ invoke(RV, F f, void *, R & r, args_type & t, tuple_indices) { template typename enable_if::value, RV>::type inline invoke(RV, F f, C *c, R & r, args_type & t, tuple_indices) { - return (c->*f)(r, move(get(t))...); + return (c->*f)(r, get(t)...); } // The class marshalled_func_imp uses partial template specialization to @@ -73,9 +73,6 @@ template struct marshalled_func_imp { static inline handler *wrap(F f, C *c=nullptr) { - // This type definition corresponds to an empty struct with - // template parameters running from 0 up to (# args) - 1. - using Indices = typename make_tuple_indices::type; // This type definition represents storage for f's unmarshalled // arguments. decay is (most notably) stripping off const // qualifiers. @@ -92,10 +89,10 @@ struct marshalled_func_imp { // Allocate space for the RPC response -- will be passed into the // function as an lvalue reference. R r; - // Perform the invocation. Note that Indices() calls the default - // constructor of the empty struct with the special template - // parameters. - RV b = invoke(RV(), f, c, r, t, Indices()); + // Perform the invocation. Note that TUPLE_INDICES calls the + // default constructor of an empty struct with template parameters + // running from 0 up to (# args) - 1. + RV b = invoke(RV(), f, c, r, t, TUPLE_INDICES(Args)); // Marshall the response. m << r; // Make like a tree. @@ -121,7 +118,7 @@ struct marshalled_func : public marshalled_func_imp {}; template -struct marshalled_func> : +struct marshalled_func> : public marshalled_func_imp {}; #endif diff --git a/rpc/rpc.cc b/rpc/rpc.cc index a451c9f..00f6d2e 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -89,7 +89,7 @@ rpcc::~rpcc() { } int rpcc::bind(milliseconds to) { - nonce_t r; + nonce_t r = 0; rpc_protocol::status ret = call_timeout(rpc_protocol::bind, to, r); if (ret == 0) { lock ml(m_); @@ -143,7 +143,7 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { ca.xid = xid_++; calls_[ca.xid] = &ca; - req.pack_header(rpc_protocol::request_header{ + req.write_header(rpc_protocol::request_header{ ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front() }); xid_rep = xid_rep_window_.front(); @@ -173,13 +173,13 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { ch->send(req); } else IF_LEVEL(1) LOG << "not reachable"; - IF_LEVEL(2) LOG << clt_nonce_ << " just sent req proc " << hex << proc - << " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_; + IF_LEVEL(2) LOG << clt_nonce_ << " just sent req proc " << std::hex << proc + << " xid " << std::dec << ca.xid << " clt_nonce " << clt_nonce_; } transmit = false; // only send once on a given channel } - auto nextdeadline = min(steady_clock::now() + curr_to, finaldeadline); + auto nextdeadline = std::min(steady_clock::now() + curr_to, finaldeadline); curr_to *= 2; { @@ -231,8 +231,8 @@ int rpcc::call1(proc_id_t proc, milliseconds to, string & rep, marshall & req) { lock cal(ca.m); - IF_LEVEL(2) LOG << clt_nonce_ << " call done for req proc " << hex << proc - << " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" + IF_LEVEL(2) LOG << clt_nonce_ << " call done for req proc " << std::hex << proc + << " xid " << std::dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" << ntoh(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret; // destruction of req automatically frees its buffer @@ -258,7 +258,7 @@ rpcc::got_pdu(const shared_ptr &, const string & b) { unmarshall rep(b, true); rpc_protocol::reply_header h; - rep.unpack_header(h); + rep.read_header(h); if (!rep.ok()) { IF_LEVEL(1) LOG << "unmarshall header failed!!!"; @@ -342,7 +342,7 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { unmarshall req(buf, true); rpc_protocol::request_header h; - req.unpack_header(h); + req.read_header(h); proc_id_t proc = h.proc; if (!req.ok()) { @@ -350,8 +350,8 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { return; } - IF_LEVEL(2) LOG << "rpc " << h.xid << " (proc " << hex << proc << ", last_rep " - << dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce; + IF_LEVEL(2) LOG << "rpc " << h.xid << " (proc " << std::hex << proc << ", last_rep " + << std::dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce; marshall rep; rpc_protocol::reply_header rh{h.xid,0}; @@ -359,9 +359,9 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { // is client sending to an old instance of server? if (h.srv_nonce != 0 && h.srv_nonce != nonce_) { IF_LEVEL(2) LOG << "rpc for an old server instance " << h.srv_nonce - << " (current " << nonce_ << ") proc " << hex << h.proc; + << " (current " << nonce_ << ") proc " << std::hex << h.proc; rh.ret = rpc_protocol::oldsrv_failure; - rep.pack_header(rh); + rep.write_header(rh); c->send(rep); return; } @@ -371,7 +371,7 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { { lock pl(procs_m_); if (procs_.count(proc) < 1) { - LOG << "unknown proc 0x" << hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_; + LOG << "unknown proc 0x" << std::hex << proc << " with h.srv_nonce=" << h.srv_nonce << ", my srv_nonce=" << nonce_; VERIFY(0); return; } @@ -407,17 +407,18 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { rh.ret = (*f)(forward(req), rep); if (rh.ret == rpc_protocol::unmarshall_args_failure) { LOG << "failed to unmarshall the arguments. You are " - << "probably calling RPC 0x" << hex << proc << " with the wrong " + << "probably calling RPC 0x" << std::hex << proc << " with the wrong " << "types of arguments."; VERIFY(0); } VERIFY(rh.ret >= 0); - rep.pack_header(rh); + rep.write_header(rh); b1 = rep; IF_LEVEL(2) LOG << "sending and saving reply of size " << b1.size() << " for rpc " - << h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce; + << h.xid << ", proc " << std::hex << proc << " ret " << std::dec + << rh.ret << ", clt " << h.clt_nonce; add_reply(h.clt_nonce, h.xid, b1); @@ -438,7 +439,7 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { case FORGOTTEN: // very old request and we don't have the response anymore IF_LEVEL(2) LOG << "very old request " << h.xid << " from " << h.clt_nonce; rh.ret = rpc_protocol::atmostonce_failure; - rep.pack_header(rh); + rep.write_header(rh); c->send(rep); break; } diff --git a/rpc/rpc.h b/rpc/rpc.h index 53a1746..b44c057 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -95,8 +95,8 @@ class rpcc : private connection_delegate { if (intret < 0) return intret; unmarshall u(rep, true, r); if (u.okdone() != true) { - LOG << "rpcc::call_m: failed to unmarshall the reply. You are probably " << - "calling RPC 0x" << hex << proc << " with the wrong return type."; + LOG << "rpcc::call_m: failed to unmarshall the reply. You are probably " + << "calling RPC 0x" << std::hex << proc << " with the wrong return type."; VERIFY(0); return rpc_protocol::unmarshall_reply_failure; } diff --git a/rpc/rpc_protocol.h b/rpc/rpc_protocol.h index b2fc346..80089ff 100644 --- a/rpc/rpc_protocol.h +++ b/rpc/rpc_protocol.h @@ -44,7 +44,8 @@ namespace rpc_protocol { proc_id_t id; }; - const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t); + union header_t { request_header req; reply_header rep; }; + const size_t RPC_HEADER_SZ = sizeof(header_t) + sizeof(rpc_sz_t); const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation const size_t MAX_PDU = 10<<20; // maximum PDF is 10M @@ -55,7 +56,4 @@ namespace rpc_protocol { REMOTE_PROCEDURE(1, bind, (nonce_t &)); // handler number reserved for bind } -ENDIAN_SWAPPABLE(rpc_protocol::request_header) -ENDIAN_SWAPPABLE(rpc_protocol::reply_header) - #endif diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index f2e8c7d..bf840d2 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -17,6 +17,9 @@ static rpcc *clients[NUM_CL]; // client rpc object static string dst; //server's ip address static in_port_t port; +using std::cout; +using std::endl; + // server-side handlers. they must be methods of some class // to simplify rpcs::reg(). a server process can have handlers // from multiple classes. @@ -79,7 +82,7 @@ static void startserver() { static void testmarshall() { marshall m; rpc_protocol::request_header rh{1,2,3,4,5}; - m.pack_header(rh); + m.write_header(rh); VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ); int i = 12345; unsigned long long l = 1223344455L; @@ -97,7 +100,7 @@ static void testmarshall() { unmarshall un(b, true); rpc_protocol::request_header rh1; - un.unpack_header(rh1); + un.read_header(rh1); VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0); int i1; unsigned long long l1; @@ -132,7 +135,7 @@ static void client1(size_t cl) { for(int i = 0; i < 100; i++){ int which = (random() % 2); int arg = (random() % 1000); - int rep; + int rep = -1; auto start = steady_clock::now(); diff --git a/rpc/thr_pool.h b/rpc/thr_pool.h index df11f20..a864784 100644 --- a/rpc/thr_pool.h +++ b/rpc/thr_pool.h @@ -4,7 +4,7 @@ #include "types.h" #include "fifo.h" -typedef function job_t; +typedef std::function job_t; class thread_pool { public: diff --git a/rsm.cc b/rsm.cc index 672243c..5812b33 100644 --- a/rsm.cc +++ b/rsm.cc @@ -174,7 +174,7 @@ bool rsm::sync_with_backups(lock & rsm_mutex_lock) { // Start accepting synchronization request (statetransferreq) now! insync = true; cfg->get_view(vid_insync, backups); - backups.erase(find(backups.begin(), backups.end(), cfg->myaddr())); + backups.erase(std::find(backups.begin(), backups.end(), cfg->myaddr())); LOG << "backups " << backups; sync_cond.wait(rsm_mutex_lock); insync = false; @@ -214,7 +214,7 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock) rsm_mutex_lock.lock(); } if (cl == 0 || ret != rsm_protocol::OK) { - LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret; + LOG << "couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret; return false; } if (stf && last_myvs != r.last) { @@ -258,7 +258,7 @@ bool rsm::join(const string & m, lock & rsm_mutex_lock) { } if (cl == 0 || ret != rsm_protocol::OK) { - LOG << "couldn't reach " << m << " " << hex << cl << " " << dec << ret; + LOG << "couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret; return false; } LOG << "succeeded " << log; @@ -301,6 +301,12 @@ void rsm::execute(rpc_protocol::proc_id_t procno, const string & req, string & r r = marshall(ret, rep.content()).content(); } +static void logHexString(locked_ostream && log, const string & s) { + log << std::setfill('0') << std::setw(2) << std::hex; + for (size_t i=0; i m; string myaddr; @@ -349,17 +355,9 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id partition1(rsm_mutex_lock); } } - { - auto && log = LOG << setfill('0') << setw(2) << hex; - for (size_t i=0; i m; string myaddr; @@ -388,7 +386,7 @@ rsm_protocol::status rsm::invoke(int &, rpc_protocol::proc_id_t proc, viewstamp if (primary == myaddr) return rsm_protocol::ERR; cfg->get_view(vid_commit, m); - if (find(m.begin(), m.end(), myaddr) == m.end()) + if (std::find(m.begin(), m.end(), myaddr) == m.end()) return rsm_protocol::ERR; // check sequence number LOG << "Checking sequence number"; @@ -427,7 +425,7 @@ rsm_protocol::status rsm::transferdonereq(int &, const string & m, unsigned vid) lock ml(rsm_mutex); if (!insync || vid != vid_insync) return rsm_protocol::BUSY; - backups.erase(find(backups.begin(), backups.end(), m)); + backups.erase(std::find(backups.begin(), backups.end(), m)); if (backups.empty()) sync_cond.notify_one(); return rsm_protocol::OK; diff --git a/rsm_client.cc b/rsm_client.cc index d047f8b..161ddb5 100644 --- a/rsm_client.cc +++ b/rsm_client.cc @@ -18,7 +18,7 @@ void rsm_client::primary_failure(lock &) { rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const string & req) { lock ml(rsm_client_mutex); while (1) { - LOG << "proc " << hex << proc << " primary " << primary; + LOG << "proc " << std::hex << proc << " primary " << primary; handle h(primary); ml.unlock(); @@ -31,7 +31,7 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const s if (!cl) goto prim_fail; - LOG << "proc " << hex << proc << " primary " << primary << " ret " << dec << ret; + LOG << "proc " << std::hex << proc << " primary " << primary << " ret " << std::dec << ret; if (ret == rsm_client_protocol::OK) return rsm_protocol::OK; if (ret == rsm_client_protocol::BUSY) { @@ -45,7 +45,7 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string & rep, const s continue; } prim_fail: - LOG << "primary " << primary << " failed ret " << dec << ret; + LOG << "primary " << primary << " failed ret " << std::dec << ret; primary_failure(ml); LOG << "retry new primary " << primary; } diff --git a/rsm_client.h b/rsm_client.h index 2bdf440..303d038 100644 --- a/rsm_client.h +++ b/rsm_client.h @@ -51,17 +51,17 @@ int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) { u >> res; if (!u.okdone()) { LOG << "failed to unmarshall the reply."; - LOG << "You probably forgot to set the reply string in " << - "rsm::client_invoke, or you may have called RPC " << - "0x" << hex << proc << " with the wrong return type"; + LOG << "You probably forgot to set the reply string in " + << "rsm::client_invoke, or you may have called RPC " + << "0x" << std::hex << proc << " with the wrong return type"; LOG << "here's what I got: \"" << hexify(rep) << "\""; VERIFY(0); return rpc_protocol::unmarshall_reply_failure; } if(!unmarshall(res, false, r).okdone()) { LOG << "failed to unmarshall the reply."; - LOG << "You are probably calling RPC 0x" << hex << proc << - " with the wrong return type."; + LOG << "You are probably calling RPC 0x" << std::hex << proc + << " with the wrong return type."; LOG << "here's what I got: \"" << hexify(res) << "\""; VERIFY(0); return rpc_protocol::unmarshall_reply_failure; diff --git a/rsm_protocol.h b/rsm_protocol.h index 9e53430..28773c1 100644 --- a/rsm_protocol.h +++ b/rsm_protocol.h @@ -20,8 +20,6 @@ struct viewstamp { LEXICOGRAPHIC_COMPARISON(viewstamp) }; -MARSHALLABLE_STRUCT(viewstamp) - namespace rsm_protocol { enum status : rpc_protocol::status { OK, ERR, BUSY}; @@ -39,8 +37,6 @@ namespace rsm_protocol { REMOTE_PROCEDURE(4, joinreq, (string &, string, viewstamp)); } -MARSHALLABLE_STRUCT(rsm_protocol::transferres) - namespace rsm_test_protocol { enum status : rpc_protocol::status {OK, ERR}; REMOTE_PROCEDURE_BASE(0x12000); diff --git a/rsm_tester.cc b/rsm_tester.cc index 09b2bf9..e821c99 100644 --- a/rsm_tester.cc +++ b/rsm_tester.cc @@ -17,10 +17,10 @@ int main(int argc, char *argv[]) { rsmtest_client *lc = new rsmtest_client(argv[1]); string command(argv[2]); if (command == "partition") { - cout << "net_repair returned " << lc->net_repair(stoi(argv[3])); + LOG_NONMEMBER << "net_repair returned " << lc->net_repair(stoi(argv[3])); } else if (command == "breakpoint") { int b = stoi(argv[3]); - cout << "breakpoint " << b << " returned " << lc->breakpoint(b); + LOG_NONMEMBER << "breakpoint " << b << " returned " << lc->breakpoint(b); } else { LOG_NONMEMBER << "Unknown command " << argv[2]; } diff --git a/rsmtest_client.cc b/rsmtest_client.cc index f4238db..249f1a3 100644 --- a/rsmtest_client.cc +++ b/rsmtest_client.cc @@ -5,18 +5,18 @@ rsmtest_client::rsmtest_client(string dst) : cl(dst) { if (cl.bind() < 0) - cout << "rsmtest_client: call bind" << endl; + LOG << "rsmtest_client: call bind"; } rsm_test_protocol::status rsmtest_client::net_repair(int heal) { - rsm_test_protocol::status r; + rsm_test_protocol::status r = rsm_test_protocol::ERR; auto ret = (rsm_test_protocol::status)cl.call(rsm_test_protocol::net_repair, r, heal); VERIFY (ret == rsm_test_protocol::OK); return r; } rsm_test_protocol::status rsmtest_client::breakpoint(int b) { - rsm_test_protocol::status r; + rsm_test_protocol::status r = rsm_test_protocol::ERR; auto ret = (rsm_test_protocol::status)cl.call(rsm_test_protocol::breakpoint, r, b); VERIFY (ret == rsm_test_protocol::OK); return r; diff --git a/threaded_log.cc b/threaded_log.cc index b450f52..96728f6 100644 --- a/threaded_log.cc +++ b/threaded_log.cc @@ -1,10 +1,10 @@ #include "threaded_log.h" -mutex cerr_mutex; -map thread_name_map; -int next_thread_num = 0; -map instance_name_map; -int next_instance_num = 0; +static mutex log_mutex; +static map thread_name_map; +static int next_thread_num = 0; +static map instance_name_map; +static int next_instance_num = 0; int DEBUG_LEVEL = 0; locked_ostream && _log_prefix(locked_ostream && f, const string & file, const string & func) { @@ -13,16 +13,20 @@ locked_ostream && _log_prefix(locked_ostream && f, const string & file, const st if (tid==0) tid = thread_name_map[thread] = ++next_thread_num; auto utime = duration_cast(system_clock::now().time_since_epoch()).count() % 1000000000; - f << setfill('0') << dec << left << setw(9) << utime << " "; - f << setfill(' ') << log_thread_prefix << left << setw(2) << tid; - f << " " << setw(20) << file << " " << setw(18) << func; - return move(f); + f << std::setfill('0') << std::dec << std::left << std::setw(9) << utime << " "; + f << std::setfill(' ') << log_thread_prefix << std::left << std::setw(2) << tid; + f << " " << std::setw(20) << file << " " << std::setw(18) << func; + return std::move(f); } locked_ostream && _log_member(locked_ostream && f, const void *ptr) { int id = instance_name_map[ptr]; if (id == 0) id = instance_name_map[ptr] = ++next_instance_num; - f << "#" << left << setw(2) << id << " "; - return move(f); + f << "#" << std::left << std::setw(2) << id << " "; + return std::move(f); +} + +lock _log_lock() { + return lock(log_mutex); } diff --git a/threaded_log.h b/threaded_log.h index c02531e..7b1dd75 100644 --- a/threaded_log.h +++ b/threaded_log.h @@ -3,34 +3,24 @@ #include "types.h" -extern mutex cerr_mutex; -extern map thread_name_map; -extern int next_thread_num; -extern map instance_name_map; -extern int next_instance_num; extern char log_thread_prefix; +extern int DEBUG_LEVEL; struct locked_ostream { - ostream & s; + std::ostream & s; lock l; - ~locked_ostream() { s << endl; } + locked_ostream(locked_ostream &&) = default; + ~locked_ostream() { s << std::endl; } template locked_ostream & operator<<(U && u) { s << u; return *this; } - - typedef std::ostream& (*ostream_manipulator)(ostream&); - locked_ostream & operator<<(ostream_manipulator manip) { s << manip; return *this; } }; locked_ostream && _log_prefix(locked_ostream && f, const string & file, const string & func); locked_ostream && _log_member(locked_ostream && f, const void *ptr); -#define _log_nonmember(f, ptr) f - -#define _LOG(_context_) _context_(_log_prefix(locked_ostream{cerr, lock(cerr_mutex)}, __FILE__, __func__), (const void *)this) +lock _log_lock(); -#define LOG_NONMEMBER _LOG(_log_nonmember) -#define LOG _LOG(_log_member) - -extern int DEBUG_LEVEL; +#define LOG_NONMEMBER _log_prefix(locked_ostream{std::cerr, _log_lock()}, __FILE__, __func__) +#define LOG _log_member(LOG_NONMEMBER, (const void *)this) #define IF_LEVEL(level) if(DEBUG_LEVEL >= abs(level)) diff --git a/types.h b/types.h index 0241bd3..7de35e9 100644 --- a/types.h +++ b/types.h @@ -4,14 +4,6 @@ #include #include -using std::copy; -using std::count_if; -using std::find; -using std::max; -using std::min; -using std::min_element; -using std::move; -using std::swap; #include using cond = std::condition_variable; @@ -29,31 +21,15 @@ using std::chrono::time_point; using std::chrono::time_point_cast; #include -using std::exception; #include using std::ifstream; using std::ofstream; #include -// std::bind conflicts with BIND(2) -using std::function; -using std::placeholders::_1; #include #include -using std::cout; -using std::cerr; -using std::endl; -using std::dec; -using std::hex; -using std::left; -using std::setw; -using std::setfill; -using std::setprecision; -using std::ostream; -using std::istream; -using std::ios; #include using std::numeric_limits; @@ -79,8 +55,6 @@ using lock = std::unique_lock; using std::runtime_error; #include -using std::ostringstream; -using std::istringstream; #include using std::string; @@ -139,10 +113,17 @@ using enum_type_t = typename enable_if::value, typename underlying_ty template constexpr inline enum_type_t from_enum(E e) noexcept { return (enum_type_t)e; } template constexpr inline E to_enum(enum_type_t value) noexcept { return (E)value; } + +template struct is_tuple_convertible : false_type {}; + +template struct is_tuple_convertible()._tuple_(), void()) +> : true_type {}; + // string manipulation template -ostream & operator<<(ostream & o, const pair & d) { +std::ostream & operator<<(std::ostream & o, const pair & d) { return o << "<" << d.first << "," << d.second << ">"; } @@ -152,7 +133,7 @@ implode(const C & v, string delim=" ") { auto i=v.cbegin(), end=v.cend(); if (i == end) return string(); - ostringstream oss; + std::ostringstream oss; oss << *i++; while (i != end) oss << delim << *i++; @@ -171,16 +152,16 @@ inline vector explode(const string & s, string delim=" ") { } template -typename enable_if::value && !is_same::value, ostream>::type & -operator<<(ostream & o, const A & a) { +typename enable_if::value && !is_same::value, std::ostream>::type & +operator<<(std::ostream & o, const A & a) { return o << "[" << implode(a, ", ") << "]"; } #include "verify.h" #include "threaded_log.h" -// struct tuple adapter, useful for marshalling -// used like +// struct tuple adapter, useful for marshalling and endian swapping. usage: +// // struct foo { // int a, b; // MEMBERS(a, b) @@ -190,12 +171,9 @@ operator<<(ostream & o, const A & a) { inline auto _tuple_() -> decltype(tie(__VA_ARGS__)) { return tie(__VA_ARGS__); } \ inline auto _tuple_() const -> decltype(tie(__VA_ARGS__)) { return tie(__VA_ARGS__); } -// struct ordering and comparison -// used like -// struct foo { -// int a, b; -// MEMBERS(a, b) -// }; +// struct ordering and comparison operations; requires the use of MEMBERS. +// usage: +// // LEXICOGRAPHIC_COMPARISON(foo) #define LEXICOGRAPHIC_OPERATOR(_c_, _op_) \ @@ -223,6 +201,8 @@ template struct make_tuple_indices { typedef typename make_indices_imp, E>::type type; }; +#define TUPLE_INDICES(_ArgPack_) typename make_tuple_indices::type() + // Template parameter pack expansion is not allowed in certain contexts, but // brace initializers (for instance, calls to constructors of empty structs) // are fair game. -- 1.7.9.5