Major clean-ups. Migrating to C++11.
[invirt/third/libt4.git] / rpc / thr_pool.cc
1 #include "thr_pool.h"
2 #include <stdlib.h>
3 #include <errno.h>
4 #include "lang/verify.h"
5
6 static void
7 do_worker(void *arg)
8 {
9         ThrPool *tp = (ThrPool *)arg;
10         while (1) {
11                 ThrPool::job_t j;
12                 if (!tp->takeJob(&j))
13                         break; //die
14
15                 (void)(j.f)(j.a);
16         }
17 }
18
19 //if blocking, then addJob() blocks when queue is full
20 //otherwise, addJob() simply returns false when queue is full
21 ThrPool::ThrPool(int sz, bool blocking)
22 : nthreads_(sz),blockadd_(blocking),jobq_(100*sz) 
23 {
24         for (int i = 0; i < sz; i++) {
25         th_.push_back(std::thread(do_worker, this));
26         }
27 }
28
29 //IMPORTANT: this function can be called only when no external thread 
30 //will ever use this thread pool again or is currently blocking on it
31 ThrPool::~ThrPool()
32 {
33         for (int i = 0; i < nthreads_; i++) {
34                 job_t j;
35                 j.f = (void (*)(void *))NULL; //poison pill to tell worker threads to exit
36                 jobq_.enq(j);
37         }
38
39         for (int i = 0; i < nthreads_; i++) {
40         th_[i].join();
41         }
42 }
43
44 bool 
45 ThrPool::addJob(void (*f)(void *), void *a)
46 {
47         job_t j;
48         j.f = f;
49         j.a = a;
50
51         return jobq_.enq(j,blockadd_);
52 }
53
54 bool 
55 ThrPool::takeJob(job_t *j)
56 {
57         jobq_.deq(j);
58         return (j->f!=NULL);
59 }
60