diff --git a/cpp/src/io/parquet/bloom_filter_reader.cu b/cpp/src/io/parquet/bloom_filter_reader.cu index 69e4723eded..af524e1f70a 100644 --- a/cpp/src/io/parquet/bloom_filter_reader.cu +++ b/cpp/src/io/parquet/bloom_filter_reader.cu @@ -32,6 +32,7 @@ #include #include #include +#include #include #include @@ -66,9 +67,12 @@ struct bloom_filter_caster { ast::literal const* const literal, rmm::cuda_stream_view stream) const { - using key_type = T; - using policy_type = cuco::arrow_filter_policy; - using word_type = typename policy_type::word_type; + using key_type = T; + using policy_type = cuco::arrow_filter_policy; + using bloom_filter_type = cuco:: + bloom_filter_ref, cuco::thread_scope_thread, policy_type>; + using filter_block_type = typename bloom_filter_type::filter_block_type; + using word_type = typename policy_type::word_type; // Check if the literal has the same type as the predicate column CUDF_EXPECTS( @@ -104,16 +108,13 @@ struct bloom_filter_caster { auto const num_filter_blocks = filter_size / bytes_per_block; // Create a bloom filter view. - cuco::bloom_filter_ref, - cuco::thread_scope_thread, - policy_type> - filter{reinterpret_cast(filter_span[filter_idx].data()), - num_filter_blocks, - {}, // Thread scope as the same literal is being searched across different bitsets - // per thread - {}}; // Arrow policy with cudf::hashing::detail::XXHash_64 seeded with 0 for Arrow - // compatibility + bloom_filter_type filter{ + reinterpret_cast(filter_span[filter_idx].data()), + num_filter_blocks, + {}, // Thread scope as the same literal is being searched across different bitsets per + // thread + {}}; // Arrow policy with cudf::hashing::detail::XXHash_64 seeded with 0 for Arrow + // compatibility // If int96_timestamp type, convert literal to string_view and query bloom // filter @@ -381,6 +382,7 @@ class bloom_filter_expression_converter : public equality_literals_collector { * @param bloom_filter_sizes Bloom filter sizes for all chunks * @param chunk_source_map Association between each column chunk and its source * @param stream CUDA stream used for device memory operations and kernel launches + * @param aligned_mr Aligned device memory resource to allocate bloom filter buffers */ void read_bloom_filter_data(host_span const> sources, size_t num_chunks, @@ -388,8 +390,19 @@ void read_bloom_filter_data(host_span const> sources cudf::host_span> bloom_filter_offsets, cudf::host_span> bloom_filter_sizes, std::vector const& chunk_source_map, - rmm::cuda_stream_view stream) + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref aligned_mr) { + // Using `cuco::arrow_filter_policy` with a temporary `cuda::std::byte` key type to extract bloom + // filter properties + using policy_type = cuco::arrow_filter_policy; + auto constexpr filter_block_alignment = + alignof(cuco::bloom_filter_ref, + cuco::thread_scope_thread, + policy_type>::filter_block_type); + auto constexpr words_per_block = policy_type::words_per_block; + // Read tasks for bloom filter data std::vector> read_tasks; @@ -422,12 +435,6 @@ void read_bloom_filter_data(host_span const> sources CompactProtocolReader cp{buffer->data(), buffer->size()}; cp.read(&header); - // Get the hardcoded words_per_block value from `cuco::arrow_filter_policy` using a temporary - // `std::byte` key type. - auto constexpr words_per_block = - cuco::arrow_filter_policy::words_per_block; - // Check if the bloom filter header is valid. auto const is_header_valid = (header.num_bytes % words_per_block) == 0 and @@ -448,15 +455,25 @@ void read_bloom_filter_data(host_span const> sources // Check if we already read in the filter bitset in the initial read. if (initial_read_size >= bloom_filter_header_size + bitset_size) { - bloom_filter_data[chunk] = - rmm::device_buffer{buffer->data() + bloom_filter_header_size, bitset_size, stream}; + bloom_filter_data[chunk] = rmm::device_buffer{ + buffer->data() + bloom_filter_header_size, bitset_size, stream, aligned_mr}; + // The allocated bloom filter buffer must be aligned + CUDF_EXPECTS(reinterpret_cast(bloom_filter_data[chunk].data()) % + filter_block_alignment == + 0, + "Encountered misaligned bloom filter block"); } // Read the bitset from datasource. else { auto const bitset_offset = bloom_filter_offset + bloom_filter_header_size; // Directly read to device if preferred if (source->is_device_read_preferred(bitset_size)) { - bloom_filter_data[chunk] = rmm::device_buffer{bitset_size, stream}; + bloom_filter_data[chunk] = rmm::device_buffer{bitset_size, stream, aligned_mr}; + // The allocated bloom filter buffer must be aligned + CUDF_EXPECTS(reinterpret_cast(bloom_filter_data[chunk].data()) % + filter_block_alignment == + 0, + "Encountered misaligned bloom filter block"); auto future_read_size = source->device_read_async(bitset_offset, bitset_size, @@ -465,8 +482,14 @@ void read_bloom_filter_data(host_span const> sources read_tasks.emplace_back(std::move(future_read_size)); } else { - buffer = source->host_read(bitset_offset, bitset_size); - bloom_filter_data[chunk] = rmm::device_buffer{buffer->data(), buffer->size(), stream}; + buffer = source->host_read(bitset_offset, bitset_size); + bloom_filter_data[chunk] = + rmm::device_buffer{buffer->data(), buffer->size(), stream, aligned_mr}; + // The allocated bloom filter buffer must be aligned + CUDF_EXPECTS(reinterpret_cast(bloom_filter_data[chunk].data()) % + filter_block_alignment == + 0, + "Encountered misaligned bloom filter block"); } } }); @@ -484,7 +507,8 @@ std::vector aggregate_reader_metadata::read_bloom_filters( host_span const> row_group_indices, host_span column_schemas, size_type total_row_groups, - rmm::cuda_stream_view stream) const + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref aligned_mr) const { // Descriptors for all the chunks that make up the selected columns auto const num_input_columns = column_schemas.size(); @@ -543,7 +567,8 @@ std::vector aggregate_reader_metadata::read_bloom_filters( bloom_filter_offsets, bloom_filter_sizes, chunk_source_map, - stream); + stream, + aligned_mr); // Return bloom filter data return bloom_filter_data; @@ -612,10 +637,22 @@ std::optional>> aggregate_reader_metadata::ap // Return early if no column with equality predicate(s) if (equality_col_schemas.empty()) { return std::nullopt; } + // Required alignment: + // https://github.com/NVIDIA/cuCollections/blob/deab5799f3e4226cb8a49acf2199c03b14941ee4/include/cuco/detail/bloom_filter/bloom_filter_impl.cuh#L55-L67 + using policy_type = cuco::arrow_filter_policy; + auto constexpr alignment = alignof(cuco::bloom_filter_ref, + cuco::thread_scope_thread, + policy_type>::filter_block_type); + + // Aligned resource adaptor to allocate bloom filter buffers with + auto aligned_mr = + rmm::mr::aligned_resource_adaptor(cudf::get_current_device_resource(), alignment); + // Read a vector of bloom filter bitset device buffers for all columns with equality // predicate(s) across all row groups auto bloom_filter_data = read_bloom_filters( - sources, input_row_group_indices, equality_col_schemas, total_row_groups, stream); + sources, input_row_group_indices, equality_col_schemas, total_row_groups, stream, aligned_mr); // No bloom filter buffers, return the original row group indices if (bloom_filter_data.empty()) { return std::nullopt; } diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index a28ce616e2c..ba5e53e3104 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -204,6 +204,7 @@ class aggregate_reader_metadata { * @param[out] bloom_filter_data List of bloom filter data device buffers * @param column_schemas Schema indices of columns whose bloom filters will be read * @param stream CUDA stream used for device memory operations and kernel launches + * @param aligned_mr Aligned device memory resource to allocate bloom filter buffers * * @return A flattened list of bloom filter bitset device buffers for each predicate column across * row group @@ -213,7 +214,8 @@ class aggregate_reader_metadata { host_span const> row_group_indices, host_span column_schemas, size_type num_row_groups, - rmm::cuda_stream_view stream) const; + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref aligned_mr) const; /** * @brief Collects Parquet types for the columns with the specified schema indices