FairRoot/PandaRoot
PndLmdThreadPool.h
Go to the documentation of this file.
1 /*
2  * @author Sehe (https://stackoverflow.com/users/85371/sehe)
3  *
4  * Thread Pool implementation using boost::threads.
5  *
6  * taken from this StackOverflow answer:
7  * https://stackoverflow.com/questions/22685176/boost-group-threads-maximal-number-of-parallel-thread
8  *
9  */
10 
11 #include <boost/thread.hpp>
12 #include <boost/phoenix.hpp>
13 #include <boost/optional.hpp>
14 
15 //using namespace boost;
16 using namespace boost::phoenix::arg_names;
17 
19 private:
20  boost::mutex mx;
21  boost::condition_variable cv;
22 
23  typedef boost::function<void()> job_t;
24  std::deque<job_t> _queue;
25 
26  boost::thread_group pool;
27 
28  boost::atomic_bool shutdown;
29  static void worker_thread(PndLmdThreadPool& q) {
30  while (boost::optional<job_t> job = q.dequeue())
31  (*job)();
32  }
33 
34 public:
35  //create thread pool with maximum possible threads
37  shutdown(false) {
38  for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
39  pool.create_thread(boost::bind(worker_thread, boost::ref(*this)));
40  }
41 
42  //create thread pool with maxThreads threads
43  PndLmdThreadPool(unsigned int maxThreads) :
44  shutdown(false) {
45  if (maxThreads > boost::thread::hardware_concurrency() || maxThreads == 0) maxThreads =
46  boost::thread::hardware_concurrency();
47  for (unsigned i = 0; i < maxThreads; ++i)
48  pool.create_thread(boost::bind(worker_thread, boost::ref(*this)));
49  }
50 
51  void enqueue(job_t job) {
52  boost::lock_guard<boost::mutex> lk(mx);
53  _queue.push_back(job);
54  cv.notify_one();
55  }
56 
57  boost::optional<job_t> dequeue() {
58  boost::unique_lock<boost::mutex> lk(mx);
59  namespace phx = boost::phoenix;
60  cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));
61  if (_queue.empty()) return boost::none;
62  job_t job = _queue.front();
63  _queue.pop_front();
64  return job;
65  }
66 
67  // wait for all threads to complete
68  void wait() {
69  shutdown = true;
70  {
71  boost::lock_guard<boost::mutex> lk(mx);
72  cv.notify_all();
73  }
74  pool.join_all();
75  }
76 
78  shutdown = true;
79  {
80  boost::lock_guard<boost::mutex> lk(mx);
81  cv.notify_all();
82  }
83  pool.join_all();
84  }
85 };
Int_t i
Definition: run_full.C:25
std::deque< job_t > _queue
void enqueue(job_t job)
boost::condition_variable cv
PndLmdThreadPool(unsigned int maxThreads)
static void worker_thread(PndLmdThreadPool &q)
boost::optional< job_t > dequeue()
boost::atomic_bool shutdown
boost::function< void()> job_t
boost::thread_group pool