Skip to content

Commit

Permalink
Merge
Browse files Browse the repository at this point in the history
  • Loading branch information
galipremsagar committed Sep 18, 2023
2 parents 3ce402d + 3b691f4 commit 24ebad8
Show file tree
Hide file tree
Showing 20 changed files with 664 additions and 292 deletions.
2 changes: 1 addition & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ add_library(
src/stream_compaction/apply_boolean_mask.cu
src/stream_compaction/distinct.cu
src/stream_compaction/distinct_count.cu
src/stream_compaction/distinct_reduce.cu
src/stream_compaction/distinct_helpers.cu
src/stream_compaction/drop_nans.cu
src/stream_compaction/drop_nulls.cu
src/stream_compaction/stable_distinct.cu
Expand Down
167 changes: 167 additions & 0 deletions cpp/include/cudf/detail/hash_reduce_by_row.cuh
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/types.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/for_each.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/uninitialized_fill.h>

#include <cuco/static_map.cuh>

namespace cudf::detail {

using hash_map_type =
cuco::static_map<size_type, size_type, cuda::thread_scope_device, hash_table_allocator_type>;

/**
* @brief The base struct for customized reduction functor to perform reduce-by-key with keys are
* rows that compared equal.
*
* TODO: We need to switch to use `static_reduction_map` when it is ready
* (https://github.com/NVIDIA/cuCollections/pull/98).
*/
template <typename MapView, typename KeyHasher, typename KeyEqual, typename OutputType>
struct reduce_by_row_fn_base {
protected:
MapView const d_map;
KeyHasher const d_hasher;
KeyEqual const d_equal;
OutputType* const d_output;

reduce_by_row_fn_base(MapView const& d_map,
KeyHasher const& d_hasher,
KeyEqual const& d_equal,
OutputType* const d_output)
: d_map{d_map}, d_hasher{d_hasher}, d_equal{d_equal}, d_output{d_output}
{
}

/**
* @brief Return a pointer to the output array at the given index.
*
* @param idx The access index
* @return A pointer to the given index in the output array
*/
__device__ OutputType* get_output_ptr(size_type const idx) const
{
auto const iter = d_map.find(idx, d_hasher, d_equal);

if (iter != d_map.end()) {
// Only one (undetermined) index value of the duplicate rows could be inserted into the map.
// As such, looking up for all indices of duplicate rows always returns the same value.
auto const inserted_idx = iter->second.load(cuda::std::memory_order_relaxed);

// All duplicate rows will have concurrent access to this same output slot.
return &d_output[inserted_idx];
} else {
// All input `idx` values have been inserted into the map before.
// Thus, searching for an `idx` key resulting in the `end()` iterator only happens if
// `d_equal(idx, idx) == false`.
// Such situations are due to comparing nulls or NaNs which are considered as always unequal.
// In those cases, all rows containing nulls or NaNs are distinct. Just return their direct
// output slot.
return &d_output[idx];
}
}
};

/**
* @brief Perform a reduction on groups of rows that are compared equal.
*
* This is essentially a reduce-by-key operation with keys are non-contiguous rows and are compared
* equal. A hash table is used to find groups of equal rows.
*
* At the beginning of the operation, the entire output array is filled with a value given by
* the `init` parameter. Then, the reduction result for each row group is written into the output
* array at the index of an unspecified row in the group.
*
* @tparam ReduceFuncBuilder The builder class that must have a `build()` method returning a
* reduction functor derived from `reduce_by_row_fn_base`
* @tparam OutputType Type of the reduction results
* @param map The auxiliary map to perform reduction
* @param preprocessed_input The preprocessed of the input rows for computing row hashing and row
* comparisons
* @param num_rows The number of all input rows
* @param has_nulls Indicate whether the input rows has any nulls at any nested levels
* @param has_nested_columns Indicates whether the input table has any nested columns
* @param nulls_equal Flag to specify whether null elements should be considered as equal
* @param nans_equal Flag to specify whether NaN values in floating point column should be
* considered equal.
* @param init The initial value for reduction of each row group
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned vector
* @return A device_uvector containing the reduction results
*/
template <typename ReduceFuncBuilder, typename OutputType>
rmm::device_uvector<OutputType> hash_reduce_by_row(
hash_map_type const& map,
std::shared_ptr<cudf::experimental::row::equality::preprocessed_table> const preprocessed_input,
size_type num_rows,
cudf::nullate::DYNAMIC has_nulls,
bool has_nested_columns,
null_equality nulls_equal,
nan_equality nans_equal,
ReduceFuncBuilder func_builder,
OutputType init,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto const map_dview = map.get_device_view();
auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input);
auto const key_hasher = row_hasher.device_hasher(has_nulls);
auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input);

auto reduction_results = rmm::device_uvector<OutputType>(num_rows, stream, mr);
thrust::uninitialized_fill(
rmm::exec_policy(stream), reduction_results.begin(), reduction_results.end(), init);

auto const reduce_by_row = [&](auto const value_comp) {
if (has_nested_columns) {
auto const key_equal = row_comp.equal_to<true>(has_nulls, nulls_equal, value_comp);
thrust::for_each(
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(num_rows),
func_builder.build(map_dview, key_hasher, key_equal, reduction_results.begin()));
} else {
auto const key_equal = row_comp.equal_to<false>(has_nulls, nulls_equal, value_comp);
thrust::for_each(
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(num_rows),
func_builder.build(map_dview, key_hasher, key_equal, reduction_results.begin()));
}
};

if (nans_equal == nan_equality::ALL_EQUAL) {
using nan_equal_comparator =
cudf::experimental::row::equality::nan_equal_physical_equality_comparator;
reduce_by_row(nan_equal_comparator{});
} else {
using nan_unequal_comparator = cudf::experimental::row::equality::physical_equality_comparator;
reduce_by_row(nan_unequal_comparator{});
}

return reduction_results;
}

} // namespace cudf::detail
28 changes: 14 additions & 14 deletions cpp/src/stream_compaction/distinct.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

#include "distinct_reduce.cuh"
#include "distinct_helpers.hpp"

#include <cudf/column/column_view.hpp>
#include <cudf/detail/gather.hpp>
Expand Down Expand Up @@ -50,8 +50,8 @@ rmm::device_uvector<size_type> get_distinct_indices(table_view const& input,
}

auto map = hash_map_type{compute_hash_table_size(input.num_rows()),
cuco::empty_key{COMPACTION_EMPTY_KEY_SENTINEL},
cuco::empty_value{COMPACTION_EMPTY_VALUE_SENTINEL},
cuco::empty_key{-1},
cuco::empty_value{std::numeric_limits<size_type>::min()},
detail::hash_table_allocator_type{default_allocator<char>{}, stream},
stream.value()};

Expand All @@ -61,7 +61,7 @@ rmm::device_uvector<size_type> get_distinct_indices(table_view const& input,
auto const has_nested_columns = cudf::detail::has_nested_columns(input);

auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input);
auto const key_hasher = experimental::compaction_hash(row_hasher.device_hasher(has_nulls));
auto const key_hasher = row_hasher.device_hasher(has_nulls);

auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input);

Expand Down Expand Up @@ -96,16 +96,16 @@ rmm::device_uvector<size_type> get_distinct_indices(table_view const& input,
}

// For other keep options, reduce by row on rows that compare equal.
auto const reduction_results = hash_reduce_by_row(map,
std::move(preprocessed_input),
input.num_rows(),
has_nulls,
has_nested_columns,
keep,
nulls_equal,
nans_equal,
stream,
rmm::mr::get_current_device_resource());
auto const reduction_results = reduce_by_row(map,
std::move(preprocessed_input),
input.num_rows(),
has_nulls,
has_nested_columns,
keep,
nulls_equal,
nans_equal,
stream,
rmm::mr::get_current_device_resource());

// Extract the desired output indices from reduction results.
auto const map_end = [&] {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/stream_compaction/distinct_count.cu
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,14 @@ cudf::size_type distinct_count(table_view const& keys,
auto const preprocessed_input =
cudf::experimental::row::hash::preprocessed_table::create(keys, stream);
auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input);
auto const hash_key = experimental::compaction_hash(row_hasher.device_hasher(has_nulls));
auto const hash_key = row_hasher.device_hasher(has_nulls);
auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input);

auto const comparator_helper = [&](auto const row_equal) {
using hasher_type = decltype(hash_key);
auto key_set = cuco::experimental::static_set{
cuco::experimental::extent{compute_hash_table_size(num_rows)},
cuco::empty_key<cudf::size_type>{COMPACTION_EMPTY_KEY_SENTINEL},
cuco::empty_key<cudf::size_type>{-1},
row_equal,
cuco::experimental::linear_probing<1, hasher_type>{hash_key},
detail::hash_table_allocator_type{default_allocator<char>{}, stream},
Expand Down
109 changes: 109 additions & 0 deletions cpp/src/stream_compaction/distinct_helpers.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "distinct_helpers.hpp"

#include <cudf/detail/hash_reduce_by_row.cuh>

namespace cudf::detail {

namespace {
/**
* @brief The functor to find the first/last/all duplicate row for rows that compared equal.
*/
template <typename MapView, typename KeyHasher, typename KeyEqual>
struct reduce_fn : reduce_by_row_fn_base<MapView, KeyHasher, KeyEqual, size_type> {
duplicate_keep_option const keep;

reduce_fn(MapView const& d_map,
KeyHasher const& d_hasher,
KeyEqual const& d_equal,
duplicate_keep_option const keep,
size_type* const d_output)
: reduce_by_row_fn_base<MapView, KeyHasher, KeyEqual, size_type>{d_map,
d_hasher,
d_equal,
d_output},
keep{keep}
{
}

__device__ void operator()(size_type const idx) const
{
auto const out_ptr = this->get_output_ptr(idx);

if (keep == duplicate_keep_option::KEEP_FIRST) {
// Store the smallest index of all rows that are equal.
atomicMin(out_ptr, idx);
} else if (keep == duplicate_keep_option::KEEP_LAST) {
// Store the greatest index of all rows that are equal.
atomicMax(out_ptr, idx);
} else {
// Count the number of rows in each group of rows that are compared equal.
atomicAdd(out_ptr, size_type{1});
}
}
};

/**
* @brief The builder to construct an instance of `reduce_fn` functor base on the given
* value of the `duplicate_keep_option` member variable.
*/
struct reduce_func_builder {
duplicate_keep_option const keep;

template <typename MapView, typename KeyHasher, typename KeyEqual>
auto build(MapView const& d_map,
KeyHasher const& d_hasher,
KeyEqual const& d_equal,
size_type* const d_output)
{
return reduce_fn<MapView, KeyHasher, KeyEqual>{d_map, d_hasher, d_equal, keep, d_output};
}
};

} // namespace

// This function is split from `distinct.cu` to improve compile time.
rmm::device_uvector<size_type> reduce_by_row(
hash_map_type const& map,
std::shared_ptr<cudf::experimental::row::equality::preprocessed_table> const preprocessed_input,
size_type num_rows,
cudf::nullate::DYNAMIC has_nulls,
bool has_nested_columns,
duplicate_keep_option keep,
null_equality nulls_equal,
nan_equality nans_equal,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(keep != duplicate_keep_option::KEEP_ANY,
"This function should not be called with KEEP_ANY");

return hash_reduce_by_row(map,
preprocessed_input,
num_rows,
has_nulls,
has_nested_columns,
nulls_equal,
nans_equal,
reduce_func_builder{keep},
reduction_init_value(keep),
stream,
mr);
}

} // namespace cudf::detail
Loading

0 comments on commit 24ebad8

Please sign in to comment.