Clean-ups
author Peter Iannucci <iannucci@mit.edu>
Thu, 19 Dec 2013 08:41:54 +0000 (00:41 -0800)
committer Peter Iannucci <iannucci@mit.edu>
Thu, 19 Dec 2013 08:41:54 +0000 (00:41 -0800)
Makefile
lock_client.cc
lock_server.cc
rpc/fifo.h
rpc/rpc.h
rpc/thr_pool.cc [deleted file]
rpc/thread_pool.cc [new file with mode: 0644]
rpc/thread_pool.h [moved from rpc/thr_pool.h with 61% similarity]

index e41486f..caf3439 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -6,7 +6,7 @@ EXTRA_TARGETS ?=
 
 all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest $(EXTRA_TARGETS)
 
-rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/poll_mgr.o rpc/thr_pool.o
+rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/poll_mgr.o rpc/thread_pool.o
        rm -f $@
        ar cq $@ $^
        ranlib rpc/librpc.a
index e1bd62f..b29fff4 100644 (file)
--- a/lock_client.cc
+++ b/lock_client.cc
@@ -46,15 +46,7 @@ lock_client::~lock_client() {
 }
 
 void lock_client::releaser() {
-    while (1) {
-        maybe<lock_protocol::lockid_t> mlid;
-        release_fifo.deq(&mlid);
-
-        if (!mlid) {
-            LOG << "Releaser stopping";
-            break;
-        }
-
+    while (auto mlid = release_fifo.deq()) {
         lock_protocol::lockid_t lid = mlid;
         LOG << "Releaser: " << lid;
 
@@ -74,6 +66,7 @@ void lock_client::releaser() {
         LOG << "Lock " << lid << ": none";
         st.signal();
     }
+    LOG << "Releaser stopping";
 }
 
 int lock_client::stat(lock_protocol::lockid_t lid) {
index 141a598..efb23f5 100644 (file)
--- a/lock_server.cc
+++ b/lock_server.cc
@@ -39,8 +39,7 @@ lock_server::lock_server(rsm & r) : rsm_ (&r) {
 
 void lock_server::revoker () {
     while (1) {
-        lock_protocol::lockid_t lid;
-        revoke_fifo.deq(&lid);
+        lock_protocol::lockid_t lid = revoke_fifo.deq();
         LOG << "Revoking " << lid;
         if (rsm_ && !rsm_->amiprimary())
             continue;
@@ -60,8 +59,7 @@ void lock_server::revoker () {
 
 void lock_server::retryer() {
     while (1) {
-        lock_protocol::lockid_t lid;
-        retry_fifo.deq(&lid);
+        lock_protocol::lockid_t lid = retry_fifo.deq();
         if (rsm_ && !rsm_->amiprimary())
             continue;
 
index dfb4d05..9e4933a 100644 (file)
--- a/rpc/fifo.h
+++ b/rpc/fifo.h
@@ -3,7 +3,6 @@
 
 #include "types.h"
 
-// blocks enq() and deq() when queue is FULL or EMPTY
 template<class T>
 class fifo {
     public:
@@ -21,19 +20,15 @@ class fifo {
             return true;
         }
 
-        void deq(T * e) {
+        T deq() {
             lock ml(m_);
             while(q_.empty())
                 non_empty_c_.wait(ml);
-            *e = q_.front();
+            T t = q_.front();
             q_.pop_front();
             if (max_ && q_.size() < max_)
                 has_space_c_.notify_one();
-        }
-
-        bool size() {
-            lock ml(m_);
-            return q_.size();
+            return t;
         }
 
     private:
index f1eb3bc..df5d89e 100644 (file)
--- a/rpc/rpc.h
+++ b/rpc/rpc.h
@@ -2,11 +2,9 @@
 #define rpc_h
 
 #include "types.h"
-#include <sys/socket.h>
-#include <netinet/in.h>
 
 #include "rpc_protocol.h"
-#include "thr_pool.h"
+#include "thread_pool.h"
 #include "marshall.h"
 #include "marshall_wrap.h"
 #include "connection.h"
@@ -203,7 +201,7 @@ class rpcs : private connection_delegate {
 
         void dispatch(shared_ptr<connection> c, const string & buf);
 
-        unique_ptr<thread_pool> dispatchpool_{new thread_pool(6, false)};
+        unique_ptr<thread_pool> dispatchpool_{new thread_pool(6)};
         unique_ptr<connection_listener> listener_;
 
         // RPC handler for clients binding
diff --git a/rpc/thr_pool.cc b/rpc/thr_pool.cc
deleted file mode 100644 (file)
index fc7be3d..0000000
--- a/rpc/thr_pool.cc
+++ /dev/null
@@ -1,33 +0,0 @@
-#include "thr_pool.h"
-
-// if blocking, then addJob() blocks when queue is full
-// otherwise, addJob() simply returns false when queue is full
-thread_pool::thread_pool(size_t sz, bool blocking)
-: nthreads_(sz),blockadd_(blocking),jobq_(100*sz) {
-    for (size_t i=0; i<nthreads_; i++)
-        th_.emplace_back(&thread_pool::do_worker, this);
-}
-
-// IMPORTANT: this function can be called only when no external thread 
-// will ever use this thread pool again or is currently blocking on it
-thread_pool::~thread_pool() {
-    for (size_t i=0; i<nthreads_; i++)
-        jobq_.enq(job_t());
-
-    for (size_t i=0; i<nthreads_; i++)
-        th_[i].join();
-}
-
-bool thread_pool::addJob(const job_t & j) {
-    return jobq_.enq(j,blockadd_);
-}
-
-void thread_pool::do_worker() {
-    job_t j;
-    while (1) {
-        jobq_.deq(&j);
-        if (!j)
-            break;
-        j();
-    }
-}
diff --git a/rpc/thread_pool.cc b/rpc/thread_pool.cc
new file mode 100644 (file)
index 0000000..2c4ab06
--- /dev/null
+++ b/rpc/thread_pool.cc
@@ -0,0 +1,25 @@
+#include "thread_pool.h"
+
+thread_pool::thread_pool(size_t sz) : th_(sz) {
+    for (auto & t : th_)
+        t = thread(&thread_pool::do_worker, this);
+}
+
+// this function can be called only when no external thread will
+// ever use this thread pool again or is currently blocking on it
+thread_pool::~thread_pool() {
+    for (size_t i=0; i<th_.size(); i++)
+        jobq_.enq(job_t());
+
+    for (auto & t : th_)
+        t.join();
+}
+
+bool thread_pool::addJob(const job_t & j) {
+    return jobq_.enq(j, false);
+}
+
+void thread_pool::do_worker() {
+    while (auto j = jobq_.deq())
+        j();
+}
similarity index 61%
rename from rpc/thr_pool.h
rename to rpc/thread_pool.h
index 9525032..98ec655 100644 (file)
--- a/rpc/thr_pool.h
+++ b/rpc/thread_pool.h
@@ -1,25 +1,21 @@
-#ifndef thr_pool_h
-#define thr_pool_h
+#ifndef thread_pool_h
+#define thread_pool_h
 
-#include "types.h"
+#include <functional>
+#include <vector>
 #include "fifo.h"
 
 typedef std::function<void()> job_t;
 
 class thread_pool {
     public:
-        thread_pool(size_t sz, bool blocking=true);
+        thread_pool(size_t sz);
         ~thread_pool();
-
         bool addJob(const job_t & j);
 
     private:
-        size_t nthreads_;
-        bool blockadd_;
-
         fifo<job_t> jobq_;
         std::vector<thread> th_;
-
         void do_worker();
 };