Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the aligned_resource_adaptor to allocate bloom filter device buffers #17758

Open
wants to merge 14 commits into
base: branch-25.02
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 65 additions & 28 deletions cpp/src/io/parquet/bloom_filter_reader.cu
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/mr/device/aligned_resource_adaptor.hpp>

#include <cuco/bloom_filter_policies.cuh>
#include <cuco/bloom_filter_ref.cuh>
Expand Down Expand Up @@ -66,9 +67,12 @@ struct bloom_filter_caster {
ast::literal const* const literal,
rmm::cuda_stream_view stream) const
{
using key_type = T;
using policy_type = cuco::arrow_filter_policy<key_type, cudf::hashing::detail::XXHash_64>;
using word_type = typename policy_type::word_type;
using key_type = T;
using policy_type = cuco::arrow_filter_policy<key_type, cudf::hashing::detail::XXHash_64>;
using bloom_filter_type = cuco::
bloom_filter_ref<key_type, cuco::extent<std::size_t>, cuco::thread_scope_thread, policy_type>;
using filter_block_type = typename bloom_filter_type::filter_block_type;
using word_type = typename policy_type::word_type;

// Check if the literal has the same type as the predicate column
CUDF_EXPECTS(
Expand Down Expand Up @@ -104,16 +108,13 @@ struct bloom_filter_caster {
auto const num_filter_blocks = filter_size / bytes_per_block;

// Create a bloom filter view.
cuco::bloom_filter_ref<key_type,
cuco::extent<std::size_t>,
cuco::thread_scope_thread,
policy_type>
filter{reinterpret_cast<word_type*>(filter_span[filter_idx].data()),
num_filter_blocks,
{}, // Thread scope as the same literal is being searched across different bitsets
// per thread
{}}; // Arrow policy with cudf::hashing::detail::XXHash_64 seeded with 0 for Arrow
// compatibility
bloom_filter_type filter{
reinterpret_cast<filter_block_type*>(filter_span[filter_idx].data()),
num_filter_blocks,
{}, // Thread scope as the same literal is being searched across different bitsets per
// thread
{}}; // Arrow policy with cudf::hashing::detail::XXHash_64 seeded with 0 for Arrow
// compatibility

// If int96_timestamp type, convert literal to string_view and query bloom
// filter
Expand Down Expand Up @@ -381,15 +382,27 @@ class bloom_filter_expression_converter : public equality_literals_collector {
* @param bloom_filter_sizes Bloom filter sizes for all chunks
* @param chunk_source_map Association between each column chunk and its source
* @param stream CUDA stream used for device memory operations and kernel launches
* @param aligned_mr Aligned device memory resource to allocate bloom filter buffers
*/
void read_bloom_filter_data(host_span<std::unique_ptr<datasource> const> sources,
size_t num_chunks,
cudf::host_span<rmm::device_buffer> bloom_filter_data,
cudf::host_span<std::optional<int64_t>> bloom_filter_offsets,
cudf::host_span<std::optional<int32_t>> bloom_filter_sizes,
std::vector<size_type> const& chunk_source_map,
rmm::cuda_stream_view stream)
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref aligned_mr)
{
// Using `cuco::arrow_filter_policy` with a temporary `cuda::std::byte` key type to extract bloom
// filter properties
using policy_type = cuco::arrow_filter_policy<cuda::std::byte, cudf::hashing::detail::XXHash_64>;
auto constexpr filter_block_alignment =
alignof(cuco::bloom_filter_ref<cuda::std::byte,
cuco::extent<std::size_t>,
cuco::thread_scope_thread,
policy_type>::filter_block_type);
auto constexpr words_per_block = policy_type::words_per_block;

// Read tasks for bloom filter data
std::vector<std::future<size_t>> read_tasks;

Expand Down Expand Up @@ -422,12 +435,6 @@ void read_bloom_filter_data(host_span<std::unique_ptr<datasource> const> sources
CompactProtocolReader cp{buffer->data(), buffer->size()};
cp.read(&header);

// Get the hardcoded words_per_block value from `cuco::arrow_filter_policy` using a temporary
// `std::byte` key type.
auto constexpr words_per_block =
cuco::arrow_filter_policy<cuda::std::byte,
cudf::hashing::detail::XXHash_64>::words_per_block;

// Check if the bloom filter header is valid.
auto const is_header_valid =
(header.num_bytes % words_per_block) == 0 and
Expand All @@ -448,15 +455,25 @@ void read_bloom_filter_data(host_span<std::unique_ptr<datasource> const> sources

// Check if we already read in the filter bitset in the initial read.
if (initial_read_size >= bloom_filter_header_size + bitset_size) {
bloom_filter_data[chunk] =
rmm::device_buffer{buffer->data() + bloom_filter_header_size, bitset_size, stream};
bloom_filter_data[chunk] = rmm::device_buffer{
buffer->data() + bloom_filter_header_size, bitset_size, stream, aligned_mr};
// The allocated bloom filter buffer must be aligned
CUDF_EXPECTS(reinterpret_cast<std::uintptr_t>(bloom_filter_data[chunk].data()) %
filter_block_alignment ==
0,
"Encountered misaligned bloom filter block");
}
// Read the bitset from datasource.
else {
auto const bitset_offset = bloom_filter_offset + bloom_filter_header_size;
// Directly read to device if preferred
if (source->is_device_read_preferred(bitset_size)) {
bloom_filter_data[chunk] = rmm::device_buffer{bitset_size, stream};
bloom_filter_data[chunk] = rmm::device_buffer{bitset_size, stream, aligned_mr};
// The allocated bloom filter buffer must be aligned
CUDF_EXPECTS(reinterpret_cast<std::uintptr_t>(bloom_filter_data[chunk].data()) %
filter_block_alignment ==
0,
"Encountered misaligned bloom filter block");
auto future_read_size =
source->device_read_async(bitset_offset,
bitset_size,
Expand All @@ -465,8 +482,14 @@ void read_bloom_filter_data(host_span<std::unique_ptr<datasource> const> sources

read_tasks.emplace_back(std::move(future_read_size));
} else {
buffer = source->host_read(bitset_offset, bitset_size);
bloom_filter_data[chunk] = rmm::device_buffer{buffer->data(), buffer->size(), stream};
buffer = source->host_read(bitset_offset, bitset_size);
bloom_filter_data[chunk] =
rmm::device_buffer{buffer->data(), buffer->size(), stream, aligned_mr};
// The allocated bloom filter buffer must be aligned
CUDF_EXPECTS(reinterpret_cast<std::uintptr_t>(bloom_filter_data[chunk].data()) %
filter_block_alignment ==
0,
"Encountered misaligned bloom filter block");
}
}
});
Expand All @@ -484,7 +507,8 @@ std::vector<rmm::device_buffer> aggregate_reader_metadata::read_bloom_filters(
host_span<std::vector<size_type> const> row_group_indices,
host_span<int const> column_schemas,
size_type total_row_groups,
rmm::cuda_stream_view stream) const
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref aligned_mr) const
{
// Descriptors for all the chunks that make up the selected columns
auto const num_input_columns = column_schemas.size();
Expand Down Expand Up @@ -543,7 +567,8 @@ std::vector<rmm::device_buffer> aggregate_reader_metadata::read_bloom_filters(
bloom_filter_offsets,
bloom_filter_sizes,
chunk_source_map,
stream);
stream,
aligned_mr);

// Return bloom filter data
return bloom_filter_data;
Expand Down Expand Up @@ -612,10 +637,22 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
// Return early if no column with equality predicate(s)
if (equality_col_schemas.empty()) { return std::nullopt; }

// Required alignment:
// https://github.com/NVIDIA/cuCollections/blob/deab5799f3e4226cb8a49acf2199c03b14941ee4/include/cuco/detail/bloom_filter/bloom_filter_impl.cuh#L55-L67
using policy_type = cuco::arrow_filter_policy<cuda::std::byte, cudf::hashing::detail::XXHash_64>;
auto constexpr alignment = alignof(cuco::bloom_filter_ref<cuda::std::byte,
cuco::extent<std::size_t>,
cuco::thread_scope_thread,
policy_type>::filter_block_type);

// Aligned resource adaptor to allocate bloom filter buffers with
auto aligned_mr =
rmm::mr::aligned_resource_adaptor(cudf::get_current_device_resource(), alignment);
PointKernel marked this conversation as resolved.
Show resolved Hide resolved

// Read a vector of bloom filter bitset device buffers for all columns with equality
// predicate(s) across all row groups
auto bloom_filter_data = read_bloom_filters(
sources, input_row_group_indices, equality_col_schemas, total_row_groups, stream);
sources, input_row_group_indices, equality_col_schemas, total_row_groups, stream, aligned_mr);

// No bloom filter buffers, return the original row group indices
if (bloom_filter_data.empty()) { return std::nullopt; }
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/io/parquet/reader_impl_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ class aggregate_reader_metadata {
* @param[out] bloom_filter_data List of bloom filter data device buffers
* @param column_schemas Schema indices of columns whose bloom filters will be read
* @param stream CUDA stream used for device memory operations and kernel launches
* @param aligned_mr Aligned device memory resource to allocate bloom filter buffers
*
* @return A flattened list of bloom filter bitset device buffers for each predicate column across
* row group
Expand All @@ -213,7 +214,8 @@ class aggregate_reader_metadata {
host_span<std::vector<size_type> const> row_group_indices,
host_span<int const> column_schemas,
size_type num_row_groups,
rmm::cuda_stream_view stream) const;
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref aligned_mr) const;

/**
* @brief Collects Parquet types for the columns with the specified schema indices
Expand Down
Loading