ACloudViewer  3.9.4
A Modern Library for 3D Data Processing
ReductionCUDA.cu
1 // ----------------------------------------------------------------------------
2 // - CloudViewer: www.cloudViewer.org -
3 // ----------------------------------------------------------------------------
4 // Copyright (c) 2018-2024 www.cloudViewer.org
5 // SPDX-License-Identifier: MIT
6 // ----------------------------------------------------------------------------
7 
8 #include <Logging.h>
9 #include <cuda.h>
10 #include <cuda_runtime.h>
11 #include <thrust/pair.h>
12 #include <thrust/tuple.h>
13 
14 #include <algorithm>
15 #include <array>
16 #include <limits>
17 #include <sstream>
18 #include <tuple>
19 #include <type_traits>
20 
21 #include "cloudViewer/core/CUDAUtils.h"
22 #include "core/Blob.h"
23 #include "core/Device.h"
24 #include "core/Dispatch.h"
25 #include "core/FunctionTraits.h"
26 #include "core/Indexer.h"
27 #include "core/MemoryManager.h"
28 #include "core/ParallelFor.h"
29 #include "core/SizeVector.h"
30 #include "core/Tensor.h"
31 #include "core/kernel/Reduction.h"
32 
33 // CUDA reduction is based on PyTorch's CUDA reduction implementation.
34 // See: aten/src/ATen/native/cuda/Reduce.cuh
35 
36 #if __CUDA_ARCH__ >= 750
37 constexpr uint32_t CUDA_MAX_THREADS_PER_SM = 1024;
38 #else
39 constexpr uint32_t CUDA_MAX_THREADS_PER_SM = 2048;
40 #endif
41 
42 constexpr uint32_t CUDA_MAX_THREADS_PER_BLOCK = 1024;
43 constexpr uint32_t CUDA_THREADS_PER_BLOCK_FALLBACK = 256;
44 
45 #define CLOUDVIEWER_MAX_THREADS_PER_BLOCK(val) \
46  (((val) <= CUDA_MAX_THREADS_PER_BLOCK) ? (val) \
47  : CUDA_THREADS_PER_BLOCK_FALLBACK)
48 #define CLOUDVIEWER_MIN_BLOCKS_PER_SM(threads_per_block, blocks_per_sm) \
49  ((((threads_per_block) * (blocks_per_sm) <= CUDA_MAX_THREADS_PER_SM) \
50  ? (blocks_per_sm) \
51  : ((CUDA_MAX_THREADS_PER_SM + (threads_per_block) - 1) / \
52  (threads_per_block))))
53 
54 #define CLOUDVIEWER_LAUNCH_BOUNDS_2(max_threads_per_block, min_blocks_per_sm) \
55  __launch_bounds__( \
56  (CLOUDVIEWER_MAX_THREADS_PER_BLOCK((max_threads_per_block))), \
57  (CLOUDVIEWER_MIN_BLOCKS_PER_SM((max_threads_per_block), \
58  (min_blocks_per_sm))))
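
For illustration: ReduceKernel below is instantiated with nt = ReduceConfig::MAX_NUM_THREADS = 512 and a requested 4 blocks per SM. On an sm_75-or-newer device pass (CUDA_MAX_THREADS_PER_SM = 1024), CLOUDVIEWER_LAUNCH_BOUNDS_2(512, 4) expands to __launch_bounds__(512, 2), because 512 * 4 = 2048 exceeds 1024 and the block count is clamped to (1024 + 511) / 512 = 2; on older architectures (2048 threads per SM) it stays __launch_bounds__(512, 4).
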
59 
60 template <typename T>
61 CLOUDVIEWER_DEVICE __forceinline__ T
62 WARP_SHFL_DOWN(T value,
63  unsigned int delta,
64  int width = warpSize,
65  unsigned int mask = 0xffffffff) {
66 #if CUDA_VERSION >= 9000
67  return __shfl_down_sync(mask, value, delta, width);
68 #else
69  return __shfl_down(value, delta, width);
70 #endif
71 }
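
WARP_SHFL_DOWN is the building block of the intra-warp stage of BlockXReduce further below: repeatedly shifting a register value down the warp and combining collapses the 32 lane values into lane 0 in log2(32) steps. A minimal standalone sketch of that pattern (not part of this file; kernel and variable names are made up for the demo, and it assumes CUDA 9+ with a warp size of 32):

    #include <cstdio>
    #include <cuda_runtime.h>

    __global__ void WarpSumDemo(const float* in, float* out) {
        // One warp: each lane loads one element.
        float v = in[threadIdx.x];
        // Tree reduction within the warp using shuffle-down.
        for (int offset = warpSize / 2; offset > 0; offset >>= 1) {
            v += __shfl_down_sync(0xffffffff, v, offset);
        }
        // Lane 0 now holds the warp-wide sum.
        if (threadIdx.x == 0) *out = v;
    }

    int main() {
        float h_in[32], h_out = 0.f, *d_in, *d_out;
        for (int i = 0; i < 32; i++) h_in[i] = 1.f;
        cudaMalloc(&d_in, 32 * sizeof(float));
        cudaMalloc(&d_out, sizeof(float));
        cudaMemcpy(d_in, h_in, 32 * sizeof(float), cudaMemcpyHostToDevice);
        WarpSumDemo<<<1, 32>>>(d_in, d_out);
        cudaMemcpy(&h_out, d_out, sizeof(float), cudaMemcpyDeviceToHost);
        printf("warp sum = %f\n", h_out);  // expected 32.0
        cudaFree(d_in);
        cudaFree(d_out);
        return 0;
    }

BlockXReduce below uses the same intrinsic with offsets growing from 1 instead of shrinking from 16; for a commutative Combine the value that ends up in lane 0 is the same.
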
72 
73 namespace cloudViewer {
74 namespace core {
75 namespace kernel {
76 
77 static inline int64_t DivUp(int64_t a, int64_t b) { return (a + b - 1) / b; }
78 
79 // Returns reduced fraction numerator & denominator
80 CLOUDVIEWER_HOST_DEVICE static void ReduceFraction(int64_t& numerator,
81  int64_t& denominator) {
82  // Get GCD of num and denom using Euclid's algorithm.
83  // Can replace this with std::gcd if we ever support c++17.
84  int64_t a = denominator;
85  int64_t b = numerator;
86  while (b != 0) {
87  a %= b;
88  int64_t tmp = a;
89  a = b;
90  b = tmp;
91  }
92  // a is now the GCD
93  numerator /= a;
94  denominator /= a;
95 }
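
For example, ReduceFraction(8, 12) computes gcd(8, 12) = 4 in the loop and leaves numerator = 2 and denominator = 3. ReduceOp::Run() and AccumulationBuffer below use the reduced fraction sizeof(arg_t) / sizeof(out_scalar_t) to map an output byte offset to the matching accumulator byte offset as offset * numerator / denominator.
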
96 
97 class ReduceConfig {
98 public:
99  static constexpr int BLOCK_X = 0;
100  static constexpr int BLOCK_Y = 1;
101  static constexpr int CTA = 2;
102  static constexpr int MAX_NUM_THREADS = 512;
103 
104  int num_inputs_per_output_;
105  int num_outputs_;
106  int step_input_ = 1;
107  int step_output_ = 1;
108  int ctas_per_output_ = 1;
109 
110 private:
111  int element_size_bytes_;
112  int input_mult_[3] = {0, 0, 0};
113  int output_mult_[2] = {0, 0};
114  int block_width_;
115  int block_height_;
116  int num_threads_;
117 
118 public:
119  ReduceConfig(int element_size_bytes, const Indexer& indexer)
120  : element_size_bytes_(element_size_bytes) {
121  num_outputs_ = indexer.NumOutputElements();
122  num_inputs_per_output_ = indexer.NumWorkloads() / num_outputs_;
123 
124  // Adjust the block size to map the block width to the fastest-changing
125  // dimension of the input tensor. This gives the best possible memory
126  // access pattern, given that for a non-contiguous tensor with gaps we
127  // cannot achieve perfect memory coalescing.
128  bool reduction_on_fastest_striding_dimension =
129  (indexer.NumReductionDims() == indexer.NumDims()) ||
130  (indexer.GetInput(0).byte_strides_[0] <
131  indexer.GetInput(0).byte_strides_[indexer.NumReductionDims()]);
132 
133  // Notice that dim0 & dim1 do NOT guarantee any launch configuration
134  // here! dim0 & dim1 are more like upper bounds on the block
135  // dimensions. The actual launch config and reduction scheme are
136  // determined by the values assigned to `input_mult_` and
137  // `output_mult_`. We try to max out dim1 so that we have enough
138  // threads per CTA to deliver performance for larger problem sizes.
139  int64_t dim0;
140  int64_t dim1;
141 
142  if (reduction_on_fastest_striding_dimension) {
143  // Map block.x to the fastest reducing dimension. It implies:
144  // 1. BlockXReduce is required.
145  // 2. block.y now maxes out at num_outputs_.
146  dim0 = indexer.GetPrimaryShape()[0];
147  dim1 = num_outputs_;
148  } else {
149  // Map block.x to the fastest non-reducing dimension. It implies:
150  // 1. BlockXReduce is turned off.
151  // 2. block.y now maxes out at num_inputs_per_output_.
152  dim0 = indexer.GetPrimaryShape()[indexer.NumReductionDims()];
153  dim1 = num_inputs_per_output_;
154  }
155 
156  // Adjust block_width and block_height
157  SetBlockDimension(dim0, dim1);
158 
159  int block_width = block_width_;
160  int block_height = block_height_;
161 
162  if (indexer.NumDims() == 0 || reduction_on_fastest_striding_dimension) {
163  // Split the input across lanes if the input is contiguous in the
164  // reduced dimension. This will require reduction between threads
165  // using warp shuffle instructions and shared memory (if
166  // block_width > warpSize).
167  input_mult_[0] = SplitInput(block_width);
168  } else {
169  // Otherwise split the output across lanes in a warp.
170  output_mult_[0] = SplitOutput(block_width);
171  }
172 
173  if (ValuesPerThread() >= block_height * 16 ||
174  ValuesPerThread() >= 256) {
175  // Divide the input across warps in a thread-block, if that leaves
176  // at least 16 elements to be summed by each thread. This will
177  // require inter-warp reduction using shared memory.
178  input_mult_[1] = SplitInput(block_height);
179  } else {
180  // Otherwise, each warp handles a separate output.
181  output_mult_[1] = SplitOutput(block_height);
182  }
183 
184  if (input_mult_[1] != 0 && ValuesPerThread() >= 256 &&
185  num_outputs_ <= 4096) {
186  // Divide the input across thread-blocks if the amount of work
187  // per-thread is large enough and the size of the output is small
188  // enough. This will require a reduction using global memory.
189  ctas_per_output_ = DivUp(ValuesPerThread(), 16);
190  if (ctas_per_output_ > 65535) {
191  ctas_per_output_ = 65535;
192  }
193  input_mult_[2] = SplitInput(ctas_per_output_);
194  }
195  }
196 
197  /// Returns the largest power of 2 not exceeding n, i.e. 2^floor(log2(n)).
198  static inline int LastPow2(int n) {
199  // Dtype.h asserts sizeof(int) == 4.
200  n |= (n >> 1);
201  n |= (n >> 2);
202  n |= (n >> 4);
203  n |= (n >> 8);
204  n |= (n >> 16);
205  return std::max(1, n - (n >> 1));
206  }
207 
208  void SetBlockDimension(int64_t dim0, int64_t dim1) {
209  int dim0_pow2 = dim0 < MAX_NUM_THREADS
210  ? static_cast<int>(LastPow2(dim0))
211  : MAX_NUM_THREADS;
212  int dim1_pow2 = dim1 < MAX_NUM_THREADS
213  ? static_cast<int>(LastPow2(dim1))
214  : MAX_NUM_THREADS;
215  block_width_ = std::min(dim0_pow2, GetCUDACurrentWarpSize());
216  block_height_ =
217  std::min(dim1_pow2, int(MAX_NUM_THREADS / block_width_));
218  block_width_ =
219  std::min(dim0_pow2, int(MAX_NUM_THREADS / block_height_));
220  num_threads_ = block_width_ * block_height_;
221  }
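
As a worked example (assuming a warp size of 32): for dim0 = 1000 and dim1 = 30, dim0_pow2 is capped at MAX_NUM_THREADS = 512 and dim1_pow2 = LastPow2(30) = 16, so block_width_ = min(512, 32) = 32 and block_height_ = min(16, 512 / 32) = 16; the second adjustment keeps block_width_ = min(512, 512 / 16) = 32, and the block ends up as 32 x 16 = 512 threads.
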
222 
223  int SplitInput(int parallelism) {
224  int step = step_input_;
225  step_input_ *= parallelism;
226  return step;
227  }
228 
229  int SplitOutput(int parallelism) {
230  int step = step_output_;
231  step_output_ *= parallelism;
232  return step;
233  }
234 
235  dim3 BlockDim() const { return dim3(block_width_, block_height_); }
236 
237  dim3 GridDim() const {
238  return dim3(DivUp(num_outputs_, step_output_), ctas_per_output_);
239  }
240 
241  CLOUDVIEWER_HOST_DEVICE bool ShouldBlockXReduce() const {
242  return input_mult_[BLOCK_X] != 0;
243  }
244 
245  CLOUDVIEWER_HOST_DEVICE bool ShouldBlockYReduce() const {
246  return input_mult_[BLOCK_Y] != 0;
247  }
248 
249  CLOUDVIEWER_HOST_DEVICE bool ShouldGlobalReduce() const {
250  return input_mult_[CTA] != 0;
251  }
252 
253  CLOUDVIEWER_DEVICE bool ShouldStore(int output_idx) const {
254  return output_idx < num_outputs_ &&
255  (!ShouldBlockXReduce() || threadIdx.x == 0) &&
256  (!ShouldBlockYReduce() || threadIdx.y == 0);
257  }
258 
259  CLOUDVIEWER_HOST_DEVICE int InputIdx() const {
260  int lane = threadIdx.x;
261  int warp = threadIdx.y;
262  int cta2 = blockIdx.y;
263  return (lane * input_mult_[BLOCK_X] + warp * input_mult_[BLOCK_Y] +
264  cta2 * input_mult_[CTA]);
265  }
266 
267  CLOUDVIEWER_HOST_DEVICE int OutputIdx() const {
268  int lane = threadIdx.x;
269  int warp = threadIdx.y;
270  int cta1 = blockIdx.x;
271  return (lane * output_mult_[BLOCK_X] + warp * output_mult_[BLOCK_Y] +
272  cta1 * step_output_);
273  }
274 
275  CLOUDVIEWER_DEVICE int SharedMemoryOffset(int offset) const {
276  return threadIdx.x + (threadIdx.y + offset) * blockDim.x;
277  }
278 
279  CLOUDVIEWER_DEVICE int StagingMemoryOffset(int cta2) const {
280  int offset = cta2 + blockIdx.x * gridDim.y;
281  if (!ShouldBlockXReduce()) {
282  offset = threadIdx.x + offset * blockDim.x;
283  }
284  return offset;
285  }
286 
287  int SharedMemorySize() const {
288  if (!ShouldBlockYReduce() &&
289  (!ShouldBlockXReduce() ||
290  block_width_ <= GetCUDACurrentWarpSize())) {
291  return 0;
292  }
293  return element_size_bytes_ * num_threads_;
294  }
295 
296  int64_t GlobalMemorySize() const {
297  if (!ShouldGlobalReduce()) {
298  return 0;
299  }
300  auto size =
301  (int64_t)element_size_bytes_ * num_outputs_ * ctas_per_output_;
302  if (!ShouldBlockXReduce()) {
303  size *= BlockDim().x;
304  }
305  return size;
306  }
307 
308  int SemaphoreSize() const {
309  if (!ShouldGlobalReduce()) {
310  return 0;
311  }
312  return sizeof(int) * GridDim().x;
313  }
314 
315  int ValuesPerThread() const {
316  return DivUp(num_inputs_per_output_, step_input_);
317  }
318 
319  std::string ToString() const {
320  std::string input_mult_str = fmt::format(
321  "[{},{},{}]", input_mult_[0], input_mult_[1], input_mult_[2]);
322  std::string output_mult_str =
323  fmt::format("[{},{}]", output_mult_[0], output_mult_[1]);
324  std::string block_str = fmt::format("[{},{},{}]", BlockDim().x,
325  BlockDim().y, BlockDim().z);
326  std::string grid_str = fmt::format("[{},{},{}]", GridDim().x,
327  GridDim().y, GridDim().z);
328  std::string str = fmt::format(
329  "ReduceConfig(element_size_bytes_={}, "
330  "num_inputs_per_output_={}, num_outputs_={}, "
331  "step_input_={}, step_output_={}, ctas_per_output_={}, "
332  "input_mult_={}, output_mult_={}, values_per_thread={}, "
333  "block={}, grid={}, global_memory_size={})",
334  element_size_bytes_, num_inputs_per_output_, num_outputs_,
335  step_input_, step_output_, ctas_per_output_, input_mult_str,
336  output_mult_str, ValuesPerThread(), block_str, grid_str,
337  GlobalMemorySize());
338  return str;
339  }
340 };
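
To make the interplay of the multipliers concrete, a hand-computed illustration (assuming a warp size of 32): reducing a contiguous 1-D tensor of 2^20 = 1,048,576 elements to a single scalar gives num_outputs_ = 1 and num_inputs_per_output_ = 1,048,576. SetBlockDimension picks a 512 x 1 block; SplitInput is then applied across block.x, block.y and the grid, leaving input_mult_ = [1, 512, 512], output_mult_ = [0, 0], step_input_ = 65,536 and ctas_per_output_ = 128. The launch is therefore block (512, 1) and grid (1, 128), each thread strides over 16 input values, the partial results within a block are combined by BlockXReduce (warp shuffles plus shared memory), and the 128 per-block results are merged by the global reduction path using cta_buf_ and the semaphores.
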
341 
342 template <int nt, typename R>
343 CLOUDVIEWER_LAUNCH_BOUNDS_2(nt, 4)
344 __global__ void ReduceKernel(R reduction) {
345  reduction.Run();
346 }
347 
348 template <typename index_t>
349 static OffsetCalculator<2, index_t> MakeOutputCalculator(
350  const Indexer& indexer) {
351  int num_reduction_dims = indexer.NumReductionDims();
352  int num_output_dims = indexer.NumDims() - num_reduction_dims;
353  std::array<const int64_t*, 2> strides = {
354  indexer.GetOutput().byte_strides_ + num_reduction_dims,
355  indexer.GetInput(0).byte_strides_ + num_reduction_dims,
356  };
357  const int64_t* shape = indexer.GetPrimaryShape() + num_reduction_dims;
358  return OffsetCalculator<2, index_t>(num_output_dims, shape, strides.data());
359 }
360 
361 template <typename index_t>
362 static OffsetCalculator<1, index_t> MakeInputCalculator(
363  const Indexer& indexer) {
364  int num_reduction_dims = indexer.NumReductionDims();
365  std::array<const int64_t*, 1> strides = {
366  indexer.GetInput(0).byte_strides_,
367  };
368  return OffsetCalculator<1, index_t>(
369  num_reduction_dims, indexer.GetPrimaryShape(), strides.data());
370 }
371 
372 template <int vt, typename index_t, typename func_t>
373 CLOUDVIEWER_DEVICE void StridedIterate(func_t f,
374  index_t begin,
375  index_t end,
376  index_t stride) {
377  if (begin + (vt - 1) * stride < end) {
378 #pragma unroll
379  for (index_t i = 0; i < vt; i++) {
380  f(i, begin + i * stride);
381  }
382  } else {
383 #pragma unroll
384  for (index_t i = 0; i < vt; i++) {
385  index_t idx = begin + i * stride;
386  if (idx < end) {
387  f(i, idx);
388  }
389  }
390  }
391 }
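
StridedIterate has two paths: a check-free unrolled path when the whole tile of vt indices is known to fit before `end`, and a per-index guarded path at the boundary. A small host-side analogue (illustrative only; StridedIterateHost and the printed output are hypothetical):

    #include <cstdio>

    // Host-side analogue of StridedIterate<vt>: call f(i, idx) for
    // idx = begin, begin + stride, ... (vt iterations), skipping idx >= end.
    template <int vt, typename Func>
    void StridedIterateHost(Func f, int begin, int end, int stride) {
        if (begin + (vt - 1) * stride < end) {
            // Fast path: the whole tile is in range, no bounds checks.
            for (int i = 0; i < vt; i++) f(i, begin + i * stride);
        } else {
            // Boundary path: check each index individually.
            for (int i = 0; i < vt; i++) {
                int idx = begin + i * stride;
                if (idx < end) f(i, idx);
            }
        }
    }

    int main() {
        auto print = [](int i, int idx) { printf("lane %d -> %d\n", i, idx); };
        // Tile of 4 indices from 0 with stride 3, end = 10: visits 0, 3, 6, 9.
        StridedIterateHost<4>(print, 0, 10, 3);
        // Tile starting at 6: only 6 and 9 are in range, 12 and 15 are skipped.
        StridedIterateHost<4>(print, 6, 10, 3);
        return 0;
    }
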
392 
393 /// Combine() and Reduce() are the same for regular reduction ops.
394 template <typename out_scalar_t, typename func_t>
395 class RegularReduceOps {
396  using arg_t = typename BinaryFunctionTraits<func_t>::arg0_t;
397  using scalar_t = typename BinaryFunctionTraits<func_t>::arg1_t;
398 
399 public:
400  RegularReduceOps(const func_t& op) : reduce_func_(op) {}
401 
402  static inline CLOUDVIEWER_DEVICE out_scalar_t Project(arg_t arg) {
403  return (out_scalar_t)arg;
404  }
405 
406  static inline CLOUDVIEWER_DEVICE arg_t WarpShflDown(arg_t arg, int offset) {
407  return WARP_SHFL_DOWN(arg, offset);
408  }
409 
410  CLOUDVIEWER_DEVICE inline arg_t Combine(arg_t acc, scalar_t val) const {
411  return reduce_func_(acc, val);
412  }
413 
414  /// Idx is ignored for RegularReduceOps.
415  CLOUDVIEWER_DEVICE inline arg_t Reduce(arg_t acc,
416  scalar_t val,
417  int64_t idx) const {
418  return reduce_func_(acc, val);
419  }
420 
421 private:
422  func_t reduce_func_ = nullptr;
423 };
424 
425 template <typename scalar_t, typename func_t>
426 RegularReduceOps<scalar_t, func_t> WrapRegularReduceOps(const func_t& op) {
427  return RegularReduceOps<scalar_t, func_t>{op};
428 }
429 
430 template <typename func_t>
431 class ArgReduceOps {
432  using scalar_t = typename BinaryFunctionTraits<func_t>::arg1_t;
433  using index_t = int64_t;
434  using arg_t = thrust::pair<scalar_t, index_t>;
435 
436 public:
437  ArgReduceOps(const func_t comp_func) : comp_func_(comp_func) {}
438 
439  static CLOUDVIEWER_DEVICE index_t Project(arg_t arg) { return arg.second; }
440 
441  static CLOUDVIEWER_DEVICE arg_t WarpShflDown(arg_t arg, int offset) {
442  return arg_t(WARP_SHFL_DOWN(arg.first, offset),
443  WARP_SHFL_DOWN(arg.second, offset));
444  }
445 
446  /// Combine(pair<val_t, idx_t>, pair<val_t, idx_t>) -> pair<val_t, idx_t>.
447  /// Called at subsequent rounds of reduction, when values are already
448  /// associated with indices.
449  CLOUDVIEWER_DEVICE inline arg_t Combine(arg_t a, arg_t b) const {
450  return comp_func_(a.first, b.first) ? a : b;
451  }
452 
453  /// Reduce(pair<val_t, idx_t>, val_t, idx_t) -> pair<val_t, idx_t>.
454  /// Called at the first round of reduction, when values are not yet
455  /// associated with indices.
456  CLOUDVIEWER_DEVICE inline arg_t Reduce(arg_t arg,
457  scalar_t val,
458  int64_t idx) const {
459  return comp_func_(arg.first, val) ? arg : arg_t(val, idx);
460  }
461 
462 private:
463  func_t comp_func_ = nullptr;
464 };
465 
466 template <typename func_t>
467 ArgReduceOps<func_t> WrapArgReduceOps(const func_t& comp_func) {
468  return ArgReduceOps<func_t>{comp_func};
469 }
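
For intuition about the two stages, a standalone host sketch (hypothetical, not this class; std::pair stands in for thrust::pair) of an arg-min with comp_func(a, b) = a < b: Reduce attaches an index the first time a raw value enters the accumulator, and Combine keeps whichever (value, index) pair wins the comparison. The seed pair mirrors how CUDAReductionEngine::Run below starts arg-reductions with (identity, 0).

    #include <cstdio>
    #include <limits>
    #include <utility>

    int main() {
        const float data[] = {3.f, 1.f, 4.f, 0.5f, 2.f};
        auto comp = [](float a, float b) { return a < b; };  // arg-min comparison

        // First round: fold raw (value, index) items into an accumulator pair,
        // mirroring ArgReduceOps::Reduce.
        std::pair<float, long> acc(std::numeric_limits<float>::max(), 0);
        for (long i = 0; i < 5; i++) {
            acc = comp(acc.first, data[i]) ? acc : std::make_pair(data[i], i);
        }

        // Later rounds would merge accumulators pairwise, mirroring
        // ArgReduceOps::Combine: comp(a.first, b.first) ? a : b.
        printf("min value %.2f at index %ld\n", acc.first, acc.second);  // 0.50 at 3
        return 0;
    }
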
470 
471 template <typename scalar_t,
472  typename ops_t,
473  typename index_t,
474  typename out_scalar_t = scalar_t,
475  int vt0 = 4>
476 class ReduceOp {
477  using traits = FunctionTraits<decltype(&ops_t::Reduce)>;
478  using arg_t =
479  typename std::decay<typename traits::template arg<0>::type>::type;
480  using InputCalculator = OffsetCalculator<1, index_t>;
481  using OutputCalculator = OffsetCalculator<2, index_t>;
482 
483 public:
484  ReduceOp(ops_t ops,
485  ReduceConfig config,
486  InputCalculator input_calc,
487  OutputCalculator output_calc,
488  const void* src,
489  char* dst,
490  void* acc_buf,
491  void* cta_buf,
492  int* semaphores,
493  arg_t identity,
494  bool accumulate,
495  bool final_output)
496  : ops_(ops),
497  config_(config),
498  input_calc_(input_calc),
499  output_calc_(output_calc),
500  src_(src),
501  dst_(dst),
502  acc_buf_(acc_buf),
503  cta_buf_(cta_buf),
504  semaphores_(semaphores),
505  identity_(identity),
506  accumulate_(accumulate),
507  final_output_(final_output) {}
508 
509  CLOUDVIEWER_DEVICE void Run() const {
510  extern __shared__ char shared_memory[];
511  index_t output_idx = config_.OutputIdx();
512  index_t input_idx = config_.InputIdx();
513  auto base_offsets = output_calc_.get(output_idx);
514 
515  arg_t value = identity_;
516  if (output_idx < config_.num_outputs_ &&
517  input_idx < config_.num_inputs_per_output_) {
518  auto input_slice = (const char*)src_ + base_offsets[1];
519  value = ThreadReduce((const scalar_t*)input_slice);
520  }
521 
522  if (config_.ShouldBlockYReduce()) {
523  value = BlockYReduce(value, shared_memory);
524  }
525  if (config_.ShouldBlockXReduce()) {
526  value = BlockXReduce(value, shared_memory);
527  }
528 
529  auto out = (out_scalar_t*)((char*)dst_ + base_offsets[0]);
530  arg_t* acc = nullptr;
531  if (acc_buf_ != nullptr) {
532  int64_t numerator = (int64_t)sizeof(arg_t);
533  int64_t denominator = (int64_t)sizeof(out_scalar_t);
534  ReduceFraction(numerator, denominator);
535  acc = (arg_t*)((char*)acc_buf_ +
536  (base_offsets[0] * numerator / denominator));
537  }
538 
539  if (config_.ShouldGlobalReduce()) {
540  value = GlobalReduce(value, acc, shared_memory);
541  } else if (config_.ShouldStore(output_idx)) {
542  if (acc == nullptr) {
543  if (accumulate_) {
544  value = AccumulateInOutput<can_accumulate_in_output>(out,
545  value);
546  }
547  if (final_output_) {
548  SetResultsToOutput(value, base_offsets[0]);
549  } else {
550  *out = GetAccumulatedOutput<can_accumulate_in_output>(
551  out, value);
552  }
553  } else {
554  if (accumulate_) {
555  value = ops_.Combine(*acc, value);
556  }
557  if (final_output_) {
558  SetResultsToOutput(value, base_offsets[0]);
559  } else {
560  *acc = value;
561  }
562  }
563  }
564  }
565 
566  CLOUDVIEWER_DEVICE arg_t ThreadReduce(const scalar_t* data) const {
567  index_t idx = config_.InputIdx();
568  // Multiple accumulators to remove dependency between unrolled loops.
569  arg_t value_list[vt0];
570 #pragma unroll
571  for (int i = 0; i < vt0; i++) {
572  value_list[i] = identity_;
573  }
574  index_t end = config_.num_inputs_per_output_;
575  index_t stride = config_.step_input_;
576  index_t element_stride = input_calc_.strides_[0][0] / sizeof(scalar_t);
577 
578  // Reduce the layers of function calls so the compiler can do proper
579  // loop unrolling that exposes instruction-level parallelism.
580  while (idx < config_.num_inputs_per_output_) {
581  // load input
582  utility::MiniVec<scalar_t, vt0> values;
583  if (input_calc_.dims_ == 1) {
584  StridedIterate<vt0>(
585  [&](index_t i, index_t idx) {
586  values[i] = data[idx * element_stride];
587  },
588  idx, end, stride);
589  } else {
590  StridedIterate<vt0>(
591  [&](index_t i, index_t idx) {
592  values[i] = data[input_calc_.get(idx)[0] /
593  sizeof(scalar_t)];
594  },
595  idx, end, stride);
596  }
597  // compute
598  StridedIterate<vt0, index_t>(
599  [&](index_t i, index_t idx) {
600  value_list[i] =
601  ops_.Reduce(value_list[i], values[i], idx);
602  },
603  idx, config_.num_inputs_per_output_, config_.step_input_);
604  // step offset
605  idx += config_.step_input_ * vt0;
606  }
607 #pragma unroll
608  for (int i = 1; i < vt0; i++) {
609  value_list[0] = ops_.Combine(value_list[0], value_list[i]);
610  }
611  return value_list[0];
612  }
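
The vt0 accumulators keep the unrolled additions independent, so they can overlap in the pipeline instead of forming one serial dependency chain. A scalar host analogue of the idea (illustrative only; the names are made up):

    #include <cstdio>

    int main() {
        const int vt0 = 4, n = 16;
        float data[n];
        for (int i = 0; i < n; i++) data[i] = 1.f;

        // Four independent partial sums (analogous to value_list[vt0]); the
        // additions within one pass have no dependency on each other.
        float acc[vt0] = {0.f, 0.f, 0.f, 0.f};
        for (int base = 0; base < n; base += vt0) {
            for (int i = 0; i < vt0; i++) acc[i] += data[base + i];
        }
        // Final combine, mirroring the trailing loop in ThreadReduce.
        for (int i = 1; i < vt0; i++) acc[0] += acc[i];
        printf("sum = %f\n", acc[0]);  // expected 16.0
        return 0;
    }
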
613 
614  CLOUDVIEWER_DEVICE arg_t BlockXReduce(arg_t value,
615  char* shared_memory) const {
616  int dim_x = blockDim.x;
617  arg_t* shared = (arg_t*)shared_memory;
618  if (dim_x > warpSize) {
619  int address_base = threadIdx.x + threadIdx.y * blockDim.x;
620  shared[address_base] = value;
621  for (int offset = dim_x / 2; offset >= warpSize; offset >>= 1) {
622  __syncthreads();
623  if (threadIdx.x < offset && threadIdx.x + offset < blockDim.x) {
624  arg_t other = shared[address_base + offset];
625  value = ops_.Combine(value, other);
626  shared[address_base] = value;
627  }
628  }
629  dim_x = warpSize;
630  }
631 
632  __syncthreads();
633 
634  for (int offset = 1; offset < dim_x; offset <<= 1) {
635  arg_t other = ops_.WarpShflDown(value, offset);
636  value = ops_.Combine(value, other);
637  }
638  return value;
639  }
640 
641  CLOUDVIEWER_DEVICE arg_t BlockYReduce(arg_t value,
642  char* shared_memory) const {
643  arg_t* shared = (arg_t*)shared_memory;
644  shared[config_.SharedMemoryOffset(0)] = value;
645  for (int offset = blockDim.y / 2; offset > 0; offset >>= 1) {
646  __syncthreads();
647  if (threadIdx.y < offset && threadIdx.y + offset < blockDim.y) {
648  arg_t other = shared[config_.SharedMemoryOffset(offset)];
649  value = ops_.Combine(value, other);
650  shared[config_.SharedMemoryOffset(0)] = value;
651  }
652  }
653  return value;
654  }
655 
656  CLOUDVIEWER_DEVICE bool MarkBlockFinished() const {
657  __shared__ bool is_last_block_done_shared;
658 
659  __syncthreads();
660  if (threadIdx.x == 0 && threadIdx.y == 0) {
661  int prev_blocks_finished = atomicAdd(&semaphores_[blockIdx.x], 1);
662  is_last_block_done_shared = (prev_blocks_finished == gridDim.y - 1);
663  }
664 
665  __syncthreads();
666 
667  return is_last_block_done_shared;
668  }
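
The semaphore pattern used here is general: every block bumps a per-output counter after publishing its partial result, and only the block that observes the final count proceeds to combine the staged values. A self-contained sketch of that idea (hypothetical names; one counter for the whole grid, with the partial results supplied by the host for brevity):

    #include <cstdio>
    #include <cuda_runtime.h>

    __global__ void LastBlockSum(const float* partials, float* out,
                                 int* semaphore, int num_blocks) {
        __shared__ bool is_last;
        // In a real kernel each block would first write its own partial result
        // and issue __threadfence() before touching the semaphore.
        if (threadIdx.x == 0) {
            int done = atomicAdd(semaphore, 1);
            is_last = (done == num_blocks - 1);
        }
        __syncthreads();
        if (is_last && threadIdx.x == 0) {
            float sum = 0.f;
            for (int i = 0; i < num_blocks; i++) sum += partials[i];
            *out = sum;
        }
    }

    int main() {
        const int num_blocks = 8;
        float h_partials[num_blocks], h_out = 0.f, *d_partials, *d_out;
        int* d_sem;
        for (int i = 0; i < num_blocks; i++) h_partials[i] = float(i);
        cudaMalloc(&d_partials, num_blocks * sizeof(float));
        cudaMalloc(&d_out, sizeof(float));
        cudaMalloc(&d_sem, sizeof(int));
        cudaMemcpy(d_partials, h_partials, num_blocks * sizeof(float),
                   cudaMemcpyHostToDevice);
        cudaMemset(d_sem, 0, sizeof(int));
        LastBlockSum<<<num_blocks, 32>>>(d_partials, d_out, d_sem, num_blocks);
        cudaMemcpy(&h_out, d_out, sizeof(float), cudaMemcpyDeviceToHost);
        printf("sum of partials = %f\n", h_out);  // expected 28.0
        cudaFree(d_partials);
        cudaFree(d_out);
        cudaFree(d_sem);
        return 0;
    }
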
669 
670  template <bool can_acc>
671  CLOUDVIEWER_DEVICE arg_t AccumulateInOutput(
672  out_scalar_t* out,
673  arg_t value,
674  typename std::enable_if<can_acc>::type* = nullptr) const {
675  return ops_.Combine(*out, value);
676  }
677 
678  // This function should never be called --
679  // it's the version of `AccumulateInOutput`
680  // when accumulation in the output is not possible.
681  template <bool can_acc>
682  CLOUDVIEWER_DEVICE arg_t AccumulateInOutput(
683  out_scalar_t*,
684  arg_t,
685  typename std::enable_if<!can_acc>::type* = nullptr) const {
686  CLOUDVIEWER_ASSERT(false);
687  return arg_t{};
688  }
689 
690  template <bool can_acc>
691  CLOUDVIEWER_DEVICE out_scalar_t GetAccumulatedOutput(
692  out_scalar_t* out,
693  arg_t value,
694  typename std::enable_if<can_acc>::type* = nullptr) const {
695  CLOUDVIEWER_ASSERT(!final_output_);
696  return (out_scalar_t)value;
697  }
698 
699  // This function should never be called --
700  // it's the version of `GetAccumulatedOutput`
701  // when accumulation in the output is not possible.
702  template <bool can_acc>
703  CLOUDVIEWER_DEVICE out_scalar_t GetAccumulatedOutput(
704  out_scalar_t* out,
705  arg_t value,
706  typename std::enable_if<!can_acc>::type* = nullptr) const {
707  CLOUDVIEWER_ASSERT(false);
708  return *out;
709  }
710 
711  template <class T>
712  CLOUDVIEWER_DEVICE void SetResults(const T x,
713  const index_t base_offset) const {
714  auto res = (out_scalar_t*)((char*)dst_ + base_offset);
715  *res = x;
716  }
717 
718  CLOUDVIEWER_DEVICE void SetResultsToOutput(arg_t value,
719  index_t base_offset) const {
720  CLOUDVIEWER_ASSERT(final_output_);
721  SetResults(ops_.Project(value), base_offset);
722  }
723 
724  CLOUDVIEWER_DEVICE arg_t GlobalReduce(arg_t value,
725  arg_t* acc,
726  char* shared_memory) const {
727  arg_t* reduce_buffer = (arg_t*)cta_buf_;
728  index_t output_idx = config_.OutputIdx();
729  auto base_offsets = output_calc_.get(output_idx);
730  auto out = (out_scalar_t*)((char*)dst_ + base_offsets[0]);
731 
732  bool should_store = config_.ShouldStore(config_.OutputIdx());
733  if (should_store) {
734  index_t offset = config_.StagingMemoryOffset(blockIdx.y);
735  reduce_buffer[offset] = value;
736  }
737 
738  __threadfence(); // make sure writes are globally visible
739  __syncthreads(); // if multiple warps in this block wrote to staging,
740  // make sure they're all done
741  bool is_last_block_done = MarkBlockFinished();
742 
743  if (is_last_block_done) {
744  value = identity_;
745  if (config_.ShouldBlockXReduce()) {
746  index_t input_offset = threadIdx.x + threadIdx.y * blockDim.x;
747  index_t step = blockDim.x * blockDim.y;
748  for (; input_offset < config_.ctas_per_output_;
749  input_offset += step) {
750  index_t idx = config_.StagingMemoryOffset(input_offset);
751  arg_t next = reduce_buffer[idx];
752  value = ops_.Combine(value, next);
753  }
754  } else {
755  index_t input_offset = threadIdx.y;
756  index_t step = blockDim.y;
757  for (; input_offset < config_.ctas_per_output_;
758  input_offset += step) {
759  index_t idx = config_.StagingMemoryOffset(input_offset);
760  arg_t next = reduce_buffer[idx];
761  value = ops_.Combine(value, next);
762  }
763  }
764  value = BlockYReduce(value, shared_memory);
765  if (config_.ShouldBlockXReduce()) {
766  value = BlockXReduce(value, shared_memory);
767  }
768  if (should_store) {
769  if (acc == nullptr) {
770  if (accumulate_) {
771  value = AccumulateInOutput<can_accumulate_in_output>(
772  out, value);
773  }
774  if (final_output_) {
775  SetResultsToOutput(value, base_offsets[0]);
776  } else {
777  *out = GetAccumulatedOutput<can_accumulate_in_output>(
778  out, value);
779  }
780  } else {
781  if (accumulate_) {
782  value = ops_.Combine(*acc, value);
783  }
784  if (final_output_) {
785  SetResultsToOutput(value, base_offsets[0]);
786  } else {
787  *acc = value;
788  }
789  }
790  }
791  }
792 
793  return value;
794  }
795 
796 private:
797  static constexpr bool can_accumulate_in_output =
798  std::is_convertible<arg_t, out_scalar_t>::value &&
799  std::is_convertible<out_scalar_t, arg_t>::value;
800  static constexpr float acc_buffer_multiplier =
801  (float)sizeof(arg_t) / sizeof(out_scalar_t);
802  ops_t ops_;
803  ReduceConfig config_;
804  InputCalculator input_calc_;
805  OutputCalculator output_calc_;
806  const void* src_;
807  const char* dst_;
808  // acc_buf_ is used for accumulation among sub Tensor Iterators when
809  // accumulation in the output is not permissible
810  void* acc_buf_;
811  // cta_buf_ used for accumulation between blocks during global reduction
812  void* cta_buf_;
813  int* semaphores_;
814  arg_t identity_;
815  bool accumulate_;
816  bool final_output_;
817 };
818 
819 class AccumulationBuffer {
820 public:
821  AccumulationBuffer() {}
822 
823  AccumulationBuffer(int64_t acc_t_size,
824  int64_t out_t_size,
825  char* out_ptr,
826  int64_t size) {
827  out_ptr_ = (char*)out_ptr;
828  if (out_t_size >= acc_t_size) {
829  // reusing output buffer for accumulation.
830  acc_ptr_ = (char*)out_ptr;
831  numerator_ = 1;
832  denominator_ = 1;
833  } else {
834  int device_id = cuda::GetDevice();
835  Device device(Device::DeviceType::CUDA, device_id);
836  buffer_ = std::make_unique<Blob>(size, device);
837  acc_ptr_ = (char*)buffer_->GetDataPtr();
838  numerator_ = acc_t_size;
839  denominator_ = out_t_size;
840  ReduceFraction(numerator_, denominator_);
841  }
842  }
843 
844  char* GetAccSlice(char* out_ptr) {
845  if (numerator_ == -1 || acc_ptr_ == nullptr) {
846  return nullptr;
847  }
848  return acc_ptr_ + ((out_ptr - out_ptr_) * numerator_ / denominator_);
849  }
850 
851 private:
852  std::unique_ptr<Blob> buffer_;
853  char* acc_ptr_ = nullptr;
854  char* out_ptr_ = nullptr;
855  float size_factor_ = -1;
856  int64_t numerator_ = -1;
857  int64_t denominator_ = -1;
858 };
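
A worked example of GetAccSlice (numbers purely illustrative): if acc_t_size = 8 and out_t_size = 2, the fraction reduces to numerator_ = 4 and denominator_ = 1, so an output located 6 bytes past out_ptr_ gets an accumulator slice starting 24 bytes past acc_ptr_. When out_t_size >= acc_t_size the output buffer itself is reused and the mapping is the identity (numerator_ = denominator_ = 1).
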
859 
860 class CUDAReductionEngine {
861 public:
862  CUDAReductionEngine(const CUDAReductionEngine&) = delete;
863  CUDAReductionEngine& operator=(const CUDAReductionEngine&) = delete;
864  CUDAReductionEngine(const Indexer& indexer) : indexer_(indexer) {}
865 
866  template <typename func_t, typename scalar_t>
867  void Run(const func_t& reduce_func, scalar_t identity) {
868  if (indexer_.NumWorkloads() == 0) {
869  utility::LogError(
870  "0-sized input should be handled outside of the reduction "
871  "engine.");
872  }
873  if (indexer_.NumInputs() != 1) {
874  utility::LogError("Reduction op must have exactly one input.");
875  }
876 
877  CLOUDVIEWER_ASSERT_HOST_DEVICE_LAMBDA(func_t);
878  using arg0_t = typename BinaryFunctionTraits<func_t>::arg0_t;
879  using arg1_t = typename BinaryFunctionTraits<func_t>::arg1_t;
880  if (!std::is_same<scalar_t, arg0_t>::value ||
881  !std::is_same<scalar_t, arg1_t>::value) {
882  utility::LogError(
883  "Function input type must match with the identity's type.");
884  }
885 
886  using res_t = typename BinaryFunctionTraits<func_t>::res_t;
887  if (std::is_same<res_t, bool>::value) {
888  // func_t is a comparison function (for arg-reduction).
889  // Signature: (scalar_t, scalar_t) -> bool.
890  RunReduce<scalar_t, int64_t>(
891  indexer_, WrapArgReduceOps(reduce_func),
892  thrust::pair<scalar_t, int64_t>(identity, 0));
893  } else {
894  // func_t is a regular reduction function.
895  // Signature: (scalar_t, scalar_t) -> scalar_t.
896  RunReduce<scalar_t, scalar_t>(
897  indexer_, WrapRegularReduceOps<scalar_t>(reduce_func),
898  identity);
899  }
900  }
901 
902 private:
903  /// If the index cannot be represented in 32 bits, RunReduce calls itself
904  /// recursively.
905  template <typename scalar_t,
906  typename out_scalar_t,
907  int vt0 = 4,
908  typename ops_t,
909  typename ident_t>
910  static void RunReduce(Indexer& indexer,
911  const ops_t& ops,
912  ident_t identity,
913  AccumulationBuffer* acc_buf_ptr = nullptr) {
914  using traits = FunctionTraits<decltype(&ops_t::Reduce)>;
915  using arg_t = typename traits::template arg<0>::type;
916  static constexpr bool can_accumulate_in_output =
917  std::is_convertible<arg_t, out_scalar_t>::value;
918 
919  bool can_use_32bit_indexing = indexer.CanUse32BitIndexing();
920  std::unique_ptr<AccumulationBuffer> owned_buf_ptr;
921 
922  // The acc_buf_ptr is a pointer shared across recursive calls. It is
923  // created on the first entry and reused by all recursive function calls.
924  if (acc_buf_ptr == nullptr) {
925  // acc_buf_ptr holds the buffer used for accumulation among multiple
926  // sub-indexers when accumulation in the output is not possible.
927  if (!can_accumulate_in_output && !can_use_32bit_indexing) {
928  int64_t output_memory_size = 1;
929  for (int dim = 0; dim < indexer.NumDims(); dim++) {
930  output_memory_size = std::max(
931  output_memory_size,
932  indexer.GetPrimaryShape()[dim] *
933  indexer.GetOutput().byte_strides_[dim]);
934  }
935  owned_buf_ptr.reset(new AccumulationBuffer(
936  sizeof(arg_t), sizeof(out_scalar_t),
937  (char*)indexer.GetOutput().data_ptr_,
938  output_memory_size * sizeof(arg_t)));
939  } else {
940  owned_buf_ptr.reset(new AccumulationBuffer());
941  }
942  acc_buf_ptr = owned_buf_ptr.get();
943  }
944 
945  if (!can_use_32bit_indexing) {
946  for (auto& sub_indexer : indexer.SplitTo32BitIndexing()) {
947  RunReduce<scalar_t, out_scalar_t, vt0>(sub_indexer, ops,
948  identity, acc_buf_ptr);
949  }
950  return;
951  }
952 
953  ReduceConfig config(sizeof(arg_t), indexer);
954 
955  std::unique_ptr<Blob> buffer_blob;
956  std::unique_ptr<Blob> semaphores_blob;
957  void* buffer = nullptr;
958  void* semaphores = nullptr;
959  if (config.ShouldGlobalReduce()) {
960  int device_id = cuda::GetDevice();
961  Device device(Device::DeviceType::CUDA, device_id);
962 
963  buffer_blob =
964  std::make_unique<Blob>(config.GlobalMemorySize(), device);
965  semaphores_blob =
966  std::make_unique<Blob>(config.SemaphoreSize(), device);
967  buffer = buffer_blob->GetDataPtr();
968  semaphores = semaphores_blob->GetDataPtr();
969  CLOUDVIEWER_CUDA_CHECK(
970  cudaMemset(semaphores, 0, config.SemaphoreSize()));
971  }
972 
973  CLOUDVIEWER_ASSERT(can_use_32bit_indexing);
974  const char* in_data = (char*)indexer.GetInput(0).data_ptr_;
975  char* out_data = (char*)indexer.GetOutput().data_ptr_;
976  char* acc_data = acc_buf_ptr->GetAccSlice(out_data);
977  auto output_calc = MakeOutputCalculator<uint32_t>(indexer);
978  auto input_calc = MakeInputCalculator<uint32_t>(indexer);
979 
980  auto reduce_op = ReduceOp<scalar_t, ops_t, uint32_t, out_scalar_t, vt0>(
981  ops, config, input_calc, output_calc, in_data, out_data,
982  acc_data, buffer, (int*)semaphores, identity,
983  indexer.ShouldAccumulate(), indexer.IsFinalOutput());
984 
985  // Launch reduce kernel
986  int shared_memory = config.SharedMemorySize();
987  ReduceKernel<ReduceConfig::MAX_NUM_THREADS>
988  <<<config.GridDim(), config.BlockDim(), shared_memory,
989  core::cuda::GetStream()>>>(reduce_op);
990  cuda::Synchronize();
991  CLOUDVIEWER_CUDA_CHECK(cudaGetLastError());
992  }
993 
994 private:
995  Indexer indexer_;
996 };
997 
998 void ReductionCUDA(const Tensor& src,
999  Tensor& dst,
1000  const SizeVector& dims,
1001  bool keepdim,
1002  ReductionOpCode op_code) {
1003  if (s_regular_reduce_ops.find(op_code) != s_regular_reduce_ops.end()) {
1004  Indexer indexer({src}, dst, DtypePolicy::ALL_SAME, dims);
1005  CUDAReductionEngine re(indexer);
1006  Dtype dtype = src.GetDtype();
1007 
1008  CUDAScopedDevice scoped_device(src.GetDevice());
1009  DISPATCH_DTYPE_TO_TEMPLATE(dtype, [&]() {
1010  switch (op_code) {
1011  case ReductionOpCode::Sum:
1012  if (indexer.NumWorkloads() == 0) {
1013  // 0-sized input can be reduced to non-0-sized outputs,
1014  // where identity elements should be filled.
1015  // E.g. np.sum(np.ones((0, 5)), axis=0).shape == (5,).
1016  dst.Fill(0);
1017  } else {
1018  re.Run(
1019  [] CLOUDVIEWER_HOST_DEVICE(
1020  scalar_t a, scalar_t b) -> scalar_t {
1021  return a + b;
1022  },
1023  static_cast<scalar_t>(0));
1024  }
1025  break;
1026  case ReductionOpCode::Prod:
1027  if (indexer.NumWorkloads() == 0) {
1028  dst.Fill(1);
1029  } else {
1030  re.Run(
1031  [] CLOUDVIEWER_HOST_DEVICE(
1032  scalar_t a, scalar_t b) -> scalar_t {
1033  return a * b;
1034  },
1035  static_cast<scalar_t>(1));
1036  }
1037  break;
1038  case ReductionOpCode::Min:
1039  if (indexer.NumWorkloads() == 0) {
1040  utility::LogError(
1041  "Zero-size Tensor does not support Min.");
1042  } else {
1043  re.Run(
1044  [] CLOUDVIEWER_HOST_DEVICE(
1045  scalar_t a, scalar_t b) -> scalar_t {
1046  return a < b ? a : b;
1047  },
1048  static_cast<scalar_t>(
1049  std::numeric_limits<scalar_t>::max()));
1050  }
1051  break;
1052  case ReductionOpCode::Max:
1053  if (indexer.NumWorkloads() == 0) {
1054  utility::LogError(
1055  "Zero-size Tensor does not support Max.");
1056  } else {
1057  re.Run(
1058  [] CLOUDVIEWER_HOST_DEVICE(
1059  scalar_t a, scalar_t b) -> scalar_t {
1060  return a > b ? a : b;
1061  },
1062  static_cast<scalar_t>(std::numeric_limits<
1063  scalar_t>::lowest()));
1064  }
1065  break;
1066  default:
1067  utility::LogError("Unsupported op code.");
1068  break;
1069  }
1070  });
1071  } else if (s_arg_reduce_ops.find(op_code) != s_arg_reduce_ops.end()) {
1072  if (dst.GetDtype() != core::Int64) {
1073  utility::LogError("Arg-reduction must have int64 output dtype.");
1074  }
1075  Indexer indexer({src}, dst, DtypePolicy::INPUT_SAME, dims);
1076  CUDAReductionEngine re(indexer);
1077  Dtype dtype = src.GetDtype();
1078 
1079  CUDAScopedDevice scoped_device(src.GetDevice());
1080  DISPATCH_DTYPE_TO_TEMPLATE(dtype, [&]() {
1081  switch (op_code) {
1082  case ReductionOpCode::ArgMin:
1083  if (indexer.NumWorkloads() == 0) {
1084  utility::LogError(
1085  "Zero-size Tensor does not support ArgMin.");
1086  } else {
1087  re.Run([] CLOUDVIEWER_HOST_DEVICE(
1088  scalar_t a,
1089  scalar_t b) -> bool { return a < b; },
1090  static_cast<scalar_t>(
1091  std::numeric_limits<scalar_t>::max()));
1092  }
1093  break;
1094  case ReductionOpCode::ArgMax:
1095  if (indexer.NumWorkloads() == 0) {
1096  utility::LogError(
1097  "Zero-size Tensor does not support ArgMax.");
1098  } else {
1099  re.Run([] CLOUDVIEWER_HOST_DEVICE(
1100  scalar_t a,
1101  scalar_t b) -> bool { return a > b; },
1102  static_cast<scalar_t>(std::numeric_limits<
1103  scalar_t>::lowest()));
1104  }
1105  break;
1106  default:
1107  utility::LogError("Unsupported op code.");
1108  break;
1109  }
1110  });
1111  } else if (s_boolean_reduce_ops.find(op_code) !=
1112  s_boolean_reduce_ops.end()) {
1113  if (src.GetDtype() != core::Bool) {
1114  utility::LogError(
1115  "Boolean reduction only supports boolean input tensor.");
1116  }
1117  if (dst.GetDtype() != core::Bool) {
1118  utility::LogError(
1119  "Boolean reduction only supports boolean output tensor.");
1120  }
1121  Indexer indexer({src}, dst, DtypePolicy::ALL_SAME, dims);
1122  CUDAReductionEngine re(indexer);
1123 
1124  CUDAScopedDevice scoped_device(src.GetDevice());
1125  switch (op_code) {
1126  case ReductionOpCode::All:
1127  if (indexer.NumWorkloads() == 0) {
1128  dst.Fill(true);
1129  } else {
1130  re.Run([] CLOUDVIEWER_HOST_DEVICE(uint8_t a, uint8_t b)
1131  -> uint8_t { return a && b; },
1132  static_cast<uint8_t>(true));
1133  }
1134  break;
1135  case ReductionOpCode::Any:
1136  if (indexer.NumWorkloads() == 0) {
1137  dst.Fill(false);
1138  } else {
1139  re.Run([] CLOUDVIEWER_HOST_DEVICE(uint8_t a, uint8_t b)
1140  -> uint8_t { return a || b; },
1141  static_cast<uint8_t>(false));
1142  }
1143  break;
1144  default:
1145  utility::LogError("Unsupported op code.");
1146  break;
1147  }
1148  } else {
1149  utility::LogError("Unsupported op code.");
1150  }
1151 }
1152 
1153 } // namespace kernel
1154 } // namespace core
1155 } // namespace cloudViewer
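
As a usage sketch for the entry point above (hypothetical and not from this file: it assumes the core library's Tensor::Ones/Tensor::Empty factories, the core::Float32 dtype, and a Device constructible from "CUDA:0", and it assumes dst is pre-allocated with the reduced shape, which the higher-level Reduction() dispatcher normally arranges):

    #include "core/Device.h"
    #include "core/SizeVector.h"
    #include "core/Tensor.h"
    #include "core/kernel/Reduction.h"

    // Hypothetical call sketch: sum a 2 x 3 CUDA tensor over dim 0.
    void SumOverDim0Example() {
        using namespace cloudViewer::core;
        Device cuda("CUDA:0");
        Tensor src = Tensor::Ones({2, 3}, Float32, cuda);
        // dst must already have the reduced shape before calling ReductionCUDA.
        Tensor dst = Tensor::Empty({3}, Float32, cuda);
        kernel::ReductionCUDA(src, dst, /*dims=*/{0}, /*keepdim=*/false,
                              kernel::ReductionOpCode::Sum);
        // dst is expected to hold {2, 2, 2}.
    }
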