ACloudViewer  3.9.4
A Modern Library for 3D Data Processing
ReductionCPU.cpp
// ----------------------------------------------------------------------------
// -                    CloudViewer: www.cloudViewer.org                      -
// ----------------------------------------------------------------------------
// Copyright (c) 2018-2024 www.cloudViewer.org
// SPDX-License-Identifier: MIT
// ----------------------------------------------------------------------------

#include <Logging.h>
#include <Parallel.h>

#include <limits>

// Project headers for Indexer, Tensor, Dispatch, and the reduction op tables
// (include paths assumed; adjust to the actual CloudViewer layout).
#include <Dispatch.h>
#include <Indexer.h>
#include <Reduction.h>
#include <Tensor.h>

namespace cloudViewer {
namespace core {
namespace kernel {

template <typename scalar_t>
static inline scalar_t CPUSumReductionKernel(scalar_t a, scalar_t b) {
    return a + b;
}

template <typename scalar_t>
static inline scalar_t CPUProdReductionKernel(scalar_t a, scalar_t b) {
    return a * b;
}

template <typename scalar_t>
static inline scalar_t CPUMinReductionKernel(scalar_t a, scalar_t b) {
    return std::min(a, b);
}

template <typename scalar_t>
static inline scalar_t CPUMaxReductionKernel(scalar_t a, scalar_t b) {
    return std::max(a, b);
}

static inline uint8_t CPUAllReductionKernel(uint8_t a, uint8_t b) {
    return a && b;
}

static inline uint8_t CPUAnyReductionKernel(uint8_t a, uint8_t b) {
    return a || b;
}
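
// Note: the boolean reduction kernels above take uint8_t rather than bool;
// ReductionCPU() below passes the identities as static_cast<uint8_t>(true)
// and static_cast<uint8_t>(false), and `&&` / `||` collapse the bytes to 1/0.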

template <typename scalar_t>
static inline std::pair<int64_t, scalar_t> CPUArgMinReductionKernel(
        int64_t a_idx, scalar_t a, int64_t b_idx, scalar_t b) {
    if (a < b) {
        return {a_idx, a};
    } else {
        return {b_idx, b};
    }
}

template <typename scalar_t>
static inline std::pair<int64_t, scalar_t> CPUArgMaxReductionKernel(
        int64_t a_idx, scalar_t a, int64_t b_idx, scalar_t b) {
    if (a > b) {
        return {a_idx, a};
    } else {
        return {b_idx, b};
    }
}
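
// Each arg-reduction call merges a candidate (index, value) pair `a` into the
// running best `b` and returns the winner. The comparison is strict, so on
// ties the accumulator `b` (the earlier-seen element) is kept.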

class CPUReductionEngine {
public:
    CPUReductionEngine(const CPUReductionEngine&) = delete;
    CPUReductionEngine& operator=(const CPUReductionEngine&) = delete;
    CPUReductionEngine(const Indexer& indexer) : indexer_(indexer) {}

    template <typename func_t, typename scalar_t>
    void Run(const func_t& reduce_func, scalar_t identity) {
        // See: PyTorch's TensorIterator::parallel_reduce for the reference
        // design of reduction strategy.
        if (utility::InParallel()) {
            // Already inside a parallel region: reduce serially to avoid
            // nested parallelism.
            LaunchReductionKernelSerial<scalar_t>(indexer_, reduce_func);
        } else if (indexer_.NumOutputElements() <= 1) {
            // Single output element: split the workload across threads and
            // combine the per-thread partial results.
            LaunchReductionKernelTwoPass<scalar_t>(indexer_, reduce_func,
                                                   identity);
        } else {
            // Multiple output elements: parallelize over a non-reduced
            // dimension.
            LaunchReductionParallelDim<scalar_t>(indexer_, reduce_func);
        }
    }

private:
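    // LaunchReductionKernelSerial accumulates every input element into its
    // mapped output in a single thread; it assumes the outputs were pre-filled
    // with the reduction identity (see the dst.Fill() calls in ReductionCPU()).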
    template <typename scalar_t, typename func_t>
    static void LaunchReductionKernelSerial(const Indexer& indexer,
                                            func_t element_kernel) {
        for (int64_t workload_idx = 0; workload_idx < indexer.NumWorkloads();
             ++workload_idx) {
            scalar_t* src = reinterpret_cast<scalar_t*>(
                    indexer.GetInputPtr(0, workload_idx));
            scalar_t* dst = reinterpret_cast<scalar_t*>(
                    indexer.GetOutputPtr(workload_idx));
            *dst = element_kernel(*src, *dst);
        }
    }

    /// Create num_threads workers to compute partial reductions, then combine
    /// the per-thread results into the single output element.
    template <typename scalar_t, typename func_t>
    static void LaunchReductionKernelTwoPass(const Indexer& indexer,
                                             func_t element_kernel,
                                             scalar_t identity) {
        if (indexer.NumOutputElements() > 1) {
            utility::LogError(
                    "Internal error: two-pass reduction only works for "
                    "single-output reduction ops.");
        }
        int64_t num_workloads = indexer.NumWorkloads();
        int64_t num_threads = utility::EstimateMaxThreads();
        int64_t workload_per_thread =
                (num_workloads + num_threads - 1) / num_threads;
        std::vector<scalar_t> thread_results(num_threads, identity);

#pragma omp parallel for schedule(static) \
        num_threads(utility::EstimateMaxThreads())
        for (int64_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
            int64_t start = thread_idx * workload_per_thread;
            int64_t end = std::min(start + workload_per_thread, num_workloads);
            scalar_t local_result = identity;
            for (int64_t workload_idx = start; workload_idx < end;
                 ++workload_idx) {
                scalar_t* src = reinterpret_cast<scalar_t*>(
                        indexer.GetInputPtr(0, workload_idx));
                local_result = element_kernel(*src, local_result);
            }
            thread_results[thread_idx] = local_result;
        }
        // Combine the per-thread partial results serially.
        scalar_t* dst = reinterpret_cast<scalar_t*>(indexer.GetOutputPtr(0));
        for (int64_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
            *dst = element_kernel(thread_results[thread_idx], *dst);
        }
    }

    template <typename scalar_t, typename func_t>
    static void LaunchReductionParallelDim(const Indexer& indexer,
                                           func_t element_kernel) {
        // Prefers outer dimension >= num_threads.
        const int64_t* indexer_shape = indexer.GetPrimaryShape();
        const int64_t num_dims = indexer.NumDims();
        int64_t num_threads = utility::EstimateMaxThreads();

        // Init best_dim as the outer-most non-reduction dim.
        int64_t best_dim = num_dims - 1;
        while (best_dim >= 0 && indexer.IsReductionDim(best_dim)) {
            best_dim--;
        }
        for (int64_t dim = best_dim; dim >= 0 && !indexer.IsReductionDim(dim);
             --dim) {
            if (indexer_shape[dim] >= num_threads) {
                best_dim = dim;
                break;
            } else if (indexer_shape[dim] > indexer_shape[best_dim]) {
                best_dim = dim;
            }
        }
        if (best_dim == -1) {
            utility::LogError(
                    "Internal error: all dims are reduction dims, use "
                    "LaunchReductionKernelTwoPass instead.");
        }

#pragma omp parallel for schedule(static) \
        num_threads(utility::EstimateMaxThreads())
        for (int64_t i = 0; i < indexer_shape[best_dim]; ++i) {
            Indexer sub_indexer(indexer);
            sub_indexer.ShrinkDim(best_dim, i, 1);
            LaunchReductionKernelSerial<scalar_t>(sub_indexer, element_kernel);
        }
    }

private:
    Indexer indexer_;
};
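
// Minimal usage sketch for CPUReductionEngine (mirrors what ReductionCPU()
// below does for ReductionOpCode::Sum; the float instantiation is only an
// illustrative assumption):
//
//   Indexer indexer({src}, dst, DtypePolicy::ALL_SAME, dims);
//   CPUReductionEngine re(indexer);
//   dst.Fill(0.0f);                               // seed with the identity
//   re.Run(CPUSumReductionKernel<float>, 0.0f);   // accumulate into dst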

class CPUArgReductionEngine {
public:
    CPUArgReductionEngine(const CPUArgReductionEngine&) = delete;
    CPUArgReductionEngine& operator=(const CPUArgReductionEngine&) = delete;
    CPUArgReductionEngine(const Indexer& indexer) : indexer_(indexer) {}

    template <typename func_t, typename scalar_t>
    void Run(const func_t& reduce_func, scalar_t identity) {
        // Arg-reduction needs to iterate each output element separately in
        // sub-iterations. Each output element corresponds to multiple input
        // elements. We need to keep track of the indices within each
        // sub-iteration.
        int64_t num_output_elements = indexer_.NumOutputElements();
        if (num_output_elements <= 1) {
            LaunchArgReductionKernelTwoPass(indexer_, reduce_func, identity);
        } else {
            LaunchArgReductionParallelDim(indexer_, reduce_func, identity);
        }
    }

    template <typename scalar_t, typename func_t>
    static void LaunchArgReductionParallelDim(const Indexer& indexer,
                                              func_t reduce_func,
                                              scalar_t identity) {
        int64_t num_output_elements = indexer.NumOutputElements();
#pragma omp parallel for schedule(static) \
        num_threads(utility::EstimateMaxThreads())
        for (int64_t output_idx = 0; output_idx < num_output_elements;
             output_idx++) {
            // sub_indexer.NumWorkloads() == ipo (inputs per output).
            // sub_indexer's workload_idx is indexer's ipo_idx.
            Indexer sub_indexer = indexer.GetPerOutputIndexer(output_idx);
            scalar_t dst_val = identity;
            for (int64_t workload_idx = 0;
                 workload_idx < sub_indexer.NumWorkloads(); workload_idx++) {
                int64_t src_idx = workload_idx;
                scalar_t* src_val = reinterpret_cast<scalar_t*>(
                        sub_indexer.GetInputPtr(0, workload_idx));
                int64_t* dst_idx = reinterpret_cast<int64_t*>(
                        sub_indexer.GetOutputPtr(0, workload_idx));
                std::tie(*dst_idx, dst_val) =
                        reduce_func(src_idx, *src_val, *dst_idx, dst_val);
            }
        }
    }

    /// Create num_threads workers to compute partial arg-reductions, then
    /// combine the per-thread (index, value) results into the single output.
    /// This only applies to arg-reduction ops with one output element.
    template <typename scalar_t, typename func_t>
    static void LaunchArgReductionKernelTwoPass(const Indexer& indexer,
                                                func_t reduce_func,
                                                scalar_t identity) {
        if (indexer.NumOutputElements() > 1) {
            utility::LogError(
                    "Internal error: two-pass arg reduction only works for "
                    "single-output arg reduction ops.");
        }
        int64_t num_workloads = indexer.NumWorkloads();
        int64_t num_threads = utility::EstimateMaxThreads();
        int64_t workload_per_thread =
                (num_workloads + num_threads - 1) / num_threads;
        std::vector<int64_t> thread_results_idx(num_threads, 0);
        std::vector<scalar_t> thread_results_val(num_threads, identity);

#pragma omp parallel for schedule(static) \
        num_threads(utility::EstimateMaxThreads())
        for (int64_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
            int64_t start = thread_idx * workload_per_thread;
            int64_t end = std::min(start + workload_per_thread, num_workloads);
            scalar_t local_result_val = identity;
            int64_t local_result_idx = 0;
            for (int64_t workload_idx = start; workload_idx < end;
                 ++workload_idx) {
                int64_t src_idx = workload_idx;
                scalar_t* src_val = reinterpret_cast<scalar_t*>(
                        indexer.GetInputPtr(0, workload_idx));
                std::tie(local_result_idx, local_result_val) = reduce_func(
                        src_idx, *src_val, local_result_idx, local_result_val);
            }
            thread_results_val[thread_idx] = local_result_val;
            thread_results_idx[thread_idx] = local_result_idx;
        }
        // Combine the per-thread partial results serially.
        scalar_t dst_val = identity;
        int64_t* dst_idx = reinterpret_cast<int64_t*>(indexer.GetOutputPtr(0));
        for (int64_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
            std::tie(*dst_idx, dst_val) = reduce_func(
                    thread_results_idx[thread_idx],
                    thread_results_val[thread_idx], *dst_idx, dst_val);
        }
    }

private:
    Indexer indexer_;
};
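
// Minimal usage sketch for CPUArgReductionEngine (mirrors the ArgMin branch of
// ReductionCPU() below; the float instantiation is an illustrative assumption).
// `dst` holds the winning indices (Int64) and `dst_acc` the running values:
//
//   Tensor dst_acc(dst.GetShape(), src.GetDtype(), src.GetDevice());
//   Indexer indexer({src}, {dst, dst_acc}, DtypePolicy::INPUT_SAME, dims);
//   CPUArgReductionEngine re(indexer);
//   dst_acc.Fill(std::numeric_limits<float>::max());
//   re.Run(CPUArgMinReductionKernel<float>,
//          std::numeric_limits<float>::max());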

void ReductionCPU(const Tensor& src,
                  Tensor& dst,
                  const SizeVector& dims,
                  bool keepdim,
                  ReductionOpCode op_code) {
    if (s_regular_reduce_ops.find(op_code) != s_regular_reduce_ops.end()) {
        Indexer indexer({src}, dst, DtypePolicy::ALL_SAME, dims);
        CPUReductionEngine re(indexer);
        DISPATCH_DTYPE_TO_TEMPLATE(src.GetDtype(), [&]() {
            scalar_t identity;
            switch (op_code) {
                case ReductionOpCode::Sum:
                    identity = 0;
                    dst.Fill(identity);
                    re.Run(CPUSumReductionKernel<scalar_t>, identity);
                    break;
                case ReductionOpCode::Prod:
                    identity = 1;
                    dst.Fill(identity);
                    re.Run(CPUProdReductionKernel<scalar_t>, identity);
                    break;
                case ReductionOpCode::Min:
                    if (indexer.NumWorkloads() == 0) {
                        utility::LogError(
                                "Zero-size Tensor does not support Min.");
                    } else {
                        identity = std::numeric_limits<scalar_t>::max();
                        dst.Fill(identity);
                        re.Run(CPUMinReductionKernel<scalar_t>, identity);
                    }
                    break;
                case ReductionOpCode::Max:
                    if (indexer.NumWorkloads() == 0) {
                        utility::LogError(
                                "Zero-size Tensor does not support Max.");
                    } else {
                        identity = std::numeric_limits<scalar_t>::lowest();
                        dst.Fill(identity);
                        re.Run(CPUMaxReductionKernel<scalar_t>, identity);
                    }
                    break;
                default:
                    utility::LogError("Unsupported op code.");
                    break;
            }
        });
    } else if (s_arg_reduce_ops.find(op_code) != s_arg_reduce_ops.end()) {
        if (dst.GetDtype() != core::Int64) {
            utility::LogError("Arg-reduction must have int64 output dtype.");
        }
        // Accumulation buffer to store temporary min/max values.
        Tensor dst_acc(dst.GetShape(), src.GetDtype(), src.GetDevice());

        Indexer indexer({src}, {dst, dst_acc}, DtypePolicy::INPUT_SAME, dims);
        CPUArgReductionEngine re(indexer);
        DISPATCH_DTYPE_TO_TEMPLATE(src.GetDtype(), [&]() {
            scalar_t identity;
            switch (op_code) {
                case ReductionOpCode::ArgMin:
                    if (indexer.NumWorkloads() == 0) {
                        utility::LogError(
                                "Zero-size Tensor does not support ArgMin.");
                    } else {
                        identity = std::numeric_limits<scalar_t>::max();
                        dst_acc.Fill(identity);
                        re.Run(CPUArgMinReductionKernel<scalar_t>, identity);
                    }
                    break;
                case ReductionOpCode::ArgMax:
                    if (indexer.NumWorkloads() == 0) {
                        utility::LogError(
                                "Zero-size Tensor does not support ArgMax.");
                    } else {
                        identity = std::numeric_limits<scalar_t>::lowest();
                        dst_acc.Fill(identity);
                        re.Run(CPUArgMaxReductionKernel<scalar_t>, identity);
                    }
                    break;
                default:
                    utility::LogError("Unsupported op code.");
                    break;
            }
        });
    } else if (s_boolean_reduce_ops.find(op_code) !=
               s_boolean_reduce_ops.end()) {
        if (src.GetDtype() != core::Bool) {
            utility::LogError(
                    "Boolean reduction only supports boolean input tensor.");
        }
        if (dst.GetDtype() != core::Bool) {
            utility::LogError(
                    "Boolean reduction only supports boolean output tensor.");
        }
        Indexer indexer({src}, dst, DtypePolicy::ALL_SAME, dims);
        CPUReductionEngine re(indexer);
        switch (op_code) {
            case ReductionOpCode::All:
                // Identity == true. A 0-sized tensor returns true.
                dst.Fill(true);
                re.Run(CPUAllReductionKernel, static_cast<uint8_t>(true));
                break;
            case ReductionOpCode::Any:
                // Identity == false. A 0-sized tensor returns false.
                dst.Fill(false);
                re.Run(CPUAnyReductionKernel, static_cast<uint8_t>(false));
                break;
            default:
                utility::LogError("Unsupported op code.");
                break;
        }
    } else {
        utility::LogError("Unsupported op code.");
    }
}
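
// Example call (a sketch; in practice ReductionCPU() is reached through the
// tensor reduction kernels rather than called directly, and the Tensor
// construction below assumes the usual CloudViewer tensor API):
//
//   Tensor src = ...;                 // e.g. a Float32 tensor of shape {2, 3}
//   Tensor dst({2}, src.GetDtype(), src.GetDevice());
//   ReductionCPU(src, dst, /*dims=*/{1}, /*keepdim=*/false,
//                ReductionOpCode::Sum);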

}  // namespace kernel
}  // namespace core
}  // namespace cloudViewer