From 848dd5acaa67dfc1b620572fe93d888a69e73ec7 Mon Sep 17 00:00:00 2001
From: Nghia Truong <7416935+ttnghia@users.noreply.github.com>
Date: Mon, 24 Apr 2023 19:54:53 -0700
Subject: [PATCH] Refactor pinned memory vector and ORC+Parquet writers (#13206)

Currently, `cudf::detail::pinned_allocator` is used in various places to
implement a pinned host vector. This PR standardizes that usage, removing
`cudf::detail::pinned_allocator` from the call sites and replacing it with
the new `cudf::detail::pinned_host_vector` alias.

Some small changes are also made to the ORC/Parquet writer classes, replacing
the `bool _single_write_mode` member with a `single_write_mode _single_write_mode`
member (the `SingleWriteMode` enum is renamed to `single_write_mode` as part of
this change). This is needed for https://github.com/rapidsai/cudf/pull/13076.

Authors:
  - Nghia Truong (https://github.com/ttnghia)

Approvers:
  - AJ Schmidt (https://github.com/ajschmidt8)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Yunsong Wang (https://github.com/PointKernel)

URL: https://github.com/rapidsai/cudf/pull/13206
---
 conda/recipes/libcudf/meta.yaml               |   2 +-
 cpp/benchmarks/io/text/multibyte_split.cpp    |   5 +-
 ...d_allocator.hpp => pinned_host_vector.hpp} |  11 +-
 cpp/include/cudf/io/detail/orc.hpp            |   4 +-
 cpp/include/cudf/io/detail/parquet.hpp        |   4 +-
 cpp/include/cudf/io/detail/utils.hpp          |   4 +-
 cpp/src/io/functions.cpp                      |   8 +-
 cpp/src/io/orc/writer_impl.cu                 | 168 ++++++++----------
 cpp/src/io/orc/writer_impl.hpp                |  46 ++---
 cpp/src/io/parquet/writer_impl.cu             |  44 ++---
 cpp/src/io/parquet/writer_impl.hpp            |  10 +-
 cpp/src/io/text/bgzip_data_chunk_source.cu    |  13 +-
 .../io/text/data_chunk_source_factories.cpp   |  17 +-
 cpp/src/io/utilities/hostdevice_vector.hpp    |   7 +-
 14 files changed, 153 insertions(+), 190 deletions(-)
 rename cpp/include/cudf/detail/utilities/{pinned_allocator.hpp => pinned_host_vector.hpp} (96%)

diff --git a/conda/recipes/libcudf/meta.yaml b/conda/recipes/libcudf/meta.yaml
index 2bb571f858d..94b244b94d5 100644
--- a/conda/recipes/libcudf/meta.yaml
+++ b/conda/recipes/libcudf/meta.yaml
@@ -139,7 +139,7 @@ outputs:
         - test -f $PREFIX/include/cudf/detail/utilities/integer_utils.hpp
         - test -f $PREFIX/include/cudf/detail/utilities/linked_column.hpp
         - test -f $PREFIX/include/cudf/detail/utilities/logger.hpp
-        - test -f $PREFIX/include/cudf/detail/utilities/pinned_allocator.hpp
+        - test -f $PREFIX/include/cudf/detail/utilities/pinned_host_vector.hpp
        - test -f $PREFIX/include/cudf/detail/utilities/vector_factories.hpp
         - test -f $PREFIX/include/cudf/detail/utilities/visitor_overload.hpp
         - test -f $PREFIX/include/cudf/dictionary/detail/concatenate.hpp
diff --git a/cpp/benchmarks/io/text/multibyte_split.cpp b/cpp/benchmarks/io/text/multibyte_split.cpp
index 41b5ddb567e..a697c98a320 100644
--- a/cpp/benchmarks/io/text/multibyte_split.cpp
+++ b/cpp/benchmarks/io/text/multibyte_split.cpp
@@ -23,7 +23,7 @@
 #include
 #include
-#include
+#include
 #include
 #include
 #include
@@ -33,7 +33,6 @@
 #include
 #include
-#include
 #include
 #include
@@ -136,7 +135,7 @@ static void bench_multibyte_split(nvbench::state& state,
   std::unique_ptr datasource;
   auto device_input      = create_random_input(file_size_approx, delim_factor, 0.05, delim);
   auto host_input        = std::vector{};
-  auto host_pinned_input = thrust::host_vector>{};
+  auto host_pinned_input = cudf::detail::pinned_host_vector{};
   if (source_type != data_chunk_source_type::device &&
       source_type != data_chunk_source_type::host_pinned) {
diff --git a/cpp/include/cudf/detail/utilities/pinned_allocator.hpp b/cpp/include/cudf/detail/utilities/pinned_host_vector.hpp
similarity index 96%
rename from cpp/include/cudf/detail/utilities/pinned_allocator.hpp
rename to cpp/include/cudf/detail/utilities/pinned_host_vector.hpp
index 84abf7c014f..83f061e9407 100644
--- a/cpp/include/cudf/detail/utilities/pinned_allocator.hpp
+++ b/cpp/include/cudf/detail/utilities/pinned_host_vector.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright 2008-2022 NVIDIA Corporation
+ * Copyright 2008-2023 NVIDIA Corporation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
 #include

+#include
+
 namespace cudf::detail {

 /*! \p pinned_allocator is a CUDA-specific host memory allocator
@@ -199,4 +201,11 @@ class pinned_allocator {
     return !operator==(x);
   }
 };
+
+/**
+ * @brief A vector class with pinned host memory allocator
+ */
+template
+using pinned_host_vector = thrust::host_vector>;
+
 } // namespace cudf::detail
diff --git a/cpp/include/cudf/io/detail/orc.hpp b/cpp/include/cudf/io/detail/orc.hpp
index 1a53690e317..b7794c0df6a 100644
--- a/cpp/include/cudf/io/detail/orc.hpp
+++ b/cpp/include/cudf/io/detail/orc.hpp
@@ -96,7 +96,7 @@ class writer {
    */
   explicit writer(std::unique_ptr sink,
                   orc_writer_options const& options,
-                  SingleWriteMode mode,
+                  single_write_mode mode,
                   rmm::cuda_stream_view stream);

   /**
@@ -109,7 +109,7 @@ class writer {
    */
   explicit writer(std::unique_ptr sink,
                   chunked_orc_writer_options const& options,
-                  SingleWriteMode mode,
+                  single_write_mode mode,
                   rmm::cuda_stream_view stream);

   /**
diff --git a/cpp/include/cudf/io/detail/parquet.hpp b/cpp/include/cudf/io/detail/parquet.hpp
index 304d42a29c9..9a94924824d 100644
--- a/cpp/include/cudf/io/detail/parquet.hpp
+++ b/cpp/include/cudf/io/detail/parquet.hpp
@@ -156,7 +156,7 @@ class writer {
    */
   explicit writer(std::vector> sinks,
                   parquet_writer_options const& options,
-                  SingleWriteMode mode,
+                  single_write_mode mode,
                   rmm::cuda_stream_view stream);

   /**
@@ -171,7 +171,7 @@ class writer {
    */
   explicit writer(std::vector> sinks,
                   chunked_parquet_writer_options const& options,
-                  SingleWriteMode mode,
+                  single_write_mode mode,
                   rmm::cuda_stream_view stream);

   /**
diff --git a/cpp/include/cudf/io/detail/utils.hpp b/cpp/include/cudf/io/detail/utils.hpp
index adb7078d96d..7bbda21858d 100644
--- a/cpp/include/cudf/io/detail/utils.hpp
+++ b/cpp/include/cudf/io/detail/utils.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2023, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,7 +22,7 @@ namespace detail {
 /**
  * @brief Whether writer writes in chunks or all at once
  */
-enum class SingleWriteMode : bool { YES, NO };
+enum class single_write_mode : bool { YES, NO };
 } // namespace detail
 } // namespace io
 } // namespace cudf
diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp
index 76c50d548f9..ddbbce53bab 100644
--- a/cpp/src/io/functions.cpp
+++ b/cpp/src/io/functions.cpp
@@ -428,7 +428,7 @@ void write_orc(orc_writer_options const& options)
   CUDF_EXPECTS(sinks.size() == 1, "Multiple sinks not supported for ORC writing");

   auto writer = std::make_unique(
-    std::move(sinks[0]), options, io_detail::SingleWriteMode::YES, cudf::get_default_stream());
+    std::move(sinks[0]), options, io_detail::single_write_mode::YES, cudf::get_default_stream());

   writer->write(options.get_table());
 }
@@ -444,7 +444,7 @@ orc_chunked_writer::orc_chunked_writer(chunked_orc_writer_options const& options
   CUDF_EXPECTS(sinks.size() == 1, "Multiple sinks not supported for ORC writing");

   writer = std::make_unique(
-    std::move(sinks[0]), options, io_detail::SingleWriteMode::NO, cudf::get_default_stream());
+    std::move(sinks[0]), options, io_detail::single_write_mode::NO, cudf::get_default_stream());
 }

 /**
@@ -519,7 +519,7 @@ std::unique_ptr> write_parquet(parquet_writer_options const
   auto sinks = make_datasinks(options.get_sink());

   auto writer = std::make_unique(
-    std::move(sinks), options, io_detail::SingleWriteMode::YES, cudf::get_default_stream());
+    std::move(sinks), options, io_detail::single_write_mode::YES, cudf::get_default_stream());

   writer->write(options.get_table(), options.get_partitions());

@@ -575,7 +575,7 @@ parquet_chunked_writer::parquet_chunked_writer(chunked_parquet_writer_options co
   auto sinks = make_datasinks(options.get_sink());

   writer = std::make_unique(
-    std::move(sinks), options, io_detail::SingleWriteMode::NO, cudf::get_default_stream());
+    std::move(sinks), options, io_detail::single_write_mode::NO, cudf::get_default_stream());
 }

 /**
diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu
index f1eb52f63f1..fef1bb23733 100644
--- a/cpp/src/io/orc/writer_impl.cu
+++ b/cpp/src/io/orc/writer_impl.cu
@@ -28,6 +28,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -79,11 +80,6 @@ struct row_group_index_info {
 };

 namespace {
-/**
- * @brief Helper for pinned host memory
- */
-template
-using pinned_buffer = std::unique_ptr;

 /**
  * @brief Translates ORC compression to nvCOMP compression
@@ -379,11 +375,11 @@ __global__ void copy_string_data(char* string_pool,
 } // namespace

 void persisted_statistics::persist(int num_table_rows,
-                                   bool single_write_mode,
+                                   single_write_mode write_mode,
                                    intermediate_statistics& intermediate_stats,
                                    rmm::cuda_stream_view stream)
 {
-  if (not single_write_mode) {
+  if (write_mode == single_write_mode::NO) {
     // persist the strings in the chunks into a string pool and update pointers
     auto const num_chunks = static_cast(intermediate_stats.stripe_stat_chunks.size());
     // min offset and max offset + 1 for total size
@@ -670,7 +666,7 @@ orc_streams create_streams(host_span columns,
                            std::map const& decimal_column_sizes,
                            bool enable_dictionary,
                            CompressionKind compression_kind,
-                           bool single_write_mode)
+                           single_write_mode write_mode)
 {
   // 'column 0' row index stream
   std::vector streams{{ROW_INDEX, 0}};  // TODO: Separate index and data streams?
@@ -685,7 +681,7 @@ orc_streams create_streams(host_span columns,
   for (auto& column : columns) {
     auto const is_nullable = [&]() -> bool {
-      if (single_write_mode) {
+      if (write_mode == single_write_mode::YES) {
         return column.nullable();
       } else {
         // For chunked write, when not provided nullability, we assume the worst case scenario
@@ -2191,33 +2187,22 @@ std::unique_ptr make_table_meta(table_view const& input)
  * @param compression_kind The compression kind
  * @param compression_blocksize The block size used for compression
  * @param stats_freq Column statistics granularity type for parquet/orc writers
- * @param single_write_mode Flag to indicate if there is only a single table write
+ * @param write_mode Flag to indicate if there is only a single table write
  * @param out_sink Sink for writing data
  * @param stream CUDA stream used for device memory operations and kernel launches
  * @return A tuple of the intermediate results containing the processed data
  */
-std::tuple,
-           hostdevice_2dvector,
-           encoded_data,
-           file_segmentation,
-           hostdevice_2dvector,
-           std::vector,
-           orc_table_view,
-           rmm::device_buffer,
-           intermediate_statistics,
-           pinned_buffer>
-convert_table_to_orc_data(table_view const& input,
-                          table_input_metadata const& table_meta,
-                          stripe_size_limits max_stripe_size,
-                          size_type row_index_stride,
-                          bool enable_dictionary,
-                          CompressionKind compression_kind,
-                          size_t compression_blocksize,
-                          statistics_freq stats_freq,
-                          bool single_write_mode,
-                          data_sink const& out_sink,
-                          rmm::cuda_stream_view stream)
+auto convert_table_to_orc_data(table_view const& input,
+                               table_input_metadata const& table_meta,
+                               stripe_size_limits max_stripe_size,
+                               size_type row_index_stride,
+                               bool enable_dictionary,
+                               CompressionKind compression_kind,
+                               size_t compression_blocksize,
+                               statistics_freq stats_freq,
+                               single_write_mode write_mode,
+                               data_sink const& out_sink,
+                               rmm::cuda_stream_view stream)
 {
   auto const input_tview = table_device_view::create(input, stream);
@@ -2269,7 +2254,7 @@ convert_table_to_orc_data(table_view const& input,
                                        decimal_column_sizes(dec_chunk_sizes.rg_sizes),
                                        enable_dictionary,
                                        compression_kind,
-                                       single_write_mode);
+                                       write_mode);
   auto enc_data = encode_columns(orc_table,
                                  std::move(dictionaries),
                                  std::move(dec_chunk_sizes),
@@ -2288,17 +2273,17 @@ convert_table_to_orc_data(table_view const& input,
   auto stripes = gather_stripes(num_index_streams, segmentation, &enc_data, &strm_descs, stream);

   if (num_rows == 0) {
-    return {std::move(streams),
-            hostdevice_vector{},  // comp_results
-            std::move(strm_descs),
-            std::move(enc_data),
-            std::move(segmentation),
-            std::move(stripe_dict),
-            std::move(stripes),
-            std::move(orc_table),
-            rmm::device_buffer{},  // compressed_data
-            intermediate_statistics{stream},
-            pinned_buffer{nullptr, cudaFreeHost}};
+    return std::tuple{std::move(enc_data),
+                      std::move(segmentation),
+                      std::move(orc_table),
+                      rmm::device_uvector{0, stream},  // compressed_data
+                      hostdevice_vector{},  // comp_results
+                      std::move(strm_descs),
+                      intermediate_statistics{stream},
+                      std::move(streams),
+                      std::move(stripes),
+                      std::move(stripe_dict),
+                      cudf::detail::pinned_host_vector()};
   }

   // Allocate intermediate output stream buffer
@@ -2312,7 +2297,7 @@ convert_table_to_orc_data(table_view const& input,
   auto const padded_block_header_size =
     util::round_up_unsafe(block_header_size, compressed_block_align);

-  auto stream_output = [&]() {
+  auto bounce_buffer = [&]() {
     size_t max_stream_size = 0;
     bool all_device_write  = true;

@@ -2333,20 +2318,11 @@ convert_table_to_orc_data(table_view const& input,
       max_stream_size = std::max(max_stream_size, stream_size);
     }

-    if (all_device_write) {
-      return pinned_buffer{nullptr, cudaFreeHost};
-    } else {
-      return pinned_buffer{[](size_t size) {
-                             uint8_t* ptr = nullptr;
-                             CUDF_CUDA_TRY(cudaMallocHost(&ptr, size));
-                             return ptr;
-                           }(max_stream_size),
-                           cudaFreeHost};
-    }
+    return cudf::detail::pinned_host_vector(all_device_write ? 0 : max_stream_size);
   }();

   // Compress the data streams
-  rmm::device_buffer compressed_data(compressed_bfr_size, stream);
+  rmm::device_uvector compressed_data(compressed_bfr_size, stream);
   hostdevice_vector comp_results(num_compressed_blocks, stream);
   thrust::fill(rmm::exec_policy(stream),
                comp_results.d_begin(),
@@ -2354,7 +2330,7 @@ convert_table_to_orc_data(table_view const& input,
                compression_result{0, compression_status::FAILURE});
   if (compression_kind != NONE) {
     strm_descs.host_to_device(stream);
-    gpu::CompressOrcDataStreams(static_cast(compressed_data.data()),
+    gpu::CompressOrcDataStreams(compressed_data.data(),
                                 num_compressed_blocks,
                                 compression_kind,
                                 compression_blocksize,
@@ -2374,24 +2350,24 @@ convert_table_to_orc_data(table_view const& input,

   auto intermediate_stats = gather_statistic_blobs(stats_freq, orc_table, segmentation, stream);

-  return {std::move(streams),
-          std::move(comp_results),
-          std::move(strm_descs),
-          std::move(enc_data),
-          std::move(segmentation),
-          std::move(stripe_dict),
-          std::move(stripes),
-          std::move(orc_table),
-          std::move(compressed_data),
-          std::move(intermediate_stats),
-          std::move(stream_output)};
+  return std::tuple{std::move(enc_data),
+                    std::move(segmentation),
+                    std::move(orc_table),
+                    std::move(compressed_data),
+                    std::move(comp_results),
+                    std::move(strm_descs),
+                    std::move(intermediate_stats),
+                    std::move(streams),
+                    std::move(stripes),
+                    std::move(stripe_dict),
+                    std::move(bounce_buffer)};
 }

 } // namespace

 writer::impl::impl(std::unique_ptr sink,
                    orc_writer_options const& options,
-                   SingleWriteMode mode,
+                   single_write_mode mode,
                    rmm::cuda_stream_view stream)
   : _stream(stream),
     _max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()},
@@ -2399,7 +2375,7 @@ writer::impl::impl(std::unique_ptr sink,
     _compression_kind(to_orc_compression(options.get_compression())),
     _compression_blocksize(compression_block_size(_compression_kind)),
     _stats_freq(options.get_statistics_freq()),
-    _single_write_mode(mode == SingleWriteMode::YES),
+    _single_write_mode(mode),
     _kv_meta(options.get_key_value_metadata()),
     _out_sink(std::move(sink))
 {
@@ -2411,7 +2387,7 @@ writer::impl::impl(std::unique_ptr sink,

 writer::impl::impl(std::unique_ptr sink,
                    chunked_orc_writer_options const& options,
-                   SingleWriteMode mode,
+                   single_write_mode mode,
                    rmm::cuda_stream_view stream)
   : _stream(stream),
     _max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()},
@@ -2419,7 +2395,7 @@ writer::impl::impl(std::unique_ptr sink,
     _compression_kind(to_orc_compression(options.get_compression())),
     _compression_blocksize(compression_block_size(_compression_kind)),
     _stats_freq(options.get_statistics_freq()),
-    _single_write_mode(mode == SingleWriteMode::YES),
+    _single_write_mode(mode),
     _kv_meta(options.get_key_value_metadata()),
     _out_sink(std::move(sink))
 {
@@ -2448,17 +2424,17 @@ void writer::impl::write(table_view const& input)
   // is still intact.
   // Note that `out_sink_` is intentionally passed by const reference to prevent accidentally
   // writing anything to it.
-  [[maybe_unused]] auto [streams,
-                         comp_results,
-                         strm_descs,
-                         enc_data,
+  [[maybe_unused]] auto [enc_data,
                          segmentation,
-                         stripe_dict, /* unused, but its data will be accessed via pointer later */
-                         stripes,
                          orc_table,
                          compressed_data,
+                         comp_results,
+                         strm_descs,
                          intermediate_stats,
-                         stream_output] = [&] {
+                         streams,
+                         stripes,
+                         stripe_dict, /* unused, but its data will be accessed via pointer later */
+                         bounce_buffer] = [&] {
     try {
       return convert_table_to_orc_data(input,
                                        *_table_meta,
@@ -2480,31 +2456,31 @@ void writer::impl::write(table_view const& input)
   }();

   // Compression/encoding were all successful. Now write the intermediate results.
-  write_orc_data_to_sink(streams,
-                         comp_results,
-                         strm_descs,
-                         enc_data,
+  write_orc_data_to_sink(enc_data,
                          segmentation,
-                         stripes,
                          orc_table,
                          compressed_data,
+                         comp_results,
+                         strm_descs,
                          intermediate_stats,
-                         stream_output.get());
+                         streams,
+                         stripes,
+                         bounce_buffer);

   // Update data into the footer. This needs to be called even when num_rows==0.
   add_table_to_footer_data(orc_table, stripes);
 }

-void writer::impl::write_orc_data_to_sink(orc_streams& streams,
-                                          hostdevice_vector const& comp_results,
-                                          hostdevice_2dvector const& strm_descs,
-                                          encoded_data const& enc_data,
+void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data,
                                           file_segmentation const& segmentation,
-                                          std::vector& stripes,
                                           orc_table_view const& orc_table,
-                                          rmm::device_buffer const& compressed_data,
+                                          device_span compressed_data,
+                                          host_span comp_results,
+                                          host_2dspan strm_descs,
                                           intermediate_statistics& intermediate_stats,
-                                          uint8_t* stream_output)
+                                          orc_streams& streams,
+                                          host_span stripes,
+                                          host_span bounce_buffer)
 {
   if (orc_table.num_rows() == 0) { return; }

@@ -2543,8 +2519,8 @@ void writer::impl::write_orc_data_to_sink(orc_streams& streams,
       write_tasks.push_back(write_data_stream(
         strm_desc,
         enc_data.streams[strm_desc.column_id][segmentation.stripes[stripe_id].first],
-        static_cast(compressed_data.data()),
-        stream_output,
+        compressed_data.data(),
+        bounce_buffer.data(),
         &stripe,
         &streams,
         _compression_kind,
@@ -2712,7 +2688,7 @@ void writer::impl::close()
 // Forward to implementation
 writer::writer(std::unique_ptr sink,
                orc_writer_options const& options,
-               SingleWriteMode mode,
+               single_write_mode mode,
                rmm::cuda_stream_view stream)
   : _impl(std::make_unique(std::move(sink), options, mode, stream))
 {
@@ -2721,7 +2697,7 @@ writer::writer(std::unique_ptr sink,
 // Forward to implementation
 writer::writer(std::unique_ptr sink,
                chunked_orc_writer_options const& options,
-               SingleWriteMode mode,
+               single_write_mode mode,
                rmm::cuda_stream_view stream)
   : _impl(std::make_unique(std::move(sink), options, mode, stream))
 {
diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp
index f003de8087e..2b04b418b41 100644
--- a/cpp/src/io/orc/writer_impl.hpp
+++ b/cpp/src/io/orc/writer_impl.hpp
@@ -209,7 +209,7 @@ struct persisted_statistics {
   }

   void persist(int num_table_rows,
-               bool single_write_mode,
+               single_write_mode write_mode,
                intermediate_statistics& intermediate_stats,
                rmm::cuda_stream_view stream);

@@ -248,7 +248,7 @@ class writer::impl {
    */
   explicit impl(std::unique_ptr sink,
                 orc_writer_options const& options,
-                SingleWriteMode mode,
+                single_write_mode mode,
                 rmm::cuda_stream_view stream);

   /**
@@ -261,7 +261,7 @@ class writer::impl {
    */
   explicit impl(std::unique_ptr sink,
                 chunked_orc_writer_options const& options,
-                SingleWriteMode mode,
+                single_write_mode mode,
                 rmm::cuda_stream_view stream);

   /**
@@ -293,27 +293,27 @@ class writer::impl {
    * The intermediate data is generated from processing (compressing/encoding) a cuDF input table
    * by `process_for_write` called in the `write()` function.
    *
-   * @param streams List of stream descriptors
-   * @param comp_results Status of data compression
-   * @param strm_descs List of stream descriptors
-   * @param enc_data ORC per-chunk streams of encoded data
-   * @param segmentation Description of how the ORC file is segmented into stripes and rowgroups
-   * @param stripes List of stripe description
-   * @param orc_table Non-owning view of a cuDF table that includes ORC-related information
-   * @param compressed_data Compressed stream data
-   * @param intermediate_stats Statistics data stored between calls to write
-   * @param stream_output Temporary host output buffer
+   * @param[in] enc_data ORC per-chunk streams of encoded data
+   * @param[in] segmentation Description of how the ORC file is segmented into stripes and rowgroups
+   * @param[in] orc_table Non-owning view of a cuDF table that includes ORC-related information
+   * @param[in] compressed_data Compressed stream data
+   * @param[in] comp_results Status of data compression
+   * @param[in] strm_descs List of stream descriptors
+   * @param[in,out] intermediate_stats Statistics data stored between calls to write
+   * @param[in,out] streams List of stream descriptors
+   * @param[in,out] stripes List of stripe description
+   * @param[out] bounce_buffer Temporary host output buffer
    */
-  void write_orc_data_to_sink(orc_streams& streams,
-                              hostdevice_vector const& comp_results,
-                              hostdevice_2dvector const& strm_descs,
-                              encoded_data const& enc_data,
+  void write_orc_data_to_sink(encoded_data const& enc_data,
                               file_segmentation const& segmentation,
-                              std::vector& stripes,
                               orc_table_view const& orc_table,
-                              rmm::device_buffer const& compressed_data,
+                              device_span compressed_data,
+                              host_span comp_results,
+                              host_2dspan strm_descs,
                               intermediate_statistics& intermediate_stats,
-                              uint8_t* stream_output);
+                              orc_streams& streams,
+                              host_span stripes,
+                              host_span bounce_buffer);

   /**
    * @brief Add the processed table data into the internal file footer.
@@ -334,9 +334,9 @@ class writer::impl {
   CompressionKind const _compression_kind;
   size_t const _compression_blocksize;
   statistics_freq const _stats_freq;
-  bool const _single_write_mode;  // Special parameter only used by `write()` to indicate that
-                                  // we are guaranteeing a single table write. This enables some
-                                  // internal optimizations.
+  single_write_mode const _single_write_mode;  // Special parameter only used by `write()` to
+                                               // indicate that we are guaranteeing a single table
+                                               // write. This enables some internal optimizations.
   std::map const _kv_meta;  // Optional user metadata.
   std::unique_ptr const _out_sink;
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 79581c0d21c..48108ca2669 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -34,6 +34,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -67,11 +68,6 @@ using namespace cudf::io::parquet;
 using namespace cudf::io;

 namespace {
-/**
- * @brief Helper for pinned host memory
- */
-template
-using pinned_buffer = std::unique_ptr;

 /**
  * @brief Function that translates GDF compression to parquet compression
@@ -510,7 +506,7 @@ struct leaf_schema_fn {

 inline bool is_col_nullable(cudf::detail::LinkedColPtr const& col,
                             column_in_metadata const& col_meta,
-                            bool single_write_mode)
+                            single_write_mode write_mode)
 {
   if (col_meta.is_nullability_defined()) {
     CUDF_EXPECTS(col_meta.nullable() || !col->nullable(),
@@ -520,7 +516,7 @@ inline bool is_col_nullable(cudf::detail::LinkedColPtr const& col,
   }
   // For chunked write, when not provided nullability, we assume the worst case scenario
   // that all columns are nullable.
-  return not single_write_mode or col->nullable();
+  return write_mode == single_write_mode::NO or col->nullable();
 }

 /**
@@ -532,7 +528,7 @@ inline bool is_col_nullable(cudf::detail::LinkedColPtr const& col,
 std::vector construct_schema_tree(
   cudf::detail::LinkedColVector const& linked_columns,
   table_input_metadata& metadata,
-  bool single_write_mode,
+  single_write_mode write_mode,
   bool int96_timestamps)
 {
   std::vector schema;
@@ -546,7 +542,7 @@ std::vector construct_schema_tree(
   std::function add_schema =
     [&](cudf::detail::LinkedColPtr const& col, column_in_metadata& col_meta, size_t parent_idx) {
-      bool col_nullable = is_col_nullable(col, col_meta, single_write_mode);
+      bool col_nullable = is_col_nullable(col, col_meta, write_mode);

       auto set_field_id = [&schema, parent_idx](schema_tree_node& s,
                                                 column_in_metadata const& col_meta) {
@@ -683,7 +679,7 @@ std::vector construct_schema_tree(
         right_child_meta.set_name("value");
         // check the repetition type of key is required i.e. the col should be non-nullable
         auto key_col = col->children[lists_column_view::child_column_index]->children[0];
-        CUDF_EXPECTS(!is_col_nullable(key_col, left_child_meta, single_write_mode),
+        CUDF_EXPECTS(!is_col_nullable(key_col, left_child_meta, write_mode),
                      "key column cannot be nullable. For chunked writing, explicitly set the "
                      "nullability to false in metadata");
         // process key
@@ -1319,7 +1315,7 @@ size_t writer::impl::column_index_buffer_size(gpu::EncColumnChunk* ck) const

 writer::impl::impl(std::vector> sinks,
                    parquet_writer_options const& options,
-                   SingleWriteMode mode,
+                   single_write_mode mode,
                    rmm::cuda_stream_view stream)
   : _stream(stream),
     _compression(to_parquet_compression(options.get_compression())),
@@ -1334,7 +1330,7 @@ writer::impl::impl(std::vector> sinks,
     _int96_timestamps(options.is_enabled_int96_timestamps()),
     _column_index_truncate_length(options.get_column_index_truncate_length()),
     _kv_meta(options.get_key_value_metadata()),
-    _single_write_mode(mode == SingleWriteMode::YES),
+    _single_write_mode(mode),
     _out_sink(std::move(sinks))
 {
   if (options.get_metadata()) {
@@ -1345,7 +1341,7 @@ writer::impl::impl(std::vector> sinks,

 writer::impl::impl(std::vector> sinks,
                    chunked_parquet_writer_options const& options,
-                   SingleWriteMode mode,
+                   single_write_mode mode,
                    rmm::cuda_stream_view stream)
   : _stream(stream),
     _compression(to_parquet_compression(options.get_compression())),
@@ -1360,7 +1356,7 @@ writer::impl::impl(std::vector> sinks,
     _int96_timestamps(options.is_enabled_int96_timestamps()),
     _column_index_truncate_length(options.get_column_index_truncate_length()),
     _kv_meta(options.get_key_value_metadata()),
-    _single_write_mode(mode == SingleWriteMode::YES),
+    _single_write_mode(mode),
     _out_sink(std::move(sinks))
 {
   if (options.get_metadata()) {
@@ -1835,7 +1831,7 @@ void writer::impl::write(table_view const& table, std::vector co
                        num_stats_bfr);
   }

-  pinned_buffer host_bfr{nullptr, cudaFreeHost};
+  cudf::detail::pinned_host_vector host_bfr(max_chunk_bfr_size);

   // Encode row groups in batches
   for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) {
@@ -1889,25 +1885,17 @@ void writer::impl::write(table_view const& table, std::vector co
           _stream.synchronize();
         }
       } else {
-        if (!host_bfr) {
-          host_bfr = pinned_buffer{[](size_t size) {
-                                     uint8_t* ptr = nullptr;
-                                     CUDF_CUDA_TRY(cudaMallocHost(&ptr, size));
-                                     return ptr;
-                                   }(max_chunk_bfr_size),
-                                   cudaFreeHost};
-        }
         // copy the full data
-        CUDF_CUDA_TRY(cudaMemcpyAsync(host_bfr.get(),
+        CUDF_CUDA_TRY(cudaMemcpyAsync(host_bfr.data(),
                                       dev_bfr,
                                       ck.ck_stat_size + ck.compressed_size,
                                       cudaMemcpyDefault,
                                       _stream.value()));
         _stream.synchronize();
-        _out_sink[p]->host_write(host_bfr.get() + ck.ck_stat_size, ck.compressed_size);
+        _out_sink[p]->host_write(host_bfr.data() + ck.ck_stat_size, ck.compressed_size);
         if (ck.ck_stat_size != 0) {
           column_chunk_meta.statistics_blob.resize(ck.ck_stat_size);
-          memcpy(column_chunk_meta.statistics_blob.data(), host_bfr.get(), ck.ck_stat_size);
+          memcpy(column_chunk_meta.statistics_blob.data(), host_bfr.data(), ck.ck_stat_size);
         }
       }
       row_group.total_byte_size += ck.compressed_size;
@@ -2054,7 +2042,7 @@ std::unique_ptr> writer::impl::close(
 // Forward to implementation
 writer::writer(std::vector> sinks,
                parquet_writer_options const& options,
-               SingleWriteMode mode,
+               single_write_mode mode,
                rmm::cuda_stream_view stream)
   : _impl(std::make_unique(std::move(sinks), options, mode, stream))
 {
@@ -2062,7 +2050,7 @@ writer::writer(std::vector> sinks,

 writer::writer(std::vector> sinks,
                chunked_parquet_writer_options const& options,
-               SingleWriteMode mode,
+               single_write_mode mode,
                rmm::cuda_stream_view stream)
   : _impl(std::make_unique(std::move(sinks), options, mode, stream))
 {
diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp
index 34ad95f3ee4..c88287994a1 100644
--- a/cpp/src/io/parquet/writer_impl.hpp
+++ b/cpp/src/io/parquet/writer_impl.hpp
@@ -68,7 +68,7 @@ class writer::impl {
    */
   explicit impl(std::vector> sinks,
                 parquet_writer_options const& options,
-                SingleWriteMode mode,
+                single_write_mode mode,
                 rmm::cuda_stream_view stream);

   /**
@@ -81,7 +81,7 @@ class writer::impl {
    */
   explicit impl(std::vector> sinks,
                 chunked_parquet_writer_options const& options,
-                SingleWriteMode mode,
+                single_write_mode mode,
                 rmm::cuda_stream_view stream);

   /**
@@ -229,9 +229,9 @@ class writer::impl {
   bool const _int96_timestamps;
   int32_t const _column_index_truncate_length;
   std::vector> const _kv_meta;  // Optional user metadata.
-  bool const _single_write_mode = true;  // Special parameter only used by `write()` to indicate
-                                         // that we are guaranteeing a single table write.
-                                         // This enables some internal optimizations.
+  single_write_mode const _single_write_mode;  // Special parameter only used by `write()` to
+                                               // indicate that we are guaranteeing a single table
+                                               // write. This enables some internal optimizations.
   std::vector> const _out_sink;

   // Internal states, filled during `write()` and written to sink during `write` and `close()`.
diff --git a/cpp/src/io/text/bgzip_data_chunk_source.cu b/cpp/src/io/text/bgzip_data_chunk_source.cu
index 32678191cb1..715f70605df 100644
--- a/cpp/src/io/text/bgzip_data_chunk_source.cu
+++ b/cpp/src/io/text/bgzip_data_chunk_source.cu
@@ -19,7 +19,7 @@
 #include "io/utilities/config_utils.hpp"

 #include
-#include
+#include
 #include
 #include
 #include
@@ -65,10 +65,7 @@ struct bgzip_nvcomp_transform_functor {
 class bgzip_data_chunk_reader : public data_chunk_reader {
  private:
   template
-  using pinned_host_vector = thrust::host_vector>;
-
-  template
-  static void copy_to_device(const pinned_host_vector& host,
+  static void copy_to_device(cudf::detail::pinned_host_vector const& host,
                              rmm::device_uvector& device,
                              rmm::cuda_stream_view stream)
   {
@@ -84,9 +81,9 @@ class bgzip_data_chunk_reader : public data_chunk_reader {
       1 << 16;  // 64k offset allocation, resized on demand
     cudaEvent_t event;
-    pinned_host_vector h_compressed_blocks;
-    pinned_host_vector h_compressed_offsets;
-    pinned_host_vector h_decompressed_offsets;
+    cudf::detail::pinned_host_vector h_compressed_blocks;
+    cudf::detail::pinned_host_vector h_compressed_offsets;
+    cudf::detail::pinned_host_vector h_decompressed_offsets;
     rmm::device_uvector d_compressed_blocks;
     rmm::device_uvector d_decompressed_blocks;
     rmm::device_uvector d_compressed_offsets;
diff --git a/cpp/src/io/text/data_chunk_source_factories.cpp b/cpp/src/io/text/data_chunk_source_factories.cpp
index b6c88c1346f..1ff1a4f8ebf 100644
--- a/cpp/src/io/text/data_chunk_source_factories.cpp
+++ b/cpp/src/io/text/data_chunk_source_factories.cpp
@@ -17,7 +17,7 @@
 #include "io/text/device_data_chunks.hpp"

 #include
-#include
+#include
 #include
 #include
@@ -30,16 +30,16 @@ namespace cudf::io::text {

 namespace {

+struct host_ticket {
+  cudaEvent_t event;
+  cudf::detail::pinned_host_vector buffer;
+};
+
 /**
  * @brief A reader which produces owning chunks of device memory which contain a copy of the data
  * from an istream.
  */
 class datasource_chunk_reader : public data_chunk_reader {
-  struct host_ticket {
-    cudaEvent_t event;
-    thrust::host_vector> buffer;
-  };
-
   constexpr static int num_tickets = 2;

  public:
@@ -114,11 +114,6 @@ class datasource_chunk_reader : public data_chunk_reader {
  * from an istream.
  */
 class istream_data_chunk_reader : public data_chunk_reader {
-  struct host_ticket {
-    cudaEvent_t event;
-    thrust::host_vector> buffer;
-  };
-
   constexpr static int num_tickets = 2;

  public:
diff --git a/cpp/src/io/utilities/hostdevice_vector.hpp b/cpp/src/io/utilities/hostdevice_vector.hpp
index ff8502b1dcd..566132f8463 100644
--- a/cpp/src/io/utilities/hostdevice_vector.hpp
+++ b/cpp/src/io/utilities/hostdevice_vector.hpp
@@ -19,7 +19,7 @@
 #include "config_utils.hpp"
 #include "hostdevice_span.hpp"

-#include
+#include
 #include
 #include
 #include
@@ -67,7 +67,7 @@ class hostdevice_vector {
     if (hostdevice_vector_uses_pageable_buffer()) {
       h_data_owner = thrust::host_vector();
     } else {
-      h_data_owner = thrust::host_vector>();
+      h_data_owner = cudf::detail::pinned_host_vector();
     }

     std::visit(
@@ -177,8 +177,7 @@ class hostdevice_vector {
   }

  private:
-  std::variant, thrust::host_vector>>
-    h_data_owner;
+  std::variant, cudf::detail::pinned_host_vector> h_data_owner;
   T* host_data = nullptr;
   size_t current_size = 0;
   rmm::device_uvector d_data;
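
Reviewer note (not part of the patch): a minimal sketch of how the new
`cudf::detail::pinned_host_vector` alias is meant to be used at a call site.
The function name, buffer size, and staging flow below are illustrative
assumptions, not code from this PR.

    #include <cudf/detail/utilities/pinned_host_vector.hpp>

    #include <rmm/cuda_stream_view.hpp>

    #include <cuda_runtime.h>

    // Hypothetical helper: stage host data through pinned memory.
    void stage_to_device(rmm::cuda_stream_view stream)
    {
      // pinned_host_vector<T> is thrust::host_vector<T, pinned_allocator<T>>,
      // so its storage is page-locked and safe to pass to cudaMemcpyAsync,
      // replacing the hand-rolled cudaMallocHost/cudaFreeHost buffers above.
      cudf::detail::pinned_host_vector<char> bounce(1 << 20);  // 1 MiB bounce buffer
      char* d_buf = nullptr;
      cudaMalloc(&d_buf, bounce.size());
      cudaMemcpyAsync(d_buf, bounce.data(), bounce.size(), cudaMemcpyHostToDevice, stream.value());
      cudaStreamSynchronize(stream.value());
      cudaFree(d_buf);
    }

Compared to the removed `pinned_buffer` helpers, the vector owns and frees its
pinned allocation automatically, which is what lets the writers drop the
explicit `cudaMallocHost`/`cudaFreeHost` lambdas.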
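A second sketch (also not part of the patch) of why the writers now carry a
`single_write_mode` value instead of a raw `bool`: call sites compare against
the named enumerators, mirroring `is_col_nullable` in this diff. The free
function `assume_nullable` is a hypothetical illustration.

    #include <cudf/io/detail/utils.hpp>

    namespace io_detail = cudf::io::detail;

    // Chunked writes (single_write_mode::NO) must assume the worst case: a
    // column that has no nulls in this chunk may still get nulls in a later
    // chunk, so it is treated as nullable unless metadata says otherwise.
    bool assume_nullable(bool column_nullable, io_detail::single_write_mode mode)
    {
      return mode == io_detail::single_write_mode::NO or column_nullable;
    }

The enum is `: bool` sized, so the change costs nothing at runtime while making
`single_write_mode::YES`/`NO` self-documenting at every call site.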