PEDANTRY = -Weverything -pedantic-errors -Werror -Wno-c++98-compat \
- -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes \
- -Wmissing-declarations -Wno-weak-vtables -Wno-global-constructors \
- -Wno-exit-time-destructors
-#OPTFLAGS = -ftrapv -O4
-OPTFLAGS =
-CXXFLAGS = -ggdb -MMD -I. -std=c++11 -stdlib=libc++ $(PEDANTRY) $(OPTFLAGS)
-LDFLAGS = -stdlib=libc++ $(OPTFLAGS)
-CXX = clang++
-CC = clang++
+ -Wno-c++98-compat-pedantic -Wno-padded -Wno-missing-prototypes \
+ -Wmissing-declarations -Wno-weak-vtables -Wno-global-constructors \
+ -Wno-exit-time-destructors -pedantic -Wall -Wextra -Weffc++
+OPTFLAGS = -O0 -fno-omit-frame-pointer #-fsanitize=address ,thread,undefined -fsanitize-memory-track-origins
+STDLIB = -stdlib=libc++
+#STDLIB =
+CXX = clang++-mp-3.4
+#CXX = g++-mp-4.8
+CXXFLAGS = -std=c++11 -ggdb3 -MMD -I. $(STDLIB) $(PEDANTRY) $(OPTFLAGS)
+LDFLAGS = -std=c++11 $(STDLIB) $(OPTFLAGS)
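+# make's builtin link rule invokes $(CC), so point it at the C++ compiler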
+CC := $(CXX)
EXTRA_TARGETS = signatures
socketfilterfw=/usr/libexec/ApplicationFirewall/socketfilterfw
return r;
}
-void config::heartbeater() [[noreturn]] {
+void config::heartbeater() {
lock cfg_mutex_lock(cfg_mutex);
while (1) {
void restore(const string &s);
bool add(const string &, unsigned view_id);
bool ismember(const string &m, unsigned view_id);
- void heartbeater(void);
+ void heartbeater NORETURN ();
void paxos_commit(unsigned instance, const string &v);
rpcs *get_rpcs() { return paxos.get_rpcs(); }
void breakpoint(int b) { paxos.breakpoint(b); }
rlsrpc->start();
}
-void lock_client::releaser() [[noreturn]] {
+void lock_client::releaser() {
while (1) {
lock_protocol::lockid_t lid;
release_fifo.deq(&lid);
lock_protocol::status acquire(lock_protocol::lockid_t);
lock_protocol::status release(lock_protocol::lockid_t);
int stat(lock_protocol::lockid_t);
- void releaser();
+ void releaser NORETURN ();
rlock_protocol::status revoke_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t);
rlock_protocol::status retry_handler(int &, lock_protocol::lockid_t, lock_protocol::xid_t);
};
rsm_->set_state_transfer(this);
}
-void lock_server::revoker() [[noreturn]] {
+void lock_server::revoker() {
while (1) {
lock_protocol::lockid_t lid;
revoke_fifo.deq(&lid);
}
}
-void lock_server::retryer() [[noreturn]] {
+void lock_server::retryer() {
while (1) {
lock_protocol::lockid_t lid;
retry_fifo.deq(&lid);
MEMBERS(held, held_by, wanted_by)
};
-MARSHALLABLE(lock_state)
+MARSHALLABLE_STRUCT(lock_state)
typedef map<lock_protocol::lockid_t, lock_state> lock_map;
rsm *rsm_;
public:
lock_server(rsm *r = 0);
- void revoker();
- void retryer();
+ void revoker NORETURN ();
+ void retryer NORETURN ();
string marshal_state();
void unmarshal_state(const string & state);
lock_protocol::status acquire(int &, lock_protocol::lockid_t, const callback_t & id, lock_protocol::xid_t);
// must be >= 2
const int nt = 6; // XXX: lab1's RPC handlers are blocking; since rpcs uses a thread pool of 10 threads, we cannot test more than 10 blocking RPCs.
-string dst;
-lock_client **lc = new lock_client * [nt];
-lock_protocol::lockid_t a = "1";
-lock_protocol::lockid_t b = "2";
-lock_protocol::lockid_t c = "3";
+static string dst;
+static lock_client **lc = new lock_client * [nt];
+static lock_protocol::lockid_t a = "1";
+static lock_protocol::lockid_t b = "2";
+static lock_protocol::lockid_t c = "3";
// check_grant() and check_release() check that the lock server
// doesn't grant the same lock to both clients.
// they assume that lock names are distinct in the first byte.
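+// (the ids "1", "2", and "3" above differ in their first byte, which is what indexes ct[] below)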
-int ct[256];
-mutex count_mutex;
+static int ct[256];
+static mutex count_mutex;
void check_grant(lock_protocol::lockid_t lid) {
lock ml(count_mutex);
void logaccept(prop_t n_a, string v);
};
-#endif /* log_h */
+#endif
// check if l2 contains a majority of the elements of l1
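+// a majority of n nodes is floor(n/2) + 1 of them, e.g. 2 of 3 or 3 of 5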
bool majority(const nodes_t &l1, const nodes_t &l2) {
- auto overlap = (size_t)count_if(l1.begin(), l1.end(), bind(isamember, _1, l2));
+ auto overlap = (size_t)count_if(l1.begin(), l1.end(), std::bind(isamember, _1, l2));
return overlap >= (l1.size() >> 1) + 1;
}
map<unsigned,value_t> values; // vals of each instance
friend class log;
- log l = {this, me};
+ class log l = {this, me};
void commit(unsigned instance, const value_t & v);
void commit(unsigned instance, const value_t & v, lock & pxs_mutex_lock);
LEXICOGRAPHIC_COMPARISON(prop_t)
};
-MARSHALLABLE(prop_t)
+MARSHALLABLE_STRUCT(prop_t)
namespace paxos_protocol {
enum status : rpc_protocol::status { OK, ERR };
REMOTE_PROCEDURE(4, heartbeat, (int &, string, unsigned));
};
-MARSHALLABLE(paxos_protocol::prepareres)
+MARSHALLABLE_STRUCT(paxos_protocol::prepareres)
#endif
#include <unistd.h>
#include "marshall.h"
-connection::connection(connection_delegate *m1, socket_t && f1, int l1)
-: mgr_(m1), fd_(move(f1)), lossy_(l1)
+connection::connection(connection_delegate * delegate, socket_t && f1, int l1)
+: fd(move(f1)), delegate_(delegate), lossy_(l1)
{
- fd_.flags() |= O_NONBLOCK;
+ fd.flags() |= O_NONBLOCK;
signal(SIGPIPE, SIG_IGN);
- create_time_ = steady_clock::now();
-
- poll_mgr::shared_mgr.add_callback(fd_, CB_RDONLY, this);
+ poll_mgr::shared_mgr.add_callback(fd, CB_RDONLY, this);
}
connection::~connection() {
- closeconn();
+ {
+ lock ml(m_);
+ if (dead_)
+ return;
+ dead_ = true;
+ shutdown(fd,SHUT_RDWR);
+ }
+ // after block_remove_fd, select will never wait on fd and no callbacks
+ // will be active
+ poll_mgr::shared_mgr.block_remove_fd(fd);
VERIFY(dead_);
VERIFY(!wpdu_.buf.size());
}
-shared_ptr<connection> connection::to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy) {
+shared_ptr<connection> connection::to_dst(const sockaddr_in &dst, connection_delegate * delegate, int lossy) {
socket_t s = socket(AF_INET, SOCK_STREAM, 0);
s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
return nullptr;
}
IF_LEVEL(2) LOG_NONMEMBER("connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
- return make_shared<connection>(mgr, move(s), lossy);
-}
-
-void connection::closeconn() {
- {
- lock ml(m_);
- if (dead_)
- return;
- dead_ = true;
- shutdown(fd_,SHUT_RDWR);
- }
- //after block_remove_fd, select will never wait on fd_
- //and no callbacks will be active
- poll_mgr::shared_mgr.block_remove_fd(fd_);
+ return make_shared<connection>(delegate, move(s), lossy);
}
bool connection::send(const string & b) {
if (lossy_) {
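+        // lossy_ is the percent chance of shutting the socket down to simulate loss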
if ((random()%100) < lossy_) {
- IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd_ " << fd_);
- shutdown(fd_,SHUT_RDWR);
+ IF_LEVEL(1) LOG("send LOSSY TEST shutdown fd " << fd);
+ shutdown(fd,SHUT_RDWR);
}
}
if (!writepdu()) {
dead_ = true;
ml.unlock();
- poll_mgr::shared_mgr.block_remove_fd(fd_);
+ poll_mgr::shared_mgr.block_remove_fd(fd);
ml.lock();
} else if (wpdu_.solong != wpdu_.buf.size()) {
// should be rare to need to explicitly add write callback
- poll_mgr::shared_mgr.add_callback(fd_, CB_WRONLY, this);
+ poll_mgr::shared_mgr.add_callback(fd, CB_WRONLY, this);
while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
send_complete_.wait(ml);
}
return ret;
}
-//fd_ is ready to be written
+// fd is ready to be written
void connection::write_cb(int s) {
lock ml(m_);
VERIFY(!dead_);
- VERIFY(fd_ == s);
+ VERIFY(fd == s);
if (wpdu_.buf.size() == 0) {
- poll_mgr::shared_mgr.del_callback(fd_,CB_WRONLY);
+ poll_mgr::shared_mgr.del_callback(fd, CB_WRONLY);
return;
}
if (!writepdu()) {
- poll_mgr::shared_mgr.del_callback(fd_, CB_RDWR);
+ poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
dead_ = true;
} else {
VERIFY(wpdu_.solong != size_t_max);
send_complete_.notify_one();
}
-// fd_ is ready to be read
+// fd is ready to be read
void connection::read_cb(int s) {
lock ml(m_);
- VERIFY(fd_ == s);
+ VERIFY(fd == s);
if (dead_)
return;
IF_LEVEL(5) LOG("got data on fd " << s);
- bool succ = true;
- if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size())
- succ = readpdu();
-
- if (!succ) {
- IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
- poll_mgr::shared_mgr.del_callback(fd_,CB_RDWR);
- dead_ = true;
- send_complete_.notify_one();
+ if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
+ if (!readpdu()) {
+ IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
+ poll_mgr::shared_mgr.del_callback(fd, CB_RDWR);
+ dead_ = true;
+ send_complete_.notify_one();
+ }
}
if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
- if (mgr_->got_pdu(shared_from_this(), rpdu_.buf)) {
+ if (delegate_->got_pdu(shared_from_this(), rpdu_.buf)) {
// connection_delegate has successfully consumed the pdu
rpdu_.buf.clear();
rpdu_.solong = 0;
if (wpdu_.solong == wpdu_.buf.size())
return true;
- ssize_t n = write(fd_, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
+ ssize_t n = write(fd, &wpdu_.buf[wpdu_.solong], (wpdu_.buf.size()-wpdu_.solong));
if (n < 0) {
if (errno != EAGAIN) {
- IF_LEVEL(1) LOG("writepdu fd_ " << fd_ << " failure errno=" << errno);
+ IF_LEVEL(1) LOG("writepdu fd " << fd << " failure errno=" << errno);
wpdu_.solong = size_t_max;
wpdu_.buf.clear();
}
IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
if (!rpdu_.buf.size()) {
rpc_protocol::rpc_sz_t sz1;
- ssize_t n = fd_.read(sz1);
+ ssize_t n = fd.read(sz1);
if (n == 0)
return false;
IF_LEVEL(5) LOG("read size of datagram = " << sz);
- VERIFY(rpdu_.buf.size() == 0);
- rpdu_.buf = string(sz+sizeof(sz1), 0);
+ rpdu_.buf.assign(sz+sizeof(sz1), 0);
rpdu_.solong = sizeof(sz1);
}
- ssize_t n = fd_.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
+ ssize_t n = fd.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
IF_LEVEL(5) LOG("read " << n << " bytes");
return true;
rpdu_.buf.clear();
rpdu_.solong = 0;
- return (errno == EAGAIN);
+ return false;
}
rpdu_.solong += (size_t)n;
return true;
}
-tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest)
-: tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest)
+connection_listener::connection_listener(connection_delegate * delegate, in_port_t port, int lossytest)
+: tcp_(socket(AF_INET, SOCK_STREAM, 0)), delegate_(delegate), lossy_(lossytest)
{
- sockaddr_in sin{}; // zero initialize
- sin.sin_family = AF_INET;
- sin.sin_port = hton(port);
-
tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000});
tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000});
- // careful to exactly match type signature of bind arguments so we don't
- // get std::bind instead
- if (bind((int)tcp_, (const sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
+ sockaddr_in sin{}; // zero initialize
+ sin.sin_family = AF_INET;
+ sin.sin_port = hton(port);
+
+ if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
perror("accept_loop bind");
VERIFY(0);
}
poll_mgr::shared_mgr.add_callback(tcp_, CB_RDONLY, this);
}
-tcpsconn::~tcpsconn()
-{
+connection_listener::~connection_listener() {
poll_mgr::shared_mgr.block_remove_fd(tcp_);
-
- for (auto & i : conns_)
- i.second->closeconn();
}
-void tcpsconn::read_cb(int) {
+void connection_listener::read_cb(int) {
sockaddr_in sin;
socklen_t slen = sizeof(sin);
int s1 = accept(tcp_, (sockaddr *)&sin, &slen);
if (s1 < 0) {
- perror("tcpsconn::accept_conn error");
+ perror("connection_listener::accept_conn error");
throw thread_exit_exception();
}
IF_LEVEL(2) LOG("accept_loop got connection fd=" << s1 << " " << inet_ntoa(sin.sin_addr) << ":" << ntoh(sin.sin_port));
- auto ch = make_shared<connection>(mgr_, s1, lossy_);
+ auto ch = make_shared<connection>(delegate_, s1, lossy_);
// garbage collect dead connections
for (auto i = conns_.begin(); i != conns_.end();) {
++i;
}
- conns_[ch->channo()] = ch;
+ conns_[s1] = ch;
}
class connection : private aio_callback, public enable_shared_from_this<connection> {
public:
- struct charbuf {
- string buf;
- size_t solong = 0; // number of bytes written or read so far
- };
-
- connection(connection_delegate *m1, socket_t && f1, int lossytest=0);
+ connection(connection_delegate * delegate, socket_t && f1, int lossytest=0);
~connection();
- int channo() { return fd_; }
- bool isdead() { lock ml(m_); return dead_; }
- void closeconn();
+ bool isdead() { return dead_; }
bool send(const string & b);
- time_point<steady_clock> create_time() const { return create_time_; }
-
static shared_ptr<connection> to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy=0);
+ const time_point<steady_clock> create_time = steady_clock::now();
+ const file_t fd;
+
private:
void write_cb(int s);
void read_cb(int s);
bool readpdu();
bool writepdu();
- connection_delegate *mgr_;
- const file_t fd_;
+ connection_delegate * delegate_;
bool dead_ = false;
+ struct charbuf {
+ string buf;
+ size_t solong = 0; // number of bytes written or read so far
+ };
+
charbuf wpdu_;
charbuf rpdu_;
- time_point<steady_clock> create_time_;
-
int waiters_ = 0;
int lossy_ = 0;
cond send_wait_;
};
-class tcpsconn : private aio_callback {
+class connection_listener : private aio_callback {
public:
- tcpsconn(connection_delegate *m1, in_port_t port, int lossytest=0);
- ~tcpsconn();
+ connection_listener(connection_delegate * delegate, in_port_t port, int lossytest=0);
+ ~connection_listener();
inline in_port_t port() { return port_; }
private:
void write_cb(int) {}
mutex m_;
socket_t tcp_; // listens for connections
- connection_delegate *mgr_;
+ connection_delegate * delegate_;
int lossy_;
map<int, shared_ptr<connection>> conns_;
};
public:
inline file_t(int fd=-1) : fd_(fd) {}
inline file_t(const file_t &) = delete;
- inline file_t(file_t && other) : fd_(-1) { std::swap(fd_, other.fd_); }
+ inline file_t(file_t && other) : fd_(-1) { swap(fd_, other.fd_); }
inline ~file_t() { if (fd_ != -1) ::close(fd_); }
static inline void pipe(file_t *ends) {
int fds[2];
ends[1].fd_ = fds[1];
}
inline operator int() const { if (fd_ == -1) throw "no fd"; return fd_; }
- inline flags_t flags() const { return *this; }
+ inline flags_t flags() const { return {*this}; }
inline void close() {
::close(fd_);
fd_ = -1;
//
// Implements struct marshalling via tuple marshalling of members.
-#define MARSHALLABLE(_c_) \
+#define MARSHALLABLE_STRUCT(_c_) \
inline unmarshall & operator>>(unmarshall &u, _c_ &a) { return u >> a._tuple_(); } \
inline marshall & operator<<(marshall &m, const _c_ a) { return m << a._tuple_(); }
// our first two marshallable structs...
-MARSHALLABLE(rpc_protocol::request_header)
-MARSHALLABLE(rpc_protocol::reply_header)
+MARSHALLABLE_STRUCT(rpc_protocol::request_header)
+MARSHALLABLE_STRUCT(rpc_protocol::reply_header)
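+// A hypothetical usage sketch (the struct below is illustrative, not part of
+// this codebase): a struct that lists its members via MEMBERS gets both
+// operators from the macro,
+//
+//     struct point { int x, y; MEMBERS(x, y) };
+//     MARSHALLABLE_STRUCT(point)
+//
+// after which a point marshals and unmarshalls as the tuple (x, y).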
//
// Marshalling for STL containers
return new handler([=](unmarshall &u, marshall &m) -> RV {
// Unmarshall each argument with the correct type and store the
// result in a tuple.
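+        // (braced-init-lists evaluate left to right, so the _grab calls
+        // below consume the arguments in declaration order)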
- ArgsStorage t = {u._grab<typename decay<Args>::type>()...};
+ ArgsStorage t{u._grab<typename decay<Args>::type>()...};
// Verify successful unmarshalling of the entire input stream.
if (!u.okdone())
return (RV)ErrorHandler::unmarshall_args_failure();
virtual void watch_fd(int fd, poll_flag flag) = 0;
virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
- virtual ~wait_manager() throw() {}
+ virtual ~wait_manager() noexcept {}
};
class SelectAIO : public wait_manager {
-/*
- The rpcc class handles client-side RPC. Each rpcc is bound to a single RPC
- server. The jobs of rpcc include maintaining a connection to server, sending
- RPC requests and waiting for responses, retransmissions, at-most-once delivery
- etc.
-
- The rpcs class handles the server side of RPC. Each rpcs handles multiple
- connections from different rpcc objects. The jobs of rpcs include accepting
- connections, dispatching requests to registered RPC handlers, at-most-once
- delivery etc.
-
- Both rpcc and rpcs use the connection class as an abstraction for the
- underlying communication channel. To send an RPC request/reply, one calls
- connection::send() which blocks until data is sent or the connection has
- failed (thus the caller can free the buffer when send() returns). When a
- request/reply is received, connection makes a callback into the corresponding
- rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
-
- Thread organization:
- rpcc uses application threads to send RPC requests and blocks to receive the
- reply or error. All connections use a single PollMgr object to perform async
- socket IO. PollMgr creates a single thread to examine the readiness of socket
- file descriptors and informs the corresponding connection whenever a socket is
- ready to be read or written. (We use asynchronous socket IO to reduce the
- number of threads needed to manage these connections; without async IO, at
- least one thread is needed per connection to read data without blocking other
- activities.) Each rpcs object creates one thread for listening on the server
- port and a pool of threads for executing RPC requests. The thread pool allows
- us to control the number of threads spawned at the server (spawning one thread
- per request will hurt when the server faces thousands of requests).
-
- In order to delete a connection object, we must maintain a reference count.
- For rpcc, multiple client threads might be invoking the rpcc::call() functions
- and thus holding multiple references to the underlying connection object. For
- rpcs, multiple dispatch threads might be holding references to the same
- connection object. A connection object is deleted only when the underlying
- connection is dead and the reference count reaches zero.
-
- This version of the RPC library explicitly joins exited threads to make sure
- no outstanding references exist before deleting objects.
-
- To delete a rpcc object safely, the users of the library must ensure that
- there are no outstanding calls on the rpcc object.
-
- To delete a rpcs object safely, we do the following in sequence: 1. stop
- accepting new incoming connections. 2. close existing active connections. 3.
- delete the dispatch thread pool which involves waiting for current active RPC
- handlers to finish. It is interesting how a thread pool can be deleted
- without using thread cancellation. The trick is to inject x "poison pills" for
- a thread pool of x threads. Upon getting a poison pill instead of a normal
- task, a worker thread will exit (and thread pool destructor waits to join all
- x exited worker threads).
- */
+//
+// The rpcc class handles client-side RPC. Each rpcc is bound to a single RPC
+// server. The jobs of rpcc include maintaining a connection to server, sending
+// RPC requests and waiting for responses, retransmissions, at-most-once delivery
+// etc.
+//
+// The rpcs class handles the server side of RPC. Each rpcs handles multiple
+// connections from different rpcc objects. The jobs of rpcs include accepting
+// connections, dispatching requests to registered RPC handlers, at-most-once
+// delivery etc.
+//
+// Both rpcc and rpcs use the connection class as an abstraction for the
+// underlying communication channel. To send an RPC request/reply, one calls
+// connection::send() which blocks until data is sent or the connection has
+// failed (thus the caller can free the buffer when send() returns). When a
+// request/reply is received, connection makes a callback into the corresponding
+// rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
+//
+// Thread organization:
+// rpcc uses application threads to send RPC requests and blocks to receive the
+// reply or error. All connections use a single PollMgr object to perform async
+// socket IO. PollMgr creates a single thread to examine the readiness of socket
+// file descriptors and informs the corresponding connection whenever a socket is
+// ready to be read or written. (We use asynchronous socket IO to reduce the
+// number of threads needed to manage these connections; without async IO, at
+// least one thread is needed per connection to read data without blocking other
+// activities.) Each rpcs object creates one thread for listening on the server
+// port and a pool of threads for executing RPC requests. The thread pool allows
+// us to control the number of threads spawned at the server (spawning one thread
+// per request will hurt when the server faces thousands of requests).
+//
+// In order to delete a connection object, we must maintain a reference count.
+// For rpcc, multiple client threads might be invoking the rpcc::call() functions
+// and thus holding multiple references to the underlying connection object. For
+// rpcs, multiple dispatch threads might be holding references to the same
+// connection object. A connection object is deleted only when the underlying
+// connection is dead and the reference count reaches zero.
+//
+// This version of the RPC library explicitly joins exited threads to make sure
+// no outstanding references exist before deleting objects.
+//
+// To delete a rpcc object safely, the users of the library must ensure that
+// there are no outstanding calls on the rpcc object.
+//
+// To delete a rpcs object safely, we do the following in sequence: 1. stop
+// accepting new incoming connections. 2. close existing active connections. 3.
+// delete the dispatch thread pool which involves waiting for current active RPC
+// handlers to finish. It is interesting how a thread pool can be deleted
+// without using thread cancellation. The trick is to inject x "poison pills" for
+// a thread pool of x threads. Upon getting a poison pill instead of a normal
+// task, a worker thread will exit (and thread pool destructor waits to join all
+// x exited worker threads).
+//
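+// To illustrate the poison-pill shutdown described above, here is a minimal
+// sketch of the idea (hypothetical names, not the thread_pool used below;
+// assumes this codebase's lock/cond aliases and the std containers):
+//
+//     struct pool {
+//         pool(size_t n) { while (n--) workers.emplace_back(&pool::run, this); }
+//         ~pool() {
+//             for (size_t i = 0; i < workers.size(); i++)
+//                 add(nullptr);                   // one poison pill per worker
+//             for (auto & w : workers) w.join(); // wait for workers to exit
+//         }
+//         void add(function<void()> job) {
+//             lock ml(m); q.push_back(move(job)); c.notify_one();
+//         }
+//     private:
+//         void run() {
+//             while (1) {
+//                 function<void()> job;
+//                 {
+//                     lock ml(m);
+//                     while (q.empty()) c.wait(ml);
+//                     job = move(q.front()); q.pop_front();
+//                 }
+//                 if (!job) return; // poison pill: exit instead of running a job
+//                 job();
+//             }
+//         }
+//         vector<thread> workers; deque<function<void()>> q; mutex m; cond c;
+//     };
+//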
#include "rpc.h"
#include <netinet/tcp.h>
#include <netdb.h>
#include <unistd.h>
+#include <string.h>
inline void set_rand_seed() {
auto now = time_point_cast<nanoseconds>(steady_clock::now());
{
if (retrans) {
set_rand_seed();
- clt_nonce_ = (unsigned int)random();
+ clt_nonce_ = (nonce_t)random();
} else {
// special client nonce 0 means this client does not
// require at-most-once logic from the server
// are blocked inside rpcc or will use rpcc in the future
rpcc::~rpcc() {
cancel();
- IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " channo=" << (chan_?chan_->channo():-1));
- if (chan_)
- chan_->closeconn();
+ IF_LEVEL(2) LOG("delete nonce " << clt_nonce_ << " chan " << (chan_?(int)chan_->fd:-1));
+ chan_.reset();
VERIFY(calls_.size() == 0);
}
int rpcc::bind(milliseconds to) {
- unsigned int r;
- int ret = call_timeout(rpc_protocol::bind, to, r, 0);
+ nonce_t r;
+ int ret = call_timeout(rpc_protocol::bind, to, r);
if (ret == 0) {
lock ml(m_);
bind_done_ = true;
int rpcc::call1(proc_id_t proc, marshall &req, string &rep, milliseconds to) {
caller ca(0, &rep);
- int xid_rep;
+ xid_t xid_rep;
{
lock ml(m_);
}
}
-rpcs::rpcs(in_port_t p1, size_t count)
- : port_(p1), counting_(count), curr_counts_(count), reachable_ (true)
+rpcs::rpcs(in_port_t p1)
+ : port_(p1), reachable_ (true)
{
set_rand_seed();
- nonce_ = (unsigned int)random();
+ nonce_ = (nonce_t)random();
IF_LEVEL(2) LOG("created with nonce " << nonce_);
reg(rpc_protocol::bind, &rpcs::rpcbind, this);
void rpcs::start() {
char *loss_env = getenv("RPC_LOSSY");
- listener_ = unique_ptr<tcpsconn>(new tcpsconn(this, port_, loss_env ? atoi(loss_env) : 0));
+ listener_.reset(new connection_listener(this, port_, loss_env ? atoi(loss_env) : 0));
}
rpcs::~rpcs() {
return true;
}
- return dispatchpool_->addJob(bind(&rpcs::dispatch, this, c, b));
+ return dispatchpool_->addJob(std::bind(&rpcs::dispatch, this, c, b));
}
void rpcs::reg1(proc_id_t proc, handler *h) {
VERIFY(procs_.count(proc) >= 1);
}
-void rpcs::updatestat(proc_id_t proc) {
- lock cl(count_m_);
- counts_[proc]++;
- curr_counts_--;
- if (curr_counts_ == 0) {
- LOG("RPC STATS: ");
- for (auto i = counts_.begin(); i != counts_.end(); i++)
- LOG(hex << i->first << ":" << dec << i->second);
-
- lock rwl(reply_window_m_);
-
- size_t totalrep = 0, maxrep = 0;
- for (auto clt : reply_window_) {
- totalrep += clt.second.size();
- if (clt.second.size() > maxrep)
- maxrep = clt.second.size();
- }
- IF_LEVEL(1) LOG("REPLY WINDOW: clients " << (reply_window_.size()-1) << " total reply " <<
- totalrep << " max per client " << maxrep);
- curr_counts_ = counting_;
- }
-}
-
void rpcs::dispatch(shared_ptr<connection> c, const string & buf) {
unmarshall req(buf, true);
VERIFY (reply_window_[h.clt_nonce].size() == 0); // create
reply_window_[h.clt_nonce].push_back(reply_t(-1)); // store starting reply xid
IF_LEVEL(2) LOG("new client " << h.clt_nonce << " xid " << h.xid <<
- " chan " << c->channo() << ", total clients " << (reply_window_.size()-1));
+ " chan " << c->fd << ", total clients " << (reply_window_.size()-1));
}
}
lock rwl(conns_m_);
if (conns_.find(h.clt_nonce) == conns_.end())
conns_[h.clt_nonce] = c;
- else if (conns_[h.clt_nonce]->create_time() < c->create_time())
+ else if (conns_[h.clt_nonce]->create_time < c->create_time)
conns_[h.clt_nonce] = c;
}
switch (stat) {
case NEW: // new request
- if (counting_)
- updatestat(proc);
-
rh.ret = (*f)(req, rep);
if (rh.ret == rpc_protocol::unmarshal_args_failure) {
LOG("failed to unmarshall the arguments. You are " <<
// DONE: seen this xid, previous reply returned in b.
// FORGOTTEN: might have seen this xid, but deleted previous reply.
rpcs::rpcstate_t
-rpcs::checkduplicate_and_update(unsigned int clt_nonce, int xid,
- int xid_rep, string & b)
+rpcs::checkduplicate_and_update(nonce_t clt_nonce, xid_t xid,
+ xid_t xid_rep, string & b)
{
lock rwl(reply_window_m_);
VERIFY(l.size() > 0);
VERIFY(xid >= xid_rep);
- int past_xid_rep = l.begin()->xid;
+ xid_t past_xid_rep = l.begin()->xid;
list<reply_t>::iterator start = l.begin(), it = ++start;
// add_reply() should remember b.
// free_reply_window() and checkduplicate_and_update are responsible for
// cleaning up the remembered values.
-void rpcs::add_reply(unsigned int clt_nonce, int xid, const string & b) {
+void rpcs::add_reply(nonce_t clt_nonce, xid_t xid, const string & b) {
lock rwl(reply_window_m_);
// remember the RPC reply value
list<reply_t> &l = reply_window_[clt_nonce];
reply_window_.clear();
}
-int rpcs::rpcbind(unsigned int &r, int) {
+int rpcs::rpcbind(nonce_t &r) {
IF_LEVEL(2) LOG("called return nonce " << nonce_);
r = nonce_;
return 0;
using proc_id_t = rpc_protocol::proc_id_t;
template <class S>
using proc_t = rpc_protocol::proc_t<S>;
+ using nonce_t = rpc_protocol::nonce_t;
+ using xid_t = rpc_protocol::xid_t;
// manages per rpc info
struct caller {
- caller(int _xid, string *_rep) : xid(_xid), rep(_rep) {}
+ caller(xid_t _xid, string *_rep) : xid(_xid), rep(_rep) {}
-        int xid;
+        xid_t xid;
string *rep;
};
void get_refconn(shared_ptr<connection> & ch);
- void update_xid_rep(int xid);
+ void update_xid_rep(xid_t xid);
sockaddr_in dst_;
- unsigned int clt_nonce_;
- unsigned int srv_nonce_;
+ nonce_t clt_nonce_;
+ nonce_t srv_nonce_;
bool bind_done_;
- int xid_;
+ xid_t xid_;
int lossytest_;
bool retrans_;
bool reachable_;
cond destroy_wait_c_;
-    map<int, caller *> calls_;
+    map<xid_t, caller *> calls_;
- list<int> xid_rep_window_;
+ list<xid_t> xid_rep_window_;
struct request {
void clear() { buf.clear(); xid = -1; }
bool isvalid() { return xid != -1; }
string buf;
- int xid = -1;
+ xid_t xid = -1;
};
request dup_req_;
int xid_rep_done_;
rpcc(const string & d, bool retrans=true);
~rpcc();
- unsigned int id() { return clt_nonce_; }
+ nonce_t id() { return clt_nonce_; }
int bind(milliseconds to = rpc::to_max);
using proc_id_t = rpc_protocol::proc_id_t;
template <class S>
using proc_t = rpc_protocol::proc_t<S>;
+ using nonce_t = rpc_protocol::nonce_t;
+ using xid_t = rpc_protocol::xid_t;
typedef enum {
NEW, // new RPC, not a duplicate
    // has been sent; in that case buf holds a copy of the reply.
struct reply_t {
- reply_t (int _xid) : xid(_xid), cb_present(false) {}
- reply_t (int _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {}
- int xid;
+ reply_t (xid_t _xid) : xid(_xid), cb_present(false) {}
+ reply_t (xid_t _xid, const string & _buf) : xid(_xid), cb_present(true), buf(_buf) {}
+ xid_t xid;
bool cb_present; // whether the reply buffer is valid
string buf; // the reply buffer
};
in_port_t port_;
- unsigned int nonce_;
+ nonce_t nonce_;
    // provide at-most-once semantics by maintaining a window of replies
// per client that that client hasn't acknowledged receiving yet.
// indexed by client nonce.
- map<unsigned int, list<reply_t>> reply_window_;
+ map<nonce_t, list<reply_t>> reply_window_;
void free_reply_window(void);
- void add_reply(unsigned int clt_nonce, int xid, const string & b);
-
- rpcstate_t checkduplicate_and_update(unsigned int clt_nonce,
- int xid, int rep_xid, string & b);
+ void add_reply(nonce_t clt_nonce, xid_t xid, const string & b);
- void updatestat(proc_id_t proc);
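+    // (roughly: an xid at or below the head of the client's window has had
+    // its reply discarded and is FORGOTTEN; an xid with a retained reply is
+    // DONE; an unseen xid is NEW)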
+ rpcstate_t checkduplicate_and_update(nonce_t clt_nonce, xid_t xid,
+ xid_t rep_xid, string & b);
// latest connection to the client
- map<unsigned int, shared_ptr<connection>> conns_;
-
- // counting
- const size_t counting_;
- size_t curr_counts_;
- map<proc_id_t, size_t> counts_;
+ map<nonce_t, shared_ptr<connection>> conns_;
bool reachable_;
map<proc_id_t, handler *> procs_;
mutex procs_m_; // protect insert/delete to procs[]
- mutex count_m_; // protect modification of counts
mutex reply_window_m_; // protect reply window et al
mutex conns_m_; // protect conns_
void reg1(proc_id_t proc, handler *);
unique_ptr<thread_pool> dispatchpool_;
- unique_ptr<tcpsconn> listener_;
+ unique_ptr<connection_listener> listener_;
// RPC handler for clients binding
- rpc_protocol::status rpcbind(unsigned int &r, int a);
+ rpc_protocol::status rpcbind(nonce_t &r);
bool got_pdu(const shared_ptr<connection> & c, const string & b);
public:
- rpcs(in_port_t port, size_t counts=0);
+ rpcs(in_port_t port);
~rpcs();
void set_reachable(bool r) { reachable_ = r; }
- template<class P, class F, class C=void> void reg(proc_t<P> proc, F f, C *c=nullptr) {
+ template<class P, class F, class C=void> inline void reg(proc_t<P> proc, F f, C *c=nullptr) {
static_assert(is_valid_registration<P, F>::value, "RPC handler registered with incorrect argument types");
struct ReturnOnFailure {
static inline int unmarshall_args_failure() {
using status = int32_t;
using rpc_sz_t = uint32_t;
+ using nonce_t = uint32_t;
+ using xid_t = int32_t;
enum : status {
timeout_failure = -1,
};
struct request_header {
- int xid;
+ xid_t xid;
proc_id_t proc;
- unsigned int clt_nonce;
- unsigned int srv_nonce;
- int xid_rep;
+ nonce_t clt_nonce;
+ nonce_t srv_nonce;
+ xid_t xid_rep;
MEMBERS(xid, proc, clt_nonce, srv_nonce, xid_rep)
};
struct reply_header {
- int xid;
+ xid_t xid;
int ret;
MEMBERS(xid, ret)
const size_t RPC_HEADER_SZ = max(sizeof(request_header), sizeof(reply_header)) + sizeof(rpc_sz_t);
const size_t DEFAULT_RPC_SZ = 1024; // size of initial buffer allocation
-    const size_t MAX_PDU = 10<<20; //maximum PDF is 10M
+    const size_t MAX_PDU = 10<<20; // maximum PDU is 10M
-#define REMOTE_PROCEDURE_BASE(_base_) enum proc_no : ::rpc_protocol::proc_id_t { base = _base_ };
-#define REMOTE_PROCEDURE(_offset_, _name_, _args_) static constexpr ::rpc_protocol::proc_t<status _args_> _name_{base + _offset_};
+#define REMOTE_PROCEDURE_BASE(_base_) static constexpr rpc_protocol::proc_id_t base = _base_;
+#define REMOTE_PROCEDURE(_offset_, _name_, _args_) static constexpr rpc_protocol::proc_t<status _args_> _name_{base + _offset_};
REMOTE_PROCEDURE_BASE(0);
- REMOTE_PROCEDURE(1, bind, (unsigned int &, int)); // handler number reserved for bind
+ REMOTE_PROCEDURE(1, bind, (nonce_t &)); // handler number reserved for bind
};
ENDIAN_SWAPPABLE(rpc_protocol::request_header)
#include <arpa/inet.h>
#include <getopt.h>
#include <unistd.h>
+#include <string.h>
#define NUM_CL 2
char log_thread_prefix = 'r';
-rpcs *server; // server rpc object
-rpcc *clients[NUM_CL]; // client rpc object
-string dst; //server's ip address
-in_port_t port;
+static rpcs *server; // server rpc object
+static rpcc *clients[NUM_CL]; // client rpc object
+static string dst; // server's IP address
+static in_port_t port;
// server-side handlers. they must be methods of some class
// to simplify rpcs::reg(). a server process can have handlers
}
int srv::handle_bigrep(string &r, const size_t len) {
- r = string((size_t)len, 'x');
+ r = string(len, 'x');
return 0;
}
-srv service;
+static srv service;
void startserver() {
server = new rpcs(port);
VERIFY(((string)m).size()==rpc_protocol::RPC_HEADER_SZ);
int i = 12345;
unsigned long long l = 1223344455L;
+ size_t sz = 101010101;
string s = "hallo....";
m << i;
m << l;
m << s;
+ m << sz;
string b = m;
- VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int));
+ VERIFY(b.size() == rpc_protocol::RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int)+sizeof(uint32_t));
unmarshall un(b, true);
rpc_protocol::request_header rh1;
int i1;
unsigned long long l1;
string s1;
+ size_t sz1;
un >> i1;
un >> l1;
un >> s1;
+ un >> sz1;
VERIFY(un.okdone());
- VERIFY(i1==i && l1==l && s1==s);
+ VERIFY(i1==i && l1==l && s1==s && sz1==sz);
}
void client1(size_t cl) {
// huge RPC
string big(1000000, 'x');
intret = c->call(srv_protocol::_22, rep, big, (string)"z");
+ VERIFY(intret == 0);
VERIFY(rep.size() == 1000001);
cout << " -- huge 1M rpc request .. ok" << endl;
}
// The recovery thread runs this function
-void rsm::recovery() [[noreturn]] {
+void rsm::recovery() {
bool r = true;
lock ml(rsm_mutex);
}
}
execute(procno, req, r);
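+    // debug: log each byte of the marshalled reply in hex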
+ for (size_t i=0; i<r.size(); i++) {
+ LOG(hex << setfill('0') << setw(2) << (unsigned int)(unsigned char)r[i]);
+ }
last_myvs = vs;
return rsm_client_protocol::OK;
}
}
//
-// RPC handler: Send back all the nodes this local knows about to client
-// so the client can switch to a different primary
-// when it existing primary fails
+// RPC handler: Responds with the list of known nodes so the client can
+// switch to a different primary when its current one fails
//
rsm_client_protocol::status rsm::client_members(vector<string> &r, int) {
vector<string> m;
}
-// Testing server
-
-// Simulate partitions
+// Test RPCs -- simulate partitions and failures
-// assumes caller holds rsm_mutex
-void rsm::net_repair(bool heal, lock &) {
+void rsm::net_repair(bool heal, lock &/*rsm_mutex_lock*/) {
vector<string> m;
cfg->get_view(vid_commit, m);
for (unsigned i = 0; i < m.size(); i++) {
lock ml(rsm_mutex);
LOG("heal " << heal << " (dopartition " <<
dopartition << ", partitioned " << partitioned << ")");
- if (heal) {
+ if (heal)
net_repair(heal, ml);
- partitioned = false;
- } else {
+ else
dopartition = true;
- partitioned = false;
- }
- r = rsm_test_protocol::OK;
- return r;
+ partitioned = false;
+ return r = rsm_test_protocol::OK;
}
// simulate failures at breakpoints 1 and 2
bool amiprimary();
void set_state_transfer(rsm_state_transfer *_stf) { stf = _stf; }
- void recovery();
+ void recovery NORETURN ();
void commit_change(unsigned vid);
template<class P, class F, class C=void> void reg(rpc_protocol::proc_t<P> proc, F f, C *c=nullptr) {
LEXICOGRAPHIC_COMPARISON(viewstamp)
};
-MARSHALLABLE(viewstamp)
+MARSHALLABLE_STRUCT(viewstamp)
namespace rsm_protocol {
enum status : rpc_protocol::status { OK, ERR, BUSY};
REMOTE_PROCEDURE(4, joinreq, (string &, string, viewstamp));
};
-MARSHALLABLE(rsm_protocol::transferres)
+MARSHALLABLE_STRUCT(rsm_protocol::transferres)
namespace rsm_test_protocol {
enum status : rpc_protocol::status {OK, ERR};
#include <algorithm>
using std::copy;
-using std::move;
+using std::count_if;
+using std::find;
using std::max;
using std::min;
using std::min_element;
-using std::find;
-using std::count_if;
+using std::move;
+using std::swap;
#include <condition_variable>
using cond = std::condition_variable;
using std::cv_status;
#include <chrono>
-using std::chrono::seconds;
-using std::chrono::milliseconds;
+using std::chrono::duration_cast;
using std::chrono::microseconds;
+using std::chrono::milliseconds;
using std::chrono::nanoseconds;
+using std::chrono::seconds;
using std::chrono::steady_clock;
using std::chrono::system_clock;
-using std::chrono::duration_cast;
-using std::chrono::time_point_cast;
using std::chrono::time_point;
+using std::chrono::time_point_cast;
#include <exception>
using std::exception;
#include <fstream>
-using std::ofstream;
using std::ifstream;
+using std::ofstream;
-#ifndef LIBT4_NO_FUNCTIONAL
#include <functional>
+// std::bind conflicts with bind(2)
using std::function;
-using std::bind;
using std::placeholders::_1;
-#endif
#include <iomanip>
#include <iostream>
#include "endian.h"
+#ifndef __has_attribute
+#define __has_attribute(x) 0
+#endif
+
+#if __has_attribute(noreturn)
+#define NORETURN [[noreturn]]
+#else
+#define NORETURN
+#endif
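+
+// usage: NORETURN follows the function name in a declaration, as in
+//     void releaser NORETURN ();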
+
#endif