ACloudViewer  3.9.4
A Modern Library for 3D Data Processing
threading.cc
Go to the documentation of this file.
1 // Copyright (c) 2018, ETH Zurich and UNC Chapel Hill.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are met:
6 //
7 // * Redistributions of source code must retain the above copyright
8 // notice, this list of conditions and the following disclaimer.
9 //
10 // * Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 //
14 // * Neither the name of ETH Zurich and UNC Chapel Hill nor the names of
15 // its contributors may be used to endorse or promote products derived
16 // from this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
22 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 // POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Johannes L. Schoenberger (jsch-at-demuc-dot-de)
31 
32 #include "util/threading.h"
33 
34 #include "util/logging.h"
35 
36 namespace colmap {
37 
39  : started_(false),
40  stopped_(false),
41  paused_(false),
42  pausing_(false),
43  finished_(false),
44  setup_(false),
45  setup_valid_(false) {
48 }
49 
50 void Thread::Start() {
51  std::unique_lock<std::mutex> lock(mutex_);
52  CHECK(!started_ || finished_);
53  Wait();
54  timer_.Restart();
55  thread_ = std::thread(&Thread::RunFunc, this);
56  started_ = true;
57  stopped_ = false;
58  paused_ = false;
59  pausing_ = false;
60  finished_ = false;
61  setup_ = false;
62  setup_valid_ = false;
63 }
64 
65 void Thread::Stop() {
66  {
67  std::unique_lock<std::mutex> lock(mutex_);
68  stopped_ = true;
69  }
70  Resume();
71 }
72 
73 void Thread::Pause() {
74  std::unique_lock<std::mutex> lock(mutex_);
75  paused_ = true;
76 }
77 
79  std::unique_lock<std::mutex> lock(mutex_);
80  if (paused_) {
81  paused_ = false;
82  pause_condition_.notify_all();
83  }
84 }
85 
86 void Thread::Wait() {
87  if (thread_.joinable()) {
88  thread_.join();
89  }
90 }
91 
93  std::unique_lock<std::mutex> lock(mutex_);
94  return started_;
95 }
96 
98  std::unique_lock<std::mutex> lock(mutex_);
99  return stopped_;
100 }
101 
103  std::unique_lock<std::mutex> lock(mutex_);
104  return paused_;
105 }
106 
108  std::unique_lock<std::mutex> lock(mutex_);
109  return started_ && !pausing_ && !finished_;
110 }
111 
113  std::unique_lock<std::mutex> lock(mutex_);
114  return finished_;
115 }
116 
117 void Thread::AddCallback(const int id, const std::function<void()>& func) {
118  CHECK(func);
119  CHECK_GT(callbacks_.count(id), 0) << "Callback not registered";
120  callbacks_.at(id).push_back(func);
121 }
122 
123 void Thread::RegisterCallback(const int id) {
124  callbacks_.emplace(id, std::list<std::function<void()>>());
125 }
126 
127 void Thread::Callback(const int id) const {
128  CHECK_GT(callbacks_.count(id), 0) << "Callback not registered";
129  for (const auto& callback : callbacks_.at(id)) {
130  callback();
131  }
132 }
133 
134 std::thread::id Thread::GetThreadId() const {
135  return std::this_thread::get_id();
136 }
137 
139  std::unique_lock<std::mutex> lock(mutex_);
140  CHECK(!setup_);
141  setup_ = true;
142  setup_valid_ = true;
143  setup_condition_.notify_all();
144 }
145 
147  std::unique_lock<std::mutex> lock(mutex_);
148  CHECK(!setup_);
149  setup_ = true;
150  setup_valid_ = false;
151  setup_condition_.notify_all();
152 }
153 
154 const class Timer& Thread::GetTimer() const { return timer_; }
155 
157  std::unique_lock<std::mutex> lock(mutex_);
158  if (paused_) {
159  pausing_ = true;
160  timer_.Pause();
161  pause_condition_.wait(lock);
162  pausing_ = false;
163  timer_.Resume();
164  }
165 }
166 
168  std::unique_lock<std::mutex> lock(mutex_);
169  if (!setup_) {
170  setup_condition_.wait(lock);
171  }
172  return setup_valid_;
173 }
174 
175 void Thread::RunFunc() {
177  Run();
178  {
179  std::unique_lock<std::mutex> lock(mutex_);
180  finished_ = true;
181  timer_.Pause();
182  }
184 }
185 
186 ThreadPool::ThreadPool(const int num_threads)
187  : stopped_(false), num_active_workers_(0) {
188  const int num_effective_threads = GetEffectiveNumThreads(num_threads);
189  for (int index = 0; index < num_effective_threads; ++index) {
190  std::function<void(void)> worker =
191  std::bind(&ThreadPool::WorkerFunc, this, index);
192  workers_.emplace_back(worker);
193  }
194 }
195 
197 
199  {
200  std::unique_lock<std::mutex> lock(mutex_);
201 
202  if (stopped_) {
203  return;
204  }
205 
206  stopped_ = true;
207 
208  std::queue<std::function<void()>> empty_tasks;
209  std::swap(tasks_, empty_tasks);
210  }
211 
212  task_condition_.notify_all();
213 
214  for (auto& worker : workers_) {
215  worker.join();
216  }
217 
218  finished_condition_.notify_all();
219 }
220 
222  std::unique_lock<std::mutex> lock(mutex_);
223  if (!tasks_.empty() || num_active_workers_ > 0) {
224  finished_condition_.wait(
225  lock, [this]() { return tasks_.empty() && num_active_workers_ == 0; });
226  }
227 }
228 
229 void ThreadPool::WorkerFunc(const int index) {
230  {
231  std::lock_guard<std::mutex> lock(mutex_);
232  thread_id_to_index_.emplace(GetThreadId(), index);
233  }
234 
235  while (true) {
236  std::function<void()> task;
237  {
238  std::unique_lock<std::mutex> lock(mutex_);
239  task_condition_.wait(lock,
240  [this] { return stopped_ || !tasks_.empty(); });
241  if (stopped_ && tasks_.empty()) {
242  return;
243  }
244  task = std::move(tasks_.front());
245  tasks_.pop();
246  num_active_workers_ += 1;
247  }
248 
249  task();
250 
251  {
252  std::unique_lock<std::mutex> lock(mutex_);
253  num_active_workers_ -= 1;
254  }
255 
256  finished_condition_.notify_all();
257  }
258 }
259 
260 std::thread::id ThreadPool::GetThreadId() const {
261  return std::this_thread::get_id();
262 }
263 
265  std::unique_lock<std::mutex> lock(mutex_);
266  return thread_id_to_index_.at(GetThreadId());
267 }
268 
269 int GetEffectiveNumThreads(const int num_threads) {
270  int num_effective_threads = num_threads;
271  if (num_threads <= 0) {
272  num_effective_threads = std::thread::hardware_concurrency();
273  }
274 
275  if (num_effective_threads <= 0) {
276  num_effective_threads = 1;
277  }
278 
279  return num_effective_threads;
280 }
281 
282 } // namespace colmap
std::function< void(std::shared_ptr< core::Tensor >)> callback
ThreadPool(const int num_threads=kMaxNumThreads)
Definition: threading.cc:186
std::thread::id GetThreadId() const
Definition: threading.cc:260
bool IsFinished()
Definition: threading.cc:112
virtual void Pause()
Definition: threading.cc:73
virtual void Start()
Definition: threading.cc:50
void Callback(const int id) const
Definition: threading.cc:127
void BlockIfPaused()
Definition: threading.cc:156
virtual void Stop()
Definition: threading.cc:65
void RegisterCallback(const int id)
Definition: threading.cc:123
bool IsPaused()
Definition: threading.cc:102
const Timer & GetTimer() const
Definition: threading.cc:154
virtual void Resume()
Definition: threading.cc:78
bool CheckValidSetup()
Definition: threading.cc:167
void AddCallback(const int id, const std::function< void()> &func)
Definition: threading.cc:117
virtual void Run()=0
void SignalInvalidSetup()
Definition: threading.cc:146
bool IsStarted()
Definition: threading.cc:92
void SignalValidSetup()
Definition: threading.cc:138
std::thread::id GetThreadId() const
Definition: threading.cc:134
bool IsStopped()
Definition: threading.cc:97
virtual void Wait()
Definition: threading.cc:86
bool IsRunning()
Definition: threading.cc:107
void Pause()
Definition: timer.cc:54
void Restart()
Definition: timer.cc:49
void Resume()
Definition: timer.cc:59
int GetEffectiveNumThreads(const int num_threads)
Definition: threading.cc:269
void swap(cloudViewer::core::SmallVectorImpl< T > &LHS, cloudViewer::core::SmallVectorImpl< T > &RHS)
Implement std::swap in terms of SmallVector swap.
Definition: SmallVector.h:1370