cfg_mutex_lock.unlock();
int r = 0, ret = rpc_const::bind_failure;
if (rpcc *cl = h.safebind())
- ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(100), r, me, vid);
+ ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid);
cfg_mutex_lock.lock();
heartbeat_t res = OK;
// value to support the assumption.
//
// With RPC_LOSSY=5, tests may fail due to delays and time outs.
- int ret = cl->bind(rpcc::to(1000));
+ int ret = cl->bind(milliseconds(1000));
if (ret < 0) {
LOG("bind failure! " << h->m << " " << ret);
delete cl;
if (!r)
continue;
auto status = (paxos_protocol::status)r->call_timeout(
- paxos_protocol::preparereq, rpcc::to(100), res, me, instance, proposal);
+ paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal);
if (status == paxos_protocol::OK) {
if (res.oldinstance) {
LOG("commiting old instance!");
continue;
bool accept = false;
int status = r->call_timeout(
- paxos_protocol::acceptreq, rpcc::to(100), accept, me, instance, proposal, v);
+ paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v);
if (status == paxos_protocol::OK && accept)
accepts.push_back(i);
}
if (!r)
continue;
int res = 0;
- r->call_timeout(paxos_protocol::decidereq, rpcc::to(100), res, me, instance, v);
+ r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v);
}
}
#include "rpc_protocol.h"
#include <cerrno>
#include <csignal>
-#include <fcntl.h>
#include <sys/types.h>
#include <netinet/tcp.h>
#include <unistd.h>
-#include <sys/socket.h>
#include "marshall.h"
connection::connection(chanmgr *m1, int f1, int l1)
: mgr_(m1), fd_(f1), lossy_(l1)
{
- int flags = fcntl(fd_, F_GETFL, NULL);
- fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
+ fd_.flags() |= O_NONBLOCK;
signal(SIGPIPE, SIG_IGN);
create_time_ = steady_clock::now();
- PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
+ PollMgr::Instance().add_callback(fd_, CB_RDONLY, this);
}
connection::~connection() {
VERIFY(dead_);
VERIFY(!wpdu_.buf.size());
- close(fd_);
}
void connection::incref() {
}
//after block_remove_fd, select will never wait on fd_
//and no callbacks will be active
- PollMgr::Instance()->block_remove_fd(fd_);
+ PollMgr::Instance().block_remove_fd(fd_);
}
void connection::decref() {
if (!writepdu()) {
dead_ = true;
ml.unlock();
- PollMgr::Instance()->block_remove_fd(fd_);
+ PollMgr::Instance().block_remove_fd(fd_);
ml.lock();
} else if (wpdu_.solong != wpdu_.buf.size()) {
// should be rare to need to explicitly add write callback
- PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
+ PollMgr::Instance().add_callback(fd_, CB_WRONLY, this);
while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
send_complete_.wait(ml);
}
VERIFY(!dead_);
VERIFY(fd_ == s);
if (wpdu_.buf.size() == 0) {
- PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
+ PollMgr::Instance().del_callback(fd_,CB_WRONLY);
return;
}
if (!writepdu()) {
- PollMgr::Instance()->del_callback(fd_, CB_RDWR);
+ PollMgr::Instance().del_callback(fd_, CB_RDWR);
dead_ = true;
} else {
VERIFY(wpdu_.solong != size_t_max);
if (!succ) {
IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
- PollMgr::Instance()->del_callback(fd_,CB_RDWR);
+ PollMgr::Instance().del_callback(fd_,CB_RDWR);
dead_ = true;
send_complete_.notify_one();
}
IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size());
if (!rpdu_.buf.size()) {
rpc_sz_t sz1;
- ssize_t n = read(fd_, &sz1, sizeof(sz1));
+ ssize_t n = fd_.read(sz1);
- if (n == 0) {
+ if (n == 0)
return false;
- }
if (n < 0) {
VERIFY(errno!=EAGAIN);
rpdu_.solong = sizeof(sz1);
}
- ssize_t n = read(fd_, &rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
+ ssize_t n = fd_.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong);
IF_LEVEL(5) LOG("read " << n << " bytes");
}
tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
-: mgr_(m1), lossy_(lossytest)
+: tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest)
{
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = hton(port);
- tcp_ = socket(AF_INET, SOCK_STREAM, 0);
- if (tcp_ < 0) {
- perror("accept_loop socket:");
- VERIFY(0);
- }
+ tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
+ tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
- int yes = 1;
- setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
- setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
+ struct timeval timeout = {0, 50000};
+
+ if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeout) < 0)
+ perror("accept_loop setsockopt");
+
+ if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeout) < 0)
+ perror("accept_loop setsockopt");
// careful to exactly match type signature of bind arguments so we don't
// get std::bind instead
- if (bind(tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
- perror("accept_loop tcp bind:");
+ if (bind((int)tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
+ perror("accept_loop bind");
VERIFY(0);
}
if (listen(tcp_, 1000) < 0) {
- perror("listen:");
+ perror("accept_loop listen");
VERIFY(0);
}
IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port);
- if (pipe(pipe_) < 0) {
- perror("accept_loop pipe:");
- VERIFY(0);
- }
+ file_t::pipe(pipe_);
- int flags = fcntl(pipe_[0], F_GETFL, NULL);
- flags |= O_NONBLOCK;
- fcntl(pipe_[0], F_SETFL, flags);
+ pipe_[0].flags() |= O_NONBLOCK;
th_ = thread(&tcpsconn::accept_conn, this);
}
tcpsconn::~tcpsconn()
{
- VERIFY(close(pipe_[1]) == 0);
+ pipe_[1].close();
th_.join();
- //close all the active connections
+ // close all the active connections
map<int, connection *>::iterator i;
for (i = conns_.begin(); i != conns_.end(); i++) {
i->second->closeconn();
+// Accept loop run on th_. Blocks in select() on the listening socket and on
+// the read end of pipe_; the destructor closes pipe_[1] to make pipe_[0]
+// readable, which is the signal for this loop to return.
void tcpsconn::accept_conn() {
fd_set rfds;
- int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_;
-
- try {
- while (1) {
- FD_ZERO(&rfds);
- FD_SET(pipe_[0], &rfds);
- FD_SET(tcp_, &rfds);
-
- int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
-
- if (ret < 0) {
- if (errno == EINTR) {
- continue;
- } else {
- perror("accept_conn select:");
- IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
- VERIFY(0);
- }
- }
-
- if (FD_ISSET(pipe_[0], &rfds)) {
- close(pipe_[0]);
- close(tcp_);
- return;
- }
- else if (FD_ISSET(tcp_, &rfds)) {
- process_accept();
- } else {
- VERIFY(0);
- }
+    int max_fd = max((int)pipe_[0], (int)tcp_);
+
+    while (1) {
+        // select() modifies the set in place, so rebuild it every round.
+        FD_ZERO(&rfds);
+        FD_SET(pipe_[0], &rfds);
+        FD_SET(tcp_, &rfds);
+
+        int ret = select(max_fd+1, &rfds, NULL, NULL, NULL);
+
+        // An interrupted select is retried; any other failure is fatal.
+        if (ret < 0 && errno == EINTR)
+            continue;
+        else if (ret < 0) {
+            perror("accept_conn select:");
+            IF_LEVEL(0) LOG("accept_conn failure errno " << errno);
+            VERIFY(0);
+        }
+
+        // Shutdown requested. The descriptors are closed by their file_t
+        // owners, so nothing is closed by hand here.
+        if (FD_ISSET(pipe_[0], &rfds))
+            return;
+
+        // select() returned without error and the pipe is quiet, so the
+        // listening socket must be the ready descriptor.
+        if (!FD_ISSET(tcp_, &rfds))
+            VERIFY(0);
+
+        // Catch by const reference: avoids copying the exception object.
+        try {
+            process_accept();
+        } catch (const thread_exit_exception &) {
+            break;
}
- }
- catch (thread_exit_exception e)
- {
}
}
#include <arpa/inet.h>
#include <netinet/in.h>
#include "pollmgr.h"
+#include "file.h"
constexpr size_t size_t_max = numeric_limits<size_t>::max();
bool writepdu();
chanmgr *mgr_;
- const int fd_;
+ const file_t fd_;
bool dead_ = false;
charbuf wpdu_;
in_port_t port_;
mutex m_;
thread th_;
- int pipe_[2];
+ file_t pipe_[2];
- int tcp_; //file desciptor for accepting connection
+ socket_t tcp_; // listens for connections
chanmgr *mgr_;
int lossy_;
map<int, connection *> conns_;
--- /dev/null
+#ifndef file_h
+#define file_h
+
+#include <fcntl.h>
+#include <unistd.h>
+#include "types.h"
+#include <sys/socket.h>
+
+// Owning, move-only wrapper around a POSIX file descriptor. The descriptor is
+// closed when the object is destroyed; fd_ == -1 means "no descriptor held".
+class file_t {
+    private:
+        int fd_;
+
+        // Scoped view of the descriptor's status flags: fetched with F_GETFL
+        // on construction, editable through the int& conversion, and written
+        // back with F_SETFL on destruction. This makes
+        // `f.flags() |= O_NONBLOCK;` a complete read-modify-write.
+        class flags_t {
+            private:
+                const file_t & f_;
+                int flags_;
+            public:
+                flags_t(const file_t & f) : f_(f), flags_(fcntl(f_.fd_, F_GETFL, NULL)) { }
+                ~flags_t() { fcntl(f_.fd_, F_SETFL, flags_); }
+                operator int & () { return flags_; }
+        };
+    public:
+        // Takes ownership of fd (implicit on purpose, so the result of
+        // socket()/epoll_create() can initialise a file_t directly).
+        inline file_t(int fd=-1) : fd_(fd) {}
+        inline file_t(const file_t &) = delete;
+        inline file_t & operator=(const file_t &) = delete;
+        // Steals the descriptor; the source is left empty so it is not closed
+        // twice. noexcept lets containers move rather than refuse to relocate.
+        inline file_t(file_t && other) noexcept : fd_(other.fd_) { other.fd_ = -1; }
+        inline ~file_t() { if (fd_ != -1) ::close(fd_); }
+
+        // Creates a pipe and stores its read end in ends[0] and its write end
+        // in ends[1]. Descriptors already held by ends[] are closed first so
+        // they are not leaked.
+        static inline void pipe(file_t *ends) {
+            int fds[2];
+            VERIFY(::pipe(fds) == 0);
+            for (int i = 0; i < 2; i++) {
+                if (ends[i].fd_ != -1)
+                    ::close(ends[i].fd_);
+                ends[i].fd_ = fds[i];
+            }
+        }
+
+        // Yields the raw descriptor for use with C APIs. Throws the string
+        // literal "no fd" when no descriptor is held.
+        inline operator int() const { if (fd_ == -1) throw "no fd"; return fd_; }
+        inline flags_t flags() const { return *this; }
+
+        // Closes the descriptor now instead of at destruction. Does nothing if
+        // no descriptor is held, so it is safe to call more than once.
+        inline void close() {
+            if (fd_ == -1)
+                return;
+            ::close(fd_);
+            fd_ = -1;
+        }
+
+        // Typed forms transfer exactly sizeof(T) bytes of the object's
+        // representation; the (pointer, length) forms are thin wrappers.
+        // All return the ::read / ::write result unchanged.
+        template <class T>
+        inline ssize_t read(T & t) const { return ::read(fd_, &t, sizeof(T)); }
+        inline ssize_t read(void * t, size_t n) const { return ::read(fd_, t, n); }
+        template <class T>
+        inline ssize_t write(const T & t) const { return ::write(fd_, &t, sizeof(T)); }
+        inline ssize_t write(const void * t, size_t n) const { return ::write(fd_, t, n); }
+};
+
+// A file_t whose descriptor is a socket; adds a typed setsockopt wrapper.
+class socket_t : public file_t {
+    public:
+        socket_t(int fd=-1) : file_t(fd) {}
+
+        // Sets one socket option from `value`, handing ::setsockopt the
+        // address of the argument and the size of its type. Returns the
+        // ::setsockopt result unchanged.
+        template <class T>
+        int setsockopt(int level, int option, T && value) {
+            const int sock = *this;
+            const void * const optval = &value;
+            return ::setsockopt(sock, level, option, optval, sizeof(T));
+        }
+};
+
+#endif
#include "types.h"
#include <errno.h>
-#include <fcntl.h>
-#include <unistd.h>
+#include <sys/select.h>
+#include "file.h"
-#include "pollmgr.h"
+#ifdef __linux__
+#include <sys/epoll.h>
+#endif
-PollMgr *PollMgr::instance = NULL;
-static once_flag pollmgr_is_initialized;
+#include "pollmgr.h"
-static void
-PollMgrInit()
-{
- PollMgr::instance = new PollMgr();
-}
+// Returns the process-wide PollMgr, constructing it (which starts its polling
+// thread) on first use. A function-local static is used so that construction
+// is thread-safe and always precedes the first caller, including callers that
+// run from static initializers in other translation units.
+PollMgr & PollMgr::Instance() {
+    static PollMgr instance;
+    return instance;
+}
+
+// Abstract readiness-notification backend used by PollMgr.
+class wait_manager {
+    public:
+        virtual ~wait_manager() throw() {}
+
+        // Start watching fd for the events selected by flag.
+        virtual void watch_fd(int fd, poll_flag flag) = 0;
+
+        // Stop watching fd for the events selected by flag. The
+        // implementations in this file return true when fd is no longer
+        // watched for any event afterwards.
+        virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
+
+        // Wait for activity and append the ready descriptors to the
+        // corresponding vector.
+        virtual void wait_ready(vector<int> & readable, vector<int> & writable) = 0;
+};
+
+// wait_manager implemented with select(2).
+class SelectAIO : public wait_manager {
+    public:
+        SelectAIO();
+        ~SelectAIO() {}
+        void watch_fd(int fd, poll_flag flag);
+        bool unwatch_fd(int fd, poll_flag flag);
+        void wait_ready(vector<int> & readable, vector<int> & writable);
+
+    private:
+        // Descriptors watched for reading / for writing.
+        fd_set rfds_, wfds_;
+        // Highest descriptor present in either set.
+        int highfds_;
+        // Internal pipe: the read end sits in rfds_, and a byte is written to
+        // the other end when the watched sets change.
+        file_t pipe_[2];
+        // Held by watch_fd and unwatch_fd while they modify the sets.
+        mutex m_;
+};
-PollMgr *
-PollMgr::Instance()
-{
- call_once(pollmgr_is_initialized, PollMgrInit);
- return instance;
-}
+#ifdef __linux__
+// wait_manager implemented with epoll (compiled on Linux only).
+class EPollAIO : public wait_manager {
+    public:
+        EPollAIO() {}
+        ~EPollAIO() throw() { }
+        void watch_fd(int fd, poll_flag flag);
+        bool unwatch_fd(int fd, poll_flag flag);
+        void wait_ready(vector<int> & readable, vector<int> & writable);
+
+    private:
+        // The epoll instance; owned by file_t, so it is closed on destruction.
+        file_t poll_ = epoll_create(MAX_POLL_FDS);
+        // Event buffer filled by epoll_wait.
+        struct epoll_event ready_[MAX_POLL_FDS];
+        // Per-descriptor mask of the poll_flag bits currently registered;
+        // zero means the descriptor is not registered with epoll.
+        vector<unsigned> fdstatus_ = vector<unsigned>(MAX_POLL_FDS);
+};
+#endif
-PollMgr::PollMgr() : pending_change_(false)
-{
- bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
- aio_ = new SelectAIO();
- //aio_ = new EPollAIO();
+PollMgr::PollMgr() : aio_(new SelectAIO()) {
th_ = thread(&PollMgr::wait_loop, this);
}
-PollMgr::~PollMgr() [[noreturn]]
+// Stops the polling thread and then releases the wait backend.
+PollMgr::~PollMgr()
{
- //never kill me!!!
- VERIFY(0);
+    lock ml(m_);
+    // Stop watching every registered descriptor.
+    // NOTE(review): with SelectAIO this unwatch is also what wakes the polling
+    // thread out of select(); if callbacks_ is empty nothing appears to wake
+    // it, and the wait below may not return -- confirm.
+    for (const auto & p : callbacks_)
+        aio_->unwatch_fd(p.first, CB_RDWR);
+    pending_change_ = true;
+    shutdown_ = true;
+    // wait_loop clears pending_change_ before notifying, so wait on that
+    // predicate: a bare wait() may also return on a spurious wakeup, while the
+    // polling thread is still using aio_.
+    while (pending_change_)
+        changedone_c_.wait(ml);
+    // Join before deleting aio_ so the thread is provably finished with it.
+    ml.unlock();
+    th_.join();
+    delete aio_;
}
void
PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
{
- VERIFY(fd < MAX_POLL_FDS);
-
lock ml(m_);
aio_->watch_fd(fd, flag);
callbacks_[fd] = ch;
}
-//remove all callbacks related to fd
-//the return guarantees that callbacks related to fd
-//will never be called again
-void
-PollMgr::block_remove_fd(int fd)
-{
+// Remove all callbacks related to fd. After this returns, we guarantee that
+// callbacks related to fd will never be called again.
+void PollMgr::block_remove_fd(int fd) {
lock ml(m_);
aio_->unwatch_fd(fd, CB_RDWR);
pending_change_ = true;
- changedone_c_.wait(ml);
+    // Wait until wait_loop has acknowledged the change by clearing
+    // pending_change_. Looping on the predicate keeps the guarantee above
+    // intact even if the condition variable wakes spuriously.
+    while (pending_change_)
+        changedone_c_.wait(ml);
- callbacks_[fd] = NULL;
+    callbacks_[fd] = nullptr;
}
-void
-PollMgr::del_callback(int fd, poll_flag flag)
-{
+// Stops watching fd for the events in flag. The stored callback is cleared
+// only when the backend reports that fd is no longer watched at all.
+void PollMgr::del_callback(int fd, poll_flag flag) {
lock ml(m_);
- if (aio_->unwatch_fd(fd, flag)) {
- callbacks_[fd] = NULL;
- }
+    const bool no_longer_watched = aio_->unwatch_fd(fd, flag);
+    if (no_longer_watched)
+        callbacks_[fd] = nullptr;
}
-bool
-PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
-{
- lock ml(m_);
- if (!callbacks_[fd] || callbacks_[fd]!=c)
- return false;
-
- return aio_->is_watched(fd, flag);
-}
-
-void
-PollMgr::wait_loop() [[noreturn]]
-{
-
+void PollMgr::wait_loop() {
vector<int> readable;
vector<int> writable;
+ aio_callback * cb;
while (1) {
{
if (pending_change_) {
pending_change_ = false;
changedone_c_.notify_all();
+ if (shutdown_)
+ break;
}
}
readable.clear();
writable.clear();
- aio_->wait_ready(&readable,&writable);
-
- if (!readable.size() && !writable.size()) {
- continue;
- }
- //no locking of m_
- //because no add_callback() and del_callback should
- //modify callbacks_[fd] while the fd is not dead
- for (unsigned int i = 0; i < readable.size(); i++) {
- int fd = readable[i];
- if (callbacks_[fd])
- callbacks_[fd]->read_cb(fd);
+ aio_->wait_ready(readable, writable);
+
+ for (auto fd : readable) {
+ { lock ml(m_); cb = callbacks_[fd]; }
+ if (cb) cb->read_cb(fd);
}
- for (unsigned int i = 0; i < writable.size(); i++) {
- int fd = writable[i];
- if (callbacks_[fd])
- callbacks_[fd]->write_cb(fd);
+ for (auto fd : writable) {
+ { lock ml(m_); cb = callbacks_[fd]; }
+ if (cb) cb->write_cb(fd);
}
}
}
-SelectAIO::SelectAIO() : highfds_(0)
+SelectAIO::SelectAIO()
{
FD_ZERO(&rfds_);
FD_ZERO(&wfds_);
- VERIFY(pipe(pipefd_) == 0);
- FD_SET(pipefd_[0], &rfds_);
- highfds_ = pipefd_[0];
+ file_t::pipe(pipe_);
- int flags = fcntl(pipefd_[0], F_GETFL, NULL);
- flags |= O_NONBLOCK;
- fcntl(pipefd_[0], F_SETFL, flags);
-}
+ FD_SET(pipe_[0], &rfds_);
+ highfds_ = pipe_[0];
-SelectAIO::~SelectAIO()
-{
+ pipe_[0].flags() |= O_NONBLOCK;
}
-void
-SelectAIO::watch_fd(int fd, poll_flag flag)
-{
+void SelectAIO::watch_fd(int fd, poll_flag flag) {
+ VERIFY(fd < MAX_POLL_FDS);
+
lock ml(m_);
if (highfds_ <= fd)
highfds_ = fd;
- if (flag == CB_RDONLY) {
- FD_SET(fd,&rfds_);
- }else if (flag == CB_WRONLY) {
- FD_SET(fd,&wfds_);
- }else {
+ if (flag & CB_RDONLY)
FD_SET(fd,&rfds_);
+
+ if (flag & CB_WRONLY)
FD_SET(fd,&wfds_);
- }
- char tmp = 1;
- VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
+ VERIFY(pipe_[1].write((char)1)==1);
}
-bool
-SelectAIO::is_watched(int fd, poll_flag flag)
-{
- lock ml(m_);
- if (flag == CB_RDONLY) {
- return FD_ISSET(fd,&rfds_);
- }else if (flag == CB_WRONLY) {
- return FD_ISSET(fd,&wfds_);
- }else{
- return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_));
- }
-}
+bool SelectAIO::unwatch_fd(int fd, poll_flag flag) {
+ VERIFY(fd < MAX_POLL_FDS);
-bool
-SelectAIO::unwatch_fd(int fd, poll_flag flag)
-{
lock ml(m_);
- if (flag == CB_RDONLY) {
+ VERIFY((flag & ~CB_RDWR) == 0);
+ if (flag & CB_RDONLY)
FD_CLR(fd, &rfds_);
- }else if (flag == CB_WRONLY) {
+ if (flag & CB_WRONLY)
FD_CLR(fd, &wfds_);
- }else if (flag == CB_RDWR) {
- FD_CLR(fd, &wfds_);
- FD_CLR(fd, &rfds_);
- }else{
- VERIFY(0);
- }
- if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) {
- if (fd == highfds_) {
- int newh = pipefd_[0];
- for (int i = 0; i <= highfds_; i++) {
- if (FD_ISSET(i, &rfds_)) {
- newh = i;
- }else if (FD_ISSET(i, &wfds_)) {
- newh = i;
- }
- }
- highfds_ = newh;
- }
- }
- if (flag == CB_RDWR) {
- char tmp = 1;
- VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1);
+ int newh = pipe_[0];
+ for (int i = 0; i <= highfds_; i++) {
+ if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_))
+ newh = i;
}
+ highfds_ = newh;
+
+ if (flag == CB_RDWR)
+ VERIFY(pipe_[1].write((char)1)==1);
+
return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_));
}
-void
-SelectAIO::wait_ready(vector<int> *readable, vector<int> *writable)
-{
+void SelectAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
+
fd_set trfds, twfds;
int high;
int ret = select(high+1, &trfds, &twfds, NULL, NULL);
- if (ret < 0) {
- if (errno == EINTR) {
- return;
- } else {
- perror("select:");
- IF_LEVEL(0) LOG("select_loop failure errno " << errno);
- VERIFY(0);
- }
+ if (ret < 0 && errno == EINTR)
+ return;
+ else if (ret < 0) {
+ perror("select:");
+ IF_LEVEL(0) LOG("select_loop failure errno " << errno);
+ VERIFY(0);
}
for (int fd = 0; fd <= high; fd++) {
- if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) {
+ if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) {
char tmp;
- VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1);
+ VERIFY(pipe_[0].read(tmp)==1);
VERIFY(tmp==1);
- }else {
- if (FD_ISSET(fd, &twfds)) {
- writable->push_back(fd);
- }
- if (FD_ISSET(fd, &trfds)) {
- readable->push_back(fd);
- }
+ } else {
+ if (FD_ISSET(fd, &twfds))
+ writable.push_back(fd);
+
+ if (FD_ISSET(fd, &trfds))
+ readable.push_back(fd);
}
}
}
#ifdef __linux__
-EPollAIO::EPollAIO()
-{
- pollfd_ = epoll_create(MAX_POLL_FDS);
- VERIFY(pollfd_ >= 0);
- bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
-}
-
-EPollAIO::~EPollAIO()
-{
- close(pollfd_);
-}
+void EPollAIO::watch_fd(int fd_, poll_flag flag) {
+ size_t fd = (size_t)fd_;
-static inline
-int poll_flag_to_event(poll_flag flag)
-{
- int f;
- if (flag == CB_RDONLY) {
- f = EPOLLIN;
- }else if (flag == CB_WRONLY) {
- f = EPOLLOUT;
- }else { //flag == CB_RDWR
- f = EPOLLIN | EPOLLOUT;
- }
- return f;
-}
-
-void
-EPollAIO::watch_fd(int fd, poll_flag flag)
-{
VERIFY(fd < MAX_POLL_FDS);
struct epoll_event ev;
- int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
- fdstatus_[fd] |= (int)flag;
+ int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
+ fdstatus_[fd] |= (unsigned)flag;
ev.events = EPOLLET;
- ev.data.fd = fd;
+ ev.data.fd = fd_;
- if (fdstatus_[fd] & CB_RDONLY) {
+ if (fdstatus_[fd] & CB_RDONLY)
ev.events |= EPOLLIN;
- }
- if (fdstatus_[fd] & CB_WRONLY) {
+
+ if (fdstatus_[fd] & CB_WRONLY)
ev.events |= EPOLLOUT;
- }
- if (flag == CB_RDWR) {
+ if (flag == CB_RDWR)
VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
- }
- VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+ VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
}
-bool
-EPollAIO::unwatch_fd(int fd, poll_flag flag)
-{
+bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) {
+ size_t fd = (size_t)fd_;
+
VERIFY(fd < MAX_POLL_FDS);
- fdstatus_[fd] &= ~(int)flag;
+ fdstatus_[fd] &= ~(unsigned)flag;
struct epoll_event ev;
- int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
+ int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
ev.events = EPOLLET;
- ev.data.fd = fd;
+ ev.data.fd = fd_;
- if (fdstatus_[fd] & CB_RDONLY) {
+ if (fdstatus_[fd] & CB_RDONLY)
ev.events |= EPOLLIN;
- }
- if (fdstatus_[fd] & CB_WRONLY) {
+
+ if (fdstatus_[fd] & CB_WRONLY)
ev.events |= EPOLLOUT;
- }
- if (flag == CB_RDWR) {
+ if (flag == CB_RDWR)
VERIFY(op == EPOLL_CTL_DEL);
- }
- VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+
+ VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0);
return (op == EPOLL_CTL_DEL);
}
-bool
-EPollAIO::is_watched(int fd, poll_flag flag)
-{
- VERIFY(fd < MAX_POLL_FDS);
- return ((fdstatus_[fd] & CB_MASK) == flag);
-}
+void EPollAIO::wait_ready(vector<int> & readable, vector<int> & writable) {
-void
-EPollAIO::wait_ready(vector<int> *readable, vector<int> *writable)
-{
- int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1);
+ int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1);
for (int i = 0; i < nfds; i++) {
- if (ready_[i].events & EPOLLIN) {
- readable->push_back(ready_[i].data.fd);
- }
- if (ready_[i].events & EPOLLOUT) {
- writable->push_back(ready_[i].data.fd);
- }
+ if (ready_[i].events & EPOLLIN)
+ readable.push_back(ready_[i].data.fd);
+
+ if (ready_[i].events & EPOLLOUT)
+ writable.push_back(ready_[i].data.fd);
}
}
#define pollmgr_h
#include "types.h"
-#include <sys/select.h>
-
-#ifdef __linux__
-#include <sys/epoll.h>
-#endif
#define MAX_POLL_FDS 128
CB_MASK = ~0x11,
} poll_flag;
-class aio_mgr {
- public:
- virtual void watch_fd(int fd, poll_flag flag) = 0;
- virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
- virtual bool is_watched(int fd, poll_flag flag) = 0;
- virtual void wait_ready(vector<int> *readable, vector<int> *writable) = 0;
- virtual ~aio_mgr() {}
-};
-
class aio_callback {
public:
virtual void read_cb(int fd) = 0;
PollMgr();
~PollMgr();
- static PollMgr *Instance();
- static PollMgr *CreateInst();
+ static PollMgr & Instance();
void add_callback(int fd, poll_flag flag, aio_callback *ch);
void del_callback(int fd, poll_flag flag);
- bool has_callback(int fd, poll_flag flag, aio_callback *ch);
void block_remove_fd(int fd);
void wait_loop();
-
- static PollMgr *instance;
- static int useful;
- static int useless;
-
private:
mutex m_;
cond changedone_c_;
- thread th_;
-
- aio_callback *callbacks_[MAX_POLL_FDS];
- aio_mgr *aio_;
- bool pending_change_;
-
-};
-
-class SelectAIO : public aio_mgr {
- public :
-
- SelectAIO();
- ~SelectAIO();
- void watch_fd(int fd, poll_flag flag);
- bool unwatch_fd(int fd, poll_flag flag);
- bool is_watched(int fd, poll_flag flag);
- void wait_ready(vector<int> *readable, vector<int> *writable);
- private:
-
- fd_set rfds_;
- fd_set wfds_;
- int highfds_;
- int pipefd_[2];
-
- mutex m_;
-
-};
-
-#ifdef __linux__
-class EPollAIO : public aio_mgr {
- public:
- EPollAIO();
- ~EPollAIO();
- void watch_fd(int fd, poll_flag flag);
- bool unwatch_fd(int fd, poll_flag flag);
- bool is_watched(int fd, poll_flag flag);
- void wait_ready(vector<int> *readable, vector<int> *writable);
-
- private:
- int pollfd_;
- struct epoll_event ready_[MAX_POLL_FDS];
- int fdstatus_[MAX_POLL_FDS];
+ map<int, aio_callback *> callbacks_;
+ class wait_manager *aio_;
+ bool pending_change_=false, shutdown_=false;
+ thread th_;
};
-#endif /* __linux */
-
-#endif /* pollmgr_h */
+#endif
/*
- The rpcc class handles client-side RPC. Each rpcc is bound to a
- single RPC server. The jobs of rpcc include maintaining a connection to
- server, sending RPC requests and waiting for responses, retransmissions,
- at-most-once delivery etc.
+ The rpcc class handles client-side RPC. Each rpcc is bound to a single RPC
+ server. The jobs of rpcc include maintaining a connection to server, sending
+ RPC requests and waiting for responses, retransmissions, at-most-once delivery
+ etc.
The rpcs class handles the server side of RPC. Each rpcs handles multiple
connections from different rpcc objects. The jobs of rpcs include accepting
Both rpcc and rpcs use the connection class as an abstraction for the
underlying communication channel. To send an RPC request/reply, one calls
- connection::send() which blocks until data is sent or the connection has failed
- (thus the caller can free the buffer when send() returns). When a
+ connection::send() which blocks until data is sent or the connection has
+ failed (thus the caller can free the buffer when send() returns). When a
request/reply is received, connection makes a callback into the corresponding
rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()).
number of threads needed to manage these connections; without async IO, at
least one thread is needed per connection to read data without blocking other
activities.) Each rpcs object creates one thread for listening on the server
- port and a pool of threads for executing RPC requests. The
- thread pool allows us to control the number of threads spawned at the server
- (spawning one thread per request will hurt when the server faces thousands of
- requests).
+ port and a pool of threads for executing RPC requests. The thread pool allows
+ us to control the number of threads spawned at the server (spawning one thread
+ per request will hurt when the server faces thousands of requests).
In order to delete a connection object, we must maintain a reference count.
- For rpcc,
- multiple client threads might be invoking the rpcc::call() functions and thus
- holding multiple references to the underlying connection object. For rpcs,
- multiple dispatch threads might be holding references to the same connection
- object. A connection object is deleted only when the underlying connection is
- dead and the reference count reaches zero.
+ For rpcc, multiple client threads might be invoking the rpcc::call() functions
+ and thus holding multiple references to the underlying connection object. For
+ rpcs, multiple dispatch threads might be holding references to the same
+ connection object. A connection object is deleted only when the underlying
+ connection is dead and the reference count reaches zero.
This version of the RPC library explicitly joins exited threads to make sure
no outstanding references exist before deleting objects.
there are no outstanding calls on the rpcc object.
To delete a rpcs object safely, we do the following in sequence: 1. stop
- accepting new incoming connections. 2. close existing active connections.
- 3. delete the dispatch thread pool which involves waiting for current active
- RPC handlers to finish. It is interesting how a thread pool can be deleted
+ accepting new incoming connections. 2. close existing active connections. 3.
+ delete the dispatch thread pool which involves waiting for current active RPC
+ handlers to finish. It is interesting how a thread pool can be deleted
without using thread cancellation. The trick is to inject x "poison pills" for
a thread pool of x threads. Upon getting a poison pill instead of a normal
task, a worker thread will exit (and thread pool destructor waits to join all
#include <netdb.h>
#include <unistd.h>
-const rpcc::TO rpcc::to_max = { 120000 };
-const rpcc::TO rpcc::to_min = { 1000 };
-
inline void set_rand_seed() {
auto now = time_point_cast<nanoseconds>(steady_clock::now());
srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid());
VERIFY(calls_.size() == 0);
}
-int rpcc::bind(TO to) {
+int rpcc::bind(milliseconds to) {
unsigned int r;
int ret = call_timeout(rpc_const::bind, to, r, 0);
if(ret == 0){
LOG("done");
}
-int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) {
+int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) {
caller ca(0, &rep);
int xid_rep;
xid_rep = xid_rep_window_.front();
}
- TO curr_to;
- auto finaldeadline = steady_clock::now() + milliseconds(to.to),
- nextdeadline = finaldeadline;
-
- curr_to.to = to_min.to;
+ milliseconds curr_to = rpc::to_min;
+ auto finaldeadline = steady_clock::now() + to, nextdeadline = finaldeadline;
bool transmit = true;
connection *ch = NULL;
if(finaldeadline == time_point<steady_clock>::min())
break;
- nextdeadline = steady_clock::now() + milliseconds(curr_to.to);
+ nextdeadline = steady_clock::now() + curr_to;
if(nextdeadline > finaldeadline) {
nextdeadline = finaldeadline;
finaldeadline = time_point<steady_clock>::min();
// on the new connection
transmit = true;
}
- curr_to.to <<= 1;
+ curr_to *= 2;
}
{
// save the latest good connection to the client
{
- lock rwl(conss_m_);
+ lock rwl(conns_m_);
if(conns_.find(h.clt_nonce) == conns_.end()){
c->incref();
conns_[h.clt_nonce] = c;
// get the latest connection to the client
{
- lock rwl(conss_m_);
+ lock rwl(conns_m_);
if(c->isdead() && c != conns_[h.clt_nonce]){
c->decref();
c = conns_[h.clt_nonce];
#include "marshall_wrap.h"
#include "connection.h"
+namespace rpc { // RPC timeout defaults shared by rpcc
+ static constexpr milliseconds to_max{12000}; // default timeout for rpcc::bind() and rpcc::call(). NOTE(review): the rpcc::to_max removed by this change was 120000 ms -- confirm the 10x reduction is intended
+ static constexpr milliseconds to_min{100}; // starting per-attempt timeout in rpcc::call1, doubled each round. NOTE(review): the rpcc::to_min removed by this change was 1000 ms -- confirm
+}
+
class rpc_const {
public:
static const unsigned int bind = 1; // handler number reserved for bind
// manages a xid space per destination socket
// threaded: multiple threads can be sending RPCs,
class rpcc : public chanmgr {
-
private:
//manages per rpc info
list<int> xid_rep_window_;
struct request {
- request() { clear(); }
void clear() { buf.clear(); xid = -1; }
bool isvalid() { return xid != -1; }
string buf;
- int xid;
+ int xid = -1;
};
- struct request dup_req_;
+ request dup_req_;
int xid_rep_done_;
+
+ int call1(proc_t proc, marshall &req, string &rep, milliseconds to);
+
+ template<class R>
+ int call_m(proc_t proc, marshall &req, R & r, milliseconds to);
public:
rpcc(const string & d, bool retrans=true);
~rpcc();
- struct TO {
- int to;
- };
- static const TO to_max;
- static const TO to_min;
- static TO to(int x) { TO t; t.to = x; return t;}
-
unsigned int id() { return clt_nonce_; }
- int bind(TO to = to_max);
+ int bind(milliseconds to = rpc::to_max);
void set_reachable(bool r) { reachable_ = r; }
void cancel();
- int islossy() { return lossytest_ > 0; }
-
- int call1(proc_t proc, marshall &req, string &rep, TO to);
-
bool got_pdu(connection *c, const string & b);
- template<class R>
- int call_m(proc_t proc, marshall &req, R & r, TO to);
-
template<class R, typename ...Args>
inline int call(proc_t proc, R & r, const Args&... args);
template<class R, typename ...Args>
- inline int call_timeout(proc_t proc, TO to, R & r, const Args&... args);
+ inline int call_timeout(proc_t proc, milliseconds to, R & r, const Args&... args);
};
template<class R> int
-rpcc::call_m(proc_t proc, marshall &req, R & r, TO to)
+rpcc::call_m(proc_t proc, marshall &req, R & r, milliseconds to)
{
string rep;
int intret = call1(proc, req, rep, to);
template<class R, typename... Args> inline int
rpcc::call(proc_t proc, R & r, const Args&... args)
{
- return call_timeout(proc, rpcc::to_max, r, args...);
+ return call_timeout(proc, rpc::to_max, r, args...);
}
template<class R, typename... Args> inline int
-rpcc::call_timeout(proc_t proc, const rpcc::TO to, R & r, const Args&... args)
+rpcc::call_timeout(proc_t proc, const milliseconds to, R & r, const Args&... args)
{
marshall m{args...};
return call_m(proc, m, r, to);
mutex procs_m_; // protect insert/delete to procs[]
mutex count_m_; //protect modification of counts
mutex reply_window_m_; // protect reply window et al
- mutex conss_m_; // protect conns_
+ mutex conns_m_; // protect conns_
protected:
bool got_pdu(connection *c, const string & b);
- template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr);
-};
+ struct ReturnOnFailure {
+ static inline int unmarshall_args_failure() {
+ return rpc_const::unmarshal_args_failure;
+ }
+ };
-struct ReturnOnFailure {
- static inline int unmarshall_args_failure() {
- return rpc_const::unmarshal_args_failure;
+ template<class F, class C=void> void reg(proc_t proc, F f, C *c=nullptr) {
+ reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
}
};
-template<class F, class C> void rpcs::reg(proc_t proc, F f, C *c) {
- reg1(proc, marshalled_func<F, ReturnOnFailure>::wrap(f, c));
-}
-
#endif
for(int i = 0; i < 4; i++){
int rep = 0;
- int ret = c->call_timeout(24, rpcc::to(300), rep, i);
+ int ret = c->call_timeout(24, milliseconds(300), rep, i);
VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
}
}
cout << " -- string concat RPC .. ok" << endl;
// small request, big reply (perhaps req via UDP, reply via TCP)
- intret = c->call_timeout(25, rpcc::to(20000), rep, 70000);
+ intret = c->call_timeout(25, milliseconds(20000), rep, 70000);
VERIFY(intret == 0);
VERIFY(rep.size() == 70000);
cout << " -- small request, big reply .. ok" << endl;
// specify a timeout value to an RPC that should succeed (udp)
int xx = 0;
- intret = c->call_timeout(23, rpcc::to(300), xx, 77);
+ intret = c->call_timeout(23, milliseconds(300), xx, 77);
VERIFY(intret == 0 && xx == 78);
cout << " -- no spurious timeout .. ok" << endl;
{
string arg(1000, 'x');
string rep2;
- c->call_timeout(22, rpcc::to(300), rep2, arg, (string)"x");
+ c->call_timeout(22, milliseconds(300), rep2, arg, (string)"x");
VERIFY(rep2.size() == 1001);
cout << " -- no spurious timeout .. ok" << endl;
}
string non_existent = "127.0.0.1:7661";
rpcc *c1 = new rpcc(non_existent);
time_t t0 = time(0);
- intret = c1->bind(rpcc::to(300));
+ intret = c1->bind(milliseconds(300));
time_t t1 = time(0);
VERIFY(intret < 0 && (t1 - t0) <= 4);
cout << " -- rpc timeout .. ok" << endl;
delete server;
client1 = new rpcc(dst);
- VERIFY (client1->bind(rpcc::to(3000)) < 0);
+ VERIFY (client1->bind(milliseconds(3000)) < 0);
cout << " -- create new client and try to bind to failed server .. failed ok" << endl;
delete client1;
rsm_mutex_lock.unlock();
cl = h.safebind();
if (cl) {
- ret = cl->call_timeout(rsm_protocol::transferreq, rpcc::to(100),
+ ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100),
r, cfg->myaddr(), last_myvs, vid_insync);
}
rsm_mutex_lock.lock();
rsm_mutex_lock.unlock();
cl = h.safebind();
if (cl != 0) {
- ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(12000), log,
+ ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log,
cfg->myaddr(), last_myvs);
}
rsm_mutex_lock.lock();
if (!cl)
return rsm_client_protocol::BUSY;
int ignored_rval;
- auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, rpcc::to(100), ignored_rval, procno, vs, req);
+ auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req);
LOG("Invoke returned " << ret);
if (ret != rsm_protocol::OK)
return rsm_client_protocol::BUSY;
rpcc *cl = h.safebind();
auto ret = rsm_client_protocol::OK;
if (cl)
- ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, rpcc::to(500), rep, proc, req);
+ ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, milliseconds(500), rep, proc, req);
ml.lock();
if (!cl)
rsm_client_mutex_lock.unlock();
cl = h.safebind();
if (cl)
- ret = cl->call_timeout(rsm_client_protocol::members, rpcc::to(100), known_mems, 0);
+ ret = cl->call_timeout(rsm_client_protocol::members, milliseconds(100), known_mems, 0);
rsm_client_mutex_lock.lock();
}
if (cl == 0 || ret != rsm_protocol::OK)