Imported from 6.824 labs
[invirt/third/libt4.git] / rpc / thr_pool.cc
1 #include "slock.h"
2 #include "thr_pool.h"
3 #include <stdlib.h>
4 #include <errno.h>
5 #include "lang/verify.h"
6
7 static void *
8 do_worker(void *arg)
9 {
10         ThrPool *tp = (ThrPool *)arg;
11         while (1) {
12                 ThrPool::job_t j;
13                 if (!tp->takeJob(&j))
14                         break; //die
15
16                 (void)(j.f)(j.a);
17         }
18         pthread_exit(NULL);
19 }
20
21 //if blocking, then addJob() blocks when queue is full
22 //otherwise, addJob() simply returns false when queue is full
23 ThrPool::ThrPool(int sz, bool blocking)
24 : nthreads_(sz),blockadd_(blocking),jobq_(100*sz) 
25 {
26         pthread_attr_init(&attr_);
27         pthread_attr_setstacksize(&attr_, 128<<10);
28
29         for (int i = 0; i < sz; i++) {
30                 pthread_t t;
31                 VERIFY(pthread_create(&t, &attr_, do_worker, (void *)this) ==0);
32                 th_.push_back(t);
33         }
34 }
35
36 //IMPORTANT: this function can be called only when no external thread 
37 //will ever use this thread pool again or is currently blocking on it
38 ThrPool::~ThrPool()
39 {
40         for (int i = 0; i < nthreads_; i++) {
41                 job_t j;
42                 j.f = (void *(*)(void *))NULL; //poison pill to tell worker threads to exit
43                 jobq_.enq(j);
44         }
45
46         for (int i = 0; i < nthreads_; i++) {
47                 VERIFY(pthread_join(th_[i], NULL)==0);
48         }
49
50         VERIFY(pthread_attr_destroy(&attr_)==0);
51 }
52
53 bool 
54 ThrPool::addJob(void *(*f)(void *), void *a)
55 {
56         job_t j;
57         j.f = f;
58         j.a = a;
59
60         return jobq_.enq(j,blockadd_);
61 }
62
63 bool 
64 ThrPool::takeJob(job_t *j)
65 {
66         jobq_.deq(j);
67         return (j->f!=NULL);
68 }
69