From: Peter Iannucci Date: Fri, 11 Oct 2013 19:54:18 +0000 (-0400) Subject: Refactoring X-Git-Url: http://xvm.mit.edu/gitweb/invirt/third/libt4.git/commitdiff_plain/6623ac357055b95ce4fc0cbe9c5dc15524a9f20c?ds=sidebyside Refactoring --- diff --git a/Makefile b/Makefile index 5c2080b..0262c17 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ EXTRA_TARGETS ?= all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest $(EXTRA_TARGETS) -rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o +rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/poll_mgr.o rpc/thr_pool.o rm -f $@ ar cq $@ $^ ranlib rpc/librpc.a diff --git a/rpc/connection.cc b/rpc/connection.cc index 6994f53..6c406a4 100644 --- a/rpc/connection.cc +++ b/rpc/connection.cc @@ -6,8 +6,8 @@ #include #include "marshall.h" -connection::connection(chanmgr *m1, int f1, int l1) -: mgr_(m1), fd_(f1), lossy_(l1) +connection::connection(connection_delegate *m1, socket_t && f1, int l1) +: mgr_(m1), fd_(move(f1)), lossy_(l1) { fd_.flags() |= O_NONBLOCK; @@ -15,7 +15,7 @@ connection::connection(chanmgr *m1, int f1, int l1) create_time_ = steady_clock::now(); - PollMgr::Instance().add_callback(fd_, CB_RDONLY, this); + poll_mgr::shared_mgr.add_callback(fd_, CB_RDONLY, this); } connection::~connection() { @@ -24,6 +24,18 @@ connection::~connection() { VERIFY(!wpdu_.buf.size()); } +shared_ptr connection::to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy) { + socket_t s = socket(AF_INET, SOCK_STREAM, 0); + s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1); + if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { + IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); + close(s); + return nullptr; + } + IF_LEVEL(2) LOG_NONMEMBER("connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); + return make_shared(mgr, move(s), lossy); +} + void connection::closeconn() { { lock ml(m_); @@ -34,7 +46,7 @@ void connection::closeconn() { } //after block_remove_fd, select will never wait on fd_ //and no callbacks will be active - PollMgr::Instance().block_remove_fd(fd_); + poll_mgr::shared_mgr.block_remove_fd(fd_); } bool connection::send(const string & b) { @@ -61,11 +73,11 @@ bool connection::send(const string & b) { if (!writepdu()) { dead_ = true; ml.unlock(); - PollMgr::Instance().block_remove_fd(fd_); + poll_mgr::shared_mgr.block_remove_fd(fd_); ml.lock(); } else if (wpdu_.solong != wpdu_.buf.size()) { // should be rare to need to explicitly add write callback - PollMgr::Instance().add_callback(fd_, CB_WRONLY, this); + poll_mgr::shared_mgr.add_callback(fd_, CB_WRONLY, this); while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size()) send_complete_.wait(ml); } @@ -83,11 +95,11 @@ void connection::write_cb(int s) { VERIFY(!dead_); VERIFY(fd_ == s); if (wpdu_.buf.size() == 0) { - PollMgr::Instance().del_callback(fd_,CB_WRONLY); + poll_mgr::shared_mgr.del_callback(fd_,CB_WRONLY); return; } if (!writepdu()) { - PollMgr::Instance().del_callback(fd_, CB_RDWR); + poll_mgr::shared_mgr.del_callback(fd_, CB_RDWR); dead_ = true; } else { VERIFY(wpdu_.solong != size_t_max); @@ -102,27 +114,25 @@ void connection::write_cb(int s) { void connection::read_cb(int s) { lock ml(m_); VERIFY(fd_ == s); - if (dead_) { + if (dead_) return; - } IF_LEVEL(5) LOG("got data on fd " << s); bool succ = true; - if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) { + if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) succ = readpdu(); - } if (!succ) { IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying"); - PollMgr::Instance().del_callback(fd_,CB_RDWR); + poll_mgr::shared_mgr.del_callback(fd_,CB_RDWR); dead_ = true; send_complete_.notify_one(); } if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) { if (mgr_->got_pdu(shared_from_this(), rpdu_.buf)) { - // chanmgr has successfully consumed the pdu + // connection_delegate has successfully consumed the pdu rpdu_.buf.clear(); rpdu_.solong = 0; } @@ -195,10 +205,10 @@ bool connection::readpdu() { return true; } -tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest) +tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest) : tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest) { - struct sockaddr_in sin; + sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = hton(port); @@ -206,17 +216,15 @@ tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest) tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1); tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1); - struct timeval timeout = {0, 50000}; - - if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeout) < 0) + if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000}) < 0) perror("accept_loop setsockopt"); - if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeout) < 0) + if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000}) < 0) perror("accept_loop setsockopt"); // careful to exactly match type signature of bind arguments so we don't // get std::bind instead - if (bind((int)tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) { + if (bind((int)tcp_, (const sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) { perror("accept_loop bind"); VERIFY(0); } @@ -293,8 +301,7 @@ void tcpsconn::accept_conn() { if (FD_ISSET(pipe_[0], &rfds)) return; - if (!FD_ISSET(tcp_, &rfds)) - VERIFY(0); + VERIFY(FD_ISSET(tcp_, &rfds)); try { process_accept(); @@ -304,16 +311,3 @@ void tcpsconn::accept_conn() { } } -shared_ptr connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) { - int s = socket(AF_INET, SOCK_STREAM, 0); - int yes = 1; - setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); - if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { - IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); - close(s); - return nullptr; - } - IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port)); - return make_shared(mgr, s, lossy); -} - diff --git a/rpc/connection.h b/rpc/connection.h index b1df8a1..97bacbb 100644 --- a/rpc/connection.h +++ b/rpc/connection.h @@ -4,7 +4,7 @@ #include "types.h" #include #include -#include "pollmgr.h" +#include "poll_mgr.h" #include "file.h" constexpr size_t size_t_max = numeric_limits::max(); @@ -13,10 +13,10 @@ class thread_exit_exception : exception {}; class connection; -class chanmgr { +class connection_delegate { public: virtual bool got_pdu(const shared_ptr & c, const string & b) = 0; - virtual ~chanmgr() {} + virtual ~connection_delegate() {} }; class connection : public aio_callback, public enable_shared_from_this { @@ -26,7 +26,7 @@ class connection : public aio_callback, public enable_shared_from_this create_time() const { return create_time_; } + static shared_ptr to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy=0); + private: bool readpdu(); bool writepdu(); - chanmgr *mgr_; + connection_delegate *mgr_; const file_t fd_; bool dead_ = false; @@ -63,7 +65,7 @@ class connection : public aio_callback, public enable_shared_from_this> conns_; void process_accept(); }; - -struct bundle { - bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {} - chanmgr *mgr; - int tcp; - int lossy; -}; - -shared_ptr connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0); #endif diff --git a/rpc/pollmgr.cc b/rpc/poll_mgr.cc similarity index 93% rename from rpc/pollmgr.cc rename to rpc/poll_mgr.cc index aeaf7b3..94b24e3 100644 --- a/rpc/pollmgr.cc +++ b/rpc/poll_mgr.cc @@ -1,4 +1,4 @@ -#include "pollmgr.h" +#include "poll_mgr.h" #include #include #include "file.h" @@ -7,9 +7,7 @@ #include #endif -static PollMgr instance; - -PollMgr & PollMgr::Instance() { return instance; } +poll_mgr poll_mgr::shared_mgr; class wait_manager { public: @@ -51,11 +49,11 @@ class EPollAIO : public wait_manager { #endif -PollMgr::PollMgr() : aio_(new SelectAIO()) { - th_ = thread(&PollMgr::wait_loop, this); +poll_mgr::poll_mgr() : aio_(new SelectAIO()) { + th_ = thread(&poll_mgr::wait_loop, this); } -PollMgr::~PollMgr() +poll_mgr::~poll_mgr() { lock ml(m_); for (auto p : callbacks_) @@ -63,12 +61,12 @@ PollMgr::~PollMgr() pending_change_ = true; shutdown_ = true; changedone_c_.wait(ml); - delete aio_; + aio_ = nullptr; th_.join(); } void -PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch) +poll_mgr::add_callback(int fd, poll_flag flag, aio_callback *ch) { lock ml(m_); aio_->watch_fd(fd, flag); @@ -79,7 +77,7 @@ PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch) // Remove all callbacks related to fd. After this returns, we guarantee that // callbacks related to fd will never be called again. -void PollMgr::block_remove_fd(int fd) { +void poll_mgr::block_remove_fd(int fd) { lock ml(m_); aio_->unwatch_fd(fd, CB_RDWR); pending_change_ = true; @@ -87,13 +85,13 @@ void PollMgr::block_remove_fd(int fd) { callbacks_[fd] = nullptr; } -void PollMgr::del_callback(int fd, poll_flag flag) { +void poll_mgr::del_callback(int fd, poll_flag flag) { lock ml(m_); if (aio_->unwatch_fd(fd, flag)) callbacks_[fd] = nullptr; } -void PollMgr::wait_loop() { +void poll_mgr::wait_loop() { vector readable; vector writable; aio_callback * cb; diff --git a/rpc/pollmgr.h b/rpc/poll_mgr.h similarity index 80% rename from rpc/pollmgr.h rename to rpc/poll_mgr.h index ede35f8..bd451cf 100644 --- a/rpc/pollmgr.h +++ b/rpc/poll_mgr.h @@ -1,5 +1,5 @@ -#ifndef pollmgr_h -#define pollmgr_h +#ifndef poll_mgr_h +#define poll_mgr_h #include "types.h" @@ -20,12 +20,12 @@ class aio_callback { virtual ~aio_callback() {} }; -class PollMgr { +class poll_mgr { public: - PollMgr(); - ~PollMgr(); + poll_mgr(); + ~poll_mgr(); - static PollMgr & Instance(); + static poll_mgr shared_mgr; void add_callback(int fd, poll_flag flag, aio_callback *ch); void del_callback(int fd, poll_flag flag); @@ -37,7 +37,7 @@ class PollMgr { cond changedone_c_; map callbacks_; - class wait_manager *aio_; + unique_ptr aio_; bool pending_change_=false, shutdown_=false; thread th_; diff --git a/rpc/rpc.cc b/rpc/rpc.cc index a08e287..abbe470 100644 --- a/rpc/rpc.cc +++ b/rpc/rpc.cc @@ -264,7 +264,7 @@ rpcc::get_refconn(shared_ptr & ch) { lock ml(chan_m_); if (!chan_ || chan_->isdead()) - chan_ = connect_to_dst(dst_, this, lossytest_); + chan_ = connection::to_dst(dst_, this, lossytest_); if (chan_) ch = chan_; @@ -342,7 +342,7 @@ rpcs::rpcs(in_port_t p1, size_t count) IF_LEVEL(2) LOG("created with nonce " << nonce_); reg(rpc_const::bind, &rpcs::rpcbind, this); - dispatchpool_ = unique_ptr(new ThrPool(6, false)); + dispatchpool_ = unique_ptr(new thread_pool(6, false)); } void rpcs::start() { diff --git a/rpc/rpc.h b/rpc/rpc.h index 02c7c62..4f9a231 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -30,7 +30,7 @@ class rpc_const { // rpc client endpoint. // manages a xid space per destination socket // threaded: multiple threads can be sending RPCs, -class rpcc : public chanmgr { +class rpcc : public connection_delegate { private: //manages per rpc info @@ -135,7 +135,7 @@ rpcc::call_timeout(proc_t proc, const milliseconds to, R & r, const Args&... arg } // rpc server endpoint. -class rpcs : public chanmgr { +class rpcs : public connection_delegate { typedef enum { NEW, // new RPC, not a duplicate @@ -200,7 +200,7 @@ class rpcs : public chanmgr { // internal handler registration void reg1(proc_t proc, handler *); - unique_ptr dispatchpool_; + unique_ptr dispatchpool_; unique_ptr listener_; public: diff --git a/rpc/thr_pool.cc b/rpc/thr_pool.cc index 64b3263..4988dab 100644 --- a/rpc/thr_pool.cc +++ b/rpc/thr_pool.cc @@ -2,15 +2,15 @@ // if blocking, then addJob() blocks when queue is full // otherwise, addJob() simply returns false when queue is full -ThrPool::ThrPool(size_t sz, bool blocking) +thread_pool::thread_pool(size_t sz, bool blocking) : nthreads_(sz),blockadd_(blocking),jobq_(100*sz) { for (size_t i=0; i job_t; -class ThrPool { +class thread_pool { public: - ThrPool(size_t sz, bool blocking=true); - ~ThrPool(); + thread_pool(size_t sz, bool blocking=true); + ~thread_pool(); bool addJob(const job_t &j); diff --git a/types.h b/types.h index d2d5411..3ad73fb 100644 --- a/types.h +++ b/types.h @@ -66,6 +66,7 @@ using std::enable_shared_from_this; using std::make_shared; using std::shared_ptr; using std::unique_ptr; +using std::weak_ptr; #include using std::mutex;