template <typename scalar_t>
static scalar_t CPUSumReductionKernel(scalar_t a, scalar_t b) {
    return a + b;
}
template <typename scalar_t>
static scalar_t CPUProdReductionKernel(scalar_t a, scalar_t b) {
    return a * b;
}
template <typename scalar_t>
static scalar_t CPUMinReductionKernel(scalar_t a, scalar_t b) {
    return std::min(a, b);
}
template <typename scalar_t>
static scalar_t CPUMaxReductionKernel(scalar_t a, scalar_t b) {
    return std::max(a, b);
}
template <typename scalar_t>
static std::pair<int64_t, scalar_t> CPUArgMinReductionKernel(
        int64_t a_idx, scalar_t a, int64_t b_idx, scalar_t b) {
    return a < b ? std::make_pair(a_idx, a) : std::make_pair(b_idx, b);
}
template <typename scalar_t>
static std::pair<int64_t, scalar_t> CPUArgMaxReductionKernel(
        int64_t a_idx, scalar_t a, int64_t b_idx, scalar_t b) {
    return a > b ? std::make_pair(a_idx, a) : std::make_pair(b_idx, b);
}
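For intuition, the pairwise kernels above are plain fold operations: combining elements with CPUSumReductionKernel starting from 0 yields a sum, and combining (index, value) pairs with CPUArgMaxReductionKernel starting from (0, lowest()) yields an argmax. The following is a minimal self-contained sketch, with a std::vector and float instantiations standing in for the Indexer-driven iteration of the real engine:

#include <cstdint>
#include <iostream>
#include <limits>
#include <tuple>
#include <utility>
#include <vector>

// Stand-ins for the templated kernels above, instantiated for float.
static float SumKernel(float a, float b) { return a + b; }
static std::pair<int64_t, float> ArgMaxKernel(int64_t a_idx, float a,
                                              int64_t b_idx, float b) {
    return a > b ? std::make_pair(a_idx, a) : std::make_pair(b_idx, b);
}

int main() {
    std::vector<float> v = {3.0f, -1.0f, 7.0f, 2.0f};

    // Sum: fold with identity 0.
    float sum = 0.0f;
    for (float x : v) sum = SumKernel(x, sum);

    // ArgMax: fold (index, value) pairs with identity (0, lowest()).
    int64_t best_idx = 0;
    float best_val = std::numeric_limits<float>::lowest();
    for (int64_t i = 0; i < static_cast<int64_t>(v.size()); ++i) {
        std::tie(best_idx, best_val) = ArgMaxKernel(i, v[i], best_idx, best_val);
    }

    std::cout << "sum = " << sum << ", argmax = " << best_idx << "\n";
    return 0;
}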
template <typename func_t, typename scalar_t>
void Run(const func_t& reduce_func, scalar_t identity) {
    // Strategy selection: run serially when only one thread is available or
    // when already inside a parallel region; use the two-pass kernel for
    // single-output reductions; otherwise parallelize over an output dim.
    if (utility::EstimateMaxThreads() == 1 || utility::InParallel()) {
        LaunchReductionKernelSerial<scalar_t>(indexer_, reduce_func);
    } else if (indexer_.NumOutputElements() <= 1) {
        LaunchReductionKernelTwoPass<scalar_t>(indexer_, reduce_func,
                                               identity);
    } else {
        LaunchReductionParallelDim<scalar_t>(indexer_, reduce_func);
    }
}
template <typename scalar_t, typename func_t>
static void LaunchReductionKernelSerial(const Indexer& indexer,
                                        func_t element_kernel) {
    for (int64_t workload_idx = 0; workload_idx < indexer.NumWorkloads();
         ++workload_idx) {
        scalar_t* src = reinterpret_cast<scalar_t*>(
                indexer.GetInputPtr(0, workload_idx));
        scalar_t* dst = reinterpret_cast<scalar_t*>(
                indexer.GetOutputPtr(workload_idx));
        *dst = element_kernel(*src, *dst);
    }
}
// Create num_threads workers to compute partial reductions, then combine
// them into the final result. Only valid for single-output reductions.
template <typename scalar_t, typename func_t>
static void LaunchReductionKernelTwoPass(const Indexer& indexer,
                                         func_t element_kernel,
                                         scalar_t identity) {
    if (indexer.NumOutputElements() > 1) {
        utility::LogError(
                "Internal error: two-pass reduction only works for "
                "single-output reduction ops.");
    }
    int64_t num_workloads = indexer.NumWorkloads();
    int64_t num_threads = utility::EstimateMaxThreads();
    int64_t workload_per_thread =
            (num_workloads + num_threads - 1) / num_threads;
    std::vector<scalar_t> thread_results(num_threads, identity);

#pragma omp parallel for schedule(static) \
        num_threads(utility::EstimateMaxThreads())
    for (int64_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
        int64_t start = thread_idx * workload_per_thread;
        int64_t end = std::min(start + workload_per_thread, num_workloads);
        scalar_t local_result = identity;
        for (int64_t workload_idx = start; workload_idx < end;
             ++workload_idx) {
            scalar_t* src = reinterpret_cast<scalar_t*>(
                    indexer.GetInputPtr(0, workload_idx));
            local_result = element_kernel(*src, local_result);
        }
        thread_results[thread_idx] = local_result;
    }
    scalar_t* dst = reinterpret_cast<scalar_t*>(indexer.GetOutputPtr(0));
    for (int64_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
        *dst = element_kernel(thread_results[thread_idx], *dst);
    }
}
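The two-pass pattern above can be illustrated without the Indexer machinery: each thread folds its own contiguous slice into a thread-local partial result, and a second serial pass folds the partials into the single output. A minimal OpenMP sketch follows, where a plain array replaces indexer.GetInputPtr and omp_get_max_threads stands in for utility::EstimateMaxThreads:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>
#include <omp.h>

int main() {
    const int64_t n = 1000000;
    std::vector<double> data(n, 1.0);

    const int64_t num_threads = omp_get_max_threads();
    const int64_t per_thread = (n + num_threads - 1) / num_threads;
    std::vector<double> partials(num_threads, 0.0);  // identity = 0 for sum

    // Pass 1: each thread reduces its own slice into a private partial.
#pragma omp parallel for schedule(static) num_threads(num_threads)
    for (int64_t t = 0; t < num_threads; ++t) {
        const int64_t start = t * per_thread;
        const int64_t end = std::min(start + per_thread, n);
        double local = 0.0;
        for (int64_t i = start; i < end; ++i) {
            local += data[i];
        }
        partials[t] = local;
    }

    // Pass 2: serially combine the per-thread partials.
    double total = 0.0;
    for (int64_t t = 0; t < num_threads; ++t) {
        total += partials[t];
    }
    std::cout << "total = " << total << "\n";  // expect 1000000
    return 0;
}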
template <typename scalar_t, typename func_t>
static void LaunchReductionParallelDim(const Indexer& indexer,
                                       func_t element_kernel) {
    // Pick the non-reduction dim with the most parallelism: prefer a dim
    // whose size is at least num_threads, otherwise the largest one.
    const int64_t* indexer_shape = indexer.GetPrimaryShape();
    const int64_t num_dims = indexer.NumDims();
    int64_t num_threads = utility::EstimateMaxThreads();

    int64_t best_dim = num_dims - 1;
    while (best_dim >= 0 && indexer.IsReductionDim(best_dim)) {
        best_dim--;
    }
    for (int64_t dim = best_dim; dim >= 0 && !indexer.IsReductionDim(dim);
         --dim) {
        if (indexer_shape[dim] >= num_threads) {
            best_dim = dim;
            break;
        } else if (indexer_shape[dim] > indexer_shape[best_dim]) {
            best_dim = dim;
        }
    }
    if (best_dim == -1) {
        utility::LogError(
                "Internal error: all dims are reduction dims, use "
                "LaunchReductionKernelTwoPass instead.");
    }

#pragma omp parallel for schedule(static) \
        num_threads(utility::EstimateMaxThreads())
    for (int64_t i = 0; i < indexer_shape[best_dim]; ++i) {
        Indexer sub_indexer(indexer);
        sub_indexer.ShrinkDim(best_dim, i, 1);
        LaunchReductionKernelSerial<scalar_t>(sub_indexer, element_kernel);
    }
}
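The same idea, stripped of the Indexer: when the output keeps a non-reduction dimension, every output element can be reduced independently, so the outer loop over that dimension is what gets parallelized. A hedged sketch for a row-wise sum of a row-major rows x cols array, where plain arrays and an omp loop stand in for the Indexer/ShrinkDim machinery:

#include <cstdint>
#include <iostream>
#include <vector>

// Reduce along dim 1 (columns) of a row-major rows x cols matrix:
// out[r] = sum over c of in[r * cols + c]. The loop over rows plays the role
// of "best_dim"; each iteration is an independent serial reduction, mirroring
// the per-slice LaunchReductionKernelSerial calls.
int main() {
    const int64_t rows = 1000, cols = 256;
    std::vector<float> in(rows * cols, 1.0f);
    std::vector<float> out(rows, 0.0f);  // identity = 0 for sum

#pragma omp parallel for schedule(static)
    for (int64_t r = 0; r < rows; ++r) {
        float acc = 0.0f;
        for (int64_t c = 0; c < cols; ++c) {
            acc += in[r * cols + c];
        }
        out[r] = acc;
    }

    std::cout << "out[0] = " << out[0] << "\n";  // expect 256
    return 0;
}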
template <typename func_t, typename scalar_t>
void Run(const func_t& reduce_func, scalar_t identity) {
    // Arg-reductions track the index of the winning input element, so each
    // output element is reduced in its own sub-iteration.
    int64_t num_output_elements = indexer_.NumOutputElements();
    if (num_output_elements <= 1) {
        LaunchArgReductionKernelTwoPass(indexer_, reduce_func, identity);
    } else {
        LaunchArgReductionParallelDim(indexer_, reduce_func, identity);
    }
}
template <typename scalar_t, typename func_t>
static void LaunchArgReductionParallelDim(const Indexer& indexer,
                                          func_t reduce_func,
                                          scalar_t identity) {
    int64_t num_output_elements = indexer.NumOutputElements();
#pragma omp parallel for schedule(static) \
        num_threads(utility::EstimateMaxThreads())
    for (int64_t output_idx = 0; output_idx < num_output_elements;
         output_idx++) {
        // Sub-indexer over the input elements that reduce into this output.
        Indexer sub_indexer = indexer.GetPerOutputIndexer(output_idx);
        scalar_t dst_val = identity;
        for (int64_t workload_idx = 0;
             workload_idx < sub_indexer.NumWorkloads(); workload_idx++) {
            int64_t src_idx = workload_idx;
            scalar_t* src_val = reinterpret_cast<scalar_t*>(
                    sub_indexer.GetInputPtr(0, workload_idx));
            int64_t* dst_idx = reinterpret_cast<int64_t*>(
                    sub_indexer.GetOutputPtr(0, workload_idx));
            std::tie(*dst_idx, dst_val) =
                    reduce_func(src_idx, *src_val, *dst_idx, dst_val);
        }
    }
}
// Create num_threads workers to compute partial arg-reductions, then combine
// them into the final result. Only valid for single-output arg-reductions.
template <typename scalar_t, typename func_t>
static void LaunchArgReductionKernelTwoPass(const Indexer& indexer,
                                            func_t reduce_func,
                                            scalar_t identity) {
    if (indexer.NumOutputElements() > 1) {
        utility::LogError(
                "Internal error: two-pass arg reduction only works for "
                "single-output arg reduction ops.");
    }
    int64_t num_workloads = indexer.NumWorkloads();
    int64_t num_threads = utility::EstimateMaxThreads();
    int64_t workload_per_thread =
            (num_workloads + num_threads - 1) / num_threads;
    std::vector<int64_t> thread_results_idx(num_threads, 0);
    std::vector<scalar_t> thread_results_val(num_threads, identity);

#pragma omp parallel for schedule(static) \
        num_threads(utility::EstimateMaxThreads())
    for (int64_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
        int64_t start = thread_idx * workload_per_thread;
        int64_t end = std::min(start + workload_per_thread, num_workloads);
        scalar_t local_result_val = identity;
        int64_t local_result_idx = 0;
        for (int64_t workload_idx = start; workload_idx < end;
             ++workload_idx) {
            int64_t src_idx = workload_idx;
            scalar_t* src_val = reinterpret_cast<scalar_t*>(
                    indexer.GetInputPtr(0, workload_idx));
            std::tie(local_result_idx, local_result_val) = reduce_func(
                    src_idx, *src_val, local_result_idx, local_result_val);
        }
        thread_results_val[thread_idx] = local_result_val;
        thread_results_idx[thread_idx] = local_result_idx;
    }
    scalar_t dst_val = identity;
    int64_t* dst_idx = reinterpret_cast<int64_t*>(indexer.GetOutputPtr(0));
    for (int64_t thread_idx = 0; thread_idx < num_threads; ++thread_idx) {
        std::tie(*dst_idx, dst_val) = reduce_func(
                thread_results_idx[thread_idx],
                thread_results_val[thread_idx], *dst_idx, dst_val);
    }
}
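LaunchArgReductionKernelTwoPass applies the same two-pass scheme while carrying an (index, value) pair instead of a single accumulator: each thread records the best value it saw and where, and the serial second pass reduces the per-thread winners. A self-contained argmin sketch under the same assumptions as before (plain array input, omp_get_max_threads in place of utility::EstimateMaxThreads):

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <limits>
#include <vector>
#include <omp.h>

int main() {
    std::vector<double> data = {4.0, 2.5, -1.0, 3.0, -1.5, 8.0};
    const int64_t n = static_cast<int64_t>(data.size());

    const int64_t num_threads = omp_get_max_threads();
    const int64_t per_thread = (n + num_threads - 1) / num_threads;
    std::vector<int64_t> best_idx(num_threads, 0);
    std::vector<double> best_val(num_threads,
                                 std::numeric_limits<double>::max());

    // Pass 1: per-thread (index, value) partial argmin.
#pragma omp parallel for schedule(static) num_threads(num_threads)
    for (int64_t t = 0; t < num_threads; ++t) {
        const int64_t start = t * per_thread;
        const int64_t end = std::min(start + per_thread, n);
        for (int64_t i = start; i < end; ++i) {
            if (data[i] < best_val[t]) {
                best_val[t] = data[i];
                best_idx[t] = i;
            }
        }
    }

    // Pass 2: reduce the per-thread winners serially.
    int64_t idx = 0;
    double val = std::numeric_limits<double>::max();
    for (int64_t t = 0; t < num_threads; ++t) {
        if (best_val[t] < val) {
            val = best_val[t];
            idx = best_idx[t];
        }
    }
    std::cout << "argmin = " << idx << " (value " << val << ")\n";  // 4, -1.5
    return 0;
}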
// ReductionCPU() dispatch: regular reduction ops (Sum, Prod, Min, Max).
// Runs inside DISPATCH_DTYPE_TO_TEMPLATE(src.GetDtype(), ...) with a
// per-op scalar_t identity.
switch (op_code) {
    case ReductionOpCode::Sum:
        identity = 0;
        dst.Fill(identity);
        re.Run(CPUSumReductionKernel<scalar_t>, identity);
        break;
    case ReductionOpCode::Prod:
        identity = 1;
        dst.Fill(identity);
        re.Run(CPUProdReductionKernel<scalar_t>, identity);
        break;
    case ReductionOpCode::Min:
        if (indexer.NumWorkloads() == 0) {
            utility::LogError("Zero-size Tensor does not support Min.");
        } else {
            identity = std::numeric_limits<scalar_t>::max();
            dst.Fill(identity);
            re.Run(CPUMinReductionKernel<scalar_t>, identity);
        }
        break;
    case ReductionOpCode::Max:
        if (indexer.NumWorkloads() == 0) {
            utility::LogError("Zero-size Tensor does not support Max.");
        } else {
            identity = std::numeric_limits<scalar_t>::lowest();
            dst.Fill(identity);
            re.Run(CPUMaxReductionKernel<scalar_t>, identity);
        }
        break;
    default:
        utility::LogError("Unsupported op code.");
}

// ReductionCPU() dispatch: arg-reduction ops (ArgMin, ArgMax). dst_acc is an
// accumulation tensor for the current best values; dst receives the indices.
Tensor dst_acc(dst.GetShape(), src.GetDtype(), src.GetDevice());
CPUArgReductionEngine re(indexer);
switch (op_code) {
    case ReductionOpCode::ArgMin:
        if (indexer.NumWorkloads() == 0) {
            utility::LogError("Zero-size Tensor does not support ArgMin.");
        } else {
            identity = std::numeric_limits<scalar_t>::max();
            dst_acc.Fill(identity);
            re.Run(CPUArgMinReductionKernel<scalar_t>, identity);
        }
        break;
    case ReductionOpCode::ArgMax:
        if (indexer.NumWorkloads() == 0) {
            utility::LogError("Zero-size Tensor does not support ArgMax.");
        } else {
            identity = std::numeric_limits<scalar_t>::lowest();
            dst_acc.Fill(identity);
            re.Run(CPUArgMaxReductionKernel<scalar_t>, identity);
        }
        break;
    default:
        utility::LogError("Unsupported op code.");
}

// ReductionCPU() dispatch: boolean reduction ops (All, Any). Both the input
// and the output tensor must be boolean, otherwise LogError reports
// "Boolean reduction only supports boolean input tensor." or
// "Boolean reduction only supports boolean output tensor.".
CPUReductionEngine re(indexer);
switch (op_code) {
    case ReductionOpCode::All:
        dst.Fill(true);
        re.Run(CPUAllReductionKernel, static_cast<uint8_t>(true));
        break;
    case ReductionOpCode::Any:
        dst.Fill(false);
        re.Run(CPUAnyReductionKernel, static_cast<uint8_t>(false));
        break;
}
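The dispatch above pairs each op with the identity of its combining kernel: 0 for Sum, 1 for Prod, numeric_limits max()/lowest() for Min/Max (hence the explicit zero-size guard, since an empty reduction would otherwise just return the identity), and the current-best buffer dst_acc for the arg ops. A small standalone sketch of why the identity matters, assuming nothing beyond the standard library:

#include <iostream>
#include <limits>
#include <vector>

// Folding an empty range returns the identity unchanged, which is the right
// answer for Sum (0) and Prod (1) but meaningless for Min/Max; that is why
// the dispatch raises "Zero-size Tensor does not support Min/Max." instead.
template <typename T, typename Kernel>
T Fold(const std::vector<T>& v, T identity, Kernel kernel) {
    T acc = identity;
    for (const T& x : v) acc = kernel(x, acc);
    return acc;
}

int main() {
    std::vector<float> empty;
    std::vector<float> vals = {2.0f, 5.0f, 3.0f};

    auto sum = [](float a, float b) { return a + b; };
    auto max = [](float a, float b) { return a > b ? a : b; };

    std::cout << Fold(empty, 0.0f, sum) << "\n";  // 0: a sensible empty sum
    std::cout << Fold(vals, std::numeric_limits<float>::lowest(), max)
              << "\n";  // 5
    // Fold(empty, lowest(), max) would return lowest(), not a real maximum.
    return 0;
}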
#define DISPATCH_DTYPE_TO_TEMPLATE(DTYPE,...)
CLOUDVIEWER_HOST_DEVICE char * GetInputPtr(int64_t input_idx, int64_t workload_idx) const
CLOUDVIEWER_HOST_DEVICE char * GetOutputPtr(int64_t workload_idx) const
int64_t NumWorkloads() const
int64_t NumOutputElements() const
Returns the number of output elements.
CPUArgReductionEngine(const CPUArgReductionEngine &)=delete
CPUArgReductionEngine(const Indexer &indexer)
void Run(const func_t &reduce_func, scalar_t identity)
static void LaunchArgReductionKernelTwoPass(const Indexer &indexer, func_t reduce_func, scalar_t identity)
static void LaunchArgReductionParallelDim(const Indexer &indexer, func_t reduce_func, scalar_t identity)
CPUArgReductionEngine & operator=(const CPUArgReductionEngine &)=delete
CPUReductionEngine(const Indexer &indexer)
CPUReductionEngine & operator=(const CPUReductionEngine &)=delete
CPUReductionEngine(const CPUReductionEngine &)=delete
void Run(const func_t &reduce_func, scalar_t identity)
static scalar_t CPUSumReductionKernel(scalar_t a, scalar_t b)
static const std::unordered_set< ReductionOpCode, utility::hash_enum_class > s_arg_reduce_ops
static scalar_t CPUMinReductionKernel(scalar_t a, scalar_t b)
static std::pair< int64_t, scalar_t > CPUArgMaxReductionKernel(int64_t a_idx, scalar_t a, int64_t b_idx, scalar_t b)
static uint8_t CPUAllReductionKernel(uint8_t a, uint8_t b)
static scalar_t CPUProdReductionKernel(scalar_t a, scalar_t b)
void ReductionCPU(const Tensor &src, Tensor &dst, const SizeVector &dims, bool keepdim, ReductionOpCode op_code)
static scalar_t CPUMaxReductionKernel(scalar_t a, scalar_t b)
static const std::unordered_set< ReductionOpCode, utility::hash_enum_class > s_regular_reduce_ops
static uint8_t CPUAnyReductionKernel(uint8_t a, uint8_t b)
static std::pair< int64_t, scalar_t > CPUArgMinReductionKernel(int64_t a_idx, scalar_t a, int64_t b_idx, scalar_t b)
static const std::unordered_set< ReductionOpCode, utility::hash_enum_class > s_boolean_reduce_ops
bool InParallel()
Returns true if in a parallel section.
int EstimateMaxThreads()
Estimate the maximum number of threads to be used in a parallel region.
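The briefs above for EstimateMaxThreads and InParallel correspond to the pattern used throughout this file: size the partial-result buffers with the estimated thread count and avoid nested parallelism by falling back to the serial kernel when already inside a parallel region. A hedged approximation of that pattern with raw OpenMP, where omp_get_max_threads and omp_in_parallel are stand-ins and the real utility functions may differ:

#include <cstdint>
#include <iostream>
#include <numeric>
#include <vector>
#include <omp.h>

// Approximations of the utility helpers referenced above.
static int EstimateMaxThreadsApprox() { return omp_get_max_threads(); }
static bool InParallelApprox() { return omp_in_parallel() != 0; }

double ReduceSum(const std::vector<double>& v) {
    // Serial fallback: one thread available, or already in a parallel region.
    if (EstimateMaxThreadsApprox() == 1 || InParallelApprox()) {
        return std::accumulate(v.begin(), v.end(), 0.0);
    }
    double total = 0.0;
#pragma omp parallel for schedule(static) reduction(+ : total)
    for (int64_t i = 0; i < static_cast<int64_t>(v.size()); ++i) {
        total += v[i];
    }
    return total;
}

int main() {
    std::vector<double> v(1000, 0.5);
    std::cout << ReduceSum(v) << "\n";  // expect 500
    return 0;
}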