Lots of clean-ups and simplifications
[invirt/third/libt4.git] / rpc / thr_pool.cc
index 26226cd..146764f 100644 (file)
@@ -3,58 +3,40 @@
 #include <errno.h>
 #include "lang/verify.h"
 
-static void
-do_worker(void *arg)
-{
-       ThrPool *tp = (ThrPool *)arg;
-       while (1) {
-               ThrPool::job_t j;
-               if (!tp->takeJob(&j))
-                       break; //die
-
-               (void)(j.f)(j.a);
-       }
-}
-
-//if blocking, then addJob() blocks when queue is full
-//otherwise, addJob() simply returns false when queue is full
+// if blocking, then addJob() blocks when queue is full
+// otherwise, addJob() simply returns false when queue is full
 ThrPool::ThrPool(int sz, bool blocking)
 : nthreads_(sz),blockadd_(blocking),jobq_(100*sz) 
 {
-       for (int i = 0; i < sz; i++) {
-        th_.push_back(std::thread(do_worker, this));
-       }
+       for (int i=0; i<nthreads_; i++)
+        th_.push_back(std::thread(&ThrPool::do_worker, this));
 }
 
-//IMPORTANT: this function can be called only when no external thread 
-//will ever use this thread pool again or is currently blocking on it
+// IMPORTANT: this function can be called only when no external thread 
+// will ever use this thread pool again or is currently blocking on it
 ThrPool::~ThrPool()
 {
-       for (int i = 0; i < nthreads_; i++) {
-               job_t j;
-               j.f = (void (*)(void *))NULL; //poison pill to tell worker threads to exit
-               jobq_.enq(j);
-       }
+       for (int i=0; i<nthreads_; i++)
+               jobq_.enq(job_t());
 
-       for (int i = 0; i < nthreads_; i++) {
+       for (int i=0; i<nthreads_; i++)
         th_[i].join();
-       }
 }
 
 bool 
-ThrPool::addJob(void (*f)(void *), void *a)
+ThrPool::addJob(const job_t &j)
 {
-       job_t j;
-       j.f = f;
-       j.a = a;
-
        return jobq_.enq(j,blockadd_);
 }
 
-bool 
-ThrPool::takeJob(job_t *j)
+void
+ThrPool::do_worker()
 {
-       jobq_.deq(j);
-       return (j->f!=NULL);
+    job_t j;
+       while (1) {
+        jobq_.deq(&j);
+               if (!j)
+                       break;
+               j();
+       }
 }
-