ACloudViewer  3.9.4
A Modern Library for 3D Data Processing
threading.h
Go to the documentation of this file.
1 // ----------------------------------------------------------------------------
2 // - CloudViewer: www.cloudViewer.org -
3 // ----------------------------------------------------------------------------
4 // Copyright (c) 2018-2024 www.cloudViewer.org
5 // SPDX-License-Identifier: MIT
6 // ----------------------------------------------------------------------------
7 
8 #pragma once
9 
10 #include <atomic>
11 #include <climits>
12 #include <functional>
13 #include <future>
14 #include <list>
15 #include <queue>
16 #include <unordered_map>
17 
18 #include "util/alignment.h"
19 #include "util/timer.h"
20 
21 namespace colmap {
22 
23 #ifdef __clang__
24 #pragma clang diagnostic push
25 #pragma clang diagnostic ignored "-Wkeyword-macro"
26 #endif
27 
28 #ifdef __clang__
29 #pragma clang diagnostic pop // -Wkeyword-macro
30 #endif
31 
32 // Helper class to create single threads with simple controls and timing, e.g.:
33 //
34 // class MyThread : public Thread {
35 // enum {
36 // PROCESSED_CALLBACK,
37 // };
38 //
39 // MyThread() { RegisterCallback(PROCESSED_CALLBACK); }
40 // void Run() {
41 // // Some setup routine... note that this is optional.
42 // if (setup_valid) {
43 // SignalValidSetup();
44 // } else {
45 // SignalInvalidSetup();
46 // }
47 //
48 // // Some pre-processing...
49 // for (const auto& item : items) {
50 // BlockIfPaused();
51 // if (IsStopped()) {
52 // // Tear down...
53 // break;
54 // }
55 // // Process item...
56 // Callback(PROCESSED_CALLBACK);
57 // }
58 // }
59 // };
60 //
61 // MyThread thread;
62 // thread.AddCallback(MyThread::PROCESSED_CALLBACK, []() {
63 // std::cout << "Processed item"; });
64 // thread.AddCallback(MyThread::STARTED_CALLBACK, []() {
65 // std::cout << "Start"; });
66 // thread.AddCallback(MyThread::FINISHED_CALLBACK, []() {
67 // std::cout << "Finished"; });
68 // thread.Start();
69 // // thread.CheckValidSetup();
70 // // Pause, resume, stop, ...
71 // thread.Wait();
72 // thread.GetTimer().PrintElapsedSeconds();
73 //
74 class Thread {
75 public:
76  enum {
77  STARTED_CALLBACK = INT_MIN,
79  };
80 
81  Thread();
82  virtual ~Thread() = default;
83 
84  // Control the state of the thread.
85  virtual void Start();
86  virtual void Stop();
87  virtual void Pause();
88  virtual void Resume();
89  virtual void Wait();
90 
91  // Check the state of the thread.
92  bool IsStarted();
93  bool IsStopped();
94  bool IsPaused();
95  bool IsRunning();
96  bool IsFinished();
97 
98  // To be called from inside the main run function. This blocks the main
99  // caller, if the thread is paused, until the thread is resumed.
100  void BlockIfPaused();
101 
102  // To be called from outside. This blocks the caller until the thread is
103  // setup, i.e. it signaled that its setup was valid or not. If it never
104  // gives this signal, this call will block the caller infinitely. Check
105  // whether setup is valid. Note that the result is only meaningful if the
106  // thread gives a setup signal.
107  bool CheckValidSetup();
108 
109  // Set callbacks that can be triggered within the main run function.
110  void AddCallback(const int id, const std::function<void()>& func);
111 
112  // Get timing information of the thread, properly accounting for pause
113  // times.
114  const Timer& GetTimer() const;
115 
116 protected:
117  // This is the main run function to be implemented by the child class. If
118  // you are looping over data and want to support the pause operation, call
119  // `BlockIfPaused` at appropriate places in the loop. To support the stop
120  // operation, check the `IsStopped` state and early return from this method.
121  virtual void Run() = 0;
122 
123  // Register a new callback. Note that only registered callbacks can be
124  // set/reset and called from within the thread. Hence, this method should be
125  // called from the derived thread constructor.
126  void RegisterCallback(const int id);
127 
128  // Call back to the function with the specified name, if it exists.
129  void Callback(const int id) const;
130 
131  // Get the unique identifier of the current thread.
132  std::thread::id GetThreadId() const;
133 
134  // Signal that the thread is setup. Only call this function once.
135  void SignalValidSetup();
136  void SignalInvalidSetup();
137 
138 private:
139  // Wrapper around the main run function to set the finished flag.
140  void RunFunc();
141 
142  std::thread thread_;
143  std::mutex mutex_;
144  std::condition_variable pause_condition_;
145  std::condition_variable setup_condition_;
146 
147  Timer timer_;
148 
149  bool started_;
150  bool stopped_;
151  bool paused_;
152  bool pausing_;
153  bool finished_;
154  bool setup_;
155  bool setup_valid_;
156 
157  std::unordered_map<int, std::list<std::function<void()>>> callbacks_;
158 };
159 
160 // A thread pool class to submit generic tasks (functors) to a pool of workers:
161 //
162 // ThreadPool thread_pool;
163 // thread_pool.AddTask([]() { /* Do some work */ });
164 // auto future = thread_pool.AddTask([]() { /* Do some work */ return 1; });
165 // const auto result = future.get();
166 // for (int i = 0; i < 10; ++i) {
167 // thread_pool.AddTask([](const int i) { /* Do some work */ }, i);
168 // }
169 // thread_pool.Wait();
170 //
class ThreadPool {
public:
    // Default value of the constructor's `num_threads` argument.
    static const int kMaxNumThreads = -1;

    explicit ThreadPool(const int num_threads = kMaxNumThreads);
    ~ThreadPool();

    // Number of worker threads held by the pool.
    inline size_t NumThreads() const;

    // Enqueue `f(args...)` for execution by a worker. The returned future
    // gives access to the result of the call.
    template <typename func_t, typename... args_t>
    auto AddTask(func_t&& f, args_t&&... args)
            -> std::future<typename std::result_of<func_t(args_t...)>::type>;

    // Stop the execution of all workers.
    void Stop();

    // Block until the submitted tasks are finished.
    void Wait();

    // Unique identifier of the calling thread.
    std::thread::id GetThreadId() const;

    // Index of the calling thread within the pool. For a pool of N threads
    // the valid indices are 0, ..., N-1.
    int GetThreadIndex();

private:
    // Entry point of a worker; `index` is its position within the pool.
    void WorkerFunc(const int index);

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;

    // Guards the task queue and the flags below.
    std::mutex mutex_;
    std::condition_variable task_condition_;
    std::condition_variable finished_condition_;

    bool stopped_;
    int num_active_workers_;

    // Lookup table backing `GetThreadIndex`.
    std::unordered_map<std::thread::id, int> thread_id_to_index_;
};
214 
215 // A job queue class for the producer-consumer paradigm.
216 //
217 // JobQueue<int> job_queue;
218 //
219 // std::thread producer_thread([&job_queue]() {
220 // for (int i = 0; i < 10; ++i) {
221 // job_queue.Push(i);
222 // }
223 // });
224 //
225 // std::thread consumer_thread([&job_queue]() {
226 // for (int i = 0; i < 10; ++i) {
227 // const auto job = job_queue.Pop();
228 // if (job.IsValid()) { /* Do some work */ }
229 // else { break; }
230 // }
231 // });
232 //
233 // producer_thread.join();
234 // consumer_thread.join();
235 //
template <typename T>
class JobQueue {
public:
    // Result of a `Pop` call: the payload together with a flag telling
    // whether the payload is meaningful.
    class Job {
    public:
        // Creates an invalid job. The payload is value-initialized so that
        // reading it never observes an indeterminate value.
        Job() : data_(), valid_(false) {}
        // Creates a valid job holding a copy of `data`.
        explicit Job(const T& data) : data_(data), valid_(true) {}

        // Check whether the data is valid.
        bool IsValid() const { return valid_; }

        // Get reference to the data.
        T& Data() { return data_; }
        const T& Data() const { return data_; }

    private:
        T data_;
        bool valid_;
    };

    // Creates a queue with the largest capacity representable by size_t.
    JobQueue();
    // Creates a queue that holds at most `max_num_jobs` jobs at a time.
    explicit JobQueue(const size_t max_num_jobs);
    // Stops the queue.
    ~JobQueue();

    // The number of pushed and not popped jobs in the queue.
    size_t Size();

    // Push a new job to the queue. Waits while the queue is full. Returns
    // false if the queue has been stopped, true if the job was enqueued.
    bool Push(const T& data);

    // Pop a job from the queue. Waits while the queue is empty. Returns an
    // invalid job if the queue has been stopped.
    Job Pop();

    // Wait until all pushed jobs have been popped, i.e. the queue is empty.
    void Wait();

    // Stop the queue: waiting and subsequent Push calls return false and
    // Pop calls return an invalid job.
    void Stop();

    // Clear all pushed and not popped jobs from the queue.
    void Clear();

private:
    size_t max_num_jobs_;
    std::atomic<bool> stop_;
    std::queue<T> jobs_;
    std::mutex mutex_;
    // Signaled by Push (a job became available) and by Stop.
    std::condition_variable push_condition_;
    // Signaled by Pop (a slot became free) and by Stop.
    std::condition_variable pop_condition_;
    // Signaled by Pop when the queue became empty.
    std::condition_variable empty_condition_;
};
287 
288 // Return the number of logical CPU cores if num_threads <= 0,
289 // otherwise return the input value of num_threads.
290 int GetEffectiveNumThreads(const int num_threads);
291 
293 // Implementation
295 
296 size_t ThreadPool::NumThreads() const { return workers_.size(); }
297 
298 template <class func_t, class... args_t>
299 auto ThreadPool::AddTask(func_t&& f, args_t&&... args)
300  -> std::future<typename std::result_of<func_t(args_t...)>::type> {
301  typedef typename std::result_of<func_t(args_t...)>::type return_t;
302 
303  auto task = std::make_shared<std::packaged_task<return_t()>>(
304  std::bind(std::forward<func_t>(f), std::forward<args_t>(args)...));
305 
306  std::future<return_t> result = task->get_future();
307 
308  {
309  std::unique_lock<std::mutex> lock(mutex_);
310  if (stopped_) {
311  throw std::runtime_error("Cannot add task to stopped thread pool.");
312  }
313  tasks_.emplace([task]() { (*task)(); });
314  }
315 
316  task_condition_.notify_one();
317 
318  return result;
319 }
320 
321 template <typename T>
322 JobQueue<T>::JobQueue() : JobQueue(std::numeric_limits<size_t>::max()) {}
323 
324 template <typename T>
325 JobQueue<T>::JobQueue(const size_t max_num_jobs)
326  : max_num_jobs_(max_num_jobs), stop_(false) {}
327 
328 template <typename T>
330  Stop();
331 }
332 
333 template <typename T>
335  std::unique_lock<std::mutex> lock(mutex_);
336  return jobs_.size();
337 }
338 
339 template <typename T>
340 bool JobQueue<T>::Push(const T& data) {
341  std::unique_lock<std::mutex> lock(mutex_);
342  while (jobs_.size() >= max_num_jobs_ && !stop_) {
343  pop_condition_.wait(lock);
344  }
345  if (stop_) {
346  return false;
347  } else {
348  jobs_.push(data);
349  push_condition_.notify_one();
350  return true;
351  }
352 }
353 
354 template <typename T>
356  std::unique_lock<std::mutex> lock(mutex_);
357  while (jobs_.empty() && !stop_) {
358  push_condition_.wait(lock);
359  }
360  if (stop_) {
361  return Job();
362  } else {
363  const T data = jobs_.front();
364  jobs_.pop();
365  pop_condition_.notify_one();
366  if (jobs_.empty()) {
367  empty_condition_.notify_all();
368  }
369  return Job(data);
370  }
371 }
372 
373 template <typename T>
375  std::unique_lock<std::mutex> lock(mutex_);
376  while (!jobs_.empty()) {
377  empty_condition_.wait(lock);
378  }
379 }
380 
381 template <typename T>
383  stop_ = true;
384  push_condition_.notify_all();
385  pop_condition_.notify_all();
386 }
387 
388 template <typename T>
390  std::unique_lock<std::mutex> lock(mutex_);
391  std::queue<T> empty_jobs;
392  std::swap(jobs_, empty_jobs);
393 }
394 
395 } // namespace colmap
char type
core::Tensor result
Definition: VtkUtils.cpp:76
const T & Data() const
Definition: threading.h:249
bool IsValid() const
Definition: threading.h:245
Job(const T &data)
Definition: threading.h:242
bool Push(const T &data)
Definition: threading.h:340
JobQueue(const size_t max_num_jobs)
Definition: threading.h:325
size_t Size()
Definition: threading.h:334
static const int kMaxNumThreads
Definition: threading.h:173
auto AddTask(func_t &&f, args_t &&... args) -> std::future< typename std::result_of< func_t(args_t...)>::type >
Definition: threading.h:299
ThreadPool(const int num_threads=kMaxNumThreads)
Definition: threading.cc:186
std::thread::id GetThreadId() const
Definition: threading.cc:260
size_t NumThreads() const
Definition: threading.h:296
bool IsFinished()
Definition: threading.cc:112
virtual void Pause()
Definition: threading.cc:73
virtual void Start()
Definition: threading.cc:50
void Callback(const int id) const
Definition: threading.cc:127
void BlockIfPaused()
Definition: threading.cc:156
virtual void Stop()
Definition: threading.cc:65
void RegisterCallback(const int id)
Definition: threading.cc:123
bool IsPaused()
Definition: threading.cc:102
const Timer & GetTimer() const
Definition: threading.cc:154
virtual void Resume()
Definition: threading.cc:78
bool CheckValidSetup()
Definition: threading.cc:167
void AddCallback(const int id, const std::function< void()> &func)
Definition: threading.cc:117
virtual void Run()=0
void SignalInvalidSetup()
Definition: threading.cc:146
bool IsStarted()
Definition: threading.cc:92
void SignalValidSetup()
Definition: threading.cc:138
std::thread::id GetThreadId() const
Definition: threading.cc:134
bool IsStopped()
Definition: threading.cc:97
virtual ~Thread()=default
virtual void Wait()
Definition: threading.cc:86
bool IsRunning()
Definition: threading.cc:107
GraphType data
Definition: graph_cut.cc:138
int GetEffectiveNumThreads(const int num_threads)
Definition: threading.cc:269
Definition: Eigen.h:85
void swap(cloudViewer::core::SmallVectorImpl< T > &LHS, cloudViewer::core::SmallVectorImpl< T > &RHS)
Implement std::swap in terms of SmallVector swap.
Definition: SmallVector.h:1370