From d9279929554a40b0417dd4f11e74e8f149477f73 Mon Sep 17 00:00:00 2001 From: David Wendt <45795991+davidwendt@users.noreply.github.com> Date: Wed, 20 Nov 2024 17:25:42 -0500 Subject: [PATCH 1/5] Move strings repeat benchmarks to nvbench (#17304) Moves the `cpp/benchmarks/string/repeat_strings.cpp` implementation from google-bench to nvbench. This covers the overloads of the `cudf::strings::repeat_strings` API. Authors: - David Wendt (https://github.com/davidwendt) Approvers: - Nghia Truong (https://github.com/ttnghia) - Yunsong Wang (https://github.com/PointKernel) URL: https://github.com/rapidsai/cudf/pull/17304 --- cpp/benchmarks/CMakeLists.txt | 3 +- cpp/benchmarks/string/repeat_strings.cpp | 123 ++++++++--------------- 2 files changed, 43 insertions(+), 83 deletions(-) diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index 3e52c502113..d3de9b39977 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -360,7 +360,7 @@ ConfigureNVBench( # ################################################################################################## # * strings benchmark ------------------------------------------------------------------- -ConfigureBench(STRINGS_BENCH string/factory.cu string/repeat_strings.cpp) +ConfigureBench(STRINGS_BENCH string/factory.cu) ConfigureNVBench( STRINGS_NVBENCH @@ -384,6 +384,7 @@ ConfigureNVBench( string/lengths.cpp string/like.cpp string/make_strings_column.cu + string/repeat_strings.cpp string/replace.cpp string/replace_re.cpp string/reverse.cpp diff --git a/cpp/benchmarks/string/repeat_strings.cpp b/cpp/benchmarks/string/repeat_strings.cpp index f1d1516f248..29012e2cbf9 100644 --- a/cpp/benchmarks/string/repeat_strings.cpp +++ b/cpp/benchmarks/string/repeat_strings.cpp @@ -14,99 +14,58 @@ * limitations under the License. 
*/ -#include "string_bench_args.hpp" - #include #include -#include #include #include #include -static constexpr cudf::size_type default_repeat_times = 16; -static constexpr cudf::size_type min_repeat_times = -16; -static constexpr cudf::size_type max_repeat_times = 16; +#include -static std::unique_ptr create_data_table(cudf::size_type n_cols, - cudf::size_type n_rows, - cudf::size_type max_str_length) +static void bench_repeat(nvbench::state& state) { - CUDF_EXPECTS(n_cols == 1 || n_cols == 2, "Invalid number of columns."); + auto const num_rows = static_cast(state.get_int64("num_rows")); + auto const min_width = static_cast(state.get_int64("min_width")); + auto const max_width = static_cast(state.get_int64("max_width")); + auto const min_repeat = static_cast(state.get_int64("min_repeat")); + auto const max_repeat = static_cast(state.get_int64("max_repeat")); + auto const api = state.get_string("api"); - std::vector dtype_ids{cudf::type_id::STRING}; auto builder = data_profile_builder().distribution( - cudf::type_id::STRING, distribution_id::NORMAL, 0, max_str_length); - - if (n_cols == 2) { - dtype_ids.push_back(cudf::type_id::INT32); - builder.distribution( - cudf::type_id::INT32, distribution_id::NORMAL, min_repeat_times, max_repeat_times); + cudf::type_id::STRING, distribution_id::NORMAL, min_width, max_width); + builder.distribution(cudf::type_id::INT32, distribution_id::NORMAL, min_repeat, max_repeat); + + auto const table = create_random_table( + {cudf::type_id::STRING, cudf::type_id::INT32}, row_count{num_rows}, data_profile{builder}); + auto const input = cudf::strings_column_view(table->view().column(0)); + + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + auto chars_size = input.chars_size(stream); + state.add_global_memory_reads(chars_size); + + if (api == "scalar") { + state.add_global_memory_writes(chars_size * max_repeat); + state.exec(nvbench::exec_tag::sync, + 
[&](nvbench::launch& launch) { cudf::strings::repeat_strings(input, max_repeat); }); + } else if (api == "column") { + auto repeats = table->view().column(1); + { + auto result = cudf::strings::repeat_strings(input, repeats); + auto output = cudf::strings_column_view(result->view()); + state.add_global_memory_writes(output.chars_size(stream)); + } + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { cudf::strings::repeat_strings(input, repeats); }); } - - return create_random_table(dtype_ids, row_count{n_rows}, data_profile{builder}); } -static void BM_repeat_strings_scalar_times(benchmark::State& state) -{ - auto const n_rows = static_cast(state.range(0)); - auto const max_str_length = static_cast(state.range(1)); - auto const table = create_data_table(1, n_rows, max_str_length); - auto const strings_col = cudf::strings_column_view(table->view().column(0)); - - for ([[maybe_unused]] auto _ : state) { - [[maybe_unused]] cuda_event_timer raii(state, true, cudf::get_default_stream()); - cudf::strings::repeat_strings(strings_col, default_repeat_times); - } - - state.SetBytesProcessed(state.iterations() * strings_col.chars_size(cudf::get_default_stream())); -} - -static void BM_repeat_strings_column_times(benchmark::State& state) -{ - auto const n_rows = static_cast(state.range(0)); - auto const max_str_length = static_cast(state.range(1)); - auto const table = create_data_table(2, n_rows, max_str_length); - auto const strings_col = cudf::strings_column_view(table->view().column(0)); - auto const repeat_times_col = table->view().column(1); - - for ([[maybe_unused]] auto _ : state) { - [[maybe_unused]] cuda_event_timer raii(state, true, cudf::get_default_stream()); - cudf::strings::repeat_strings(strings_col, repeat_times_col); - } - - state.SetBytesProcessed(state.iterations() * (strings_col.chars_size(cudf::get_default_stream()) + - repeat_times_col.size() * sizeof(int32_t))); -} - -static void generate_bench_args(benchmark::internal::Benchmark* b) 
-{ - int const min_rows = 1 << 8; - int const max_rows = 1 << 18; - int const row_mult = 4; - int const min_strlen = 1 << 4; - int const max_strlen = 1 << 8; - int const len_mult = 4; - generate_string_bench_args(b, min_rows, max_rows, row_mult, min_strlen, max_strlen, len_mult); -} - -class RepeatStrings : public cudf::benchmark {}; - -#define REPEAT_STRINGS_SCALAR_TIMES_BENCHMARK_DEFINE(name) \ - BENCHMARK_DEFINE_F(RepeatStrings, name) \ - (::benchmark::State & st) { BM_repeat_strings_scalar_times(st); } \ - BENCHMARK_REGISTER_F(RepeatStrings, name) \ - ->Apply(generate_bench_args) \ - ->UseManualTime() \ - ->Unit(benchmark::kMillisecond); - -#define REPEAT_STRINGS_COLUMN_TIMES_BENCHMARK_DEFINE(name) \ - BENCHMARK_DEFINE_F(RepeatStrings, name) \ - (::benchmark::State & st) { BM_repeat_strings_column_times(st); } \ - BENCHMARK_REGISTER_F(RepeatStrings, name) \ - ->Apply(generate_bench_args) \ - ->UseManualTime() \ - ->Unit(benchmark::kMillisecond); - -REPEAT_STRINGS_SCALAR_TIMES_BENCHMARK_DEFINE(scalar_times) -REPEAT_STRINGS_COLUMN_TIMES_BENCHMARK_DEFINE(column_times) +NVBENCH_BENCH(bench_repeat) + .set_name("repeat") + .add_int64_axis("min_width", {0}) + .add_int64_axis("max_width", {32, 64, 128, 256}) + .add_int64_axis("min_repeat", {0}) + .add_int64_axis("max_repeat", {16}) + .add_int64_axis("num_rows", {32768, 262144, 2097152}) + .add_string_axis("api", {"scalar", "column"}); From 68c4285717bd1150c234e5a6e7f8bad7fa5550e2 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 21 Nov 2024 01:10:50 +0100 Subject: [PATCH 2/5] Add `pynvml` as a dependency for `dask-cudf` (#17386) https://github.com/rapidsai/cudf/pull/17250 started using `pynvml` but did not add the proper dependency, this change fixes the missing dependency. 
Authors: - Peter Andreas Entschev (https://github.com/pentschev) - https://github.com/jakirkham Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) - https://github.com/jakirkham URL: https://github.com/rapidsai/cudf/pull/17386 --- conda/environments/all_cuda-118_arch-x86_64.yaml | 1 + conda/environments/all_cuda-125_arch-x86_64.yaml | 1 + conda/recipes/dask-cudf/meta.yaml | 1 + dependencies.yaml | 1 + python/dask_cudf/dask_cudf/io/parquet.py | 2 +- python/dask_cudf/pyproject.toml | 1 + 6 files changed, 6 insertions(+), 1 deletion(-) diff --git a/conda/environments/all_cuda-118_arch-x86_64.yaml b/conda/environments/all_cuda-118_arch-x86_64.yaml index d21497c4def..1ec002d3ec6 100644 --- a/conda/environments/all_cuda-118_arch-x86_64.yaml +++ b/conda/environments/all_cuda-118_arch-x86_64.yaml @@ -71,6 +71,7 @@ dependencies: - ptxcompiler - pyarrow>=14.0.0,<19.0.0a0 - pydata-sphinx-theme!=0.14.2 +- pynvml>=11.4.1,<12.0.0a0 - pytest-benchmark - pytest-cases>=3.8.2 - pytest-cov diff --git a/conda/environments/all_cuda-125_arch-x86_64.yaml b/conda/environments/all_cuda-125_arch-x86_64.yaml index 400c1195e00..b6d1cf75721 100644 --- a/conda/environments/all_cuda-125_arch-x86_64.yaml +++ b/conda/environments/all_cuda-125_arch-x86_64.yaml @@ -69,6 +69,7 @@ dependencies: - pyarrow>=14.0.0,<19.0.0a0 - pydata-sphinx-theme!=0.14.2 - pynvjitlink>=0.0.0a0 +- pynvml>=11.4.1,<12.0.0a0 - pytest-benchmark - pytest-cases>=3.8.2 - pytest-cov diff --git a/conda/recipes/dask-cudf/meta.yaml b/conda/recipes/dask-cudf/meta.yaml index 1e6c0a35a09..74ecded8ead 100644 --- a/conda/recipes/dask-cudf/meta.yaml +++ b/conda/recipes/dask-cudf/meta.yaml @@ -43,6 +43,7 @@ requirements: run: - python - cudf ={{ version }} + - pynvml >=11.4.1,<12.0.0a0 - rapids-dask-dependency ={{ minor_version }} - {{ pin_compatible('cuda-version', max_pin='x', min_pin='x') }} diff --git a/dependencies.yaml b/dependencies.yaml index 682aaa612b4..6c38d1c290a 100644 --- a/dependencies.yaml +++ 
b/dependencies.yaml @@ -758,6 +758,7 @@ dependencies: common: - output_types: [conda, requirements, pyproject] packages: + - pynvml>=11.4.1,<12.0.0a0 - rapids-dask-dependency==24.12.*,>=0.0.0a0 run_custreamz: common: diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index bf8fae552c2..bbedd046760 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -55,7 +55,7 @@ def _get_device_size(): handle = pynvml.nvmlDeviceGetHandleByIndex(int(index)) return pynvml.nvmlDeviceGetMemoryInfo(handle).total - except (ImportError, ValueError): + except ValueError: # Fall back to a conservative 8GiB default return 8 * 1024**3 diff --git a/python/dask_cudf/pyproject.toml b/python/dask_cudf/pyproject.toml index 07d9143db36..5dac70cc295 100644 --- a/python/dask_cudf/pyproject.toml +++ b/python/dask_cudf/pyproject.toml @@ -24,6 +24,7 @@ dependencies = [ "fsspec>=0.6.0", "numpy>=1.23,<3.0a0", "pandas>=2.0,<2.2.4dev0", + "pynvml>=11.4.1,<12.0.0a0", "rapids-dask-dependency==24.12.*,>=0.0.0a0", ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`. classifiers = [ From 0d9e577ccaab0d72f1b216fbe068afd7a0fd887e Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 20 Nov 2024 18:47:01 -0800 Subject: [PATCH 3/5] Ignore errors when testing glibc versions (#17389) This is likely the easiest fix for avoiding CI errors from this part of the code. 
Authors: - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/17389 --- ci/run_cudf_polars_polars_tests.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ci/run_cudf_polars_polars_tests.sh b/ci/run_cudf_polars_polars_tests.sh index b1bfac2a1dd..c851f65d4f6 100755 --- a/ci/run_cudf_polars_polars_tests.sh +++ b/ci/run_cudf_polars_polars_tests.sh @@ -28,8 +28,11 @@ if [[ $(arch) == "aarch64" ]]; then DESELECTED_TESTS+=("tests/unit/operations/test_join.py::test_join_4_columns_with_validity") else # Ensure that we don't run dbgen when it uses newer symbols than supported by the glibc version in the CI image. + # Allow errors since any of these commands could produce empty results that would cause the script to fail. + set +e glibc_minor_version=$(ldd --version | head -1 | grep -o "[0-9]\.[0-9]\+" | tail -1 | cut -d '.' -f2) latest_glibc_symbol_found=$(nm py-polars/tests/benchmark/data/pdsh/dbgen/dbgen | grep GLIBC | grep -o "[0-9]\.[0-9]\+" | sort --version-sort | tail -1 | cut -d "." 
-f 2) + set -e if [[ ${glibc_minor_version} -lt ${latest_glibc_symbol_found} ]]; then DESELECTED_TESTS+=("tests/benchmark/test_pdsh.py::test_pdsh") fi From f54c1a5ad34133605d3b5b447d9717ce7eb6dba0 Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Thu, 21 Nov 2024 09:43:47 -0500 Subject: [PATCH 4/5] Migrate CSV writer to pylibcudf (#17163) Part of #15162 Authors: - Matthew Murray (https://github.com/Matt711) Approvers: - David Wendt (https://github.com/davidwendt) - Matthew Roeschke (https://github.com/mroeschke) - Vyas Ramasubramani (https://github.com/vyasr) - Lawrence Mitchell (https://github.com/wence-) URL: https://github.com/rapidsai/cudf/pull/17163 --- cpp/include/cudf/io/csv.hpp | 2 +- python/cudf/cudf/_lib/csv.pyx | 108 +++------ python/pylibcudf/pylibcudf/io/csv.pxd | 35 +++ python/pylibcudf/pylibcudf/io/csv.pyi | 22 ++ python/pylibcudf/pylibcudf/io/csv.pyx | 216 +++++++++++++++++- python/pylibcudf/pylibcudf/io/types.pyx | 50 ++-- .../pylibcudf/pylibcudf/tests/common/utils.py | 8 +- python/pylibcudf/pylibcudf/tests/conftest.py | 57 +++-- .../pylibcudf/pylibcudf/tests/io/test_csv.py | 85 +++++++ 9 files changed, 462 insertions(+), 121 deletions(-) create mode 100644 python/pylibcudf/pylibcudf/io/csv.pxd diff --git a/cpp/include/cudf/io/csv.hpp b/cpp/include/cudf/io/csv.hpp index dae056ef157..9b2de7c72ec 100644 --- a/cpp/include/cudf/io/csv.hpp +++ b/cpp/include/cudf/io/csv.hpp @@ -1362,7 +1362,7 @@ table_with_metadata read_csv( */ /** - *@brief Builder to build options for `writer_csv()`. + *@brief Builder to build options for `write_csv()`. */ class csv_writer_options_builder; diff --git a/python/cudf/cudf/_lib/csv.pyx b/python/cudf/cudf/_lib/csv.pyx index c09e06bfc59..59a970263e0 100644 --- a/python/cudf/cudf/_lib/csv.pyx +++ b/python/cudf/cudf/_lib/csv.pyx @@ -1,10 +1,6 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. 
from libcpp cimport bool -from libcpp.memory cimport unique_ptr -from libcpp.string cimport string -from libcpp.utility cimport move -from libcpp.vector cimport vector cimport pylibcudf.libcudf.types as libcudf_types @@ -23,16 +19,7 @@ from cudf.core.buffer import acquire_spill_lock from libcpp cimport bool -from pylibcudf.libcudf.io.csv cimport ( - csv_writer_options, - write_csv as cpp_write_csv, -) -from pylibcudf.libcudf.io.data_sink cimport data_sink -from pylibcudf.libcudf.io.types cimport sink_info -from pylibcudf.libcudf.table.table_view cimport table_view - -from cudf._lib.io.utils cimport make_sink_info -from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table +from cudf._lib.utils cimport data_from_pylibcudf_io import pylibcudf as plc @@ -318,59 +305,40 @@ def write_csv( -------- cudf.to_csv """ - cdef table_view input_table_view = table_view_from_table( - table, not index - ) - cdef bool include_header_c = header - cdef char delim_c = ord(sep) - cdef string line_term_c = lineterminator.encode() - cdef string na_c = na_rep.encode() - cdef int rows_per_chunk_c = rows_per_chunk - cdef vector[string] col_names - cdef string true_value_c = 'True'.encode() - cdef string false_value_c = 'False'.encode() - cdef unique_ptr[data_sink] data_sink_c - cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c) - - if header is True: - all_names = columns_apply_na_rep(table._column_names, na_rep) - if index is True: - all_names = table._index.names + all_names - - if len(all_names) > 0: - col_names.reserve(len(all_names)) - if len(all_names) == 1: - if all_names[0] in (None, ''): - col_names.push_back('""'.encode()) - else: - col_names.push_back( - str(all_names[0]).encode() - ) - else: - for idx, col_name in enumerate(all_names): - if col_name is None: - col_names.push_back(''.encode()) - else: - col_names.push_back( - str(col_name).encode() - ) - - cdef csv_writer_options options = move( - csv_writer_options.builder(sink_info_c, 
input_table_view) - .names(col_names) - .na_rep(na_c) - .include_header(include_header_c) - .rows_per_chunk(rows_per_chunk_c) - .line_terminator(line_term_c) - .inter_column_delimiter(delim_c) - .true_value(true_value_c) - .false_value(false_value_c) - .build() - ) - + index_and_not_empty = index is True and table.index is not None + columns = [ + col.to_pylibcudf(mode="read") for col in table.index._columns + ] if index_and_not_empty else [] + columns.extend(col.to_pylibcudf(mode="read") for col in table._columns) + col_names = [] + if header: + all_names = list(table.index.names) if index_and_not_empty else [] + all_names.extend( + na_rep if name is None or pd.isnull(name) + else name for name in table._column_names + ) + col_names = [ + '""' if (name in (None, '') and len(all_names) == 1) + else (str(name) if name not in (None, '') else '') + for name in all_names + ] try: - with nogil: - cpp_write_csv(options) + plc.io.csv.write_csv( + ( + plc.io.csv.CsvWriterOptions.builder( + plc.io.SinkInfo([path_or_buf]), plc.Table(columns) + ) + .names(col_names) + .na_rep(na_rep) + .include_header(header) + .rows_per_chunk(rows_per_chunk) + .line_terminator(str(lineterminator)) + .inter_column_delimiter(str(sep)) + .true_value("True") + .false_value("False") + .build() + ) + ) except OverflowError: raise OverflowError( f"Writing CSV file with chunksize={rows_per_chunk} failed. " @@ -419,11 +387,3 @@ cdef DataType _get_plc_data_type_from_dtype(object dtype) except *: dtype = cudf.dtype(dtype) return dtype_to_pylibcudf_type(dtype) - - -def columns_apply_na_rep(column_names, na_rep): - return tuple( - na_rep if pd.isnull(col_name) - else col_name - for col_name in column_names - ) diff --git a/python/pylibcudf/pylibcudf/io/csv.pxd b/python/pylibcudf/pylibcudf/io/csv.pxd new file mode 100644 index 00000000000..f04edaa316a --- /dev/null +++ b/python/pylibcudf/pylibcudf/io/csv.pxd @@ -0,0 +1,35 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. 
+ +from libcpp.vector cimport vector +from libcpp.string cimport string +from libcpp cimport bool +from pylibcudf.libcudf.io.csv cimport ( + csv_writer_options, + csv_writer_options_builder, +) +from pylibcudf.libcudf.io.types cimport quote_style +from pylibcudf.io.types cimport SinkInfo +from pylibcudf.table cimport Table + +cdef class CsvWriterOptions: + cdef csv_writer_options c_obj + cdef Table table + cdef SinkInfo sink + + +cdef class CsvWriterOptionsBuilder: + cdef csv_writer_options_builder c_obj + cdef Table table + cdef SinkInfo sink + cpdef CsvWriterOptionsBuilder names(self, list names) + cpdef CsvWriterOptionsBuilder na_rep(self, str val) + cpdef CsvWriterOptionsBuilder include_header(self, bool val) + cpdef CsvWriterOptionsBuilder rows_per_chunk(self, int val) + cpdef CsvWriterOptionsBuilder line_terminator(self, str term) + cpdef CsvWriterOptionsBuilder inter_column_delimiter(self, str delim) + cpdef CsvWriterOptionsBuilder true_value(self, str val) + cpdef CsvWriterOptionsBuilder false_value(self, str val) + cpdef CsvWriterOptions build(self) + + +cpdef void write_csv(CsvWriterOptions options) diff --git a/python/pylibcudf/pylibcudf/io/csv.pyi b/python/pylibcudf/pylibcudf/io/csv.pyi index 356825a927d..583b66bc29c 100644 --- a/python/pylibcudf/pylibcudf/io/csv.pyi +++ b/python/pylibcudf/pylibcudf/io/csv.pyi @@ -5,9 +5,11 @@ from collections.abc import Mapping from pylibcudf.io.types import ( CompressionType, QuoteStyle, + SinkInfo, SourceInfo, TableWithMetadata, ) +from pylibcudf.table import Table from pylibcudf.types import DataType def read_csv( @@ -52,3 +54,23 @@ def read_csv( # detect_whitespace_around_quotes: bool = False, # timestamp_type: DataType = DataType(type_id.EMPTY), ) -> TableWithMetadata: ... +def write_csv(options: CsvWriterOptionsBuilder) -> None: ... + +class CsvWriterOptions: + def __init__(self): ... + @staticmethod + def builder(sink: SinkInfo, table: Table) -> CsvWriterOptionsBuilder: ... 
+ +class CsvWriterOptionsBuilder: + def __init__(self): ... + def names(self, names: list) -> CsvWriterOptionsBuilder: ... + def na_rep(self, val: str) -> CsvWriterOptionsBuilder: ... + def include_header(self, val: bool) -> CsvWriterOptionsBuilder: ... + def rows_per_chunk(self, val: int) -> CsvWriterOptionsBuilder: ... + def line_terminator(self, term: str) -> CsvWriterOptionsBuilder: ... + def inter_column_delimiter( + self, delim: str + ) -> CsvWriterOptionsBuilder: ... + def true_value(self, val: str) -> CsvWriterOptionsBuilder: ... + def false_value(self, val: str) -> CsvWriterOptionsBuilder: ... + def build(self) -> CsvWriterOptions: ... diff --git a/python/pylibcudf/pylibcudf/io/csv.pyx b/python/pylibcudf/pylibcudf/io/csv.pyx index 858e580ab34..8be391de2c2 100644 --- a/python/pylibcudf/pylibcudf/io/csv.pyx +++ b/python/pylibcudf/pylibcudf/io/csv.pyx @@ -2,14 +2,18 @@ from libcpp cimport bool from libcpp.map cimport map + from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector -from pylibcudf.io.types cimport SourceInfo, TableWithMetadata +from pylibcudf.io.types cimport SourceInfo, SinkInfo, TableWithMetadata from pylibcudf.libcudf.io.csv cimport ( csv_reader_options, + csv_writer_options, read_csv as cpp_read_csv, + write_csv as cpp_write_csv, ) + from pylibcudf.libcudf.io.types cimport ( compression_type, quote_style, @@ -17,9 +21,14 @@ from pylibcudf.libcudf.io.types cimport ( ) from pylibcudf.libcudf.types cimport data_type, size_type from pylibcudf.types cimport DataType +from pylibcudf.table cimport Table - -__all__ = ["read_csv"] +__all__ = [ + "read_csv", + "write_csv", + "CsvWriterOptions", + "CsvWriterOptionsBuilder", +] cdef tuple _process_parse_dates_hex(list cols): cdef vector[string] str_cols @@ -82,6 +91,8 @@ def read_csv( ): """Reads a CSV file into a :py:class:`~.types.TableWithMetadata`. + For details, see :cpp:func:`read_csv`. 
+ + Parameters ---------- source_info : SourceInfo @@ -263,3 +274,202 @@ def read_csv( c_result = move(cpp_read_csv(options)) return TableWithMetadata.from_libcudf(c_result) + + +# TODO: Implement the remaining methods +cdef class CsvWriterOptions: + """The settings to use for ``write_csv`` + + For details, see :cpp:class:`cudf::io::csv_writer_options` + """ + @staticmethod + def builder(SinkInfo sink, Table table): + """Create a CsvWriterOptionsBuilder object + + For details, see :cpp:func:`cudf::io::csv_writer_options::builder` + + Parameters + ---------- + sink : SinkInfo + The sink used for writer output + table : Table + Table to be written to output + + Returns + ------- + CsvWriterOptionsBuilder + Builder to build CsvWriterOptions + """ + cdef CsvWriterOptionsBuilder csv_builder = CsvWriterOptionsBuilder.__new__( + CsvWriterOptionsBuilder + ) + csv_builder.c_obj = csv_writer_options.builder(sink.c_obj, table.view()) + csv_builder.table = table + csv_builder.sink = sink + return csv_builder + + +# TODO: Implement the remaining methods +cdef class CsvWriterOptionsBuilder: + """Builder to build options for ``write_csv`` + + For details, see :cpp:class:`cudf::io::csv_writer_options_builder` + """ + cpdef CsvWriterOptionsBuilder names(self, list names): + """Sets optional column names. + + Parameters + ---------- + names : list[str] + Column names + + Returns + ------- + CsvWriterOptionsBuilder + Builder to build CsvWriterOptions + """ + self.c_obj.names([name.encode() for name in names]) + return self + + cpdef CsvWriterOptionsBuilder na_rep(self, str val): + """Sets string to be used for null entries. + + Parameters + ---------- + val : str + String to represent null value + + Returns + ------- + CsvWriterOptionsBuilder + Builder to build CsvWriterOptions + """ + self.c_obj.na_rep(val.encode()) + return self + + cpdef CsvWriterOptionsBuilder include_header(self, bool val): + """Enables/Disables headers being written to csv. 
+ + Parameters + ---------- + val : bool + Boolean value to enable/disable + + Returns + ------- + CsvWriterOptionsBuilder + Builder to build CsvWriterOptions + """ + self.c_obj.include_header(val) + return self + + cpdef CsvWriterOptionsBuilder rows_per_chunk(self, int val): + """Sets maximum number of rows to process for each file write. + + Parameters + ---------- + val : int + Number of rows per chunk + + Returns + ------- + CsvWriterOptionsBuilder + Builder to build CsvWriterOptions + """ + self.c_obj.rows_per_chunk(val) + return self + + cpdef CsvWriterOptionsBuilder line_terminator(self, str term): + """Sets character used for separating lines. + + Parameters + ---------- + term : str + Character to represent line termination + + Returns + ------- + CsvWriterOptionsBuilder + Builder to build CsvWriterOptions + """ + self.c_obj.line_terminator(term.encode()) + return self + + cpdef CsvWriterOptionsBuilder inter_column_delimiter(self, str delim): + """Sets character used for separating column values. 
+ + Parameters + ---------- + delim : str + Character to delimit column values + + Returns + ------- + CsvWriterOptionsBuilder + Builder to build CsvWriterOptions + """ + self.c_obj.inter_column_delimiter(ord(delim)) + return self + + cpdef CsvWriterOptionsBuilder true_value(self, str val): + """Sets string used for values != 0 + + Parameters + ---------- + val : str + String to represent values != 0 + + Returns + ------- + CsvWriterOptionsBuilder + Builder to build CsvWriterOptions + """ + self.c_obj.true_value(val.encode()) + return self + + cpdef CsvWriterOptionsBuilder false_value(self, str val): + """Sets string used for values == 0 + + Parameters + ---------- + val : str + String to represent values == 0 + + Returns + ------- + CsvWriterOptionsBuilder + Builder to build CsvWriterOptions + """ + self.c_obj.false_value(val.encode()) + return self + + cpdef CsvWriterOptions build(self): + """Create a CsvWriterOptions object""" + cdef CsvWriterOptions csv_options = CsvWriterOptions.__new__( + CsvWriterOptions + ) + csv_options.c_obj = move(self.c_obj.build()) + csv_options.table = self.table + csv_options.sink = self.sink + return csv_options + + +cpdef void write_csv( + CsvWriterOptions options +): + """ + Write to CSV format. + + The table to write, output paths, and options are encapsulated + by the `options` object. + + For details, see :cpp:func:`write_csv`. + + Parameters + ---------- + options: CsvWriterOptions + Settings for controlling writing behavior + """ + + with nogil: + cpp_write_csv(move(options.c_obj)) diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx index 7a3f16c4c50..51d5bda75c7 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyx +++ b/python/pylibcudf/pylibcudf/io/types.pyx @@ -261,18 +261,24 @@ cdef cppclass iobase_data_sink(data_sink): cdef class SinkInfo: - """A class containing details on a source to read from. + """ + A class containing details about destinations (sinks) to write data to. 
- For details, see :cpp:class:`cudf::io::sink_info`. + For more details, see :cpp:class:`cudf::io::sink_info`. Parameters ---------- - sinks : list of str, PathLike, BytesIO, StringIO + sinks : list of str, PathLike, or io.IOBase instances + A list of sinks to write data to. Each sink can be: - A homogeneous list of sinks (this can be a string filename, - bytes, or one of the Python I/O classes) to read from. + - A string representing a filename. + - A PathLike object. + - An instance of a Python I/O class that is a subclass of io.IOBase + (eg., io.BytesIO, io.StringIO). - Mixing different types of sinks will raise a `ValueError`. + The list must be homogeneous in type unless all sinks are instances + of subclasses of io.IOBase. Mixing different types of sinks + (that are not all io.IOBase instances) will raise a ValueError. """ def __init__(self, list sinks): @@ -280,32 +286,42 @@ cdef class SinkInfo: cdef vector[string] paths if not sinks: - raise ValueError("Need to pass at least one sink") + raise ValueError("At least one sink must be provided.") if isinstance(sinks[0], os.PathLike): sinks = [os.path.expanduser(s) for s in sinks] cdef object initial_sink_cls = type(sinks[0]) - if not all(isinstance(s, initial_sink_cls) for s in sinks): - raise ValueError("All sinks must be of the same type!") + if not all( + isinstance(s, initial_sink_cls) or ( + isinstance(sinks[0], io.IOBase) and isinstance(s, io.IOBase) + ) for s in sinks + ): + raise ValueError( + "All sinks must be of the same type unless they are all instances " + "of subclasses of io.IOBase." 
+ ) - if initial_sink_cls in {io.StringIO, io.BytesIO, io.TextIOBase}: + if isinstance(sinks[0], io.IOBase): data_sinks.reserve(len(sinks)) - if isinstance(sinks[0], (io.StringIO, io.BytesIO)): - for s in sinks: + for s in sinks: + if isinstance(s, (io.StringIO, io.BytesIO)): self.sink_storage.push_back( unique_ptr[data_sink](new iobase_data_sink(s)) ) - elif isinstance(sinks[0], io.TextIOBase): - for s in sinks: - if codecs.lookup(s).name not in ('utf-8', 'ascii'): + elif isinstance(s, io.TextIOBase): + if codecs.lookup(s.encoding).name not in ('utf-8', 'ascii'): raise NotImplementedError(f"Unsupported encoding {s.encoding}") self.sink_storage.push_back( unique_ptr[data_sink](new iobase_data_sink(s.buffer)) ) - data_sinks.push_back(self.sink_storage.back().get()) - elif initial_sink_cls is str: + else: + self.sink_storage.push_back( + unique_ptr[data_sink](new iobase_data_sink(s)) + ) + data_sinks.push_back(self.sink_storage.back().get()) + elif isinstance(sinks[0], str): paths.reserve(len(sinks)) for s in sinks: paths.push_back( s.encode()) diff --git a/python/pylibcudf/pylibcudf/tests/common/utils.py b/python/pylibcudf/pylibcudf/tests/common/utils.py index d95849ef371..58c94713d09 100644 --- a/python/pylibcudf/pylibcudf/tests/common/utils.py +++ b/python/pylibcudf/pylibcudf/tests/common/utils.py @@ -385,12 +385,10 @@ def make_source(path_or_buf, pa_table, format, **kwargs): NESTED_STRUCT_TESTING_TYPE, ] +NON_NESTED_PA_TYPES = NUMERIC_PA_TYPES + STRING_PA_TYPES + BOOL_PA_TYPES + DEFAULT_PA_TYPES = ( - NUMERIC_PA_TYPES - + STRING_PA_TYPES - + BOOL_PA_TYPES - + LIST_PA_TYPES - + DEFAULT_PA_STRUCT_TESTING_TYPES + NON_NESTED_PA_TYPES + LIST_PA_TYPES + DEFAULT_PA_STRUCT_TESTING_TYPES ) # Map pylibcudf compression types to pandas ones diff --git a/python/pylibcudf/pylibcudf/tests/conftest.py b/python/pylibcudf/pylibcudf/tests/conftest.py index 5265e411c7f..36ab6798d8a 100644 --- a/python/pylibcudf/pylibcudf/tests/conftest.py +++ 
b/python/pylibcudf/pylibcudf/tests/conftest.py @@ -15,7 +15,12 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "common")) -from utils import ALL_PA_TYPES, DEFAULT_PA_TYPES, NUMERIC_PA_TYPES +from utils import ( + ALL_PA_TYPES, + DEFAULT_PA_TYPES, + NON_NESTED_PA_TYPES, + NUMERIC_PA_TYPES, +) def _type_to_str(typ): @@ -79,29 +84,13 @@ def _get_vals_of_type(pa_type, length, seed): ) -# TODO: Consider adding another fixture/adapting this -# fixture to consider nullability -@pytest.fixture(scope="session", params=[0, 100]) -def table_data(request): - """ - Returns (TableWithMetadata, pa_table). - - This is the default fixture you should be using for testing - pylibcudf I/O writers. - - Contains one of each category (e.g. int, bool, list, struct) - of dtypes. - """ - nrows = request.param - +# TODO: Consider adapting this helper function +# to consider nullability +def _generate_table_data(types, nrows, seed=42): table_dict = {} - # Colnames in the format expected by - # plc.io.TableWithMetadata colnames = [] - seed = 42 - - for typ in ALL_PA_TYPES: + for typ in types: child_colnames = [] def _generate_nested_data(typ): @@ -151,6 +140,32 @@ def _generate_nested_data(typ): ), pa_table +@pytest.fixture(scope="session", params=[0, 100]) +def table_data(request): + """ + Returns (TableWithMetadata, pa_table). + + This is the default fixture you should be using for testing + pylibcudf I/O writers. + + Contains one of each category (e.g. int, bool, list, struct) + of dtypes. + """ + nrows = request.param + return _generate_table_data(ALL_PA_TYPES, nrows) + + +@pytest.fixture(scope="session", params=[0, 100]) +def table_data_with_non_nested_pa_types(request): + """ + Returns (TableWithMetadata, pa_table). + + This fixture is for testing with non-nested PyArrow types. 
+ """ + nrows = request.param + return _generate_table_data(NON_NESTED_PA_TYPES, nrows) + + @pytest.fixture(params=[(0, 0), ("half", 0), (-1, "half")]) def nrows_skiprows(table_data, request): """ diff --git a/python/pylibcudf/pylibcudf/tests/io/test_csv.py b/python/pylibcudf/pylibcudf/tests/io/test_csv.py index 22c83acc47c..90d2d0896a5 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_csv.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_csv.py @@ -10,6 +10,7 @@ _convert_types, assert_table_and_meta_eq, make_source, + sink_to_str, write_source_str, ) @@ -282,3 +283,87 @@ def test_read_csv_header(csv_table_data, source_or_sink, header): # list true_values = None, # list false_values = None, # bool dayfirst = False, + + +@pytest.mark.parametrize("sep", [",", "*"]) +@pytest.mark.parametrize("lineterminator", ["\n", "\n\n"]) +@pytest.mark.parametrize("header", [True, False]) +@pytest.mark.parametrize("rows_per_chunk", [8, 100]) +def test_write_csv( + table_data_with_non_nested_pa_types, + source_or_sink, + sep, + lineterminator, + header, + rows_per_chunk, +): + plc_tbl_w_meta, pa_table = table_data_with_non_nested_pa_types + sink = source_or_sink + + plc.io.csv.write_csv( + ( + plc.io.csv.CsvWriterOptions.builder( + plc.io.SinkInfo([sink]), plc_tbl_w_meta.tbl + ) + .names(plc_tbl_w_meta.column_names()) + .na_rep("") + .include_header(header) + .rows_per_chunk(rows_per_chunk) + .line_terminator(lineterminator) + .inter_column_delimiter(sep) + .true_value("True") + .false_value("False") + .build() + ) + ) + + # Convert everything to string to make comparisons easier + str_result = sink_to_str(sink) + + pd_result = pa_table.to_pandas().to_csv( + sep=sep, + lineterminator=lineterminator, + header=header, + index=False, + ) + + assert str_result == pd_result + + +@pytest.mark.parametrize("na_rep", ["", "NA"]) +def test_write_csv_na_rep(na_rep): + names = ["a", "b"] + pa_tbl = pa.Table.from_arrays( + [pa.array([1.0, 2.0, None]), pa.array([True, None, False])], + 
names=names, + ) + plc_tbl = plc.interop.from_arrow(pa_tbl) + plc_tbl_w_meta = plc.io.types.TableWithMetadata( + plc_tbl, column_names=[(name, []) for name in names] + ) + + sink = io.StringIO() + + plc.io.csv.write_csv( + ( + plc.io.csv.CsvWriterOptions.builder( + plc.io.SinkInfo([sink]), plc_tbl_w_meta.tbl + ) + .names(plc_tbl_w_meta.column_names()) + .na_rep(na_rep) + .include_header(True) + .rows_per_chunk(8) + .line_terminator("\n") + .inter_column_delimiter(",") + .true_value("True") + .false_value("False") + .build() + ) + ) + + # Convert everything to string to make comparisons easier + str_result = sink_to_str(sink) + + pd_result = pa_tbl.to_pandas().to_csv(na_rep=na_rep, index=False) + + assert str_result == pd_result From 78db66b84108cf99850e5389e2d8534e801dde69 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Thu, 21 Nov 2024 10:17:54 -0500 Subject: [PATCH 5/5] Expose stream-ordering to interop APIs (#17397) Adds stream parameter to ``` cudf::from_dlpack cudf::to_dlpack ``` Added stream gtests to verify correct stream forwarding. 
Reference: https://github.com/rapidsai/cudf/issues/13744 Authors: - Shruti Shivakumar (https://github.com/shrshi) Approvers: - David Wendt (https://github.com/davidwendt) - Tianyu Liu (https://github.com/kingcrimsontianyu) URL: https://github.com/rapidsai/cudf/pull/17397 --- cpp/include/cudf/interop.hpp | 4 +++ cpp/src/interop/dlpack.cpp | 9 ++++-- cpp/tests/CMakeLists.txt | 1 + cpp/tests/streams/interop_test.cpp | 46 ++++++++++++++++++++++++++++++ 4 files changed, 57 insertions(+), 3 deletions(-) create mode 100644 cpp/tests/streams/interop_test.cpp diff --git a/cpp/include/cudf/interop.hpp b/cpp/include/cudf/interop.hpp index f789d950e51..810f0377597 100644 --- a/cpp/include/cudf/interop.hpp +++ b/cpp/include/cudf/interop.hpp @@ -57,12 +57,14 @@ namespace CUDF_EXPORT cudf { * @throw cudf::logic_error if the any of the DLTensor fields are unsupported * * @param managed_tensor a 1D or 2D column-major (Fortran order) tensor + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table's device memory * * @return Table with a copy of the tensor data */ std::unique_ptr from_dlpack( DLManagedTensor const* managed_tensor, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @@ -79,12 +81,14 @@ std::unique_ptr
from_dlpack( * or if any of columns have non-zero null count * * @param input Table to convert to DLPack + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned DLPack tensor's device memory * * @return 1D or 2D DLPack tensor with a copy of the table data, or nullptr */ DLManagedTensor* to_dlpack( table_view const& input, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); /** @} */ // end of group diff --git a/cpp/src/interop/dlpack.cpp b/cpp/src/interop/dlpack.cpp index 4395b741e53..b5cc4cbba0d 100644 --- a/cpp/src/interop/dlpack.cpp +++ b/cpp/src/interop/dlpack.cpp @@ -297,16 +297,19 @@ DLManagedTensor* to_dlpack(table_view const& input, } // namespace detail std::unique_ptr
from_dlpack(DLManagedTensor const* managed_tensor, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::from_dlpack(managed_tensor, cudf::get_default_stream(), mr); + return detail::from_dlpack(managed_tensor, stream, mr); } -DLManagedTensor* to_dlpack(table_view const& input, rmm::device_async_resource_ref mr) +DLManagedTensor* to_dlpack(table_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::to_dlpack(input, cudf::get_default_stream(), mr); + return detail::to_dlpack(input, stream, mr); } } // namespace cudf diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 666a7d4ba4b..91c00d6af34 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -701,6 +701,7 @@ ConfigureTest(STREAM_DICTIONARY_TEST streams/dictionary_test.cpp STREAM_MODE tes ConfigureTest(STREAM_FILLING_TEST streams/filling_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_GROUPBY_TEST streams/groupby_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_HASHING_TEST streams/hash_test.cpp STREAM_MODE testing) +ConfigureTest(STREAM_INTEROP streams/interop_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_JOIN_TEST streams/join_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_LABELING_BINS_TEST streams/labeling_bins_test.cpp STREAM_MODE testing) diff --git a/cpp/tests/streams/interop_test.cpp b/cpp/tests/streams/interop_test.cpp new file mode 100644 index 00000000000..7133baf6df1 --- /dev/null +++ b/cpp/tests/streams/interop_test.cpp @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include + +#include + +struct dlpack_deleter { + void operator()(DLManagedTensor* tensor) { tensor->deleter(tensor); } +}; + +struct DLPackTest : public cudf::test::BaseFixture {}; + +TEST_F(DLPackTest, ToDLPack) +{ + cudf::table_view empty(std::vector{}); + cudf::to_dlpack(empty, cudf::test::get_default_stream()); +} + +TEST_F(DLPackTest, FromDLPack) +{ + using unique_managed_tensor = std::unique_ptr; + cudf::test::fixed_width_column_wrapper col1({}); + cudf::test::fixed_width_column_wrapper col2({}); + cudf::table_view input({col1, col2}); + unique_managed_tensor tensor(cudf::to_dlpack(input, cudf::test::get_default_stream())); + auto result = cudf::from_dlpack(tensor.get(), cudf::test::get_default_stream()); +}