From 4e881433f37417ccbda89c09ffdf936855d462d4 Mon Sep 17 00:00:00 2001 From: Peter Iannucci Date: Fri, 1 Nov 2013 13:33:54 -0400 Subject: [PATCH] Working on g++ compatibility --- Makefile.osx | 20 +++--- config.cc | 2 +- config.h | 2 +- lock_client.cc | 2 +- lock_client.h | 2 +- lock_server.cc | 4 +- lock_server.h | 6 +- lock_tester.cc | 14 ++--- log.h | 2 +- paxos.cc | 2 +- paxos.h | 2 +- paxos_protocol.h | 4 +- rpc/connection.cc | 119 ++++++++++++++++-------------------- rpc/connection.h | 34 +++++------ rpc/file.h | 4 +- rpc/marshall.h | 6 +- rpc/marshall_wrap.h | 2 +- rpc/poll_mgr.cc | 2 +- rpc/rpc.cc | 170 ++++++++++++++++++++++----------------------------- rpc/rpc.h | 54 ++++++++-------- rpc/rpc_protocol.h | 20 +++--- rpc/rpctest.cc | 22 ++++--- rsm.cc | 28 ++++----- rsm.h | 2 +- rsm_protocol.h | 4 +- types.h | 31 ++++++---- 26 files changed, 263 insertions(+), 297 deletions(-) diff --git a/Makefile.osx b/Makefile.osx index f69851c..80d85a3 100644 --- a/Makefile.osx +++ b/Makefile.osx @@ -1,13 +1,15 @@ PEDANTRY = -Weverything -pedantic-errors -Werror -Wno-c++98-compat \ - -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes \ - -Wmissing-declarations -Wno-weak-vtables -Wno-global-constructors \ - -Wno-exit-time-destructors -#OPTFLAGS = -ftrapv -O4 -OPTFLAGS = -CXXFLAGS = -ggdb -MMD -I. -std=c++11 -stdlib=libc++ $(PEDANTRY) $(OPTFLAGS) -LDFLAGS = -stdlib=libc++ $(OPTFLAGS) -CXX = clang++ -CC = clang++ + -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes \ + -Wmissing-declarations -Wno-weak-vtables -Wno-global-constructors \ + -Wno-exit-time-destructors -pedantic -Wall -Wextra -Weffc++ +OPTFLAGS = -O0 -fno-omit-frame-pointer #-fsanitize=address ,thread,undefined -fsanitize-memory-track-origins +STDLIB = -stdlib=libc++ +#STDLIB = +CXX = clang++-mp-3.4 +#CXX = g++-mp-4.8 +CXXFLAGS = -std=c++11 -ggdb3 -MMD -I. $(STDLIB) $(PEDANTRY) $(OPTFLAGS) +LDFLAGS = -std=c++11 $(STDLIB) $(OPTFLAGS) +CC := $(CXX) EXTRA_TARGETS = signatures socketfilterfw=/usr/libexec/ApplicationFirewall/socketfilterfw diff --git a/config.cc b/config.cc index 38c4c05..2b40078 100644 --- a/config.cc +++ b/config.cc @@ -145,7 +145,7 @@ bool config::remove(const string &m, lock &cfg_mutex_lock) { return r; } -void config::heartbeater() [[noreturn]] { +void config::heartbeater() { lock cfg_mutex_lock(cfg_mutex); while (1) { diff --git a/config.h b/config.h index 73940a0..7124a6e 100644 --- a/config.h +++ b/config.h @@ -41,7 +41,7 @@ class config : public paxos_change { void restore(const string &s); bool add(const string &, unsigned view_id); bool ismember(const string &m, unsigned view_id); - void heartbeater(void); + void heartbeater NORETURN (); void paxos_commit(unsigned instance, const string &v); rpcs *get_rpcs() { return paxos.get_rpcs(); } void breakpoint(int b) { paxos.breakpoint(b); } diff --git a/lock_client.cc b/lock_client.cc index 8864ce9..3c5aa89 100644 --- a/lock_client.cc +++ b/lock_client.cc @@ -46,7 +46,7 @@ lock_client::lock_client(string xdst, lock_release_user *_lu) : lu(_lu), next_xi rlsrpc->start(); } -void lock_client::releaser() [[noreturn]] { +void lock_client::releaser() { while (1) { lock_protocol::lockid_t lid; release_fifo.deq(&lid); diff --git a/lock_client.h b/lock_client.h index 728fbf7..73dfffe 100644 --- a/lock_client.h +++ b/lock_client.h @@ -63,7 +63,7 @@ class lock_client { lock_protocol::status acquire(lock_protocol::lockid_t); lock_protocol::status release(lock_protocol::lockid_t); int stat(lock_protocol::lockid_t); - void releaser(); + void releaser NORETURN (); rlock_protocol::status revoke_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t); rlock_protocol::status retry_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t); }; diff --git a/lock_server.cc b/lock_server.cc index 522a917..0c3a6e9 100644 --- a/lock_server.cc +++ b/lock_server.cc @@ -34,7 +34,7 @@ lock_server::lock_server(rsm *r) : rsm_ (r) { rsm_->set_state_transfer(this); } -void lock_server::revoker() [[noreturn]] { +void lock_server::revoker () { while (1) { lock_protocol::lockid_t lid; revoke_fifo.deq(&lid); @@ -62,7 +62,7 @@ void lock_server::revoker() [[noreturn]] { } } -void lock_server::retryer() [[noreturn]] { +void lock_server::retryer() { while (1) { lock_protocol::lockid_t lid; retry_fifo.deq(&lid); diff --git a/lock_server.h b/lock_server.h index 6ba4902..d3ec580 100644 --- a/lock_server.h +++ b/lock_server.h @@ -22,7 +22,7 @@ public: MEMBERS(held, held_by, wanted_by) }; -MARSHALLABLE(lock_state) +MARSHALLABLE_STRUCT(lock_state) typedef map lock_map; @@ -37,8 +37,8 @@ class lock_server : public rsm_state_transfer { rsm *rsm_; public: lock_server(rsm *r = 0); - void revoker(); - void retryer(); + void revoker NORETURN (); + void retryer NORETURN (); string marshal_state(); void unmarshal_state(const string & state); lock_protocol::status acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t); diff --git a/lock_tester.cc b/lock_tester.cc index 0204a71..f535d8f 100644 --- a/lock_tester.cc +++ b/lock_tester.cc @@ -10,17 +10,17 @@ char log_thread_prefix = 'c'; // must be >= 2 const int nt = 6; //XXX: lab1's rpc handlers are blocking. Since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking rpc. -string dst; -lock_client **lc = new lock_client * [nt]; -lock_protocol::lockid_t a = "1"; -lock_protocol::lockid_t b = "2"; -lock_protocol::lockid_t c = "3"; +static string dst; +static lock_client **lc = new lock_client * [nt]; +static lock_protocol::lockid_t a = "1"; +static lock_protocol::lockid_t b = "2"; +static lock_protocol::lockid_t c = "3"; // check_grant() and check_release() check that the lock server // doesn't grant the same lock to both clients. // it assumes that lock names are distinct in the first byte. -int ct[256]; -mutex count_mutex; +static int ct[256]; +static mutex count_mutex; void check_grant(lock_protocol::lockid_t lid) { lock ml(count_mutex); diff --git a/log.h b/log.h index e8acd4a..201cb80 100644 --- a/log.h +++ b/log.h @@ -24,4 +24,4 @@ class log { void logaccept(prop_t n_a, string v); }; -#endif /* log_h */ +#endif diff --git a/paxos.cc b/paxos.cc index ab60302..a88a7a5 100644 --- a/paxos.cc +++ b/paxos.cc @@ -7,7 +7,7 @@ bool isamember(const node_t & m, const nodes_t & nodes) { // check if l2 contains a majority of the elements of l1 bool majority(const nodes_t &l1, const nodes_t &l2) { - auto overlap = (size_t)count_if(l1.begin(), l1.end(), bind(isamember, _1, l2)); + auto overlap = (size_t)count_if(l1.begin(), l1.end(), std::bind(isamember, _1, l2)); return overlap >= (l1.size() >> 1) + 1; } diff --git a/paxos.h b/paxos.h index 8c9fc8f..db18e6c 100644 --- a/paxos.h +++ b/paxos.h @@ -45,7 +45,7 @@ class proposer_acceptor { map values; // vals of each instance friend class log; - log l = {this, me}; + class log l = {this, me}; void commit(unsigned instance, const value_t & v); void commit(unsigned instance, const value_t & v, lock & pxs_mutex_lock); diff --git a/paxos_protocol.h b/paxos_protocol.h index 1f5fd3e..c61e2eb 100644 --- a/paxos_protocol.h +++ b/paxos_protocol.h @@ -12,7 +12,7 @@ struct prop_t { LEXICOGRAPHIC_COMPARISON(prop_t) }; -MARSHALLABLE(prop_t) +MARSHALLABLE_STRUCT(prop_t) namespace paxos_protocol { enum status : rpc_protocol::status { OK, ERR }; @@ -35,6 +35,6 @@ namespace paxos_protocol { REMOTE_PROCEDURE(4, heartbeat, (int &, string, unsigned)); }; -MARSHALLABLE(paxos_protocol::prepareres) +MARSHALLABLE_STRUCT(paxos_protocol::prepareres) #endif diff --git a/rpc/connection.cc b/rpc/connection.cc index 358a2af..b29e136 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -6,25 +6,32 @@ #include #include "marshall.h" -connection::connection(connection_delegate *m1, socket_t && f1, int l1) -: mgr_(m1), fd_(move(f1)), lossy_(l1) +connection::connection(connection_delegate * delegate, socket_t && f1, int l1) +: fd(move(f1)), delegate_(delegate), lossy_(l1) { - fd_.flags() |= O_NONBLOCK; + fd.flags() |= O_NONBLOCK; signal(SIGPIPE, SIG_IGN); - create_time_ = steady_clock::now(); - - poll_mgr::shared_mgr.add_callback(fd_, CB_RDONLY, this); + poll_mgr::shared_mgr.add_callback(fd, CB_RDONLY, this); } connection::~connection() { - closeconn(); + { + lock ml(m_); + if (dead_) + return; + dead_ = true; + shutdown(fd,SHUT_RDWR); + } + // after block_remove_fd, select will never wait on fd and no callbacks + // will be active + poll_mgr::shared_mgr.block_remove_fd(fd); VERIFY(dead_); VERIFY(!wpdu_.buf.size()); } -shared_ptr connection::to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy) { +shared_ptr connection::to_dst(const sockaddr_in &dst, connection_delegate * delegate, int lossy) { socket_t s = socket(AF_INET, SOCK_STREAM, 0); s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1); if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { @@ -33,20 +40,7 @@ shared_ptr connection::to_dst(const sockaddr_in &dst, connection_del return nullptr; } IF_LEVEL(2) LOG_NONMEMBER("connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); - return make_shared(mgr, move(s), lossy); -} - -void connection::closeconn() { - { - lock ml(m_); - if (dead_) - return; - dead_ = true; - shutdown(fd_,SHUT_RDWR); - } - //after block_remove_fd, select will never wait on fd_ - //and no callbacks will be active - poll_mgr::shared_mgr.block_remove_fd(fd_); + return make_shared(delegate, move(s), lossy); } bool connection::send(const string & b) { @@ -65,19 +59,19 @@ bool connection::send(const string & b) { if (lossy_) { if ((random()%100) < lossy_) { - IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_); - shutdown(fd_,SHUT_RDWR); + IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd " << fd); + shutdown(fd,SHUT_RDWR); } } if (!writepdu()) { dead_ = true; ml.unlock(); - poll_mgr::shared_mgr.block_remove_fd(fd_); + poll_mgr::shared_mgr.block_remove_fd(fd); ml.lock(); } else if (wpdu_.solong != wpdu_.buf.size()) { // should be rare to need to explicitly add write callback - poll_mgr::shared_mgr.add_callback(fd_, CB_WRONLY, this); + poll_mgr::shared_mgr.add_callback(fd, CB_WRONLY, this); while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size()) send_complete_.wait(ml); } @@ -89,17 +83,17 @@ bool connection::send(const string & b) { return ret; } -//fd_ is ready to be written +// fd is ready to be written void connection::write_cb(int s) { lock ml(m_); VERIFY(!dead_); - VERIFY(fd_ == s); + VERIFY(fd == s); if (wpdu_.buf.size() == 0) { - poll_mgr::shared_mgr.del_callback(fd_,CB_WRONLY); + poll_mgr::shared_mgr.del_callback(fd, CB_WRONLY); return; } if (!writepdu()) { - poll_mgr::shared_mgr.del_callback(fd_, CB_RDWR); + poll_mgr::shared_mgr.del_callback(fd, CB_RDWR); dead_ = true; } else { VERIFY(wpdu_.solong != size_t_max); @@ -110,28 +104,26 @@ void connection::write_cb(int s) { send_complete_.notify_one(); } -// fd_ is ready to be read +// fd is ready to be read void connection::read_cb(int s) { lock ml(m_); - VERIFY(fd_ == s); + VERIFY(fd == s); if (dead_) return; IF_LEVEL(5) LOG("got data on fd " << s); - bool succ = true; - if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) - succ = readpdu(); - - if (!succ) { - IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying"); - poll_mgr::shared_mgr.del_callback(fd_,CB_RDWR); - dead_ = true; - send_complete_.notify_one(); + if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) { + if (!readpdu()) { + IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying"); + poll_mgr::shared_mgr.del_callback(fd, CB_RDWR); + dead_ = true; + send_complete_.notify_one(); + } } if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) { - if (mgr_->got_pdu(shared_from_this(), rpdu_.buf)) { + if (delegate_->got_pdu(shared_from_this(), rpdu_.buf)) { // connection_delegate has successfully consumed the pdu rpdu_.buf.clear(); rpdu_.solong = 0; @@ -144,10 +136,10 @@ bool connection::writepdu() { if (wpdu_.solong == wpdu_.buf.size()) return true; - ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong)); + ssize_t n = write(fd, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong)); if (n < 0) { if (errno != EAGAIN) { - IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno); + IF_LEVEL(1) LOG("writepdu fd " << fd << " failure errno=" << errno); wpdu_.solong = size_t_max; wpdu_.buf.clear(); } @@ -161,7 +153,7 @@ bool connection::readpdu() { IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size()); if (!rpdu_.buf.size()) { rpc_protocol::rpc_sz_t sz1; - ssize_t n = fd_.read(sz1); + ssize_t n = fd.read(sz1); if (n == 0) return false; @@ -185,12 +177,11 @@ bool connection::readpdu() { IF_LEVEL(5) LOG("read size of datagram = " << sz); - VERIFY(rpdu_.buf.size() == 0); - rpdu_.buf = string(sz+sizeof(sz1), 0); + rpdu_.buf.assign(sz+sizeof(sz1), 0); rpdu_.solong = sizeof(sz1); } - ssize_t n = fd_.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong); + ssize_t n = fd.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong); IF_LEVEL(5) LOG("read " << n << " bytes"); @@ -199,27 +190,25 @@ bool connection::readpdu() { return true; rpdu_.buf.clear(); rpdu_.solong = 0; - return (errno == EAGAIN); + return false; } rpdu_.solong += (size_t)n; return true; } -tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest) -: tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest) +connection_listener::connection_listener(connection_delegate * delegate, in_port_t port, int lossytest) +: tcp_(socket(AF_INET, SOCK_STREAM, 0)), delegate_(delegate), lossy_(lossytest) { - sockaddr_in sin{}; // zero initialize - sin.sin_family = AF_INET; - sin.sin_port = hton(port); - tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1); tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1); tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000}); tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000}); - // careful to exactly match type signature of bind arguments so we don't - // get std::bind instead - if (bind((int)tcp_, (const sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) { + sockaddr_in sin{}; // zero initialize + sin.sin_family = AF_INET; + sin.sin_port = hton(port); + + if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) { perror("accept_loop bind"); VERIFY(0); } @@ -238,25 +227,21 @@ tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest) poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this); } -tcpsconn::~tcpsconn() -{ +connection_listener::~connection_listener() { poll_mgr::shared_mgr.block_remove_fd(tcp_); - - for (auto & i : conns_) - i.second->closeconn(); } -void tcpsconn::read_cb(int) { +void connection_listener::read_cb(int) { sockaddr_in sin; socklen_t slen = sizeof(sin); int s1 = accept(tcp_, (sockaddr *)&sin, &slen); if (s1 < 0) { - perror("tcpsconn::accept_conn error"); + perror("connection_listener::accept_conn error"); throw thread_exit_exception(); } IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port)); - auto ch = make_shared(mgr_, s1, lossy_); + auto ch = make_shared(delegate_, s1, lossy_); // garbage collect dead connections for (auto i = conns_.begin(); i != conns_.end();) { @@ -266,5 +251,5 @@ void tcpsconn::read_cb(int) { ++i; } - conns_[ch->channo()] = ch; + conns_[s1] = ch; } diff --git a/rpc/connection.h b/rpc/connection.h index 87d17e4..8f7d494 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -21,24 +21,18 @@ class connection_delegate { class connection : private aio_callback, public enable_shared_from_this { public: - struct charbuf { - string buf; - size_t solong = 0; // number of bytes written or read so far - }; - - connection(connection_delegate *m1, socket_t && f1, int lossytest=0); + connection(connection_delegate * delegate, socket_t && f1, int lossytest=0); ~connection(); - int channo() { return fd_; } - bool isdead() { lock ml(m_); return dead_; } - void closeconn(); + bool isdead() { return dead_; } bool send(const string & b); - time_point create_time() const { return create_time_; } - static shared_ptr to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy=0); + const time_point create_time = steady_clock::now(); + const file_t fd; + private: void write_cb(int s); void read_cb(int s); @@ -46,15 +40,17 @@ class connection : private aio_callback, public enable_shared_from_this create_time_; - int waiters_ = 0; int lossy_ = 0; @@ -63,10 +59,10 @@ class connection : private aio_callback, public enable_shared_from_this> conns_; }; diff --git a/rpc/file.h b/rpc/file.h index 75c0d6e..393b98a 100644 --- a/rpc/file.h +++ b/rpc/file.h @@ -22,7 +22,7 @@ class file_t { public: inline file_t(int fd=-1) : fd_(fd) {} inline file_t(const file_t &) = delete; - inline file_t(file_t && other) : fd_(-1) { std::swap(fd_, other.fd_); } + inline file_t(file_t && other) : fd_(-1) { swap(fd_, other.fd_); } inline ~file_t() { if (fd_ != -1) ::close(fd_); } static inline void pipe(file_t *ends) { int fds[2]; @@ -31,7 +31,7 @@ class file_t { ends[1].fd_ = fds[1]; } inline operator int() const { if (fd_ == -1) throw "no fd"; return fd_; } - inline flags_t flags() const { return *this; } + inline flags_t flags() const { return {*this}; } inline void close() { ::close(fd_); fd_ = -1; diff --git a/rpc/marshall.h b/rpc/marshall.h index 6412612..6e0c94a 100644 --- a/rpc/marshall.h +++ b/rpc/marshall.h @@ -149,13 +149,13 @@ operator>>(unmarshall & u, tuple && t) { // // Implements struct marshalling via tuple marshalling of members. -#define MARSHALLABLE(_c_) \ +#define MARSHALLABLE_STRUCT(_c_) \ inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_(); } \ inline marshall & operator<<(marshall &m, const _c_ a) { return m << a._tuple_(); } // our first two marshallable structs... -MARSHALLABLE(rpc_protocol::request_header) -MARSHALLABLE(rpc_protocol::reply_header) +MARSHALLABLE_STRUCT(rpc_protocol::request_header) +MARSHALLABLE_STRUCT(rpc_protocol::reply_header) // // Marshalling for STL containers diff --git a/rpc/marshall_wrap.h b/rpc/marshall_wrap.h index 8d5b15a..8e10a75 100644 --- a/rpc/marshall_wrap.h +++ b/rpc/marshall_wrap.h @@ -85,7 +85,7 @@ struct marshalled_func_imp { return new handler([=](unmarshall &u, marshall &m) -> RV { // Unmarshall each argument with the correct type and store the // result in a tuple. - ArgsStorage t = {u._grab::type>()...}; + ArgsStorage t{u._grab::type>()...}; // Verify successful unmarshalling of the entire input stream. if (!u.okdone()) return (RV)ErrorHandler::unmarshall_args_failure(); diff --git a/rpc/poll_mgr.cc b/rpc/poll_mgr.cc index 94b24e3..dc42274 100644 --- a/rpc/poll_mgr.cc +++ b/rpc/poll_mgr.cc @@ -14,7 +14,7 @@ class wait_manager { virtual void watch_fd(int fd, poll_flag flag) = 0; virtual bool unwatch_fd(int fd, poll_flag flag) = 0; virtual void wait_ready(vector & readable, vector & writable) = 0; - virtual ~wait_manager() throw() {} + virtual ~wait_manager() noexcept {} }; class SelectAIO : public wait_manager { diff --git a/rpc/rpc.cc b/rpc/rpc.cc index 964102f..0c3a97d 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -1,56 +1,56 @@ -/* - The rpcc class handles client-side RPC. Each rpcc is bound to a single RPC - server. The jobs of rpcc include maintaining a connection to server, sending - RPC requests and waiting for responses, retransmissions, at-most-once delivery - etc. - - The rpcs class handles the server side of RPC. Each rpcs handles multiple - connections from different rpcc objects. The jobs of rpcs include accepting - connections, dispatching requests to registered RPC handlers, at-most-once - delivery etc. - - Both rpcc and rpcs use the connection class as an abstraction for the - underlying communication channel. To send an RPC request/reply, one calls - connection::send() which blocks until data is sent or the connection has - failed (thus the caller can free the buffer when send() returns). When a - request/reply is received, connection makes a callback into the corresponding - rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()). - - Thread organization: - rpcc uses application threads to send RPC requests and blocks to receive the - reply or error. All connections use a single PollMgr object to perform async - socket IO. PollMgr creates a single thread to examine the readiness of socket - file descriptors and informs the corresponding connection whenever a socket is - ready to be read or written. (We use asynchronous socket IO to reduce the - number of threads needed to manage these connections; without async IO, at - least one thread is needed per connection to read data without blocking other - activities.) Each rpcs object creates one thread for listening on the server - port and a pool of threads for executing RPC requests. The thread pool allows - us to control the number of threads spawned at the server (spawning one thread - per request will hurt when the server faces thousands of requests). - - In order to delete a connection object, we must maintain a reference count. - For rpcc, multiple client threads might be invoking the rpcc::call() functions - and thus holding multiple references to the underlying connection object. For - rpcs, multiple dispatch threads might be holding references to the same - connection object. A connection object is deleted only when the underlying - connection is dead and the reference count reaches zero. - - This version of the RPC library explicitly joins exited threads to make sure - no outstanding references exist before deleting objects. - - To delete a rpcc object safely, the users of the library must ensure that - there are no outstanding calls on the rpcc object. - - To delete a rpcs object safely, we do the following in sequence: 1. stop - accepting new incoming connections. 2. close existing active connections. 3. - delete the dispatch thread pool which involves waiting for current active RPC - handlers to finish. It is interesting how a thread pool can be deleted - without using thread cancellation. The trick is to inject x "poison pills" for - a thread pool of x threads. Upon getting a poison pill instead of a normal - task, a worker thread will exit (and thread pool destructor waits to join all - x exited worker threads). - */ +// +// The rpcc class handles client-side RPC. Each rpcc is bound to a single RPC +// server. The jobs of rpcc include maintaining a connection to server, sending +// RPC requests and waiting for responses, retransmissions, at-most-once delivery +// etc. +// +// The rpcs class handles the server side of RPC. Each rpcs handles multiple +// connections from different rpcc objects. The jobs of rpcs include accepting +// connections, dispatching requests to registered RPC handlers, at-most-once +// delivery etc. +// +// Both rpcc and rpcs use the connection class as an abstraction for the +// underlying communication channel. To send an RPC request/reply, one calls +// connection::send() which blocks until data is sent or the connection has +// failed (thus the caller can free the buffer when send() returns). When a +// request/reply is received, connection makes a callback into the corresponding +// rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()). +// +// Thread organization: +// rpcc uses application threads to send RPC requests and blocks to receive the +// reply or error. All connections use a single PollMgr object to perform async +// socket IO. PollMgr creates a single thread to examine the readiness of socket +// file descriptors and informs the corresponding connection whenever a socket is +// ready to be read or written. (We use asynchronous socket IO to reduce the +// number of threads needed to manage these connections; without async IO, at +// least one thread is needed per connection to read data without blocking other +// activities.) Each rpcs object creates one thread for listening on the server +// port and a pool of threads for executing RPC requests. The thread pool allows +// us to control the number of threads spawned at the server (spawning one thread +// per request will hurt when the server faces thousands of requests). +// +// In order to delete a connection object, we must maintain a reference count. +// For rpcc, multiple client threads might be invoking the rpcc::call() functions +// and thus holding multiple references to the underlying connection object. For +// rpcs, multiple dispatch threads might be holding references to the same +// connection object. A connection object is deleted only when the underlying +// connection is dead and the reference count reaches zero. +// +// This version of the RPC library explicitly joins exited threads to make sure +// no outstanding references exist before deleting objects. +// +// To delete a rpcc object safely, the users of the library must ensure that +// there are no outstanding calls on the rpcc object. +// +// To delete a rpcs object safely, we do the following in sequence: 1. stop +// accepting new incoming connections. 2. close existing active connections. 3. +// delete the dispatch thread pool which involves waiting for current active RPC +// handlers to finish. It is interesting how a thread pool can be deleted +// without using thread cancellation. The trick is to inject x "poison pills" for +// a thread pool of x threads. Upon getting a poison pill instead of a normal +// task, a worker thread will exit (and thread pool destructor waits to join all +// x exited worker threads). +// #include "rpc.h" @@ -58,6 +58,7 @@ #include #include #include +#include inline void set_rand_seed() { auto now = time_point_cast(steady_clock::now()); @@ -72,7 +73,7 @@ rpcc::rpcc(const string & d, bool retrans) : { if (retrans) { set_rand_seed(); - clt_nonce_ = (unsigned int)random(); + clt_nonce_ = (nonce_t)random(); } else { // special client nonce 0 means this client does not // require at-most-once logic from the server @@ -94,15 +95,14 @@ rpcc::rpcc(const string & d, bool retrans) : // are blocked inside rpcc or will use rpcc in the future rpcc::~rpcc() { cancel(); - IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1)); - if (chan_) - chan_->closeconn(); + IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " chan " << (chan_?(int)chan_->fd:-1)); + chan_.reset(); VERIFY(calls_.size() == 0); } int rpcc::bind(milliseconds to) { - unsigned int r; - int ret = call_timeout(rpc_protocol::bind, to, r, 0); + nonce_t r; + int ret = call_timeout(rpc_protocol::bind, to, r); if (ret == 0) { lock ml(m_); bind_done_ = true; @@ -140,7 +140,7 @@ void rpcc::cancel(void) { int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) { caller ca(0, &rep); - int xid_rep; + xid_t xid_rep; { lock ml(m_); @@ -325,11 +325,11 @@ compress: } } -rpcs::rpcs(in_port_t p1, size_t count) - : port_(p1), counting_(count), curr_counts_(count), reachable_ (true) +rpcs::rpcs(in_port_t p1) + : port_(p1), reachable_ (true) { set_rand_seed(); - nonce_ = (unsigned int)random(); + nonce_ = (nonce_t)random(); IF_LEVEL(2) LOG("created with nonce " << nonce_); reg(rpc_protocol::bind, &rpcs::rpcbind, this); @@ -338,7 +338,7 @@ rpcs::rpcs(in_port_t p1, size_t count) void rpcs::start() { char *loss_env = getenv("RPC_LOSSY"); - listener_ = unique_ptr(new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0)); + listener_.reset(new connection_listener(this, port_, loss_env ? atoi(loss_env) : 0)); } rpcs::~rpcs() { @@ -354,7 +354,7 @@ bool rpcs::got_pdu(const shared_ptr & c, const string & b) { return true; } - return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b)); + return dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, c, b)); } void rpcs::reg1(proc_id_t proc, handler *h) { @@ -364,29 +364,6 @@ void rpcs::reg1(proc_id_t proc, handler *h) { VERIFY(procs_.count(proc) >= 1); } -void rpcs::updatestat(proc_id_t proc) { - lock cl(count_m_); - counts_[proc]++; - curr_counts_--; - if (curr_counts_ == 0) { - LOG("RPC STATS: "); - for (auto i = counts_.begin(); i != counts_.end(); i++) - LOG(hex << i->first << ":" << dec << i->second); - - lock rwl(reply_window_m_); - - size_t totalrep = 0, maxrep = 0; - for (auto clt : reply_window_) { - totalrep += clt.second.size(); - if (clt.second.size() > maxrep) - maxrep = clt.second.size(); - } - IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " << - totalrep << " max per client " << maxrep); - curr_counts_ = counting_; - } -} - void rpcs::dispatch(shared_ptr c, const string & buf) { unmarshall req(buf, true); @@ -440,7 +417,7 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { VERIFY (reply_window_[h.clt_nonce].size() == 0); // create reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid << - " chan " << c->channo() << ", total clients " << (reply_window_.size()-1)); + " chan " << c->fd << ", total clients " << (reply_window_.size()-1)); } } @@ -449,7 +426,7 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { lock rwl(conns_m_); if (conns_.find(h.clt_nonce) == conns_.end()) conns_[h.clt_nonce] = c; - else if (conns_[h.clt_nonce]->create_time() < c->create_time()) + else if (conns_[h.clt_nonce]->create_time < c->create_time) conns_[h.clt_nonce] = c; } @@ -461,9 +438,6 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { switch (stat) { case NEW: // new request - if (counting_) - updatestat(proc); - rh.ret = (*f)(req, rep); if (rh.ret == rpc_protocol::unmarshal_args_failure) { LOG("failed to unmarshall the arguments. You are " << @@ -522,8 +496,8 @@ void rpcs::dispatch(shared_ptr c, const string & buf) { // DONE: seen this xid, previous reply returned in b. // FORGOTTEN: might have seen this xid, but deleted previous reply. rpcs::rpcstate_t -rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid, - int xid_rep, string & b) +rpcs::checkduplicate_and_update(nonce_t clt_nonce, xid_t xid, + xid_t xid_rep, string & b) { lock rwl(reply_window_m_); @@ -532,7 +506,7 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid, VERIFY(l.size() > 0); VERIFY(xid >= xid_rep); - int past_xid_rep = l.begin()->xid; + xid_t past_xid_rep = l.begin()->xid; list::iterator start = l.begin(), it = ++start; @@ -571,7 +545,7 @@ rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid, // add_reply() should remember b. // free_reply_window() and checkduplicate_and_update are responsible for // cleaning up the remembered values. -void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) { +void rpcs::add_reply(nonce_t clt_nonce, xid_t xid, const string & b) { lock rwl(reply_window_m_); // remember the RPC reply value list &l = reply_window_[clt_nonce]; @@ -592,7 +566,7 @@ void rpcs::free_reply_window(void) { reply_window_.clear(); } -int rpcs::rpcbind(unsigned int &r, int) { +int rpcs::rpcbind(nonce_t &r) { IF_LEVEL(2) LOG("called return nonce " << nonce_); r = nonce_; return 0; diff --git a/rpc/rpc.h b/rpc/rpc.h index 7b65101..3ae7737 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -37,10 +37,12 @@ class rpcc : private connection_delegate { using proc_id_t = rpc_protocol::proc_id_t; template using proc_t = rpc_protocol::proc_t; + using nonce_t = rpc_protocol::nonce_t; + using xid_t = rpc_protocol::xid_t; // manages per rpc info struct caller { - caller(int _xid, string *_rep) : xid(_xid), rep(_rep) {} + caller(xid_t _xid, string *_rep) : xid(_xid), rep(_rep) {} int xid; string *rep; @@ -51,14 +53,14 @@ class rpcc : private connection_delegate { }; void get_refconn(shared_ptr & ch); - void update_xid_rep(int xid); + void update_xid_rep(xid_t xid); sockaddr_in dst_; - unsigned int clt_nonce_; - unsigned int srv_nonce_; + nonce_t clt_nonce_; + nonce_t srv_nonce_; bool bind_done_; - int xid_; + xid_t xid_; int lossytest_; bool retrans_; bool reachable_; @@ -72,13 +74,13 @@ class rpcc : private connection_delegate { cond destroy_wait_c_; map calls_; - list xid_rep_window_; + list xid_rep_window_; struct request { void clear() { buf.clear(); xid = -1; } bool isvalid() { return xid != -1; } string buf; - int xid = -1; + xid_t xid = -1; }; request dup_req_; int xid_rep_done_; @@ -108,7 +110,7 @@ class rpcc : private connection_delegate { rpcc(const string & d, bool retrans=true); ~rpcc(); - unsigned int id() { return clt_nonce_; } + nonce_t id() { return clt_nonce_; } int bind(milliseconds to = rpc::to_max); @@ -135,6 +137,8 @@ class rpcs : private connection_delegate { using proc_id_t = rpc_protocol::proc_id_t; template using proc_t = rpc_protocol::proc_t; + using nonce_t = rpc_protocol::nonce_t; + using xid_t = rpc_protocol::xid_t; typedef enum { NEW, // new RPC, not a duplicate @@ -148,36 +152,29 @@ class rpcs : private connection_delegate { // has been sent; in that case buf points to a copy of the reply, // and sz holds the size of the reply. struct reply_t { - reply_t (int _xid) : xid(_xid), cb_present(false) {} - reply_t (int _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {} - int xid; + reply_t (xid_t _xid) : xid(_xid), cb_present(false) {} + reply_t (xid_t _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {} + xid_t xid; bool cb_present; // whether the reply buffer is valid string buf; // the reply buffer }; in_port_t port_; - unsigned int nonce_; + nonce_t nonce_; // provide at most once semantics by maintaining a window of replies // per client that that client hasn't acknowledged receiving yet. // indexed by client nonce. - map> reply_window_; + map> reply_window_; void free_reply_window(void); - void add_reply(unsigned int clt_nonce, int xid, const string & b); - - rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, - int xid, int rep_xid, string & b); + void add_reply(nonce_t clt_nonce, xid_t xid, const string & b); - void updatestat(proc_id_t proc); + rpcstate_t checkduplicate_and_update(nonce_t clt_nonce, xid_t xid, + xid_t rep_xid, string & b); // latest connection to the client - map> conns_; - - // counting - const size_t counting_; - size_t curr_counts_; - map counts_; + map> conns_; bool reachable_; @@ -185,7 +182,6 @@ class rpcs : private connection_delegate { map procs_; mutex procs_m_; // protect insert/delete to procs[] - mutex count_m_; // protect modification of counts mutex reply_window_m_; // protect reply window et al mutex conns_m_; // protect conns_ @@ -195,21 +191,21 @@ class rpcs : private connection_delegate { void reg1(proc_id_t proc, handler *); unique_ptr dispatchpool_; - unique_ptr listener_; + unique_ptr listener_; // RPC handler for clients binding - rpc_protocol::status rpcbind(unsigned int &r, int a); + rpc_protocol::status rpcbind(nonce_t &r); bool got_pdu(const shared_ptr & c, const string & b); public: - rpcs(in_port_t port, size_t counts=0); + rpcs(in_port_t port); ~rpcs(); void set_reachable(bool r) { reachable_ = r; } - template void reg(proc_t

proc, F f, C *c=nullptr) { + template inline void reg(proc_t

proc, F f, C *c=nullptr) { static_assert(is_valid_registration::value, "RPC handler registered with incorrect argument types"); struct ReturnOnFailure { static inline int unmarshall_args_failure() { diff --git a/rpc/rpc_protocol.h b/rpc/rpc_protocol.h index 881de9b..4a3ff32 100644 --- a/rpc/rpc_protocol.h +++ b/rpc/rpc_protocol.h @@ -8,6 +8,8 @@ namespace rpc_protocol { using status = int32_t; using rpc_sz_t = uint32_t; + using nonce_t = uint32_t; + using xid_t = int32_t; enum : status { timeout_failure = -1, @@ -20,17 +22,17 @@ namespace rpc_protocol { }; struct request_header { - int xid; + xid_t xid; proc_id_t proc; - unsigned int clt_nonce; - unsigned int srv_nonce; - int xid_rep; + nonce_t clt_nonce; + nonce_t srv_nonce; + xid_t xid_rep; MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep) }; struct reply_header { - int xid; + xid_t xid; int ret; MEMBERS(xid, ret) @@ -44,13 +46,13 @@ namespace rpc_protocol { const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t); const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation - const size_t MAX_PDU = 10<<20; //maximum PDF is 10M + const size_t MAX_PDU = 10<<20; // maximum PDF is 10M -#define REMOTE_PROCEDURE_BASE(_base_) enum proc_no : ::rpc_protocol::proc_id_t { base = _base_ }; -#define REMOTE_PROCEDURE(_offset_, _name_, _args_) static constexpr ::rpc_protocol::proc_t _name_{base + _offset_}; +#define REMOTE_PROCEDURE_BASE(_base_) static constexpr rpc_protocol::proc_id_t base = _base_; +#define REMOTE_PROCEDURE(_offset_, _name_, _args_) static constexpr rpc_protocol::proc_t _name_{base + _offset_}; REMOTE_PROCEDURE_BASE(0); - REMOTE_PROCEDURE(1, bind, (unsigned int &, int)); // handler number reserved for bind + REMOTE_PROCEDURE(1, bind, (nonce_t &)); // handler number reserved for bind }; ENDIAN_SWAPPABLE(rpc_protocol::request_header) diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index 1963ada..4dd2af2 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -6,15 +6,16 @@ #include #include #include +#include #define NUM_CL 2 char log_thread_prefix = 'r'; -rpcs *server; // server rpc object -rpcc *clients[NUM_CL]; // client rpc object -string dst; //server's ip address -in_port_t port; +static rpcs *server; // server rpc object +static rpcc *clients[NUM_CL]; // client rpc object +static string dst; //server's ip address +static in_port_t port; // server-side handlers. they must be methods of some class // to simplify rpcs::reg(). a server process can have handlers @@ -60,11 +61,11 @@ int srv::handle_slow(int &r, const int a) { } int srv::handle_bigrep(string &r, const size_t len) { - r = string((size_t)len, 'x'); + r = string(len, 'x'); return 0; } -srv service; +static srv service; void startserver() { server = new rpcs(port); @@ -82,13 +83,15 @@ void testmarshall() { VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ); int i = 12345; unsigned long long l = 1223344455L; + size_t sz = 101010101; string s = "hallo...."; m << i; m << l; m << s; + m << sz; string b = m; - VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)); + VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)+sizeof(uint32_t)); unmarshall un(b, true); rpc_protocol::request_header rh1; @@ -97,11 +100,13 @@ void testmarshall() { int i1; unsigned long long l1; string s1; + size_t sz1; un >> i1; un >> l1; un >> s1; + un >> sz1; VERIFY(un.okdone()); - VERIFY(i1==i && l1==l && s1==s); + VERIFY(i1==i && l1==l && s1==s && sz1==sz); } void client1(size_t cl) { @@ -199,6 +204,7 @@ void simple_tests(rpcc *c) { // huge RPC string big(1000000, 'x'); intret = c->call(srv_protocol::_22, rep, big, (string)"z"); + VERIFY(intret == 0); VERIFY(rep.size() == 1000001); cout << " -- huge 1M rpc request .. ok" << endl; diff --git a/rsm.cc b/rsm.cc index 7e90b03..956f45d 100644 --- a/rsm.cc +++ b/rsm.cc @@ -121,7 +121,7 @@ void rsm::reg1(rpc_protocol::proc_id_t proc, handler *h) { } // The recovery thread runs this function -void rsm::recovery() [[noreturn]] { +void rsm::recovery() { bool r = true; lock ml(rsm_mutex); @@ -356,6 +356,9 @@ rsm_client_protocol::status rsm::client_invoke(string & r, rpc_protocol::proc_id } } execute(procno, req, r); + for (size_t i=0; i &r, int) { vector m; @@ -512,12 +514,9 @@ bool rsm::amiprimary() { } -// Testing server - -// Simulate partitions +// Test RPCs -- simulate partitions and failures -// assumes caller holds rsm_mutex -void rsm::net_repair(bool heal, lock &) { +void rsm::net_repair(bool heal, lock &/*rsm_mutex_lock*/) { vector m; cfg->get_view(vid_commit, m); for (unsigned i = 0; i < m.size(); i++) { @@ -534,15 +533,12 @@ rsm_test_protocol::status rsm::test_net_repairreq(rsm_test_protocol::status &r, lock ml(rsm_mutex); LOG("heal " << heal << " (dopartition " << dopartition << ", partitioned " << partitioned << ")"); - if (heal) { + if (heal) net_repair(heal, ml); - partitioned = false; - } else { + else dopartition = true; - partitioned = false; - } - r = rsm_test_protocol::OK; - return r; + partitioned = false; + return r = rsm_test_protocol::OK; } // simulate failure at breakpoint 1 and 2 diff --git a/rsm.h b/rsm.h index b402bab..14dc011 100644 --- a/rsm.h +++ b/rsm.h @@ -72,7 +72,7 @@ class rsm : public config_view_change { bool amiprimary(); void set_state_transfer(rsm_state_transfer *_stf) { stf = _stf; } - void recovery(); + void recovery NORETURN (); void commit_change(unsigned vid); template void reg(rpc_protocol::proc_t

proc, F f, C *c=nullptr) { diff --git a/rsm_protocol.h b/rsm_protocol.h index d64c0af..9cd60bd 100644 --- a/rsm_protocol.h +++ b/rsm_protocol.h @@ -20,7 +20,7 @@ struct viewstamp { LEXICOGRAPHIC_COMPARISON(viewstamp) }; -MARSHALLABLE(viewstamp) +MARSHALLABLE_STRUCT(viewstamp) namespace rsm_protocol { enum status : rpc_protocol::status { OK, ERR, BUSY}; @@ -39,7 +39,7 @@ namespace rsm_protocol { REMOTE_PROCEDURE(4, joinreq, (string &, string, viewstamp)); }; -MARSHALLABLE(rsm_protocol::transferres) +MARSHALLABLE_STRUCT(rsm_protocol::transferres) namespace rsm_test_protocol { enum status : rpc_protocol::status {OK, ERR}; diff --git a/types.h b/types.h index 6e6f0f6..ede859f 100644 --- a/types.h +++ b/types.h @@ -5,41 +5,40 @@ #include using std::copy; -using std::move; +using std::count_if; +using std::find; using std::max; using std::min; using std::min_element; -using std::find; -using std::count_if; +using std::move; +using std::swap; #include using cond = std::condition_variable; using std::cv_status; #include -using std::chrono::seconds; -using std::chrono::milliseconds; +using std::chrono::duration_cast; using std::chrono::microseconds; +using std::chrono::milliseconds; using std::chrono::nanoseconds; +using std::chrono::seconds; using std::chrono::steady_clock; using std::chrono::system_clock; -using std::chrono::duration_cast; -using std::chrono::time_point_cast; using std::chrono::time_point; +using std::chrono::time_point_cast; #include using std::exception; #include -using std::ofstream; using std::ifstream; +using std::ofstream; -#ifndef LIBT4_NO_FUNCTIONAL #include +// std::bind conflicts with BIND(2) using std::function; -using std::bind; using std::placeholders::_1; -#endif #include #include @@ -217,4 +216,14 @@ struct pass { template inline pass(Args&&...) {} }; #include "endian.h" +#ifndef __has_attribute +#define __has_attribute(x) 0 +#endif + +#if __has_attribute(noreturn) +#define NORETURN [[noreturn]] +#else +#define NORETURN +#endif + #endif -- 1.7.9.5