1 // ----------------------------------------------------------------------------
2 // - CloudViewer: www.cloudViewer.org -
3 // ----------------------------------------------------------------------------
4 // Copyright (c) 2018-2024 www.cloudViewer.org
5 // SPDX-License-Identifier: MIT
6 // ----------------------------------------------------------------------------
10 #include <cuda_runtime.h>
11 #include <thrust/pair.h>
12 #include <thrust/tuple.h>
19 #include <type_traits>
21 #include "cloudViewer/core/CUDAUtils.h"
22 #include "core/Blob.h"
23 #include "core/Device.h"
24 #include "core/Dispatch.h"
25 #include "core/FunctionTraits.h"
26 #include "core/Indexer.h"
27 #include "core/MemoryManager.h"
28 #include "core/ParallelFor.h"
29 #include "core/SizeVector.h"
30 #include "core/Tensor.h"
31 #include "core/kernel/Reduction.h"
33 // CUDA reduction is based on PyTorch's CUDA reduction implementation.
34 // See: aten/src/ATen/native/cuda/Reduce.cuh
36 #if __CUDA_ARCH__ >= 750
37 constexpr uint32_t CUDA_MAX_THREADS_PER_SM = 1024;
39 constexpr uint32_t CUDA_MAX_THREADS_PER_SM = 2048;
42 constexpr uint32_t CUDA_MAX_THREADS_PER_BLOCK = 1024;
43 constexpr uint32_t CUDA_THREADS_PER_BLOCK_FALLBACK = 256;
45 #define CLOUDVIEWER_MAX_THREADS_PER_BLOCK(val) \
46 (((val) <= CUDA_MAX_THREADS_PER_BLOCK) ? (val) \
47 : CUDA_THREADS_PER_BLOCK_FALLBACK)
48 #define CLOUDVIEWER_MIN_BLOCKS_PER_SM(threads_per_block, blocks_per_sm) \
49 ((((threads_per_block) * (blocks_per_sm) <= CUDA_MAX_THREADS_PER_SM) \
51 : ((CUDA_MAX_THREADS_PER_SM + (threads_per_block) - 1) / \
52 (threads_per_block))))
54 #define CLOUDVIEWER_LAUNCH_BOUNDS_2(max_threads_per_block, min_blocks_per_sm) \
56 (CLOUDVIEWER_MAX_THREADS_PER_BLOCK((max_threads_per_block))), \
57 (CLOUDVIEWER_MIN_BLOCKS_PER_SM((max_threads_per_block), \
58 (min_blocks_per_sm))))
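// For illustration only (not part of the build): assuming the branch of
// CLOUDVIEWER_MIN_BLOCKS_PER_SM not shown here simply yields blocks_per_sm,
// and CUDA_MAX_THREADS_PER_SM == 2048, the wrapper expands as:
//
//   CLOUDVIEWER_LAUNCH_BOUNDS_2(512, 4)   // -> __launch_bounds__(512, 4)
//   CLOUDVIEWER_LAUNCH_BOUNDS_2(2048, 4)  // -> __launch_bounds__(256, 1)
//
// i.e. oversized requests are clamped to limits the hardware accepts instead
// of failing to compile. ReduceKernel below uses this wrapper with
// nt == ReduceConfig::MAX_NUM_THREADS (512) and 4 requested blocks per SM.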
61 CLOUDVIEWER_DEVICE __forceinline__ T
62 WARP_SHFL_DOWN(T value,
65 unsigned int mask = 0xffffffff) {
66 #if CUDA_VERSION >= 9000
67 return __shfl_down_sync(mask, value, delta, width);
69 return __shfl_down(value, delta, width);
73 namespace cloudViewer {
77 static inline int64_t DivUp(int64_t a, int64_t b) { return (a + b - 1) / b; }
79 // Reduces the fraction numerator / denominator to lowest terms, in place.
80 CLOUDVIEWER_HOST_DEVICE static void ReduceFraction(int64_t& numerator,
81 int64_t& denominator) {
82 // Get GCD of num and denom using Euclid's algorithm.
83 // Can be replaced with std::gcd if we ever support C++17.
84 int64_t a = denominator;
85 int64_t b = numerator;
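// A worked example of the intended contract (illustration only): after
// ReduceFraction(numerator = 12, denominator = 8) the values are 3 and 2.
// In this file the fraction is sizeof(arg_t) / sizeof(out_scalar_t), e.g.
// 8 / 4 reduces to 2 / 1, so an accumulator slot can be located from an
// output byte offset as base_offset * numerator / denominator.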
99 static constexpr int BLOCK_X = 0;
100 static constexpr int BLOCK_Y = 1;
101 static constexpr int CTA = 2;
102 static constexpr int MAX_NUM_THREADS = 512;
104 int num_inputs_per_output_;
107 int step_output_ = 1;
108 int ctas_per_output_ = 1;
111 int element_size_bytes_;
112 int input_mult_[3] = {0, 0, 0};
113 int output_mult_[2] = {0, 0};
119 ReduceConfig(int element_size_bytes, const Indexer& indexer)
120 : element_size_bytes_(element_size_bytes) {
121 num_outputs_ = indexer.NumOutputElements();
122 num_inputs_per_output_ = indexer.NumWorkloads() / num_outputs_;
124         // Adjust the block size so that the block width maps to the fastest-
125         // changing dimension of the input tensor. This gives the best possible
126         // memory access pattern; a non-contiguous tensor with gaps between
127         // elements cannot achieve perfect memory coalescing anyway.
128 bool reduction_on_fastest_striding_dimension =
129 (indexer.NumReductionDims() == indexer.NumDims()) ||
130 (indexer.GetInput(0).byte_strides_[0] <
131 indexer.GetInput(0).byte_strides_[indexer.NumReductionDims()]);
133         // Note that dim0 and dim1 do NOT guarantee any launch configuration
134         // here! They are only upper bounds on the block dimensions. The
135         // actual launch configuration and reduction scheme are determined by
136         // the values assigned to `input_mult_` and `output_mult_`. We try to
137         // max out dim1 so that we have enough threads per CTA to deliver
138         // good performance for larger problem sizes.
142 if (reduction_on_fastest_striding_dimension) {
143 // Map block.x to the fastest reducing dimension. It implies:
144 // 1. BlockXReduce is required.
145             // 2. block.y now maxes out at num_outputs_.
146 dim0 = indexer.GetPrimaryShape()[0];
149             // Map block.x to the fastest non-reducing dimension. It implies:
150             // 1. BlockXReduce is turned off.
151             // 2. block.y now maxes out at num_inputs_per_output_.
152 dim0 = indexer.GetPrimaryShape()[indexer.NumReductionDims()];
153 dim1 = num_inputs_per_output_;
156 // Adjust block_width and block_height
157 SetBlockDimension(dim0, dim1);
159 int block_width = block_width_;
160 int block_height = block_height_;
162 if (indexer.NumDims() == 0 || reduction_on_fastest_striding_dimension) {
163 // Split the input across lanes if the input is contiguous in the
164 // reduced dimension. This will require reduction between threads
165 // using warp shuffle instructions and shared memory (if
166 // block_width > warpSize).
167 input_mult_[0] = SplitInput(block_width);
169 // Otherwise split the output across lanes in a warp.
170 output_mult_[0] = SplitOutput(block_width);
173 if (ValuesPerThread() >= block_height * 16 ||
174 ValuesPerThread() >= 256) {
175 // Divide the input across warps in a thread-block, if that leaves
176 // at least 16 elements to be summed by each thread. This will
177 // require inter-warp reduction using shared memory.
178 input_mult_[1] = SplitInput(block_height);
180 // Otherwise, each warp handles a separate output.
181 output_mult_[1] = SplitOutput(block_height);
184 if (input_mult_[1] != 0 && ValuesPerThread() >= 256 &&
185 num_outputs_ <= 4096) {
186 // Divide the input across thread-blocks if the amount of work
187 // per-thread is large enough and the size of the output is small
188 // enough. This will require a reduction using global memory.
189 ctas_per_output_ = DivUp(ValuesPerThread(), 16);
190 if (ctas_per_output_ > 65535) {
191 ctas_per_output_ = 65535;
193 input_mult_[2] = SplitInput(ctas_per_output_);
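// A worked example of the splitting logic above (hypothetical sizes): suppose
// a contiguous float tensor is fully reduced to a single output, so
// num_outputs_ == 1 and num_inputs_per_output_ == 1 << 20, and suppose
// SetBlockDimension() picks block_width_ == 512, block_height_ == 1. Assuming
// step_input_ starts at 1 and SplitInput() returns the previous step:
//
//   input_mult_[0] = SplitInput(512) == 1,   step_input_ -> 512
//   ValuesPerThread() == DivUp(1 << 20, 512) == 2048, so
//   input_mult_[1] = SplitInput(1)   == 512, step_input_ -> 512
//   ctas_per_output_ = DivUp(2048, 16) == 128, and
//   input_mult_[2] = SplitInput(128) == 512, step_input_ -> 65536
//
// The kernel then runs with block (512, 1) and grid (1, 128), each thread
// reduces ValuesPerThread() == 16 elements, and the 128 blocks are combined
// in a final global-memory pass (ShouldGlobalReduce() == true).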
197     /// Returns the largest power of 2 less than or equal to n,
        /// i.e. 2^floor(log2(n)).
198 static inline int LastPow2(int n) {
199 // Dtype.h asserts sizeof(int) == 4.
205 return std::max(1, n - (n >> 1));
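// Illustration of the intended result (the intermediate bit-smearing steps of
// LastPow2 are not shown here): LastPow2(300) == 256, LastPow2(512) == 512,
// and the result is clamped to at least 1, so LastPow2(0) == 1.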
208 void SetBlockDimension(int64_t dim0, int64_t dim1) {
209 int dim0_pow2 = dim0 < MAX_NUM_THREADS
210 ? static_cast<int>(LastPow2(dim0))
212 int dim1_pow2 = dim1 < MAX_NUM_THREADS
213 ? static_cast<int>(LastPow2(dim1))
215 block_width_ = std::min(dim0_pow2, GetCUDACurrentWarpSize());
217 std::min(dim1_pow2, int(MAX_NUM_THREADS / block_width_));
219 std::min(dim0_pow2, int(MAX_NUM_THREADS / block_height_));
220 num_threads_ = block_width_ * block_height_;
223 int SplitInput(int parallelism) {
224 int step = step_input_;
225 step_input_ *= parallelism;
229 int SplitOutput(int parallelism) {
230 int step = step_output_;
231 step_output_ *= parallelism;
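// For example (step_output_ is initialized to 1 above, and SplitOutput() is
// assumed to return the previous step): SplitOutput(32) returns 1 and sets
// step_output_ to 32. With output_mult_[BLOCK_X] == 1 and output_mult_[BLOCK_Y]
// left at 0, OutputIdx() becomes threadIdx.x + blockIdx.x * 32, so consecutive
// lanes handle consecutive outputs and GridDim().x == DivUp(num_outputs_, 32).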
235 dim3 BlockDim() const { return dim3(block_width_, block_height_); }
237 dim3 GridDim() const {
238 return dim3(DivUp(num_outputs_, step_output_), ctas_per_output_);
241 CLOUDVIEWER_HOST_DEVICE bool ShouldBlockXReduce() const {
242 return input_mult_[BLOCK_X] != 0;
245 CLOUDVIEWER_HOST_DEVICE bool ShouldBlockYReduce() const {
246 return input_mult_[BLOCK_Y] != 0;
249 CLOUDVIEWER_HOST_DEVICE bool ShouldGlobalReduce() const {
250 return input_mult_[CTA] != 0;
253 CLOUDVIEWER_DEVICE bool ShouldStore(int output_idx) const {
254 return output_idx < num_outputs_ &&
255 (!ShouldBlockXReduce() || threadIdx.x == 0) &&
256 (!ShouldBlockYReduce() || threadIdx.y == 0);
259 CLOUDVIEWER_HOST_DEVICE int InputIdx() const {
260 int lane = threadIdx.x;
261 int warp = threadIdx.y;
262 int cta2 = blockIdx.y;
263 return (lane * input_mult_[BLOCK_X] + warp * input_mult_[BLOCK_Y] +
264 cta2 * input_mult_[CTA]);
267 CLOUDVIEWER_HOST_DEVICE int OutputIdx() const {
268 int lane = threadIdx.x;
269 int warp = threadIdx.y;
270 int cta1 = blockIdx.x;
271 return (lane * output_mult_[BLOCK_X] + warp * output_mult_[BLOCK_Y] +
272 cta1 * step_output_);
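// For instance, with a 32x16 block, input_mult_ == {1, 32, 0} and
// output_mult_ == {0, 0} (hypothetical values), the two functions above give
//
//   InputIdx()  == threadIdx.x + 32 * threadIdx.y
//   OutputIdx() == blockIdx.x * step_output_
//
// i.e. all 512 threads of a block cooperate on the same output element,
// starting from distinct input positions.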
275 CLOUDVIEWER_DEVICE int SharedMemoryOffset(int offset) const {
276 return threadIdx.x + (threadIdx.y + offset) * blockDim.x;
279 CLOUDVIEWER_DEVICE int StagingMemoryOffset(int cta2) const {
280 int offset = cta2 + blockIdx.x * gridDim.y;
281 if (!ShouldBlockXReduce()) {
282 offset = threadIdx.x + offset * blockDim.x;
287 int SharedMemorySize() const {
288 if (!ShouldBlockYReduce() &&
289 (!ShouldBlockXReduce() ||
290 block_width_ <= GetCUDACurrentWarpSize())) {
293 return element_size_bytes_ * num_threads_;
296 int64_t GlobalMemorySize() const {
297 if (!ShouldGlobalReduce()) {
301 (int64_t)element_size_bytes_ * num_outputs_ * ctas_per_output_;
302 if (!ShouldBlockXReduce()) {
303 size *= BlockDim().x;
308 int SemaphoreSize() const {
309 if (!ShouldGlobalReduce()) {
312 return sizeof(int) * GridDim().x;
315 int ValuesPerThread() const {
316 return DivUp(num_inputs_per_output_, step_input_);
319 std::string ToString() const {
320 std::string input_mult_str = fmt::format(
321 "[{},{},{}]", input_mult_[0], input_mult_[1], input_mult_[2]);
322 std::string output_mult_str =
323 fmt::format("[{},{}]", output_mult_[0], output_mult_[1]);
324 std::string block_str = fmt::format("[{},{},{}]", BlockDim().x,
325 BlockDim().y, BlockDim().z);
326 std::string grid_str = fmt::format("[{},{},{}]", GridDim().x,
327 GridDim().y, GridDim().z);
328 std::string str = fmt::format(
329 "REDUCEConfig(element_size_bytes_={}, "
330 "num_inputs_per_output_={}, num_outputs_={}, "
331 "step_input_={}, step_output_={}, ctas_per_output_={}, "
332 "input_mult_={}, output_mult_={}, values_per_thread={}, "
333 "block={}, grid={}, global_memory_size={})",
334 element_size_bytes_, num_inputs_per_output_, num_outputs_,
335 step_input_, step_output_, ctas_per_output_, input_mult_str,
336 output_mult_str, ValuesPerThread(), block_str, grid_str,
342 template <int nt, typename R>
343 CLOUDVIEWER_LAUNCH_BOUNDS_2(nt, 4)
344 __global__ void ReduceKernel(R reduction) {
348 template <typename index_t>
349 static OffsetCalculator<2, index_t> MakeOutputCalculator(
350 const Indexer& indexer) {
351 int num_reduction_dims = indexer.NumReductionDims();
352 int num_output_dims = indexer.NumDims() - num_reduction_dims;
353 std::array<const int64_t*, 2> strides = {
354 indexer.GetOutput().byte_strides_ + num_reduction_dims,
355 indexer.GetInput(0).byte_strides_ + num_reduction_dims,
357 const int64_t* shape = indexer.GetPrimaryShape() + num_reduction_dims;
358 return OffsetCalculator<2, index_t>(num_output_dims, shape, strides.data());
361 template <typename index_t>
362 static OffsetCalculator<1, index_t> MakeInputCalculator(
363 const Indexer& indexer) {
364 int num_reduction_dims = indexer.NumReductionDims();
365 std::array<const int64_t*, 1> strides = {
366 indexer.GetInput(0).byte_strides_,
368 return OffsetCalculator<1, index_t>(
369 num_reduction_dims, indexer.GetPrimaryShape(), strides.data());
372 template <int vt, typename index_t, typename func_t>
373 CLOUDVIEWER_DEVICE void StridedIterate(func_t f,
377 if (begin + (vt - 1) * stride < end) {
379 for (index_t i = 0; i < vt; i++) {
380 f(i, begin + i * stride);
384 for (index_t i = 0; i < vt; i++) {
385 index_t idx = begin + i * stride;
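// A sketch of the intended behavior: the unchecked fast path is taken only
// when begin + (vt - 1) * stride < end; otherwise every index is checked
// against end, so e.g. vt == 4, begin == 0, end == 10, stride == 4 results
// in the calls f(0, 0), f(1, 4), f(2, 8) only.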
393 /// Combine() and Reduce() are the same for regular reduction ops.
394 template <typename out_scalar_t, typename func_t>
395 class RegularReduceOps {
396 using arg_t = typename BinaryFunctionTraits<func_t>::arg0_t;
397 using scalar_t = typename BinaryFunctionTraits<func_t>::arg1_t;
400 RegularReduceOps(const func_t& op) : reduce_func_(op) {}
402 static inline CLOUDVIEWER_DEVICE out_scalar_t Project(arg_t arg) {
403 return (out_scalar_t)arg;
406 static inline CLOUDVIEWER_DEVICE arg_t WarpShflDown(arg_t arg, int offset) {
407 return WARP_SHFL_DOWN(arg, offset);
410 CLOUDVIEWER_DEVICE inline arg_t Combine(arg_t acc, scalar_t val) const {
411 return reduce_func_(acc, val);
414 /// Idx is ignored for RegularReduceOps.
415 CLOUDVIEWER_DEVICE inline arg_t Reduce(arg_t acc,
418 return reduce_func_(acc, val);
422 func_t reduce_func_ = nullptr;
425 template <typename scalar_t, typename func_t>
426 RegularReduceOps<scalar_t, func_t> WrapRegularReduceOps(const func_t& op) {
427 return RegularReduceOps<scalar_t, func_t>{op};
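// A minimal usage sketch (illustration only, not part of this translation
// unit): wrapping a sum lambda yields an ops object whose Combine() folds a
// value into the accumulator and whose Project() casts it to the output type.
//
//   auto sum_ops = WrapRegularReduceOps<float>(
//           [] CLOUDVIEWER_HOST_DEVICE(float a, float b) -> float {
//               return a + b;
//           });
//   // sum_ops.Combine(10.f, 2.5f) == 12.5f;  sum_ops.Project(12.5f) == 12.5f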
430 template <typename func_t>
432 using scalar_t = typename BinaryFunctionTraits<func_t>::arg1_t;
433 using index_t = int64_t;
434 using arg_t = thrust::pair<scalar_t, index_t>;
437 ArgReduceOps(const func_t comp_func) : comp_func_(comp_func) {}
439 static CLOUDVIEWER_DEVICE index_t Project(arg_t arg) { return arg.second; }
441 static CLOUDVIEWER_DEVICE arg_t WarpShflDown(arg_t arg, int offset) {
442 return arg_t(WARP_SHFL_DOWN(arg.first, offset),
443 WARP_SHFL_DOWN(arg.second, offset));
446 /// Combine(pair<val_t, idx_t>, pair<val_t, idx_t>) -> pair<val_t, idx_t>.
447 /// Called at subsequent rounds of reduction, when values are already
448 /// associated with indices.
449 CLOUDVIEWER_DEVICE inline arg_t Combine(arg_t a, arg_t b) const {
450 return comp_func_(a.first, b.first) ? a : b;
453 /// Reduce(pair<val_t, idx_t>, val_t, idx_t) -> pair<val_t, idx_t>.
454 /// Called at the first round of reduction, when values are not yet
455 /// associated with indices.
456 CLOUDVIEWER_DEVICE inline arg_t Reduce(arg_t arg,
459 return comp_func_(arg.first, val) ? arg : arg_t(val, idx);
463 func_t comp_func_ = nullptr;
466 template <typename func_t>
467 ArgReduceOps<func_t> WrapArgReduceOps(const func_t& comp_func) {
468 return ArgReduceOps<func_t>{comp_func};
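// A minimal usage sketch (illustration only): an argmin over floats wraps a
// less-than comparator, with the identity pair carrying max_float ==
// std::numeric_limits<float>::max(), matching the ArgMin dispatch at the
// bottom of this file.
//
//   auto argmin_ops = WrapArgReduceOps(
//           [] CLOUDVIEWER_HOST_DEVICE(float a, float b) -> bool {
//               return a < b;
//           });
//   // First round:  Reduce({max_float, 0}, 3.f, 7) -> {3.f, 7}
//   // Later rounds: Combine({3.f, 7}, {1.f, 42})   -> {1.f, 42}
//   // Final output: Project({1.f, 42})             -> 42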
471 template <typename scalar_t,
474 typename out_scalar_t = scalar_t,
477 using traits = FunctionTraits<decltype(&ops_t::Reduce)>;
479 typename std::decay<typename traits::template arg<0>::type>::type;
480 using InputCalculator = OffsetCalculator<1, index_t>;
481 using OutputCalculator = OffsetCalculator<2, index_t>;
486 InputCalculator input_calc,
487 OutputCalculator output_calc,
498 input_calc_(input_calc),
499 output_calc_(output_calc),
504 semaphores_(semaphores),
506 accumulate_(accumulate),
507 final_output_(final_output) {}
509 CLOUDVIEWER_DEVICE void Run() const {
510 extern __shared__ char shared_memory[];
511 index_t output_idx = config_.OutputIdx();
512 index_t input_idx = config_.InputIdx();
513 auto base_offsets = output_calc_.get(output_idx);
515 arg_t value = identity_;
516 if (output_idx < config_.num_outputs_ &&
517 input_idx < config_.num_inputs_per_output_) {
518 auto input_slice = (const char*)src_ + base_offsets[1];
519 value = ThreadReduce((const scalar_t*)input_slice);
522 if (config_.ShouldBlockYReduce()) {
523 value = BlockYReduce(value, shared_memory);
525 if (config_.ShouldBlockXReduce()) {
526 value = BlockXReduce(value, shared_memory);
529 auto out = (out_scalar_t*)((char*)dst_ + base_offsets[0]);
530 arg_t* acc = nullptr;
531 if (acc_buf_ != nullptr) {
532 int64_t numerator = (int64_t)sizeof(arg_t);
533 int64_t denominator = (int64_t)sizeof(out_scalar_t);
534 ReduceFraction(numerator, denominator);
535 acc = (arg_t*)((char*)acc_buf_ +
536 (base_offsets[0] * numerator / denominator));
539 if (config_.ShouldGlobalReduce()) {
540 value = GlobalReduce(value, acc, shared_memory);
541 } else if (config_.ShouldStore(output_idx)) {
542 if (acc == nullptr) {
544 value = AccumulateInOutput<can_accumulate_in_output>(out,
548 SetResultsToOutput(value, base_offsets[0]);
550 *out = GetAccumulatedOutput<can_accumulate_in_output>(
555 value = ops_.Combine(*acc, value);
558 SetResultsToOutput(value, base_offsets[0]);
566 CLOUDVIEWER_DEVICE arg_t ThreadReduce(const scalar_t* data) const {
567 index_t idx = config_.InputIdx();
568         // Multiple accumulators remove dependencies between unrolled loop iterations.
569 arg_t value_list[vt0];
571 for (int i = 0; i < vt0; i++) {
572 value_list[i] = identity_;
574 index_t end = config_.num_inputs_per_output_;
575 index_t stride = config_.step_input_;
576 index_t element_stride = input_calc_.strides_[0][0] / sizeof(scalar_t);
578         // Reduce the depth of nested function calls so that the compiler can
579         // unroll the loop properly and expose instruction-level parallelism.
580 while (idx < config_.num_inputs_per_output_) {
582 utility::MiniVec<scalar_t, vt0> values;
583 if (input_calc_.dims_ == 1) {
585 [&](index_t i, index_t idx) {
586 values[i] = data[idx * element_stride];
591 [&](index_t i, index_t idx) {
592 values[i] = data[input_calc_.get(idx)[0] /
598 StridedIterate<vt0, index_t>(
599 [&](index_t i, index_t idx) {
601 ops_.Reduce(value_list[i], values[i], idx);
603 idx, config_.num_inputs_per_output_, config_.step_input_);
605 idx += config_.step_input_ * vt0;
608 for (int i = 1; i < vt0; i++) {
609 value_list[0] = ops_.Combine(value_list[0], value_list[i]);
611 return value_list[0];
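// The same accumulator trick in a minimal standalone form (illustration only;
// data and n are hypothetical): keeping several independent partial sums
// removes the serial dependency between consecutive additions, so the
// compiler is free to interleave them.
//
//   float acc[4] = {0.f, 0.f, 0.f, 0.f};
//   for (int64_t i = 0; i + 3 < n; i += 4) {
//       acc[0] += data[i + 0];
//       acc[1] += data[i + 1];
//       acc[2] += data[i + 2];
//       acc[3] += data[i + 3];
//   }
//   float sum = acc[0] + acc[1] + acc[2] + acc[3];  // plus a tail loop for n % 4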
614 CLOUDVIEWER_DEVICE arg_t BlockXReduce(arg_t value,
615 char* shared_memory) const {
616 int dim_x = blockDim.x;
617 arg_t* shared = (arg_t*)shared_memory;
618 if (dim_x > warpSize) {
619 int address_base = threadIdx.x + threadIdx.y * blockDim.x;
620 shared[address_base] = value;
621 for (int offset = dim_x / 2; offset >= warpSize; offset >>= 1) {
623 if (threadIdx.x < offset && threadIdx.x + offset < blockDim.x) {
624 arg_t other = shared[address_base + offset];
625 value = ops_.Combine(value, other);
626 shared[address_base] = value;
634 for (int offset = 1; offset < dim_x; offset <<= 1) {
635 arg_t other = ops_.WarpShflDown(value, offset);
636 value = ops_.Combine(value, other);
641 CLOUDVIEWER_DEVICE arg_t BlockYReduce(arg_t value,
642 char* shared_memory) const {
643 arg_t* shared = (arg_t*)shared_memory;
644 shared[config_.SharedMemoryOffset(0)] = value;
645 for (int offset = blockDim.y / 2; offset > 0; offset >>= 1) {
647 if (threadIdx.y < offset && threadIdx.y + offset < blockDim.y) {
648 arg_t other = shared[config_.SharedMemoryOffset(offset)];
649 value = ops_.Combine(value, other);
650 shared[config_.SharedMemoryOffset(0)] = value;
656 CLOUDVIEWER_DEVICE bool MarkBlockFinished() const {
657 __shared__ bool is_last_block_done_shared;
660 if (threadIdx.x == 0 && threadIdx.y == 0) {
661 int prev_blocks_finished = atomicAdd(&semaphores_[blockIdx.x], 1);
662 is_last_block_done_shared = (prev_blocks_finished == gridDim.y - 1);
667 return is_last_block_done_shared;
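// For example, with GridDim() == (1, 128) every block increments
// semaphores_[blockIdx.x] exactly once after publishing its partial result;
// the block whose atomicAdd() returns 127 (== gridDim.y - 1) is the last one
// and goes on to combine all staged partials in GlobalReduce().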
670 template <bool can_acc>
671 CLOUDVIEWER_DEVICE arg_t AccumulateInOutput(
674 typename std::enable_if<can_acc>::type* = nullptr) const {
675 return ops_.Combine(*out, value);
678 // This function should never be called --
679 // it's the version of `AccumulateInOutput`
680 // when accumulation in the output is not possible.
681 template <bool can_acc>
682 CLOUDVIEWER_DEVICE arg_t AccumulateInOutput(
685 typename std::enable_if<!can_acc>::type* = nullptr) const {
686 CLOUDVIEWER_ASSERT(false);
690 template <bool can_acc>
691 CLOUDVIEWER_DEVICE out_scalar_t GetAccumulatedOutput(
694 typename std::enable_if<can_acc>::type* = nullptr) const {
695 CLOUDVIEWER_ASSERT(!final_output_);
696 return (out_scalar_t)value;
699 // This function should never be called --
700 // it's the version of `GetAccumulatedOutput`
701 // when accumulation in the output is not possible.
702 template <bool can_acc>
703 CLOUDVIEWER_DEVICE out_scalar_t GetAccumulatedOutput(
706 typename std::enable_if<!can_acc>::type* = nullptr) const {
707 CLOUDVIEWER_ASSERT(false);
712 CLOUDVIEWER_DEVICE void SetResults(const T x,
713 const index_t base_offset) const {
714 auto res = (out_scalar_t*)((char*)dst_ + base_offset);
718 CLOUDVIEWER_DEVICE void SetResultsToOutput(arg_t value,
719 index_t base_offset) const {
720 CLOUDVIEWER_ASSERT(final_output_);
721 SetResults(ops_.Project(value), base_offset);
724 CLOUDVIEWER_DEVICE arg_t GlobalReduce(arg_t value,
726 char* shared_memory) const {
727 arg_t* reduce_buffer = (arg_t*)cta_buf_;
728 index_t output_idx = config_.OutputIdx();
729 auto base_offsets = output_calc_.get(output_idx);
730 auto out = (out_scalar_t*)((char*)dst_ + base_offsets[0]);
732 bool should_store = config_.ShouldStore(config_.OutputIdx());
734 index_t offset = config_.StagingMemoryOffset(blockIdx.y);
735 reduce_buffer[offset] = value;
738 __threadfence(); // make sure writes are globally visible
739 __syncthreads(); // if multiple warps in this block wrote to staging,
740 // make sure they're all done
741 bool is_last_block_done = MarkBlockFinished();
743 if (is_last_block_done) {
745 if (config_.ShouldBlockXReduce()) {
746 index_t input_offset = threadIdx.x + threadIdx.y * blockDim.x;
747 index_t step = blockDim.x * blockDim.y;
748 for (; input_offset < config_.ctas_per_output_;
749 input_offset += step) {
750 index_t idx = config_.StagingMemoryOffset(input_offset);
751 arg_t next = reduce_buffer[idx];
752 value = ops_.Combine(value, next);
755 index_t input_offset = threadIdx.y;
756 index_t step = blockDim.y;
757 for (; input_offset < config_.ctas_per_output_;
758 input_offset += step) {
759 index_t idx = config_.StagingMemoryOffset(input_offset);
760 arg_t next = reduce_buffer[idx];
761 value = ops_.Combine(value, next);
764 value = BlockYReduce(value, shared_memory);
765 if (config_.ShouldBlockXReduce()) {
766 value = BlockXReduce(value, shared_memory);
769 if (acc == nullptr) {
771 value = AccumulateInOutput<can_accumulate_in_output>(
775 SetResultsToOutput(value, base_offsets[0]);
777 *out = GetAccumulatedOutput<can_accumulate_in_output>(
782 value = ops_.Combine(*acc, value);
785 SetResultsToOutput(value, base_offsets[0]);
797 static constexpr bool can_accumulate_in_output =
798 std::is_convertible<arg_t, out_scalar_t>::value &&
799 std::is_convertible<out_scalar_t, arg_t>::value;
800 static constexpr float acc_buffer_multiplier =
801 (float)sizeof(arg_t) / sizeof(out_scalar_t);
803 ReduceConfig config_;
804 InputCalculator input_calc_;
805 OutputCalculator output_calc_;
808     // acc_buf_ is used for accumulation among sub-indexers when accumulating
809     // directly in the output is not permissible.
811     // cta_buf_ is used for accumulation between blocks during the global reduction.
819 class AccumulationBuffer {
821 AccumulationBuffer() {}
823 AccumulationBuffer(int64_t acc_t_size,
827 out_ptr_ = (char*)out_ptr;
828 if (out_t_size >= acc_t_size) {
829 // reusing output buffer for accumulation.
830 acc_ptr_ = (char*)out_ptr;
834 int device_id = cuda::GetDevice();
835 Device device(Device::DeviceType::CUDA, device_id);
836 buffer_ = std::make_unique<Blob>(size, device);
837 acc_ptr_ = (char*)buffer_->GetDataPtr();
838 numerator_ = acc_t_size;
839 denominator_ = out_t_size;
840 ReduceFraction(numerator_, denominator_);
844 char* GetAccSlice(char* out_ptr) {
845 if (numerator_ == -1 || acc_ptr_ == nullptr) {
848 return acc_ptr_ + ((out_ptr - out_ptr_) * numerator_ / denominator_);
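// For example, with acc_t_size == 8 and out_t_size == 4 the fraction reduces
// to numerator_ == 2, denominator_ == 1, so the output element at byte offset
// 400 from out_ptr_ maps to the accumulator at byte offset 800 from acc_ptr_.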
852 std::unique_ptr<Blob> buffer_;
853 char* acc_ptr_ = nullptr;
854 char* out_ptr_ = nullptr;
855 float size_factor_ = -1;
856 int64_t numerator_ = -1;
857 int64_t denominator_ = -1;
860 class CUDAReductionEngine {
862 CUDAReductionEngine(const CUDAReductionEngine&) = delete;
863 CUDAReductionEngine& operator=(const CUDAReductionEngine&) = delete;
864 CUDAReductionEngine(const Indexer& indexer) : indexer_(indexer) {}
866 template <typename func_t, typename scalar_t>
867 void Run(const func_t& reduce_func, scalar_t identity) {
868 if (indexer_.NumWorkloads() == 0) {
870 "0-sized input should be handled outside of the reudction "
873 if (indexer_.NumInputs() != 1) {
874 utility::LogError("Reduction op must have exactly one input.");
877 CLOUDVIEWER_ASSERT_HOST_DEVICE_LAMBDA(func_t);
878 using arg0_t = typename BinaryFunctionTraits<func_t>::arg0_t;
879 using arg1_t = typename BinaryFunctionTraits<func_t>::arg1_t;
880 if (!std::is_same<scalar_t, arg0_t>::value ||
881 !std::is_same<scalar_t, arg1_t>::value) {
883 "Function input type must match with the identity's type.");
886 using res_t = typename BinaryFunctionTraits<func_t>::res_t;
887 if (std::is_same<res_t, bool>::value) {
888 // func_t is a comparison function (for arg-reduction).
889 // Signature: (scalar_t, scalar_t) -> bool.
890 RunReduce<scalar_t, int64_t>(
891 indexer_, WrapArgReduceOps(reduce_func),
892 thrust::pair<scalar_t, int64_t>(identity, 0));
894 // func_t is a regular reduction function.
895 // Signature: (scalar_t, scalar_t) -> scalar_t.
896 RunReduce<scalar_t, scalar_t>(
897 indexer_, WrapRegularReduceOps<scalar_t>(reduce_func),
903 /// If the index cannot be represented in 32 bits, RunReduce calls itself
905 template <typename scalar_t,
906 typename out_scalar_t,
910 static void RunReduce(Indexer& indexer,
913 AccumulationBuffer* acc_buf_ptr = nullptr) {
914 using traits = FunctionTraits<decltype(&ops_t::Reduce)>;
915 using arg_t = typename traits::template arg<0>::type;
916 static constexpr bool can_accumulate_in_output =
917 std::is_convertible<arg_t, out_scalar_t>::value;
919 bool can_use_32bit_indexing = indexer.CanUse32BitIndexing();
920 std::unique_ptr<AccumulationBuffer> owned_buf_ptr;
922         // The acc_buf_ptr is shared by all recursive calls: it is created at
923         // the first entrance and reused thereafter.
924 if (acc_buf_ptr == nullptr) {
925             // acc_buf_ptr holds the buffer used for accumulation among multiple
926             // sub-indexers when accumulation in the output is not possible.
927 if (!can_accumulate_in_output && !can_use_32bit_indexing) {
928 int64_t output_memory_size = 1;
929 for (int dim = 0; dim < indexer.NumDims(); dim++) {
930 output_memory_size = std::max(
932 indexer.GetPrimaryShape()[dim] *
933 indexer.GetOutput().byte_strides_[dim]);
935 owned_buf_ptr.reset(new AccumulationBuffer(
936 sizeof(arg_t), sizeof(out_scalar_t),
937 (char*)indexer.GetOutput().data_ptr_,
938 output_memory_size * sizeof(arg_t)));
940 owned_buf_ptr.reset(new AccumulationBuffer());
942 acc_buf_ptr = owned_buf_ptr.get();
945 if (!can_use_32bit_indexing) {
946 for (auto& sub_indexer : indexer.SplitTo32BitIndexing()) {
947 RunReduce<scalar_t, out_scalar_t, vt0>(sub_indexer, ops,
948 identity, acc_buf_ptr);
953 ReduceConfig config(sizeof(arg_t), indexer);
955 std::unique_ptr<Blob> buffer_blob;
956 std::unique_ptr<Blob> semaphores_blob;
957 void* buffer = nullptr;
958 void* semaphores = nullptr;
959 if (config.ShouldGlobalReduce()) {
960 int device_id = cuda::GetDevice();
961 Device device(Device::DeviceType::CUDA, device_id);
964 std::make_unique<Blob>(config.GlobalMemorySize(), device);
966 std::make_unique<Blob>(config.SemaphoreSize(), device);
967 buffer = buffer_blob->GetDataPtr();
968 semaphores = semaphores_blob->GetDataPtr();
969 CLOUDVIEWER_CUDA_CHECK(
970 cudaMemset(semaphores, 0, config.SemaphoreSize()));
973 CLOUDVIEWER_ASSERT(can_use_32bit_indexing);
974 const char* in_data = (char*)indexer.GetInput(0).data_ptr_;
975 char* out_data = (char*)indexer.GetOutput().data_ptr_;
976 char* acc_data = acc_buf_ptr->GetAccSlice(out_data);
977 auto output_calc = MakeOutputCalculator<uint32_t>(indexer);
978 auto input_calc = MakeInputCalculator<uint32_t>(indexer);
980 auto reduce_op = ReduceOp<scalar_t, ops_t, uint32_t, out_scalar_t, vt0>(
981 ops, config, input_calc, output_calc, in_data, out_data,
982 acc_data, buffer, (int*)semaphores, identity,
983 indexer.ShouldAccumulate(), indexer.IsFinalOutput());
985 // Launch reduce kernel
986 int shared_memory = config.SharedMemorySize();
987 ReduceKernel<ReduceConfig::MAX_NUM_THREADS>
988 <<<config.GridDim(), config.BlockDim(), shared_memory,
989 core::cuda::GetStream()>>>(reduce_op);
991 CLOUDVIEWER_CUDA_CHECK(cudaGetLastError());
998 void ReductionCUDA(const Tensor& src,
1000 const SizeVector& dims,
1002 ReductionOpCode op_code) {
1003 if (s_regular_reduce_ops.find(op_code) != s_regular_reduce_ops.end()) {
1004 Indexer indexer({src}, dst, DtypePolicy::ALL_SAME, dims);
1005 CUDAReductionEngine re(indexer);
1006 Dtype dtype = src.GetDtype();
1008 CUDAScopedDevice scoped_device(src.GetDevice());
1009 DISPATCH_DTYPE_TO_TEMPLATE(dtype, [&]() {
1011 case ReductionOpCode::Sum:
1012 if (indexer.NumWorkloads() == 0) {
1013 // 0-sized input can be reduced to non-0-sized outputs,
1014 // where identity elements should be filled.
1015 // E.g. np.sum(np.ones((0, 5)), axis=0).shape == (5,).
1019 [] CLOUDVIEWER_HOST_DEVICE(
1020 scalar_t a, scalar_t b) -> scalar_t {
1023 static_cast<scalar_t>(0));
1026 case ReductionOpCode::Prod:
1027 if (indexer.NumWorkloads() == 0) {
1031 [] CLOUDVIEWER_HOST_DEVICE(
1032 scalar_t a, scalar_t b) -> scalar_t {
1035 static_cast<scalar_t>(1));
1038 case ReductionOpCode::Min:
1039 if (indexer.NumWorkloads() == 0) {
1041 "Zero-size Tensor does not suport Min.");
1044 [] CLOUDVIEWER_HOST_DEVICE(
1045 scalar_t a, scalar_t b) -> scalar_t {
1046 return a < b ? a : b;
1048 static_cast<scalar_t>(
1049 std::numeric_limits<scalar_t>::max()));
1052 case ReductionOpCode::Max:
1053 if (indexer.NumWorkloads() == 0) {
1055 "Zero-size Tensor does not suport Max.");
1058 [] CLOUDVIEWER_HOST_DEVICE(
1059 scalar_t a, scalar_t b) -> scalar_t {
1060 return a > b ? a : b;
1062 static_cast<scalar_t>(std::numeric_limits<
1063 scalar_t>::lowest()));
1067 utility::LogError("Unsupported op code.");
1071 } else if (s_arg_reduce_ops.find(op_code) != s_arg_reduce_ops.end()) {
1072 if (dst.GetDtype() != core::Int64) {
1073 utility::LogError("Arg-reduction must have int64 output dtype.");
1075 Indexer indexer({src}, dst, DtypePolicy::INPUT_SAME, dims);
1076 CUDAReductionEngine re(indexer);
1077 Dtype dtype = src.GetDtype();
1079 CUDAScopedDevice scoped_device(src.GetDevice());
1080 DISPATCH_DTYPE_TO_TEMPLATE(dtype, [&]() {
1082 case ReductionOpCode::ArgMin:
1083 if (indexer.NumWorkloads() == 0) {
1085 "Zero-size Tensor does not suport ArgMin.");
1087 re.Run([] CLOUDVIEWER_HOST_DEVICE(
1089 scalar_t b) -> bool { return a < b; },
1090 static_cast<scalar_t>(
1091 std::numeric_limits<scalar_t>::max()));
1094 case ReductionOpCode::ArgMax:
1095 if (indexer.NumWorkloads() == 0) {
1097 "Zero-size Tensor does not suport ArgMax.");
1099 re.Run([] CLOUDVIEWER_HOST_DEVICE(
1101 scalar_t b) -> bool { return a > b; },
1102 static_cast<scalar_t>(std::numeric_limits<
1103 scalar_t>::lowest()));
1107 utility::LogError("Unsupported op code.");
1111 } else if (s_boolean_reduce_ops.find(op_code) !=
1112 s_boolean_reduce_ops.end()) {
1113 if (src.GetDtype() != core::Bool) {
1115 "Boolean reduction only supports boolean input tensor.");
1117 if (dst.GetDtype() != core::Bool) {
1119 "Boolean reduction only supports boolean output tensor.");
1121 Indexer indexer({src}, dst, DtypePolicy::ALL_SAME, dims);
1122 CUDAReductionEngine re(indexer);
1124 CUDAScopedDevice scoped_device(src.GetDevice());
1126 case ReductionOpCode::All:
1127 if (indexer.NumWorkloads() == 0) {
1130 re.Run([] CLOUDVIEWER_HOST_DEVICE(uint8_t a, uint8_t b)
1131 -> uint8_t { return a && b; },
1132 static_cast<uint8_t>(true));
1135 case ReductionOpCode::Any:
1136 if (indexer.NumWorkloads() == 0) {
1139 re.Run([] CLOUDVIEWER_HOST_DEVICE(uint8_t a, uint8_t b)
1140 -> uint8_t { return a || b; },
1141 static_cast<uint8_t>(false));
1145 utility::LogError("Unsupported op code.");
1149 utility::LogError("Unsupported op code.");
1153 } // namespace kernel
1155 } // namespace cloudViewer