From: Peter Iannucci Date: Thu, 10 Oct 2013 16:35:48 +0000 (-0400) Subject: Lots more clean-ups X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/3abd3952c1f4441f0dd6eae9883b2d01ed9cd56b Lots more clean-ups --- diff --git a/config.cc b/config.cc index 5373007..35654d8 100644 --- a/config.cc +++ b/config.cc @@ -204,7 +204,7 @@ config::heartbeat_t config::doheartbeat(const string &m, lock &cfg_mutex_lock) { cfg_mutex_lock.unlock(); int r = 0, ret = rpc_const::bind_failure; if (rpcc *cl = h.safebind()) - ret = cl->call_timeout(paxos_protocol::heartbeat, rpcc::to(100), r, me, vid); + ret = cl->call_timeout(paxos_protocol::heartbeat, milliseconds(100), r, me, vid); cfg_mutex_lock.lock(); heartbeat_t res = OK; diff --git a/handle.cc b/handle.cc index 5287a35..1cb5cc2 100644 --- a/handle.cc +++ b/handle.cc @@ -40,7 +40,7 @@ rpcc * handle::safebind() { // value to support the assumption. // // With RPC_LOSSY=5, tests may fail due to delays and time outs. - int ret = cl->bind(rpcc::to(1000)); + int ret = cl->bind(milliseconds(1000)); if (ret < 0) { LOG("bind failure! " << h->m << " " << ret); delete cl; diff --git a/paxos.cc b/paxos.cc index b83a044..8b00ad8 100644 --- a/paxos.cc +++ b/paxos.cc @@ -94,7 +94,7 @@ bool proposer_acceptor::prepare(unsigned instance, nodes_t & accepts, if (!r) continue; auto status = (paxos_protocol::status)r->call_timeout( - paxos_protocol::preparereq, rpcc::to(100), res, me, instance, proposal); + paxos_protocol::preparereq, milliseconds(100), res, me, instance, proposal); if (status == paxos_protocol::OK) { if (res.oldinstance) { LOG("commiting old instance!"); @@ -125,7 +125,7 @@ void proposer_acceptor::accept(unsigned instance, nodes_t & accepts, continue; bool accept = false; int status = r->call_timeout( - paxos_protocol::acceptreq, rpcc::to(100), accept, me, instance, proposal, v); + paxos_protocol::acceptreq, milliseconds(100), accept, me, instance, proposal, v); if (status == paxos_protocol::OK && accept) accepts.push_back(i); } @@ -138,7 +138,7 @@ void proposer_acceptor::decide(unsigned instance, const nodes_t & accepts, const if (!r) continue; int res = 0; - r->call_timeout(paxos_protocol::decidereq, rpcc::to(100), res, me, instance, v); + r->call_timeout(paxos_protocol::decidereq, milliseconds(100), res, me, instance, v); } } diff --git a/rpc/connection.cc b/rpc/connection.cc index 4681ae9..33e891c 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -2,30 +2,26 @@ #include "rpc_protocol.h" #include #include -#include #include #include #include -#include #include "marshall.h" connection::connection(chanmgr *m1, int f1, int l1) : mgr_(m1), fd_(f1), lossy_(l1) { - int flags = fcntl(fd_, F_GETFL, NULL); - fcntl(fd_, F_SETFL, flags | O_NONBLOCK); + fd_.flags() |= O_NONBLOCK; signal(SIGPIPE, SIG_IGN); create_time_ = steady_clock::now(); - PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this); + PollMgr::Instance().add_callback(fd_, CB_RDONLY, this); } connection::~connection() { VERIFY(dead_); VERIFY(!wpdu_.buf.size()); - close(fd_); } void connection::incref() { @@ -48,7 +44,7 @@ void connection::closeconn() { } //after block_remove_fd, select will never wait on fd_ //and no callbacks will be active - PollMgr::Instance()->block_remove_fd(fd_); + PollMgr::Instance().block_remove_fd(fd_); } void connection::decref() { @@ -98,11 +94,11 @@ bool connection::send(const string & b) { if (!writepdu()) { dead_ = true; ml.unlock(); - PollMgr::Instance()->block_remove_fd(fd_); + PollMgr::Instance().block_remove_fd(fd_); ml.lock(); } else 
if (wpdu_.solong != wpdu_.buf.size()) { // should be rare to need to explicitly add write callback - PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this); + PollMgr::Instance().add_callback(fd_, CB_WRONLY, this); while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size()) send_complete_.wait(ml); } @@ -120,11 +116,11 @@ void connection::write_cb(int s) { VERIFY(!dead_); VERIFY(fd_ == s); if (wpdu_.buf.size() == 0) { - PollMgr::Instance()->del_callback(fd_,CB_WRONLY); + PollMgr::Instance().del_callback(fd_,CB_WRONLY); return; } if (!writepdu()) { - PollMgr::Instance()->del_callback(fd_, CB_RDWR); + PollMgr::Instance().del_callback(fd_, CB_RDWR); dead_ = true; } else { VERIFY(wpdu_.solong != size_t_max); @@ -152,7 +148,7 @@ void connection::read_cb(int s) { if (!succ) { IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying"); - PollMgr::Instance()->del_callback(fd_,CB_RDWR); + PollMgr::Instance().del_callback(fd_,CB_RDWR); dead_ = true; send_complete_.notify_one(); } @@ -188,11 +184,10 @@ bool connection::readpdu() { IF_LEVEL(5) LOG("the receive buffer has length " << rpdu_.buf.size()); if (!rpdu_.buf.size()) { rpc_sz_t sz1; - ssize_t n = read(fd_, &sz1, sizeof(sz1)); + ssize_t n = fd_.read(sz1); - if (n == 0) { + if (n == 0) return false; - } if (n < 0) { VERIFY(errno!=EAGAIN); @@ -218,7 +213,7 @@ bool connection::readpdu() { rpdu_.solong = sizeof(sz1); } - ssize_t n = read(fd_, &rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong); + ssize_t n = fd_.read(&rpdu_.buf[rpdu_.solong], rpdu_.buf.size() - rpdu_.solong); IF_LEVEL(5) LOG("read " << n << " bytes"); @@ -234,32 +229,33 @@ bool connection::readpdu() { } tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest) -: mgr_(m1), lossy_(lossytest) +: tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest) { struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = hton(port); - tcp_ = socket(AF_INET, SOCK_STREAM, 0); - if (tcp_ < 0) { - perror("accept_loop socket:"); - VERIFY(0); - } + tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1); + tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1); - int yes = 1; - setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); - setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); + struct timeval timeout = {0, 50000}; + + if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeout) < 0) + perror("accept_loop setsockopt"); + + if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeout) < 0) + perror("accept_loop setsockopt"); // careful to exactly match type signature of bind arguments so we don't // get std::bind instead - if (bind(tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) { - perror("accept_loop tcp bind:"); + if (bind((int)tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) { + perror("accept_loop bind"); VERIFY(0); } if (listen(tcp_, 1000) < 0) { - perror("listen:"); + perror("accept_loop listen"); VERIFY(0); } @@ -269,24 +265,19 @@ tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest) IF_LEVEL(2) LOG("listen on " << port_ << " " << sin.sin_port); - if (pipe(pipe_) < 0) { - perror("accept_loop pipe:"); - VERIFY(0); - } + file_t::pipe(pipe_); - int flags = fcntl(pipe_[0], F_GETFL, NULL); - flags |= O_NONBLOCK; - fcntl(pipe_[0], F_SETFL, flags); + pipe_[0].flags() |= O_NONBLOCK; th_ = thread(&tcpsconn::accept_conn, this); } tcpsconn::~tcpsconn() { - VERIFY(close(pipe_[1]) == 0); + pipe_[1].close(); th_.join(); - //close all the active connections + // close all the active 
connections map::iterator i; for (i = conns_.begin(); i != conns_.end(); i++) { i->second->closeconn(); @@ -325,40 +316,34 @@ void tcpsconn::process_accept() { void tcpsconn::accept_conn() { fd_set rfds; - int max_fd = pipe_[0] > tcp_ ? pipe_[0] : tcp_; - - try { - while (1) { - FD_ZERO(&rfds); - FD_SET(pipe_[0], &rfds); - FD_SET(tcp_, &rfds); - - int ret = select(max_fd+1, &rfds, NULL, NULL, NULL); - - if (ret < 0) { - if (errno == EINTR) { - continue; - } else { - perror("accept_conn select:"); - IF_LEVEL(0) LOG("accept_conn failure errno " << errno); - VERIFY(0); - } - } - - if (FD_ISSET(pipe_[0], &rfds)) { - close(pipe_[0]); - close(tcp_); - return; - } - else if (FD_ISSET(tcp_, &rfds)) { - process_accept(); - } else { - VERIFY(0); - } + int max_fd = max((int)pipe_[0], (int)tcp_); + + while (1) { + FD_ZERO(&rfds); + FD_SET(pipe_[0], &rfds); + FD_SET(tcp_, &rfds); + + int ret = select(max_fd+1, &rfds, NULL, NULL, NULL); + + if (ret < 0 && errno == EINTR) + continue; + else if (ret < 0) { + perror("accept_conn select:"); + IF_LEVEL(0) LOG("accept_conn failure errno " << errno); + VERIFY(0); + } + + if (FD_ISSET(pipe_[0], &rfds)) + return; + + if (!FD_ISSET(tcp_, &rfds)) + VERIFY(0); + + try { + process_accept(); + } catch (thread_exit_exception e) { + break; } - } - catch (thread_exit_exception e) - { } } diff --git a/rpc/connection.h b/rpc/connection.h index 1eb625b..3e19a93 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -6,6 +6,7 @@ #include #include #include "pollmgr.h" +#include "file.h" constexpr size_t size_t_max = numeric_limits::max(); @@ -49,7 +50,7 @@ class connection : public aio_callback { bool writepdu(); chanmgr *mgr_; - const int fd_; + const file_t fd_; bool dead_ = false; charbuf wpdu_; @@ -77,9 +78,9 @@ class tcpsconn { in_port_t port_; mutex m_; thread th_; - int pipe_[2]; + file_t pipe_[2]; - int tcp_; //file desciptor for accepting connection + socket_t tcp_; // listens for connections chanmgr *mgr_; int lossy_; map conns_; diff --git a/rpc/file.h b/rpc/file.h new file mode 100644 index 0000000..75c0d6e --- /dev/null +++ b/rpc/file.h @@ -0,0 +1,56 @@ +#ifndef file_h +#define file_h + +#include +#include +#include "types.h" +#include + +class file_t { + private: + int fd_; + + class flags_t { + private: + const file_t & f_; + int flags_; + public: + flags_t(const file_t & f) : f_(f), flags_(fcntl(f_.fd_, F_GETFL, NULL)) { } + ~flags_t() { fcntl(f_.fd_, F_SETFL, flags_); } + operator int & () { return flags_; } + }; + public: + inline file_t(int fd=-1) : fd_(fd) {} + inline file_t(const file_t &) = delete; + inline file_t(file_t && other) : fd_(-1) { std::swap(fd_, other.fd_); } + inline ~file_t() { if (fd_ != -1) ::close(fd_); } + static inline void pipe(file_t *ends) { + int fds[2]; + VERIFY(::pipe(fds) == 0); + ends[0].fd_ = fds[0]; + ends[1].fd_ = fds[1]; + } + inline operator int() const { if (fd_ == -1) throw "no fd"; return fd_; } + inline flags_t flags() const { return *this; } + inline void close() { + ::close(fd_); + fd_ = -1; + } + template + inline ssize_t read(T & t) const { return ::read(fd_, &t, sizeof(T)); } + inline ssize_t read(void * t, size_t n) const { return ::read(fd_, t, n); } + template + inline ssize_t write(const T & t) const { return ::write(fd_, &t, sizeof(T)); } + inline ssize_t write(const void * t, size_t n) const { return ::write(fd_, t, n); } +}; + +class socket_t : public file_t { + public: + socket_t(int fd=-1) : file_t(fd) {} + template + int setsockopt(int level, int option, T && value) { + return ::setsockopt(*this, 
level, option, &value, sizeof(T)); + } +}; + +#endif diff --git a/rpc/pollmgr.cc b/rpc/pollmgr.cc index a938284..4acff93 100644 --- a/rpc/pollmgr.cc +++ b/rpc/pollmgr.cc @@ -1,46 +1,77 @@ #include "types.h" #include -#include -#include +#include +#include "file.h" -#include "pollmgr.h" +#ifdef __linux__ +#include +#endif -PollMgr *PollMgr::instance = NULL; -static once_flag pollmgr_is_initialized; +#include "pollmgr.h" -static void -PollMgrInit() -{ - PollMgr::instance = new PollMgr(); -} +static PollMgr instance; + +PollMgr & PollMgr::Instance() { return instance; } + +class wait_manager { + public: + virtual void watch_fd(int fd, poll_flag flag) = 0; + virtual bool unwatch_fd(int fd, poll_flag flag) = 0; + virtual void wait_ready(vector & readable, vector & writable) = 0; + virtual ~wait_manager() throw() {} +}; + +class SelectAIO : public wait_manager { + public : + SelectAIO(); + ~SelectAIO() {} + void watch_fd(int fd, poll_flag flag); + bool unwatch_fd(int fd, poll_flag flag); + void wait_ready(vector & readable, vector & writable); + + private: + fd_set rfds_, wfds_; + int highfds_; + file_t pipe_[2]; + mutex m_; +}; -PollMgr * -PollMgr::Instance() -{ - call_once(pollmgr_is_initialized, PollMgrInit); - return instance; -} +#ifdef __linux__ +class EPollAIO : public wait_manager { + public: + EPollAIO() {} + ~EPollAIO() throw() { } + void watch_fd(int fd, poll_flag flag); + bool unwatch_fd(int fd, poll_flag flag); + void wait_ready(vector & readable, vector & writable); + + private: + file_t poll_ = epoll_create(MAX_POLL_FDS); + struct epoll_event ready_[MAX_POLL_FDS]; + vector fdstatus_ = vector(MAX_POLL_FDS); +}; +#endif -PollMgr::PollMgr() : pending_change_(false) -{ - bzero(callbacks_, MAX_POLL_FDS*sizeof(void *)); - aio_ = new SelectAIO(); - //aio_ = new EPollAIO(); +PollMgr::PollMgr() : aio_(new SelectAIO()) { th_ = thread(&PollMgr::wait_loop, this); } -PollMgr::~PollMgr() [[noreturn]] +PollMgr::~PollMgr() { - //never kill me!!! - VERIFY(0); + lock ml(m_); + for (auto p : callbacks_) + aio_->unwatch_fd(p.first, CB_RDWR); + pending_change_ = true; + shutdown_ = true; + changedone_c_.wait(ml); + delete aio_; + th_.join(); } void PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch) { - VERIFY(fd < MAX_POLL_FDS); - lock ml(m_); aio_->watch_fd(fd, flag); @@ -48,44 +79,26 @@ PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch) callbacks_[fd] = ch; } -//remove all callbacks related to fd -//the return guarantees that callbacks related to fd -//will never be called again -void -PollMgr::block_remove_fd(int fd) -{ +// Remove all callbacks related to fd. After this returns, we guarantee that +// callbacks related to fd will never be called again. 
+void PollMgr::block_remove_fd(int fd) { lock ml(m_); aio_->unwatch_fd(fd, CB_RDWR); pending_change_ = true; changedone_c_.wait(ml); - callbacks_[fd] = NULL; + callbacks_[fd] = nullptr; } -void -PollMgr::del_callback(int fd, poll_flag flag) -{ +void PollMgr::del_callback(int fd, poll_flag flag) { lock ml(m_); - if (aio_->unwatch_fd(fd, flag)) { - callbacks_[fd] = NULL; - } + if (aio_->unwatch_fd(fd, flag)) + callbacks_[fd] = nullptr; } -bool -PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c) -{ - lock ml(m_); - if (!callbacks_[fd] || callbacks_[fd]!=c) - return false; - - return aio_->is_watched(fd, flag); -} - -void -PollMgr::wait_loop() [[noreturn]] -{ - +void PollMgr::wait_loop() { vector readable; vector writable; + aio_callback * cb; while (1) { { @@ -93,121 +106,80 @@ PollMgr::wait_loop() [[noreturn]] if (pending_change_) { pending_change_ = false; changedone_c_.notify_all(); + if (shutdown_) + break; } } readable.clear(); writable.clear(); - aio_->wait_ready(&readable,&writable); - - if (!readable.size() && !writable.size()) { - continue; - } - //no locking of m_ - //because no add_callback() and del_callback should - //modify callbacks_[fd] while the fd is not dead - for (unsigned int i = 0; i < readable.size(); i++) { - int fd = readable[i]; - if (callbacks_[fd]) - callbacks_[fd]->read_cb(fd); + aio_->wait_ready(readable, writable); + + for (auto fd : readable) { + { lock ml(m_); cb = callbacks_[fd]; } + if (cb) cb->read_cb(fd); } - for (unsigned int i = 0; i < writable.size(); i++) { - int fd = writable[i]; - if (callbacks_[fd]) - callbacks_[fd]->write_cb(fd); + for (auto fd : writable) { + { lock ml(m_); cb = callbacks_[fd]; } + if (cb) cb->write_cb(fd); } } } -SelectAIO::SelectAIO() : highfds_(0) +SelectAIO::SelectAIO() { FD_ZERO(&rfds_); FD_ZERO(&wfds_); - VERIFY(pipe(pipefd_) == 0); - FD_SET(pipefd_[0], &rfds_); - highfds_ = pipefd_[0]; + file_t::pipe(pipe_); - int flags = fcntl(pipefd_[0], F_GETFL, NULL); - flags |= O_NONBLOCK; - fcntl(pipefd_[0], F_SETFL, flags); -} + FD_SET(pipe_[0], &rfds_); + highfds_ = pipe_[0]; -SelectAIO::~SelectAIO() -{ + pipe_[0].flags() |= O_NONBLOCK; } -void -SelectAIO::watch_fd(int fd, poll_flag flag) -{ +void SelectAIO::watch_fd(int fd, poll_flag flag) { + VERIFY(fd < MAX_POLL_FDS); + lock ml(m_); if (highfds_ <= fd) highfds_ = fd; - if (flag == CB_RDONLY) { - FD_SET(fd,&rfds_); - }else if (flag == CB_WRONLY) { - FD_SET(fd,&wfds_); - }else { + if (flag & CB_RDONLY) FD_SET(fd,&rfds_); + + if (flag & CB_WRONLY) FD_SET(fd,&wfds_); - } - char tmp = 1; - VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); + VERIFY(pipe_[1].write((char)1)==1); } -bool -SelectAIO::is_watched(int fd, poll_flag flag) -{ - lock ml(m_); - if (flag == CB_RDONLY) { - return FD_ISSET(fd,&rfds_); - }else if (flag == CB_WRONLY) { - return FD_ISSET(fd,&wfds_); - }else{ - return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_)); - } -} +bool SelectAIO::unwatch_fd(int fd, poll_flag flag) { + VERIFY(fd < MAX_POLL_FDS); -bool -SelectAIO::unwatch_fd(int fd, poll_flag flag) -{ lock ml(m_); - if (flag == CB_RDONLY) { + VERIFY((flag & ~CB_RDWR) == 0); + if (flag & CB_RDONLY) FD_CLR(fd, &rfds_); - }else if (flag == CB_WRONLY) { + if (flag & CB_WRONLY) FD_CLR(fd, &wfds_); - }else if (flag == CB_RDWR) { - FD_CLR(fd, &wfds_); - FD_CLR(fd, &rfds_); - }else{ - VERIFY(0); - } - if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) { - if (fd == highfds_) { - int newh = pipefd_[0]; - for (int i = 0; i <= highfds_; i++) { - if (FD_ISSET(i, &rfds_)) { - newh = i; - }else if (FD_ISSET(i, &wfds_)) 
{ - newh = i; - } - } - highfds_ = newh; - } - } - if (flag == CB_RDWR) { - char tmp = 1; - VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); + int newh = pipe_[0]; + for (int i = 0; i <= highfds_; i++) { + if (FD_ISSET(i, &rfds_) || FD_ISSET(i, &wfds_)) + newh = i; } + highfds_ = newh; + + if (flag == CB_RDWR) + VERIFY(pipe_[1].write((char)1)==1); + return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_)); } -void -SelectAIO::wait_ready(vector *readable, vector *writable) -{ +void SelectAIO::wait_ready(vector & readable, vector & writable) { + fd_set trfds, twfds; int high; @@ -220,130 +192,89 @@ SelectAIO::wait_ready(vector *readable, vector *writable) int ret = select(high+1, &trfds, &twfds, NULL, NULL); - if (ret < 0) { - if (errno == EINTR) { - return; - } else { - perror("select:"); - IF_LEVEL(0) LOG("select_loop failure errno " << errno); - VERIFY(0); - } + if (ret < 0 && errno == EINTR) + return; + else if (ret < 0) { + perror("select:"); + IF_LEVEL(0) LOG("select_loop failure errno " << errno); + VERIFY(0); } for (int fd = 0; fd <= high; fd++) { - if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) { + if (fd == pipe_[0] && FD_ISSET(fd, &trfds)) { char tmp; - VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1); + VERIFY(pipe_[0].read(tmp)==1); VERIFY(tmp==1); - }else { - if (FD_ISSET(fd, &twfds)) { - writable->push_back(fd); - } - if (FD_ISSET(fd, &trfds)) { - readable->push_back(fd); - } + } else { + if (FD_ISSET(fd, &twfds)) + writable.push_back(fd); + + if (FD_ISSET(fd, &trfds)) + readable.push_back(fd); } } } #ifdef __linux__ -EPollAIO::EPollAIO() -{ - pollfd_ = epoll_create(MAX_POLL_FDS); - VERIFY(pollfd_ >= 0); - bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS); -} - -EPollAIO::~EPollAIO() -{ - close(pollfd_); -} +void EPollAIO::watch_fd(int fd_, poll_flag flag) { + size_t fd = (size_t)fd_; -static inline -int poll_flag_to_event(poll_flag flag) -{ - int f; - if (flag == CB_RDONLY) { - f = EPOLLIN; - }else if (flag == CB_WRONLY) { - f = EPOLLOUT; - }else { //flag == CB_RDWR - f = EPOLLIN | EPOLLOUT; - } - return f; -} - -void -EPollAIO::watch_fd(int fd, poll_flag flag) -{ VERIFY(fd < MAX_POLL_FDS); struct epoll_event ev; - int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD; - fdstatus_[fd] |= (int)flag; + int op = fdstatus_[fd] ? EPOLL_CTL_MOD : EPOLL_CTL_ADD; + fdstatus_[fd] |= (unsigned)flag; ev.events = EPOLLET; - ev.data.fd = fd; + ev.data.fd = fd_; - if (fdstatus_[fd] & CB_RDONLY) { + if (fdstatus_[fd] & CB_RDONLY) ev.events |= EPOLLIN; - } - if (fdstatus_[fd] & CB_WRONLY) { + + if (fdstatus_[fd] & CB_WRONLY) ev.events |= EPOLLOUT; - } - if (flag == CB_RDWR) { + if (flag == CB_RDWR) VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT)); - } - VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); + VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0); } -bool -EPollAIO::unwatch_fd(int fd, poll_flag flag) -{ +bool EPollAIO::unwatch_fd(int fd_, poll_flag flag) { + size_t fd = (size_t)fd_; + VERIFY(fd < MAX_POLL_FDS); - fdstatus_[fd] &= ~(int)flag; + fdstatus_[fd] &= ~(unsigned)flag; struct epoll_event ev; - int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL; + int op = fdstatus_[fd] ? 
EPOLL_CTL_MOD : EPOLL_CTL_DEL; ev.events = EPOLLET; - ev.data.fd = fd; + ev.data.fd = fd_; - if (fdstatus_[fd] & CB_RDONLY) { + if (fdstatus_[fd] & CB_RDONLY) ev.events |= EPOLLIN; - } - if (fdstatus_[fd] & CB_WRONLY) { + + if (fdstatus_[fd] & CB_WRONLY) ev.events |= EPOLLOUT; - } - if (flag == CB_RDWR) { + if (flag == CB_RDWR) VERIFY(op == EPOLL_CTL_DEL); - } - VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); + + VERIFY(epoll_ctl(poll_, op, fd, &ev) == 0); return (op == EPOLL_CTL_DEL); } -bool -EPollAIO::is_watched(int fd, poll_flag flag) -{ - VERIFY(fd < MAX_POLL_FDS); - return ((fdstatus_[fd] & CB_MASK) == flag); -} +void EPollAIO::wait_ready(vector & readable, vector & writable) { -void -EPollAIO::wait_ready(vector *readable, vector *writable) -{ - int nfds = epoll_wait(pollfd_, ready_, MAX_POLL_FDS, -1); + int nfds = epoll_wait(poll_, ready_, MAX_POLL_FDS, -1); for (int i = 0; i < nfds; i++) { - if (ready_[i].events & EPOLLIN) { - readable->push_back(ready_[i].data.fd); - } - if (ready_[i].events & EPOLLOUT) { - writable->push_back(ready_[i].data.fd); - } + if (ready_[i].events & EPOLLIN) + readable.push_back(ready_[i].data.fd); + + if (ready_[i].events & EPOLLOUT) + writable.push_back(ready_[i].data.fd); } } diff --git a/rpc/pollmgr.h b/rpc/pollmgr.h index 2da3167..ede35f8 100644 --- a/rpc/pollmgr.h +++ b/rpc/pollmgr.h @@ -2,11 +2,6 @@ #define pollmgr_h #include "types.h" -#include - -#ifdef __linux__ -#include -#endif #define MAX_POLL_FDS 128 @@ -18,15 +13,6 @@ typedef enum { CB_MASK = ~0x11, } poll_flag; -class aio_mgr { - public: - virtual void watch_fd(int fd, poll_flag flag) = 0; - virtual bool unwatch_fd(int fd, poll_flag flag) = 0; - virtual bool is_watched(int fd, poll_flag flag) = 0; - virtual void wait_ready(vector *readable, vector *writable) = 0; - virtual ~aio_mgr() {} -}; - class aio_callback { public: virtual void read_cb(int fd) = 0; @@ -39,69 +25,22 @@ class PollMgr { PollMgr(); ~PollMgr(); - static PollMgr *Instance(); - static PollMgr *CreateInst(); + static PollMgr & Instance(); void add_callback(int fd, poll_flag flag, aio_callback *ch); void del_callback(int fd, poll_flag flag); - bool has_callback(int fd, poll_flag flag, aio_callback *ch); void block_remove_fd(int fd); void wait_loop(); - - static PollMgr *instance; - static int useful; - static int useless; - private: mutex m_; cond changedone_c_; - thread th_; - - aio_callback *callbacks_[MAX_POLL_FDS]; - aio_mgr *aio_; - bool pending_change_; - -}; - -class SelectAIO : public aio_mgr { - public : - - SelectAIO(); - ~SelectAIO(); - void watch_fd(int fd, poll_flag flag); - bool unwatch_fd(int fd, poll_flag flag); - bool is_watched(int fd, poll_flag flag); - void wait_ready(vector *readable, vector *writable); - private: - - fd_set rfds_; - fd_set wfds_; - int highfds_; - int pipefd_[2]; - - mutex m_; - -}; - -#ifdef __linux__ -class EPollAIO : public aio_mgr { - public: - EPollAIO(); - ~EPollAIO(); - void watch_fd(int fd, poll_flag flag); - bool unwatch_fd(int fd, poll_flag flag); - bool is_watched(int fd, poll_flag flag); - void wait_ready(vector *readable, vector *writable); - - private: - int pollfd_; - struct epoll_event ready_[MAX_POLL_FDS]; - int fdstatus_[MAX_POLL_FDS]; + map callbacks_; + class wait_manager *aio_; + bool pending_change_=false, shutdown_=false; + thread th_; }; -#endif /* __linux */ - -#endif /* pollmgr_h */ +#endif diff --git a/rpc/rpc.cc b/rpc/rpc.cc index 32b25ab..47ac775 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -1,8 +1,8 @@ /* - The rpcc class handles client-side RPC. 
Each rpcc is bound to a - single RPC server. The jobs of rpcc include maintaining a connection to - server, sending RPC requests and waiting for responses, retransmissions, - at-most-once delivery etc. + The rpcc class handles client-side RPC. Each rpcc is bound to a single RPC + server. The jobs of rpcc include maintaining a connection to server, sending + RPC requests and waiting for responses, retransmissions, at-most-once delivery + etc. The rpcs class handles the server side of RPC. Each rpcs handles multiple connections from different rpcc objects. The jobs of rpcs include accepting @@ -11,8 +11,8 @@ Both rpcc and rpcs use the connection class as an abstraction for the underlying communication channel. To send an RPC request/reply, one calls - connection::send() which blocks until data is sent or the connection has failed - (thus the caller can free the buffer when send() returns). When a + connection::send() which blocks until data is sent or the connection has + failed (thus the caller can free the buffer when send() returns). When a request/reply is received, connection makes a callback into the corresponding rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()). @@ -25,18 +25,16 @@ number of threads needed to manage these connections; without async IO, at least one thread is needed per connection to read data without blocking other activities.) Each rpcs object creates one thread for listening on the server - port and a pool of threads for executing RPC requests. The - thread pool allows us to control the number of threads spawned at the server - (spawning one thread per request will hurt when the server faces thousands of - requests). + port and a pool of threads for executing RPC requests. The thread pool allows + us to control the number of threads spawned at the server (spawning one thread + per request will hurt when the server faces thousands of requests). In order to delete a connection object, we must maintain a reference count. - For rpcc, - multiple client threads might be invoking the rpcc::call() functions and thus - holding multiple references to the underlying connection object. For rpcs, - multiple dispatch threads might be holding references to the same connection - object. A connection object is deleted only when the underlying connection is - dead and the reference count reaches zero. + For rpcc, multiple client threads might be invoking the rpcc::call() functions + and thus holding multiple references to the underlying connection object. For + rpcs, multiple dispatch threads might be holding references to the same + connection object. A connection object is deleted only when the underlying + connection is dead and the reference count reaches zero. This version of the RPC library explicitly joins exited threads to make sure no outstanding references exist before deleting objects. @@ -45,9 +43,9 @@ there are no outstanding calls on the rpcc object. To delete a rpcs object safely, we do the following in sequence: 1. stop - accepting new incoming connections. 2. close existing active connections. - 3. delete the dispatch thread pool which involves waiting for current active - RPC handlers to finish. It is interesting how a thread pool can be deleted + accepting new incoming connections. 2. close existing active connections. 3. + delete the dispatch thread pool which involves waiting for current active RPC + handlers to finish. It is interesting how a thread pool can be deleted without using thread cancellation. 
The trick is to inject x "poison pills" for a thread pool of x threads. Upon getting a poison pill instead of a normal task, a worker thread will exit (and thread pool destructor waits to join all @@ -63,9 +61,6 @@ #include #include -const rpcc::TO rpcc::to_max = { 120000 }; -const rpcc::TO rpcc::to_min = { 1000 }; - inline void set_rand_seed() { auto now = time_point_cast(steady_clock::now()); srandom((uint32_t)now.time_since_epoch().count()^(uint32_t)getpid()); @@ -108,7 +103,7 @@ rpcc::~rpcc() { VERIFY(calls_.size() == 0); } -int rpcc::bind(TO to) { +int rpcc::bind(milliseconds to) { unsigned int r; int ret = call_timeout(rpc_const::bind, to, r, 0); if(ret == 0){ @@ -144,7 +139,7 @@ void rpcc::cancel(void) { LOG("done"); } -int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) { +int rpcc::call1(proc_t proc, marshall &req, string &rep, milliseconds to) { caller ca(0, &rep); int xid_rep; @@ -168,11 +163,8 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) { xid_rep = xid_rep_window_.front(); } - TO curr_to; - auto finaldeadline = steady_clock::now() + milliseconds(to.to), - nextdeadline = finaldeadline; - - curr_to.to = to_min.to; + milliseconds curr_to = rpc::to_min; + auto finaldeadline = steady_clock::now() + to, nextdeadline = finaldeadline; bool transmit = true; connection *ch = NULL; @@ -204,7 +196,7 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) { if(finaldeadline == time_point::min()) break; - nextdeadline = steady_clock::now() + milliseconds(curr_to.to); + nextdeadline = steady_clock::now() + curr_to; if(nextdeadline > finaldeadline) { nextdeadline = finaldeadline; finaldeadline = time_point::min(); @@ -230,7 +222,7 @@ int rpcc::call1(proc_t proc, marshall &req, string &rep, TO to) { // on the new connection transmit = true; } - curr_to.to <<= 1; + curr_to *= 2; } { @@ -492,7 +484,7 @@ rpcs::dispatch(djob_t *j) // save the latest good connection to the client { - lock rwl(conss_m_); + lock rwl(conns_m_); if(conns_.find(h.clt_nonce) == conns_.end()){ c->incref(); conns_[h.clt_nonce] = c; @@ -537,7 +529,7 @@ rpcs::dispatch(djob_t *j) // get the latest connection to the client { - lock rwl(conss_m_); + lock rwl(conns_m_); if(c->isdead() && c != conns_[h.clt_nonce]){ c->decref(); c = conns_[h.clt_nonce]; diff --git a/rpc/rpc.h b/rpc/rpc.h index 065cabc..19ec96a 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -10,6 +10,11 @@ #include "marshall_wrap.h" #include "connection.h" +namespace rpc { + static constexpr milliseconds to_max{12000}; + static constexpr milliseconds to_min{100}; +} + class rpc_const { public: static const unsigned int bind = 1; // handler number reserved for bind @@ -26,7 +31,6 @@ class rpc_const { // manages a xid space per destination socket // threaded: multiple threads can be sending RPCs, class rpcc : public chanmgr { - private: //manages per rpc info @@ -66,52 +70,42 @@ class rpcc : public chanmgr { list xid_rep_window_; struct request { - request() { clear(); } void clear() { buf.clear(); xid = -1; } bool isvalid() { return xid != -1; } string buf; - int xid; + int xid = -1; }; - struct request dup_req_; + request dup_req_; int xid_rep_done_; + + int call1(proc_t proc, marshall &req, string &rep, milliseconds to); + + template + int call_m(proc_t proc, marshall &req, R & r, milliseconds to); public: rpcc(const string & d, bool retrans=true); ~rpcc(); - struct TO { - int to; - }; - static const TO to_max; - static const TO to_min; - static TO to(int x) { TO t; t.to = x; return t;} - unsigned int id() { return 
clt_nonce_; } - int bind(TO to = to_max); + int bind(milliseconds to = rpc::to_max); void set_reachable(bool r) { reachable_ = r; } void cancel(); - int islossy() { return lossytest_ > 0; } - - int call1(proc_t proc, marshall &req, string &rep, TO to); - bool got_pdu(connection *c, const string & b); - template - int call_m(proc_t proc, marshall &req, R & r, TO to); - template inline int call(proc_t proc, R & r, const Args&... args); template - inline int call_timeout(proc_t proc, TO to, R & r, const Args&... args); + inline int call_timeout(proc_t proc, milliseconds to, R & r, const Args&... args); }; template int -rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) +rpcc::call_m(proc_t proc, marshall &req, R & r, milliseconds to) { string rep; int intret = call1(proc, req, rep, to); @@ -130,11 +124,11 @@ rpcc::call_m(proc_t proc, marshall &req, R & r, TO to) template inline int rpcc::call(proc_t proc, R & r, const Args&... args) { - return call_timeout(proc, rpcc::to_max, r, args...); + return call_timeout(proc, rpc::to_max, r, args...); } template inline int -rpcc::call_timeout(proc_t proc, const rpcc::TO to, R & r, const Args&... args) +rpcc::call_timeout(proc_t proc, const milliseconds to, R & r, const Args&... args) { marshall m{args...}; return call_m(proc, m, r, to); @@ -196,7 +190,7 @@ class rpcs : public chanmgr { mutex procs_m_; // protect insert/delete to procs[] mutex count_m_; //protect modification of counts mutex reply_window_m_; // protect reply window et al - mutex conss_m_; // protect conns_ + mutex conns_m_; // protect conns_ protected: @@ -224,17 +218,15 @@ class rpcs : public chanmgr { bool got_pdu(connection *c, const string & b); - template void reg(proc_t proc, F f, C *c=nullptr); -}; + struct ReturnOnFailure { + static inline int unmarshall_args_failure() { + return rpc_const::unmarshal_args_failure; + } + }; -struct ReturnOnFailure { - static inline int unmarshall_args_failure() { - return rpc_const::unmarshal_args_failure; + template void reg(proc_t proc, F f, C *c=nullptr) { + reg1(proc, marshalled_func::wrap(f, c)); } }; -template void rpcs::reg(proc_t proc, F f, C *c) { - reg1(proc, marshalled_func::wrap(f, c)); -} - #endif diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc index 2f58e5d..723df82 100644 --- a/rpc/rpctest.cc +++ b/rpc/rpctest.cc @@ -166,7 +166,7 @@ client3(void *xx) for(int i = 0; i < 4; i++){ int rep = 0; - int ret = c->call_timeout(24, rpcc::to(300), rep, i); + int ret = c->call_timeout(24, milliseconds(300), rep, i); VERIFY(ret == rpc_const::timeout_failure || rep == i+2); } } @@ -187,14 +187,14 @@ simple_tests(rpcc *c) cout << " -- string concat RPC .. ok" << endl; // small request, big reply (perhaps req via UDP, reply via TCP) - intret = c->call_timeout(25, rpcc::to(20000), rep, 70000); + intret = c->call_timeout(25, milliseconds(20000), rep, 70000); VERIFY(intret == 0); VERIFY(rep.size() == 70000); cout << " -- small request, big reply .. ok" << endl; // specify a timeout value to an RPC that should succeed (udp) int xx = 0; - intret = c->call_timeout(23, rpcc::to(300), xx, 77); + intret = c->call_timeout(23, milliseconds(300), xx, 77); VERIFY(intret == 0 && xx == 78); cout << " -- no spurious timeout .. ok" << endl; @@ -202,7 +202,7 @@ simple_tests(rpcc *c) { string arg(1000, 'x'); string rep2; - c->call_timeout(22, rpcc::to(300), rep2, arg, (string)"x"); + c->call_timeout(22, milliseconds(300), rep2, arg, (string)"x"); VERIFY(rep2.size() == 1001); cout << " -- no spurious timeout .. 
ok" << endl; } @@ -217,7 +217,7 @@ simple_tests(rpcc *c) string non_existent = "127.0.0.1:7661"; rpcc *c1 = new rpcc(non_existent); time_t t0 = time(0); - intret = c1->bind(rpcc::to(300)); + intret = c1->bind(milliseconds(300)); time_t t1 = time(0); VERIFY(intret < 0 && (t1 - t0) <= 4); cout << " -- rpc timeout .. ok" << endl; @@ -285,7 +285,7 @@ failure_test() delete server; client1 = new rpcc(dst); - VERIFY (client1->bind(rpcc::to(3000)) < 0); + VERIFY (client1->bind(milliseconds(3000)) < 0); cout << " -- create new client and try to bind to failed server .. failed ok" << endl; delete client1; diff --git a/rsm.cc b/rsm.cc index 81f0e4c..54713cb 100644 --- a/rsm.cc +++ b/rsm.cc @@ -214,7 +214,7 @@ bool rsm::statetransfer(const string & m, lock & rsm_mutex_lock) rsm_mutex_lock.unlock(); cl = h.safebind(); if (cl) { - ret = cl->call_timeout(rsm_protocol::transferreq, rpcc::to(100), + ret = cl->call_timeout(rsm_protocol::transferreq, milliseconds(100), r, cfg->myaddr(), last_myvs, vid_insync); } rsm_mutex_lock.lock(); @@ -257,7 +257,7 @@ bool rsm::join(const string & m, lock & rsm_mutex_lock) { rsm_mutex_lock.unlock(); cl = h.safebind(); if (cl != 0) { - ret = cl->call_timeout(rsm_protocol::joinreq, rpcc::to(12000), log, + ret = cl->call_timeout(rsm_protocol::joinreq, milliseconds(12000), log, cfg->myaddr(), last_myvs); } rsm_mutex_lock.lock(); @@ -347,7 +347,7 @@ rsm_client_protocol::status rsm::client_invoke(string & r, int procno, const str if (!cl) return rsm_client_protocol::BUSY; int ignored_rval; - auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, rpcc::to(100), ignored_rval, procno, vs, req); + auto ret = (rsm_protocol::status)cl->call_timeout(rsm_protocol::invoke, milliseconds(100), ignored_rval, procno, vs, req); LOG("Invoke returned " << ret); if (ret != rsm_protocol::OK) return rsm_client_protocol::BUSY; diff --git a/rsm_client.cc b/rsm_client.cc index 01098d6..ae88169 100644 --- a/rsm_client.cc +++ b/rsm_client.cc @@ -26,7 +26,7 @@ rsm_protocol::status rsm_client::invoke(unsigned int proc, string &rep, const st rpcc *cl = h.safebind(); auto ret = rsm_client_protocol::OK; if (cl) - ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, rpcc::to(500), rep, proc, req); + ret = (rsm_client_protocol::status)cl->call_timeout(rsm_client_protocol::invoke, milliseconds(500), rep, proc, req); ml.lock(); if (!cl) @@ -61,7 +61,7 @@ bool rsm_client::init_members(lock & rsm_client_mutex_lock) { rsm_client_mutex_lock.unlock(); cl = h.safebind(); if (cl) - ret = cl->call_timeout(rsm_client_protocol::members, rpcc::to(100), known_mems, 0); + ret = cl->call_timeout(rsm_client_protocol::members, milliseconds(100), known_mems, 0); rsm_client_mutex_lock.lock(); } if (cl == 0 || ret != rsm_protocol::OK)