51 std::unique_lock<std::mutex> lock(mutex_);
52 CHECK(!started_ || finished_);
55 thread_ = std::thread(&Thread::RunFunc,
this);
67 std::unique_lock<std::mutex> lock(mutex_);
74 std::unique_lock<std::mutex> lock(mutex_);
79 std::unique_lock<std::mutex> lock(mutex_);
82 pause_condition_.notify_all();
87 if (thread_.joinable()) {
93 std::unique_lock<std::mutex> lock(mutex_);
98 std::unique_lock<std::mutex> lock(mutex_);
103 std::unique_lock<std::mutex> lock(mutex_);
108 std::unique_lock<std::mutex> lock(mutex_);
109 return started_ && !pausing_ && !finished_;
113 std::unique_lock<std::mutex> lock(mutex_);
119 CHECK_GT(callbacks_.count(
id), 0) <<
"Callback not registered";
120 callbacks_.at(
id).push_back(func);
124 callbacks_.emplace(
id, std::list<std::function<
void()>>());
128 CHECK_GT(callbacks_.count(
id), 0) <<
"Callback not registered";
129 for (
const auto&
callback : callbacks_.at(
id)) {
135 return std::this_thread::get_id();
139 std::unique_lock<std::mutex> lock(mutex_);
143 setup_condition_.notify_all();
147 std::unique_lock<std::mutex> lock(mutex_);
150 setup_valid_ =
false;
151 setup_condition_.notify_all();
157 std::unique_lock<std::mutex> lock(mutex_);
161 pause_condition_.wait(lock);
168 std::unique_lock<std::mutex> lock(mutex_);
170 setup_condition_.wait(lock);
175 void Thread::RunFunc() {
179 std::unique_lock<std::mutex> lock(mutex_);
187 : stopped_(false), num_active_workers_(0) {
189 for (
int index = 0; index < num_effective_threads; ++index) {
190 std::function<void(
void)> worker =
191 std::bind(&ThreadPool::WorkerFunc,
this, index);
192 workers_.emplace_back(worker);
200 std::unique_lock<std::mutex> lock(mutex_);
208 std::queue<std::function<void()>> empty_tasks;
212 task_condition_.notify_all();
214 for (
auto& worker : workers_) {
218 finished_condition_.notify_all();
222 std::unique_lock<std::mutex> lock(mutex_);
223 if (!tasks_.empty() || num_active_workers_ > 0) {
224 finished_condition_.wait(
225 lock, [
this]() {
return tasks_.empty() && num_active_workers_ == 0; });
229 void ThreadPool::WorkerFunc(
const int index) {
231 std::lock_guard<std::mutex> lock(mutex_);
236 std::function<void()> task;
238 std::unique_lock<std::mutex> lock(mutex_);
239 task_condition_.wait(lock,
240 [
this] {
return stopped_ || !tasks_.empty(); });
241 if (stopped_ && tasks_.empty()) {
244 task = std::move(tasks_.front());
246 num_active_workers_ += 1;
252 std::unique_lock<std::mutex> lock(mutex_);
253 num_active_workers_ -= 1;
256 finished_condition_.notify_all();
261 return std::this_thread::get_id();
265 std::unique_lock<std::mutex> lock(mutex_);
270 int num_effective_threads = num_threads;
271 if (num_threads <= 0) {
272 num_effective_threads = std::thread::hardware_concurrency();
275 if (num_effective_threads <= 0) {
276 num_effective_threads = 1;
279 return num_effective_threads;
std::function< void(std::shared_ptr< core::Tensor >)> callback
ThreadPool(const int num_threads=kMaxNumThreads)
std::thread::id GetThreadId() const
void Callback(const int id) const
void RegisterCallback(const int id)
const Timer & GetTimer() const
void AddCallback(const int id, const std::function< void()> &func)
void SignalInvalidSetup()
std::thread::id GetThreadId() const
int GetEffectiveNumThreads(const int num_threads)
void swap(cloudViewer::core::SmallVectorImpl< T > &LHS, cloudViewer::core::SmallVectorImpl< T > &RHS)
Implement std::swap in terms of SmallVector swap.