diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index a7f8c6ca0a9..3be07480b15 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -121,8 +121,6 @@ jobs: secrets: inherit uses: rapidsai/shared-workflows/.github/workflows/wheels-test.yaml@branch-24.12 with: - # This selects "ARCH=amd64 + the latest supported Python + CUDA". - matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))])) build_type: nightly branch: ${{ inputs.branch }} date: ${{ inputs.date }} @@ -153,8 +151,6 @@ jobs: secrets: inherit uses: rapidsai/shared-workflows/.github/workflows/wheels-test.yaml@branch-24.12 with: - # This selects "ARCH=amd64 + the latest supported Python + CUDA". - matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))])) build_type: nightly branch: ${{ inputs.branch }} date: ${{ inputs.date }} @@ -164,8 +160,6 @@ jobs: secrets: inherit uses: rapidsai/shared-workflows/.github/workflows/wheels-test.yaml@branch-24.12 with: - # This selects "ARCH=amd64 + the latest supported Python + CUDA". - matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))])) build_type: nightly branch: ${{ inputs.branch }} date: ${{ inputs.date }} diff --git a/ci/run_cudf_polars_polars_tests.sh b/ci/run_cudf_polars_polars_tests.sh index 95f78f17f2f..49437510c7e 100755 --- a/ci/run_cudf_polars_polars_tests.sh +++ b/ci/run_cudf_polars_polars_tests.sh @@ -12,8 +12,27 @@ DESELECTED_TESTS=( "tests/unit/streaming/test_streaming_sort.py::test_streaming_sort[True]" # relies on polars built in debug mode "tests/unit/test_cpu_check.py::test_check_cpu_flags_skipped_no_flags" # Mock library error "tests/docs/test_user_guide.py" # No dot binary in CI image + "tests/unit/test_polars_import.py::test_fork_safety" # test started to hang in polars-1.14 ) +if [[ $(arch) == "aarch64" ]]; then + # The binary used for TPC-H generation is compiled for x86_64, not aarch64. + DESELECTED_TESTS+=("tests/benchmark/test_pdsh.py::test_pdsh") + # The connectorx package is not available on arm + DESELECTED_TESTS+=("tests/unit/io/database/test_read.py::test_read_database") + # The necessary timezone information cannot be found in our CI image. + DESELECTED_TESTS+=("tests/unit/io/test_parquet.py::test_parametric_small_page_mask_filtering") + DESELECTED_TESTS+=("tests/unit/testing/test_assert_series_equal.py::test_assert_series_equal_parametric") + DESELECTED_TESTS+=("tests/unit/operations/test_join.py::test_join_4_columns_with_validity") +else + # Ensure that we don't run dbgen when it uses newer symbols than supported by the glibc version in the CI image. + glibc_minor_version=$(ldd --version | head -1 | grep -o "[0-9]\.[0-9]\+" | tail -1 | cut -d '.' -f2) + latest_glibc_symbol_found=$(nm py-polars/tests/benchmark/data/pdsh/dbgen/dbgen | grep GLIBC | grep -o "[0-9]\.[0-9]\+" | sort --version-sort | tail -1 | cut -d "." -f 2) + if [[ ${glibc_minor_version} -lt ${latest_glibc_symbol_found} ]]; then + DESELECTED_TESTS+=("tests/benchmark/test_pdsh.py::test_pdsh") + fi +fi + DESELECTED_TESTS=$(printf -- " --deselect %s" "${DESELECTED_TESTS[@]}") python -m pytest \ --import-mode=importlib \ diff --git a/conda/environments/all_cuda-118_arch-x86_64.yaml b/conda/environments/all_cuda-118_arch-x86_64.yaml index e91443ddba8..d21497c4def 100644 --- a/conda/environments/all_cuda-118_arch-x86_64.yaml +++ b/conda/environments/all_cuda-118_arch-x86_64.yaml @@ -66,7 +66,7 @@ dependencies: - pandas - pandas>=2.0,<2.2.4dev0 - pandoc -- polars>=1.11,<1.14 +- polars>=1.11,<1.15 - pre-commit - ptxcompiler - pyarrow>=14.0.0,<19.0.0a0 diff --git a/conda/environments/all_cuda-125_arch-x86_64.yaml b/conda/environments/all_cuda-125_arch-x86_64.yaml index 2dccb595e59..400c1195e00 100644 --- a/conda/environments/all_cuda-125_arch-x86_64.yaml +++ b/conda/environments/all_cuda-125_arch-x86_64.yaml @@ -64,7 +64,7 @@ dependencies: - pandas - pandas>=2.0,<2.2.4dev0 - pandoc -- polars>=1.11,<1.14 +- polars>=1.11,<1.15 - pre-commit - pyarrow>=14.0.0,<19.0.0a0 - pydata-sphinx-theme!=0.14.2 diff --git a/conda/recipes/cudf-polars/meta.yaml b/conda/recipes/cudf-polars/meta.yaml index 7a477291e7a..b6c03dc1bc2 100644 --- a/conda/recipes/cudf-polars/meta.yaml +++ b/conda/recipes/cudf-polars/meta.yaml @@ -43,7 +43,7 @@ requirements: run: - python - pylibcudf ={{ version }} - - polars >=1.11,<1.14 + - polars >=1.11,<1.15 - {{ pin_compatible('cuda-version', max_pin='x', min_pin='x') }} test: diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 6752ce12d83..506f6c185f5 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -464,6 +464,7 @@ add_library( src/io/avro/avro_gpu.cu src/io/avro/reader_impl.cu src/io/comp/brotli_dict.cpp + src/io/comp/comp.cpp src/io/comp/cpu_unbz2.cpp src/io/comp/debrotli.cu src/io/comp/gpuinflate.cu diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index f50730b2e98..335f9f435eb 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -354,7 +354,7 @@ ConfigureNVBench( # ################################################################################################## # * strings benchmark ------------------------------------------------------------------- -ConfigureBench(STRINGS_BENCH string/repeat_strings.cpp string/replace.cpp string/url_decode.cu) +ConfigureBench(STRINGS_BENCH string/repeat_strings.cpp string/url_decode.cu) ConfigureNVBench( STRINGS_NVBENCH @@ -379,6 +379,7 @@ ConfigureNVBench( string/lengths.cpp string/like.cpp string/make_strings_column.cu + string/replace.cpp string/replace_re.cpp string/reverse.cpp string/slice.cpp diff --git a/cpp/benchmarks/string/replace.cpp b/cpp/benchmarks/string/replace.cpp index 3d9d51bfd6d..643e857f356 100644 --- a/cpp/benchmarks/string/replace.cpp +++ b/cpp/benchmarks/string/replace.cpp @@ -14,11 +14,8 @@ * limitations under the License. */ -#include "string_bench_args.hpp" - #include #include -#include #include @@ -27,59 +24,51 @@ #include #include -#include - -class StringReplace : public cudf::benchmark {}; +#include enum replace_type { scalar, slice, multi }; -static void BM_replace(benchmark::State& state, replace_type rt) +static void bench_replace(nvbench::state& state) { - cudf::size_type const n_rows{static_cast(state.range(0))}; - cudf::size_type const max_str_length{static_cast(state.range(1))}; + auto const num_rows = static_cast(state.get_int64("num_rows")); + auto const min_width = static_cast(state.get_int64("min_width")); + auto const max_width = static_cast(state.get_int64("max_width")); + auto const api = state.get_string("api"); + data_profile const profile = data_profile_builder().distribution( - cudf::type_id::STRING, distribution_id::NORMAL, 0, max_str_length); - auto const column = create_random_column(cudf::type_id::STRING, row_count{n_rows}, profile); - cudf::strings_column_view input(column->view()); - cudf::string_scalar target("+"); - cudf::string_scalar repl(""); - cudf::test::strings_column_wrapper targets({"+", "-"}); - cudf::test::strings_column_wrapper repls({"", ""}); + cudf::type_id::STRING, distribution_id::NORMAL, min_width, max_width); + auto const column = create_random_column(cudf::type_id::STRING, row_count{num_rows}, profile); - for (auto _ : state) { - cuda_event_timer raii(state, true, cudf::get_default_stream()); - switch (rt) { - case scalar: cudf::strings::replace(input, target, repl); break; - case slice: cudf::strings::replace_slice(input, repl, 1, 10); break; - case multi: - cudf::strings::replace_multiple( - input, cudf::strings_column_view(targets), cudf::strings_column_view(repls)); - break; - } - } + cudf::strings_column_view input(column->view()); - state.SetBytesProcessed(state.iterations() * input.chars_size(cudf::get_default_stream())); -} + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + auto const chars_size = input.chars_size(stream); + state.add_global_memory_reads(chars_size); + state.add_global_memory_writes(chars_size); -static void generate_bench_args(benchmark::internal::Benchmark* b) -{ - int const min_rows = 1 << 12; - int const max_rows = 1 << 24; - int const row_mult = 8; - int const min_rowlen = 1 << 5; - int const max_rowlen = 1 << 13; - int const len_mult = 2; - generate_string_bench_args(b, min_rows, max_rows, row_mult, min_rowlen, max_rowlen, len_mult); + if (api == "scalar") { + cudf::string_scalar target("+"); + cudf::string_scalar repl("-"); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { cudf::strings::replace(input, target, repl); }); + } else if (api == "multi") { + state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) { + cudf::test::strings_column_wrapper targets({"+", " "}); + cudf::test::strings_column_wrapper repls({"-", "_"}); + cudf::strings::replace_multiple( + input, cudf::strings_column_view(targets), cudf::strings_column_view(repls)); + }); + } else if (api == "slice") { + cudf::string_scalar repl("0123456789"); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { cudf::strings::replace_slice(input, repl, 1, 10); }); + } } -#define STRINGS_BENCHMARK_DEFINE(name) \ - BENCHMARK_DEFINE_F(StringReplace, name) \ - (::benchmark::State & st) { BM_replace(st, replace_type::name); } \ - BENCHMARK_REGISTER_F(StringReplace, name) \ - ->Apply(generate_bench_args) \ - ->UseManualTime() \ - ->Unit(benchmark::kMillisecond); - -STRINGS_BENCHMARK_DEFINE(scalar) -STRINGS_BENCHMARK_DEFINE(slice) -STRINGS_BENCHMARK_DEFINE(multi) +NVBENCH_BENCH(bench_replace) + .set_name("replace") + .add_int64_axis("min_width", {0}) + .add_int64_axis("max_width", {32, 64, 128, 256}) + .add_int64_axis("num_rows", {32768, 262144, 2097152}) + .add_string_axis("api", {"scalar", "multi", "slice"}); diff --git a/cpp/include/cudf/io/json.hpp b/cpp/include/cudf/io/json.hpp index 7cd4697f592..0c3244a1c75 100644 --- a/cpp/include/cudf/io/json.hpp +++ b/cpp/include/cudf/io/json.hpp @@ -946,6 +946,8 @@ class json_writer_options_builder; class json_writer_options { // Specify the sink to use for writer output sink_info _sink; + // Specify the compression format of the sink + compression_type _compression = compression_type::NONE; // maximum number of rows to write in each chunk (limits memory use) size_type _rows_per_chunk = std::numeric_limits::max(); // Set of columns to output @@ -1022,6 +1024,13 @@ class json_writer_options { */ [[nodiscard]] std::string const& get_na_rep() const { return _na_rep; } + /** + * @brief Returns compression type used for sink + * + * @return compression type for sink + */ + [[nodiscard]] compression_type get_compression() const { return _compression; } + /** * @brief Whether to output nulls as 'null'. * @@ -1066,6 +1075,13 @@ class json_writer_options { */ void set_table(table_view tbl) { _table = tbl; } + /** + * @brief Sets compression type to be used + * + * @param comptype Compression type for sink + */ + void set_compression(compression_type comptype) { _compression = comptype; } + /** * @brief Sets metadata. * @@ -1153,6 +1169,18 @@ class json_writer_options_builder { return *this; } + /** + * @brief Sets compression type of output sink + * + * @param comptype Compression type used + * @return this for chaining + */ + json_writer_options_builder& compression(compression_type comptype) + { + options._compression = comptype; + return *this; + } + /** * @brief Sets optional metadata (with column names). * diff --git a/cpp/src/binaryop/compiled/binary_ops.cuh b/cpp/src/binaryop/compiled/binary_ops.cuh index c6af0c3c58a..06987139188 100644 --- a/cpp/src/binaryop/compiled/binary_ops.cuh +++ b/cpp/src/binaryop/compiled/binary_ops.cuh @@ -21,7 +21,7 @@ #include #include -#include +#include #include #include @@ -253,16 +253,11 @@ struct binary_op_double_device_dispatcher { template CUDF_KERNEL void for_each_kernel(cudf::size_type size, Functor f) { - int tid = threadIdx.x; - int blkid = blockIdx.x; - int blksz = blockDim.x; - int gridsz = gridDim.x; - - int start = tid + blkid * blksz; - int step = blksz * gridsz; + auto start = cudf::detail::grid_1d::global_thread_id(); + auto const stride = cudf::detail::grid_1d::grid_stride(); #pragma unroll - for (cudf::size_type i = start; i < size; i += step) { + for (auto i = start; i < size; i += stride) { f(i); } } @@ -282,9 +277,9 @@ void for_each(rmm::cuda_stream_view stream, cudf::size_type size, Functor f) int min_grid_size; CUDF_CUDA_TRY( cudaOccupancyMaxPotentialBlockSize(&min_grid_size, &block_size, for_each_kernel)); - // 2 elements per thread. - int const grid_size = util::div_rounding_up_safe(size, 2 * block_size); - for_each_kernel<<>>(size, std::forward(f)); + auto grid = cudf::detail::grid_1d(size, block_size, 2 /* elements_per_thread */); + for_each_kernel<<>>( + size, std::forward(f)); } template diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp new file mode 100644 index 00000000000..b26a6292806 --- /dev/null +++ b/cpp/src/io/comp/comp.cpp @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2018-2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "comp.hpp" + +#include "io/utilities/hostdevice_vector.hpp" +#include "nvcomp_adapter.hpp" + +#include +#include +#include +#include +#include +#include + +#include // GZIP compression + +namespace cudf::io::detail { + +namespace { + +/** + * @brief GZIP host compressor (includes header) + */ +std::vector compress_gzip(host_span src) +{ + z_stream zs; + zs.zalloc = Z_NULL; + zs.zfree = Z_NULL; + zs.opaque = Z_NULL; + zs.avail_in = src.size(); + zs.next_in = reinterpret_cast(const_cast(src.data())); + + std::vector dst; + zs.avail_out = 0; + zs.next_out = nullptr; + + int windowbits = 15; + int gzip_encoding = 16; + int ret = deflateInit2( + &zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, windowbits | gzip_encoding, 8, Z_DEFAULT_STRATEGY); + CUDF_EXPECTS(ret == Z_OK, "GZIP DEFLATE compression initialization failed."); + + uint32_t estcomplen = deflateBound(&zs, src.size()); + dst.resize(estcomplen); + zs.avail_out = estcomplen; + zs.next_out = dst.data(); + + ret = deflate(&zs, Z_FINISH); + CUDF_EXPECTS(ret == Z_STREAM_END, "GZIP DEFLATE compression failed due to insufficient space!"); + dst.resize(std::distance(dst.data(), zs.next_out)); + + ret = deflateEnd(&zs); + CUDF_EXPECTS(ret == Z_OK, "GZIP DEFLATE compression failed at deallocation"); + + return dst; +} + +/** + * @brief SNAPPY device compressor + */ +std::vector compress_snappy(host_span src, + rmm::cuda_stream_view stream) +{ + auto const d_src = + cudf::detail::make_device_uvector_async(src, stream, cudf::get_current_device_resource_ref()); + cudf::detail::hostdevice_vector> inputs(1, stream); + inputs[0] = d_src; + inputs.host_to_device_async(stream); + + auto dst_size = compress_max_output_chunk_size(nvcomp::compression_type::SNAPPY, src.size()); + rmm::device_uvector d_dst(dst_size, stream); + cudf::detail::hostdevice_vector> outputs(1, stream); + outputs[0] = d_dst; + outputs.host_to_device_async(stream); + + cudf::detail::hostdevice_vector hd_status(1, stream); + hd_status[0] = {}; + hd_status.host_to_device_async(stream); + + nvcomp::batched_compress(nvcomp::compression_type::SNAPPY, inputs, outputs, hd_status, stream); + + hd_status.device_to_host_sync(stream); + CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS, + "snappy compression failed"); + return cudf::detail::make_std_vector_sync(d_dst, stream); +} + +} // namespace + +std::vector compress(compression_type compression, + host_span src, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + switch (compression) { + case compression_type::GZIP: return compress_gzip(src); + case compression_type::SNAPPY: return compress_snappy(src, stream); + default: CUDF_FAIL("Unsupported compression type"); + } +} + +} // namespace cudf::io::detail diff --git a/cpp/src/io/comp/comp.hpp b/cpp/src/io/comp/comp.hpp new file mode 100644 index 00000000000..652abbbeda6 --- /dev/null +++ b/cpp/src/io/comp/comp.hpp @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include +#include + +namespace CUDF_EXPORT cudf { +namespace io::detail { + +/** + * @brief Compresses a system memory buffer. + * + * @param compression Type of compression of the input data + * @param src Decompressed host buffer + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return Vector containing the Compressed output + */ +std::vector compress(compression_type compression, + host_span src, + rmm::cuda_stream_view stream); + +} // namespace io::detail +} // namespace CUDF_EXPORT cudf diff --git a/cpp/src/io/comp/gpuinflate.cu b/cpp/src/io/comp/gpuinflate.cu index fff1cf0c96a..090ea1430b5 100644 --- a/cpp/src/io/comp/gpuinflate.cu +++ b/cpp/src/io/comp/gpuinflate.cu @@ -980,27 +980,27 @@ __device__ int parse_gzip_header(uint8_t const* src, size_t src_size) { uint8_t flags = src[3]; hdr_len = 10; - if (flags & GZIPHeaderFlag::fextra) // Extra fields present + if (flags & detail::GZIPHeaderFlag::fextra) // Extra fields present { int xlen = src[hdr_len] | (src[hdr_len + 1] << 8); hdr_len += xlen; if (hdr_len >= src_size) return -1; } - if (flags & GZIPHeaderFlag::fname) // Original file name present + if (flags & detail::GZIPHeaderFlag::fname) // Original file name present { // Skip zero-terminated string do { if (hdr_len >= src_size) return -1; } while (src[hdr_len++] != 0); } - if (flags & GZIPHeaderFlag::fcomment) // Comment present + if (flags & detail::GZIPHeaderFlag::fcomment) // Comment present { // Skip zero-terminated string do { if (hdr_len >= src_size) return -1; } while (src[hdr_len++] != 0); } - if (flags & GZIPHeaderFlag::fhcrc) // Header CRC present + if (flags & detail::GZIPHeaderFlag::fhcrc) // Header CRC present { hdr_len += 2; } diff --git a/cpp/src/io/comp/io_uncomp.hpp b/cpp/src/io/comp/io_uncomp.hpp index 1c9578fa5c0..ca722a9b7ee 100644 --- a/cpp/src/io/comp/io_uncomp.hpp +++ b/cpp/src/io/comp/io_uncomp.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2022, NVIDIA CORPORATION. + * Copyright (c) 2018-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,8 +25,8 @@ using cudf::host_span; -namespace cudf { -namespace io { +namespace CUDF_EXPORT cudf { +namespace io::detail { /** * @brief Decompresses a system memory buffer. @@ -36,13 +36,35 @@ namespace io { * * @return Vector containing the Decompressed output */ -std::vector decompress(compression_type compression, host_span src); +[[nodiscard]] std::vector decompress(compression_type compression, + host_span src); +/** + * @brief Decompresses a system memory buffer. + * + * @param compression Type of compression of the input data + * @param src Compressed host buffer + * @param dst Destination host span to place decompressed buffer + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return Size of decompressed output + */ size_t decompress(compression_type compression, host_span src, host_span dst, rmm::cuda_stream_view stream); +/** + * @brief Without actually decompressing the compressed input buffer passed, return the size of + * decompressed output. If the decompressed size cannot be extracted apriori, return zero. + * + * @param compression Type of compression of the input data + * @param src Compressed host buffer + * + * @return Size of decompressed output + */ +size_t get_uncompressed_size(compression_type compression, host_span src); + /** * @brief GZIP header flags * See https://tools.ietf.org/html/rfc1952 @@ -55,5 +77,5 @@ constexpr uint8_t fname = 0x08; // Original file name present constexpr uint8_t fcomment = 0x10; // Comment present }; // namespace GZIPHeaderFlag -} // namespace io -} // namespace cudf +} // namespace io::detail +} // namespace CUDF_EXPORT cudf diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index fb8c308065d..b3d43fa786a 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -28,13 +28,12 @@ #include // memset -using cudf::host_span; - -namespace cudf { -namespace io { +namespace cudf::io::detail { #pragma pack(push, 1) +namespace { + struct gz_file_header_s { uint8_t id1; // 0x1f uint8_t id2; // 0x8b @@ -261,7 +260,7 @@ void cpu_inflate_vector(std::vector& dst, uint8_t const* comp_data, siz strm.avail_out = dst.size(); strm.total_out = 0; auto zerr = inflateInit2(&strm, -15); // -15 for raw data without GZIP headers - CUDF_EXPECTS(zerr == 0, "Error in DEFLATE stream"); + CUDF_EXPECTS(zerr == 0, "Error in DEFLATE stream: inflateInit2 failed"); do { if (strm.avail_out == 0) { dst.resize(strm.total_out + (1 << 30)); @@ -273,125 +272,7 @@ void cpu_inflate_vector(std::vector& dst, uint8_t const* comp_data, siz strm.total_out == dst.size()); dst.resize(strm.total_out); inflateEnd(&strm); - CUDF_EXPECTS(zerr == Z_STREAM_END, "Error in DEFLATE stream"); -} - -std::vector decompress(compression_type compression, host_span src) -{ - CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr"); - CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0"); - - auto raw = src.data(); - uint8_t const* comp_data = nullptr; - size_t comp_len = 0; - size_t uncomp_len = 0; - - switch (compression) { - case compression_type::AUTO: - case compression_type::GZIP: { - gz_archive_s gz; - if (ParseGZArchive(&gz, raw, src.size())) { - compression = compression_type::GZIP; - comp_data = gz.comp_data; - comp_len = gz.comp_len; - uncomp_len = gz.isize; - } - if (compression != compression_type::AUTO) break; - [[fallthrough]]; - } - case compression_type::ZIP: { - zip_archive_s za; - if (OpenZipArchive(&za, raw, src.size())) { - size_t cdfh_ofs = 0; - for (int i = 0; i < za.eocd->num_entries; i++) { - auto const* cdfh = reinterpret_cast( - reinterpret_cast(za.cdfh) + cdfh_ofs); - int cdfh_len = sizeof(zip_cdfh_s) + cdfh->fname_len + cdfh->extra_len + cdfh->comment_len; - if (cdfh_ofs + cdfh_len > za.eocd->cdir_size || cdfh->sig != 0x0201'4b50) { - // Bad cdir - break; - } - // For now, only accept with non-zero file sizes and DEFLATE - if (cdfh->comp_method == 8 && cdfh->comp_size > 0 && cdfh->uncomp_size > 0) { - size_t lfh_ofs = cdfh->hdr_ofs; - auto const* lfh = reinterpret_cast(raw + lfh_ofs); - if (lfh_ofs + sizeof(zip_lfh_s) <= src.size() && lfh->sig == 0x0403'4b50 && - lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len <= src.size()) { - if (lfh->comp_method == 8 && lfh->comp_size > 0 && lfh->uncomp_size > 0) { - size_t file_start = lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len; - size_t file_end = file_start + lfh->comp_size; - if (file_end <= src.size()) { - // Pick the first valid file of non-zero size (only 1 file expected in archive) - compression = compression_type::ZIP; - comp_data = raw + file_start; - comp_len = lfh->comp_size; - uncomp_len = lfh->uncomp_size; - break; - } - } - } - } - cdfh_ofs += cdfh_len; - } - } - } - if (compression != compression_type::AUTO) break; - [[fallthrough]]; - case compression_type::BZIP2: - if (src.size() > 4) { - auto const* fhdr = reinterpret_cast(raw); - // Check for BZIP2 file signature "BZh1" to "BZh9" - if (fhdr->sig[0] == 'B' && fhdr->sig[1] == 'Z' && fhdr->sig[2] == 'h' && - fhdr->blksz >= '1' && fhdr->blksz <= '9') { - compression = compression_type::BZIP2; - comp_data = raw; - comp_len = src.size(); - uncomp_len = 0; - } - } - if (compression != compression_type::AUTO) break; - [[fallthrough]]; - default: CUDF_FAIL("Unsupported compressed stream type"); - } - - CUDF_EXPECTS(comp_data != nullptr and comp_len > 0, "Unsupported compressed stream type"); - - if (uncomp_len <= 0) { - uncomp_len = comp_len * 4 + 4096; // In case uncompressed size isn't known in advance, assume - // ~4:1 compression for initial size - } - - if (compression == compression_type::GZIP || compression == compression_type::ZIP) { - // INFLATE - std::vector dst(uncomp_len); - cpu_inflate_vector(dst, comp_data, comp_len); - return dst; - } - if (compression == compression_type::BZIP2) { - size_t src_ofs = 0; - size_t dst_ofs = 0; - int bz_err = 0; - std::vector dst(uncomp_len); - do { - size_t dst_len = uncomp_len - dst_ofs; - bz_err = cpu_bz2_uncompress(comp_data, comp_len, dst.data() + dst_ofs, &dst_len, &src_ofs); - if (bz_err == BZ_OUTBUFF_FULL) { - // TBD: We could infer the compression ratio based on produced/consumed byte counts - // in order to minimize realloc events and over-allocation - dst_ofs = dst_len; - dst_len = uncomp_len + (uncomp_len / 2); - dst.resize(dst_len); - uncomp_len = dst_len; - } else if (bz_err == 0) { - uncomp_len = dst_len; - dst.resize(uncomp_len); - } - } while (bz_err == BZ_OUTBUFF_FULL); - CUDF_EXPECTS(bz_err == 0, "Decompression: error in stream"); - return dst; - } - - CUDF_FAIL("Unsupported compressed stream type"); + CUDF_EXPECTS(zerr == Z_STREAM_END, "Error in DEFLATE stream: Z_STREAM_END not encountered"); } /** @@ -536,14 +417,130 @@ size_t decompress_zstd(host_span src, CUDF_EXPECTS(hd_stats[0].status == compression_status::SUCCESS, "ZSTD decompression failed"); // Copy temporary output to `dst` - cudf::detail::cuda_memcpy_async( - dst.subspan(0, hd_stats[0].bytes_written), - device_span{d_dst.data(), hd_stats[0].bytes_written}, - stream); + cudf::detail::cuda_memcpy(dst.subspan(0, hd_stats[0].bytes_written), + device_span{d_dst.data(), hd_stats[0].bytes_written}, + stream); return hd_stats[0].bytes_written; } +struct source_properties { + compression_type compression = compression_type::NONE; + uint8_t const* comp_data = nullptr; + size_t comp_len = 0; + size_t uncomp_len = 0; +}; + +source_properties get_source_properties(compression_type compression, host_span src) +{ + auto raw = src.data(); + uint8_t const* comp_data = nullptr; + size_t comp_len = 0; + size_t uncomp_len = 0; + + switch (compression) { + case compression_type::AUTO: + case compression_type::GZIP: { + gz_archive_s gz; + auto const parse_succeeded = ParseGZArchive(&gz, src.data(), src.size()); + CUDF_EXPECTS(parse_succeeded, "Failed to parse GZIP header while fetching source properties"); + compression = compression_type::GZIP; + comp_data = gz.comp_data; + comp_len = gz.comp_len; + uncomp_len = gz.isize; + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + } + case compression_type::ZIP: { + zip_archive_s za; + if (OpenZipArchive(&za, raw, src.size())) { + size_t cdfh_ofs = 0; + for (int i = 0; i < za.eocd->num_entries; i++) { + auto const* cdfh = reinterpret_cast( + reinterpret_cast(za.cdfh) + cdfh_ofs); + int cdfh_len = sizeof(zip_cdfh_s) + cdfh->fname_len + cdfh->extra_len + cdfh->comment_len; + if (cdfh_ofs + cdfh_len > za.eocd->cdir_size || cdfh->sig != 0x0201'4b50) { + // Bad cdir + break; + } + // For now, only accept with non-zero file sizes and DEFLATE + if (cdfh->comp_method == 8 && cdfh->comp_size > 0 && cdfh->uncomp_size > 0) { + size_t lfh_ofs = cdfh->hdr_ofs; + auto const* lfh = reinterpret_cast(raw + lfh_ofs); + if (lfh_ofs + sizeof(zip_lfh_s) <= src.size() && lfh->sig == 0x0403'4b50 && + lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len <= src.size()) { + if (lfh->comp_method == 8 && lfh->comp_size > 0 && lfh->uncomp_size > 0) { + size_t file_start = lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len; + size_t file_end = file_start + lfh->comp_size; + if (file_end <= src.size()) { + // Pick the first valid file of non-zero size (only 1 file expected in archive) + compression = compression_type::ZIP; + comp_data = raw + file_start; + comp_len = lfh->comp_size; + uncomp_len = lfh->uncomp_size; + break; + } + } + } + } + cdfh_ofs += cdfh_len; + } + } + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + } + case compression_type::BZIP2: { + if (src.size() > 4) { + auto const* fhdr = reinterpret_cast(raw); + // Check for BZIP2 file signature "BZh1" to "BZh9" + if (fhdr->sig[0] == 'B' && fhdr->sig[1] == 'Z' && fhdr->sig[2] == 'h' && + fhdr->blksz >= '1' && fhdr->blksz <= '9') { + compression = compression_type::BZIP2; + comp_data = raw; + comp_len = src.size(); + uncomp_len = 0; + } + } + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + } + case compression_type::SNAPPY: { + uncomp_len = 0; + auto cur = src.begin(); + auto const end = src.end(); + // Read uncompressed length (varint) + { + uint32_t l = 0, c; + do { + c = *cur++; + auto const lo7 = c & 0x7f; + if (l >= 28 && c > 0xf) { + uncomp_len = 0; + break; + } + uncomp_len |= lo7 << l; + l += 7; + } while (c > 0x7f && cur < end); + CUDF_EXPECTS(uncomp_len != 0 and cur < end, "Error in retrieving SNAPPY source properties"); + } + comp_data = raw; + comp_len = src.size(); + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + } + default: CUDF_FAIL("Unsupported compressed stream type"); + } + + return source_properties{compression, comp_data, comp_len, uncomp_len}; +} + +} // namespace + +size_t get_uncompressed_size(compression_type compression, host_span src) +{ + return get_source_properties(compression, src).uncomp_len; +} + size_t decompress(compression_type compression, host_span src, host_span dst, @@ -558,5 +555,63 @@ size_t decompress(compression_type compression, } } -} // namespace io -} // namespace cudf +std::vector decompress(compression_type compression, host_span src) +{ + CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr"); + CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0"); + + auto srcprops = get_source_properties(compression, src); + CUDF_EXPECTS(srcprops.comp_data != nullptr and srcprops.comp_len > 0, + "Unsupported compressed stream type"); + + if (srcprops.uncomp_len <= 0) { + srcprops.uncomp_len = + srcprops.comp_len * 4 + 4096; // In case uncompressed size isn't known in advance, assume + // ~4:1 compression for initial size + } + + if (compression == compression_type::GZIP) { + // INFLATE + std::vector dst(srcprops.uncomp_len); + decompress_gzip(src, dst); + return dst; + } + if (compression == compression_type::ZIP) { + std::vector dst(srcprops.uncomp_len); + cpu_inflate_vector(dst, srcprops.comp_data, srcprops.comp_len); + return dst; + } + if (compression == compression_type::BZIP2) { + size_t src_ofs = 0; + size_t dst_ofs = 0; + int bz_err = 0; + std::vector dst(srcprops.uncomp_len); + do { + size_t dst_len = srcprops.uncomp_len - dst_ofs; + bz_err = cpu_bz2_uncompress( + srcprops.comp_data, srcprops.comp_len, dst.data() + dst_ofs, &dst_len, &src_ofs); + if (bz_err == BZ_OUTBUFF_FULL) { + // TBD: We could infer the compression ratio based on produced/consumed byte counts + // in order to minimize realloc events and over-allocation + dst_ofs = dst_len; + dst_len = srcprops.uncomp_len + (srcprops.uncomp_len / 2); + dst.resize(dst_len); + srcprops.uncomp_len = dst_len; + } else if (bz_err == 0) { + srcprops.uncomp_len = dst_len; + dst.resize(srcprops.uncomp_len); + } + } while (bz_err == BZ_OUTBUFF_FULL); + CUDF_EXPECTS(bz_err == 0, "Decompression: error in stream"); + return dst; + } + if (compression == compression_type::SNAPPY) { + std::vector dst(srcprops.uncomp_len); + decompress_snappy(src, dst); + return dst; + } + + CUDF_FAIL("Unsupported compressed stream type"); +} + +} // namespace cudf::io::detail diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 279f5e71351..82d8152ca1c 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,70 @@ namespace cudf::io::json::detail { namespace { +class compressed_host_buffer_source final : public datasource { + public: + explicit compressed_host_buffer_source(std::unique_ptr const& src, + compression_type comptype) + : _comptype{comptype}, _dbuf_ptr{src->host_read(0, src->size())} + { + auto ch_buffer = host_span(reinterpret_cast(_dbuf_ptr->data()), + _dbuf_ptr->size()); + if (comptype == compression_type::GZIP || comptype == compression_type::ZIP || + comptype == compression_type::SNAPPY) { + _decompressed_ch_buffer_size = cudf::io::detail::get_uncompressed_size(_comptype, ch_buffer); + } else { + _decompressed_buffer = cudf::io::detail::decompress(_comptype, ch_buffer); + _decompressed_ch_buffer_size = _decompressed_buffer.size(); + } + } + + size_t host_read(size_t offset, size_t size, uint8_t* dst) override + { + auto ch_buffer = host_span(reinterpret_cast(_dbuf_ptr->data()), + _dbuf_ptr->size()); + if (_decompressed_buffer.empty()) { + auto decompressed_hbuf = cudf::io::detail::decompress(_comptype, ch_buffer); + auto const count = std::min(size, decompressed_hbuf.size() - offset); + bool partial_read = offset + count < decompressed_hbuf.size(); + if (!partial_read) { + std::memcpy(dst, decompressed_hbuf.data() + offset, count); + return count; + } + _decompressed_buffer = std::move(decompressed_hbuf); + } + auto const count = std::min(size, _decompressed_buffer.size() - offset); + std::memcpy(dst, _decompressed_buffer.data() + offset, count); + return count; + } + + std::unique_ptr host_read(size_t offset, size_t size) override + { + auto ch_buffer = host_span(reinterpret_cast(_dbuf_ptr->data()), + _dbuf_ptr->size()); + if (_decompressed_buffer.empty()) { + auto decompressed_hbuf = cudf::io::detail::decompress(_comptype, ch_buffer); + auto const count = std::min(size, decompressed_hbuf.size() - offset); + bool partial_read = offset + count < decompressed_hbuf.size(); + if (!partial_read) + return std::make_unique>>( + std::move(decompressed_hbuf), decompressed_hbuf.data() + offset, count); + _decompressed_buffer = std::move(decompressed_hbuf); + } + auto const count = std::min(size, _decompressed_buffer.size() - offset); + return std::make_unique(_decompressed_buffer.data() + offset, count); + } + + [[nodiscard]] bool supports_device_read() const override { return false; } + + [[nodiscard]] size_t size() const override { return _decompressed_ch_buffer_size; } + + private: + std::unique_ptr _dbuf_ptr; + compression_type _comptype; + size_t _decompressed_ch_buffer_size; + std::vector _decompressed_buffer; +}; + // Return total size of sources enclosing the passed range std::size_t sources_size(host_span> const sources, std::size_t range_offset, @@ -126,13 +191,12 @@ datasource::owning_buffer get_record_range_raw_input( { CUDF_FUNC_RANGE(); - std::size_t const total_source_size = sources_size(sources, 0, 0); - auto constexpr num_delimiter_chars = 1; - auto const delimiter = reader_opts.get_delimiter(); - auto const num_extra_delimiters = num_delimiter_chars * sources.size(); - compression_type const reader_compression = reader_opts.get_compression(); - std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); - std::size_t chunk_size = reader_opts.get_byte_range_size(); + std::size_t const total_source_size = sources_size(sources, 0, 0); + auto constexpr num_delimiter_chars = 1; + auto const delimiter = reader_opts.get_delimiter(); + auto const num_extra_delimiters = num_delimiter_chars * sources.size(); + std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); + std::size_t chunk_size = reader_opts.get_byte_range_size(); CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset, "Invalid offsetting", @@ -143,22 +207,16 @@ datasource::owning_buffer get_record_range_raw_input( int num_subchunks_prealloced = should_load_till_last_source ? 0 : max_subchunks_prealloced; std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size); - // The allocation for single source compressed input is estimated by assuming a ~4:1 - // compression ratio. For uncompressed inputs, we can getter a better estimate using the idea - // of subchunks. - auto constexpr header_size = 4096; std::size_t buffer_size = - reader_compression != compression_type::NONE - ? total_source_size * estimated_compression_ratio + header_size - : std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) + - num_extra_delimiters; + std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) + + num_extra_delimiters; rmm::device_buffer buffer(buffer_size, stream); device_span bufspan(reinterpret_cast(buffer.data()), buffer.size()); // Offset within buffer indicating first read position std::int64_t buffer_offset = 0; - auto readbufspan = ingest_raw_input( - bufspan, sources, reader_compression, chunk_offset, chunk_size, delimiter, stream); + auto readbufspan = + ingest_raw_input(bufspan, sources, chunk_offset, chunk_size, delimiter, stream); auto const shift_for_nonzero_offset = std::min(chunk_offset, 1); auto const first_delim_pos = @@ -179,7 +237,6 @@ datasource::owning_buffer get_record_range_raw_input( buffer_offset += readbufspan.size(); readbufspan = ingest_raw_input(bufspan.last(buffer_size - buffer_offset), sources, - reader_compression, next_subchunk_start, size_per_subchunk, delimiter, @@ -196,11 +253,9 @@ datasource::owning_buffer get_record_range_raw_input( // Our buffer_size estimate is insufficient to read until the end of the line! We need to // allocate more memory and try again! num_subchunks_prealloced *= 2; - buffer_size = reader_compression != compression_type::NONE - ? 2 * buffer_size - : std::min(total_source_size, - buffer_size + num_subchunks_prealloced * size_per_subchunk) + - num_extra_delimiters; + buffer_size = std::min(total_source_size, + buffer_size + num_subchunks_prealloced * size_per_subchunk) + + num_extra_delimiters; buffer.resize(buffer_size, stream); bufspan = device_span(reinterpret_cast(buffer.data()), buffer.size()); } @@ -258,111 +313,11 @@ table_with_metadata read_batch(host_span> sources, return device_parse_nested_json(buffer, reader_opts, stream, mr); } -} // anonymous namespace - -device_span ingest_raw_input(device_span buffer, - host_span> sources, - compression_type compression, - std::size_t range_offset, - std::size_t range_size, - char delimiter, - rmm::cuda_stream_view stream) +table_with_metadata read_json_impl(host_span> sources, + json_reader_options const& reader_opts, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { - CUDF_FUNC_RANGE(); - // We append a line delimiter between two files to make sure the last line of file i and the first - // line of file i+1 don't end up on the same JSON line, if file i does not already end with a line - // delimiter. - auto constexpr num_delimiter_chars = 1; - - if (compression == compression_type::NONE) { - auto delimiter_map = cudf::detail::make_empty_host_vector(sources.size(), stream); - std::vector prefsum_source_sizes(sources.size()); - std::vector> h_buffers; - std::size_t bytes_read = 0; - std::transform_inclusive_scan(sources.begin(), - sources.end(), - prefsum_source_sizes.begin(), - std::plus{}, - [](std::unique_ptr const& s) { return s->size(); }); - auto upper = - std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); - std::size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); - - auto const total_bytes_to_read = - std::min(range_size, prefsum_source_sizes.back() - range_offset); - range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0; - for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; - i++) { - if (sources[i]->is_empty()) continue; - auto data_size = - std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read); - auto destination = reinterpret_cast(buffer.data()) + bytes_read + - (num_delimiter_chars * delimiter_map.size()); - if (sources[i]->is_device_read_preferred(data_size)) { - bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream); - } else { - h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size)); - auto const& h_buffer = h_buffers.back(); - CUDF_CUDA_TRY(cudaMemcpyAsync( - destination, h_buffer->data(), h_buffer->size(), cudaMemcpyHostToDevice, stream.value())); - bytes_read += h_buffer->size(); - } - range_offset = 0; - delimiter_map.push_back(bytes_read + (num_delimiter_chars * delimiter_map.size())); - } - // Removing delimiter inserted after last non-empty source is read - if (!delimiter_map.empty()) { delimiter_map.pop_back(); } - - // If this is a multi-file source, we scatter the JSON line delimiters between files - if (sources.size() > 1) { - static_assert(num_delimiter_chars == 1, - "Currently only single-character delimiters are supported"); - auto const delimiter_source = thrust::make_constant_iterator(delimiter); - auto const d_delimiter_map = cudf::detail::make_device_uvector_async( - delimiter_map, stream, cudf::get_current_device_resource_ref()); - thrust::scatter(rmm::exec_policy_nosync(stream), - delimiter_source, - delimiter_source + d_delimiter_map.size(), - d_delimiter_map.data(), - buffer.data()); - } - stream.synchronize(); - return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars)); - } - // TODO: allow byte range reading from multiple compressed files. - auto remaining_bytes_to_read = std::min(range_size, sources[0]->size() - range_offset); - auto hbuffer = std::vector(remaining_bytes_to_read); - // Single read because only a single compressed source is supported - // Reading to host because decompression of a single block is much faster on the CPU - sources[0]->host_read(range_offset, remaining_bytes_to_read, hbuffer.data()); - auto uncomp_data = decompress(compression, hbuffer); - auto ret_buffer = buffer.first(uncomp_data.size()); - cudf::detail::cuda_memcpy( - ret_buffer, - host_span{reinterpret_cast(uncomp_data.data()), uncomp_data.size()}, - stream); - return ret_buffer; -} - -table_with_metadata read_json(host_span> sources, - json_reader_options const& reader_opts, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - CUDF_FUNC_RANGE(); - - if (reader_opts.get_byte_range_offset() != 0 or reader_opts.get_byte_range_size() != 0) { - CUDF_EXPECTS(reader_opts.is_enabled_lines(), - "Specifying a byte range is supported only for JSON Lines"); - } - - if (sources.size() > 1) { - CUDF_EXPECTS(reader_opts.get_compression() == compression_type::NONE, - "Multiple compressed inputs are not supported"); - CUDF_EXPECTS(reader_opts.is_enabled_lines(), - "Multiple inputs are supported only for JSON Lines format"); - } - /* * The batched JSON reader enforces that the size of each batch is at most INT_MAX * bytes (~2.14GB). Batches are defined to be byte range chunks - characterized by @@ -462,4 +417,101 @@ table_with_metadata read_json(host_span> sources, {partial_tables[0].metadata.schema_info}}; } +} // anonymous namespace + +device_span ingest_raw_input(device_span buffer, + host_span> sources, + std::size_t range_offset, + std::size_t range_size, + char delimiter, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + // We append a line delimiter between two files to make sure the last line of file i and the first + // line of file i+1 don't end up on the same JSON line, if file i does not already end with a line + // delimiter. + auto constexpr num_delimiter_chars = 1; + + auto delimiter_map = cudf::detail::make_empty_host_vector(sources.size(), stream); + std::vector prefsum_source_sizes(sources.size()); + std::vector> h_buffers; + std::size_t bytes_read = 0; + std::transform_inclusive_scan(sources.begin(), + sources.end(), + prefsum_source_sizes.begin(), + std::plus{}, + [](std::unique_ptr const& s) { return s->size(); }); + auto upper = + std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); + std::size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); + + auto const total_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset); + range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0; + for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; i++) { + if (sources[i]->is_empty()) continue; + auto data_size = std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read); + auto destination = reinterpret_cast(buffer.data()) + bytes_read + + (num_delimiter_chars * delimiter_map.size()); + if (sources[i]->is_device_read_preferred(data_size)) { + bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream); + } else { + h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size)); + auto const& h_buffer = h_buffers.back(); + CUDF_CUDA_TRY(cudaMemcpyAsync( + destination, h_buffer->data(), h_buffer->size(), cudaMemcpyHostToDevice, stream.value())); + bytes_read += h_buffer->size(); + } + range_offset = 0; + delimiter_map.push_back(bytes_read + (num_delimiter_chars * delimiter_map.size())); + } + // Removing delimiter inserted after last non-empty source is read + if (!delimiter_map.empty()) { delimiter_map.pop_back(); } + + // If this is a multi-file source, we scatter the JSON line delimiters between files + if (sources.size() > 1 && !delimiter_map.empty()) { + static_assert(num_delimiter_chars == 1, + "Currently only single-character delimiters are supported"); + auto const delimiter_source = thrust::make_constant_iterator(delimiter); + auto const d_delimiter_map = cudf::detail::make_device_uvector_async( + delimiter_map, stream, cudf::get_current_device_resource_ref()); + thrust::scatter(rmm::exec_policy_nosync(stream), + delimiter_source, + delimiter_source + d_delimiter_map.size(), + d_delimiter_map.data(), + buffer.data()); + } + stream.synchronize(); + return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars)); +} + +table_with_metadata read_json(host_span> sources, + json_reader_options const& reader_opts, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + + if (reader_opts.get_byte_range_offset() != 0 or reader_opts.get_byte_range_size() != 0) { + CUDF_EXPECTS(reader_opts.is_enabled_lines(), + "Specifying a byte range is supported only for JSON Lines"); + } + + if (sources.size() > 1) { + CUDF_EXPECTS(reader_opts.is_enabled_lines(), + "Multiple inputs are supported only for JSON Lines format"); + } + + if (reader_opts.get_compression() == compression_type::NONE) + return read_json_impl(sources, reader_opts, stream, mr); + + std::vector> compressed_sources; + for (size_t i = 0; i < sources.size(); i++) { + compressed_sources.emplace_back( + std::make_unique(sources[i], reader_opts.get_compression())); + } + // in read_json_impl, we need the compressed source size to actually be the + // uncompressed source size for correct batching + return read_json_impl(compressed_sources, reader_opts, stream, mr); +} + } // namespace cudf::io::json::detail diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp index 4def69cc629..ac980938522 100644 --- a/cpp/src/io/json/read_json.hpp +++ b/cpp/src/io/json/read_json.hpp @@ -32,10 +32,9 @@ namespace CUDF_EXPORT cudf { namespace io::json::detail { // Some magic numbers -constexpr int num_subchunks = 10; // per chunk_size -constexpr size_t min_subchunk_size = 10000; -constexpr int estimated_compression_ratio = 4; -constexpr int max_subchunks_prealloced = 3; +constexpr int num_subchunks = 10; // per chunk_size +constexpr size_t min_subchunk_size = 10000; +constexpr int max_subchunks_prealloced = 3; /** * @brief Read from array of data sources into RMM buffer. The size of the returned device span @@ -45,15 +44,14 @@ constexpr int max_subchunks_prealloced = 3; * * @param buffer Device span buffer to which data is read * @param sources Array of data sources - * @param compression Compression format of source * @param range_offset Number of bytes to skip from source start * @param range_size Number of bytes to read from source + * @param delimiter Delimiter character for JSONL inputs * @param stream CUDA stream used for device memory operations and kernel launches * @returns A subspan of the input device span containing data read */ device_span ingest_raw_input(device_span buffer, host_span> sources, - compression_type compression, size_t range_offset, size_t range_size, char delimiter, diff --git a/cpp/src/io/json/write_json.cu b/cpp/src/io/json/write_json.cu index e1241f8f90c..8156258c810 100644 --- a/cpp/src/io/json/write_json.cu +++ b/cpp/src/io/json/write_json.cu @@ -19,6 +19,7 @@ * @brief cuDF-IO JSON writer implementation */ +#include "io/comp/comp.hpp" #include "io/csv/durations.hpp" #include "io/utilities/parsing_utils.cuh" #include "lists/utilities.hpp" @@ -828,10 +829,10 @@ void write_chunked(data_sink* out_sink, } } -void write_json(data_sink* out_sink, - table_view const& table, - json_writer_options const& options, - rmm::cuda_stream_view stream) +void write_json_uncompressed(data_sink* out_sink, + table_view const& table, + json_writer_options const& options, + rmm::cuda_stream_view stream) { CUDF_FUNC_RANGE(); std::vector user_column_names = [&]() { @@ -934,4 +935,24 @@ void write_json(data_sink* out_sink, } } +void write_json(data_sink* out_sink, + table_view const& table, + json_writer_options const& options, + rmm::cuda_stream_view stream) +{ + if (options.get_compression() != compression_type::NONE) { + std::vector hbuf; + auto hbuf_sink_ptr = data_sink::create(&hbuf); + write_json_uncompressed(hbuf_sink_ptr.get(), table, options, stream); + stream.synchronize(); + auto comp_hbuf = cudf::io::detail::compress( + options.get_compression(), + host_span(reinterpret_cast(hbuf.data()), hbuf.size()), + stream); + out_sink->host_write(comp_hbuf.data(), comp_hbuf.size()); + return; + } + write_json_uncompressed(out_sink, table, options, stream); +} + } // namespace cudf::io::json::detail diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index 1fe5e5aa41e..7046b3b3f91 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -460,7 +460,7 @@ host_span OrcDecompressor::decompress_blocks(host_span= begin) || (page_begin <= end && page_end >= end)) - : ((page_begin < begin && page_end > begin) || (page_begin < end && page_end > end)); + // Test for list schemas. + auto const is_bounds_page_lists = + ((page_begin <= begin and page_end >= begin) or (page_begin <= end and page_end >= end)); + + // For non-list schemas, rows cannot span pages, so use a more restrictive test. Make sure to + // relax the test for `page_end` if we adjusted the `num_rows` for the last page to compensate + // for list row size estimates in `generate_list_column_row_count_estimates()` when chunked + // read mode. + auto const test_page_end_nonlists = + s->page.is_num_rows_adjusted ? page_end >= end : page_end > end; + + auto const is_bounds_page_nonlists = + (page_begin < begin and page_end > begin) or (page_begin < end and test_page_end_nonlists); + + return has_repetition ? is_bounds_page_lists : is_bounds_page_nonlists; } /** diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index a8a8c441a84..6aec4ce0ec2 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -433,6 +433,7 @@ void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks, // definition levels bs->page.chunk_row = 0; bs->page.num_rows = 0; + bs->page.is_num_rows_adjusted = false; bs->page.skipped_values = -1; bs->page.skipped_leaf_values = 0; bs->page.str_bytes = 0; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 3b4d0e6dc80..ce9d48693ec 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -310,8 +310,10 @@ struct PageInfo { // - In the case of a nested schema, you have to decode the repetition and definition // levels to extract actual column values int32_t num_input_values; - int32_t chunk_row; // starting row of this page relative to the start of the chunk - int32_t num_rows; // number of rows in this page + int32_t chunk_row; // starting row of this page relative to the start of the chunk + int32_t num_rows; // number of rows in this page + bool is_num_rows_adjusted; // Flag to indicate if the number of rows of this page have been + // adjusted to compensate for the list row size estimates. // the next four are calculated in gpuComputePageStringSizes int32_t num_nulls; // number of null values (V2 header), but recalculated for string cols int32_t num_valids; // number of non-null values, taking into account skip_rows/num_rows diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index f03f1214b9a..bcdae4cbd3b 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -729,7 +729,10 @@ struct set_final_row_count { if (i < pages.size() - 1 && (pages[i + 1].chunk_idx == page.chunk_idx)) { return; } size_t const page_start_row = chunk.start_row + page.chunk_row; size_t const chunk_last_row = chunk.start_row + chunk.num_rows; - page.num_rows = chunk_last_row - page_start_row; + // Mark `is_num_rows_adjusted` to signal string decoders that the `num_rows` of this page has + // been adjusted. + page.is_num_rows_adjusted = page.num_rows != (chunk_last_row - page_start_row); + page.num_rows = chunk_last_row - page_start_row; } }; diff --git a/cpp/src/join/distinct_hash_join.cu b/cpp/src/join/distinct_hash_join.cu index 515d28201e8..ce4d2067b82 100644 --- a/cpp/src/join/distinct_hash_join.cu +++ b/cpp/src/join/distinct_hash_join.cu @@ -33,7 +33,9 @@ #include #include #include +#include #include +#include #include #include @@ -79,14 +81,9 @@ class build_keys_fn { /** * @brief Device output transform functor to construct `size_type` with `cuco::pair` or `cuco::pair` + * rhs_index_type>` */ struct output_fn { - __device__ constexpr cudf::size_type operator()( - cuco::pair const& x) const - { - return static_cast(x.second); - } __device__ constexpr cudf::size_type operator()( cuco::pair const& x) const { @@ -176,15 +173,33 @@ distinct_hash_join::inner_join(rmm::cuda_stream_view stream, auto const iter = cudf::detail::make_counting_transform_iterator( 0, build_keys_fn{d_probe_hasher}); - auto const build_indices_begin = - thrust::make_transform_output_iterator(build_indices->begin(), output_fn{}); - auto const probe_indices_begin = - thrust::make_transform_output_iterator(probe_indices->begin(), output_fn{}); - - auto const [probe_indices_end, _] = this->_hash_table.retrieve( - iter, iter + probe_table_num_rows, probe_indices_begin, build_indices_begin, {stream.value()}); + auto found_indices = rmm::device_uvector(probe_table_num_rows, stream); + auto const found_begin = + thrust::make_transform_output_iterator(found_indices.begin(), output_fn{}); + + // TODO conditional find for nulls once `cuco::static_set::find_if` is added + // If `idx` is within the range `[0, probe_table_num_rows)` and `found_indices[idx]` is not equal + // to `JoinNoneValue`, then `idx` has a match in the hash set. + this->_hash_table.find_async(iter, iter + probe_table_num_rows, found_begin, stream.value()); + + auto const tuple_iter = cudf::detail::make_counting_transform_iterator( + 0, + cuda::proclaim_return_type>( + [found_iter = found_indices.begin()] __device__(size_type idx) { + return thrust::tuple{*(found_iter + idx), idx}; + })); + auto const output_begin = + thrust::make_zip_iterator(build_indices->begin(), probe_indices->begin()); + auto const output_end = + thrust::copy_if(rmm::exec_policy_nosync(stream), + tuple_iter, + tuple_iter + probe_table_num_rows, + found_indices.begin(), + output_begin, + cuda::proclaim_return_type( + [] __device__(size_type idx) { return idx != JoinNoneValue; })); + auto const actual_size = std::distance(output_begin, output_end); - auto const actual_size = std::distance(probe_indices_begin, probe_indices_end); build_indices->resize(actual_size, stream); probe_indices->resize(actual_size, stream); diff --git a/cpp/tests/binaryop/binop-compiled-test.cpp b/cpp/tests/binaryop/binop-compiled-test.cpp index 3bd67001c16..7cdce1ff735 100644 --- a/cpp/tests/binaryop/binop-compiled-test.cpp +++ b/cpp/tests/binaryop/binop-compiled-test.cpp @@ -23,9 +23,11 @@ #include #include +#include #include #include #include +#include #include #include @@ -820,4 +822,24 @@ TEST_F(BinaryOperationCompiledTest_NullOpsString, NullMin_Vector_Vector) CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected, result->view()); } +TEST(BinaryOperationCompiledTest, LargeColumnNoOverflow) +{ + cudf::size_type num_rows{1'799'989'091}; + auto big = cudf::make_column_from_scalar( + cudf::numeric_scalar>{10, true}, num_rows); + auto small = cudf::make_column_from_scalar( + cudf::numeric_scalar>{1, true}, num_rows); + + auto mask = cudf::binary_operation(big->view(), + small->view(), + cudf::binary_operator::GREATER, + cudf::data_type{cudf::type_id::BOOL8}); + + auto agg = cudf::make_sum_aggregation(); + auto result = + cudf::reduce(mask->view(), *agg, cudf::data_type{cudf::type_to_id()}); + auto got = static_cast*>(result.get())->value(); + EXPECT_EQ(num_rows, got); +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/cpp/tests/io/json/json_chunked_reader.cu b/cpp/tests/io/json/json_chunked_reader.cu index c9ee6542a4d..9c8208b8300 100644 --- a/cpp/tests/io/json/json_chunked_reader.cu +++ b/cpp/tests/io/json/json_chunked_reader.cu @@ -14,6 +14,7 @@ * limitations under the License. */ +#include "io/comp/comp.hpp" #include "json_utils.cuh" #include @@ -31,36 +32,66 @@ /** * @brief Base test fixture for JSON reader tests */ -struct JsonReaderTest : public cudf::test::BaseFixture {}; +struct JsonReaderTest : public cudf::test::BaseFixture, + public testing::WithParamInterface {}; + +// Parametrize qualifying JSON tests for multiple compression types +INSTANTIATE_TEST_SUITE_P(JsonReaderTest, + JsonReaderTest, + ::testing::Values(cudf::io::compression_type::GZIP, + cudf::io::compression_type::SNAPPY, + cudf::io::compression_type::NONE)); cudf::test::TempDirTestEnvironment* const temp_env = static_cast( ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment)); -TEST_F(JsonReaderTest, ByteRange_SingleSource) +TEST_P(JsonReaderTest, ByteRange_SingleSource) { + cudf::io::compression_type const comptype = GetParam(); + std::string const json_string = R"( { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } { "a": { "y" : 6}, "b" : [6 ], "c": 13 } { "a": { "y" : 6}, "b" : [7 ], "c": 14 })"; + std::vector cdata; + if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size()), + cudf::get_default_stream()); + } else + cdata = std::vector( + reinterpret_cast(json_string.data()), + reinterpret_cast(json_string.data()) + json_string.size()); + // Initialize parsing options (reading json lines) cudf::io::json_reader_options json_lines_options = cudf::io::json_reader_options::builder( cudf::io::source_info{json_string.c_str(), json_string.size()}) .compression(cudf::io::compression_type::NONE) .lines(true); + cudf::io::json_reader_options cjson_lines_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{cudf::host_span(cdata.data(), cdata.size())}) + .compression(comptype) + .lines(true); // Read full test data via existing, nested JSON lines reader - cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options); + cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(cjson_lines_options); - auto datasources = cudf::io::datasource::create(json_lines_options.get_source().host_buffers()); + auto datasources = cudf::io::datasource::create(json_lines_options.get_source().host_buffers()); + auto cdatasources = cudf::io::datasource::create(cjson_lines_options.get_source().host_buffers()); // Test for different chunk sizes for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500}) { auto const tables = split_byte_range_reading(datasources, + cdatasources, json_lines_options, + cjson_lines_options, chunk_size, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); @@ -77,37 +108,54 @@ TEST_F(JsonReaderTest, ByteRange_SingleSource) } } -TEST_F(JsonReaderTest, ReadCompleteFiles) +TEST_P(JsonReaderTest, ReadCompleteFiles) { + cudf::io::compression_type const comptype = GetParam(); + std::string const json_string = R"( { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } { "a": { "y" : 6}, "b" : [6 ], "c": 13 } { "a": { "y" : 6}, "b" : [7 ], "c": 14 })"; - auto filename = temp_env->get_temp_dir() + "ParseInRangeIntegers.json"; + + std::vector cdata; + if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size()), + cudf::get_default_stream()); + } else + cdata = std::vector( + reinterpret_cast(json_string.data()), + reinterpret_cast(json_string.data()) + json_string.size()); + + auto cfilename = temp_env->get_temp_dir() + "cParseInRangeIntegers.json"; { - std::ofstream outfile(filename, std::ofstream::out); - outfile << json_string; + std::ofstream outfile(cfilename, std::ofstream::out); + std::copy(cdata.begin(), cdata.end(), std::ostreambuf_iterator(outfile)); } constexpr int num_sources = 5; - std::vector filepaths(num_sources, filename); + std::vector cfilepaths(num_sources, cfilename); - cudf::io::json_reader_options in_options = - cudf::io::json_reader_options::builder(cudf::io::source_info{filepaths}) + cudf::io::json_reader_options cin_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{cfilepaths}) .lines(true) + .compression(comptype) .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); - cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + cudf::io::table_with_metadata result = cudf::io::read_json(cin_options); std::vector part_tables; - for (auto filepath : filepaths) { - cudf::io::json_reader_options part_in_options = - cudf::io::json_reader_options::builder(cudf::io::source_info{filepath}) + for (auto cfilepath : cfilepaths) { + cudf::io::json_reader_options part_cin_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{cfilepath}) .lines(true) + .compression(comptype) .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); - part_tables.push_back(cudf::io::read_json(part_in_options)); + part_tables.push_back(cudf::io::read_json(part_cin_options)); } auto part_table_views = std::vector(part_tables.size()); @@ -120,42 +168,69 @@ TEST_F(JsonReaderTest, ReadCompleteFiles) CUDF_TEST_EXPECT_TABLES_EQUIVALENT(result.tbl->view(), expected_result->view()); } -TEST_F(JsonReaderTest, ByteRange_MultiSource) +TEST_P(JsonReaderTest, ByteRange_MultiSource) { + cudf::io::compression_type const comptype = GetParam(); + std::string const json_string = R"( { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } { "a": { "y" : 6}, "b" : [6 ], "c": 13 } { "a": { "y" : 6}, "b" : [7 ], "c": 14 })"; - auto filename = temp_env->get_temp_dir() + "ParseInRangeIntegers.json"; + + std::vector cdata; + if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size()), + cudf::get_default_stream()); + } else + cdata = std::vector( + reinterpret_cast(json_string.data()), + reinterpret_cast(json_string.data()) + json_string.size()); + + auto cfilename = temp_env->get_temp_dir() + "cParseInRangeIntegers.json"; { - std::ofstream outfile(filename, std::ofstream::out); - outfile << json_string; + std::ofstream outfile(cfilename, std::ofstream::out); + std::copy(cdata.begin(), cdata.end(), std::ostreambuf_iterator(outfile)); } constexpr int num_sources = 5; - std::vector filepaths(num_sources, filename); + std::vector cfilepaths(num_sources, cfilename); + std::vector> hostbufs( + num_sources, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size())); // Initialize parsing options (reading json lines) cudf::io::json_reader_options json_lines_options = - cudf::io::json_reader_options::builder(cudf::io::source_info{filepaths}) - .lines(true) + cudf::io::json_reader_options::builder( + cudf::io::source_info{ + cudf::host_span>(hostbufs.data(), hostbufs.size())}) .compression(cudf::io::compression_type::NONE) + .lines(true); + cudf::io::json_reader_options cjson_lines_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{cfilepaths}) + .lines(true) + .compression(comptype) .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); // Read full test data via existing, nested JSON lines reader - cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options); + cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(cjson_lines_options); - auto file_paths = json_lines_options.get_source().filepaths(); - std::vector> datasources; - for (auto& fp : file_paths) { - datasources.emplace_back(cudf::io::datasource::create(fp)); + std::vector> cdatasources; + for (auto& fp : cfilepaths) { + cdatasources.emplace_back(cudf::io::datasource::create(fp)); } + auto datasources = cudf::io::datasource::create(json_lines_options.get_source().host_buffers()); // Test for different chunk sizes for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500, 1000, 2000}) { auto const tables = split_byte_range_reading(datasources, + cdatasources, json_lines_options, + cjson_lines_options, chunk_size, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp index 26937c9298a..3c8db99c3c7 100644 --- a/cpp/tests/io/json/json_test.cpp +++ b/cpp/tests/io/json/json_test.cpp @@ -14,6 +14,9 @@ * limitations under the License. */ +#include "io/comp/comp.hpp" +#include "io/comp/io_uncomp.hpp" + #include #include #include @@ -3252,4 +3255,59 @@ TEST_F(JsonReaderTest, JsonNestedDtypeFilterWithOrder) } } +struct JsonCompressedIOTest : public cudf::test::BaseFixture, + public testing::WithParamInterface {}; + +// Parametrize qualifying JSON tests for multiple compression types +INSTANTIATE_TEST_SUITE_P(JsonCompressedIOTest, + JsonCompressedIOTest, + ::testing::Values(cudf::io::compression_type::GZIP, + cudf::io::compression_type::SNAPPY, + cudf::io::compression_type::NONE)); + +TEST_P(JsonCompressedIOTest, BasicJsonLines) +{ + cudf::io::compression_type const comptype = GetParam(); + std::string data = to_records_orient( + {{{"0", "1"}, {"1", "1.1"}}, {{"0", "2"}, {"1", "2.2"}}, {{"0", "3"}, {"1", "3.3"}}}, "\n"); + + std::vector cdata; + if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(data.data()), data.size()), + cudf::get_default_stream()); + auto decomp_out_buffer = cudf::io::detail::decompress( + comptype, cudf::host_span(cdata.data(), cdata.size())); + std::string const expected = R"({"0":1, "1":1.1} +{"0":2, "1":2.2} +{"0":3, "1":3.3})"; + EXPECT_EQ( + expected, + std::string(reinterpret_cast(decomp_out_buffer.data()), decomp_out_buffer.size())); + } else + cdata = std::vector(reinterpret_cast(data.data()), + reinterpret_cast(data.data()) + data.size()); + + cudf::io::json_reader_options in_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{cudf::host_span(cdata.data(), cdata.size())}) + .dtypes(std::vector{dtype(), dtype()}) + .compression(comptype) + .lines(true); + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + + EXPECT_EQ(result.tbl->num_columns(), 2); + EXPECT_EQ(result.tbl->num_rows(), 3); + + EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT32); + EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::FLOAT64); + + EXPECT_EQ(result.metadata.schema_info[0].name, "0"); + EXPECT_EQ(result.metadata.schema_info[1].name, "1"); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int_wrapper{{1, 2, 3}}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(1), float64_wrapper{{1.1, 2.2, 3.3}}); +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/cpp/tests/io/json/json_utils.cuh b/cpp/tests/io/json/json_utils.cuh index c31bb2d24e0..629a89fa777 100644 --- a/cpp/tests/io/json/json_utils.cuh +++ b/cpp/tests/io/json/json_utils.cuh @@ -32,7 +32,9 @@ template std::vector split_byte_range_reading( cudf::host_span> sources, + cudf::host_span> csources, cudf::io::json_reader_options const& reader_opts, + cudf::io::json_reader_options const& creader_opts, IndexType chunk_size, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) @@ -49,7 +51,6 @@ std::vector split_byte_range_reading( rmm::device_uvector buffer(total_source_size, stream); auto readbufspan = cudf::io::json::detail::ingest_raw_input(buffer, sources, - reader_opts.get_compression(), reader_opts.get_byte_range_offset(), reader_opts.get_byte_range_size(), reader_opts.get_delimiter(), @@ -95,10 +96,11 @@ std::vector split_byte_range_reading( record_ranges.emplace_back(prev, total_source_size); std::vector tables; + auto creader_opts_chunk = creader_opts; for (auto const& [chunk_start, chunk_end] : record_ranges) { - reader_opts_chunk.set_byte_range_offset(chunk_start); - reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start); - tables.push_back(cudf::io::json::detail::read_json(sources, reader_opts_chunk, stream, mr)); + creader_opts_chunk.set_byte_range_offset(chunk_start); + creader_opts_chunk.set_byte_range_size(chunk_end - chunk_start); + tables.push_back(cudf::io::json::detail::read_json(csources, creader_opts_chunk, stream, mr)); } // assume all records have same number of columns, and inferred same type. (or schema is passed) // TODO a step before to merge all columns, types and infer final schema. diff --git a/cpp/tests/io/json/json_writer.cpp b/cpp/tests/io/json/json_writer.cpp index 39d31c406a5..b96fc6425e4 100644 --- a/cpp/tests/io/json/json_writer.cpp +++ b/cpp/tests/io/json/json_writer.cpp @@ -14,10 +14,14 @@ * limitations under the License. */ +#include "io/comp/io_uncomp.hpp" + #include #include +#include #include #include +#include #include #include @@ -31,7 +35,36 @@ struct JsonWriterTest : public cudf::test::BaseFixture {}; -TEST_F(JsonWriterTest, EmptyInput) +/** + * @brief Test fixture for parametrized JSON reader tests + */ +struct JsonCompressedWriterTest : public cudf::test::BaseFixture, + public testing::WithParamInterface {}; + +// Parametrize qualifying JSON tests for multiple compression types +INSTANTIATE_TEST_SUITE_P(JsonCompressedWriterTest, + JsonCompressedWriterTest, + ::testing::Values(cudf::io::compression_type::GZIP, + cudf::io::compression_type::SNAPPY, + cudf::io::compression_type::NONE)); + +void run_test(cudf::io::json_writer_options const& wopts, std::string const& expected) +{ + auto outbuf = wopts.get_sink().buffers().front(); + auto comptype = wopts.get_compression(); + cudf::io::write_json(wopts, cudf::test::get_default_stream()); + if (comptype != cudf::io::compression_type::NONE) { + auto decomp_out_buffer = cudf::io::detail::decompress( + comptype, + cudf::host_span(reinterpret_cast(outbuf->data()), outbuf->size())); + EXPECT_EQ(expected, + std::string_view(reinterpret_cast(decomp_out_buffer.data()), + decomp_out_buffer.size())); + } else + EXPECT_EQ(expected, std::string_view(outbuf->data(), outbuf->size())); +} + +TEST_P(JsonCompressedWriterTest, EmptyInput) { cudf::test::strings_column_wrapper col1; cudf::test::strings_column_wrapper col2; @@ -49,28 +82,21 @@ TEST_F(JsonWriterTest, EmptyInput) .lines(false) .na_rep("null") .build(); - - // Empty columns in table - cudf::io::write_json(out_options, cudf::test::get_default_stream()); - std::string const expected = R"([])"; - EXPECT_EQ(expected, std::string(out_buffer.data(), out_buffer.size())); + run_test(out_options, "[]"); // Empty columns in table - JSON Lines out_buffer.clear(); out_options.enable_lines(true); - cudf::io::write_json(out_options, cudf::test::get_default_stream()); - std::string const expected_lines = "\n"; - EXPECT_EQ(expected_lines, std::string(out_buffer.data(), out_buffer.size())); + run_test(out_options, "\n"); // Empty table - JSON Lines cudf::table_view tbl_view2{}; out_options.set_table(tbl_view2); out_buffer.clear(); - cudf::io::write_json(out_options, cudf::test::get_default_stream()); - EXPECT_EQ(expected_lines, std::string(out_buffer.data(), out_buffer.size())); + run_test(out_options, "\n"); } -TEST_F(JsonWriterTest, EmptyLeaf) +TEST_P(JsonCompressedWriterTest, EmptyLeaf) { cudf::test::strings_column_wrapper col1{""}; cudf::test::fixed_width_column_wrapper offsets{0, 0}; @@ -92,19 +118,14 @@ TEST_F(JsonWriterTest, EmptyLeaf) .lines(false) .na_rep("null") .build(); - - // Empty columns in table - cudf::io::write_json(out_options, cudf::test::get_default_stream()); - std::string const expected = R"([{"col1":"","col2":[],"col3":[]}])"; - EXPECT_EQ(expected, std::string(out_buffer.data(), out_buffer.size())); + run_test(out_options, R"([{"col1":"","col2":[],"col3":[]}])"); // Empty columns in table - JSON Lines out_buffer.clear(); out_options.enable_lines(true); - cudf::io::write_json(out_options, cudf::test::get_default_stream()); std::string const expected_lines = R"({"col1":"","col2":[],"col3":[]})" "\n"; - EXPECT_EQ(expected_lines, std::string(out_buffer.data(), out_buffer.size())); + run_test(out_options, expected_lines); } TEST_F(JsonWriterTest, ErrorCases) @@ -141,33 +162,34 @@ TEST_F(JsonWriterTest, ErrorCases) cudf::logic_error); } -TEST_F(JsonWriterTest, PlainTable) +TEST_P(JsonCompressedWriterTest, PlainTable) { + cudf::io::compression_type const comptype = GetParam(); cudf::test::strings_column_wrapper col1{"a", "b", "c"}; cudf::test::strings_column_wrapper col2{"d", "e", "f"}; - cudf::test::fixed_width_column_wrapper col3{1, 2, 3}; - cudf::test::fixed_width_column_wrapper col4{1.5, 2.5, 3.5}; - cudf::test::fixed_width_column_wrapper col5{{1, 2, 3}, + cudf::test::fixed_width_column_wrapper col3{1, 2, 3}; + cudf::test::fixed_width_column_wrapper col4{1.5, 2.5, 3.5}; + cudf::test::fixed_width_column_wrapper col5{{1, 2, 3}, cudf::test::iterators::nulls_at({0, 2})}; cudf::table_view tbl_view{{col1, col2, col3, col4, col5}}; - cudf::io::table_metadata mt{{{"col1"}, {"col2"}, {"int"}, {"float"}, {"int16"}}}; + cudf::io::table_metadata mt{{{"col1"}, {"col2"}, {"col3"}, {"col4"}, {"col5"}}}; std::vector out_buffer; - auto destination = cudf::io::sink_info(&out_buffer); - auto options_builder = cudf::io::json_writer_options_builder(destination, tbl_view) - .include_nulls(true) - .metadata(mt) - .lines(false) - .na_rep("null"); - - cudf::io::write_json(options_builder.build(), cudf::test::get_default_stream()); + auto destination = cudf::io::sink_info(&out_buffer); + auto out_options = cudf::io::json_writer_options_builder(destination, tbl_view) + .include_nulls(true) + .metadata(mt) + .lines(false) + .compression(comptype) + .na_rep("null") + .build(); std::string const expected = - R"([{"col1":"a","col2":"d","int":1,"float":1.5,"int16":null},{"col1":"b","col2":"e","int":2,"float":2.5,"int16":2},{"col1":"c","col2":"f","int":3,"float":3.5,"int16":null}])"; - EXPECT_EQ(expected, std::string(out_buffer.data(), out_buffer.size())); + R"([{"col1":"a","col2":"d","col3":1,"col4":1.5,"col5":null},{"col1":"b","col2":"e","col3":2,"col4":2.5,"col5":2},{"col1":"c","col2":"f","col3":3,"col4":3.5,"col5":null}])"; + run_test(out_options, expected); } -TEST_F(JsonWriterTest, SimpleNested) +TEST_P(JsonCompressedWriterTest, SimpleNested) { std::string const data = R"( {"a": 1, "b": 2, "c": {"d": 3 }, "f": 5.5, "g": [1]} @@ -183,23 +205,23 @@ TEST_F(JsonWriterTest, SimpleNested) cudf::io::table_metadata mt{result.metadata}; std::vector out_buffer; - auto destination = cudf::io::sink_info(&out_buffer); - auto options_builder = cudf::io::json_writer_options_builder(destination, tbl_view) - .include_nulls(false) - .metadata(mt) - .lines(true) - .na_rep("null"); - - cudf::io::write_json(options_builder.build(), cudf::test::get_default_stream()); + auto destination = cudf::io::sink_info(&out_buffer); + auto out_options = cudf::io::json_writer_options_builder(destination, tbl_view) + .include_nulls(false) + .metadata(mt) + .lines(true) + .na_rep("null") + .build(); + std::string const expected = R"({"a":1,"b":2,"c":{"d":3},"f":5.5,"g":[1]} {"a":6,"b":7,"c":{"d":8},"f":10.5} {"a":1,"b":2,"c":{"e":4},"f":5.5,"g":[2,null]} {"a":6,"b":7,"c":{"e":9},"f":10.5,"g":[3,4,5]} )"; - EXPECT_EQ(expected, std::string(out_buffer.data(), out_buffer.size())); + run_test(out_options, expected); } -TEST_F(JsonWriterTest, MixedNested) +TEST_P(JsonCompressedWriterTest, MixedNested) { std::string const data = R"( {"a": 1, "b": 2, "c": {"d": [3] }, "f": 5.5, "g": [ {"h": 1}]} @@ -215,20 +237,20 @@ TEST_F(JsonWriterTest, MixedNested) cudf::io::table_metadata mt{result.metadata}; std::vector out_buffer; - auto destination = cudf::io::sink_info(&out_buffer); - auto options_builder = cudf::io::json_writer_options_builder(destination, tbl_view) - .include_nulls(false) - .metadata(mt) - .lines(false) - .na_rep("null"); - - cudf::io::write_json(options_builder.build(), cudf::test::get_default_stream()); + auto destination = cudf::io::sink_info(&out_buffer); + auto out_options = cudf::io::json_writer_options_builder(destination, tbl_view) + .include_nulls(false) + .metadata(mt) + .lines(false) + .na_rep("null") + .build(); + std::string const expected = R"([{"a":1,"b":2,"c":{"d":[3]},"f":5.5,"g":[{"h":1}]},)" R"({"a":6,"b":7,"c":{"d":[8]},"f":10.5},)" R"({"a":1,"b":2,"c":{"e":4},"f":5.5,"g":[{"h":2},null]},)" R"({"a":6,"b":7,"c":{"e":9},"f":10.5,"g":[{"h":3},{"h":4},{"h":5}]}])"; - EXPECT_EQ(expected, std::string(out_buffer.data(), out_buffer.size())); + run_test(out_options, expected); } TEST_F(JsonWriterTest, WriteReadNested) @@ -375,7 +397,7 @@ TEST_F(JsonWriterTest, WriteReadNested) } } -TEST_F(JsonWriterTest, SpecialChars) +TEST_P(JsonCompressedWriterTest, SpecialChars) { cudf::test::fixed_width_column_wrapper a{1, 6, 1, 6}; cudf::test::strings_column_wrapper b{"abcd", "b\b\f\n\r\t", "\"c\"", "/\\"}; @@ -391,17 +413,15 @@ TEST_F(JsonWriterTest, SpecialChars) .na_rep("null") .build(); - cudf::io::write_json(out_options, cudf::test::get_default_stream()); std::string const expected = R"({"\"a\"":1,"'b'":"abcd"} {"\"a\"":6,"'b'":"b\b\f\n\r\t"} {"\"a\"":1,"'b'":"\"c\""} {"\"a\"":6,"'b'":"\/\\"} )"; - auto const output_string = std::string(out_buffer.data(), out_buffer.size()); - EXPECT_EQ(expected, output_string); + run_test(out_options, expected); } -TEST_F(JsonWriterTest, NullList) +TEST_P(JsonCompressedWriterTest, NullList) { std::string const data = R"( {"a": [null], "b": [[1, 2, 3], [null], [null, null, null], [4, null, 5]]} @@ -417,23 +437,23 @@ TEST_F(JsonWriterTest, NullList) cudf::io::table_metadata mt{result.metadata}; std::vector out_buffer; - auto destination = cudf::io::sink_info(&out_buffer); - auto options_builder = cudf::io::json_writer_options_builder(destination, tbl_view) - .include_nulls(true) - .metadata(mt) - .lines(true) - .na_rep("null"); - - cudf::io::write_json(options_builder.build(), cudf::test::get_default_stream()); + auto destination = cudf::io::sink_info(&out_buffer); + auto out_options = cudf::io::json_writer_options_builder(destination, tbl_view) + .include_nulls(true) + .metadata(mt) + .lines(true) + .na_rep("null") + .build(); + std::string const expected = R"({"a":[null],"b":[[1,2,3],[null],[null,null,null],[4,null,5]]} {"a":[2,null,null,3],"b":null} {"a":[null,null,4],"b":[[2,null],null]} {"a":[5,null,null],"b":[null,[3,4,5]]} )"; - EXPECT_EQ(expected, std::string(out_buffer.data(), out_buffer.size())); + run_test(out_options, expected); } -TEST_F(JsonWriterTest, ChunkedNested) +TEST_P(JsonCompressedWriterTest, ChunkedNested) { std::string const data = R"( {"a": 1, "b": -2, "c": { }, "e": [{"f": 1}]} @@ -455,15 +475,15 @@ TEST_F(JsonWriterTest, ChunkedNested) cudf::io::table_metadata mt{result.metadata}; std::vector out_buffer; - auto destination = cudf::io::sink_info(&out_buffer); - auto options_builder = cudf::io::json_writer_options_builder(destination, tbl_view) - .include_nulls(false) - .metadata(mt) - .lines(true) - .na_rep("null") - .rows_per_chunk(8); - - cudf::io::write_json(options_builder.build(), cudf::test::get_default_stream()); + auto destination = cudf::io::sink_info(&out_buffer); + auto out_options = cudf::io::json_writer_options_builder(destination, tbl_view) + .include_nulls(false) + .metadata(mt) + .lines(true) + .na_rep("null") + .rows_per_chunk(8) + .build(); + std::string const expected = R"({"a":1,"b":-2,"c":{},"e":[{"f":1}]} {"a":2,"b":-2,"c":{}} @@ -475,10 +495,10 @@ TEST_F(JsonWriterTest, ChunkedNested) {"a":8,"b":-2,"c":{"d":64},"e":[{"f":8}]} {"a":9,"b":-2,"c":{"d":81},"e":[{"f":9}]} )"; - EXPECT_EQ(expected, std::string(out_buffer.data(), out_buffer.size())); + run_test(out_options, expected); } -TEST_F(JsonWriterTest, StructAllNullCombinations) +TEST_P(JsonCompressedWriterTest, StructAllNullCombinations) { auto const_1_iter = thrust::make_constant_iterator(1); @@ -512,14 +532,14 @@ TEST_F(JsonWriterTest, StructAllNullCombinations) cudf::io::table_metadata mt{{{"a"}, {"b"}, {"c"}, {"d"}, {"e"}}}; std::vector out_buffer; - auto destination = cudf::io::sink_info(&out_buffer); - auto options_builder = cudf::io::json_writer_options_builder(destination, tbl_view) - .include_nulls(false) - .metadata(mt) - .lines(true) - .na_rep("null"); - - cudf::io::write_json(options_builder.build(), cudf::test::get_default_stream()); + auto destination = cudf::io::sink_info(&out_buffer); + auto out_options = cudf::io::json_writer_options_builder(destination, tbl_view) + .include_nulls(false) + .metadata(mt) + .lines(true) + .na_rep("null") + .build(); + std::string const expected = R"({} {"e":1} {"d":1} @@ -553,10 +573,10 @@ TEST_F(JsonWriterTest, StructAllNullCombinations) {"a":1,"b":1,"c":1,"d":1} {"a":1,"b":1,"c":1,"d":1,"e":1} )"; - EXPECT_EQ(expected, std::string(out_buffer.data(), out_buffer.size())); + run_test(out_options, expected); } -TEST_F(JsonWriterTest, Unicode) +TEST_P(JsonCompressedWriterTest, Unicode) { // UTF-8, UTF-16 cudf::test::strings_column_wrapper col1{"\"\\/\b\f\n\r\t", "ராபிட்ஸ்", "$€𐐷𤭢", "C𝞵𝓓𝒻"}; @@ -574,14 +594,13 @@ TEST_F(JsonWriterTest, Unicode) cudf::io::table_metadata mt{{{"col1"}, {"col2"}, {"int16"}}}; std::vector out_buffer; - auto destination = cudf::io::sink_info(&out_buffer); - auto options_builder = cudf::io::json_writer_options_builder(destination, tbl_view) - .include_nulls(true) - .metadata(mt) - .lines(true) - .na_rep("null"); - - cudf::io::write_json(options_builder.build(), cudf::test::get_default_stream()); + auto destination = cudf::io::sink_info(&out_buffer); + auto out_options = cudf::io::json_writer_options_builder(destination, tbl_view) + .include_nulls(true) + .metadata(mt) + .lines(true) + .na_rep("null") + .build(); std::string const expected = R"({"col1":"\"\\\/\b\f\n\r\t","col2":"C\u10ae\u226a\u31f3\u434f\u51f9\u6ca6\u738b\u8fbf\u9fb8\ua057\ubbdc\uc2a4\ud3f6\ue4fe\ufd20","int16":null} @@ -589,7 +608,7 @@ TEST_F(JsonWriterTest, Unicode) {"col1":"$\u20ac\ud801\udc37\ud852\udf62","col2":"\ud841\ude28\ud846\udd4c\ud849\uddc9\ud84c\uddca\ud850\udea9\ud854\udd7d\ud858\ude71\ud85f\udd31\ud860\udc72\ud864\udc79\ud869\udc22\ud86c\udded\ud872\udf2d\ud877\udeb7\ud878\udea6\u5c6e","int16":null} {"col1":"C\ud835\udfb5\ud835\udcd3\ud835\udcbb","col2":"\ud883\udf91\ud885\udd08\ud888\udf49","int16":4} )"; - EXPECT_EQ(expected, std::string(out_buffer.data(), out_buffer.size())); + run_test(out_options, expected); } CUDF_TEST_PROGRAM_MAIN() diff --git a/cpp/tests/large_strings/json_tests.cu b/cpp/tests/large_strings/json_tests.cu index a212d7d654a..0703fa72f67 100644 --- a/cpp/tests/large_strings/json_tests.cu +++ b/cpp/tests/large_strings/json_tests.cu @@ -15,6 +15,7 @@ */ #include "../io/json/json_utils.cuh" +#include "io/comp/comp.hpp" #include "large_strings_fixture.hpp" #include @@ -25,10 +26,19 @@ #include #include -struct JsonLargeReaderTest : public cudf::test::StringsLargeTest {}; +struct JsonLargeReaderTest : public cudf::test::StringsLargeTest, + public testing::WithParamInterface {}; -TEST_F(JsonLargeReaderTest, MultiBatch) +// Parametrize qualifying JSON tests for multiple compression types +INSTANTIATE_TEST_SUITE_P(JsonLargeReaderTest, + JsonLargeReaderTest, + ::testing::Values(cudf::io::compression_type::GZIP, + cudf::io::compression_type::NONE)); + +TEST_P(JsonLargeReaderTest, MultiBatch) { + cudf::io::compression_type const comptype = GetParam(); + std::string json_string = R"( { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } @@ -48,11 +58,26 @@ TEST_F(JsonLargeReaderTest, MultiBatch) json_string += json_string; } + std::vector cdata; + if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size()), + cudf::get_default_stream()); + } else + cdata = std::vector( + reinterpret_cast(json_string.data()), + reinterpret_cast(json_string.data()) + json_string.size()); + constexpr int num_sources = 2; std::vector> hostbufs( num_sources, cudf::host_span(reinterpret_cast(json_string.data()), json_string.size())); + std::vector> chostbufs( + num_sources, + cudf::host_span(reinterpret_cast(cdata.data()), cdata.size())); // Initialize parsing options (reading json lines) cudf::io::json_reader_options json_lines_options = @@ -62,14 +87,20 @@ TEST_F(JsonLargeReaderTest, MultiBatch) .lines(true) .compression(cudf::io::compression_type::NONE) .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); + cudf::io::json_reader_options cjson_lines_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{ + cudf::host_span>(chostbufs.data(), chostbufs.size())}) + .lines(true) + .compression(comptype) + .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); // Read full test data via existing, nested JSON lines reader - cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options); + cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(cjson_lines_options); + + auto datasources = cudf::io::datasource::create(json_lines_options.get_source().host_buffers()); + auto cdatasources = cudf::io::datasource::create(cjson_lines_options.get_source().host_buffers()); - std::vector> datasources; - for (auto& hb : hostbufs) { - datasources.emplace_back(cudf::io::datasource::create(hb)); - } // Test for different chunk sizes std::vector chunk_sizes{batch_size_upper_bound / 4, batch_size_upper_bound / 2, @@ -79,7 +110,9 @@ TEST_F(JsonLargeReaderTest, MultiBatch) for (auto chunk_size : chunk_sizes) { auto const tables = split_byte_range_reading(datasources, + cdatasources, json_lines_options, + cjson_lines_options, chunk_size, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); diff --git a/dependencies.yaml b/dependencies.yaml index a4a4113d1e4..682aaa612b4 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -748,7 +748,7 @@ dependencies: common: - output_types: [conda, requirements, pyproject] packages: - - polars>=1.11,<1.14 + - polars>=1.11,<1.15 run_cudf_polars_experimental: common: - output_types: [conda, requirements, pyproject] diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index 41a7db2285a..2958c286d20 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -16,24 +16,20 @@ set(cython_sources aggregation.pyx binaryop.pyx column.pyx - concat.pyx copying.pyx csv.pyx datetime.pyx filling.pyx groupby.pyx - hash.pyx interop.pyx join.pyx json.pyx - labeling.pyx lists.pyx merge.pyx null_mask.pyx orc.pyx parquet.pyx partitioning.pyx - quantiles.pyx reduce.pyx replace.pyx reshape.pyx @@ -50,7 +46,6 @@ set(cython_sources transform.pyx transpose.pyx types.pyx - unary.pyx utils.pyx ) set(linked_libraries cudf::cudf) diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index 57df6899a22..19dc4488560 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -3,24 +3,20 @@ from . import ( binaryop, - concat, copying, csv, datetime, filling, groupby, - hash, interop, join, json, - labeling, merge, null_mask, nvtext, orc, parquet, partitioning, - quantiles, reduce, replace, reshape, @@ -35,7 +31,6 @@ text, timezone, transpose, - unary, ) MAX_COLUMN_SIZE = np.iinfo(np.int32).max diff --git a/python/cudf/cudf/_lib/column.pyi b/python/cudf/cudf/_lib/column.pyi index bb38488eefb..bdd90be45b8 100644 --- a/python/cudf/cudf/_lib/column.pyi +++ b/python/cudf/cudf/_lib/column.pyi @@ -2,8 +2,12 @@ from __future__ import annotations +from typing import Literal + from typing_extensions import Self +import pylibcudf as plc + from cudf._typing import Dtype, DtypeObj, ScalarLike from cudf.core.buffer import Buffer from cudf.core.column import ColumnBase @@ -71,3 +75,8 @@ class Column: # TODO: The val parameter should be Scalar, not ScalarLike @staticmethod def from_scalar(val: ScalarLike, size: int) -> ColumnBase: ... + @staticmethod + def from_pylibcudf( + col: plc.Column, data_ptr_exposed: bool = False + ) -> ColumnBase: ... + def to_pylibcudf(self, mode: Literal["read", "write"]) -> plc.Column: ... diff --git a/python/cudf/cudf/_lib/concat.pyx b/python/cudf/cudf/_lib/concat.pyx deleted file mode 100644 index e6c2d136f0d..00000000000 --- a/python/cudf/cudf/_lib/concat.pyx +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. - -from libcpp cimport bool - -from cudf._lib.column cimport Column -from cudf._lib.utils cimport data_from_pylibcudf_table - -import pylibcudf - -from cudf.core.buffer import acquire_spill_lock - - -@acquire_spill_lock() -def concat_columns(object columns): - return Column.from_pylibcudf( - pylibcudf.concatenate.concatenate( - [col.to_pylibcudf(mode="read") for col in columns] - ) - ) - - -@acquire_spill_lock() -def concat_tables(object tables, bool ignore_index=False): - plc_tables = [] - for table in tables: - cols = table._columns - if not ignore_index: - cols = table._index._columns + cols - plc_tables.append(pylibcudf.Table([c.to_pylibcudf(mode="read") for c in cols])) - - return data_from_pylibcudf_table( - pylibcudf.concatenate.concatenate(plc_tables), - column_names=tables[0]._column_names, - index_names=None if ignore_index else tables[0]._index_names - ) diff --git a/python/cudf/cudf/_lib/hash.pyx b/python/cudf/cudf/_lib/hash.pyx deleted file mode 100644 index 89309b36371..00000000000 --- a/python/cudf/cudf/_lib/hash.pyx +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. - -import pylibcudf as plc - -from cudf.core.buffer import acquire_spill_lock - -from pylibcudf.table cimport Table - -from cudf._lib.column cimport Column - - -@acquire_spill_lock() -def hash_partition(list source_columns, list columns_to_hash, - int num_partitions): - plc_table, offsets = plc.partitioning.hash_partition( - plc.Table([col.to_pylibcudf(mode="read") for col in source_columns]), - columns_to_hash, - num_partitions - ) - return [Column.from_pylibcudf(col) for col in plc_table.columns()], offsets - - -@acquire_spill_lock() -def hash(list source_columns, str method, int seed=0): - cdef Table ctbl = Table( - [c.to_pylibcudf(mode="read") for c in source_columns] - ) - if method == "murmur3": - return Column.from_pylibcudf(plc.hashing.murmurhash3_x86_32(ctbl, seed)) - elif method == "xxhash64": - return Column.from_pylibcudf(plc.hashing.xxhash_64(ctbl, seed)) - elif method == "md5": - return Column.from_pylibcudf(plc.hashing.md5(ctbl)) - elif method == "sha1": - return Column.from_pylibcudf(plc.hashing.sha1(ctbl)) - elif method == "sha224": - return Column.from_pylibcudf(plc.hashing.sha224(ctbl)) - elif method == "sha256": - return Column.from_pylibcudf(plc.hashing.sha256(ctbl)) - elif method == "sha384": - return Column.from_pylibcudf(plc.hashing.sha384(ctbl)) - elif method == "sha512": - return Column.from_pylibcudf(plc.hashing.sha512(ctbl)) - else: - raise ValueError( - f"Unsupported hashing algorithm {method}." - ) diff --git a/python/cudf/cudf/_lib/labeling.pyx b/python/cudf/cudf/_lib/labeling.pyx deleted file mode 100644 index 524bfd3b2e8..00000000000 --- a/python/cudf/cudf/_lib/labeling.pyx +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. - -from libcpp cimport bool as cbool - -import pylibcudf as plc - -from cudf._lib.column cimport Column -from cudf.core.buffer import acquire_spill_lock - - -# Note that the parameter input shadows a Python built-in in the local scope, -# but I'm not too concerned about that since there's no use-case for actual -# input in this context. -@acquire_spill_lock() -def label_bins(Column input, Column left_edges, cbool left_inclusive, - Column right_edges, cbool right_inclusive): - plc_column = plc.labeling.label_bins( - input.to_pylibcudf(mode="read"), - left_edges.to_pylibcudf(mode="read"), - plc.labeling.Inclusive.YES if left_inclusive else plc.labeling.Inclusive.NO, - right_edges.to_pylibcudf(mode="read"), - plc.labeling.Inclusive.YES if right_inclusive else plc.labeling.Inclusive.NO, - ) - return Column.from_pylibcudf(plc_column) diff --git a/python/cudf/cudf/_lib/unary.pyx b/python/cudf/cudf/_lib/unary.pyx deleted file mode 100644 index d5602fd5a1c..00000000000 --- a/python/cudf/cudf/_lib/unary.pyx +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. - -from cudf._lib.column cimport Column -from cudf._lib.types cimport dtype_to_pylibcudf_type - -import numpy as np - -import pylibcudf - -from cudf.api.types import is_decimal_dtype -from cudf.core.buffer import acquire_spill_lock - - -@acquire_spill_lock() -def unary_operation(Column input, object op): - return Column.from_pylibcudf( - pylibcudf.unary.unary_operation(input.to_pylibcudf(mode="read"), op) - ) - - -@acquire_spill_lock() -def is_null(Column input): - return Column.from_pylibcudf( - pylibcudf.unary.is_null(input.to_pylibcudf(mode="read")) - ) - - -@acquire_spill_lock() -def is_valid(Column input): - return Column.from_pylibcudf( - pylibcudf.unary.is_valid(input.to_pylibcudf(mode="read")) - ) - - -@acquire_spill_lock() -def cast(Column input, object dtype=np.float64): - result = Column.from_pylibcudf( - pylibcudf.unary.cast( - input.to_pylibcudf(mode="read"), - dtype_to_pylibcudf_type(dtype) - ) - ) - - if is_decimal_dtype(result.dtype): - result.dtype.precision = dtype.precision - return result - - -@acquire_spill_lock() -def is_nan(Column input): - return Column.from_pylibcudf( - pylibcudf.unary.is_nan(input.to_pylibcudf(mode="read")) - ) - - -@acquire_spill_lock() -def is_non_nan(Column input): - return Column.from_pylibcudf( - pylibcudf.unary.is_not_nan(input.to_pylibcudf(mode="read")) - ) diff --git a/python/cudf/cudf/_lib/utils.pxd b/python/cudf/cudf/_lib/utils.pxd index 623c5064a1a..6db3036d514 100644 --- a/python/cudf/cudf/_lib/utils.pxd +++ b/python/cudf/cudf/_lib/utils.pxd @@ -10,7 +10,7 @@ from pylibcudf.libcudf.table.table cimport table, table_view cdef data_from_unique_ptr( unique_ptr[table] c_tbl, column_names, index_names=*) -cdef data_from_pylibcudf_table(tbl, column_names, index_names=*) +cpdef data_from_pylibcudf_table(tbl, column_names, index_names=*) cpdef data_from_pylibcudf_io(tbl_with_meta, column_names = *, index_names = *) cdef data_from_table_view( table_view tv, object owner, object column_names, object index_names=*) @@ -18,5 +18,5 @@ cdef table_view table_view_from_columns(columns) except * cdef table_view table_view_from_table(tbl, ignore_index=*) except* cdef columns_from_unique_ptr(unique_ptr[table] c_tbl) cdef columns_from_table_view(table_view tv, object owners) -cdef columns_from_pylibcudf_table(tbl) +cpdef columns_from_pylibcudf_table(tbl) cdef _data_from_columns(columns, column_names, index_names=*) diff --git a/python/cudf/cudf/_lib/utils.pyx b/python/cudf/cudf/_lib/utils.pyx index 292de82e4c4..244d7fdc006 100644 --- a/python/cudf/cudf/_lib/utils.pyx +++ b/python/cudf/cudf/_lib/utils.pyx @@ -229,7 +229,7 @@ cdef columns_from_unique_ptr( return columns -cdef columns_from_pylibcudf_table(tbl): +cpdef columns_from_pylibcudf_table(tbl): """Convert a pylibcudf table into list of columns. Parameters @@ -309,7 +309,7 @@ cdef data_from_unique_ptr( ) -cdef data_from_pylibcudf_table(tbl, column_names, index_names=None): +cpdef data_from_pylibcudf_table(tbl, column_names, index_names=None): return _data_from_columns( columns_from_pylibcudf_table(tbl), column_names, diff --git a/python/cudf/cudf/core/_internals/unary.py b/python/cudf/cudf/core/_internals/unary.py new file mode 100644 index 00000000000..3b8e3db60a7 --- /dev/null +++ b/python/cudf/cudf/core/_internals/unary.py @@ -0,0 +1,64 @@ +# Copyright (c) 2020-2024, NVIDIA CORPORATION. +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pylibcudf as plc + +from cudf._lib.types import dtype_to_pylibcudf_type +from cudf.api.types import is_decimal_dtype +from cudf.core.buffer import acquire_spill_lock + +if TYPE_CHECKING: + from cudf._typing import Dtype + from cudf.core.column import ColumnBase + + +@acquire_spill_lock() +def unary_operation( + col: ColumnBase, op: plc.unary.UnaryOperator +) -> ColumnBase: + return type(col).from_pylibcudf( + plc.unary.unary_operation(col.to_pylibcudf(mode="read"), op) + ) + + +@acquire_spill_lock() +def is_null(col: ColumnBase) -> ColumnBase: + return type(col).from_pylibcudf( + plc.unary.is_null(col.to_pylibcudf(mode="read")) + ) + + +@acquire_spill_lock() +def is_valid(col: ColumnBase) -> ColumnBase: + return type(col).from_pylibcudf( + plc.unary.is_valid(col.to_pylibcudf(mode="read")) + ) + + +@acquire_spill_lock() +def cast(col: ColumnBase, dtype: Dtype) -> ColumnBase: + result = type(col).from_pylibcudf( + plc.unary.cast( + col.to_pylibcudf(mode="read"), dtype_to_pylibcudf_type(dtype) + ) + ) + + if is_decimal_dtype(result.dtype): + result.dtype.precision = dtype.precision # type: ignore[union-attr] + return result + + +@acquire_spill_lock() +def is_nan(col: ColumnBase) -> ColumnBase: + return type(col).from_pylibcudf( + plc.unary.is_nan(col.to_pylibcudf(mode="read")) + ) + + +@acquire_spill_lock() +def is_non_nan(col: ColumnBase) -> ColumnBase: + return type(col).from_pylibcudf( + plc.unary.is_not_nan(col.to_pylibcudf(mode="read")) + ) diff --git a/python/cudf/cudf/core/column/categorical.py b/python/cudf/cudf/core/column/categorical.py index 087d0ed65f5..7354b917f90 100644 --- a/python/cudf/cudf/core/column/categorical.py +++ b/python/cudf/cudf/core/column/categorical.py @@ -14,6 +14,7 @@ import cudf from cudf import _lib as libcudf from cudf._lib.transform import bools_to_mask +from cudf.core._internals import unary from cudf.core.column import column from cudf.core.column.methods import ColumnMethods from cudf.core.dtypes import CategoricalDtype, IntervalDtype @@ -1018,12 +1019,12 @@ def isnull(self) -> ColumnBase: """ Identify missing values in a CategoricalColumn. """ - result = libcudf.unary.is_null(self) + result = unary.is_null(self) if self.categories.dtype.kind == "f": # Need to consider `np.nan` values in case # of an underlying float column - categories = libcudf.unary.is_nan(self.categories) + categories = unary.is_nan(self.categories) if categories.any(): code = self._encode(np.nan) result = result | (self.codes == cudf.Scalar(code)) @@ -1034,12 +1035,12 @@ def notnull(self) -> ColumnBase: """ Identify non-missing values in a CategoricalColumn. """ - result = libcudf.unary.is_valid(self) + result = unary.is_valid(self) if self.categories.dtype.kind == "f": # Need to consider `np.nan` values in case # of an underlying float column - categories = libcudf.unary.is_nan(self.categories) + categories = unary.is_nan(self.categories) if categories.any(): code = self._encode(np.nan) result = result & (self.codes != cudf.Scalar(code)) @@ -1203,9 +1204,7 @@ def _concat( elif newsize == 0: codes_col = column.column_empty(0, head.codes.dtype, masked=True) else: - # Filter out inputs that have 0 length, then concatenate. - codes = [o for o in codes if len(o)] - codes_col = libcudf.concat.concat_columns(objs) + codes_col = column.concat_columns(codes) # type: ignore[arg-type] codes_col = as_unsigned_codes( len(cats), diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index d2f9d208c77..f6eaea4b783 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -19,6 +19,7 @@ from pandas.core.arrays.arrow.extension_types import ArrowIntervalType from typing_extensions import Self +import pylibcudf as plc import rmm import cudf @@ -47,6 +48,7 @@ is_string_dtype, ) from cudf.core._compat import PANDAS_GE_210 +from cudf.core._internals import unary from cudf.core._internals.timezones import get_compatible_timezone from cudf.core.abc import Serializable from cudf.core.buffer import ( @@ -713,12 +715,12 @@ def isnull(self) -> ColumnBase: if not self.has_nulls(include_nan=self.dtype.kind == "f"): return as_column(False, length=len(self)) - result = libcudf.unary.is_null(self) + result = unary.is_null(self) if self.dtype.kind == "f": # Need to consider `np.nan` values in case # of a float column - result = result | libcudf.unary.is_nan(self) + result = result | unary.is_nan(self) return result @@ -727,12 +729,12 @@ def notnull(self) -> ColumnBase: if not self.has_nulls(include_nan=self.dtype.kind == "f"): return as_column(True, length=len(self)) - result = libcudf.unary.is_valid(self) + result = unary.is_valid(self) if self.dtype.kind == "f": # Need to consider `np.nan` values in case # of a float column - result = result & libcudf.unary.is_non_nan(self) + result = result & unary.is_non_nan(self) return result @@ -2299,4 +2301,10 @@ def concat_columns(objs: "MutableSequence[ColumnBase]") -> ColumnBase: return column_empty(0, head.dtype, masked=True) # Filter out inputs that have 0 length, then concatenate. - return libcudf.concat.concat_columns([o for o in objs if len(o)]) + objs_with_len = [o for o in objs if len(o)] + with acquire_spill_lock(): + return Column.from_pylibcudf( + plc.concatenate.concatenate( + [col.to_pylibcudf(mode="read") for col in objs_with_len] + ) + ) diff --git a/python/cudf/cudf/core/column/datetime.py b/python/cudf/cudf/core/column/datetime.py index bd0d72b9bc0..16124cf0a7d 100644 --- a/python/cudf/cudf/core/column/datetime.py +++ b/python/cudf/cudf/core/column/datetime.py @@ -14,17 +14,19 @@ import pandas as pd import pyarrow as pa +import pylibcudf as plc + import cudf from cudf import _lib as libcudf -from cudf._lib.labeling import label_bins from cudf._lib.search import search_sorted from cudf.core._compat import PANDAS_GE_220 +from cudf.core._internals import unary from cudf.core._internals.timezones import ( check_ambiguous_and_nonexistent, get_compatible_timezone, get_tz_data, ) -from cudf.core.buffer import Buffer +from cudf.core.buffer import Buffer, acquire_spill_lock from cudf.core.column import ColumnBase, as_column, column, string from cudf.core.column.timedelta import _unit_to_nanoseconds_conversion from cudf.utils.dtypes import _get_base_dtype @@ -490,7 +492,7 @@ def as_datetime_column(self, dtype: Dtype) -> DatetimeColumn: "Cannot use .astype to convert from timezone-naive dtype to timezone-aware dtype. " "Use tz_localize instead." ) - return libcudf.unary.cast(self, dtype=dtype) + return unary.cast(self, dtype=dtype) # type: ignore[return-value] def as_timedelta_column(self, dtype: Dtype) -> None: # type: ignore[override] raise TypeError( @@ -818,13 +820,16 @@ def _find_ambiguous_and_nonexistent( # The end of an ambiguous time period is what Clock 2 reads at # the moment of transition: ambiguous_end = clock_2.apply_boolean_mask(cond) - ambiguous = label_bins( - self, - left_edges=ambiguous_begin, - left_inclusive=True, - right_edges=ambiguous_end, - right_inclusive=False, - ).notnull() + with acquire_spill_lock(): + plc_column = plc.labeling.label_bins( + self.to_pylibcudf(mode="read"), + ambiguous_begin.to_pylibcudf(mode="read"), + plc.labeling.Inclusive.YES, + ambiguous_end.to_pylibcudf(mode="read"), + plc.labeling.Inclusive.NO, + ) + ambiguous = libcudf.column.Column.from_pylibcudf(plc_column) + ambiguous = ambiguous.notnull() # At the start of a non-existent time period, Clock 2 reads less # than Clock 1 (which has been turned forward): @@ -834,13 +839,16 @@ def _find_ambiguous_and_nonexistent( # The end of the non-existent time period is what Clock 1 reads # at the moment of transition: nonexistent_end = clock_1.apply_boolean_mask(cond) - nonexistent = label_bins( - self, - left_edges=nonexistent_begin, - left_inclusive=True, - right_edges=nonexistent_end, - right_inclusive=False, - ).notnull() + with acquire_spill_lock(): + plc_column = plc.labeling.label_bins( + self.to_pylibcudf(mode="read"), + nonexistent_begin.to_pylibcudf(mode="read"), + plc.labeling.Inclusive.YES, + nonexistent_end.to_pylibcudf(mode="read"), + plc.labeling.Inclusive.NO, + ) + nonexistent = libcudf.column.Column.from_pylibcudf(plc_column) + nonexistent = nonexistent.notnull() return ambiguous, nonexistent diff --git a/python/cudf/cudf/core/column/decimal.py b/python/cudf/cudf/core/column/decimal.py index 8ae06f72d1e..ce7aa91f775 100644 --- a/python/cudf/cudf/core/column/decimal.py +++ b/python/cudf/cudf/core/column/decimal.py @@ -3,7 +3,6 @@ from __future__ import annotations import warnings -from collections.abc import Sequence from decimal import Decimal from typing import TYPE_CHECKING, cast @@ -17,6 +16,7 @@ from_decimal as cpp_from_decimal, ) from cudf.api.types import is_scalar +from cudf.core._internals import unary from cudf.core.buffer import as_buffer from cudf.core.column import ColumnBase from cudf.core.dtypes import ( @@ -85,7 +85,7 @@ def as_decimal_column( if dtype == self.dtype: return self - return libcudf.unary.cast(self, dtype) + return unary.cast(self, dtype) # type: ignore[return-value] def as_string_column(self) -> cudf.core.column.StringColumn: if len(self) > 0: @@ -216,23 +216,10 @@ def normalize_binop_value(self, other): ) return NotImplemented - def _decimal_quantile( - self, q: float | Sequence[float], interpolation: str, exact: bool - ) -> ColumnBase: - quant = [float(q)] if not isinstance(q, (Sequence, np.ndarray)) else q - # get sorted indices and exclude nulls - indices = libcudf.sort.order_by( - [self], [True], "first", stable=True - ).slice(self.null_count, len(self)) - result = libcudf.quantiles.quantile( - self, quant, interpolation, indices, exact - ) - return result._with_type_metadata(self.dtype) - def as_numerical_column( self, dtype: Dtype ) -> "cudf.core.column.NumericalColumn": - return libcudf.unary.cast(self, dtype) + return unary.cast(self, dtype) # type: ignore[return-value] class Decimal32Column(DecimalBaseColumn): diff --git a/python/cudf/cudf/core/column/numerical.py b/python/cudf/cudf/core/column/numerical.py index f79496ed0ec..36d1bdb45b6 100644 --- a/python/cudf/cudf/core/column/numerical.py +++ b/python/cudf/cudf/core/column/numerical.py @@ -14,6 +14,7 @@ import cudf from cudf import _lib as libcudf from cudf.api.types import is_integer, is_scalar +from cudf.core._internals import unary from cudf.core.column import ColumnBase, as_column, column, string from cudf.core.dtypes import CategoricalDtype from cudf.core.mixins import BinaryOperand @@ -125,7 +126,7 @@ def indices_of(self, value: ScalarLike) -> NumericalColumn: and self.dtype.kind in {"c", "f"} and np.isnan(value) ): - nan_col = libcudf.unary.is_nan(self) + nan_col = unary.is_nan(self) return nan_col.indices_of(True) else: return super().indices_of(value) @@ -184,7 +185,7 @@ def unary_operator(self, unaryop: str | Callable) -> ColumnBase: unaryop = unaryop.upper() unaryop = _unaryop_map.get(unaryop, unaryop) unaryop = pylibcudf.unary.UnaryOperator[unaryop] - return libcudf.unary.unary_operation(self, unaryop) + return unary.unary_operation(self, unaryop) def __invert__(self): if self.dtype.kind in "ui": @@ -388,13 +389,13 @@ def as_timedelta_column( def as_decimal_column( self, dtype: Dtype ) -> "cudf.core.column.DecimalBaseColumn": - return libcudf.unary.cast(self, dtype) + return unary.cast(self, dtype) # type: ignore[return-value] def as_numerical_column(self, dtype: Dtype) -> NumericalColumn: dtype = cudf.dtype(dtype) if dtype == self.dtype: return self - return libcudf.unary.cast(self, dtype) + return unary.cast(self, dtype) # type: ignore[return-value] def all(self, skipna: bool = True) -> bool: # If all entries are null the result is True, including when the column @@ -421,7 +422,7 @@ def any(self, skipna: bool = True) -> bool: def nan_count(self) -> int: if self.dtype.kind != "f": return 0 - nan_col = libcudf.unary.is_nan(self) + nan_col = unary.is_nan(self) return nan_col.sum() def _process_values_for_isin( diff --git a/python/cudf/cudf/core/column/numerical_base.py b/python/cudf/cudf/core/column/numerical_base.py index f6ab91f2f01..6d639337401 100644 --- a/python/cudf/cudf/core/column/numerical_base.py +++ b/python/cudf/cudf/core/column/numerical_base.py @@ -7,9 +7,11 @@ import numpy as np +import pylibcudf as plc + import cudf from cudf import _lib as libcudf -from cudf.core.buffer import Buffer +from cudf.core.buffer import Buffer, acquire_spill_lock from cudf.core.column import ColumnBase from cudf.core.missing import NA from cudf.core.mixins import Scannable @@ -145,9 +147,15 @@ def quantile( indices = libcudf.sort.order_by( [self], [True], "first", stable=True ).slice(self.null_count, len(self)) - result = libcudf.quantiles.quantile( - self, q, interpolation, indices, exact - ) + with acquire_spill_lock(): + plc_column = plc.quantiles.quantile( + self.to_pylibcudf(mode="read"), + q, + plc.types.Interpolation[interpolation.upper()], + indices.to_pylibcudf(mode="read"), + exact, + ) + result = type(self).from_pylibcudf(plc_column) # type: ignore[assignment] if return_scalar: scalar_result = result.element_indexing(0) if interpolation in {"lower", "higher", "nearest"}: diff --git a/python/cudf/cudf/core/column/timedelta.py b/python/cudf/cudf/core/column/timedelta.py index c3ad09cf898..620fe31c30f 100644 --- a/python/cudf/cudf/core/column/timedelta.py +++ b/python/cudf/cudf/core/column/timedelta.py @@ -13,6 +13,7 @@ import cudf from cudf import _lib as libcudf from cudf.api.types import is_scalar +from cudf.core._internals import unary from cudf.core.buffer import Buffer, acquire_spill_lock from cudf.core.column import ColumnBase, column, string from cudf.utils.dtypes import np_to_pa_dtype @@ -307,7 +308,7 @@ def as_string_column(self) -> cudf.core.column.StringColumn: def as_timedelta_column(self, dtype: Dtype) -> TimeDeltaColumn: if dtype == self.dtype: return self - return libcudf.unary.cast(self, dtype=dtype) + return unary.cast(self, dtype=dtype) # type: ignore[return-value] def find_and_replace( self, diff --git a/python/cudf/cudf/core/cut.py b/python/cudf/cudf/core/cut.py index c9b1fa2669c..a4d12cfc7f0 100644 --- a/python/cudf/cudf/core/cut.py +++ b/python/cudf/cudf/core/cut.py @@ -6,8 +6,12 @@ import numpy as np import pandas as pd +import pylibcudf as plc + import cudf +from cudf._lib.column import Column from cudf.api.types import is_list_like +from cudf.core.buffer import acquire_spill_lock from cudf.core.column import as_column from cudf.core.column.categorical import CategoricalColumn, as_unsigned_codes from cudf.core.index import IntervalIndex, interval_range @@ -256,9 +260,19 @@ def cut( # the input arr must be changed to the same type as the edges input_arr = input_arr.astype(left_edges.dtype) # get the indexes for the appropriate number - index_labels = cudf._lib.labeling.label_bins( - input_arr, left_edges, left_inclusive, right_edges, right_inclusive - ) + with acquire_spill_lock(): + plc_column = plc.labeling.label_bins( + input_arr.to_pylibcudf(mode="read"), + left_edges.to_pylibcudf(mode="read"), + plc.labeling.Inclusive.YES + if left_inclusive + else plc.labeling.Inclusive.NO, + right_edges.to_pylibcudf(mode="read"), + plc.labeling.Inclusive.YES + if right_inclusive + else plc.labeling.Inclusive.NO, + ) + index_labels = Column.from_pylibcudf(plc_column) if labels is False: # if labels is false we return the index labels, we return them @@ -283,7 +297,7 @@ def cut( # should allow duplicate categories. return interval_labels[index_labels] - index_labels = as_unsigned_codes(len(interval_labels), index_labels) + index_labels = as_unsigned_codes(len(interval_labels), index_labels) # type: ignore[arg-type] col = CategoricalColumn( data=None, diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index bf1c39b23da..bd78d5dd9f1 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -26,6 +26,8 @@ from pandas.io.formats.printing import pprint_thing from typing_extensions import Self, assert_never +import pylibcudf as plc + import cudf import cudf.core.common from cudf import _lib as libcudf @@ -43,6 +45,7 @@ from cudf.core import column, df_protocol, indexing_utils, reshape from cudf.core._compat import PANDAS_LT_300 from cudf.core.abc import Serializable +from cudf.core.buffer import acquire_spill_lock from cudf.core.column import ( CategoricalColumn, ColumnBase, @@ -1784,11 +1787,32 @@ def _concat( ) # Concatenate the Tables - out = cls._from_data( - *libcudf.concat.concat_tables( - tables, ignore_index=ignore_index or are_all_range_index + ignore = ignore_index or are_all_range_index + index_names = None if ignore else tables[0]._index_names + column_names = tables[0]._column_names + with acquire_spill_lock(): + plc_tables = [ + plc.Table( + [ + c.to_pylibcudf(mode="read") + for c in ( + table._columns + if ignore + else itertools.chain( + table._index._columns, table._columns + ) + ) + ] + ) + for table in tables + ] + + concatted = libcudf.utils.data_from_pylibcudf_table( + plc.concatenate.concatenate(plc_tables), + column_names=column_names, + index_names=index_names, ) - ) + out = cls._from_data(*concatted) # If ignore_index is True, all input frames are empty, and at # least one input frame has an index, assign a new RangeIndex @@ -4962,7 +4986,9 @@ def apply_chunks( ) @_performance_tracking - def partition_by_hash(self, columns, nparts, keep_index=True): + def partition_by_hash( + self, columns, nparts: int, keep_index: bool = True + ) -> list[DataFrame]: """Partition the dataframe by the hashed value of data in *columns*. Parameters @@ -4986,13 +5012,21 @@ def partition_by_hash(self, columns, nparts, keep_index=True): else: cols = [*self._columns] - output_columns, offsets = libcudf.hash.hash_partition( - cols, key_indices, nparts - ) + with acquire_spill_lock(): + plc_table, offsets = plc.partitioning.hash_partition( + plc.Table([col.to_pylibcudf(mode="read") for col in cols]), + key_indices, + nparts, + ) + output_columns = [ + libcudf.column.Column.from_pylibcudf(col) + for col in plc_table.columns() + ] + outdf = self._from_columns_like_self( output_columns, self._column_names, - self._index_names if keep_index else None, + self._index_names if keep_index else None, # type: ignore[arg-type] ) # Slice into partitions. Notice, `hash_partition` returns the start # offset of each partition thus we skip the first offset diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index 2b4a17f9559..30868924bcd 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -799,15 +799,20 @@ def _quantile_table( null_precedence = [plc.types.NullOrder[key] for key in null_precedence] - return self._from_columns_like_self( - libcudf.quantiles.quantile_table( - [*self._columns], + with acquire_spill_lock(): + plc_table = plc.quantiles.quantiles( + plc.Table( + [c.to_pylibcudf(mode="read") for c in self._columns] + ), q, interpolation, is_sorted, column_order, null_precedence, - ), + ) + columns = libcudf.utils.columns_from_pylibcudf_table(plc_table) + return self._from_columns_like_self( + columns, column_names=self._column_names, ) diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index e031f2a4e8e..9130779c3e9 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -21,7 +21,7 @@ import pandas as pd from typing_extensions import Self -import pylibcudf +import pylibcudf as plc import cudf import cudf._lib as libcudf @@ -2817,7 +2817,20 @@ def memory_usage(self, index=True, deep=False): """ raise NotImplementedError - def hash_values(self, method="murmur3", seed=None): + def hash_values( + self, + method: Literal[ + "murmur3", + "xxhash64", + "md5", + "sha1", + "sha224", + "sha256", + "sha384", + "sha512", + ] = "murmur3", + seed: int | None = None, + ) -> cudf.Series: """Compute the hash of values in this column. Parameters @@ -2894,11 +2907,31 @@ def hash_values(self, method="murmur3", seed=None): "Provided seed value has no effect for the hash method " f"`{method}`. Only {seed_hash_methods} support seeds." ) - # Note that both Series and DataFrame return Series objects from this - # calculation, necessitating the unfortunate circular reference to the - # child class here. + with acquire_spill_lock(): + plc_table = plc.Table( + [c.to_pylibcudf(mode="read") for c in self._columns] + ) + if method == "murmur3": + plc_column = plc.hashing.murmurhash3_x86_32(plc_table, seed) + elif method == "xxhash64": + plc_column = plc.hashing.xxhash_64(plc_table, seed) + elif method == "md5": + plc_column = plc.hashing.md5(plc_table) + elif method == "sha1": + plc_column = plc.hashing.sha1(plc_table) + elif method == "sha224": + plc_column = plc.hashing.sha224(plc_table) + elif method == "sha256": + plc_column = plc.hashing.sha256(plc_table) + elif method == "sha384": + plc_column = plc.hashing.sha384(plc_table) + elif method == "sha512": + plc_column = plc.hashing.sha512(plc_table) + else: + raise ValueError(f"Unsupported hashing algorithm {method}.") + result = libcudf.column.Column.from_pylibcudf(plc_column) return cudf.Series._from_column( - libcudf.hash.hash([*self._columns], method, seed), + result, index=self.index, ) @@ -6270,7 +6303,7 @@ def rank( if method not in {"average", "min", "max", "first", "dense"}: raise KeyError(method) - method_enum = pylibcudf.aggregation.RankMethod[method.upper()] + method_enum = plc.aggregation.RankMethod[method.upper()] if na_option not in {"keep", "top", "bottom"}: raise ValueError( "na_option must be one of 'keep', 'top', or 'bottom'" diff --git a/python/cudf/cudf/core/resample.py b/python/cudf/cudf/core/resample.py index e0aee28bfeb..d95d252559f 100644 --- a/python/cudf/cudf/core/resample.py +++ b/python/cudf/cudf/core/resample.py @@ -22,9 +22,11 @@ import numpy as np import pandas as pd +import pylibcudf as plc + import cudf -import cudf._lib.labeling -import cudf.core.index +from cudf._lib.column import Column +from cudf.core.buffer import acquire_spill_lock from cudf.core.groupby.groupby import ( DataFrameGroupBy, GroupBy, @@ -48,7 +50,7 @@ def agg(self, func, *args, engine=None, engine_kwargs=None, **kwargs): func, *args, engine=engine, engine_kwargs=engine_kwargs, **kwargs ) if len(self.grouping.bin_labels) != len(result): - index = cudf.core.index.Index( + index = cudf.Index( self.grouping.bin_labels, name=self.grouping.names[0] ) return result._align_to_index( @@ -125,7 +127,7 @@ class SeriesResampler(_Resampler, SeriesGroupBy): class _ResampleGrouping(_Grouping): - bin_labels: cudf.core.index.Index + bin_labels: cudf.Index def __init__(self, obj, by=None, level=None): self._freq = getattr(by, "freq", None) @@ -170,7 +172,7 @@ def deserialize(cls, header, frames): out.names = names out._named_columns = _named_columns out._key_columns = key_columns - out.bin_labels = cudf.core.index.Index.deserialize( + out.bin_labels = cudf.Index.deserialize( header["__bin_labels"], frames[-header["__bin_labels_count"] :] ) out._freq = header["_freq"] @@ -268,13 +270,19 @@ def _handle_frequency_grouper(self, by): cast_bin_labels = bin_labels.astype(result_type) # bin the key column: - bin_numbers = cudf._lib.labeling.label_bins( - cast_key_column, - left_edges=cast_bin_labels[:-1]._column, - left_inclusive=(closed == "left"), - right_edges=cast_bin_labels[1:]._column, - right_inclusive=(closed == "right"), - ) + with acquire_spill_lock(): + plc_column = plc.labeling.label_bins( + cast_key_column.to_pylibcudf(mode="read"), + cast_bin_labels[:-1]._column.to_pylibcudf(mode="read"), + plc.labeling.Inclusive.YES + if closed == "left" + else plc.labeling.Inclusive.NO, + cast_bin_labels[1:]._column.to_pylibcudf(mode="read"), + plc.labeling.Inclusive.YES + if closed == "right" + else plc.labeling.Inclusive.NO, + ) + bin_numbers = Column.from_pylibcudf(plc_column) if label == "right": cast_bin_labels = cast_bin_labels[1:] diff --git a/python/cudf/cudf/core/tools/numeric.py b/python/cudf/cudf/core/tools/numeric.py index 6cecf3fa170..9a22045ff78 100644 --- a/python/cudf/cudf/core/tools/numeric.py +++ b/python/cudf/cudf/core/tools/numeric.py @@ -11,6 +11,7 @@ from cudf import _lib as libcudf from cudf._lib import strings as libstrings from cudf.api.types import _is_non_decimal_numeric_dtype, is_string_dtype +from cudf.core._internals import unary from cudf.core.column import as_column from cudf.core.dtypes import CategoricalDtype from cudf.core.index import ensure_index @@ -171,7 +172,7 @@ def to_numeric(arg, errors="raise", downcast=None, dtype_backend=None): downcast_dtype = cudf.dtype(t) if downcast_dtype.itemsize <= col.dtype.itemsize: if col.can_cast_safely(downcast_dtype): - col = libcudf.unary.cast(col, downcast_dtype) + col = unary.cast(col, downcast_dtype) break if isinstance(arg, (cudf.Series, pd.Series)): diff --git a/python/cudf/cudf/testing/testing.py b/python/cudf/cudf/testing/testing.py index 668e7a77454..8d342f8e6c6 100644 --- a/python/cudf/cudf/testing/testing.py +++ b/python/cudf/cudf/testing/testing.py @@ -10,8 +10,8 @@ from pandas import testing as tm import cudf -from cudf._lib.unary import is_nan from cudf.api.types import is_numeric_dtype, is_string_dtype +from cudf.core._internals.unary import is_nan from cudf.core.missing import NA, NaT diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 96512dacb69..659d2ebd89a 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -3771,10 +3771,10 @@ def test_parquet_chunked_reader( chunk_read_limit, pass_read_limit, use_pandas_metadata, row_groups ): df = pd.DataFrame( - {"a": [1, 2, 3, 4] * 1000000, "b": ["av", "qw", "hi", "xyz"] * 1000000} + {"a": [1, 2, 3, None] * 10000, "b": ["av", "qw", None, "xyz"] * 10000} ) buffer = BytesIO() - df.to_parquet(buffer) + df.to_parquet(buffer, row_group_size=10000) actual = read_parquet_chunked( [buffer], chunk_read_limit=chunk_read_limit, @@ -3788,6 +3788,108 @@ def test_parquet_chunked_reader( assert_eq(expected, actual) +@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000]) +@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000]) +@pytest.mark.parametrize("num_rows", [997, 2997, None]) +def test_parquet_chunked_reader_structs( + chunk_read_limit, + pass_read_limit, + num_rows, +): + data = [ + { + "a": "g", + "b": { + "b_a": 10, + "b_b": {"b_b_b": None, "b_b_a": 2}, + }, + "c": None, + }, + {"a": None, "b": {"b_a": None, "b_b": None}, "c": [15, 16]}, + {"a": "j", "b": None, "c": [8, 10]}, + {"a": None, "b": {"b_a": None, "b_b": None}, "c": None}, + None, + { + "a": None, + "b": {"b_a": None, "b_b": {"b_b_b": 1}}, + "c": [18, 19], + }, + {"a": None, "b": None, "c": None}, + ] * 1000 + + pa_struct = pa.Table.from_pydict({"struct": data}) + df = cudf.DataFrame.from_arrow(pa_struct) + buffer = BytesIO() + df.to_parquet(buffer) + + # Number of rows to read + nrows = num_rows if num_rows is not None else len(df) + + actual = read_parquet_chunked( + [buffer], + chunk_read_limit=chunk_read_limit, + pass_read_limit=pass_read_limit, + nrows=nrows, + ) + expected = cudf.read_parquet( + buffer, + nrows=nrows, + ) + assert_eq(expected, actual) + + +@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000]) +@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000]) +@pytest.mark.parametrize("num_rows", [4997, 9997, None]) +@pytest.mark.parametrize( + "str_encoding", + [ + "PLAIN", + "DELTA_BYTE_ARRAY", + "DELTA_LENGTH_BYTE_ARRAY", + ], +) +def test_parquet_chunked_reader_string_decoders( + chunk_read_limit, + pass_read_limit, + num_rows, + str_encoding, +): + df = pd.DataFrame( + { + "i64": [1, 2, 3, None] * 10000, + "str": ["av", "qw", "asd", "xyz"] * 10000, + "list": list( + [["ad", "cd"], ["asd", "fd"], None, ["asd", None]] * 10000 + ), + } + ) + buffer = BytesIO() + # Write 4 Parquet row groups with string column encoded + df.to_parquet( + buffer, + row_group_size=10000, + use_dictionary=False, + column_encoding={"str": str_encoding}, + ) + + # Number of rows to read + nrows = num_rows if num_rows is not None else len(df) + + # Check with num_rows specified + actual = read_parquet_chunked( + [buffer], + chunk_read_limit=chunk_read_limit, + pass_read_limit=pass_read_limit, + nrows=nrows, + ) + expected = cudf.read_parquet( + buffer, + nrows=nrows, + ) + assert_eq(expected, actual) + + @pytest.mark.parametrize( "nrows,skip_rows", [ diff --git a/python/cudf_polars/cudf_polars/testing/plugin.py b/python/cudf_polars/cudf_polars/testing/plugin.py index 2a9104d8c82..7a759eea2e9 100644 --- a/python/cudf_polars/cudf_polars/testing/plugin.py +++ b/python/cudf_polars/cudf_polars/testing/plugin.py @@ -47,6 +47,9 @@ def pytest_configure(config: pytest.Config) -> None: EXPECTED_FAILURES: Mapping[str, str] = { "tests/unit/io/test_csv.py::test_compressed_csv": "Need to determine if file is compressed", "tests/unit/io/test_csv.py::test_read_csv_only_loads_selected_columns": "Memory usage won't be correct due to GPU", + "tests/unit/io/test_delta.py::test_scan_delta_version": "Need to expose hive partitioning", + "tests/unit/io/test_delta.py::test_scan_delta_relative": "Need to expose hive partitioning", + "tests/unit/io/test_delta.py::test_read_delta_version": "Need to expose hive partitioning", "tests/unit/io/test_lazy_count_star.py::test_count_compressed_csv_18057": "Need to determine if file is compressed", "tests/unit/io/test_lazy_csv.py::test_scan_csv_slice_offset_zero": "Integer overflow in sliced read", "tests/unit/io/test_lazy_parquet.py::test_dsl2ir_cached_metadata[False]": "cudf-polars doesn't use metadata read by rust preprocessing", @@ -64,7 +67,6 @@ def pytest_configure(config: pytest.Config) -> None: "tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read[False]": "Incomplete handling of projected reads with mismatching schemas, cudf#16394", "tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read_dtype_mismatch[False]": "Different exception raised, but correctly raises an exception", "tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read_missing_cols_from_first[False]": "Different exception raised, but correctly raises an exception", - "tests/unit/io/test_lazy_parquet.py::test_glob_n_rows": "https://github.com/rapidsai/cudf/issues/17311", "tests/unit/io/test_parquet.py::test_read_parquet_only_loads_selected_columns_15098": "Memory usage won't be correct due to GPU", "tests/unit/io/test_parquet.py::test_allow_missing_columns[projection0-False-none]": "Mismatching column read cudf#16394", "tests/unit/io/test_parquet.py::test_allow_missing_columns[projection1-False-none]": "Mismatching column read cudf#16394", diff --git a/python/cudf_polars/pyproject.toml b/python/cudf_polars/pyproject.toml index e665d42ab1a..1ce4d7b6867 100644 --- a/python/cudf_polars/pyproject.toml +++ b/python/cudf_polars/pyproject.toml @@ -19,7 +19,7 @@ authors = [ license = { text = "Apache 2.0" } requires-python = ">=3.10" dependencies = [ - "polars>=1.11,<1.14", + "polars>=1.11,<1.15", "pylibcudf==24.12.*,>=0.0.0a0", ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`. classifiers = [ diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py index 61925b21a97..9c58a24c065 100644 --- a/python/cudf_polars/tests/test_scan.py +++ b/python/cudf_polars/tests/test_scan.py @@ -112,22 +112,7 @@ def test_scan( n_rows=n_rows, ) engine = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": is_chunked}) - if ( - is_chunked - and (columns is None or columns[0] != "a") - and ( - # When we mask with the slice, it happens to remove the - # bad row - (mask is None and slice is not None) - # When we both slice and read a subset of rows it also - # removes the bad row - or (slice is None and n_rows is not None) - ) - ): - # slice read produces wrong result for string column - request.applymarker( - pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311") - ) + if slice is not None: q = q.slice(*slice) if mask is not None: @@ -377,13 +362,6 @@ def large_df(df, tmpdir_factory, chunked_slice): def test_scan_parquet_chunked( request, chunked_slice, large_df, chunk_read_limit, pass_read_limit ): - if chunked_slice in {"skip_partial", "partial"} and ( - chunk_read_limit == 0 and pass_read_limit != 0 - ): - request.applymarker( - pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311") - ) - assert_gpu_result_equal( large_df, engine=pl.GPUEngine( diff --git a/python/libcudf/libcudf/load.py b/python/libcudf/libcudf/load.py index a91fbb7aecf..c3ff5534e87 100644 --- a/python/libcudf/libcudf/load.py +++ b/python/libcudf/libcudf/load.py @@ -76,9 +76,15 @@ def load_library(): # Prefer the libraries bundled in this package. If they aren't found # (which might be the case in builds where the library was prebuilt before # packaging the wheel), look for a system installation. - libcudf_lib = _load_wheel_installation(soname) - if libcudf_lib is None: - libcudf_lib = _load_system_installation(soname) + try: + libcudf_lib = _load_wheel_installation(soname) + if libcudf_lib is None: + libcudf_lib = _load_system_installation(soname) + except OSError: + # If none of the searches above succeed, just silently return None + # and rely on other mechanisms (like RPATHs on other DSOs) to + # help the loader find the library. + pass # The caller almost never needs to do anything with this library, but no # harm in offering the option since this object at least provides a handle