From b8503bc000f19b983b19292b16f0048254f2b3a9 Mon Sep 17 00:00:00 2001
From: Ed Seidl
Date: Fri, 3 May 2024 07:44:08 -0700
Subject: [PATCH] Add support for large string columns to Parquet reader and
 writer (#15632)

Part of #13733. Adds support for reading and writing cuDF string columns
where the string data exceeds 2GB. This is accomplished by skipping the
final offsets calculation in the string decoding kernel when the 2GB
threshold is exceeded, and instead using
`cudf::strings::detail::make_offsets_child_column()`. This could lead to
increased overhead with many columns (see #13024), so this will need some
more benchmarking. But if there are many columns that exceed the 2GB limit,
it's likely reads will have to be chunked to stay within the memory budget.

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - David Wendt (https://github.com/davidwendt)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: https://github.com/rapidsai/cudf/pull/15632
---
 cpp/CMakeLists.txt                            |  1 +
 cpp/src/io/parquet/page_delta_decode.cu       | 34 +++++----
 cpp/src/io/parquet/page_string_decode.cu      | 33 +++++----
 cpp/src/io/parquet/parquet_gpu.hpp            |  8 +-
 cpp/src/io/parquet/reader_impl.cpp            | 29 ++++++--
 cpp/src/io/parquet/reader_impl_preprocess.cu  |  8 +-
 cpp/src/io/parquet/writer_impl.cu             |  6 +-
 cpp/src/io/utilities/column_buffer.cpp        | 10 ---
 cpp/src/io/utilities/column_buffer_strings.cu | 53 ++++++++++++++
 cpp/tests/CMakeLists.txt                      |  2 +-
 cpp/tests/large_strings/parquet_tests.cpp     | 73 +++++++++++++++++++
 11 files changed, 200 insertions(+), 57 deletions(-)
 create mode 100644 cpp/src/io/utilities/column_buffer_strings.cu
 create mode 100644 cpp/tests/large_strings/parquet_tests.cpp

diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 232a4f40d8e..f11f3fc3c9a 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -430,6 +430,7 @@ add_library(
   src/io/text/multibyte_split.cu
   src/io/utilities/arrow_io_source.cpp
   src/io/utilities/column_buffer.cpp
+  src/io/utilities/column_buffer_strings.cu
   src/io/utilities/config_utils.cpp
   src/io/utilities/data_casting.cu
   src/io/utilities/data_sink.cpp
diff --git a/cpp/src/io/parquet/page_delta_decode.cu b/cpp/src/io/parquet/page_delta_decode.cu
index da1bbaebd73..0c9d4e77f0c 100644
--- a/cpp/src/io/parquet/page_delta_decode.cu
+++ b/cpp/src/io/parquet/page_delta_decode.cu
@@ -579,15 +579,18 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
     __syncthreads();
   }
 
-  // now turn array of lengths into offsets
-  int value_count = nesting_info_base[leaf_level_index].value_count;
+  // Now turn the array of lengths into offsets, but skip if this is a large string column. In the
+  // latter case, offsets will be computed during string column creation.
+  if (not s->col.is_large_string_col) {
+    int value_count = nesting_info_base[leaf_level_index].value_count;
 
-  // if no repetition we haven't calculated start/end bounds and instead just skipped
-  // values until we reach first_row. account for that here.
-  if (!has_repetition) { value_count -= s->first_row; }
+    // if no repetition we haven't calculated start/end bounds and instead just skipped
+    // values until we reach first_row. account for that here.
+    if (!has_repetition) { value_count -= s->first_row; }
 
-  auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
-  block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+    auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
+    block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+  }
 
   if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
 }
@@ -738,15 +741,18 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
     __syncthreads();
   }
 
-  // now turn array of lengths into offsets
-  int value_count = nesting_info_base[leaf_level_index].value_count;
+  // Now turn the array of lengths into offsets, but skip if this is a large string column. In the
+  // latter case, offsets will be computed during string column creation.
+  if (not s->col.is_large_string_col) {
+    int value_count = nesting_info_base[leaf_level_index].value_count;
 
-  // if no repetition we haven't calculated start/end bounds and instead just skipped
-  // values until we reach first_row. account for that here.
-  if (!has_repetition) { value_count -= s->first_row; }
+    // if no repetition we haven't calculated start/end bounds and instead just skipped
+    // values until we reach first_row. account for that here.
+    if (!has_repetition) { value_count -= s->first_row; }
 
-  auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
-  block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+    auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
+    block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+  }
 
   // finally, copy the string data into place
   auto const dst = nesting_info_base[leaf_level_index].string_out;
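The two hunks above make the delta decoders bypass the in-kernel lengths-to-offsets scan when a chunk is flagged as a large string column. For orientation, here is a minimal, self-contained sketch of what such a block-wide exclusive scan does: a single thread block turns per-value string lengths into character offsets in place, carrying a running total across tiles. This is an illustration only; the kernel name, tile width, and launch shape are assumptions, not cudf's `block_excl_sum<decode_block_size>`.

#include <cub/block/block_scan.cuh>

constexpr int block_size = 128;  // illustrative tile width, not cudf's decode_block_size

// One block turns an array of string lengths into character offsets in place,
// adding the page's starting offset and carrying a running total across tiles.
__global__ void lengths_to_offsets(int* data, int value_count, int start_offset)
{
  using block_scan = cub::BlockScan<int, block_size>;
  __shared__ typename block_scan::TempStorage temp;
  int running = start_offset;  // identical in every thread
  for (int base = 0; base < value_count; base += block_size) {
    int const i   = base + threadIdx.x;
    int const len = (i < value_count) ? data[i] : 0;
    int offset, tile_total;
    block_scan(temp).ExclusiveSum(len, offset, tile_total);
    if (i < value_count) { data[i] = running + offset; }  // length becomes offset
    running += tile_total;  // carry the tile's sum into the next iteration
    __syncthreads();        // temp storage is reused by the next tile
  }
}

Note that a scan like this writes only `value_count` offsets; the final capping offset is written separately on the host (the `cudaMemcpyAsync` in the reader_impl.cpp hunk further below), which is the other step this patch guards for large columns.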
diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu
index 5ba813f518f..cf1dc58b06a 100644
--- a/cpp/src/io/parquet/page_string_decode.cu
+++ b/cpp/src/io/parquet/page_string_decode.cu
@@ -955,7 +955,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
 {
   using cudf::detail::warp_size;
   __shared__ __align__(16) page_state_s state_g;
-  __shared__ __align__(4) size_type last_offset;
+  __shared__ size_t last_offset;
   __shared__ __align__(16) page_state_buffers_s state_buffers;
@@ -1054,9 +1054,9 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
                       ? gpuGetStringData(s, sb, src_pos + skipped_leaf_values + i)
                       : cuda::std::pair<char const*, size_t>{nullptr, 0};
 
-    __shared__ cub::WarpScan<size_type>::TempStorage temp_storage;
-    size_type offset, warp_total;
-    cub::WarpScan<size_type>(temp_storage).ExclusiveSum(len, offset, warp_total);
+    __shared__ cub::WarpScan<size_t>::TempStorage temp_storage;
+    size_t offset, warp_total;
+    cub::WarpScan<size_t>(temp_storage).ExclusiveSum(len, offset, warp_total);
     offset += last_offset;
 
     // choose a character parallel string copy when the average string is longer than a warp
@@ -1075,10 +1075,10 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
       }
       __syncwarp();
     } else if (use_char_ll) {
-      __shared__ __align__(8) uint8_t const* pointers[warp_size];
-      __shared__ __align__(4) size_type offsets[warp_size];
-      __shared__ __align__(4) int dsts[warp_size];
-      __shared__ __align__(4) int lengths[warp_size];
+      __shared__ uint8_t const* pointers[warp_size];
+      __shared__ size_t offsets[warp_size];
+      __shared__ int dsts[warp_size];
+      __shared__ int lengths[warp_size];
 
       offsets[me]  = offset;
       pointers[me] = reinterpret_cast<uint8_t const*>(ptr);
@@ -1119,15 +1119,18 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
     __syncthreads();
   }
 
-  // now turn array of lengths into offsets
-  int value_count = nesting_info_base[leaf_level_index].value_count;
+  // Now turn the array of lengths into offsets, but skip if this is a large string column. In the
+  // latter case, offsets will be computed during string column creation.
+  if (not s->col.is_large_string_col) {
+    int value_count = nesting_info_base[leaf_level_index].value_count;
 
-  // if no repetition we haven't calculated start/end bounds and instead just skipped
-  // values until we reach first_row. account for that here.
-  if (!has_repetition) { value_count -= s->first_row; }
+    // if no repetition we haven't calculated start/end bounds and instead just skipped
+    // values until we reach first_row. account for that here.
+    if (!has_repetition) { value_count -= s->first_row; }
 
-  auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
-  block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+    auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
+    block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+  }
 
   if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
 }
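The hunks above widen the per-warp running character offset from 32-bit `size_type` to `size_t`, since the position being accumulated can pass INT32_MAX inside one large column. A compilable sketch of just that scan, using the same `cub::WarpScan` primitive; the kernel name, the one-warp launch, and the `base` parameter are illustrative assumptions:

#include <cub/warp/warp_scan.cuh>

// Launch with a single warp (32 threads). Each thread contributes one string
// length; the exclusive sum plus a 64-bit base yields each string's absolute
// starting position in the chars buffer, with no 32-bit wraparound.
__global__ void warp_offsets(size_t const* lengths, size_t* offsets, size_t base)
{
  __shared__ cub::WarpScan<size_t>::TempStorage temp_storage;
  size_t offset, warp_total;
  cub::WarpScan<size_t>(temp_storage).ExclusiveSum(lengths[threadIdx.x], offset, warp_total);
  offsets[threadIdx.x] = base + offset;
}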
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index c06fb63acda..3b18175dccd 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -326,8 +326,8 @@ struct PageInfo {
   int32_t skipped_leaf_values;
   // for string columns only, the size of all the chars in the string for
   // this page. only valid/computed during the base preprocess pass
+  size_t str_offset;  // offset into string data for this page
   int32_t str_bytes;
-  int32_t str_offset;  // offset into string data for this page
   bool has_page_index;  // true if str_bytes, num_valids, etc are derivable from page indexes
 
   // nesting information (input/output) for each page. this array contains
@@ -420,7 +420,8 @@ struct ColumnChunkDesc {
       src_col_schema(src_col_schema_),
       h_chunk_info(chunk_info_),
       list_bytes_per_row_est(list_bytes_per_row_est_),
-      is_strings_to_cat(strings_to_categorical_)
+      is_strings_to_cat(strings_to_categorical_),
+      is_large_string_col(false)
   {
   }
 
@@ -454,7 +455,8 @@ struct ColumnChunkDesc {
 
   float list_bytes_per_row_est{};  // for LIST columns, an estimate on number of bytes per row
 
-  bool is_strings_to_cat{};  // convert strings to hashes
+  bool is_strings_to_cat{};    // convert strings to hashes
+  bool is_large_string_col{};  // `true` if string data uses 64-bit offsets
 };
 
 /**
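`PageInfo::str_offset` is each page's starting position within the column's concatenated character data, so for columns past 2GB it has to hold values above INT32_MAX; hence the widening to `size_t`. Its relationship to `str_bytes` is an exclusive scan over a column's pages. A standalone Thrust illustration (the reader computes this through the `page_offset_output_iter` shown in reader_impl_preprocess.cu below; the byte counts here are hypothetical):

#include <thrust/device_vector.h>
#include <thrust/scan.h>
#include <vector>

void page_offsets_example()
{
  // per-page character byte counts (PageInfo::str_bytes) for one column
  std::vector<size_t> const h_bytes{100, 250, 75, 300};
  thrust::device_vector<size_t> str_bytes(h_bytes.begin(), h_bytes.end());
  thrust::device_vector<size_t> str_offset(str_bytes.size());

  // exclusive scan: where each page writes into the chars buffer
  thrust::exclusive_scan(str_bytes.begin(), str_bytes.end(), str_offset.begin());
  // str_offset == {0, 100, 350, 425}; with size_t elements these running
  // totals can exceed the 2GB limit the old int32_t field imposed
}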
diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp
index b7172f5ba67..0602b5ec007 100644
--- a/cpp/src/io/parquet/reader_impl.cpp
+++ b/cpp/src/io/parquet/reader_impl.cpp
@@ -22,6 +22,7 @@
 #include
 #include
 #include
+#include
 
 #include
@@ -99,11 +100,21 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row
     col_string_sizes = calculate_page_string_offsets();
 
     // check for overflow
-    if (std::any_of(col_string_sizes.cbegin(), col_string_sizes.cend(), [](std::size_t sz) {
-          return sz > std::numeric_limits<size_type>::max();
-        })) {
+    auto const threshold = static_cast<size_t>(strings::detail::get_offset64_threshold());
+    auto const has_large_strings = std::any_of(col_string_sizes.cbegin(),
+                                               col_string_sizes.cend(),
+                                               [=](std::size_t sz) { return sz > threshold; });
+    if (has_large_strings and not strings::detail::is_large_strings_enabled()) {
       CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
     }
+
+    // mark any chunks that are large string columns
+    if (has_large_strings) {
+      for (auto& chunk : pass.chunks) {
+        auto const idx = chunk.src_col_index;
+        if (col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; }
+      }
+    }
   }
@@ -348,11 +359,13 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row
         } else if (out_buf.type.id() == type_id::STRING) {
           // need to cap off the string offsets column
           auto const sz = static_cast<size_type>(col_string_sizes[idx]);
-          CUDF_CUDA_TRY(cudaMemcpyAsync(static_cast<size_type*>(out_buf.data()) + out_buf.size,
-                                        &sz,
-                                        sizeof(size_type),
-                                        cudaMemcpyDefault,
-                                        _stream.value()));
+          if (sz <= strings::detail::get_offset64_threshold()) {
+            CUDF_CUDA_TRY(cudaMemcpyAsync(static_cast<size_type*>(out_buf.data()) + out_buf.size,
+                                          &sz,
+                                          sizeof(size_type),
+                                          cudaMemcpyDefault,
+                                          _stream.value()));
+          }
         }
       }
     }
diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu
index 4b7a64ac6ab..8c9b3c1a1e6 100644
--- a/cpp/src/io/parquet/reader_impl_preprocess.cu
+++ b/cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -1169,10 +1169,10 @@ struct page_to_string_size {
 struct page_offset_output_iter {
   PageInfo* p;
 
-  using value_type        = size_type;
-  using difference_type   = size_type;
-  using pointer           = size_type*;
-  using reference         = size_type&;
+  using value_type        = size_t;
+  using difference_type   = size_t;
+  using pointer           = size_t*;
+  using reference         = size_t&;
   using iterator_category = thrust::output_device_iterator_tag;
 
   __host__ __device__ page_offset_output_iter operator+(int i) { return {p + i}; }
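The reader_impl.cpp hunks above implement the policy: oversized columns are an error only when large-string support is disabled, and otherwise the affected chunks are flagged for 64-bit handling. A host-side sketch of that check; the two free functions stand in for `strings::detail::get_offset64_threshold()` and `is_large_strings_enabled()`, whose real implementations read a runtime setting not shown in this patch:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <stdexcept>
#include <vector>

// assumed stand-ins for the libcudf runtime queries
std::size_t offset64_threshold() { return std::numeric_limits<std::int32_t>::max(); }
bool large_strings_enabled() { return true; }

void check_column_sizes(std::vector<std::size_t> const& col_string_sizes)
{
  auto const threshold         = offset64_threshold();
  bool const has_large_strings = std::any_of(
    col_string_sizes.cbegin(), col_string_sizes.cend(), [&](std::size_t sz) {
      return sz > threshold;
    });
  if (has_large_strings && !large_strings_enabled()) {
    throw std::overflow_error("String column exceeds the column size limit");
  }
  // otherwise any column above the threshold is decoded with 64-bit offsets,
  // and its chunks would be flagged is_large_string_col as in the hunk above
}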
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 286c7b361a9..24aa630a05f 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -40,6 +40,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -278,8 +279,9 @@ size_t column_size(column_view const& column, rmm::cuda_stream_view stream)
     return size_of(column.type()) * column.size();
   } else if (column.type().id() == type_id::STRING) {
     auto const scol = strings_column_view(column);
-    return cudf::detail::get_value<size_type>(scol.offsets(), column.size(), stream) -
-           cudf::detail::get_value<size_type>(scol.offsets(), 0, stream);
+    return cudf::strings::detail::get_offset_value(
+             scol.offsets(), column.size() + column.offset(), stream) -
+           cudf::strings::detail::get_offset_value(scol.offsets(), column.offset(), stream);
   } else if (column.type().id() == type_id::STRUCT) {
     auto const scol = structs_column_view(column);
     size_t ret      = 0;
diff --git a/cpp/src/io/utilities/column_buffer.cpp b/cpp/src/io/utilities/column_buffer.cpp
index 5dc2291abdc..db84778edc6 100644
--- a/cpp/src/io/utilities/column_buffer.cpp
+++ b/cpp/src/io/utilities/column_buffer.cpp
@@ -69,16 +69,6 @@ void cudf::io::detail::inline_column_buffer::create_string_data(size_t num_bytes
   _string_data = rmm::device_buffer(num_bytes, stream, _mr);
 }
 
-std::unique_ptr<column> cudf::io::detail::inline_column_buffer::make_string_column_impl(
-  rmm::cuda_stream_view stream)
-{
-  // no need for copies, just transfer ownership of the data_buffers to the columns
-  auto offsets_col = std::make_unique<column>(
-    data_type{type_to_id<size_type>()}, size + 1, std::move(_data), rmm::device_buffer{}, 0);
-  return make_strings_column(
-    size, std::move(offsets_col), std::move(_string_data), null_count(), std::move(_null_mask));
-}
-
 namespace {
 
 /**
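The writer_impl.cu change above does two things: it swaps `cudf::detail::get_value<size_type>` for `strings::detail::get_offset_value`, which can read an offsets child of either INT32 or INT64 type, and it adds `column.offset()` so a sliced column is measured over its own rows rather than from row zero. A host-side analogue of both points; the function names and the raw-pointer interface are illustrative, and the real helper copies the value from device memory on a stream:

#include <cstddef>
#include <cstdint>

// The offsets child may hold int32_t or int64_t entries, so a single
// "offset at index" helper dispatches on the element width.
std::int64_t offset_at(void const* offsets, bool is64, std::size_t idx)
{
  return is64 ? static_cast<std::int64_t const*>(offsets)[idx]
              : static_cast<std::int32_t const*>(offsets)[idx];
}

// Character bytes of a (possibly sliced) strings column: the slice's last
// offset minus its first, not offsets[size] - offsets[0].
std::int64_t string_char_bytes(void const* offsets, bool is64,
                               std::size_t col_offset, std::size_t col_size)
{
  return offset_at(offsets, is64, col_offset + col_size) -
         offset_at(offsets, is64, col_offset);
}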
diff --git a/cpp/src/io/utilities/column_buffer_strings.cu b/cpp/src/io/utilities/column_buffer_strings.cu
new file mode 100644
index 00000000000..4bc303a34a5
--- /dev/null
+++ b/cpp/src/io/utilities/column_buffer_strings.cu
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "column_buffer.hpp"
+
+#include
+#include
+#include
+
+namespace cudf::io::detail {
+
+std::unique_ptr<column> cudf::io::detail::inline_column_buffer::make_string_column_impl(
+  rmm::cuda_stream_view stream)
+{
+  // if the size of _string_data is over the threshold for 64-bit offsets, _data will contain
+  // sizes rather than offsets. need special handling for that case.
+  auto const threshold = static_cast<size_t>(strings::detail::get_offset64_threshold());
+  if (_string_data.size() > threshold) {
+    if (not strings::detail::is_large_strings_enabled()) {
+      CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
+    }
+    // create new offsets
+    auto const offsets_ptr = static_cast<size_type*>(_data.data());
+    auto offsets_col       = make_numeric_column(
+      data_type{type_id::INT64}, size + 1, mask_state::UNALLOCATED, stream, _mr);
+    auto d_offsets64 = offsets_col->mutable_view().template data<int64_t>();
+    // it's safe to call with size + 1 because _data is also sized that large
+    cudf::detail::sizes_to_offsets(offsets_ptr, offsets_ptr + size + 1, d_offsets64, stream);
+    return make_strings_column(
+      size, std::move(offsets_col), std::move(_string_data), null_count(), std::move(_null_mask));
+  } else {
+    // no need for copies, just transfer ownership of the data_buffers to the columns
+    auto offsets_col = std::make_unique<column>(
+      data_type{type_to_id<size_type>()}, size + 1, std::move(_data), rmm::device_buffer{}, 0);
+    return make_strings_column(
+      size, std::move(offsets_col), std::move(_string_data), null_count(), std::move(_null_mask));
+  }
+}
+
+}  // namespace cudf::io::detail
diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt
index fa633dfa67b..bbb919aa2d1 100644
--- a/cpp/tests/CMakeLists.txt
+++ b/cpp/tests/CMakeLists.txt
@@ -572,7 +572,7 @@ ConfigureTest(
 # * large strings test ----------------------------------------------------------------------------
 ConfigureTest(
   LARGE_STRINGS_TEST large_strings/large_strings_fixture.cpp large_strings/merge_tests.cpp
-  large_strings/concatenate_tests.cpp
+  large_strings/concatenate_tests.cpp large_strings/parquet_tests.cpp
   GPUS 1
   PERCENT 100
 )
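In column_buffer_strings.cu above, when the decode kernels skipped the in-place scan, `_data` still holds per-row sizes (with one extra slot), and `cudf::detail::sizes_to_offsets` expands them into INT64 offsets. A host-side illustration of that step; `sizes_to_offsets64` is a hypothetical stand-in for the device-side helper:

#include <cstdint>
#include <numeric>
#include <vector>

// `sizes` models the size+1 entries of _data. An exclusive scan never adds
// the final input slot into any output, so its leftover value is harmless,
// and the last output is the total character count, e.g.
// {3, 5, 2, 4, 0} -> {0, 3, 8, 10, 14}.
std::vector<std::int64_t> sizes_to_offsets64(std::vector<std::int32_t> const& sizes)
{
  std::vector<std::int64_t> offsets(sizes.size());
  // accumulate in 64 bits so totals past INT32_MAX stay exact
  std::exclusive_scan(sizes.begin(), sizes.end(), offsets.begin(), std::int64_t{0});
  return offsets;
}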
+ */ + +#include "large_strings_fixture.hpp" + +#include +#include +#include + +#include +#include +#include + +namespace { + +cudf::test::TempDirTestEnvironment* const g_temp_env = + static_cast( + ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment)); + +} // namespace + +struct ParquetStringsTest : public cudf::test::StringsLargeTest {}; + +TEST_F(ParquetStringsTest, ReadLargeStrings) +{ + // need to create a string column larger than `threshold` + auto const col0 = this->long_column(); + auto const column_size = cudf::strings_column_view(col0).chars_size(cudf::get_default_stream()); + auto const threshold = column_size - 1; + auto const expected = cudf::table_view{{col0, col0, col0}}; + + auto expected_metadata = cudf::io::table_input_metadata{expected}; + expected_metadata.column_metadata[1].set_encoding( + cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY); + expected_metadata.column_metadata[2].set_encoding(cudf::io::column_encoding::DELTA_BYTE_ARRAY); + + // set smaller threshold to reduce file size and execution time + setenv("LIBCUDF_LARGE_STRINGS_THRESHOLD", std::to_string(threshold).c_str(), 1); + + auto const filepath = g_temp_env->get_temp_filepath("ReadLargeStrings.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .compression(cudf::io::compression_type::ZSTD) + .stats_level(cudf::io::STATISTICS_NONE) + .metadata(expected_metadata); + cudf::io::write_parquet(out_opts); + + cudf::io::parquet_reader_options default_in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto const result = cudf::io::read_parquet(default_in_opts); + auto const result_view = result.tbl->view(); + for (auto cv : result_view) { + auto const offsets = cudf::strings_column_view(cv).offsets(); + EXPECT_EQ(offsets.type(), cudf::data_type{cudf::type_id::INT64}); + } + CUDF_TEST_EXPECT_TABLES_EQUAL(result_view, expected); + + // go back to normal threshold + unsetenv("LIBCUDF_LARGE_STRINGS_THRESHOLD"); +}