16 #include <unordered_map>
24 #pragma clang diagnostic push
25 #pragma clang diagnostic ignored "-Wkeyword-macro"
29 #pragma clang diagnostic pop
// Declaration only: takes an integer `id` and a const reference to a
// callable with signature void(), and returns nothing.
// NOTE(review): the definition is not part of this excerpt; presumably `func`
// is stored in `callbacks_` under `id` -- confirm against the implementation.
110 void AddCallback(
const int id,
const std::function<
void()>& func);
// Condition variables owned by the class. The waits/notifies that use them
// are not visible in this excerpt.
// NOTE(review): judging by the names, one is for pausing and one for setup
// signalling -- confirm against the implementation.
144 std::condition_variable pause_condition_;
145 std::condition_variable setup_condition_;
// Maps an integer key to a list of void() callables.
// NOTE(review): presumably keyed by the `id` passed to AddCallback -- confirm.
157 std::unordered_map<int, std::list<std::function<void()>>> callbacks_;
// Declaration of AddTask: accepts a callable `f` plus its arguments by
// forwarding reference and returns a std::future for the callable's result
// type (computed with std::result_of<func_t(args_t...)>).
// The visible parts of the out-of-class definition bind `f` to `args` inside
// a std::packaged_task, enqueue it on `tasks_`, and call
// task_condition_.notify_one(). They also contain a
// `throw std::runtime_error("Cannot add task to stopped thread pool.")`;
// the condition guarding that throw is not visible in this excerpt.
// NOTE(review): std::result_of is deprecated in C++17 and removed in C++20.
// Moving to std::invoke_result_t requires changing this declaration and the
// definition together.
181 template <
class func_t,
class... args_t>
182 auto AddTask(func_t&& f, args_t&&... args)
183 -> std::future<
typename std::result_of<func_t(args_t...)>
::type>;
// Declaration only: takes an integer `index` and returns nothing.
// NOTE(review): presumably the entry point run by each thread in `workers_`,
// with `index` identifying the worker -- the definition is not visible here.
200 void WorkerFunc(
const int index);
// Threads owned by the pool.
202 std::vector<std::thread> workers_;
// FIFO of pending void() jobs. The visible AddTask fragment pushes a lambda
// that invokes the packaged task onto this queue.
203 std::queue<std::function<void()>> tasks_;
// Notified with notify_one() in the visible AddTask fragment, after the task
// has been pushed onto `tasks_`.
206 std::condition_variable task_condition_;
// NOTE(review): waits/notifies on this variable are not visible in this
// excerpt; presumably signals that all queued work has completed -- confirm.
207 std::condition_variable finished_condition_;
// NOTE(review): updates to this counter are not visible in this excerpt;
// presumably the number of workers currently executing a task -- confirm.
210 int num_active_workers_;
// Maps a std::thread::id to an int.
// NOTE(review): presumably the worker index passed to WorkerFunc -- confirm.
212 std::unordered_map<std::thread::id, int> thread_id_to_index_;
236 template <
typename T>
// Read-only accessor: returns a const reference to the stored `data_`
// member (no copy is made). Callable on const objects.
249 const T&
Data()
const {
return data_; }
// Capacity bound, initialised from the constructor argument `max_num_jobs`.
// A visible loop waits while jobs_.size() >= max_num_jobs_ and stop_ is
// false.
279 size_t max_num_jobs_;
// Stop flag, initialised to false by the constructor. It is checked in the
// visible wait loops so that they exit once it becomes true.
280 std::atomic<bool> stop_;
// Waited on while `jobs_` is empty and stop_ is false. The visible fragments
// call notify_one() on it after the "queue full" wait loop and notify_all()
// on it in another method.
283 std::condition_variable push_condition_;
// Waited on while the queue is at capacity and stop_ is false. The visible
// fragments call notify_one() on it after jobs_.front() is read and
// notify_all() on it in another method.
284 std::condition_variable pop_condition_;
// Waited on while `jobs_` is non-empty. notify_all() is called on it in the
// fragment that reads jobs_.front().
285 std::condition_variable empty_condition_;
298 template <
class func_t,
class... args_t>
300 -> std::future<
typename std::result_of<func_t(args_t...)>
::type> {
301 typedef typename std::result_of<func_t(args_t...)>
::type return_t;
303 auto task = std::make_shared<std::packaged_task<return_t()>>(
304 std::bind(std::forward<func_t>(f), std::forward<args_t>(args)...));
306 std::future<return_t>
result = task->get_future();
309 std::unique_lock<std::mutex> lock(mutex_);
311 throw std::runtime_error(
"Cannot add task to stopped thread pool.");
313 tasks_.emplace([task]() { (*task)(); });
316 task_condition_.notify_one();
321 template <
typename T>
324 template <
typename T>
326 : max_num_jobs_(max_num_jobs), stop_(false) {}
328 template <
typename T>
333 template <
typename T>
335 std::unique_lock<std::mutex> lock(mutex_);
339 template <
typename T>
341 std::unique_lock<std::mutex> lock(mutex_);
342 while (jobs_.size() >= max_num_jobs_ && !stop_) {
343 pop_condition_.wait(lock);
349 push_condition_.notify_one();
354 template <
typename T>
356 std::unique_lock<std::mutex> lock(mutex_);
357 while (jobs_.empty() && !stop_) {
358 push_condition_.wait(lock);
363 const T
data = jobs_.front();
365 pop_condition_.notify_one();
367 empty_condition_.notify_all();
373 template <
typename T>
375 std::unique_lock<std::mutex> lock(mutex_);
376 while (!jobs_.empty()) {
377 empty_condition_.wait(lock);
381 template <
typename T>
384 push_condition_.notify_all();
385 pop_condition_.notify_all();
388 template <
typename T>
390 std::unique_lock<std::mutex> lock(mutex_);
391 std::queue<T> empty_jobs;
JobQueue(const size_t max_num_jobs)
static const int kMaxNumThreads
auto AddTask(func_t &&f, args_t &&... args) -> std::future< typename std::result_of< func_t(args_t...)>::type >
ThreadPool(const int num_threads=kMaxNumThreads)
std::thread::id GetThreadId() const
size_t NumThreads() const
void Callback(const int id) const
void RegisterCallback(const int id)
const Timer & GetTimer() const
void AddCallback(const int id, const std::function< void()> &func)
void SignalInvalidSetup()
std::thread::id GetThreadId() const
virtual ~Thread()=default
int GetEffectiveNumThreads(const int num_threads)
void swap(cloudViewer::core::SmallVectorImpl< T > &LHS, cloudViewer::core::SmallVectorImpl< T > &RHS)
Implement std::swap in terms of SmallVector swap.