Refactoring
authorPeter Iannucci <iannucci@mit.edu>
Fri, 11 Oct 2013 19:54:18 +0000 (15:54 -0400)
committerPeter Iannucci <iannucci@mit.edu>
Fri, 11 Oct 2013 19:54:18 +0000 (15:54 -0400)
Makefile
rpc/connection.cc
rpc/connection.h
rpc/poll_mgr.cc [moved from rpc/pollmgr.cc with 93% similarity]
rpc/poll_mgr.h [moved from rpc/pollmgr.h with 80% similarity]
rpc/rpc.cc
rpc/rpc.h
rpc/thr_pool.cc
rpc/thr_pool.h
types.h

index 5c2080b..0262c17 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -6,7 +6,7 @@ EXTRA_TARGETS ?=
 
 all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest $(EXTRA_TARGETS)
 
-rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/pollmgr.o rpc/thr_pool.o
+rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/poll_mgr.o rpc/thr_pool.o
        rm -f $@
        ar cq $@ $^
        ranlib rpc/librpc.a
index 6994f53..6c406a4 100644 (file)
@@ -6,8 +6,8 @@
 #include <unistd.h>
 #include "marshall.h"
 
-connection::connection(chanmgr *m1, int f1, int l1)
-: mgr_(m1), fd_(f1), lossy_(l1)
+connection::connection(connection_delegate *m1, socket_t && f1, int l1)
+: mgr_(m1), fd_(move(f1)), lossy_(l1)
 {
     fd_.flags() |= O_NONBLOCK;
 
@@ -15,7 +15,7 @@ connection::connection(chanmgr *m1, int f1, int l1)
 
     create_time_ = steady_clock::now();
 
-    PollMgr::Instance().add_callback(fd_, CB_RDONLY, this);
+    poll_mgr::shared_mgr.add_callback(fd_, CB_RDONLY, this);
 }
 
 connection::~connection() {
@@ -24,6 +24,18 @@ connection::~connection() {
     VERIFY(!wpdu_.buf.size());
 }
 
+shared_ptr<connection> connection::to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy) {
+    socket_t s = socket(AF_INET, SOCK_STREAM, 0);
+    s.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
+    if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
+        IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
+        close(s);
+        return nullptr;
+    }
+    IF_LEVEL(2) LOG_NONMEMBER("connection::to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
+    return make_shared<connection>(mgr, move(s), lossy);
+}
+
 void connection::closeconn() {
     {
         lock ml(m_);
@@ -34,7 +46,7 @@ void connection::closeconn() {
     }
     //after block_remove_fd, select will never wait on fd_
     //and no callbacks will be active
-    PollMgr::Instance().block_remove_fd(fd_);
+    poll_mgr::shared_mgr.block_remove_fd(fd_);
 }
 
 bool connection::send(const string & b) {
@@ -61,11 +73,11 @@ bool connection::send(const string & b) {
     if (!writepdu()) {
         dead_ = true;
         ml.unlock();
-        PollMgr::Instance().block_remove_fd(fd_);
+        poll_mgr::shared_mgr.block_remove_fd(fd_);
         ml.lock();
     } else if (wpdu_.solong != wpdu_.buf.size()) {
         // should be rare to need to explicitly add write callback
-        PollMgr::Instance().add_callback(fd_, CB_WRONLY, this);
+        poll_mgr::shared_mgr.add_callback(fd_, CB_WRONLY, this);
         while (!dead_ && wpdu_.solong != size_t_max && wpdu_.solong < wpdu_.buf.size())
             send_complete_.wait(ml);
     }
@@ -83,11 +95,11 @@ void connection::write_cb(int s) {
     VERIFY(!dead_);
     VERIFY(fd_ == s);
     if (wpdu_.buf.size() == 0) {
-        PollMgr::Instance().del_callback(fd_,CB_WRONLY);
+        poll_mgr::shared_mgr.del_callback(fd_,CB_WRONLY);
         return;
     }
     if (!writepdu()) {
-        PollMgr::Instance().del_callback(fd_, CB_RDWR);
+        poll_mgr::shared_mgr.del_callback(fd_, CB_RDWR);
         dead_ = true;
     } else {
         VERIFY(wpdu_.solong != size_t_max);
@@ -102,27 +114,25 @@ void connection::write_cb(int s) {
 void connection::read_cb(int s) {
     lock ml(m_);
     VERIFY(fd_ == s);
-    if (dead_)  {
+    if (dead_)
         return;
-    }
 
     IF_LEVEL(5) LOG("got data on fd " << s);
 
     bool succ = true;
-    if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size()) {
+    if (!rpdu_.buf.size() || rpdu_.solong < rpdu_.buf.size())
         succ = readpdu();
-    }
 
     if (!succ) {
         IF_LEVEL(5) LOG("readpdu on fd " << s << " failed; dying");
-        PollMgr::Instance().del_callback(fd_,CB_RDWR);
+        poll_mgr::shared_mgr.del_callback(fd_,CB_RDWR);
         dead_ = true;
         send_complete_.notify_one();
     }
 
     if (rpdu_.buf.size() && rpdu_.buf.size() == rpdu_.solong) {
         if (mgr_->got_pdu(shared_from_this(), rpdu_.buf)) {
-            // chanmgr has successfully consumed the pdu
+            // connection_delegate has successfully consumed the pdu
             rpdu_.buf.clear();
             rpdu_.solong = 0;
         }
@@ -195,10 +205,10 @@ bool connection::readpdu() {
     return true;
 }
 
-tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
+tcpsconn::tcpsconn(connection_delegate *m1, in_port_t port, int lossytest)
 : tcp_(socket(AF_INET, SOCK_STREAM, 0)), mgr_(m1), lossy_(lossytest)
 {
-    struct sockaddr_in sin;
+    sockaddr_in sin;
     memset(&sin, 0, sizeof(sin));
     sin.sin_family = AF_INET;
     sin.sin_port = hton(port);
@@ -206,17 +216,15 @@ tcpsconn::tcpsconn(chanmgr *m1, in_port_t port, int lossytest)
     tcp_.setsockopt(SOL_SOCKET, SO_REUSEADDR, (int)1);
     tcp_.setsockopt(IPPROTO_TCP, TCP_NODELAY, (int)1);
 
-    struct timeval timeout = {0, 50000};
-
-    if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeout) < 0)
+    if (tcp_.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeval{0, 50000}) < 0)
         perror("accept_loop setsockopt");
 
-    if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeout) < 0)
+    if (tcp_.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeval{0, 50000}) < 0)
         perror("accept_loop setsockopt");
 
     // careful to exactly match type signature of bind arguments so we don't
     // get std::bind instead
-    if (bind((int)tcp_, (const struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
+    if (bind((int)tcp_, (const sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
         perror("accept_loop bind");
         VERIFY(0);
     }
@@ -293,8 +301,7 @@ void tcpsconn::accept_conn() {
         if (FD_ISSET(pipe_[0], &rfds))
             return;
 
-        if (!FD_ISSET(tcp_, &rfds))
-            VERIFY(0);
+        VERIFY(FD_ISSET(tcp_, &rfds));
 
         try {
             process_accept();
@@ -304,16 +311,3 @@ void tcpsconn::accept_conn() {
     }
 }
 
-shared_ptr<connection> connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) {
-    int s = socket(AF_INET, SOCK_STREAM, 0);
-    int yes = 1;
-    setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
-    if (connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) {
-        IF_LEVEL(1) LOG_NONMEMBER("failed to " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
-        close(s);
-        return nullptr;
-    }
-    IF_LEVEL(2) LOG_NONMEMBER("connect_to_dst fd=" << s << " to dst " << inet_ntoa(dst.sin_addr) << ":" << ntoh(dst.sin_port));
-    return make_shared<connection>(mgr, s, lossy);
-}
-
index b1df8a1..97bacbb 100644 (file)
@@ -4,7 +4,7 @@
 #include "types.h"
 #include <arpa/inet.h>
 #include <netinet/in.h>
-#include "pollmgr.h"
+#include "poll_mgr.h"
 #include "file.h"
 
 constexpr size_t size_t_max = numeric_limits<size_t>::max();
@@ -13,10 +13,10 @@ class thread_exit_exception : exception {};
 
 class connection;
 
-class chanmgr {
+class connection_delegate {
     public:
         virtual bool got_pdu(const shared_ptr<connection> & c, const string & b) = 0;
-        virtual ~chanmgr() {}
+        virtual ~connection_delegate() {}
 };
 
 class connection : public aio_callback, public enable_shared_from_this<connection> {
@@ -26,7 +26,7 @@ class connection : public aio_callback, public enable_shared_from_this<connectio
             size_t solong = 0; // number of bytes written or read so far
         };
 
-        connection(chanmgr *m1, int f1, int lossytest=0);
+        connection(connection_delegate *m1, socket_t && f1, int lossytest=0);
         ~connection();
 
         int channo() { return fd_; }
@@ -39,12 +39,14 @@ class connection : public aio_callback, public enable_shared_from_this<connectio
 
         time_point<steady_clock> create_time() const { return create_time_; }
 
+        static shared_ptr<connection> to_dst(const sockaddr_in &dst, connection_delegate *mgr, int lossy=0);
+
     private:
 
         bool readpdu();
         bool writepdu();
 
-        chanmgr *mgr_;
+        connection_delegate *mgr_;
         const file_t fd_;
         bool dead_ = false;
 
@@ -63,7 +65,7 @@ class connection : public aio_callback, public enable_shared_from_this<connectio
 
 class tcpsconn {
     public:
-        tcpsconn(chanmgr *m1, in_port_t port, int lossytest=0);
+        tcpsconn(connection_delegate *m1, in_port_t port, int lossytest=0);
         ~tcpsconn();
         inline in_port_t port() { return port_; }
         void accept_conn();
@@ -74,19 +76,10 @@ class tcpsconn {
         file_t pipe_[2];
 
         socket_t tcp_; // listens for connections
-        chanmgr *mgr_;
+        connection_delegate *mgr_;
         int lossy_;
         map<int, shared_ptr<connection>> conns_;
 
         void process_accept();
 };
-
-struct bundle {
-    bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {}
-    chanmgr *mgr;
-    int tcp;
-    int lossy;
-};
-
-shared_ptr<connection> connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0);
 #endif
similarity index 93%
rename from rpc/pollmgr.cc
rename to rpc/poll_mgr.cc
index aeaf7b3..94b24e3 100644 (file)
@@ -1,4 +1,4 @@
-#include "pollmgr.h"
+#include "poll_mgr.h"
 #include <errno.h>
 #include <sys/select.h>
 #include "file.h"
@@ -7,9 +7,7 @@
 #include <sys/epoll.h>
 #endif
 
-static PollMgr instance;
-
-PollMgr & PollMgr::Instance() { return instance; }
+poll_mgr poll_mgr::shared_mgr;
 
 class wait_manager {
     public:
@@ -51,11 +49,11 @@ class EPollAIO : public wait_manager {
 #endif
 
 
-PollMgr::PollMgr() : aio_(new SelectAIO()) {
-    th_ = thread(&PollMgr::wait_loop, this);
+poll_mgr::poll_mgr() : aio_(new SelectAIO()) {
+    th_ = thread(&poll_mgr::wait_loop, this);
 }
 
-PollMgr::~PollMgr()
+poll_mgr::~poll_mgr()
 {
     lock ml(m_);
     for (auto p : callbacks_)
@@ -63,12 +61,12 @@ PollMgr::~PollMgr()
     pending_change_ = true;
     shutdown_ = true;
     changedone_c_.wait(ml);
-    delete aio_;
+    aio_ = nullptr;
     th_.join();
 }
 
 void
-PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
+poll_mgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
 {
     lock ml(m_);
     aio_->watch_fd(fd, flag);
@@ -79,7 +77,7 @@ PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
 
 // Remove all callbacks related to fd.  After this returns, we guarantee that
 // callbacks related to fd will never be called again.
-void PollMgr::block_remove_fd(int fd) {
+void poll_mgr::block_remove_fd(int fd) {
     lock ml(m_);
     aio_->unwatch_fd(fd, CB_RDWR);
     pending_change_ = true;
@@ -87,13 +85,13 @@ void PollMgr::block_remove_fd(int fd) {
     callbacks_[fd] = nullptr;
 }
 
-void PollMgr::del_callback(int fd, poll_flag flag) {
+void poll_mgr::del_callback(int fd, poll_flag flag) {
     lock ml(m_);
     if (aio_->unwatch_fd(fd, flag))
         callbacks_[fd] = nullptr;
 }
 
-void PollMgr::wait_loop() {
+void poll_mgr::wait_loop() {
     vector<int> readable;
     vector<int> writable;
     aio_callback * cb;
similarity index 80%
rename from rpc/pollmgr.h
rename to rpc/poll_mgr.h
index ede35f8..bd451cf 100644 (file)
@@ -1,5 +1,5 @@
-#ifndef pollmgr_h
-#define pollmgr_h 
+#ifndef poll_mgr_h
+#define poll_mgr_h 
 
 #include "types.h"
 
@@ -20,12 +20,12 @@ class aio_callback {
         virtual ~aio_callback() {}
 };
 
-class PollMgr {
+class poll_mgr {
     public:
-        PollMgr();
-        ~PollMgr();
+        poll_mgr();
+        ~poll_mgr();
 
-        static PollMgr & Instance();
+        static poll_mgr shared_mgr;
 
         void add_callback(int fd, poll_flag flag, aio_callback *ch);
         void del_callback(int fd, poll_flag flag);
@@ -37,7 +37,7 @@ class PollMgr {
         cond changedone_c_;
 
         map<int, aio_callback *> callbacks_;
-        class wait_manager *aio_;
+        unique_ptr<class wait_manager> aio_;
         bool pending_change_=false, shutdown_=false;
 
         thread th_;
index a08e287..abbe470 100644 (file)
@@ -264,7 +264,7 @@ rpcc::get_refconn(shared_ptr<connection> & ch)
 {
     lock ml(chan_m_);
     if (!chan_ || chan_->isdead())
-        chan_ = connect_to_dst(dst_, this, lossytest_);
+        chan_ = connection::to_dst(dst_, this, lossytest_);
 
     if (chan_)
         ch = chan_;
@@ -342,7 +342,7 @@ rpcs::rpcs(in_port_t p1, size_t count)
     IF_LEVEL(2) LOG("created with nonce " << nonce_);
 
     reg(rpc_const::bind, &rpcs::rpcbind, this);
-    dispatchpool_ = unique_ptr<ThrPool>(new ThrPool(6, false));
+    dispatchpool_ = unique_ptr<thread_pool>(new thread_pool(6, false));
 }
 
 void rpcs::start() {
index 02c7c62..4f9a231 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -30,7 +30,7 @@ class rpc_const {
 // rpc client endpoint.
 // manages a xid space per destination socket
 // threaded: multiple threads can be sending RPCs,
-class rpcc : public chanmgr {
+class rpcc : public connection_delegate {
     private:
 
         //manages per rpc info
@@ -135,7 +135,7 @@ rpcc::call_timeout(proc_t proc, const milliseconds to, R & r, const Args&... arg
 }
 
 // rpc server endpoint.
-class rpcs : public chanmgr {
+class rpcs : public connection_delegate {
 
     typedef enum {
         NEW,  // new RPC, not a duplicate
@@ -200,7 +200,7 @@ class rpcs : public chanmgr {
     // internal handler registration
     void reg1(proc_t proc, handler *);
 
-    unique_ptr<ThrPool> dispatchpool_;
+    unique_ptr<thread_pool> dispatchpool_;
     unique_ptr<tcpsconn> listener_;
 
     public:
index 64b3263..4988dab 100644 (file)
@@ -2,15 +2,15 @@
 
 // if blocking, then addJob() blocks when queue is full
 // otherwise, addJob() simply returns false when queue is full
-ThrPool::ThrPool(size_t sz, bool blocking)
+thread_pool::thread_pool(size_t sz, bool blocking)
 : nthreads_(sz),blockadd_(blocking),jobq_(100*sz) {
     for (size_t i=0; i<nthreads_; i++)
-        th_.emplace_back(&ThrPool::do_worker, this);
+        th_.emplace_back(&thread_pool::do_worker, this);
 }
 
 // IMPORTANT: this function can be called only when no external thread 
 // will ever use this thread pool again or is currently blocking on it
-ThrPool::~ThrPool() {
+thread_pool::~thread_pool() {
     for (size_t i=0; i<nthreads_; i++)
         jobq_.enq(job_t());
 
@@ -18,11 +18,11 @@ ThrPool::~ThrPool() {
         th_[i].join();
 }
 
-bool ThrPool::addJob(const job_t &j) {
+bool thread_pool::addJob(const job_t &j) {
     return jobq_.enq(j,blockadd_);
 }
 
-void ThrPool::do_worker() {
+void thread_pool::do_worker() {
     job_t j;
     while (1) {
         jobq_.deq(&j);
index 5950a9c..28c5236 100644 (file)
@@ -6,10 +6,10 @@
 
 typedef function<void()> job_t;
 
-class ThrPool {
+class thread_pool {
     public:
-        ThrPool(size_t sz, bool blocking=true);
-        ~ThrPool();
+        thread_pool(size_t sz, bool blocking=true);
+        ~thread_pool();
 
         bool addJob(const job_t &j);
 
diff --git a/types.h b/types.h
index d2d5411..3ad73fb 100644 (file)
--- a/types.h
+++ b/types.h
@@ -66,6 +66,7 @@ using std::enable_shared_from_this;
 using std::make_shared;
 using std::shared_ptr;
 using std::unique_ptr;
+using std::weak_ptr;
 
 #include <mutex>
 using std::mutex;