all: lock_demo lock_server lock_tester rsm_tester rpc/rpctest $(EXTRA_TARGETS)
-rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/poll_mgr.o rpc/thr_pool.o
+rpc/librpc.a: rpc/rpc.o rpc/connection.o rpc/poll_mgr.o rpc/thread_pool.o
rm -f $@
ar cq $@ $^
ranlib rpc/librpc.a
}
// Releaser loop: takes lock ids off release_fifo one at a time and handles each.
// NOTE(review): this hunk may not show the whole loop body — confirm against the full file.
void lock_client::releaser() {
- while (1) {
- maybe<lock_protocol::lockid_t> mlid;
- release_fifo.deq(&mlid);
-
- if (!mlid) {
- LOG << "Releaser stopping";
- break;
- }
-
+ while (auto mlid = release_fifo.deq()) { // loop ends when deq() returns a value that tests false
lock_protocol::lockid_t lid = mlid;
LOG << "Releaser: " << lid;
LOG << "Lock " << lid << ": none";
st.signal();
}
+ LOG << "Releaser stopping"; // reached only after the loop condition has failed
}
int lock_client::stat(lock_protocol::lockid_t lid) {
void lock_server::revoker () {
while (1) {
- lock_protocol::lockid_t lid;
- revoke_fifo.deq(&lid);
+ lock_protocol::lockid_t lid = revoke_fifo.deq();
LOG << "Revoking " << lid;
if (rsm_ && !rsm_->amiprimary())
continue;
void lock_server::retryer() {
while (1) {
- lock_protocol::lockid_t lid;
- retry_fifo.deq(&lid);
+ lock_protocol::lockid_t lid = retry_fifo.deq();
if (rsm_ && !rsm_->amiprimary())
continue;
+#include <utility>
#include "types.h"
-// blocks enq() and deq() when queue is FULL or EMPTY
template<class T>
class fifo {
public:
return true;
}
- void deq(T * e) {
+ // Removes the element at the front of the queue and returns it by value.
+ // While the queue is empty the caller waits on non_empty_c_. After the
+ // pop, if the queue has a limit (max_ != 0) and is now below it, one
+ // waiter on has_space_c_ is notified.
+ T deq() {
lock ml(m_);
while(q_.empty())
non_empty_c_.wait(ml);
- *e = q_.front();
+ // Move instead of copy: the front slot is popped on the next line, and
+ // this runs with m_ held, so a cheap transfer keeps the locked region short.
+ T t = std::move(q_.front());
q_.pop_front();
if (max_ && q_.size() < max_)
has_space_c_.notify_one();
- }
-
- bool size() {
- lock ml(m_);
- return q_.size();
+ return t;
}
private:
#define rpc_h
#include "types.h"
-#include <sys/socket.h>
-#include <netinet/in.h>
#include "rpc_protocol.h"
-#include "thr_pool.h"
+#include "thread_pool.h"
#include "marshall.h"
#include "marshall_wrap.h"
#include "connection.h"
void dispatch(shared_ptr<connection> c, const string & buf);
- unique_ptr<thread_pool> dispatchpool_{new thread_pool(6, false)};
+ unique_ptr<thread_pool> dispatchpool_{new thread_pool(6)};
unique_ptr<connection_listener> listener_;
// RPC handler for clients binding
+++ /dev/null
-#include "thr_pool.h"
-
-// if blocking, then addJob() blocks when queue is full
-// otherwise, addJob() simply returns false when queue is full
-thread_pool::thread_pool(size_t sz, bool blocking)
-: nthreads_(sz),blockadd_(blocking),jobq_(100*sz) {
- for (size_t i=0; i<nthreads_; i++)
- th_.emplace_back(&thread_pool::do_worker, this);
-}
-
-// IMPORTANT: this function can be called only when no external thread
-// will ever use this thread pool again or is currently blocking on it
-thread_pool::~thread_pool() {
- for (size_t i=0; i<nthreads_; i++)
- jobq_.enq(job_t());
-
- for (size_t i=0; i<nthreads_; i++)
- th_[i].join();
-}
-
-bool thread_pool::addJob(const job_t & j) {
- return jobq_.enq(j,blockadd_);
-}
-
-void thread_pool::do_worker() {
- job_t j;
- while (1) {
- jobq_.deq(&j);
- if (!j)
- break;
- j();
- }
-}
--- /dev/null
+#include "thread_pool.h"
+
+// Creates a pool with sz workers; every worker runs do_worker() on this pool.
+thread_pool::thread_pool(size_t sz) : th_(sz) {
+ // th_ already holds sz empty slots; put a running worker into each one
+ for (size_t slot = 0; slot < th_.size(); slot++)
+ th_[slot] = thread(&thread_pool::do_worker, this);
+}
+
+// Shuts the pool down: queues one empty job per worker (do_worker() stops
+// when it dequeues an empty job) and then joins every worker.
+// Only call this when no outside thread is using the pool, is blocked on
+// it, or will ever use it again.
+thread_pool::~thread_pool() {
+ for (size_t remaining = th_.size(); remaining > 0; remaining--)
+ jobq_.enq(job_t());
+
+ for (size_t w = 0; w < th_.size(); w++)
+ th_[w].join();
+}
+
+// Offers j to the job queue, passing false as enq()'s second argument, and
+// hands enq()'s result back to the caller.
+// NOTE(review): the replaced thr_pool code describes this mode as
+// "returns false when queue is full" — confirm against fifo::enq.
+bool thread_pool::addJob(const job_t & j) {
+ const bool queued = jobq_.enq(j, false);
+ return queued;
+}
+
+// Worker loop: take the next job from the queue and run it. An empty job
+// ends the loop, which lets the worker thread return.
+void thread_pool::do_worker() {
+ for (;;) {
+ job_t next = jobq_.deq();
+ if (!next)
+ return;
+ next();
+ }
+}
-#ifndef thr_pool_h
-#define thr_pool_h
+#ifndef thread_pool_h
+#define thread_pool_h
-#include "types.h"
+#include <functional>
+#include <vector>
#include "fifo.h"
typedef std::function<void()> job_t;
+// Fixed-size pool of worker threads that run job_t callables taken from jobq_.
class thread_pool {
public:
- thread_pool(size_t sz, bool blocking=true);
+ // sz is the number of worker threads to start. The constructor is
+ // explicit so a bare size_t is never silently converted into a pool.
+ explicit thread_pool(size_t sz);
+ // Stops and joins all workers. Must only run when no other thread is
+ // using the pool, is blocked on it, or will use it again.
~thread_pool();
-
+ // Submits j to the job queue and returns the queue's enq() result.
bool addJob(const job_t & j);
private:
- size_t nthreads_;
- bool blockadd_;
-
+ // jobs waiting to be run by a worker
fifo<job_t> jobq_;
+ // the worker threads, one per slot
std::vector<thread> th_;
-
+ // body of every worker thread: runs queued jobs until an empty job arrives
void do_worker();
};