From 5d99dbf06a14904944f5593c63705934bdfdcfb7 Mon Sep 17 00:00:00 2001 From: Peter Iannucci Date: Sun, 29 Sep 2013 21:02:07 -0400 Subject: [PATCH] More dependency check-ups --- Makefile | 12 +-- endian.h | 41 ++++++++ handle.h | 2 +- lang/verify.h | 4 +- lock_client.cc | 6 +- lock_server.cc | 14 ++- lock_tester.cc | 14 +-- log.cc | 8 +- paxos.cc | 2 +- paxos.h | 2 +- rpc/connection.cc | 112 +++++++++++----------- rpc/connection.h | 15 ++- rpc/marshall.cc | 122 ++++-------------------- rpc/marshall.h | 275 ++++++++++++++++++----------------------------------- rpc/pollmgr.cc | 14 +-- rpc/rpc.cc | 188 ++++++++++++++++-------------------- rpc/rpc.h | 50 +++------- rpc/rpctest.cc | 22 ++--- rpc/thr_pool.cc | 14 +-- rpc/thr_pool.h | 4 +- rsm.cc | 62 ++++++------ rsm_client.h | 20 ++-- rsmtest_client.cc | 2 +- rsmtest_client.h | 2 +- threaded_log.h | 7 +- types.h | 25 +++++ 26 files changed, 430 insertions(+), 609 deletions(-) create mode 100644 endian.h diff --git a/Makefile b/Makefile index e2a20d9..010818d 100644 --- a/Makefile +++ b/Makefile @@ -13,17 +13,13 @@ rpc/librpc.a: rpc/rpc.o rpc/marshall.o rpc/connection.o rpc/pollmgr.o rpc/thr_po rpc/rpctest: rpc/rpctest.o threaded_log.o rpc/librpc.a -lock_demo=lock_demo.o lock_client.o threaded_log.o rsm_client.o handle.o -lock_demo : $(lock_demo) rpc/librpc.a +lock_demo : lock_demo.o lock_client.o threaded_log.o rsm_client.o handle.o rpc/librpc.a -lock_tester=lock_tester.o lock_client.o threaded_log.o rsm_client.o handle.o -lock_tester : $(lock_tester) rpc/librpc.a +lock_tester : lock_tester.o lock_client.o threaded_log.o rsm_client.o handle.o rpc/librpc.a -lock_server=lock_smain.o threaded_log.o handle.o rsm.o paxos.o config.o log.o lock_server.o -lock_server : $(lock_server) rpc/librpc.a +lock_server : lock_smain.o threaded_log.o handle.o rsm.o paxos.o config.o log.o lock_server.o rpc/librpc.a -rsm_tester=rsm_tester.o rsmtest_client.o threaded_log.o -rsm_tester: $(rsm_tester) rpc/librpc.a +rsm_tester: rsm_tester.o rsmtest_client.o threaded_log.o rpc/librpc.a %.o: %.cc $(CXX) $(CXXFLAGS) -c $< -o $@ diff --git a/endian.h b/endian.h new file mode 100644 index 0000000..7385406 --- /dev/null +++ b/endian.h @@ -0,0 +1,41 @@ +#ifndef endian_h +#define endian_h + +#include + +constexpr union { uint32_t i; uint8_t is_little_endian; } endianness{1}; + +inline uint8_t hton(uint8_t t) { return t; } +inline int8_t hton(int8_t t) { return t; } +inline uint16_t hton(uint16_t t) { return htons(t); } +inline int16_t hton(int16_t t) { return (int16_t)htons((uint16_t)t); } +inline uint32_t hton(uint32_t t) { return htonl(t); } +inline int32_t hton(int32_t t) { return (int32_t)htonl((uint32_t)t); } +inline uint64_t hton(uint64_t t) { + if (!endianness.is_little_endian) + return t; + return (uint64_t)htonl((uint32_t)(t >> 32)) | ((uint64_t)htonl((uint32_t)t) << 32); +} +inline int64_t hton(int64_t t) { return (int64_t)hton((uint64_t)t); } + +template inline T ntoh(T t) { return hton(t); } + +template inline tuple::type...> +tuple_hton_imp(tuple && t, tuple_indices) { + return tuple::type...>(hton(get(t))...); +} + +template inline tuple::type...> +hton(tuple && t) { + using Indices = typename make_tuple_indices::type; + return tuple_hton_imp(forward>(t), Indices()); +} + +#define ENDIAN_SWAPPABLE(_c_) \ +inline _c_ hton(_c_ && t) { \ + _c_ result; \ + result._tuple_() = hton(t._tuple_()); \ + return result; \ +} + +#endif diff --git a/handle.h b/handle.h index a513b56..d4b6223 100644 --- a/handle.h +++ b/handle.h @@ -1,5 +1,5 @@ // manage a cache of RPC connections. -// assuming cid is a std::string holding the +// assuming cid is a string holding the // host:port of the RPC server you want // to talk to: // diff --git a/lang/verify.h b/lang/verify.h index 823a48d..622aaf2 100644 --- a/lang/verify.h +++ b/lang/verify.h @@ -1,5 +1,3 @@ -// safe assertions. - #ifndef verify_client_h #define verify_client_h @@ -7,7 +5,7 @@ #include #ifdef NDEBUG -#define VERIFY(expr) do { if (!(expr)) abort(); } while (0) +#define VERIFY(expr) { if (!(expr)) abort(); } #else #define VERIFY(expr) assert(expr) #endif diff --git a/lock_client.cc b/lock_client.cc index 99dcb5b..de357f1 100644 --- a/lock_client.cc +++ b/lock_client.cc @@ -4,7 +4,7 @@ #include void lock_state::wait(lock & mutex_lock) { - auto self = std::this_thread::get_id(); + auto self = this_thread::get_id(); c[self].wait(mutex_lock); c.erase(self); } @@ -80,7 +80,7 @@ int lock_client::stat(lock_protocol::lockid_t lid) { lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { lock_state &st = get_lock_state(lid); lock sl(st.m); - auto self = std::this_thread::get_id(); + auto self = this_thread::get_id(); // check for reentrancy VERIFY(st.state != lock_state::locked || st.held_by != self); @@ -145,7 +145,7 @@ lock_protocol::status lock_client::acquire(lock_protocol::lockid_t lid) { lock_protocol::status lock_client::release(lock_protocol::lockid_t lid) { lock_state &st = get_lock_state(lid); lock sl(st.m); - auto self = std::this_thread::get_id(); + auto self = this_thread::get_id(); VERIFY(st.state == lock_state::locked && st.held_by == self); st.state = lock_state::free; LOG("Lock " << lid << ": free"); diff --git a/lock_server.cc b/lock_server.cc index d5e85a5..b724140 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -30,8 +30,8 @@ lock_state & lock_server::get_lock_state(lock_protocol::lockid_t lid) { } lock_server::lock_server(class rsm *_rsm) : rsm (_rsm) { - std::thread(&lock_server::revoker, this).detach(); - std::thread(&lock_server::retryer, this).detach(); + thread(&lock_server::revoker, this).detach(); + thread(&lock_server::retryer, this).detach(); rsm->set_state_transfer(this); } @@ -167,16 +167,14 @@ int lock_server::release(int &, lock_protocol::lockid_t lid, callback_t id, lock string lock_server::marshal_state() { lock sl(lock_table_lock); marshall rep; - rep << nacquire; - rep << lock_table; - return rep.str(); + rep << nacquire << lock_table; + return rep.content(); } void lock_server::unmarshal_state(string state) { lock sl(lock_table_lock); - unmarshall rep(state); - rep >> nacquire; - rep >> lock_table; + unmarshall rep(state, false); + rep >> nacquire >> lock_table; } lock_protocol::status lock_server::stat(int &r, lock_protocol::lockid_t lid) { diff --git a/lock_tester.cc b/lock_tester.cc index c192128..f8e2196 100644 --- a/lock_tester.cc +++ b/lock_tester.cc @@ -11,7 +11,7 @@ char log_thread_prefix = 'c'; // must be >= 2 const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc. -std::string dst; +string dst; lock_client **lc = new lock_client * [nt]; lock_protocol::lockid_t a = "1"; lock_protocol::lockid_t b = "2"; @@ -21,7 +21,7 @@ lock_protocol::lockid_t c = "3"; // doesn't grant the same lock to both clients. // it assumes that lock names are distinct in the first byte. int ct[256]; -std::mutex count_mutex; +mutex count_mutex; void check_grant(lock_protocol::lockid_t lid) { lock ml(count_mutex); @@ -116,7 +116,7 @@ void test5(int i) { int main(int argc, char *argv[]) { - std::thread th[nt]; + thread th[nt]; int test = 0; setvbuf(stdout, NULL, _IONBF, 0); @@ -148,7 +148,7 @@ main(int argc, char *argv[]) if (!test || test == 2) { // test2 for (int i = 0; i < nt; i++) - th[i] = std::thread(test2, i); + th[i] = thread(test2, i); for (int i = 0; i < nt; i++) th[i].join(); } @@ -157,7 +157,7 @@ main(int argc, char *argv[]) LOG_NONMEMBER("test 3"); for (int i = 0; i < nt; i++) - th[i] = std::thread(test3, i); + th[i] = thread(test3, i); for (int i = 0; i < nt; i++) th[i].join(); } @@ -166,7 +166,7 @@ main(int argc, char *argv[]) LOG_NONMEMBER("test 4"); for (int i = 0; i < 2; i++) - th[i] = std::thread(test4, i); + th[i] = thread(test4, i); for (int i = 0; i < 2; i++) th[i].join(); } @@ -175,7 +175,7 @@ main(int argc, char *argv[]) LOG_NONMEMBER("test 5"); for (int i = 0; i < nt; i++) - th[i] = std::thread(test5, i); + th[i] = thread(test5, i); for (int i = 0; i < nt; i++) th[i].join(); } diff --git a/log.cc b/log.cc index de00d67..3b881fa 100644 --- a/log.cc +++ b/log.cc @@ -60,14 +60,14 @@ string log::dump() { void log::restore(string s) { LOG("restore: " << s); - ofstream f(name, std::ios::trunc); + ofstream f(name, ios::trunc); f << s; f.close(); } // XXX should be an atomic operation void log::loginstance(unsigned instance, string v) { - ofstream f(name, std::ios::app); + ofstream f(name, ios::app); f << "done " << instance << " " << v << "\n"; f.close(); } @@ -75,7 +75,7 @@ void log::loginstance(unsigned instance, string v) { // an acceptor should call logprop(promise) when it // receives a prepare to which it responds prepare_ok(). void log::logprop(prop_t promise) { - ofstream f(name, std::ios::app); + ofstream f(name, ios::app); f << "propseen " << promise.n << " " << promise.m << "\n"; f.close(); } @@ -83,7 +83,7 @@ void log::logprop(prop_t promise) { // an acceptor should call logaccept(accepted, accepted_value) when it // receives an accept RPC to which it replies accept_ok(). void log::logaccept(prop_t n, string v) { - ofstream f(name, std::ios::app); + ofstream f(name, ios::app); f << "accepted " << n.n << " " << n.m << " " << v << "\n"; f.close(); } diff --git a/paxos.cc b/paxos.cc index f9d5785..b39fa5b 100644 --- a/paxos.cc +++ b/paxos.cc @@ -46,7 +46,7 @@ bool proposer_acceptor::run(unsigned instance, const nodes_t & cur_nodes, const } stable = false; bool r = false; - proposal.n = std::max(promise.n, proposal.n) + 1; + proposal.n = max(promise.n, proposal.n) + 1; nodes_t accepts; value_t v = newv; if (prepare(instance, accepts, cur_nodes, v)) { diff --git a/paxos.h b/paxos.h index 426dfef..186daab 100644 --- a/paxos.h +++ b/paxos.h @@ -29,7 +29,7 @@ class proposer_acceptor { paxos_change *delegate; node_t me; - rpcs pxs = {(uint32_t)std::stoi(me)}; + rpcs pxs = {(uint32_t)stoi(me)}; bool break1 = false; bool break2 = false; diff --git a/rpc/connection.cc b/rpc/connection.cc index 86d4ec5..55e374a 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -1,11 +1,11 @@ // std::bind and syscall bind have the same name, so don't use std::bind in this file #define LIBT4_NO_FUNCTIONAL #include "connection.h" +#include +#include #include #include #include -#include -#include #include #include @@ -15,8 +15,7 @@ connection::connection(chanmgr *m1, int f1, int l1) : mgr_(m1), fd_(f1), lossy_(l1) { int flags = fcntl(fd_, F_GETFL, NULL); - flags |= O_NONBLOCK; - fcntl(fd_, F_SETFL, flags); + fcntl(fd_, F_SETFL, flags | O_NONBLOCK); signal(SIGPIPE, SIG_IGN); @@ -27,9 +26,7 @@ connection::connection(chanmgr *m1, int f1, int l1) connection::~connection() { VERIFY(dead_); - if (rpdu_.buf) - free(rpdu_.buf); - VERIFY(!wpdu_.buf); + VERIFY(!wpdu_.buf.size()); close(fd_); } @@ -46,12 +43,10 @@ bool connection::isdead() { void connection::closeconn() { { lock ml(m_); - if (!dead_) { - dead_ = true; - shutdown(fd_,SHUT_RDWR); - } else { + if (dead_) return; - } + dead_ = true; + shutdown(fd_,SHUT_RDWR); } //after block_remove_fd, select will never wait on fd_ //and no callbacks will be active @@ -81,18 +76,18 @@ int connection::compare(connection *another) { return 0; } -bool connection::send(char *b, size_t sz) { +bool connection::send(const string & b) { lock ml(m_); + waiters_++; - while (!dead_ && wpdu_.buf) { + while (!dead_ && wpdu_.buf.size()) send_wait_.wait(ml); - } waiters_--; - if (dead_) { + + if (dead_) return false; - } + wpdu_.buf = b; - wpdu_.sz = sz; wpdu_.solong = 0; if (lossy_) { @@ -107,19 +102,15 @@ bool connection::send(char *b, size_t sz) { ml.unlock(); PollMgr::Instance()->block_remove_fd(fd_); ml.lock(); - } else { - if (wpdu_.solong == wpdu_.sz) { - } else { - //should be rare to need to explicitly add write callback - PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); - while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.sz) { - send_complete_.wait(ml); - } - } + } else if (wpdu_.solong != wpdu_.buf.size()) { + // should be rare to need to explicitly add write callback + PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); + while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size()) + send_complete_.wait(ml); } - bool ret = (!dead_ && wpdu_.solong == wpdu_.sz); - wpdu_.solong = wpdu_.sz = 0; - wpdu_.buf = NULL; + bool ret = (!dead_ && wpdu_.solong == wpdu_.buf.size()); + wpdu_.solong = 0; + wpdu_.buf.clear(); if (waiters_ > 0) send_wait_.notify_all(); return ret; @@ -130,7 +121,7 @@ void connection::write_cb(int s) { lock ml(m_); VERIFY(!dead_); VERIFY(fd_ == s); - if (wpdu_.sz == 0) { + if (wpdu_.buf.size() == 0) { PollMgr::Instance()->del_callback(fd_,CB_WRONLY); return; } @@ -139,7 +130,7 @@ void connection::write_cb(int s) { dead_ = true; } else { VERIFY(wpdu_.solong != size_t_max); - if (wpdu_.solong < wpdu_.sz) { + if (wpdu_.solong < wpdu_.buf.size()) { return; } } @@ -154,41 +145,44 @@ void connection::read_cb(int s) { return; } + IF_LEVEL(5) LOG("got data on fd " << s); + bool succ = true; - if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) { + if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) { succ = readpdu(); } if (!succ) { + IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying"); PollMgr::Instance()->del_callback(fd_,CB_RDWR); dead_ = true; send_complete_.notify_one(); } - if (rpdu_.buf && rpdu_.sz == rpdu_.solong) { - if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) { + if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) { + if (mgr_->got_pdu(this, rpdu_.buf)) { //chanmgr has successfully consumed the pdu - rpdu_.buf = NULL; - rpdu_.sz = rpdu_.solong = 0; + rpdu_.buf.clear(); + rpdu_.solong = 0; } } } bool connection::writepdu() { VERIFY(wpdu_.solong != size_t_max); - if (wpdu_.solong == wpdu_.sz) + if (wpdu_.solong == wpdu_.buf.size()) return true; if (wpdu_.solong == 0) { - uint32_t sz = htonl((uint32_t)wpdu_.sz); - bcopy(&sz,wpdu_.buf,sizeof(sz)); + uint32_t sz = htonl((uint32_t)wpdu_.buf.size() - sizeof(uint32_t)); + copy((const char *)&sz, (const char *)(&sz+1), &wpdu_.buf[0]); } - ssize_t n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong)); + ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong)); if (n < 0) { if (errno != EAGAIN) { IF_LEVEL(1) LOG("connection::writepdu fd_ " << fd_ << " failure errno=" << errno); wpdu_.solong = size_t_max; - wpdu_.sz = 0; + wpdu_.buf.clear(); } return (errno == EAGAIN); } @@ -197,7 +191,8 @@ bool connection::writepdu() { } bool connection::readpdu() { - if (!rpdu_.sz) { + IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size()); + if (!rpdu_.buf.size()) { uint32_t sz1; ssize_t n = read(fd_, &sz1, sizeof(sz1)); @@ -211,33 +206,34 @@ bool connection::readpdu() { } if (n > 0 && n != sizeof(sz1)) { - IF_LEVEL(0) LOG("connection::readpdu short read of sz"); + IF_LEVEL(0) LOG("short read of sz"); return false; } size_t sz = ntohl(sz1); if (sz > MAX_PDU) { - IF_LEVEL(2) LOG("connection::readpdu read pdu TOO BIG " << sz << " network order=" << hex << sz1); + IF_LEVEL(2) LOG("read pdu TOO BIG " << sz << " network order=" << hex << sz1); return false; } - rpdu_.sz = sz; - VERIFY(rpdu_.buf == NULL); - rpdu_.buf = (char *)malloc(sz+sizeof(sz1)); - VERIFY(rpdu_.buf); - bcopy(&sz1,rpdu_.buf,sizeof(sz1)); + IF_LEVEL(5) LOG("read size of datagram = " << sz); + + VERIFY(rpdu_.buf.size() == 0); + rpdu_.buf = string(sz+sizeof(sz1), 0); + copy((const char *)&sz1, (const char *)(&sz1 + 1), &rpdu_.buf[0]); rpdu_.solong = sizeof(sz1); } - ssize_t n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong); + ssize_t n = read(fd_, &rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong); + + IF_LEVEL(5) LOG("read " << n << " bytes"); + if (n <= 0) { if (errno == EAGAIN) return true; - if (rpdu_.buf) - free(rpdu_.buf); - rpdu_.buf = NULL; - rpdu_.sz = rpdu_.solong = 0; + rpdu_.buf.clear(); + rpdu_.solong = 0; return (errno == EAGAIN); } rpdu_.solong += (size_t)n; @@ -254,7 +250,7 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) tcp_ = socket(AF_INET, SOCK_STREAM, 0); if (tcp_ < 0) { - perror("tcpsconn::tcpsconn accept_loop socket:"); + perror("accept_loop socket:"); VERIFY(0); } @@ -268,7 +264,7 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) } if (listen(tcp_, 1000) < 0) { - perror("tcpsconn::tcpsconn listen:"); + perror("listen:"); VERIFY(0); } @@ -276,7 +272,7 @@ tcpsconn::tcpsconn(chanmgr *m1, unsigned int port, int lossytest) VERIFY(getsockname(tcp_, (sockaddr *)&sin, &addrlen) == 0); port_ = ntohs(sin.sin_port); - IF_LEVEL(2) LOG("tcpsconn::tcpsconn listen on " << port_ << " " << sin.sin_port); + IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port); if (pipe(pipe_) < 0) { perror("accept_loop pipe:"); diff --git a/rpc/connection.h b/rpc/connection.h index 2a01e46..882c1e0 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -5,7 +5,6 @@ #include #include #include -#include #include "pollmgr.h" constexpr size_t size_t_max = numeric_limits::max(); @@ -16,18 +15,15 @@ class connection; class chanmgr { public: - virtual bool got_pdu(connection *c, char *b, size_t sz) = 0; + virtual bool got_pdu(connection *c, const string & b) = 0; virtual ~chanmgr() {} }; class connection : public aio_callback { public: struct charbuf { - charbuf(): buf(NULL), sz(0), solong(0) {} - charbuf (char *b, size_t s) : buf(b), sz(s), solong(0){} - char *buf; - size_t sz; - size_t solong; // number of bytes written or read so far + string buf; + size_t solong = 0; // number of bytes written or read so far }; connection(chanmgr *m1, int f1, int lossytest=0); @@ -37,7 +33,7 @@ class connection : public aio_callback { bool isdead(); void closeconn(); - bool send(char *b, size_t sz); + bool send(const string & b); void write_cb(int s); void read_cb(int s); @@ -46,6 +42,7 @@ class connection : public aio_callback { int ref() { lock rl(ref_m_); return refno_; } int compare(connection *another); + private: bool readpdu(); @@ -62,7 +59,7 @@ class connection : public aio_callback { int waiters_ = 0; int refno_ = 1; - const int lossy_; + int lossy_ = 0; mutex m_; mutex ref_m_; diff --git a/rpc/marshall.cc b/rpc/marshall.cc index 5c2b10c..b8371cf 100644 --- a/rpc/marshall.cc +++ b/rpc/marshall.cc @@ -1,114 +1,28 @@ #include "types.h" #include "marshall.h" -marshall & -operator<<(marshall &m, uint8_t x) { - m.rawbyte(x); - return m; -} - -marshall & -operator<<(marshall &m, uint16_t x) { - x = hton(x); - m.rawbytes((char *)&x, 2); - return m; -} - -marshall & -operator<<(marshall &m, uint32_t x) { - x = hton(x); - m.rawbytes((char *)&x, 4); - return m; -} - -marshall & operator<<(marshall &m, int32_t x) { return m << (uint32_t) x; } -marshall & operator<<(marshall &m, int8_t x) { return m << (uint8_t)x; } -marshall & operator<<(marshall &m, bool x) { return m << (uint8_t)x; } -marshall & operator<<(marshall &m, int16_t x) { return m << (uint16_t)x; } -marshall & operator<<(marshall &m, uint64_t x) { return m << (uint32_t)(x>>32) << (uint32_t)x; } - -marshall & -operator<<(marshall &m, const string &s) { - m << (unsigned int) s.size(); +MARSHALL_RAW_NETWORK_ORDER_AS(bool, uint8_t) +MARSHALL_RAW_NETWORK_ORDER(uint8_t) +MARSHALL_RAW_NETWORK_ORDER(int8_t) +MARSHALL_RAW_NETWORK_ORDER(uint16_t) +MARSHALL_RAW_NETWORK_ORDER(int16_t) +MARSHALL_RAW_NETWORK_ORDER(uint32_t) +MARSHALL_RAW_NETWORK_ORDER(int32_t) +MARSHALL_RAW_NETWORK_ORDER_AS(size_t, uint32_t) +MARSHALL_RAW_NETWORK_ORDER(uint64_t) +MARSHALL_RAW_NETWORK_ORDER(int64_t) + +marshall & operator<<(marshall &m, const string &s) { + m << (uint32_t)s.size(); m.rawbytes(s.data(), s.size()); return m; } -void marshall::pack_req_header(const request_header &h) { - size_t saved_sz = index_; - //leave the first 4-byte empty for channel to fill size of pdu - index_ = sizeof(rpc_sz_t); - *this << h.xid << h.proc << h.clt_nonce << h.srv_nonce << h.xid_rep; - index_ = saved_sz; -} - -void marshall::pack_reply_header(const reply_header &h) { - size_t saved_sz = index_; - //leave the first 4-byte empty for channel to fill size of pdu - index_ = sizeof(rpc_sz_t); - *this << h.xid << h.ret; - index_ = saved_sz; -} - -// take the contents from another unmarshall object -void -unmarshall::take_in(unmarshall &another) -{ - if(buf_) - free(buf_); - another.take_buf(&buf_, &sz_); - index_ = RPC_HEADER_SZ; - ok_ = sz_ >= RPC_HEADER_SZ?true:false; -} - -inline bool -unmarshall::ensure(size_t n) { - if (index_+n > sz_) - ok_ = false; - return ok_; -} - -inline uint8_t -unmarshall::rawbyte() -{ - if (!ensure(1)) - return 0; - return (uint8_t)buf_[index_++]; -} - -void -unmarshall::rawbytes(string &ss, size_t n) -{ - VERIFY(ensure(n)); - ss.assign(buf_+index_, n); - index_ += n; -} - -template -void -unmarshall::rawbytes(T &t) -{ - const size_t n = sizeof(T); - VERIFY(ensure(n)); - memcpy(&t, buf_+index_, n); - t = ntoh(t); - index_ += n; -} - -unmarshall & operator>>(unmarshall &u, bool &x) { x = (bool)u.rawbyte(); return u; } -unmarshall & operator>>(unmarshall &u, uint8_t &x) { x = u.rawbyte(); return u; } -unmarshall & operator>>(unmarshall &u, int8_t &x) { x = (int8_t)u.rawbyte(); return u; } -unmarshall & operator>>(unmarshall &u, uint16_t &x) { u.rawbytes(x); return u; } -unmarshall & operator>>(unmarshall &u, int16_t &x) { u.rawbytes(x); return u; } -unmarshall & operator>>(unmarshall &u, uint32_t &x) { u.rawbytes(x); return u; } -unmarshall & operator>>(unmarshall &u, int32_t &x) { u.rawbytes(x); return u; } -unmarshall & operator>>(unmarshall &u, size_t &x) { uint32_t xx; u.rawbytes(xx); x = xx; return u; } -unmarshall & operator>>(unmarshall &u, uint64_t &x) { u.rawbytes(x); return u; } -unmarshall & operator>>(unmarshall &u, int64_t &x) { u.rawbytes(x); return u; } - unmarshall & operator>>(unmarshall &u, string &s) { - unsigned sz = u.grab(); - if(u.ok()) - u.rawbytes(s, sz); + uint32_t sz = u.grab(); + if (u.ok()) { + s.resize(sz); + u.rawbytes(&s[0], sz); + } return u; } diff --git a/rpc/marshall.h b/rpc/marshall.h index 98856e4..d7f1dff 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -2,137 +2,111 @@ #define marshall_h #include "types.h" -#include -#include -#include + +// for structs or classes containing a MEMBERS declaration +class marshall; +class unmarshall; +#define FORWARD_MARSHALLABLE(_c_) \ +extern unmarshall & operator>>(unmarshall &u, typename remove_reference<_c_>::type &a); \ +extern marshall & operator<<(marshall &m, const _c_ a); +#define MARSHALLABLE(_c_) \ +inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_(); } \ +inline marshall & operator<<(marshall &m, const _c_ a) { return m << a._tuple_(); } + +// for plain old data +#define MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _d_) \ +marshall & operator<<(marshall &m, _c_ x) { _d_ y = hton((_d_)x); m.rawbytes(&y, sizeof(_d_)); return m; } \ +unmarshall & operator>>(unmarshall &u, _c_ &x) { _d_ y; u.rawbytes(&y, sizeof(_d_)); x = (_c_)ntoh(y); return u; } + +#define MARSHALL_RAW_NETWORK_ORDER(_c_) MARSHALL_RAW_NETWORK_ORDER_AS(_c_, _c_) using proc_t = uint32_t; using status_t = int32_t; struct request_header { - request_header(int x=0, proc_t p=0, unsigned c=0, unsigned s=0, int xi=0) : - xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {} int xid; proc_t proc; unsigned int clt_nonce; unsigned int srv_nonce; int xid_rep; + + MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep) }; +FORWARD_MARSHALLABLE(request_header) +ENDIAN_SWAPPABLE(request_header) + struct reply_header { - reply_header(int x=0, int r=0): xid(x), ret(r) {} int xid; int ret; -}; - -template inline T hton(T t); - -constexpr union { uint32_t i; uint8_t is_little_endian; } endianness{1}; -template<> inline uint8_t hton(uint8_t t) { return t; } -template<> inline int8_t hton(int8_t t) { return t; } -template<> inline uint16_t hton(uint16_t t) { return htons(t); } -template<> inline int16_t hton(int16_t t) { return (int16_t)htons((uint16_t)t); } -template<> inline uint32_t hton(uint32_t t) { return htonl(t); } -template<> inline int32_t hton(int32_t t) { return (int32_t)htonl((uint32_t)t); } -template<> inline uint64_t hton(uint64_t t) { - if (!endianness.is_little_endian) - return t; - return (uint64_t)htonl((uint32_t)(t >> 32)) | ((uint64_t)htonl((uint32_t)t) << 32); -} -template<> inline int64_t hton(int64_t t) { return (int64_t)hton((uint64_t)t); } -template<> inline request_header hton(request_header h) { return {hton(h.xid), hton(h.proc), hton(h.clt_nonce), hton(h.srv_nonce), hton(h.xid_rep)}; } -template<> inline reply_header hton(reply_header h) { return {hton(h.xid), hton(h.ret)}; } + MEMBERS(xid, ret) +}; -template inline T ntoh(T t) { return hton(t); } +FORWARD_MARSHALLABLE(reply_header) +ENDIAN_SWAPPABLE(reply_header) typedef int rpc_sz_t; -//size of initial buffer allocation -#define DEFAULT_RPC_SZ 1024 -#define RPC_HEADER_SZ (max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t)) +const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t); +const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation +// Template parameter pack expansion is not allowed in certain contexts, but +// brace initializers (for instance, calls to constructors of empty structs) +// are fair game. struct pass { template inline pass(Args&&...) {} }; class marshall { private: - char *buf_; // Base of the raw bytes buffer (dynamically readjusted) - size_t capacity_; // Capacity of the buffer - size_t index_; // Read/write head position + string buf_ = string(DEFAULT_RPC_SZ, 0); // Raw bytes buffer + size_t index_ = RPC_HEADER_SZ; // Read/write head position inline void reserve(size_t n) { - if((index_+n) > capacity_){ - capacity_ += max(capacity_, n); - VERIFY (buf_ != NULL); - buf_ = (char *)realloc(buf_, capacity_); - VERIFY(buf_); - } + if (index_+n > buf_.size()) + buf_.resize(index_+n); } public: template marshall(const Args&... args) { - buf_ = (char *) malloc(sizeof(char)*DEFAULT_RPC_SZ); - VERIFY(buf_); - capacity_ = DEFAULT_RPC_SZ; - index_ = RPC_HEADER_SZ; (void)pass{(*this << args)...}; } - ~marshall() { - if (buf_) - free(buf_); - } - - size_t size() { return index_;} - char *cstr() { return buf_;} - const char *cstr() const { return buf_;} - - void rawbyte(uint8_t x) { - reserve(1); - buf_[index_++] = (int8_t)x; - } - - void rawbytes(const char *p, size_t n) { + void rawbytes(const void *p, size_t n) { reserve(n); - memcpy(buf_+index_, p, n); + copy((char *)p, (char *)p+n, &buf_[index_]); index_ += n; } - // Return the current content (excluding header) as a string - string get_content() { - return string(buf_+RPC_HEADER_SZ,index_-RPC_HEADER_SZ); - } - - // Return the current content (excluding header) as a string - string str() { - return get_content(); - } - - void pack_req_header(const request_header &h); - void pack_reply_header(const reply_header &h); - - void take_buf(char **b, size_t *s) { - *b = buf_; - *s = index_; - buf_ = NULL; - index_ = 0; - return; + // with header + operator string () const { return buf_.substr(0,index_); } + // without header + string content() { return buf_.substr(RPC_HEADER_SZ,index_-RPC_HEADER_SZ); } + + template + void pack_header(const T &h) { + VERIFY(sizeof(T)+sizeof(rpc_sz_t) <= RPC_HEADER_SZ); + size_t saved_sz = index_; + index_ = sizeof(rpc_sz_t); // first 4 bytes hold length field + *this << h; + index_ = saved_sz; } }; -marshall& operator<<(marshall &, bool); -marshall& operator<<(marshall &, uint32_t); -marshall& operator<<(marshall &, int32_t); -marshall& operator<<(marshall &, uint8_t); -marshall& operator<<(marshall &, int8_t); -marshall& operator<<(marshall &, uint16_t); -marshall& operator<<(marshall &, int16_t); -marshall& operator<<(marshall &, uint64_t); -marshall& operator<<(marshall &, const string &); +FORWARD_MARSHALLABLE(bool); +FORWARD_MARSHALLABLE(uint8_t); +FORWARD_MARSHALLABLE(int8_t); +FORWARD_MARSHALLABLE(uint16_t); +FORWARD_MARSHALLABLE(int16_t); +FORWARD_MARSHALLABLE(uint32_t); +FORWARD_MARSHALLABLE(int32_t); +FORWARD_MARSHALLABLE(size_t); +FORWARD_MARSHALLABLE(uint64_t); +FORWARD_MARSHALLABLE(int64_t); +FORWARD_MARSHALLABLE(string &); template typename enable_if::value, marshall>::type & operator<<(marshall &m, const A &x) { - m << (unsigned int) x.size(); + m << (unsigned int)x.size(); for (const auto &a : x) m << a; return m; @@ -153,92 +127,48 @@ operator<<(marshall &m, E e) { return m << from_enum(e); } -class unmarshall; - -unmarshall& operator>>(unmarshall &, bool &); -unmarshall& operator>>(unmarshall &, uint8_t &); -unmarshall& operator>>(unmarshall &, int8_t &); -unmarshall& operator>>(unmarshall &, uint16_t &); -unmarshall& operator>>(unmarshall &, int16_t &); -unmarshall& operator>>(unmarshall &, uint32_t &); -unmarshall& operator>>(unmarshall &, int32_t &); -unmarshall& operator>>(unmarshall &, size_t &); -unmarshall& operator>>(unmarshall &, uint64_t &); -unmarshall& operator>>(unmarshall &, int64_t &); -unmarshall& operator>>(unmarshall &, string &); template typename enable_if::value, unmarshall>::type & operator>>(unmarshall &u, E &e); class unmarshall { private: - char *buf_; - size_t sz_; - size_t index_; - bool ok_; - - inline bool ensure(size_t n); - public: - unmarshall(): buf_(NULL),sz_(0),index_(0),ok_(false) {} - unmarshall(char *b, size_t sz): buf_(b),sz_(sz),index_(),ok_(true) {} - unmarshall(const string &s) : buf_(NULL),sz_(0),index_(0),ok_(false) - { - //take the content which does not exclude a RPC header from a string - take_content(s); - } - ~unmarshall() { - if (buf_) free(buf_); + string buf_; + size_t index_ = 0; + bool ok_ = false; + + inline bool ensure(size_t n) { + if (index_+n > buf_.size()) + ok_ = false; + return ok_; } - - //take contents from another unmarshall object - void take_in(unmarshall &another); - - //take the content which does not exclude a RPC header from a string - void take_content(const string &s) { - sz_ = s.size()+RPC_HEADER_SZ; - buf_ = (char *)realloc(buf_,sz_); - VERIFY(buf_); - index_ = RPC_HEADER_SZ; - memcpy(buf_+index_, s.data(), s.size()); - ok_ = true; + public: + unmarshall() {} + unmarshall(const string &s, bool has_header) + : buf_(s),index_(RPC_HEADER_SZ) { + if (!has_header) + buf_.insert(0, RPC_HEADER_SZ, 0); + ok_ = (buf_.size() >= RPC_HEADER_SZ); } bool ok() const { return ok_; } - char *cstr() { return buf_;} - bool okdone() const { return ok_ && index_ == sz_; } - - uint8_t rawbyte(); - void rawbytes(string &s, size_t n); - template void rawbytes(T &t); - - size_t ind() { return index_;} - size_t size() { return sz_;} - void take_buf(char **b, size_t *sz) { - *b = buf_; - *sz = sz_; - sz_ = index_ = 0; - buf_ = NULL; - } + bool okdone() const { return ok_ && index_ == buf_.size(); } - void unpack_req_header(request_header *h) { - //the first 4-byte is for channel to fill size of pdu - index_ = sizeof(rpc_sz_t); - *this >> h->xid >> h->proc >> h->clt_nonce >> h->srv_nonce >> h->xid_rep; - index_ = RPC_HEADER_SZ; + void rawbytes(void * t, size_t n) { + VERIFY(ensure(n)); + copy(&buf_[index_], &buf_[index_+n], (char *)t); + index_ += n; } - void unpack_reply_header(reply_header *h) { - //the first 4-byte is for channel to fill size of pdu + template + void unpack_header(T & h) { + // first 4 bytes hold length field + VERIFY(sizeof(T)+sizeof(rpc_sz_t) <= RPC_HEADER_SZ); index_ = sizeof(rpc_sz_t); - *this >> h->xid >> h->ret; + *this >> h; index_ = RPC_HEADER_SZ; } - template - inline A grab() { - A a; - *this >> a; - return a; - } + template inline T grab() { T t; *this >> t; return t; } }; template typename enable_if::value, unmarshall>::type & @@ -279,7 +209,7 @@ typedef function handler; // PAI 2013/09/19 // C++11 does neither of these two things for us: // 1) Declare variables using a parameter pack expansion, like so -// Args ...args; +// Args... args; // 2) Call a function with a tuple of the arguments it expects // // We implement an 'invoke' function for functions of the RPC handler @@ -293,21 +223,6 @@ typedef function handler; // 'invoke' as a parameter which will be ignored, but its type will force the // compiler to specialize 'invoke' appropriately. -// The following implementation of tuple_indices is redistributed under the MIT -// License as an insubstantial portion of the LLVM compiler infrastructure. - -template struct tuple_indices {}; -template struct make_indices_imp; -template struct make_indices_imp, E> { - typedef typename make_indices_imp, E>::type type; -}; -template struct make_indices_imp, E> { - typedef tuple_indices type; -}; -template struct make_tuple_indices { - typedef typename make_indices_imp, E>::type type; -}; - // This class encapsulates the default response to runtime unmarshalling // failures. The templated wrappers below may optionally use a different // class. @@ -324,7 +239,7 @@ struct VerifyOnFailure { // One for function pointers... -template +template typename enable_if::value, RV>::type invoke(RV, F f, void *, R & r, args_type & t, tuple_indices) { return f(r, move(get(t))...); @@ -332,7 +247,7 @@ invoke(RV, F f, void *, R & r, args_type & t, tuple_indices) { // And one for pointers to member functions... -template +template typename enable_if::value, RV>::type invoke(RV, F f, C *c, R & r, args_type & t, tuple_indices) { return (c->*f)(r, move(get(t))...); @@ -406,7 +321,7 @@ template struct marshalled_func> : public marshalled_func_imp {}; -template unmarshall & +template unmarshall & tuple_unmarshall_imp(unmarshall & u, tuple t, tuple_indices) { (void)pass{(u >> get(t))...}; return u; @@ -418,7 +333,7 @@ operator>>(unmarshall & u, tuple && t) { return tuple_unmarshall_imp(u, t, Indices()); } -template marshall & +template marshall & tuple_marshall_imp(marshall & m, tuple & t, tuple_indices) { (void)pass{(m << get(t))...}; return m; @@ -430,9 +345,7 @@ operator<<(marshall & m, tuple && t) { return tuple_marshall_imp(m, t, Indices()); } -// for structs or classes containing a MEMBERS declaration -#define MARSHALLABLE(_c_) \ -inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_(); } \ -inline marshall & operator<<(marshall &m, _c_ a) { return m << a._tuple_(); } +MARSHALLABLE(request_header) +MARSHALLABLE(reply_header) #endif diff --git a/rpc/pollmgr.cc b/rpc/pollmgr.cc index 15fba26..4254b4f 100644 --- a/rpc/pollmgr.cc +++ b/rpc/pollmgr.cc @@ -6,7 +6,7 @@ #include "pollmgr.h" PollMgr *PollMgr::instance = NULL; -static std::once_flag pollmgr_is_initialized; +static once_flag pollmgr_is_initialized; static void PollMgrInit() @@ -17,7 +17,7 @@ PollMgrInit() PollMgr * PollMgr::Instance() { - std::call_once(pollmgr_is_initialized, PollMgrInit); + call_once(pollmgr_is_initialized, PollMgrInit); return instance; } @@ -27,7 +27,7 @@ PollMgr::PollMgr() : pending_change_(false) aio_ = new SelectAIO(); //aio_ = new EPollAIO(); - th_ = std::thread(&PollMgr::wait_loop, this); + th_ = thread(&PollMgr::wait_loop, this); } PollMgr::~PollMgr() [[noreturn]] @@ -84,8 +84,8 @@ void PollMgr::wait_loop() [[noreturn]] { - std::vector readable; - std::vector writable; + vector readable; + vector writable; while (1) { { @@ -206,7 +206,7 @@ SelectAIO::unwatch_fd(int fd, poll_flag flag) } void -SelectAIO::wait_ready(std::vector *readable, std::vector *writable) +SelectAIO::wait_ready(vector *readable, vector *writable) { fd_set trfds, twfds; int high; @@ -334,7 +334,7 @@ EPollAIO::is_watched(int fd, poll_flag flag) } void -EPollAIO::wait_ready(std::vector *readable, std::vector *writable) +EPollAIO::wait_ready(vector *readable, vector *writable) { int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1); for (int i = 0; i < nfds; i++) { diff --git a/rpc/rpc.cc b/rpc/rpc.cc index 90d9608..62003dd 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -71,6 +71,8 @@ inline void set_rand_seed() { srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid()); } +static sockaddr_in make_sockaddr(const string &hostandport); + rpcc::rpcc(const string & d, bool retrans) : dst_(make_sockaddr(d)), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0), retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1) @@ -86,20 +88,19 @@ rpcc::rpcc(const string & d, bool retrans) : } char *loss_env = getenv("RPC_LOSSY"); - if(loss_env != NULL){ + if(loss_env) lossytest_ = atoi(loss_env); - } // xid starts with 1 and latest received reply starts with 0 xid_rep_window_.push_back(0); - IF_LEVEL(2) LOG("rpcc::rpcc cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_); + IF_LEVEL(2) LOG("cltn_nonce is " << clt_nonce_ << " lossy " << lossytest_); } // IMPORTANT: destruction should happen only when no external threads // are blocked inside rpcc or will use rpcc in the future rpcc::~rpcc() { - IF_LEVEL(2) LOG("rpcc::~rpcc delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1)); + IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1)); if(chan_){ chan_->closeconn(); chan_->decref(); @@ -115,7 +116,7 @@ int rpcc::bind(TO to) { bind_done_ = true; srv_nonce_ = r; } else { - IF_LEVEL(2) LOG("rpcc::bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret); + IF_LEVEL(2) LOG("bind " << inet_ntoa(dst_.sin_addr) << " failed " << ret); } return ret; }; @@ -123,11 +124,11 @@ int rpcc::bind(TO to) { // Cancel all outstanding calls void rpcc::cancel(void) { lock ml(m_); - LOG("rpcc::cancel: force callers to fail"); + LOG("force callers to fail"); for(auto &p : calls_){ caller *ca = p.second; - IF_LEVEL(2) LOG("rpcc::cancel: force caller to fail"); + IF_LEVEL(2) LOG("force caller to fail"); { lock cl(ca->m); ca->done = true; @@ -140,10 +141,10 @@ void rpcc::cancel(void) { destroy_wait_ = true; destroy_wait_c_.wait(ml); } - LOG("rpcc::cancel: done"); + LOG("done"); } -int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) { +int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) { caller ca(0, &rep); int xid_rep; @@ -152,7 +153,7 @@ int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) { if((proc != rpc_const::bind && !bind_done_) || (proc == rpc_const::bind && bind_done_)){ - IF_LEVEL(1) LOG("rpcc::call1 rpcc has not been bound to dst or binding twice"); + IF_LEVEL(1) LOG("rpcc has not been bound to dst or binding twice"); return rpc_const::bind_failure; } @@ -163,7 +164,7 @@ int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) { ca.xid = xid_++; calls_[ca.xid] = &ca; - req.pack_req_header({ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()}); + req.pack_header(request_header{ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front()}); xid_rep = xid_rep_window_.front(); } @@ -190,11 +191,11 @@ int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) { } } if (forgot.isvalid()) - ch->send((char *)forgot.buf.c_str(), forgot.buf.size()); - ch->send(req.cstr(), req.size()); + ch->send(forgot.buf); + ch->send(req); } else IF_LEVEL(1) LOG("not reachable"); - IF_LEVEL(2) LOG("rpcc::call1 " << clt_nonce_ << " just sent req proc " << hex << proc << + IF_LEVEL(2) LOG(clt_nonce_ << " just sent req proc " << hex << proc << " xid " << dec << ca.xid << " clt_nonce " << clt_nonce_); } transmit = false; // only send once on a given channel @@ -212,14 +213,14 @@ int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) { { lock cal(ca.m); while (!ca.done){ - IF_LEVEL(2) LOG("rpcc:call1: wait"); + IF_LEVEL(2) LOG("wait"); if(ca.c.wait_until(cal, nextdeadline) == cv_status::timeout){ - IF_LEVEL(2) LOG("rpcc::call1: timeout"); + IF_LEVEL(2) LOG("timeout"); break; } } if(ca.done){ - IF_LEVEL(2) LOG("rpcc::call1: reply received"); + IF_LEVEL(2) LOG("reply received"); break; } } @@ -250,7 +251,7 @@ int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) { { lock ml(m_); if (!dup_req_.isvalid()) { - dup_req_.buf.assign(req.cstr(), req.size()); + dup_req_.buf = req; dup_req_.xid = ca.xid; } if (xid_rep > xid_rep_done_) @@ -259,7 +260,7 @@ int rpcc::call1(proc_t proc, marshall &req, unmarshall &rep, TO to) { lock cal(ca.m); - IF_LEVEL(2) LOG("rpcc::call1 " << clt_nonce_ << " call done for req proc " << hex << proc << + IF_LEVEL(2) LOG(clt_nonce_ << " call done for req proc " << hex << proc << " xid " << dec << ca.xid << " " << inet_ntoa(dst_.sin_addr) << ":" << ntohs(dst_.sin_port) << " done? " << ca.done << " ret " << ca.intret); @@ -294,14 +295,14 @@ rpcc::get_refconn(connection **ch) // // this function keeps no reference for connection *c bool -rpcc::got_pdu(connection *, char *b, size_t sz) +rpcc::got_pdu(connection *, const string & b) { - unmarshall rep(b, sz); + unmarshall rep(b, true); reply_header h; - rep.unpack_reply_header(&h); + rep.unpack_header(h); if(!rep.ok()){ - IF_LEVEL(1) LOG("rpcc:got_pdu unmarshall header failed!!!"); + IF_LEVEL(1) LOG("unmarshall header failed!!!"); return true; } @@ -310,17 +311,17 @@ rpcc::got_pdu(connection *, char *b, size_t sz) update_xid_rep(h.xid); if(calls_.find(h.xid) == calls_.end()){ - IF_LEVEL(2) LOG("rpcc::got_pdu xid " << h.xid << " no pending request"); + IF_LEVEL(2) LOG("xid " << h.xid << " no pending request"); return true; } caller *ca = calls_[h.xid]; lock cl(ca->m); if(!ca->done){ - ca->un->take_in(rep); + *ca->rep = b; ca->intret = h.ret; if(ca->intret < 0){ - IF_LEVEL(2) LOG("rpcc::got_pdu: RPC reply error for xid " << h.xid << " intret " << ca->intret); + IF_LEVEL(2) LOG("RPC reply error for xid " << h.xid << " intret " << ca->intret); } ca->done = 1; } @@ -353,21 +354,17 @@ compress: } rpcs::rpcs(unsigned int p1, size_t count) - : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true) + : port_(p1), counting_(count), curr_counts_(count), reachable_ (true) { set_rand_seed(); nonce_ = (unsigned int)random(); - IF_LEVEL(2) LOG("rpcs::rpcs created with nonce " << nonce_); - - char *loss_env = getenv("RPC_LOSSY"); - if(loss_env != NULL){ - lossytest_ = atoi(loss_env); - } + IF_LEVEL(2) LOG("created with nonce " << nonce_); reg(rpc_const::bind, &rpcs::rpcbind, this); - dispatchpool_ = new ThrPool(6,false); + dispatchpool_ = new ThrPool(6, false); - listener_ = new tcpsconn(this, port_, lossytest_); + char *loss_env = getenv("RPC_LOSSY"); + listener_ = new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0); } rpcs::~rpcs() @@ -379,14 +376,14 @@ rpcs::~rpcs() } bool -rpcs::got_pdu(connection *c, char *b, size_t sz) +rpcs::got_pdu(connection *c, const string & b) { - if(!reachable_){ - IF_LEVEL(1) LOG("rpcss::got_pdu: not reachable"); - return true; - } + if(!reachable_){ + IF_LEVEL(1) LOG("not reachable"); + return true; + } - djob_t *j = new djob_t(c, b, sz); + djob_t *j = new djob_t{c, b}; c->incref(); bool succ = dispatchpool_->addJob(bind(&rpcs::dispatch, this, j)); if(!succ || !reachable_){ @@ -434,32 +431,32 @@ void rpcs::dispatch(djob_t *j) { connection *c = j->conn; - unmarshall req(j->buf, j->sz); + unmarshall req(j->buf, true); delete j; request_header h; - req.unpack_req_header(&h); + req.unpack_header(h); proc_t proc = h.proc; if(!req.ok()){ - IF_LEVEL(1) LOG("rpcs:dispatch unmarshall header failed!!!"); + IF_LEVEL(1) LOG("unmarshall header failed!!!"); c->decref(); return; } - IF_LEVEL(2) LOG("rpcs::dispatch: rpc " << h.xid << " (proc " << hex << proc << ", last_rep " << + IF_LEVEL(2) LOG("rpc " << h.xid << " (proc " << hex << proc << ", last_rep " << dec << h.xid_rep << ") from clt " << h.clt_nonce << " for srv instance " << h.srv_nonce); marshall rep; - reply_header rh(h.xid,0); + reply_header rh{h.xid,0}; // is client sending to an old instance of server? if(h.srv_nonce != 0 && h.srv_nonce != nonce_){ - IF_LEVEL(2) LOG("rpcs::dispatch: rpc for an old server instance " << h.srv_nonce << + IF_LEVEL(2) LOG("rpc for an old server instance " << h.srv_nonce << " (current " << nonce_ << ") proc " << hex << h.proc); rh.ret = rpc_const::oldsrv_failure; - rep.pack_reply_header(rh); - c->send(rep.cstr(),rep.size()); + rep.pack_header(rh); + c->send(rep); return; } @@ -468,7 +465,7 @@ rpcs::dispatch(djob_t *j) { lock pl(procs_m_); if(procs_.count(proc) < 1){ - cerr << "rpcs::dispatch: unknown proc " << hex << proc << "." << endl; + cerr << "unknown proc " << hex << proc << "." << endl; c->decref(); VERIFY(0); return; @@ -478,8 +475,7 @@ rpcs::dispatch(djob_t *j) } rpcs::rpcstate_t stat; - char *b1 = nullptr; - size_t sz1 = 0; + string b1; if(h.clt_nonce){ // have i seen this client before? @@ -489,7 +485,7 @@ rpcs::dispatch(djob_t *j) if(reply_window_.find(h.clt_nonce) == reply_window_.end()){ VERIFY (reply_window_[h.clt_nonce].size() == 0); // create reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid - IF_LEVEL(2) LOG("rpcs::dispatch: new client " << h.clt_nonce << " xid " << h.xid << + IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid << " chan " << c->channo() << ", total clients " << (reply_window_.size()-1)); } } @@ -507,37 +503,36 @@ rpcs::dispatch(djob_t *j) } } - stat = checkduplicate_and_update(h.clt_nonce, h.xid, - h.xid_rep, &b1, &sz1); + stat = checkduplicate_and_update(h.clt_nonce, h.xid, h.xid_rep, b1); } else { // this client does not require at most once logic stat = NEW; } - switch (stat){ + switch (stat) { case NEW: // new request - if(counting_){ + if (counting_){ updatestat(proc); } rh.ret = (*f)(req, rep); if (rh.ret == rpc_const::unmarshal_args_failure) { - cerr << "rpcs::dispatch: failed to unmarshall the arguments. You are " << + cerr << "failed to unmarshall the arguments. You are " << "probably calling RPC 0x" << hex << proc << " with the wrong " << "types of arguments." << endl; VERIFY(0); } VERIFY(rh.ret >= 0); - rep.pack_reply_header(rh); - rep.take_buf(&b1,&sz1); + rep.pack_header(rh); + b1 = rep; - IF_LEVEL(2) LOG("rpcs::dispatch: sending and saving reply of size " << sz1 << " for rpc " << + IF_LEVEL(2) LOG("sending and saving reply of size " << b1.size() << " for rpc " << h.xid << ", proc " << hex << proc << " ret " << dec << rh.ret << ", clt " << h.clt_nonce); - if(h.clt_nonce > 0){ + if (h.clt_nonce > 0) { // only record replies for clients that require at-most-once logic - add_reply(h.clt_nonce, h.xid, b1, sz1); + add_reply(h.clt_nonce, h.xid, b1); } // get the latest connection to the client @@ -550,22 +545,18 @@ rpcs::dispatch(djob_t *j) } } - c->send(b1, sz1); - if(h.clt_nonce == 0){ - // reply is not added to at-most-once window, free it - free(b1); - } + c->send(rep); break; case INPROGRESS: // server is working on this request break; case DONE: // duplicate and we still have the response - c->send(b1, sz1); + c->send(b1); break; case FORGOTTEN: // very old request and we don't have the response anymore - IF_LEVEL(2) LOG("rpcs::dispatch: very old request " << h.xid << " from " << h.clt_nonce); + IF_LEVEL(2) LOG("very old request " << h.xid << " from " << h.clt_nonce); rh.ret = rpc_const::atmostonce_failure; - rep.pack_reply_header(rh); - c->send(rep.cstr(),rep.size()); + rep.pack_header(rh); + c->send(rep); break; } c->decref(); @@ -583,11 +574,11 @@ rpcs::dispatch(djob_t *j) // returns one of: // NEW: never seen this xid before. // INPROGRESS: seen this xid, and still processing it. -// DONE: seen this xid, previous reply returned in *b and *sz. +// DONE: seen this xid, previous reply returned in b. // FORGOTTEN: might have seen this xid, but deleted previous reply. rpcs::rpcstate_t rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid, - int xid_rep, char **b, size_t *sz) + int xid_rep, string & b) { lock rwl(reply_window_m_); @@ -602,10 +593,8 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid, if (past_xid_rep < xid_rep || past_xid_rep == -1) { // scan for deletion candidates - for (; it != l.end() && it->xid < xid_rep; it++) { - if (it->cb_present) - free(it->buf); - } + while (it != l.end() && it->xid < xid_rep) + it++; l.erase(start, it); l.begin()->xid = xid_rep; } @@ -621,12 +610,10 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid, if (it != l.end() && it->xid == xid) { if (it->cb_present) { // return information about the remembered reply - *b = it->buf; - *sz = it->sz; + b = it->buf; return DONE; - } else { - return INPROGRESS; } + return INPROGRESS; } else { // remember that a new request has arrived l.insert(it, reply_t(xid)); @@ -635,14 +622,11 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid, } // rpcs::dispatch calls add_reply when it is sending a reply to an RPC, -// and passes the return value in b and sz. -// add_reply() should remember b and sz. -// free_reply_window() and checkduplicate_and_update is responsible for -// calling free(b). -void -rpcs::add_reply(unsigned int clt_nonce, int xid, - char *b, size_t sz) -{ +// and passes the return value in b. +// add_reply() should remember b. +// free_reply_window() and checkduplicate_and_update are responsible for +// cleaning up the remembered values. +void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) { lock rwl(reply_window_m_); // remember the RPC reply value list &l = reply_window_[clt_nonce]; @@ -652,38 +636,26 @@ rpcs::add_reply(unsigned int clt_nonce, int xid, // there should already be an entry, so whine if there isn't if (it == l.end() || it->xid != xid) { cerr << "Could not find reply struct in add_reply" << endl; - l.insert(it, reply_t(xid, b, sz)); + l.insert(it, reply_t(xid, b)); } else { - *it = reply_t(xid, b, sz); + *it = reply_t(xid, b); } } void rpcs::free_reply_window(void) { lock rwl(reply_window_m_); - for (auto clt : reply_window_) { - for (auto it : clt.second){ - if (it.cb_present) - free(it.buf); - } - clt.second.clear(); - } reply_window_.clear(); } int rpcs::rpcbind(unsigned int &r, int) { - IF_LEVEL(2) LOG("rpcs::rpcbind called return nonce " << nonce_); + IF_LEVEL(2) LOG("called return nonce " << nonce_); r = nonce_; return 0; } -bool operator<(const sockaddr_in &a, const sockaddr_in &b){ - return ((a.sin_addr.s_addr < b.sin_addr.s_addr) || - ((a.sin_addr.s_addr == b.sin_addr.s_addr) && - ((a.sin_port < b.sin_port)))); -} +static sockaddr_in make_sockaddr(const string &host, const string &port); -/*---------------auxilary function--------------*/ -sockaddr_in make_sockaddr(const string &hostandport) { +static sockaddr_in make_sockaddr(const string &hostandport) { auto colon = hostandport.find(':'); if (colon == string::npos) return make_sockaddr("127.0.0.1", hostandport); @@ -691,7 +663,7 @@ sockaddr_in make_sockaddr(const string &hostandport) { return make_sockaddr(hostandport.substr(0, colon), hostandport.substr(colon+1)); } -sockaddr_in make_sockaddr(const string &host, const string &port) { +static sockaddr_in make_sockaddr(const string &host, const string &port) { sockaddr_in dst; bzero(&dst, sizeof(dst)); dst.sin_family = AF_INET; diff --git a/rpc/rpc.h b/rpc/rpc.h index f01af09..5dabe4b 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -30,10 +30,10 @@ class rpcc : public chanmgr { //manages per rpc info struct caller { - caller(int _xid, unmarshall *_un) : xid(_xid), un(_un) {} + caller(int _xid, string *_rep) : xid(_xid), rep(_rep) {} int xid; - unmarshall *un; + string *rep; int intret; bool done = false; mutex m; @@ -95,10 +95,9 @@ class rpcc : public chanmgr { int islossy() { return lossytest_ > 0; } - int call1(proc_t proc, - marshall &req, unmarshall &rep, TO to); + int call1(proc_t proc, marshall &req, string &rep, TO to); - bool got_pdu(connection *c, char *b, size_t sz); + bool got_pdu(connection *c, const string & b); template int call_m(proc_t proc, marshall &req, R & r, TO to); @@ -113,8 +112,9 @@ class rpcc : public chanmgr { template int rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) { - unmarshall u; - int intret = call1(proc, req, u, to); + string rep; + int intret = call1(proc, req, rep, to); + unmarshall u(rep, true); if (intret < 0) return intret; u >> r; if (u.okdone() != true) { @@ -139,8 +139,6 @@ rpcc::call_timeout(proc_t proc, const rpcc::TO to, R & r, const Args&... args) return call_m(proc, m, r, to); } -bool operator<(const sockaddr_in &a, const sockaddr_in &b); - // rpc server endpoint. class rpcs : public chanmgr { @@ -158,22 +156,11 @@ class rpcs : public chanmgr { // has been sent; in that case buf points to a copy of the reply, // and sz holds the size of the reply. struct reply_t { - reply_t (int _xid) { - xid = _xid; - cb_present = false; - buf = NULL; - sz = 0; - } - reply_t (int _xid, char *_buf, size_t _sz) { - xid = _xid; - cb_present = true; - buf = _buf; - sz = _sz; - } + reply_t (int _xid) : xid(_xid), cb_present(false) {} + reply_t (int _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {} int xid; bool cb_present; // whether the reply buffer is valid - char *buf; // the reply buffer - size_t sz; // the size of reply buffer + string buf; // the reply buffer }; unsigned int port_; @@ -185,11 +172,10 @@ class rpcs : public chanmgr { map > reply_window_; void free_reply_window(void); - void add_reply(unsigned int clt_nonce, int xid, char *b, size_t sz); + void add_reply(unsigned int clt_nonce, int xid, const string & b); rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, - int xid, int rep_xid, - char **b, size_t *sz); + int xid, int rep_xid, string & b); void updatestat(proc_t proc); @@ -201,7 +187,6 @@ class rpcs : public chanmgr { size_t curr_counts_; map counts_; - int lossytest_; bool reachable_; // map proc # to function @@ -216,10 +201,8 @@ class rpcs : public chanmgr { protected: struct djob_t { - djob_t (connection *c, char *b, size_t bsz):buf(b),sz(bsz),conn(c) {} - char *buf; - size_t sz; connection *conn; + string buf; }; void dispatch(djob_t *); @@ -227,7 +210,7 @@ class rpcs : public chanmgr { void reg1(proc_t proc, handler *); ThrPool* dispatchpool_; - tcpsconn* listener_; + tcpsconn *listener_; public: rpcs(unsigned int port, size_t counts=0); @@ -238,7 +221,7 @@ class rpcs : public chanmgr { void set_reachable(bool r) { reachable_ = r; } - bool got_pdu(connection *c, char *b, size_t sz); + bool got_pdu(connection *c, const string & b); template void reg(proc_t proc, F f, C *c=nullptr); }; @@ -253,7 +236,4 @@ template void rpcs::reg(proc_t proc, F f, C *c) { reg1(proc, marshalled_func::wrap(f, c)); } -sockaddr_in make_sockaddr(const string &hostandport); -sockaddr_in make_sockaddr(const string &host, const string &port); - #endif diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index 7217b25..b90d19a 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -80,8 +80,8 @@ testmarshall() { marshall m; request_header rh{1,2,3,4,5}; - m.pack_req_header(rh); - VERIFY(m.size()==RPC_HEADER_SZ); + m.pack_header(rh); + VERIFY(((string)m).size()==RPC_HEADER_SZ); int i = 12345; unsigned long long l = 1223344455L; string s = "hallo...."; @@ -89,14 +89,12 @@ testmarshall() m << l; m << s; - char *b; - size_t sz; - m.take_buf(&b,&sz); - VERIFY(sz == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)); + string b = m; + VERIFY(b.size() == RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)); - unmarshall un(b,sz); + unmarshall un(b, true); request_header rh1; - un.unpack_req_header(&rh1); + un.unpack_header(rh1); VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0); int i1; unsigned long long l1; @@ -131,11 +129,11 @@ client1(size_t cl) int arg = (random() % 1000); int rep; - auto start = std::chrono::steady_clock::now(); + auto start = steady_clock::now(); int ret = clients[which_cl]->call(which ? 23 : 24, rep, arg); - auto end = std::chrono::steady_clock::now(); - auto diff = std::chrono::duration_cast(end - start).count(); + auto end = steady_clock::now(); + auto diff = duration_cast(end - start).count(); if (ret != 0) cout << diff << " ms have elapsed!!!" << endl; VERIFY(ret == 0); @@ -401,7 +399,7 @@ main(int argc, char *argv[]) if (isclient) { // server's address. - dst = "127.0.0.1:" + std::to_string(port); + dst = "127.0.0.1:" + to_string(port); // start the client. bind it to the server. diff --git a/rpc/thr_pool.cc b/rpc/thr_pool.cc index 8b9691b..7d3cf7d 100644 --- a/rpc/thr_pool.cc +++ b/rpc/thr_pool.cc @@ -3,16 +3,14 @@ // if blocking, then addJob() blocks when queue is full // otherwise, addJob() simply returns false when queue is full ThrPool::ThrPool(size_t sz, bool blocking) -: nthreads_(sz),blockadd_(blocking),jobq_(100*sz) -{ +: nthreads_(sz),blockadd_(blocking),jobq_(100*sz) { for (size_t i=0; i job_t; +typedef function job_t; class ThrPool { public: @@ -18,7 +18,7 @@ class ThrPool { bool blockadd_; fifo jobq_; - std::vector th_; + vector th_; void do_worker(); }; diff --git a/rsm.cc b/rsm.cc index f12f9db..843418a 100644 --- a/rsm.cc +++ b/rsm.cc @@ -86,7 +86,7 @@ #include "rsm.h" #include "rsm_client.h" -rsm::rsm(std::string _first, std::string _me) : +rsm::rsm(string _first, string _me) : stf(0), primary(_first), insync (false), inviewchange (true), vid_commit(0), partitioned (false), dopartition(false), break1(false), break2(false) { @@ -111,13 +111,13 @@ rsm::rsm(std::string _first, std::string _me) : rsmrpc->reg(rsm_protocol::joinreq, &rsm::joinreq, this); // tester must be on different port, otherwise it may partition itself - testsvr = new rpcs((uint32_t)std::stoi(_me) + 1); + testsvr = new rpcs((uint32_t)stoi(_me) + 1); testsvr->reg(rsm_test_protocol::net_repair, &rsm::test_net_repairreq, this); testsvr->reg(rsm_test_protocol::breakpoint, &rsm::breakpointreq, this); { lock ml(rsm_mutex); - std::thread(&rsm::recovery, this).detach(); + thread(&rsm::recovery, this).detach(); } } @@ -140,7 +140,7 @@ void rsm::recovery() [[noreturn]] { commit_change(cfg->view_id(), ml); } else { ml.unlock(); - std::this_thread::sleep_for(std::chrono::seconds(30)); // XXX make another node in cfg primary? + this_thread::sleep_for(seconds(30)); // XXX make another node in cfg primary? ml.lock(); } } @@ -195,7 +195,7 @@ bool rsm::sync_with_backups(lock & rsm_mutex_lock) { bool rsm::sync_with_primary(lock & rsm_mutex_lock) { // Remember the primary of vid_insync - std::string m = primary; + string m = primary; while (vid_insync == vid_commit) { if (statetransfer(m, rsm_mutex_lock)) break; @@ -208,7 +208,7 @@ bool rsm::sync_with_primary(lock & rsm_mutex_lock) { * Call to transfer state from m to the local node. * Assumes that rsm_mutex is already held. */ -bool rsm::statetransfer(std::string m, lock & rsm_mutex_lock) +bool rsm::statetransfer(string m, lock & rsm_mutex_lock) { rsm_protocol::transferres r; handle h(m); @@ -225,7 +225,7 @@ bool rsm::statetransfer(std::string m, lock & rsm_mutex_lock) rsm_mutex_lock.lock(); } if (cl == 0 || ret != rsm_protocol::OK) { - LOG("rsm::statetransfer: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret); + LOG("rsm::statetransfer: couldn't reach " << m << " " << hex << cl << " " << dec << ret); return false; } if (stf && last_myvs != r.last) { @@ -236,7 +236,7 @@ bool rsm::statetransfer(std::string m, lock & rsm_mutex_lock) return true; } -bool rsm::statetransferdone(std::string m, lock & rsm_mutex_lock) { +bool rsm::statetransferdone(string m, lock & rsm_mutex_lock) { rsm_mutex_lock.unlock(); handle h(m); rpcc *cl = h.safebind(); @@ -251,7 +251,7 @@ bool rsm::statetransferdone(std::string m, lock & rsm_mutex_lock) { } -bool rsm::join(std::string m, lock & rsm_mutex_lock) { +bool rsm::join(string m, lock & rsm_mutex_lock) { handle h(m); int ret = 0; string log; @@ -269,7 +269,7 @@ bool rsm::join(std::string m, lock & rsm_mutex_lock) { } if (cl == 0 || ret != rsm_protocol::OK) { - LOG("rsm::join: couldn't reach " << m << " " << std::hex << cl << " " << std::dec << ret); + LOG("rsm::join: couldn't reach " << m << " " << hex << cl << " " << dec << ret); return false; } LOG("rsm::join: succeeded " << log); @@ -303,18 +303,18 @@ void rsm::commit_change(unsigned vid, lock &) { } -void rsm::execute(int procno, std::string req, std::string &r) { +void rsm::execute(int procno, string req, string &r) { LOG("execute"); handler *h = procs[procno]; VERIFY(h); - unmarshall args(req); + unmarshall args(req, false); marshall rep; - std::string reps; + string reps; auto ret = (rsm_protocol::status)(*h)(args, rep); marshall rep1; rep1 << ret; - rep1 << rep.str(); - r = rep1.str(); + rep1 << rep.content(); + r = rep1.content(); } // @@ -323,11 +323,11 @@ void rsm::execute(int procno, std::string req, std::string &r) { // number, and invokes it on all members of the replicated state // machine. // -rsm_client_protocol::status rsm::client_invoke(std::string &r, int procno, std::string req) { - LOG("rsm::client_invoke: procno 0x" << std::hex << procno); +rsm_client_protocol::status rsm::client_invoke(string &r, int procno, string req) { + LOG("rsm::client_invoke: procno 0x" << hex << procno); lock ml(invoke_mutex); - std::vector m; - std::string myaddr; + vector m; + string myaddr; viewstamp vs; { lock ml2(rsm_mutex); @@ -377,11 +377,11 @@ rsm_client_protocol::status rsm::client_invoke(std::string &r, int procno, std:: // the replica must execute requests in order (with no gaps) // according to requests' seqno -rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req) { - LOG("rsm::invoke: procno 0x" << std::hex << proc); +rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, string req) { + LOG("rsm::invoke: procno 0x" << hex << proc); lock ml(invoke_mutex); - std::vector m; - std::string myaddr; + vector m; + string myaddr; { lock ml2(rsm_mutex); // check if !inviewchange @@ -402,7 +402,7 @@ rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req) return rsm_protocol::ERR; myvs++; } - std::string r; + string r; execute(proc, req, r); last_myvs = vs; breakpoint1(); @@ -412,7 +412,7 @@ rsm_protocol::status rsm::invoke(int &, int proc, viewstamp vs, std::string req) /** * RPC handler: Send back the local node's state to the caller */ -rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, std::string src, +rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, string src, viewstamp last, unsigned vid) { lock ml(rsm_mutex); LOG("transferreq from " << src << " (" << last.vid << "," << last.seqno << ") vs (" << @@ -429,7 +429,7 @@ rsm_protocol::status rsm::transferreq(rsm_protocol::transferres &r, std::string * RPC handler: Inform the local node (the primary) that node m has synchronized * for view vid */ -rsm_protocol::status rsm::transferdonereq(int &, std::string m, unsigned vid) { +rsm_protocol::status rsm::transferdonereq(int &, string m, unsigned vid) { lock ml(rsm_mutex); if (!insync || vid != vid_insync) return rsm_protocol::BUSY; @@ -442,7 +442,7 @@ rsm_protocol::status rsm::transferdonereq(int &, std::string m, unsigned vid) { // a node that wants to join an RSM as a server sends a // joinreq to the RSM's current primary; this is the // handler for that RPC. -rsm_protocol::status rsm::joinreq(string & log, std::string m, viewstamp last) { +rsm_protocol::status rsm::joinreq(string & log, string m, viewstamp last) { auto ret = rsm_protocol::OK; lock ml(rsm_mutex); @@ -481,8 +481,8 @@ rsm_protocol::status rsm::joinreq(string & log, std::string m, viewstamp last) { * so the client can switch to a different primary * when it existing primary fails */ -rsm_client_protocol::status rsm::client_members(std::vector &r, int) { - std::vector m; +rsm_client_protocol::status rsm::client_members(vector &r, int) { + vector m; lock ml(rsm_mutex); cfg->get_view(vid_commit, m); m.push_back(primary); @@ -495,7 +495,7 @@ rsm_client_protocol::status rsm::client_members(std::vector &r, int // otherwise, the lowest number node of the previous view. // caller should hold rsm_mutex void rsm::set_primary(unsigned vid) { - std::vector c, p; + vector c, p; cfg->get_view(vid, c); cfg->get_view(vid - 1, p); VERIFY (c.size() > 0); @@ -528,7 +528,7 @@ bool rsm::amiprimary() { // assumes caller holds rsm_mutex void rsm::net_repair(bool heal, lock &) { - std::vector m; + vector m; cfg->get_view(vid_commit, m); for (unsigned i = 0; i < m.size(); i++) { if (m[i] != cfg->myaddr()) { diff --git a/rsm_client.h b/rsm_client.h index 4a80f60..0b1dc88 100644 --- a/rsm_client.h +++ b/rsm_client.h @@ -14,14 +14,14 @@ class rsm_client { protected: - std::string primary; - std::vector known_mems; - std::mutex rsm_client_mutex; + string primary; + vector known_mems; + mutex rsm_client_mutex; void primary_failure(lock & rsm_client_mutex_lock); bool init_members(lock & rsm_client_mutex_lock); public: - rsm_client(std::string dst); - rsm_protocol::status invoke(unsigned int proc, std::string &rep, const std::string &req); + rsm_client(string dst); + rsm_protocol::status invoke(unsigned int proc, string &rep, const string &req); template int call(unsigned int proc, R & r, const Args & ...a1); @@ -31,11 +31,11 @@ class rsm_client { template int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) { - std::string rep; - std::string res; - int intret = invoke(proc, rep, req.cstr()); + string rep; + string res; + int intret = invoke(proc, rep, req); VERIFY( intret == rsm_client_protocol::OK ); - unmarshall u(rep); + unmarshall u(rep, false); u >> intret; if (intret < 0) return intret; u >> res; @@ -47,7 +47,7 @@ int rsm_client::call_m(unsigned int proc, R & r, const marshall & req) { VERIFY(0); return rpc_const::unmarshal_reply_failure; } - unmarshall u1(res); + unmarshall u1(res, false); u1 >> r; if(!u1.okdone()) { cerr << "rsm_client::call_m: failed to unmarshall the reply." << endl; diff --git a/rsmtest_client.cc b/rsmtest_client.cc index 3965fa9..f4238db 100644 --- a/rsmtest_client.cc +++ b/rsmtest_client.cc @@ -3,7 +3,7 @@ #include "rsmtest_client.h" #include -rsmtest_client::rsmtest_client(std::string dst) : cl(dst) { +rsmtest_client::rsmtest_client(string dst) : cl(dst) { if (cl.bind() < 0) cout << "rsmtest_client: call bind" << endl; } diff --git a/rsmtest_client.h b/rsmtest_client.h index ad3d4c1..e71fede 100644 --- a/rsmtest_client.h +++ b/rsmtest_client.h @@ -11,7 +11,7 @@ class rsmtest_client { protected: rpcc cl; public: - rsmtest_client(std::string d); + rsmtest_client(string d); virtual ~rsmtest_client() {} virtual rsm_test_protocol::status net_repair(int heal); virtual rsm_test_protocol::status breakpoint(int b); diff --git a/threaded_log.h b/threaded_log.h index 706e2b7..750c5d2 100644 --- a/threaded_log.h +++ b/threaded_log.h @@ -11,8 +11,7 @@ extern int next_instance_num; extern char log_thread_prefix; namespace std { - // This is an awful hack. But sticking this in std:: makes it possible for - // ostream_iterator to use it. + // Sticking this in std:: makes it possible for ostream_iterator to use it. template ostream & operator<<(ostream &o, const pair &d) { return o << "<" << d.first << "," << d.second << ">"; @@ -26,7 +25,7 @@ operator<<(ostream &o, const A &a) { } #define LOG_PREFIX { \ - auto _thread_ = std::this_thread::get_id(); \ + auto _thread_ = this_thread::get_id(); \ int _tid_ = thread_name_map[_thread_]; \ if (_tid_==0) \ _tid_ = thread_name_map[_thread_] = ++next_thread_num; \ @@ -39,7 +38,7 @@ operator<<(ostream &o, const A &a) { int _self_ = instance_name_map[this]; \ if (_self_==0) \ _self_ = instance_name_map[this] = ++next_instance_num; \ - cerr << "#" << setw(2) << _self_; \ + cerr << "#" << setw(2) << " " << _self_; \ } #define LOG_NONMEMBER(_x_) { \ diff --git a/types.h b/types.h index e6b5895..e78186c 100644 --- a/types.h +++ b/types.h @@ -50,6 +50,7 @@ using std::ostream; using std::istream; using std::ostream_iterator; using std::istream_iterator; +using std::ios; #include using std::numeric_limits; @@ -77,6 +78,11 @@ using std::stoi; #include using std::thread; +using std::call_once; +using std::once_flag; +namespace this_thread { + using namespace std::this_thread; +} #include using std::tuple; @@ -92,10 +98,12 @@ using std::is_member_function_pointer; using std::is_same; using std::underlying_type; using std::enable_if; +using std::remove_reference; #include using std::pair; using std::declval; +using std::forward; #include using std::vector; @@ -145,4 +153,21 @@ LEXICOGRAPHIC_OPERATOR(_c_, <) LEXICOGRAPHIC_OPERATOR(_c_, <=) \ LEXICOGRAPHIC_OPERATOR(_c_, >) LEXICOGRAPHIC_OPERATOR(_c_, >=) \ LEXICOGRAPHIC_OPERATOR(_c_, ==) LEXICOGRAPHIC_OPERATOR(_c_, !=) +// The following implementation of tuple_indices is redistributed under the MIT +// License as an insubstantial portion of the LLVM compiler infrastructure. + +template struct tuple_indices {}; +template struct make_indices_imp; +template struct make_indices_imp, E> { + typedef typename make_indices_imp, E>::type type; +}; +template struct make_indices_imp, E> { + typedef tuple_indices type; +}; +template struct make_tuple_indices { + typedef typename make_indices_imp, E>::type type; +}; + +#include "endian.h" + #endif -- 1.7.9.5