ACloudViewer  3.9.4
A Modern Library for 3D Data Processing
threading_test.cc
Go to the documentation of this file.
1 // Copyright (c) 2018, ETH Zurich and UNC Chapel Hill.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are met:
6 //
7 // * Redistributions of source code must retain the above copyright
8 // notice, this list of conditions and the following disclaimer.
9 //
10 // * Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 //
14 // * Neither the name of ETH Zurich and UNC Chapel Hill nor the names of
15 // its contributors may be used to endorse or promote products derived
16 // from this software without specific prior written permission.
17 //
18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
22 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 // POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Author: Johannes L. Schoenberger (jsch-at-demuc-dot-de)
31 
32 #define TEST_NAME "util/threading"
33 #include "util/testing.h"
34 
35 #include "util/logging.h"
36 #include "util/threading.h"
37 
38 using namespace colmap;
39 
40 namespace {
41 
42 // Custom implementation of std::barrier that allows us to execute the below
43 // tests deterministically.
44 class Barrier {
45  public:
46  Barrier() : Barrier(2) {}
47 
48  explicit Barrier(const size_t count)
49  : threshold_(count), count_(count), generation_(0) {}
50 
51  void Wait() {
52  std::unique_lock<std::mutex> lock(mutex_);
53  auto current_generation = generation_;
54  if (!--count_) {
55  ++generation_;
56  count_ = threshold_;
57  condition_.notify_all();
58  } else {
59  condition_.wait(lock, [this, current_generation] {
60  return current_generation != generation_;
61  });
62  }
63  }
64 
65  private:
66  std::mutex mutex_;
67  std::condition_variable condition_;
68  const size_t threshold_;
69  size_t count_;
70  size_t generation_;
71 };
72 
73 } // namespace
74 
75 // IMPORTANT: BOOST_CHECK_* macros are not thread-safe,
76 // so we use glog's CHECK macros inside threads.
77 
78 BOOST_AUTO_TEST_CASE(TestThreadWait) {
79  class TestThread : public Thread {
80  public:
81  Barrier startBarrier;
82  Barrier endBarrier;
83 
84  void Run() {
85  startBarrier.Wait();
86  endBarrier.Wait();
87  }
88  };
89 
90  TestThread thread;
91  BOOST_CHECK(!thread.IsStarted());
92  BOOST_CHECK(!thread.IsStopped());
93  BOOST_CHECK(!thread.IsPaused());
94  BOOST_CHECK(!thread.IsRunning());
95  BOOST_CHECK(!thread.IsFinished());
96 
97  thread.Start();
98 
99  thread.startBarrier.Wait();
100  BOOST_CHECK(thread.IsStarted());
101  BOOST_CHECK(!thread.IsStopped());
102  BOOST_CHECK(!thread.IsPaused());
103  BOOST_CHECK(thread.IsRunning());
104  BOOST_CHECK(!thread.IsFinished());
105 
106  thread.endBarrier.Wait();
107  thread.Wait();
108  BOOST_CHECK(thread.IsStarted());
109  BOOST_CHECK(!thread.IsStopped());
110  BOOST_CHECK(!thread.IsPaused());
111  BOOST_CHECK(!thread.IsRunning());
112  BOOST_CHECK(thread.IsFinished());
113 }
114 
115 BOOST_AUTO_TEST_CASE(TestThreadPause) {
116  class TestThread : public Thread {
117  public:
118  Barrier startBarrier;
119  Barrier pauseBarrier;
120  Barrier pausedBarrier;
121  Barrier resumedBarrier;
122  Barrier endBarrier;
123 
124  void Run() {
125  startBarrier.Wait();
126  pauseBarrier.Wait();
127  pausedBarrier.Wait();
128  BlockIfPaused();
129  resumedBarrier.Wait();
130  endBarrier.Wait();
131  }
132  };
133 
134  TestThread thread;
135  BOOST_CHECK(!thread.IsStarted());
136  BOOST_CHECK(!thread.IsStopped());
137  BOOST_CHECK(!thread.IsPaused());
138  BOOST_CHECK(!thread.IsRunning());
139  BOOST_CHECK(!thread.IsFinished());
140 
141  thread.Start();
142 
143  thread.startBarrier.Wait();
144  BOOST_CHECK(thread.IsStarted());
145  BOOST_CHECK(!thread.IsStopped());
146  BOOST_CHECK(!thread.IsPaused());
147  BOOST_CHECK(thread.IsRunning());
148  BOOST_CHECK(!thread.IsFinished());
149 
150  thread.pauseBarrier.Wait();
151  thread.Pause();
152  thread.pausedBarrier.Wait();
153  while (!thread.IsPaused() || thread.IsRunning()) {
154  std::this_thread::sleep_for(std::chrono::milliseconds(10));
155  }
156  BOOST_CHECK(thread.IsStarted());
157  BOOST_CHECK(!thread.IsStopped());
158  BOOST_CHECK(thread.IsPaused());
159  BOOST_CHECK(!thread.IsRunning());
160  BOOST_CHECK(!thread.IsFinished());
161 
162  thread.Resume();
163  thread.resumedBarrier.Wait();
164  BOOST_CHECK(thread.IsStarted());
165  BOOST_CHECK(!thread.IsStopped());
166  BOOST_CHECK(!thread.IsPaused());
167  BOOST_CHECK(thread.IsRunning());
168  BOOST_CHECK(!thread.IsFinished());
169 
170  thread.endBarrier.Wait();
171  thread.Wait();
172  BOOST_CHECK(thread.IsStarted());
173  BOOST_CHECK(!thread.IsStopped());
174  BOOST_CHECK(!thread.IsPaused());
175  BOOST_CHECK(!thread.IsRunning());
176  BOOST_CHECK(thread.IsFinished());
177 }
178 
179 BOOST_AUTO_TEST_CASE(TestThreadPauseStop) {
180  class TestThread : public Thread {
181  public:
182  Barrier startBarrier;
183  Barrier pauseBarrier;
184  Barrier pausedBarrier;
185  Barrier resumedBarrier;
186  Barrier stopBarrier;
187  Barrier stoppingBarrier;
188  Barrier stoppedBarrier;
189  Barrier endBarrier;
190 
191  void Run() {
192  startBarrier.Wait();
193  pauseBarrier.Wait();
194  pausedBarrier.Wait();
195  BlockIfPaused();
196  resumedBarrier.Wait();
197  stopBarrier.Wait();
198  stoppingBarrier.Wait();
199 
200  if (IsStopped()) {
201  stoppedBarrier.Wait();
202  endBarrier.Wait();
203  return;
204  }
205  }
206  };
207 
208  TestThread thread;
209  BOOST_CHECK(!thread.IsStarted());
210  BOOST_CHECK(!thread.IsStopped());
211  BOOST_CHECK(!thread.IsPaused());
212  BOOST_CHECK(!thread.IsRunning());
213  BOOST_CHECK(!thread.IsFinished());
214 
215  thread.Start();
216 
217  thread.startBarrier.Wait();
218  BOOST_CHECK(thread.IsStarted());
219  BOOST_CHECK(!thread.IsStopped());
220  BOOST_CHECK(!thread.IsPaused());
221  BOOST_CHECK(thread.IsRunning());
222  BOOST_CHECK(!thread.IsFinished());
223 
224  thread.pauseBarrier.Wait();
225  thread.Pause();
226  thread.pausedBarrier.Wait();
227  while (!thread.IsPaused() || thread.IsRunning()) {
228  std::this_thread::sleep_for(std::chrono::milliseconds(10));
229  }
230  BOOST_CHECK(thread.IsStarted());
231  BOOST_CHECK(!thread.IsStopped());
232  BOOST_CHECK(thread.IsPaused());
233  BOOST_CHECK(!thread.IsRunning());
234  BOOST_CHECK(!thread.IsFinished());
235 
236  thread.Resume();
237  thread.resumedBarrier.Wait();
238  BOOST_CHECK(thread.IsStarted());
239  BOOST_CHECK(!thread.IsStopped());
240  BOOST_CHECK(!thread.IsPaused());
241  BOOST_CHECK(thread.IsRunning());
242  BOOST_CHECK(!thread.IsFinished());
243 
244  thread.stopBarrier.Wait();
245  thread.Stop();
246  thread.stoppingBarrier.Wait();
247  thread.stoppedBarrier.Wait();
248  BOOST_CHECK(thread.IsStarted());
249  BOOST_CHECK(thread.IsStopped());
250  BOOST_CHECK(!thread.IsPaused());
251  BOOST_CHECK(thread.IsRunning());
252  BOOST_CHECK(!thread.IsFinished());
253 
254  thread.endBarrier.Wait();
255  thread.Wait();
256  BOOST_CHECK(thread.IsStarted());
257  BOOST_CHECK(thread.IsStopped());
258  BOOST_CHECK(!thread.IsPaused());
259  BOOST_CHECK(!thread.IsRunning());
260  BOOST_CHECK(thread.IsFinished());
261 }
262 
263 BOOST_AUTO_TEST_CASE(TestThreadRestart) {
264  class TestThread : public Thread {
265  public:
266  Barrier startBarrier;
267  Barrier endBarrier;
268 
269  void Run() {
270  startBarrier.Wait();
271  endBarrier.Wait();
272  }
273  };
274 
275  TestThread thread;
276  BOOST_CHECK(!thread.IsStarted());
277  BOOST_CHECK(!thread.IsStopped());
278  BOOST_CHECK(!thread.IsPaused());
279  BOOST_CHECK(!thread.IsRunning());
280  BOOST_CHECK(!thread.IsFinished());
281 
282  for (size_t i = 0; i < 2; ++i) {
283  thread.Start();
284 
285  thread.startBarrier.Wait();
286  BOOST_CHECK(thread.IsStarted());
287  BOOST_CHECK(!thread.IsStopped());
288  BOOST_CHECK(!thread.IsPaused());
289  BOOST_CHECK(thread.IsRunning());
290  BOOST_CHECK(!thread.IsFinished());
291 
292  thread.endBarrier.Wait();
293  thread.Wait();
294  BOOST_CHECK(thread.IsStarted());
295  BOOST_CHECK(!thread.IsStopped());
296  BOOST_CHECK(!thread.IsPaused());
297  BOOST_CHECK(!thread.IsRunning());
298  BOOST_CHECK(thread.IsFinished());
299  }
300 }
301 
302 BOOST_AUTO_TEST_CASE(TestThreadValidSetup) {
303  class TestThread : public Thread {
304  public:
305  Barrier startBarrier;
306  Barrier signalBarrier;
307  Barrier endBarrier;
308 
309  void Run() {
310  startBarrier.Wait();
311  SignalValidSetup();
312  signalBarrier.Wait();
313  endBarrier.Wait();
314  }
315  };
316 
317  TestThread thread;
318  BOOST_CHECK(!thread.IsStarted());
319  BOOST_CHECK(!thread.IsStopped());
320  BOOST_CHECK(!thread.IsPaused());
321  BOOST_CHECK(!thread.IsRunning());
322  BOOST_CHECK(!thread.IsFinished());
323 
324  thread.Start();
325 
326  thread.startBarrier.Wait();
327  BOOST_CHECK(thread.IsStarted());
328  BOOST_CHECK(!thread.IsStopped());
329  BOOST_CHECK(!thread.IsPaused());
330  BOOST_CHECK(thread.IsRunning());
331  BOOST_CHECK(!thread.IsFinished());
332 
333  thread.signalBarrier.Wait();
334  BOOST_CHECK(thread.CheckValidSetup());
335 
336  thread.endBarrier.Wait();
337  thread.Wait();
338  BOOST_CHECK(thread.IsStarted());
339  BOOST_CHECK(!thread.IsStopped());
340  BOOST_CHECK(!thread.IsPaused());
341  BOOST_CHECK(!thread.IsRunning());
342  BOOST_CHECK(thread.IsFinished());
343 }
344 
345 BOOST_AUTO_TEST_CASE(TestThreadInvalidSetup) {
346  class TestThread : public Thread {
347  public:
348  Barrier startBarrier;
349  Barrier signalBarrier;
350  Barrier endBarrier;
351 
352  void Run() {
353  startBarrier.Wait();
354  SignalInvalidSetup();
355  signalBarrier.Wait();
356  endBarrier.Wait();
357  }
358  };
359 
360  TestThread thread;
361  BOOST_CHECK(!thread.IsStarted());
362  BOOST_CHECK(!thread.IsStopped());
363  BOOST_CHECK(!thread.IsPaused());
364  BOOST_CHECK(!thread.IsRunning());
365  BOOST_CHECK(!thread.IsFinished());
366 
367  thread.Start();
368 
369  thread.startBarrier.Wait();
370  BOOST_CHECK(thread.IsStarted());
371  BOOST_CHECK(!thread.IsStopped());
372  BOOST_CHECK(!thread.IsPaused());
373  BOOST_CHECK(thread.IsRunning());
374  BOOST_CHECK(!thread.IsFinished());
375 
376  thread.signalBarrier.Wait();
377  BOOST_CHECK(!thread.CheckValidSetup());
378 
379  thread.endBarrier.Wait();
380  thread.Wait();
381  BOOST_CHECK(thread.IsStarted());
382  BOOST_CHECK(!thread.IsStopped());
383  BOOST_CHECK(!thread.IsPaused());
384  BOOST_CHECK(!thread.IsRunning());
385  BOOST_CHECK(thread.IsFinished());
386 }
387 
388 BOOST_AUTO_TEST_CASE(TestCallback) {
389  class TestThread : public Thread {
390  public:
391  enum Callbacks {
392  CALLBACK1,
393  CALLBACK2,
394  };
395 
396  TestThread() {
397  RegisterCallback(CALLBACK1);
398  RegisterCallback(CALLBACK2);
399  }
400 
401  private:
402  void Run() {
403  Callback(CALLBACK1);
404  Callback(CALLBACK2);
405  }
406  };
407 
408  bool called_back1 = false;
409  std::function<void()> CallbackFunc1 = [&called_back1]() {
410  called_back1 = true;
411  };
412 
413  bool called_back2 = false;
414  std::function<void()> CallbackFunc2 = [&called_back2]() {
415  called_back2 = true;
416  };
417 
418  bool called_back3 = false;
419  std::function<void()> CallbackFunc3 = [&called_back3]() {
420  called_back3 = true;
421  };
422 
423  TestThread thread;
424  thread.AddCallback(TestThread::CALLBACK1, CallbackFunc1);
425  thread.Start();
426  thread.Wait();
427  BOOST_CHECK(called_back1);
428  BOOST_CHECK(!called_back2);
429  BOOST_CHECK(!called_back3);
430 
431  called_back1 = false;
432  called_back2 = false;
433  thread.AddCallback(TestThread::CALLBACK2, CallbackFunc2);
434  thread.Start();
435  thread.Wait();
436  BOOST_CHECK(called_back1);
437  BOOST_CHECK(called_back2);
438  BOOST_CHECK(!called_back3);
439 
440  called_back1 = false;
441  called_back2 = false;
442  called_back3 = false;
443  thread.AddCallback(TestThread::CALLBACK1, CallbackFunc3);
444  thread.Start();
445  thread.Wait();
446  BOOST_CHECK(called_back1);
447  BOOST_CHECK(called_back2);
448  BOOST_CHECK(called_back3);
449 }
450 
451 BOOST_AUTO_TEST_CASE(TestDefaultCallback) {
452  class TestThread : public Thread {
453  public:
454  Barrier startBarrier;
455  Barrier signalBarrier;
456  Barrier endBarrier;
457 
458  void Run() {
459  startBarrier.Wait();
460  endBarrier.Wait();
461  }
462  };
463 
464  bool called_back1 = false;
465  std::function<void()> CallbackFunc1 = [&called_back1]() {
466  called_back1 = true;
467  };
468 
469  bool called_back2 = false;
470  std::function<void()> CallbackFunc2 = [&called_back2]() {
471  called_back2 = true;
472  };
473 
474  TestThread thread;
475  thread.AddCallback(TestThread::STARTED_CALLBACK, CallbackFunc1);
476  thread.AddCallback(TestThread::FINISHED_CALLBACK, CallbackFunc2);
477  thread.Start();
478  thread.startBarrier.Wait();
479  BOOST_CHECK(called_back1);
480  BOOST_CHECK(!called_back2);
481  thread.endBarrier.Wait();
482  thread.Wait();
483  BOOST_CHECK(called_back1);
484  BOOST_CHECK(called_back2);
485 }
486 
487 BOOST_AUTO_TEST_CASE(TestThreadPoolNoArgNoReturn) {
488  std::function<void(void)> Func = []() {
489  int num = 0;
490  for (int i = 0; i < 100; ++i) {
491  num += i;
492  }
493  };
494 
495  ThreadPool pool(4);
496  std::vector<std::future<void>> futures;
497 
498  for (int i = 0; i < 100; ++i) {
499  futures.push_back(pool.AddTask(Func));
500  }
501 
502  for (auto& future : futures) {
503  future.get();
504  }
505 }
506 
507 BOOST_AUTO_TEST_CASE(TestThreadPoolArgNoReturn) {
508  std::function<void(int)> Func = [](int num) {
509  for (int i = 0; i < 100; ++i) {
510  num += i;
511  }
512  };
513 
514  ThreadPool pool(4);
515  std::vector<std::future<void>> futures;
516 
517  for (int i = 0; i < 100; ++i) {
518  futures.push_back(pool.AddTask(Func, i));
519  }
520 
521  for (auto& future : futures) {
522  future.get();
523  }
524 }
525 
526 BOOST_AUTO_TEST_CASE(TestThreadPoolNoArgReturn) {
527  std::function<int(void)> Func = []() { return 0; };
528 
529  ThreadPool pool(4);
530  std::vector<std::future<int>> futures;
531 
532  for (int i = 0; i < 100; ++i) {
533  futures.push_back(pool.AddTask(Func));
534  }
535 
536  for (auto& future : futures) {
537  future.get();
538  }
539 }
540 
541 BOOST_AUTO_TEST_CASE(TestThreadPoolArgReturn) {
542  std::function<int(int)> Func = [](int num) {
543  for (int i = 0; i < 100; ++i) {
544  num += i;
545  }
546  return num;
547  };
548 
549  ThreadPool pool(4);
550  std::vector<std::future<int>> futures;
551 
552  for (int i = 0; i < 100; ++i) {
553  futures.push_back(pool.AddTask(Func, i));
554  }
555 
556  for (auto& future : futures) {
557  future.get();
558  }
559 }
560 
561 BOOST_AUTO_TEST_CASE(TestThreadPoolStop) {
562  std::function<int(int)> Func = [](int num) {
563  for (int i = 0; i < 100; ++i) {
564  num += i;
565  }
566  return num;
567  };
568 
569  ThreadPool pool(4);
570  std::vector<std::future<int>> futures;
571 
572  for (int i = 0; i < 100; ++i) {
573  futures.push_back(pool.AddTask(Func, i));
574  }
575 
576  pool.Stop();
577 
578  BOOST_CHECK_THROW(pool.AddTask(Func, 100), std::runtime_error);
579 
580  pool.Stop();
581 }
582 
583 BOOST_AUTO_TEST_CASE(TestThreadPoolWait) {
584  std::vector<uint8_t> results(100, 0);
585  std::function<void(int)> Func = [&results](const int num) {
586  results[num] = 1;
587  };
588 
589  ThreadPool pool(4);
590  pool.Wait();
591 
592  for (size_t i = 0; i < results.size(); ++i) {
593  pool.AddTask(Func, i);
594  }
595 
596  pool.Wait();
597 
598  for (const auto result : results) {
599  BOOST_CHECK_EQUAL(result, 1);
600  }
601 }
602 
603 BOOST_AUTO_TEST_CASE(TestThreadPoolWaitEverytime) {
604  std::vector<uint8_t> results(4, 0);
605  std::function<void(int)> Func = [&results](const int num) {
606  results[num] = 1;
607  };
608 
609  ThreadPool pool(4);
610 
611  for (size_t i = 0; i < results.size(); ++i) {
612  pool.AddTask(Func, i);
613  pool.Wait();
614 
615  for (size_t j = 0; j < results.size(); ++j) {
616  if (j <= i) {
617  BOOST_CHECK_EQUAL(results[j], 1);
618  } else {
619  BOOST_CHECK_EQUAL(results[j], 0);
620  }
621  }
622  }
623 
624  pool.Wait();
625 }
626 
627 BOOST_AUTO_TEST_CASE(TestThreadPoolGetThreadIndex) {
628  ThreadPool pool(4);
629 
630  std::vector<int> results(100, -1);
631  std::function<void(int)> Func = [&](const int num) {
632  results[num] = pool.GetThreadIndex();
633  };
634 
635  for (size_t i = 0; i < results.size(); ++i) {
636  pool.AddTask(Func, i);
637  }
638 
639  pool.Wait();
640 
641  for (const auto result : results) {
642  BOOST_CHECK_GE(result, 0);
643  BOOST_CHECK_LE(result, 3);
644  }
645 }
646 
647 BOOST_AUTO_TEST_CASE(TestJobQueueSingleProducerSingleConsumer) {
648  JobQueue<int> job_queue;
649 
650  // IMPORTANT: BOOST_CHECK_* macros are not thread-safe,
651  // so we use glog's CHECK macros inside threads.
652 
653  std::thread producer_thread([&job_queue]() {
654  for (int i = 0; i < 10; ++i) {
655  CHECK(job_queue.Push(i));
656  }
657  });
658 
659  std::thread consumer_thread([&job_queue]() {
660  CHECK_LE(job_queue.Size(), 10);
661  for (int i = 0; i < 10; ++i) {
662  const auto job = job_queue.Pop();
663  CHECK(job.IsValid());
664  CHECK_LT(job.Data(), 10);
665  }
666  });
667 
668  producer_thread.join();
669  consumer_thread.join();
670 }
671 
672 BOOST_AUTO_TEST_CASE(TestJobQueueSingleProducerSingleConsumerMaxNumJobs) {
673  JobQueue<int> job_queue(2);
674 
675  // IMPORTANT: BOOST_CHECK_* macros are not thread-safe,
676  // so we use glog's CHECK macros inside threads.
677 
678  std::thread producer_thread([&job_queue]() {
679  for (int i = 0; i < 10; ++i) {
680  CHECK(job_queue.Push(i));
681  }
682  });
683 
684  std::thread consumer_thread([&job_queue]() {
685  CHECK_LE(job_queue.Size(), 2);
686  for (int i = 0; i < 10; ++i) {
687  const auto job = job_queue.Pop();
688  CHECK(job.IsValid());
689  CHECK_LT(job.Data(), 10);
690  }
691  });
692 
693  producer_thread.join();
694  consumer_thread.join();
695 }
696 
697 BOOST_AUTO_TEST_CASE(TestJobQueueMultipleProducerSingleConsumer) {
698  JobQueue<int> job_queue(1);
699 
700  // IMPORTANT: BOOST_CHECK_* macros are not thread-safe,
701  // so we use glog's CHECK macros inside threads.
702 
703  std::thread producer_thread1([&job_queue]() {
704  for (int i = 0; i < 10; ++i) {
705  CHECK(job_queue.Push(i));
706  }
707  });
708 
709  std::thread producer_thread2([&job_queue]() {
710  for (int i = 0; i < 10; ++i) {
711  CHECK(job_queue.Push(i));
712  }
713  });
714 
715  std::thread consumer_thread([&job_queue]() {
716  CHECK_LE(job_queue.Size(), 1);
717  for (int i = 0; i < 20; ++i) {
718  const auto job = job_queue.Pop();
719  CHECK(job.IsValid());
720  CHECK_LT(job.Data(), 10);
721  }
722  });
723 
724  producer_thread1.join();
725  producer_thread2.join();
726  consumer_thread.join();
727 }
728 
729 BOOST_AUTO_TEST_CASE(TestJobQueueSingleProducerMultipleConsumer) {
730  JobQueue<int> job_queue(1);
731 
732  // IMPORTANT: BOOST_CHECK_* macros are not thread-safe,
733  // so we use glog's CHECK macros inside threads.
734 
735  std::thread producer_thread([&job_queue]() {
736  for (int i = 0; i < 20; ++i) {
737  CHECK(job_queue.Push(i));
738  }
739  });
740 
741  std::thread consumer_thread1([&job_queue]() {
742  CHECK_LE(job_queue.Size(), 1);
743  for (int i = 0; i < 10; ++i) {
744  const auto job = job_queue.Pop();
745  CHECK(job.IsValid());
746  CHECK_LT(job.Data(), 20);
747  }
748  });
749 
750  std::thread consumer_thread2([&job_queue]() {
751  CHECK_LE(job_queue.Size(), 1);
752  for (int i = 0; i < 10; ++i) {
753  const auto job = job_queue.Pop();
754  CHECK(job.IsValid());
755  CHECK_LT(job.Data(), 20);
756  }
757  });
758 
759  producer_thread.join();
760  consumer_thread1.join();
761  consumer_thread2.join();
762 }
763 
764 BOOST_AUTO_TEST_CASE(TestJobQueueMultipleProducerMultipleConsumer) {
765  JobQueue<int> job_queue(1);
766 
767  // IMPORTANT: BOOST_CHECK_* macros are not thread-safe,
768  // so we use glog's CHECK macros inside threads.
769 
770  std::thread producer_thread1([&job_queue]() {
771  for (int i = 0; i < 10; ++i) {
772  CHECK(job_queue.Push(i));
773  }
774  });
775 
776  std::thread producer_thread2([&job_queue]() {
777  for (int i = 0; i < 10; ++i) {
778  CHECK(job_queue.Push(i));
779  }
780  });
781 
782  std::thread consumer_thread1([&job_queue]() {
783  CHECK_LE(job_queue.Size(), 1);
784  for (int i = 0; i < 10; ++i) {
785  const auto job = job_queue.Pop();
786  CHECK(job.IsValid());
787  CHECK_LT(job.Data(), 10);
788  }
789  });
790 
791  std::thread consumer_thread2([&job_queue]() {
792  CHECK_LE(job_queue.Size(), 1);
793  for (int i = 0; i < 10; ++i) {
794  const auto job = job_queue.Pop();
795  CHECK(job.IsValid());
796  CHECK_LT(job.Data(), 10);
797  }
798  });
799 
800  producer_thread1.join();
801  producer_thread2.join();
802  consumer_thread1.join();
803  consumer_thread2.join();
804 }
805 
806 BOOST_AUTO_TEST_CASE(TestJobQueueWait) {
807  JobQueue<int> job_queue;
808 
809  // IMPORTANT: BOOST_CHECK_* macros are not thread-safe,
810  // so we use glog's CHECK macros inside threads.
811 
812  for (int i = 0; i < 10; ++i) {
813  CHECK(job_queue.Push(i));
814  }
815 
816  std::thread consumer_thread([&job_queue]() {
817  CHECK_EQ(job_queue.Size(), 10);
818  for (int i = 0; i < 10; ++i) {
819  const auto job = job_queue.Pop();
820  CHECK(job.IsValid());
821  CHECK_EQ(job.Data(), i);
822  }
823  });
824 
825  job_queue.Wait();
826 
827  BOOST_CHECK_EQUAL(job_queue.Size(), 0);
828  BOOST_CHECK(job_queue.Push(0));
829  BOOST_CHECK(job_queue.Pop().IsValid());
830 
831  consumer_thread.join();
832 }
833 
834 BOOST_AUTO_TEST_CASE(TestJobQueueStopProducer) {
835  JobQueue<int> job_queue(1);
836 
837  // IMPORTANT: BOOST_CHECK_* macros are not thread-safe,
838  // so we use glog's CHECK macros inside threads.
839 
840  Barrier stopBarrier;
841  std::thread producer_thread([&job_queue, &stopBarrier]() {
842  CHECK(job_queue.Push(0));
843  stopBarrier.Wait();
844  CHECK(!job_queue.Push(0));
845  });
846 
847  stopBarrier.Wait();
848  BOOST_CHECK_EQUAL(job_queue.Size(), 1);
849 
850  job_queue.Stop();
851  producer_thread.join();
852 
853  BOOST_CHECK(!job_queue.Push(0));
854  BOOST_CHECK(!job_queue.Pop().IsValid());
855 }
856 
857 BOOST_AUTO_TEST_CASE(TestJobQueueStopConsumer) {
858  JobQueue<int> job_queue(1);
859 
860  // IMPORTANT: BOOST_CHECK_* macros are not thread-safe,
861  // so we use glog's CHECK macros inside threads.
862 
863  BOOST_CHECK(job_queue.Push(0));
864 
865  Barrier popBarrier;
866  std::thread consumer_thread([&job_queue, &popBarrier]() {
867  const auto job = job_queue.Pop();
868  CHECK(job.IsValid());
869  CHECK_EQ(job.Data(), 0);
870  popBarrier.Wait();
871  CHECK(!job_queue.Pop().IsValid());
872  });
873 
874  popBarrier.Wait();
875  BOOST_CHECK_EQUAL(job_queue.Size(), 0);
876 
877  job_queue.Stop();
878  consumer_thread.join();
879 
880  BOOST_CHECK(!job_queue.Push(0));
881  BOOST_CHECK(!job_queue.Pop().IsValid());
882 }
883 
884 BOOST_AUTO_TEST_CASE(TestJobQueueClear) {
885  JobQueue<int> job_queue(1);
886 
887  BOOST_CHECK(job_queue.Push(0));
888  BOOST_CHECK_EQUAL(job_queue.Size(), 1);
889 
890  job_queue.Clear();
891  BOOST_CHECK_EQUAL(job_queue.Size(), 0);
892 }
893 
894 BOOST_AUTO_TEST_CASE(TestGetEffectiveNumThreads) {
895  BOOST_CHECK_GT(GetEffectiveNumThreads(-2), 0);
896  BOOST_CHECK_GT(GetEffectiveNumThreads(-1), 0);
897  BOOST_CHECK_GT(GetEffectiveNumThreads(0), 0);
898  BOOST_CHECK_EQUAL(GetEffectiveNumThreads(1), 1);
899  BOOST_CHECK_EQUAL(GetEffectiveNumThreads(2), 2);
900  BOOST_CHECK_EQUAL(GetEffectiveNumThreads(3), 3);
901 }
int Run(int argc, const char *argv[])
int count
core::Tensor result
Definition: VtkUtils.cpp:76
bool IsValid() const
Definition: threading.h:245
bool Push(const T &data)
Definition: threading.h:340
size_t Size()
Definition: threading.h:334
auto AddTask(func_t &&f, args_t &&... args) -> std::future< typename std::result_of< func_t(args_t...)>::type >
Definition: threading.h:299
int GetEffectiveNumThreads(const int num_threads)
Definition: threading.cc:269
BOOST_AUTO_TEST_CASE(TestThreadWait)