From b92d0085eb4e22ddb79ad0269014669ed53754cf Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Wed, 25 Sep 2024 08:35:39 -1000 Subject: [PATCH 1/7] Fix DataFrame.drop(columns=cudf.Series/Index, axis=1) (#16712) Before when `columns=` was a `cudf.Series/Index` we would call `return array.unique.to_pandas()`, but `.unique` is a method not a property so this would have raised an error. Also took the time to refactor the helper methods here and push down the `errors=` keyword to `Frame._drop_column` Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/16712 --- python/cudf/cudf/core/frame.py | 14 +++++++---- python/cudf/cudf/core/indexed_frame.py | 32 ++++++++---------------- python/cudf/cudf/tests/test_dataframe.py | 11 ++++++++ 3 files changed, 30 insertions(+), 27 deletions(-) diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index 98af006f6e5..37ad6b8fabb 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -767,11 +767,15 @@ def fillna( ) @_performance_tracking - def _drop_column(self, name): - """Drop a column by *name*""" - if name not in self._data: - raise KeyError(f"column '{name}' does not exist") - del self._data[name] + def _drop_column( + self, name: abc.Hashable, errors: Literal["ignore", "raise"] = "raise" + ) -> None: + """Drop a column by *name* inplace.""" + try: + del self._data[name] + except KeyError as err: + if errors != "ignore": + raise KeyError(f"column '{name}' does not exist") from err @_performance_tracking def _quantile_table( diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index 810d4ad74e7..5952815deef 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -3,7 +3,6 @@ from __future__ import annotations -import numbers import operator import textwrap import warnings @@ -150,24 +149,14 @@ ) -def _get_host_unique(array): +def _get_unique_drop_labels(array): + """Return labels to be dropped for IndexFrame.drop.""" if isinstance(array, (cudf.Series, cudf.Index, ColumnBase)): - return array.unique.to_pandas() - elif isinstance(array, (str, numbers.Number)): - return [array] + yield from np.unique(as_column(array).values_host) + elif is_scalar(array): + yield array else: - return set(array) - - -def _drop_columns(f: Frame, columns: abc.Iterable, errors: str): - for c in columns: - try: - f._drop_column(c) - except KeyError as e: - if errors == "ignore": - pass - else: - raise e + yield from set(array) def _indices_from_labels(obj, labels): @@ -5262,15 +5251,14 @@ def drop( out = self.copy() if axis in (1, "columns"): - target = _get_host_unique(target) - - _drop_columns(out, target, errors) + for label in _get_unique_drop_labels(target): + out._drop_column(label, errors=errors) elif axis in (0, "index"): dropped = _drop_rows_by_labels(out, target, level, errors) if columns is not None: - columns = _get_host_unique(columns) - _drop_columns(dropped, columns, errors) + for label in _get_unique_drop_labels(columns): + dropped._drop_column(label, errors=errors) out._mimic_inplace(dropped, inplace=True) diff --git a/python/cudf/cudf/tests/test_dataframe.py b/python/cudf/cudf/tests/test_dataframe.py index f4d1578bda7..6f88d942746 100644 --- a/python/cudf/cudf/tests/test_dataframe.py +++ b/python/cudf/cudf/tests/test_dataframe.py @@ -515,6 +515,17 @@ def test_dataframe_drop_columns(pdf, columns, inplace): assert_eq(expected, actual) +@pytest.mark.parametrize("obj", ["Index", "Series"]) +def test_drop_cudf_obj_columns(obj): + pdf = pd.DataFrame({"A": [1], "B": [1]}) + gdf = cudf.from_pandas(pdf) + + columns = ["B"] + expected = pdf.drop(labels=getattr(pd, obj)(columns), axis=1) + actual = gdf.drop(columns=getattr(cudf, obj)(columns), axis=1) + assert_eq(expected, actual) + + @pytest.mark.parametrize( "pdf", [ From d11ec7ac18092e71ad004b87b3e42da3606e0f0b Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Wed, 25 Sep 2024 14:46:34 -0400 Subject: [PATCH 2/7] [DOC] Update Pylibcudf doc strings (#16810) This PR is a first pass at #15937. We will close #15937 after #15162 is closed Authors: - Matthew Murray (https://github.com/Matt711) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/16810 --- python/pylibcudf/pylibcudf/binaryop.pyx | 2 +- .../pylibcudf/pylibcudf/column_factories.pyx | 18 +++++++++++++++++ python/pylibcudf/pylibcudf/groupby.pyx | 2 +- python/pylibcudf/pylibcudf/io/avro.pyx | 2 ++ python/pylibcudf/pylibcudf/io/parquet.pyx | 4 ++++ python/pylibcudf/pylibcudf/labeling.pyx | 2 ++ python/pylibcudf/pylibcudf/lists.pyx | 12 +++++++++++ python/pylibcudf/pylibcudf/merge.pyx | 2 ++ python/pylibcudf/pylibcudf/quantiles.pyx | 4 ++++ python/pylibcudf/pylibcudf/reshape.pyx | 4 ++++ python/pylibcudf/pylibcudf/search.pyx | 6 ++++++ python/pylibcudf/pylibcudf/sorting.pyx | 20 +++++++++++++++++++ .../pylibcudf/pylibcudf/stream_compaction.pyx | 18 +++++++++++++++++ .../pylibcudf/pylibcudf/strings/findall.pyx | 2 +- python/pylibcudf/pylibcudf/transform.pyx | 2 ++ 15 files changed, 97 insertions(+), 3 deletions(-) diff --git a/python/pylibcudf/pylibcudf/binaryop.pyx b/python/pylibcudf/pylibcudf/binaryop.pyx index 5a67f4d6cdb..5f9d145139a 100644 --- a/python/pylibcudf/pylibcudf/binaryop.pyx +++ b/python/pylibcudf/pylibcudf/binaryop.pyx @@ -94,7 +94,7 @@ cpdef bool is_supported_operation( ): """Check if an operation is supported for the given data types. - For details, see :cpp:func::is_supported_operation`. + For details, see :cpp:func::`is_supported_operation`. Parameters ---------- diff --git a/python/pylibcudf/pylibcudf/column_factories.pyx b/python/pylibcudf/pylibcudf/column_factories.pyx index 4601cba515a..e9085e3ea02 100644 --- a/python/pylibcudf/pylibcudf/column_factories.pyx +++ b/python/pylibcudf/pylibcudf/column_factories.pyx @@ -18,6 +18,20 @@ from .types import MaskState, TypeId cpdef Column make_empty_column(MakeEmptyColumnOperand type_or_id): + """Creates an empty column of the specified type. + + For details, see :cpp:func::`make_empty_column`. + + Parameters + ---------- + type_or_id : Union[DataType, type_id, object] + The column data type. + + Returns + ------- + Column + An empty Column + """ cdef unique_ptr[column] result cdef type_id id @@ -60,7 +74,11 @@ cpdef Column make_numeric_column( size_type size, MaskArg mstate ): + """Creates an empty numeric column. + + For details, see :cpp:func::`make_numeric_column`. + """ cdef unique_ptr[column] result cdef mask_state state diff --git a/python/pylibcudf/pylibcudf/groupby.pyx b/python/pylibcudf/pylibcudf/groupby.pyx index ae5d33aaa46..afb95dba5b3 100644 --- a/python/pylibcudf/pylibcudf/groupby.pyx +++ b/python/pylibcudf/pylibcudf/groupby.pyx @@ -286,7 +286,7 @@ cdef class GroupBy: Returns ------- - Tuple[List[int], Table, Table]] + Tuple[List[int], Table, Table] A tuple of tables containing three items: - A list of integer offsets into the group keys/values - A table of group keys diff --git a/python/pylibcudf/pylibcudf/io/avro.pyx b/python/pylibcudf/pylibcudf/io/avro.pyx index 667c67f4c36..438b0ff1634 100644 --- a/python/pylibcudf/pylibcudf/io/avro.pyx +++ b/python/pylibcudf/pylibcudf/io/avro.pyx @@ -20,6 +20,8 @@ cpdef TableWithMetadata read_avro( """ Reads an Avro dataset into a :py:class:`~.types.TableWithMetadata`. + For details, see :cpp:func:`read_avro`. + Parameters ---------- source_info: SourceInfo diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index df1f1b14247..981ca7b8159 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -59,6 +59,8 @@ cdef class ChunkedParquetReader: """ Reads chunks of a Parquet file into a :py:class:`~.types.TableWithMetadata`. + For details, see :cpp:class:`chunked_parquet_reader`. + Parameters ---------- source_info : SourceInfo @@ -167,6 +169,8 @@ cpdef read_parquet( ): """Reads an Parquet file into a :py:class:`~.types.TableWithMetadata`. + For details, see :cpp:func:`read_parquet`. + Parameters ---------- source_info : SourceInfo diff --git a/python/pylibcudf/pylibcudf/labeling.pyx b/python/pylibcudf/pylibcudf/labeling.pyx index b5a7445df36..b3f6a92d85c 100644 --- a/python/pylibcudf/pylibcudf/labeling.pyx +++ b/python/pylibcudf/pylibcudf/labeling.pyx @@ -20,6 +20,8 @@ cpdef Column label_bins( ): """Labels elements based on membership in the specified bins. + For details see :cpp:func:`label_bins`. + Parameters ---------- input : Column diff --git a/python/pylibcudf/pylibcudf/lists.pyx b/python/pylibcudf/pylibcudf/lists.pyx index 947caddc485..6f82124d06e 100644 --- a/python/pylibcudf/pylibcudf/lists.pyx +++ b/python/pylibcudf/pylibcudf/lists.pyx @@ -52,6 +52,8 @@ cpdef Table explode_outer(Table input, size_type explode_column_idx): All other columns will be duplicated for each element in the list. + For details, see :cpp:func:`explode_outer`. + Parameters ---------- input : Table @@ -75,6 +77,8 @@ cpdef Table explode_outer(Table input, size_type explode_column_idx): cpdef Column concatenate_rows(Table input): """Concatenate multiple lists columns into a single lists column row-wise. + For details, see :cpp:func:`concatenate_list_elements`. + Parameters ---------- input : Table @@ -96,6 +100,8 @@ cpdef Column concatenate_rows(Table input): cpdef Column concatenate_list_elements(Column input, bool dropna): """Concatenate multiple lists on the same row into a single list. + For details, see :cpp:func:`concatenate_list_elements`. + Parameters ---------- input : Column @@ -168,6 +174,8 @@ cpdef Column contains_nulls(Column input): """Create a column of bool values indicating whether each row in the lists column contains a null value. + For details, see :cpp:func:`contains_nulls`. + Parameters ---------- input : Column @@ -290,6 +298,8 @@ cpdef Column segmented_gather(Column input, Column gather_map_list): cpdef Column extract_list_element(Column input, ColumnOrSizeType index): """Create a column of extracted list elements. + For details, see :cpp:func:`extract_list_element`. + Parameters ---------- input : Column @@ -318,6 +328,8 @@ cpdef Column count_elements(Column input): list element in the given lists column. For details, see :cpp:func:`count_elements`. + For details, see :cpp:func:`count_elements`. + Parameters ---------- input : Column diff --git a/python/pylibcudf/pylibcudf/merge.pyx b/python/pylibcudf/pylibcudf/merge.pyx index a7d43c9d158..6d707b67449 100644 --- a/python/pylibcudf/pylibcudf/merge.pyx +++ b/python/pylibcudf/pylibcudf/merge.pyx @@ -19,6 +19,8 @@ cpdef Table merge ( ): """Merge a set of sorted tables. + For details see :cpp:func:`merge`. + Parameters ---------- tables_to_merge : list diff --git a/python/pylibcudf/pylibcudf/quantiles.pyx b/python/pylibcudf/pylibcudf/quantiles.pyx index b847ade774d..3a771fbe7ef 100644 --- a/python/pylibcudf/pylibcudf/quantiles.pyx +++ b/python/pylibcudf/pylibcudf/quantiles.pyx @@ -30,6 +30,8 @@ cpdef Column quantile( Computes the specified quantiles by interpolating values between which they lie, using the interpolation strategy specified in interp. + For details see :cpp:func:`quantile`. + Parameters ---------- input: Column @@ -91,6 +93,8 @@ cpdef Table quantiles( specified quantiles. In the event a quantile lies in between rows, the specified interpolation strategy is used to pick between the rows. + For details see :cpp:func:`quantiles`. + Parameters ---------- input: Table diff --git a/python/pylibcudf/pylibcudf/reshape.pyx b/python/pylibcudf/pylibcudf/reshape.pyx index a99145be900..eb1499ebbea 100644 --- a/python/pylibcudf/pylibcudf/reshape.pyx +++ b/python/pylibcudf/pylibcudf/reshape.pyx @@ -23,6 +23,8 @@ cpdef Column interleave_columns(Table source_table): in = [[A1, A2, A3], [B1, B2, B3]] return = [A1, B1, A2, B2, A3, B3] + For details, see :cpp:func:`interleave_columns`. + Parameters ---------- source_table: Table @@ -44,6 +46,8 @@ cpdef Column interleave_columns(Table source_table): cpdef Table tile(Table source_table, size_type count): """Repeats the rows from input table count times to form a new table. + For details, see :cpp:func:`tile`. + Parameters ---------- source_table: Table diff --git a/python/pylibcudf/pylibcudf/search.pyx b/python/pylibcudf/pylibcudf/search.pyx index ff2468f3f9c..814bc6553d8 100644 --- a/python/pylibcudf/pylibcudf/search.pyx +++ b/python/pylibcudf/pylibcudf/search.pyx @@ -19,6 +19,8 @@ cpdef Column lower_bound( ): """Find smallest indices in haystack where needles may be inserted to retain order. + For details, see :cpp:func:`lower_bound`. + Parameters ---------- haystack : Table @@ -58,6 +60,8 @@ cpdef Column upper_bound( ): """Find largest indices in haystack where needles may be inserted to retain order. + For details, see :cpp:func:`upper_bound`. + Parameters ---------- haystack : Table @@ -92,6 +96,8 @@ cpdef Column upper_bound( cpdef Column contains(Column haystack, Column needles): """Check whether needles are present in haystack. + For details, see :cpp:func:`contains`. + Parameters ---------- haystack : Table diff --git a/python/pylibcudf/pylibcudf/sorting.pyx b/python/pylibcudf/pylibcudf/sorting.pyx index bd173eebacb..42289d54bca 100644 --- a/python/pylibcudf/pylibcudf/sorting.pyx +++ b/python/pylibcudf/pylibcudf/sorting.pyx @@ -16,6 +16,8 @@ from .table cimport Table cpdef Column sorted_order(Table source_table, list column_order, list null_precedence): """Computes the row indices required to sort the table. + For details, see :cpp:func:`sorted_order`. + Parameters ---------- source_table : Table @@ -52,6 +54,8 @@ cpdef Column stable_sorted_order( """Computes the row indices required to sort the table, preserving order of equal elements. + For details, see :cpp:func:`stable_sorted_order`. + Parameters ---------- source_table : Table @@ -90,6 +94,8 @@ cpdef Column rank( ): """Computes the rank of each element in the column. + For details, see :cpp:func:`rank`. + Parameters ---------- input_view : Column @@ -128,6 +134,8 @@ cpdef Column rank( cpdef bool is_sorted(Table tbl, list column_order, list null_precedence): """Checks if the table is sorted. + For details, see :cpp:func:`is_sorted`. + Parameters ---------- tbl : Table @@ -165,6 +173,8 @@ cpdef Table segmented_sort_by_key( ): """Sorts the table by key, within segments. + For details, see :cpp:func:`segmented_sort_by_key`. + Parameters ---------- values : Table @@ -209,6 +219,8 @@ cpdef Table stable_segmented_sort_by_key( """Sorts the table by key preserving order of equal elements, within segments. + For details, see :cpp:func:`stable_segmented_sort_by_key`. + Parameters ---------- values : Table @@ -251,6 +263,8 @@ cpdef Table sort_by_key( ): """Sorts the table by key. + For details, see :cpp:func:`sort_by_key`. + Parameters ---------- values : Table @@ -290,6 +304,8 @@ cpdef Table stable_sort_by_key( ): """Sorts the table by key preserving order of equal elements. + For details, see :cpp:func:`stable_sort_by_key`. + Parameters ---------- values : Table @@ -324,6 +340,8 @@ cpdef Table stable_sort_by_key( cpdef Table sort(Table source_table, list column_order, list null_precedence): """Sorts the table. + For details, see :cpp:func:`sort`. + Parameters ---------- source_table : Table @@ -355,6 +373,8 @@ cpdef Table sort(Table source_table, list column_order, list null_precedence): cpdef Table stable_sort(Table source_table, list column_order, list null_precedence): """Sorts the table preserving order of equal elements. + For details, see :cpp:func:`stable_sort`. + Parameters ---------- source_table : Table diff --git a/python/pylibcudf/pylibcudf/stream_compaction.pyx b/python/pylibcudf/pylibcudf/stream_compaction.pyx index b574bfa9fa2..d5475ea79d5 100644 --- a/python/pylibcudf/pylibcudf/stream_compaction.pyx +++ b/python/pylibcudf/pylibcudf/stream_compaction.pyx @@ -25,6 +25,8 @@ from .table cimport Table cpdef Table drop_nulls(Table source_table, list keys, size_type keep_threshold): """Filters out rows from the input table based on the presence of nulls. + For details, see :cpp:func:`drop_nulls`. + Parameters ---------- source_table : Table @@ -53,6 +55,8 @@ cpdef Table drop_nulls(Table source_table, list keys, size_type keep_threshold): cpdef Table drop_nans(Table source_table, list keys, size_type keep_threshold): """Filters out rows from the input table based on the presence of NaNs. + For details, see :cpp:func:`drop_nans`. + Parameters ---------- source_table : Table @@ -81,6 +85,8 @@ cpdef Table drop_nans(Table source_table, list keys, size_type keep_threshold): cpdef Table apply_boolean_mask(Table source_table, Column boolean_mask): """Filters out rows from the input table based on a boolean mask. + For details, see :cpp:func:`apply_boolean_mask`. + Parameters ---------- source_table : Table @@ -111,6 +117,8 @@ cpdef Table unique( ): """Filter duplicate consecutive rows from the input table. + For details, see :cpp:func:`unique`. + Parameters ---------- input : Table @@ -153,6 +161,8 @@ cpdef Table distinct( ): """Get the distinct rows from the input table. + For details, see :cpp:func:`distinct`. + Parameters ---------- input : Table @@ -191,6 +201,8 @@ cpdef Column distinct_indices( ): """Get the indices of the distinct rows from the input table. + For details, see :cpp:func:`distinct_indices`. + Parameters ---------- input : Table @@ -226,6 +238,8 @@ cpdef Table stable_distinct( ): """Get the distinct rows from the input table, preserving input order. + For details, see :cpp:func:`stable_distinct`. + Parameters ---------- input : Table @@ -263,6 +277,8 @@ cpdef size_type unique_count( ): """Returns the number of unique consecutive elements in the input column. + For details, see :cpp:func:`unique_count`. + Parameters ---------- source : Column @@ -294,6 +310,8 @@ cpdef size_type distinct_count( ): """Returns the number of distinct elements in the input column. + For details, see :cpp:func:`distinct_count`. + Parameters ---------- source : Column diff --git a/python/pylibcudf/pylibcudf/strings/findall.pyx b/python/pylibcudf/pylibcudf/strings/findall.pyx index 03ecb13a50e..3a6b87504b3 100644 --- a/python/pylibcudf/pylibcudf/strings/findall.pyx +++ b/python/pylibcudf/pylibcudf/strings/findall.pyx @@ -13,7 +13,7 @@ cpdef Column findall(Column input, RegexProgram pattern): Returns a lists column of strings for each matching occurrence using the regex_program pattern within each string. - For details, see For details, see :cpp:func:`cudf::strings::findall`. + For details, see :cpp:func:`cudf::strings::findall`. Parameters ---------- diff --git a/python/pylibcudf/pylibcudf/transform.pyx b/python/pylibcudf/pylibcudf/transform.pyx index bcd6185521a..de425a27c15 100644 --- a/python/pylibcudf/pylibcudf/transform.pyx +++ b/python/pylibcudf/pylibcudf/transform.pyx @@ -20,6 +20,8 @@ from .utils cimport int_to_bitmask_ptr cpdef tuple[gpumemoryview, int] nans_to_nulls(Column input): """Create a null mask preserving existing nulls and converting nans to null. + For details, see :cpp:func:`nans_to_nulls`. + Parameters ---------- input : Column From 8e784243c48e8420b7a75790fb42fc0ffbf6896a Mon Sep 17 00:00:00 2001 From: nvdbaranec <56695930+nvdbaranec@users.noreply.github.com> Date: Wed, 25 Sep 2024 14:16:14 -0500 Subject: [PATCH 3/7] Optimization of tdigest merge aggregation. (#16780) Fixes https://github.com/rapidsai/cudf/issues/16625 This PR fixes a slow implementation of the centroid merging step during the tdigest merge aggregation. Previously it was doing a linear march over the individual tdigests per group and merging them one by one. This led to terrible performance for large numbers of groups. In principle though, all this really was doing was a segmented sort of centroid values. So that's what this PR changes it to. Speedup for 1,000,000 input tidests with 1,000,000 individual groups is ~1000x, ``` Old --------------------------------------------------------------------------------------------------------------- Benchmark Time CPU Iterations --------------------------------------------------------------------------------------------------------------- TDigest/many_tiny_groups/1000000/1/1/10000/iterations:8/manual_time 7473 ms 7472 ms 8 TDigest/many_tiny_groups2/1000000/1/1/1000/iterations:8/manual_time 7433 ms 7431 ms 8 ``` ``` New --------------------------------------------------------------------------------------------------------------- Benchmark Time CPU Iterations --------------------------------------------------------------------------------------------------------------- TDigest/many_tiny_groups/1000000/1/1/10000/iterations:8/manual_time 6.72 ms 6.79 ms 8 TDigest/many_tiny_groups2/1000000/1/1/1000/iterations:8/manual_time 1.24 ms 1.32 ms 8 ``` Authors: - https://github.com/nvdbaranec - Muhammad Haseeb (https://github.com/mhaseeb123) - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - Muhammad Haseeb (https://github.com/mhaseeb123) - Nghia Truong (https://github.com/ttnghia) - Mike Wilson (https://github.com/hyperbolic2346) URL: https://github.com/rapidsai/cudf/pull/16780 --- cpp/benchmarks/CMakeLists.txt | 5 + cpp/benchmarks/quantiles/tdigest.cu | 123 +++++++++++ .../quantiles/tdigest/tdigest_aggregation.cu | 192 ++++++++++-------- 3 files changed, 232 insertions(+), 88 deletions(-) create mode 100644 cpp/benchmarks/quantiles/tdigest.cu diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index abc6f74fccf..4113e38dcf4 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -230,6 +230,11 @@ ConfigureNVBench(STRUCT_CREATION_NVBENCH structs/create_structs.cpp) # -------------------------------------------------------------------------------- ConfigureBench(QUANTILES_BENCH quantiles/quantiles.cpp) +# ################################################################################################## +# * tdigest benchmark +# -------------------------------------------------------------------------------- +ConfigureNVBench(TDIGEST_NVBENCH quantiles/tdigest.cu) + # ################################################################################################## # * type_dispatcher benchmark --------------------------------------------------------------------- ConfigureBench(TYPE_DISPATCHER_BENCH type_dispatcher/type_dispatcher.cu) diff --git a/cpp/benchmarks/quantiles/tdigest.cu b/cpp/benchmarks/quantiles/tdigest.cu new file mode 100644 index 00000000000..9d37dbc9a26 --- /dev/null +++ b/cpp/benchmarks/quantiles/tdigest.cu @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +#include + +#include +#include +#include + +#include + +void bm_tdigest_merge(nvbench::state& state) +{ + auto const num_tdigests = static_cast(state.get_int64("num_tdigests")); + auto const tdigest_size = static_cast(state.get_int64("tdigest_size")); + auto const tdigests_per_group = + static_cast(state.get_int64("tdigests_per_group")); + auto const max_centroids = static_cast(state.get_int64("max_centroids")); + auto const num_groups = num_tdigests / tdigests_per_group; + auto const total_centroids = num_tdigests * tdigest_size; + + auto stream = cudf::get_default_stream(); + auto mr = rmm::mr::get_current_device_resource(); + + constexpr int base_value = 5; + + // construct inner means/weights + auto val_iter = cudf::detail::make_counting_transform_iterator( + 0, cuda::proclaim_return_type([tdigest_size](cudf::size_type i) { + return static_cast(base_value + (i % tdigest_size)); + })); + auto one_iter = thrust::make_constant_iterator(1); + cudf::test::fixed_width_column_wrapper means(val_iter, val_iter + total_centroids); + cudf::test::fixed_width_column_wrapper weights(one_iter, one_iter + total_centroids); + std::vector> inner_struct_children; + inner_struct_children.push_back(means.release()); + inner_struct_children.push_back(weights.release()); + cudf::test::structs_column_wrapper inner_struct(std::move(inner_struct_children)); + + // construct the tdigest lists themselves + auto offset_iter = cudf::detail::make_counting_transform_iterator( + 0, cuda::proclaim_return_type([tdigest_size](cudf::size_type i) { + return i * tdigest_size; + })); + cudf::test::fixed_width_column_wrapper offsets(offset_iter, offset_iter + num_tdigests + 1); + auto list_col = cudf::make_lists_column( + num_tdigests, offsets.release(), inner_struct.release(), 0, {}, stream, mr); + + // min and max columns + auto min_iter = thrust::make_constant_iterator(base_value); + auto max_iter = thrust::make_constant_iterator(base_value + (tdigest_size - 1)); + cudf::test::fixed_width_column_wrapper mins(min_iter, min_iter + num_tdigests); + cudf::test::fixed_width_column_wrapper maxes(max_iter, max_iter + num_tdigests); + + // assemble the whole thing + std::vector> tdigest_children; + tdigest_children.push_back(std::move(list_col)); + tdigest_children.push_back(mins.release()); + tdigest_children.push_back(maxes.release()); + cudf::test::structs_column_wrapper tdigest(std::move(tdigest_children)); + + rmm::device_uvector group_offsets(num_groups + 1, stream, mr); + rmm::device_uvector group_labels(num_tdigests, stream, mr); + auto group_offset_iter = cudf::detail::make_counting_transform_iterator( + 0, + cuda::proclaim_return_type( + [tdigests_per_group] __device__(cudf::size_type i) { return i * tdigests_per_group; })); + thrust::copy(rmm::exec_policy_nosync(stream, mr), + group_offset_iter, + group_offset_iter + num_groups + 1, + group_offsets.begin()); + auto group_label_iter = cudf::detail::make_counting_transform_iterator( + 0, + cuda::proclaim_return_type( + [tdigests_per_group] __device__(cudf::size_type i) { return i / tdigests_per_group; })); + thrust::copy(rmm::exec_policy_nosync(stream, mr), + group_label_iter, + group_label_iter + num_tdigests, + group_labels.begin()); + + state.add_element_count(total_centroids); + + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::timer | nvbench::exec_tag::sync, + [&](nvbench::launch& launch, auto& timer) { + timer.start(); + auto result = cudf::tdigest::detail::group_merge_tdigest( + tdigest, group_offsets, group_labels, num_groups, max_centroids, stream, mr); + timer.stop(); + }); +} + +NVBENCH_BENCH(bm_tdigest_merge) + .set_name("TDigest many tiny groups") + .add_int64_axis("num_tdigests", {500'000}) + .add_int64_axis("tdigest_size", {1, 1000}) + .add_int64_axis("tdigests_per_group", {1}) + .add_int64_axis("max_centroids", {10000, 1000}); + +NVBENCH_BENCH(bm_tdigest_merge) + .set_name("TDigest many small groups") + .add_int64_axis("num_tdigests", {500'000}) + .add_int64_axis("tdigest_size", {1, 1000}) + .add_int64_axis("tdigests_per_group", {3}) + .add_int64_axis("max_centroids", {10000, 1000}); diff --git a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu index 2dd25a7b890..e1c1d2e3002 100644 --- a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu +++ b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu @@ -1021,6 +1021,76 @@ struct group_key_func { } }; +// merges all the tdigests within each group. returns a table containing 2 columns: +// the sorted means and weights. +template +std::pair, rmm::device_uvector> generate_merged_centroids( + tdigest_column_view const& tdv, + GroupOffsetIter group_offsets, + size_type num_groups, + rmm::cuda_stream_view stream) +{ + auto temp_mr = cudf::get_current_device_resource_ref(); + + auto const total_merged_centroids = tdv.means().size(); + + // output is the merged centroids (means, weights) + rmm::device_uvector output_means(total_merged_centroids, stream, temp_mr); + rmm::device_uvector output_weights(total_merged_centroids, stream, temp_mr); + + // each group represents a collection of tdigest columns. each row is 1 tdigest. + // within each group, we want to sort all the centroids within all the tdigests + // in that group, using the means as the key. the "outer offsets" represent the indices of the + // tdigests, and the "inner offsets" represents the list of centroids for a particular tdigest. + // + // rows + // ---- centroid 0 --------- + // tdigest 0 centroid 1 + // ---- centroid 2 group 0 + // tdigest 1 centroid 3 + // ---- centroid 4 --------- + // tdigest 2 centroid 5 + // ---- centroid 6 group 1 + // tdigest 3 centroid 7 + // centroid 8 + // ---- centroid 9 -------- + auto inner_offsets = tdv.centroids().offsets(); + auto centroid_offsets = cudf::detail::make_counting_transform_iterator( + 0, + cuda::proclaim_return_type( + [group_offsets, inner_offsets = tdv.centroids().offsets().begin()] __device__( + size_type i) { return inner_offsets[group_offsets[i]]; })); + + // perform the sort using the means as the key + size_t temp_size; + CUDF_CUDA_TRY(cub::DeviceSegmentedSort::SortPairs(nullptr, + temp_size, + tdv.means().begin(), + output_means.begin(), + tdv.weights().begin(), + output_weights.begin(), + total_merged_centroids, + num_groups, + centroid_offsets, + centroid_offsets + 1, + stream.value())); + + rmm::device_buffer temp_mem(temp_size, stream, temp_mr); + CUDF_CUDA_TRY(cub::DeviceSegmentedSort::SortPairs(temp_mem.data(), + temp_size, + tdv.means().begin(), + output_means.begin(), + tdv.weights().begin(), + output_weights.begin(), + total_merged_centroids, + num_groups, + centroid_offsets, + centroid_offsets + 1, + stream.value())); + + return {std::move(output_means), std::move(output_weights)}; +} + template std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, HGroupOffsetIter h_outer_offsets, @@ -1032,59 +1102,6 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - // thrust::merge and thrust::merge_by_key don't provide what we need. What we would need is an - // algorithm like a super-merge that takes two layers of keys: one which identifies the outer - // grouping of tdigests, and one which identifies the inner groupings of the tdigests within the - // outer groups. - // TODO: investigate replacing the iterative merge with a single stable_sort_by_key. - - // bring tdigest offsets back to the host - auto tdigest_offsets = tdv.centroids().offsets(); - std::vector h_inner_offsets(tdigest_offsets.size()); - cudaMemcpyAsync(h_inner_offsets.data(), - tdigest_offsets.begin(), - sizeof(size_type) * tdigest_offsets.size(), - cudaMemcpyDefault, - stream); - - stream.synchronize(); - - // extract all means and weights into a table - cudf::table_view tdigests_unsliced({tdv.means(), tdv.weights()}); - - // generate the merged (but not yet compressed) tdigests for each group. - std::vector> tdigests; - tdigests.reserve(num_groups); - std::transform(h_outer_offsets, - h_outer_offsets + num_groups, - std::next(h_outer_offsets), - std::back_inserter(tdigests), - [&](auto tdigest_start, auto tdigest_end) { - // the range of tdigests in this group - auto const num_tdigests = tdigest_end - tdigest_start; - - // slice each tdigest from the input - std::vector unmerged_tdigests; - unmerged_tdigests.reserve(num_tdigests); - auto offset_iter = std::next(h_inner_offsets.begin(), tdigest_start); - std::transform( - offset_iter, - offset_iter + num_tdigests, - std::next(offset_iter), - std::back_inserter(unmerged_tdigests), - [&](size_type start, size_type end) { - return cudf::detail::slice(tdigests_unsliced, {start, end}, stream); - }); - - // merge - return cudf::detail::merge(unmerged_tdigests, - {0}, - {order::ASCENDING}, - {}, - stream, - cudf::get_current_device_resource_ref()); - }); - // generate min and max values auto merged_min_col = cudf::make_numeric_column( data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr); @@ -1121,7 +1138,7 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, auto group_num_weights = cudf::detail::make_counting_transform_iterator( 0, group_num_weights_func{group_offsets, - tdigest_offsets.begin()}); + tdv.centroids().offsets().begin()}); thrust::replace_if(rmm::exec_policy(stream), merged_min_col->mutable_view().begin(), merged_min_col->mutable_view().end(), @@ -1135,29 +1152,33 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, group_is_empty{}, 0); - // concatenate all the merged tdigests back into one table. - std::vector tdigest_views; - tdigest_views.reserve(num_groups); - std::transform(tdigests.begin(), - tdigests.end(), - std::back_inserter(tdigest_views), - [](std::unique_ptr const& t) { return t->view(); }); - auto merged = - cudf::detail::concatenate(tdigest_views, stream, cudf::get_current_device_resource_ref()); + auto temp_mr = cudf::get_current_device_resource_ref(); + + // merge the centroids + auto [merged_means, merged_weights] = + generate_merged_centroids(tdv, group_offsets, num_groups, stream); + size_t const num_centroids = tdv.means().size(); + CUDF_EXPECTS(merged_means.size() == num_centroids, + "Unexpected number of centroids in merged result"); // generate cumulative weights - auto merged_weights = merged->get_column(1).view(); - auto cumulative_weights = cudf::make_numeric_column( - data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED, stream); - auto keys = cudf::detail::make_counting_transform_iterator( - 0, - group_key_func{ - group_labels, tdigest_offsets.begin(), tdigest_offsets.size()}); + rmm::device_uvector cumulative_weights(merged_weights.size(), stream, temp_mr); + + // generate group keys for all centroids in the entire column + rmm::device_uvector group_keys(num_centroids, stream, temp_mr); + auto iter = thrust::make_counting_iterator(0); + auto inner_offsets = tdv.centroids().offsets(); + thrust::transform(rmm::exec_policy(stream), + iter, + iter + num_centroids, + group_keys.begin(), + group_key_func{ + group_labels, inner_offsets.begin(), inner_offsets.size()}); thrust::inclusive_scan_by_key(rmm::exec_policy(stream), - keys, - keys + cumulative_weights->size(), - merged_weights.begin(), - cumulative_weights->mutable_view().begin()); + group_keys.begin(), + group_keys.begin() + num_centroids, + merged_weights.begin(), + cumulative_weights.begin()); auto const delta = max_centroids; @@ -1166,37 +1187,32 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, delta, num_groups, nearest_value_centroid_weights{ - cumulative_weights->view().begin(), - group_offsets, - tdigest_offsets.begin()}, - centroid_group_info{cumulative_weights->view().begin(), - group_offsets, - tdigest_offsets.begin()}, + cumulative_weights.begin(), group_offsets, inner_offsets.begin()}, + centroid_group_info{ + cumulative_weights.begin(), group_offsets, inner_offsets.begin()}, cumulative_centroid_weight{ - cumulative_weights->view().begin(), + cumulative_weights.begin(), group_labels, group_offsets, - {tdigest_offsets.begin(), static_cast(tdigest_offsets.size())}}, + {inner_offsets.begin(), static_cast(inner_offsets.size())}}, false, stream, mr); // input centroid values auto centroids = cudf::detail::make_counting_transform_iterator( - 0, - make_weighted_centroid{merged->get_column(0).view().begin(), - merged_weights.begin()}); + 0, make_weighted_centroid{merged_means.begin(), merged_weights.begin()}); // compute the tdigest return compute_tdigests( delta, centroids, - centroids + merged->num_rows(), + centroids + merged_means.size(), cumulative_centroid_weight{ - cumulative_weights->view().begin(), + cumulative_weights.begin(), group_labels, group_offsets, - {tdigest_offsets.begin(), static_cast(tdigest_offsets.size())}}, + {inner_offsets.begin(), static_cast(inner_offsets.size())}}, std::move(merged_min_col), std::move(merged_max_col), group_cluster_wl, From f7c5d32a833dcc6b9b35756b89a0eb19b8bc9a40 Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Wed, 25 Sep 2024 14:37:37 -0500 Subject: [PATCH 4/7] Display deltas for `cudf.pandas` test summary (#16864) This PR displays delta's for CPU and GPU usage metrics that are extracted from `cudf.pandas` pytests. Authors: - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - Jake Awe (https://github.com/AyodeAwe) URL: https://github.com/rapidsai/cudf/pull/16864 --- .github/workflows/pr.yaml | 18 +++++- .../pandas-tests/job-summary.py | 64 +++++++++++++++---- 2 files changed, 66 insertions(+), 16 deletions(-) diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index b4c449ce5d8..766df59594b 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -50,6 +50,7 @@ jobs: test_java: ${{ steps.changed-files.outputs.java_any_changed == 'true' }} test_notebooks: ${{ steps.changed-files.outputs.notebooks_any_changed == 'true' }} test_python: ${{ steps.changed-files.outputs.python_any_changed == 'true' }} + test_cudf_pandas: ${{ steps.changed-files.outputs.cudf_pandas_any_changed == 'true' }} steps: - name: Get PR info id: get-pr-info @@ -82,6 +83,7 @@ jobs: - '!java/**' - '!notebooks/**' - '!python/**' + - '!ci/cudf_pandas_scripts/**' java: - '**' - '!CONTRIBUTING.md' @@ -90,11 +92,13 @@ jobs: - '!img/**' - '!notebooks/**' - '!python/**' + - '!ci/cudf_pandas_scripts/**' notebooks: - '**' - '!CONTRIBUTING.md' - '!README.md' - '!java/**' + - '!ci/cudf_pandas_scripts/**' python: - '**' - '!CONTRIBUTING.md' @@ -103,6 +107,16 @@ jobs: - '!img/**' - '!java/**' - '!notebooks/**' + - '!ci/cudf_pandas_scripts/**' + cudf_pandas: + - '**' + - 'ci/cudf_pandas_scripts/**' + - '!CONTRIBUTING.md' + - '!README.md' + - '!docs/**' + - '!img/**' + - '!java/**' + - '!notebooks/**' checks: secrets: inherit uses: rapidsai/shared-workflows/.github/workflows/checks.yaml@branch-24.10 @@ -289,7 +303,7 @@ jobs: needs: [wheel-build-cudf, changed-files] secrets: inherit uses: rapidsai/shared-workflows/.github/workflows/wheels-test.yaml@branch-24.10 - if: needs.changed-files.outputs.test_python == 'true' + if: needs.changed-files.outputs.test_python == 'true' || needs.changed-files.outputs.test_cudf_pandas == 'true' with: # This selects "ARCH=amd64 + the latest supported Python + CUDA". matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))])) @@ -300,7 +314,7 @@ jobs: needs: [wheel-build-cudf, changed-files] secrets: inherit uses: rapidsai/shared-workflows/.github/workflows/wheels-test.yaml@branch-24.10 - if: needs.changed-files.outputs.test_python == 'true' + if: needs.changed-files.outputs.test_python == 'true' || needs.changed-files.outputs.test_cudf_pandas == 'true' with: # This selects "ARCH=amd64 + the latest supported Python + CUDA". matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))])) diff --git a/ci/cudf_pandas_scripts/pandas-tests/job-summary.py b/ci/cudf_pandas_scripts/pandas-tests/job-summary.py index 7a12db927e5..485b2ac8a51 100644 --- a/ci/cudf_pandas_scripts/pandas-tests/job-summary.py +++ b/ci/cudf_pandas_scripts/pandas-tests/job-summary.py @@ -67,20 +67,33 @@ def emoji_failed(x): # convert pr_results to a pandas DataFrame and then a markdown table pr_df = pd.DataFrame.from_dict(pr_results, orient="index").sort_index() main_df = pd.DataFrame.from_dict(main_results, orient="index").sort_index() -diff_df = pr_df - main_df -total_usage = pr_df['_slow_function_call'] + pr_df['_fast_function_call'] -pr_df['CPU Usage'] = ((pr_df['_slow_function_call']/total_usage)*100.0).round(1) -pr_df['GPU Usage'] = ((pr_df['_fast_function_call']/total_usage)*100.0).round(1) +total_usage = main_df["_slow_function_call"] + main_df["_fast_function_call"] +main_df["CPU Usage"] = ((main_df["_slow_function_call"] / total_usage) * 100.0).round(1) +main_df["GPU Usage"] = ((main_df["_fast_function_call"] / total_usage) * 100.0).round(1) + +total_usage = pr_df["_slow_function_call"] + pr_df["_fast_function_call"] +pr_df["CPU Usage"] = ((pr_df["_slow_function_call"] / total_usage) * 100.0).round(1) +pr_df["GPU Usage"] = ((pr_df["_fast_function_call"] / total_usage) * 100.0).round(1) + +cpu_usage_mean = pr_df["CPU Usage"].mean().round(2) +gpu_usage_mean = pr_df["GPU Usage"].mean().round(2) + +gpu_usage_rate_change = abs(pr_df["GPU Usage"].mean() - main_df["GPU Usage"].mean()) +pr_df["CPU Usage"] = pr_df["CPU Usage"].fillna(0) +pr_df["GPU Usage"] = pr_df["GPU Usage"].fillna(0) +main_df["CPU Usage"] = main_df["CPU Usage"].fillna(0) +main_df["GPU Usage"] = main_df["GPU Usage"].fillna(0) -cpu_usage_mean = pr_df['CPU Usage'].mean().round(2) -gpu_usage_mean = pr_df['GPU Usage'].mean().round(2) +diff_df = pr_df - main_df +diff_df["CPU Usage"] = diff_df["CPU Usage"].round(1).fillna(0) +diff_df["GPU Usage"] = diff_df["GPU Usage"].round(1).fillna(0) -# Add '%' suffix to 'CPU Usage' and 'GPU Usage' columns -pr_df['CPU Usage'] = pr_df['CPU Usage'].fillna(0).astype(str) + '%' -pr_df['GPU Usage'] = pr_df['GPU Usage'].fillna(0).astype(str) + '%' +# Add '%' suffix to "CPU Usage" and "GPU Usage" columns +pr_df["CPU Usage"] = pr_df["CPU Usage"].astype(str) + "%" +pr_df["GPU Usage"] = pr_df["GPU Usage"].astype(str) + "%" -pr_df = pr_df[["total", "passed", "failed", "skipped", 'CPU Usage', 'GPU Usage']] -diff_df = diff_df[["total", "passed", "failed", "skipped"]] +pr_df = pr_df[["total", "passed", "failed", "skipped", "CPU Usage", "GPU Usage"]] +diff_df = diff_df[["total", "passed", "failed", "skipped", "CPU Usage", "GPU Usage"]] diff_df.columns = diff_df.columns + "_diff" diff_df["passed_diff"] = diff_df["passed_diff"].map(emoji_passed) diff_df["failed_diff"] = diff_df["failed_diff"].map(emoji_failed) @@ -99,13 +112,36 @@ def emoji_failed(x): "passed_diff": "Passed delta", "failed_diff": "Failed delta", "skipped_diff": "Skipped delta", + "CPU Usage_diff": "CPU Usage delta", + "GPU Usage_diff": "GPU Usage delta", } ) -df = df.sort_values(by=["Failed tests", "Skipped tests"], ascending=False) - +df = df.sort_values(by=["CPU Usage delta", "Total tests"], ascending=False) +df["CPU Usage delta"] = df["CPU Usage delta"].map(emoji_failed) +df["GPU Usage delta"] = df["GPU Usage delta"].map(emoji_passed) +df = df[ + [ + "Total tests", + "CPU Usage delta", + "GPU Usage delta", + "Passed tests", + "Failed tests", + "Skipped tests", + "CPU Usage", + "GPU Usage", + "Total delta", + "Passed delta", + "Failed delta", + "Skipped delta", + ] +] print(comment) print() -print(f"Average CPU and GPU usage for the tests: {cpu_usage_mean}% and {gpu_usage_mean}%") +print( + f"Average GPU usage: {gpu_usage_mean}% {'an increase' if gpu_usage_rate_change > 0 else 'a decrease'} by {gpu_usage_rate_change}%" +) +print() +print(f"Average CPU usage: {cpu_usage_mean}%") print() print("Here are the results of running the Pandas tests against this PR:") print() From 987fea3d9c48ad567cb236ae1882f284f3711dd1 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Wed, 25 Sep 2024 15:53:25 -0400 Subject: [PATCH 5/7] JSON tree algorithms refactor I: CSR data structure for column tree (#15979) Part of #15903. 1. Introduces the Compressed Sparse Row (CSR) format to store the adjacency information of the column tree. 2. Analogous to `reduce_to_column_tree`, `reduce_to_column_tree_csr` reduces node tree representation to column tree stored in CSR format. TODO: - [x] Correctness test Authors: - Shruti Shivakumar (https://github.com/shrshi) - Vukasin Milovanovic (https://github.com/vuule) - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - Robert (Bobby) Evans (https://github.com/revans2) - Vukasin Milovanovic (https://github.com/vuule) - Nghia Truong (https://github.com/ttnghia) - Karthikeyan (https://github.com/karthikeyann) - Kyle Edwards (https://github.com/KyleFromNVIDIA) URL: https://github.com/rapidsai/cudf/pull/15979 --- cpp/CMakeLists.txt | 1 + cpp/src/io/json/column_tree_construction.cu | 304 ++++++++++++++++ cpp/src/io/json/json_column.cu | 48 +-- cpp/src/io/json/nested_json.hpp | 62 +++- cpp/tests/CMakeLists.txt | 1 + cpp/tests/io/json/json_tree_csr.cu | 370 ++++++++++++++++++++ 6 files changed, 758 insertions(+), 28 deletions(-) create mode 100644 cpp/src/io/json/column_tree_construction.cu create mode 100644 cpp/tests/io/json/json_tree_csr.cu diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 84b462bb884..136f43ee706 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -380,6 +380,7 @@ add_library( src/io/functions.cpp src/io/json/host_tree_algorithms.cu src/io/json/json_column.cu + src/io/json/column_tree_construction.cu src/io/json/json_normalization.cu src/io/json/json_tree.cu src/io/json/nested_json_gpu.cu diff --git a/cpp/src/io/json/column_tree_construction.cu b/cpp/src/io/json/column_tree_construction.cu new file mode 100644 index 00000000000..c4fe7926706 --- /dev/null +++ b/cpp/src/io/json/column_tree_construction.cu @@ -0,0 +1,304 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "nested_json.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace cudf::io::json { + +using row_offset_t = size_type; + +#ifdef CSR_DEBUG_PRINT +template +void print(device_span d_vec, std::string name, rmm::cuda_stream_view stream) +{ + stream.synchronize(); + auto h_vec = cudf::detail::make_std_vector_sync(d_vec, stream); + std::cout << name << " = "; + for (auto e : h_vec) { + std::cout << e << " "; + } + std::cout << std::endl; +} +#endif + +namespace experimental::detail { + +struct level_ordering { + device_span node_levels; + device_span col_ids; + device_span parent_node_ids; + __device__ bool operator()(NodeIndexT lhs_node_id, NodeIndexT rhs_node_id) const + { + auto lhs_parent_col_id = parent_node_ids[lhs_node_id] == parent_node_sentinel + ? parent_node_sentinel + : col_ids[parent_node_ids[lhs_node_id]]; + auto rhs_parent_col_id = parent_node_ids[rhs_node_id] == parent_node_sentinel + ? parent_node_sentinel + : col_ids[parent_node_ids[rhs_node_id]]; + + return (node_levels[lhs_node_id] < node_levels[rhs_node_id]) || + (node_levels[lhs_node_id] == node_levels[rhs_node_id] && + lhs_parent_col_id < rhs_parent_col_id) || + (node_levels[lhs_node_id] == node_levels[rhs_node_id] && + lhs_parent_col_id == rhs_parent_col_id && col_ids[lhs_node_id] < col_ids[rhs_node_id]); + } +}; + +struct parent_nodeids_to_colids { + device_span rev_mapped_col_ids; + __device__ auto operator()(NodeIndexT parent_node_id) -> NodeIndexT + { + return parent_node_id == parent_node_sentinel ? parent_node_sentinel + : rev_mapped_col_ids[parent_node_id]; + } +}; + +/** + * @brief Reduces node tree representation to column tree CSR representation. + * + * @param node_tree Node tree representation of JSON string + * @param original_col_ids Column ids of nodes + * @param row_offsets Row offsets of nodes + * @param is_array_of_arrays Whether the tree is an array of arrays + * @param row_array_parent_col_id Column id of row array, if is_array_of_arrays is true + * @param stream CUDA stream used for device memory operations and kernel launches + * @return A tuple of column tree representation of JSON string, column ids of columns, and + * max row offsets of columns + */ +std::tuple reduce_to_column_tree( + tree_meta_t& node_tree, + device_span original_col_ids, + device_span sorted_col_ids, + device_span ordered_node_ids, + device_span row_offsets, + bool is_array_of_arrays, + NodeIndexT row_array_parent_col_id, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + + if (original_col_ids.empty()) { + rmm::device_uvector empty_row_idx(0, stream); + rmm::device_uvector empty_col_idx(0, stream); + rmm::device_uvector empty_column_categories(0, stream); + rmm::device_uvector empty_max_row_offsets(0, stream); + rmm::device_uvector empty_mapped_col_ids(0, stream); + return std::tuple{compressed_sparse_row{std::move(empty_row_idx), std::move(empty_col_idx)}, + column_tree_properties{std::move(empty_column_categories), + std::move(empty_max_row_offsets), + std::move(empty_mapped_col_ids)}}; + } + + auto [unpermuted_tree, unpermuted_col_ids, unpermuted_max_row_offsets] = + cudf::io::json::detail::reduce_to_column_tree(node_tree, + original_col_ids, + sorted_col_ids, + ordered_node_ids, + row_offsets, + is_array_of_arrays, + row_array_parent_col_id, + stream); + + NodeIndexT num_columns = unpermuted_col_ids.size(); + + auto mapped_col_ids = cudf::detail::make_device_uvector_async( + unpermuted_col_ids, stream, cudf::get_current_device_resource_ref()); + rmm::device_uvector rev_mapped_col_ids(num_columns, stream); + rmm::device_uvector reordering_index(unpermuted_col_ids.size(), stream); + + thrust::sequence( + rmm::exec_policy_nosync(stream), reordering_index.begin(), reordering_index.end()); + // Reorder nodes and column ids in level-wise fashion + thrust::sort_by_key( + rmm::exec_policy_nosync(stream), + reordering_index.begin(), + reordering_index.end(), + mapped_col_ids.begin(), + level_ordering{ + unpermuted_tree.node_levels, unpermuted_col_ids, unpermuted_tree.parent_node_ids}); + + { + auto mapped_col_ids_copy = cudf::detail::make_device_uvector_async( + mapped_col_ids, stream, cudf::get_current_device_resource_ref()); + thrust::sequence( + rmm::exec_policy_nosync(stream), rev_mapped_col_ids.begin(), rev_mapped_col_ids.end()); + thrust::sort_by_key(rmm::exec_policy_nosync(stream), + mapped_col_ids_copy.begin(), + mapped_col_ids_copy.end(), + rev_mapped_col_ids.begin()); + } + + rmm::device_uvector parent_col_ids(num_columns, stream); + thrust::transform_output_iterator parent_col_ids_it(parent_col_ids.begin(), + parent_nodeids_to_colids{rev_mapped_col_ids}); + rmm::device_uvector max_row_offsets(num_columns, stream); + rmm::device_uvector column_categories(num_columns, stream); + thrust::copy_n( + rmm::exec_policy_nosync(stream), + thrust::make_zip_iterator(thrust::make_permutation_iterator( + unpermuted_tree.parent_node_ids.begin(), reordering_index.begin()), + thrust::make_permutation_iterator(unpermuted_max_row_offsets.begin(), + reordering_index.begin()), + thrust::make_permutation_iterator( + unpermuted_tree.node_categories.begin(), reordering_index.begin())), + num_columns, + thrust::make_zip_iterator( + parent_col_ids_it, max_row_offsets.begin(), column_categories.begin())); + +#ifdef CSR_DEBUG_PRINT + print(reordering_index, "h_reordering_index", stream); + print(mapped_col_ids, "h_mapped_col_ids", stream); + print(rev_mapped_col_ids, "h_rev_mapped_col_ids", stream); + print(parent_col_ids, "h_parent_col_ids", stream); + print(max_row_offsets, "h_max_row_offsets", stream); +#endif + + auto construct_row_idx = [&stream](NodeIndexT num_columns, + device_span parent_col_ids) { + auto row_idx = cudf::detail::make_zeroed_device_uvector_async( + static_cast(num_columns + 1), stream, cudf::get_current_device_resource_ref()); + // Note that the first element of csr_parent_col_ids is -1 (parent_node_sentinel) + // children adjacency + + auto num_non_leaf_columns = thrust::unique_count( + rmm::exec_policy_nosync(stream), parent_col_ids.begin() + 1, parent_col_ids.end()); + rmm::device_uvector non_leaf_nodes(num_non_leaf_columns, stream); + rmm::device_uvector non_leaf_nodes_children(num_non_leaf_columns, stream); + thrust::reduce_by_key(rmm::exec_policy_nosync(stream), + parent_col_ids.begin() + 1, + parent_col_ids.end(), + thrust::make_constant_iterator(1), + non_leaf_nodes.begin(), + non_leaf_nodes_children.begin(), + thrust::equal_to()); + + thrust::scatter(rmm::exec_policy_nosync(stream), + non_leaf_nodes_children.begin(), + non_leaf_nodes_children.end(), + non_leaf_nodes.begin(), + row_idx.begin() + 1); + + if (num_columns > 1) { + thrust::transform_inclusive_scan( + rmm::exec_policy_nosync(stream), + thrust::make_zip_iterator(thrust::make_counting_iterator(1), row_idx.begin() + 1), + thrust::make_zip_iterator(thrust::make_counting_iterator(1) + num_columns, row_idx.end()), + row_idx.begin() + 1, + cuda::proclaim_return_type([] __device__(auto a) { + auto n = thrust::get<0>(a); + auto idx = thrust::get<1>(a); + return n == 1 ? idx : idx + 1; + }), + thrust::plus{}); + } else { + auto single_node = 1; + row_idx.set_element_async(1, single_node, stream); + } + +#ifdef CSR_DEBUG_PRINT + print(row_idx, "h_row_idx", stream); +#endif + return row_idx; + }; + + auto construct_col_idx = [&stream](NodeIndexT num_columns, + device_span parent_col_ids, + device_span row_idx) { + rmm::device_uvector col_idx((num_columns - 1) * 2, stream); + thrust::fill(rmm::exec_policy_nosync(stream), col_idx.begin(), col_idx.end(), -1); + // excluding root node, construct scatter map + rmm::device_uvector map(num_columns - 1, stream); + thrust::inclusive_scan_by_key(rmm::exec_policy_nosync(stream), + parent_col_ids.begin() + 1, + parent_col_ids.end(), + thrust::make_constant_iterator(1), + map.begin()); + thrust::for_each_n(rmm::exec_policy_nosync(stream), + thrust::make_counting_iterator(1), + num_columns - 1, + [row_idx = row_idx.begin(), + map = map.begin(), + parent_col_ids = parent_col_ids.begin()] __device__(auto i) { + auto parent_col_id = parent_col_ids[i]; + if (parent_col_id == 0) + --map[i - 1]; + else + map[i - 1] += row_idx[parent_col_id]; + }); + thrust::scatter(rmm::exec_policy_nosync(stream), + thrust::make_counting_iterator(1), + thrust::make_counting_iterator(1) + num_columns - 1, + map.begin(), + col_idx.begin()); + + // Skip the parent of root node + thrust::scatter(rmm::exec_policy_nosync(stream), + parent_col_ids.begin() + 1, + parent_col_ids.end(), + row_idx.begin() + 1, + col_idx.begin()); + +#ifdef CSR_DEBUG_PRINT + print(col_idx, "h_col_idx", stream); +#endif + + return col_idx; + }; + + /* + 5. CSR construction: + a. Sort column levels and get their ordering + b. For each column node coln iterated according to sorted_column_levels; do + i. Find nodes that have coln as the parent node -> set adj_coln + ii. row idx[coln] = size of adj_coln + 1 + iii. col idx[coln] = adj_coln U {parent_col_id[coln]} + */ + auto row_idx = construct_row_idx(num_columns, parent_col_ids); + auto col_idx = construct_col_idx(num_columns, parent_col_ids, row_idx); + + return std::tuple{ + compressed_sparse_row{std::move(row_idx), std::move(col_idx)}, + column_tree_properties{ + std::move(column_categories), std::move(max_row_offsets), std::move(mapped_col_ids)}}; +} + +} // namespace experimental::detail +} // namespace cudf::io::json diff --git a/cpp/src/io/json/json_column.cu b/cpp/src/io/json/json_column.cu index b08fd139113..dfd9285f682 100644 --- a/cpp/src/io/json/json_column.cu +++ b/cpp/src/io/json/json_column.cu @@ -47,7 +47,6 @@ namespace cudf::io::json::detail { -// DEBUG prints auto to_cat = [](auto v) -> std::string { switch (v) { case NC_STRUCT: return " S"; @@ -106,18 +105,19 @@ void print_tree(host_span input, */ std::tuple, rmm::device_uvector> reduce_to_column_tree(tree_meta_t& tree, - device_span original_col_ids, - device_span sorted_col_ids, - device_span ordered_node_ids, - device_span row_offsets, + device_span original_col_ids, + device_span sorted_col_ids, + device_span ordered_node_ids, + device_span row_offsets, bool is_array_of_arrays, NodeIndexT const row_array_parent_col_id, rmm::cuda_stream_view stream) { CUDF_FUNC_RANGE(); + // 1. column count for allocation - auto const num_columns = - thrust::unique_count(rmm::exec_policy(stream), sorted_col_ids.begin(), sorted_col_ids.end()); + auto const num_columns = thrust::unique_count( + rmm::exec_policy_nosync(stream), sorted_col_ids.begin(), sorted_col_ids.end()); // 2. reduce_by_key {col_id}, {row_offset}, max. rmm::device_uvector unique_col_ids(num_columns, stream); @@ -162,30 +162,34 @@ reduce_to_column_tree(tree_meta_t& tree, }); // 4. unique_copy parent_node_ids, ranges - rmm::device_uvector column_levels(0, stream); // not required + rmm::device_uvector column_levels(num_columns, stream); // not required rmm::device_uvector parent_col_ids(num_columns, stream); rmm::device_uvector col_range_begin(num_columns, stream); // Field names rmm::device_uvector col_range_end(num_columns, stream); rmm::device_uvector unique_node_ids(num_columns, stream); - thrust::unique_by_key_copy(rmm::exec_policy(stream), + thrust::unique_by_key_copy(rmm::exec_policy_nosync(stream), sorted_col_ids.begin(), sorted_col_ids.end(), ordered_node_ids.begin(), thrust::make_discard_iterator(), unique_node_ids.begin()); + thrust::copy_n( - rmm::exec_policy(stream), + rmm::exec_policy_nosync(stream), thrust::make_zip_iterator( + thrust::make_permutation_iterator(tree.node_levels.begin(), unique_node_ids.begin()), thrust::make_permutation_iterator(tree.parent_node_ids.begin(), unique_node_ids.begin()), thrust::make_permutation_iterator(tree.node_range_begin.begin(), unique_node_ids.begin()), thrust::make_permutation_iterator(tree.node_range_end.begin(), unique_node_ids.begin())), unique_node_ids.size(), - thrust::make_zip_iterator( - parent_col_ids.begin(), col_range_begin.begin(), col_range_end.begin())); + thrust::make_zip_iterator(column_levels.begin(), + parent_col_ids.begin(), + col_range_begin.begin(), + col_range_end.begin())); // convert parent_node_ids to parent_col_ids thrust::transform( - rmm::exec_policy(stream), + rmm::exec_policy_nosync(stream), parent_col_ids.begin(), parent_col_ids.end(), parent_col_ids.begin(), @@ -203,18 +207,17 @@ reduce_to_column_tree(tree_meta_t& tree, column_categories[parent_col_id] == NC_LIST && (!is_array_of_arrays || parent_col_id != row_array_parent_col_id)); }; + // Mixed types in List children go to different columns, // so all immediate children of list column should have same max_row_offsets. // create list's children max_row_offsets array. (initialize to zero) // atomicMax on children max_row_offsets array. // gather the max_row_offsets from children row offset array. { - rmm::device_uvector list_parents_children_max_row_offsets(num_columns, stream); - thrust::fill(rmm::exec_policy(stream), - list_parents_children_max_row_offsets.begin(), - list_parents_children_max_row_offsets.end(), - 0); - thrust::for_each(rmm::exec_policy(stream), + auto list_parents_children_max_row_offsets = + cudf::detail::make_zeroed_device_uvector_async( + static_cast(num_columns), stream, cudf::get_current_device_resource_ref()); + thrust::for_each(rmm::exec_policy_nosync(stream), unique_col_ids.begin(), unique_col_ids.end(), [column_categories = column_categories.begin(), @@ -230,8 +233,9 @@ reduce_to_column_tree(tree_meta_t& tree, ref.fetch_max(max_row_offsets[col_id], cuda::std::memory_order_relaxed); } }); + thrust::gather_if( - rmm::exec_policy(stream), + rmm::exec_policy_nosync(stream), parent_col_ids.begin(), parent_col_ids.end(), parent_col_ids.begin(), @@ -246,7 +250,7 @@ reduce_to_column_tree(tree_meta_t& tree, // copy lists' max_row_offsets to children. // all structs should have same size. thrust::transform_if( - rmm::exec_policy(stream), + rmm::exec_policy_nosync(stream), unique_col_ids.begin(), unique_col_ids.end(), max_row_offsets.begin(), @@ -272,7 +276,7 @@ reduce_to_column_tree(tree_meta_t& tree, // For Struct and List (to avoid copying entire strings when mixed type as string is enabled) thrust::transform_if( - rmm::exec_policy(stream), + rmm::exec_policy_nosync(stream), col_range_begin.begin(), col_range_begin.end(), column_categories.begin(), diff --git a/cpp/src/io/json/nested_json.hpp b/cpp/src/io/json/nested_json.hpp index 83f71e657a7..93ef2b46be1 100644 --- a/cpp/src/io/json/nested_json.hpp +++ b/cpp/src/io/json/nested_json.hpp @@ -185,6 +185,55 @@ struct device_json_column { } }; +namespace experimental { +/* + * @brief Sparse graph adjacency matrix stored in Compressed Sparse Row (CSR) format. + */ +struct compressed_sparse_row { + rmm::device_uvector row_idx; + rmm::device_uvector col_idx; +}; + +/* + * @brief Auxiliary column tree properties that are required to construct the device json + * column subtree, but not required for the final cudf column construction. + */ +struct column_tree_properties { + rmm::device_uvector categories; + rmm::device_uvector max_row_offsets; + rmm::device_uvector mapped_ids; +}; + +namespace detail { +/** + * @brief Reduce node tree into column tree by aggregating each property of column. + * + * @param node_tree Node tree representation of JSON string + * @param original_col_ids Column ids of nodes + * @param sorted_col_ids Sorted column ids of nodes + * @param ordered_node_ids Node ids of nodes sorted by column ids + * @param row_offsets Row offsets of nodes + * @param is_array_of_arrays Whether the tree is an array of arrays + * @param row_array_parent_col_id Column id of row array, if is_array_of_arrays is true + * @param stream CUDA stream used for device memory operations and kernel launches + * @return Tuple of compressed_sparse_row struct storing adjacency information of the column tree, + * and column_tree_properties struct storing properties of each node i.e. column category, max + * number of rows in the column, and column id + */ +CUDF_EXPORT +std::tuple reduce_to_column_tree( + tree_meta_t& node_tree, + device_span original_col_ids, + device_span sorted_col_ids, + device_span ordered_node_ids, + device_span row_offsets, + bool is_array_of_arrays, + NodeIndexT row_array_parent_col_id, + rmm::cuda_stream_view stream); + +} // namespace detail +} // namespace experimental + namespace detail { // TODO: return device_uvector instead of passing pre-allocated memory @@ -303,7 +352,7 @@ get_array_children_indices(TreeDepthT row_array_children_level, /** * @brief Reduces node tree representation to column tree representation. * - * @param tree Node tree representation of JSON string + * @param node_tree Node tree representation of JSON string * @param original_col_ids Column ids of nodes * @param sorted_col_ids Sorted column ids of nodes * @param ordered_node_ids Node ids of nodes sorted by column ids @@ -314,12 +363,13 @@ get_array_children_indices(TreeDepthT row_array_children_level, * @return A tuple of column tree representation of JSON string, column ids of columns, and * max row offsets of columns */ +CUDF_EXPORT std::tuple, rmm::device_uvector> -reduce_to_column_tree(tree_meta_t& tree, - device_span original_col_ids, - device_span sorted_col_ids, - device_span ordered_node_ids, - device_span row_offsets, +reduce_to_column_tree(tree_meta_t& node_tree, + device_span original_col_ids, + device_span sorted_col_ids, + device_span ordered_node_ids, + device_span row_offsets, bool is_array_of_arrays, NodeIndexT const row_array_parent_col_id, rmm::cuda_stream_view stream); diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 288fa84a73d..b67d922d377 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -329,6 +329,7 @@ ConfigureTest(NESTED_JSON_TEST io/json/nested_json_test.cpp io/json/json_tree.cp ConfigureTest(MULTIBYTE_SPLIT_TEST io/text/multibyte_split_test.cpp) ConfigureTest(JSON_QUOTE_NORMALIZATION io/json/json_quote_normalization_test.cpp) ConfigureTest(JSON_WHITESPACE_NORMALIZATION io/json/json_whitespace_normalization_test.cu) +ConfigureTest(JSON_TREE_CSR io/json/json_tree_csr.cu) ConfigureTest( DATA_CHUNK_SOURCE_TEST io/text/data_chunk_source_test.cpp GPUS 1 diff --git a/cpp/tests/io/json/json_tree_csr.cu b/cpp/tests/io/json/json_tree_csr.cu new file mode 100644 index 00000000000..a336b327732 --- /dev/null +++ b/cpp/tests/io/json/json_tree_csr.cu @@ -0,0 +1,370 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "io/json/nested_json.hpp" + +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +#include + +namespace cuio_json = cudf::io::json; + +struct h_tree_meta_t { + std::vector node_categories; + std::vector parent_node_ids; + std::vector node_range_begin; + std::vector node_range_end; +}; + +struct h_column_tree { + // position of nnzs + std::vector row_idx; + std::vector col_idx; + // node properties + std::vector categories; + std::vector column_ids; +}; + +// debug printing +template +void print(cudf::host_span vec, std::string name) +{ + std::cout << name << " = "; + for (auto e : vec) { + std::cout << e << " "; + } + std::cout << std::endl; +} + +bool check_equality(cuio_json::tree_meta_t& d_a, + cudf::device_span d_a_max_row_offsets, + cuio_json::experimental::compressed_sparse_row& d_b_csr, + cuio_json::experimental::column_tree_properties& d_b_ctp, + rmm::cuda_stream_view stream) +{ + // convert from tree_meta_t to column_tree_csr + stream.synchronize(); + + h_tree_meta_t a{cudf::detail::make_std_vector_async(d_a.node_categories, stream), + cudf::detail::make_std_vector_async(d_a.parent_node_ids, stream), + cudf::detail::make_std_vector_async(d_a.node_range_begin, stream), + cudf::detail::make_std_vector_async(d_a.node_range_end, stream)}; + + h_column_tree b{cudf::detail::make_std_vector_async(d_b_csr.row_idx, stream), + cudf::detail::make_std_vector_async(d_b_csr.col_idx, stream), + cudf::detail::make_std_vector_async(d_b_ctp.categories, stream), + cudf::detail::make_std_vector_async(d_b_ctp.mapped_ids, stream)}; + + auto a_max_row_offsets = cudf::detail::make_std_vector_async(d_a_max_row_offsets, stream); + auto b_max_row_offsets = cudf::detail::make_std_vector_async(d_b_ctp.max_row_offsets, stream); + + stream.synchronize(); + + auto num_nodes = a.parent_node_ids.size(); + if (num_nodes > 1) { + if (b.row_idx.size() != num_nodes + 1) { return false; } + + for (auto pos = b.row_idx[0]; pos < b.row_idx[1]; pos++) { + auto v = b.col_idx[pos]; + if (a.parent_node_ids[b.column_ids[v]] != b.column_ids[0]) { return false; } + } + for (size_t u = 1; u < num_nodes; u++) { + auto v = b.col_idx[b.row_idx[u]]; + if (a.parent_node_ids[b.column_ids[u]] != b.column_ids[v]) { return false; } + + for (auto pos = b.row_idx[u] + 1; pos < b.row_idx[u + 1]; pos++) { + v = b.col_idx[pos]; + if (a.parent_node_ids[b.column_ids[v]] != b.column_ids[u]) { return false; } + } + } + for (size_t u = 0; u < num_nodes; u++) { + if (a.node_categories[b.column_ids[u]] != b.categories[u]) { return false; } + } + for (size_t u = 0; u < num_nodes; u++) { + if (a_max_row_offsets[b.column_ids[u]] != b_max_row_offsets[u]) { return false; } + } + } else if (num_nodes == 1) { + if (b.row_idx.size() != num_nodes + 1) { return false; } + + if (b.row_idx[0] != 0 || b.row_idx[1] != 1) return false; + if (!b.col_idx.empty()) return false; + for (size_t u = 0; u < num_nodes; u++) { + if (a.node_categories[b.column_ids[u]] != b.categories[u]) { return false; } + } + + for (size_t u = 0; u < num_nodes; u++) { + if (a_max_row_offsets[b.column_ids[u]] != b_max_row_offsets[u]) { return false; } + } + } + return true; +} + +void run_test(std::string const& input, bool enable_lines = true) +{ + auto const stream = cudf::get_default_stream(); + cudf::string_scalar d_scalar(input, true, stream); + auto d_input = cudf::device_span{d_scalar.data(), + static_cast(d_scalar.size())}; + + cudf::io::json_reader_options options{}; + options.enable_lines(enable_lines); + options.enable_mixed_types_as_string(true); + + // Parse the JSON and get the token stream + auto const [tokens_gpu, token_indices_gpu] = cudf::io::json::detail::get_token_stream( + d_input, options, stream, cudf::get_current_device_resource_ref()); + + // Get the JSON's tree representation + auto gpu_tree = + cuio_json::detail::get_tree_representation(tokens_gpu, + token_indices_gpu, + options.is_enabled_mixed_types_as_string(), + stream, + cudf::get_current_device_resource_ref()); + + bool const is_array_of_arrays = [&]() { + std::array h_node_categories = {cuio_json::NC_ERR, cuio_json::NC_ERR}; + auto const size_to_copy = std::min(size_t{2}, gpu_tree.node_categories.size()); + CUDF_CUDA_TRY(cudaMemcpyAsync(h_node_categories.data(), + gpu_tree.node_categories.data(), + sizeof(cuio_json::node_t) * size_to_copy, + cudaMemcpyDefault, + stream.value())); + stream.synchronize(); + if (options.is_enabled_lines()) return h_node_categories[0] == cuio_json::NC_LIST; + return h_node_categories[0] == cuio_json::NC_LIST and + h_node_categories[1] == cuio_json::NC_LIST; + }(); + + auto tup = + cuio_json::detail::records_orient_tree_traversal(d_input, + gpu_tree, + is_array_of_arrays, + options.is_enabled_lines(), + stream, + rmm::mr::get_current_device_resource()); + auto& gpu_col_id = std::get<0>(tup); + auto& gpu_row_offsets = std::get<1>(tup); + + auto const num_nodes = gpu_col_id.size(); + rmm::device_uvector sorted_col_ids(gpu_col_id.size(), stream); // make a copy + thrust::copy( + rmm::exec_policy(stream), gpu_col_id.begin(), gpu_col_id.end(), sorted_col_ids.begin()); + + // sort by {col_id} on {node_ids} stable + rmm::device_uvector node_ids(gpu_col_id.size(), stream); + thrust::sequence(rmm::exec_policy(stream), node_ids.begin(), node_ids.end()); + thrust::stable_sort_by_key( + rmm::exec_policy(stream), sorted_col_ids.begin(), sorted_col_ids.end(), node_ids.begin()); + + cudf::size_type const row_array_parent_col_id = [&]() { + cudf::size_type value = cuio_json::parent_node_sentinel; + auto const list_node_index = options.is_enabled_lines() ? 0 : 1; + CUDF_CUDA_TRY(cudaMemcpyAsync(&value, + gpu_col_id.data() + list_node_index, + sizeof(cudf::size_type), + cudaMemcpyDefault, + stream.value())); + stream.synchronize(); + return value; + }(); + + auto [d_column_tree, d_unique_col_ids, d_max_row_offsets] = + cudf::io::json::detail::reduce_to_column_tree(gpu_tree, + gpu_col_id, + sorted_col_ids, + node_ids, + gpu_row_offsets, + is_array_of_arrays, + row_array_parent_col_id, + stream); + + auto [d_column_tree_csr, d_column_tree_properties] = + cudf::io::json::experimental::detail::reduce_to_column_tree(gpu_tree, + gpu_col_id, + sorted_col_ids, + node_ids, + gpu_row_offsets, + is_array_of_arrays, + row_array_parent_col_id, + stream); + + auto iseq = check_equality( + d_column_tree, d_max_row_offsets, d_column_tree_csr, d_column_tree_properties, stream); + // assert equality between csr and meta formats + ASSERT_TRUE(iseq); +} + +struct JsonColumnTreeTests : public cudf::test::BaseFixture {}; + +TEST_F(JsonColumnTreeTests, JSONL_Small) +{ + std::string const input = + R"( {} + { "a": { "y" : 6, "z": [] }} + { "a" : { "x" : 8, "y": 9 }, "b" : {"x": 10 , "z": 11 }} )"; // Prepare input & output buffers + run_test(input); +} + +TEST_F(JsonColumnTreeTests, JSONL_Large) +{ + std::string const input = + R"( {} + {} + { "a": { "y" : 6, "z": [] }} + { "a" : { "x" : 8, "y": 9 }, "b" : {"x": 10 , "z": 11 }} + { "a": { "y" : 6, "z": [] }} + { "a" : { "x" : 8, "y": 9 }, "b" : {"x": 10 , "z": 11 }} + { "a": { "y" : 6, "z": [] }} + { "a" : { "x" : 8, "y": 9 }, "b" : {"x": 10 , "z": 11 }} + { "a": { "y" : 6, "z": [] }} + { "a" : { "x" : 8, "y": 9 }, "b" : {"x": 10 , "z": 11 }} )"; + run_test(input); +} + +TEST_F(JsonColumnTreeTests, JSONL_ListofStruct) +{ + std::string const input = R"( + { "Root": { "Key": [ { "EE": "A" } ] } } + { "Root": { "Key": { } } } + { "Root": { "Key": [{ "YY": 1}] } } + )"; + run_test(input); +} + +TEST_F(JsonColumnTreeTests, JSONL_MissingEntries) +{ + std::string json_stringl = R"( + {"a": 1, "b": {"0": "abc", "1": [-1.]}, "c": true} + {"a": 1, "b": {"0": "abc" }, "c": false} + {"a": 1, "b": {}} + {"a": 1, "c": null} + )"; + run_test(json_stringl); +} + +TEST_F(JsonColumnTreeTests, JSONL_MoreMissingEntries) +{ + std::string json_stringl = R"( + { "foo1": [1,2,3], "bar": 123 } + { "foo2": { "a": 1 }, "bar": 456 } + { "foo1": [1,2,3], "bar": 123 } + { "foo2": { "a": 1 }, "bar": 456 } + { "foo1": [1,2,3], "bar": 123 } + { "foo2": { "a": 1 }, "bar": 456 } + )"; + run_test(json_stringl); +} + +TEST_F(JsonColumnTreeTests, JSONL_StillMoreMissingEntries) +{ + std::string json_stringl = R"( + { "foo1": [1,2,3], "bar": 123 } + { "foo2": { "a": 1 }, "bar": 456 } + { "foo1": ["123","456"], "bar": 123 } + { "foo2": { "b": 5 }, "car": 456 } + { "foo1": [1,2,3], "bar": 123 } + { "foo2": { "a": 1 }, "bar": 456 } + )"; + run_test(json_stringl); +} + +TEST_F(JsonColumnTreeTests, JSON_MissingEntries) +{ + std::string json_string = R"([ + {"a": 1, "b": {"0": "abc", "1": [-1.]}, "c": true}, + {"a": 1, "b": {"0": "abc" }, "c": false}, + {"a": 1, "b": {}}, + {"a": 1, "c": null} + ])"; + run_test(json_string, false); +} + +TEST_F(JsonColumnTreeTests, JSON_StructOfStructs) +{ + std::string json_string = + R"([ + {}, + { "a": { "y" : 6, "z": [] }}, + { "a" : { "x" : 8, "y": 9 }, "b" : {"x": 10 , "z": 11 }} + ])"; // Prepare input & output buffers + run_test(json_string, false); +} + +TEST_F(JsonColumnTreeTests, JSONL_ArrayOfArrays_NestedList) +{ + std::string json_string = + R"([123, [1,2,3]] + [456, null, { "a": 1 }])"; + run_test(json_string); +} + +TEST_F(JsonColumnTreeTests, JSON_ArrayofArrays_NestedList) +{ + std::string json_string = R"([[[1,2,3], null, 123], + [null, { "a": 1 }, 456 ]])"; + run_test(json_string, false); +} + +TEST_F(JsonColumnTreeTests, JSON_CornerCase_Empty) +{ + std::string json_string = R"([])"; + run_test(json_string, false); +} + +TEST_F(JsonColumnTreeTests, JSONL_CornerCase_List) +{ + std::string json_string = R"([123])"; + run_test(json_string, true); +} + +TEST_F(JsonColumnTreeTests, JSON_CornerCase_EmptyNestedList) +{ + std::string json_string = R"([[[]]])"; + run_test(json_string, false); +} + +TEST_F(JsonColumnTreeTests, JSON_CornerCase_EmptyNestedLists) +{ + std::string json_string = R"([[], [], []])"; + run_test(json_string, false); +} + +TEST_F(JsonColumnTreeTests, JSONL_CornerCase_ListofLists) +{ + std::string json_string = R"([[1, 2, 3], [4, 5, null], []])"; + run_test(json_string, true); +} + +TEST_F(JsonColumnTreeTests, JSONL_CornerCase_EmptyListOfLists) +{ + std::string json_string = R"([[]])"; + run_test(json_string, true); +} From ba4afae921f6d1906a201636c084a82a8586bb36 Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Wed, 25 Sep 2024 16:03:41 -0500 Subject: [PATCH 6/7] Make tests deterministic (#16910) This PR is a first pass of making tests deterministic, I noticed one of CI job failed due to an overflow error related to random data generation. Authors: - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - Vyas Ramasubramani (https://github.com/vyasr) - Richard (Rick) Zamora (https://github.com/rjzamora) URL: https://github.com/rapidsai/cudf/pull/16910 --- python/cudf/cudf/tests/test_array_function.py | 26 +++++++++---------- .../test_avro_reader_fastavro_integration.py | 3 ++- python/cudf/cudf/tests/test_groupby.py | 7 +++++ .../dask_cudf/tests/test_reductions.py | 2 +- 4 files changed, 22 insertions(+), 16 deletions(-) diff --git a/python/cudf/cudf/tests/test_array_function.py b/python/cudf/cudf/tests/test_array_function.py index 773141ee71a..979c936a182 100644 --- a/python/cudf/cudf/tests/test_array_function.py +++ b/python/cudf/cudf/tests/test_array_function.py @@ -33,9 +33,10 @@ def __array_function__(self, *args, **kwargs): missing_arrfunc_reason = "NEP-18 support is not available in NumPy" +np.random.seed(0) + @pytest.mark.skipif(missing_arrfunc_cond, reason=missing_arrfunc_reason) -@pytest.mark.parametrize("np_ar", [np.random.random(100)]) @pytest.mark.parametrize( "func", [ @@ -47,7 +48,8 @@ def __array_function__(self, *args, **kwargs): lambda x: np.linalg.norm(x), ], ) -def test_array_func_cudf_series(np_ar, func): +def test_array_func_cudf_series(func): + np_ar = np.random.random(100) cudf_ser = cudf.Series(np_ar) expect = func(np_ar) got = func(cudf_ser) @@ -58,9 +60,6 @@ def test_array_func_cudf_series(np_ar, func): @pytest.mark.skipif(missing_arrfunc_cond, reason=missing_arrfunc_reason) -@pytest.mark.parametrize( - "pd_df", [pd.DataFrame(np.random.uniform(size=(100, 10)))] -) @pytest.mark.parametrize( "func", [ @@ -74,7 +73,8 @@ def test_array_func_cudf_series(np_ar, func): lambda x: np.prod(x, axis=1), ], ) -def test_array_func_cudf_dataframe(pd_df, func): +def test_array_func_cudf_dataframe(func): + pd_df = pd.DataFrame(np.random.uniform(size=(100, 10))) cudf_df = cudf.from_pandas(pd_df) expect = func(pd_df) got = func(cudf_df) @@ -82,9 +82,6 @@ def test_array_func_cudf_dataframe(pd_df, func): @pytest.mark.skipif(missing_arrfunc_cond, reason=missing_arrfunc_reason) -@pytest.mark.parametrize( - "pd_df", [pd.DataFrame(np.random.uniform(size=(100, 10)))] -) @pytest.mark.parametrize( "func", [ @@ -93,21 +90,22 @@ def test_array_func_cudf_dataframe(pd_df, func): lambda x: np.linalg.det(x), ], ) -def test_array_func_missing_cudf_dataframe(pd_df, func): +def test_array_func_missing_cudf_dataframe(func): + pd_df = pd.DataFrame(np.random.uniform(size=(100, 10))) cudf_df = cudf.from_pandas(pd_df) with pytest.raises(TypeError): func(cudf_df) @pytest.mark.skipif(missing_arrfunc_cond, reason=missing_arrfunc_reason) -@pytest.mark.parametrize("np_ar", [np.random.random(100)]) @pytest.mark.parametrize( "func", [ lambda x: np.unique(x), ], ) -def test_array_func_cudf_index(np_ar, func): +def test_array_func_cudf_index(func): + np_ar = np.random.random(100) cudf_index = cudf.Index(cudf.Series(np_ar)) expect = func(np_ar) got = func(cudf_index) @@ -118,7 +116,6 @@ def test_array_func_cudf_index(np_ar, func): @pytest.mark.skipif(missing_arrfunc_cond, reason=missing_arrfunc_reason) -@pytest.mark.parametrize("np_ar", [np.random.random(100)]) @pytest.mark.parametrize( "func", [ @@ -127,7 +124,8 @@ def test_array_func_cudf_index(np_ar, func): lambda x: np.linalg.det(x), ], ) -def test_array_func_missing_cudf_index(np_ar, func): +def test_array_func_missing_cudf_index(func): + np_ar = np.random.random(100) cudf_index = cudf.Index(cudf.Series(np_ar)) with pytest.raises(TypeError): func(cudf_index) diff --git a/python/cudf/cudf/tests/test_avro_reader_fastavro_integration.py b/python/cudf/cudf/tests/test_avro_reader_fastavro_integration.py index 9d69e626c3d..5acdf36de80 100644 --- a/python/cudf/cudf/tests/test_avro_reader_fastavro_integration.py +++ b/python/cudf/cudf/tests/test_avro_reader_fastavro_integration.py @@ -236,6 +236,7 @@ def test_avro_compression(rows, codec): }, ], rows, + seed=0, ) expected_df = cudf.DataFrame.from_arrow(df) @@ -599,7 +600,7 @@ def test_avro_reader_multiblock( else: assert dtype in ("float32", "float64") avro_type = "float" if dtype == "float32" else "double" - + np.random.seed(0) # We don't use rand_dataframe() here, because it increases the # execution time of each test by a factor of 10 or more (it appears # to use a very costly approach to generating random data). diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py index 0aaa71e50d7..848bc259e7b 100644 --- a/python/cudf/cudf/tests/test_groupby.py +++ b/python/cudf/cudf/tests/test_groupby.py @@ -2470,6 +2470,7 @@ def test_groupby_2keys_rank(nelem, method, ascending, na_option, pct): ], rows=nelem, use_threads=False, + seed=0, ) pdf = t.to_pandas() pdf.columns = ["x", "y", "z"] @@ -2602,6 +2603,7 @@ def test_groupby_shift_row_mixed_numerics( ], rows=nelem, use_threads=False, + seed=0, ) pdf = t.to_pandas() gdf = cudf.from_pandas(pdf) @@ -2639,6 +2641,7 @@ def test_groupby_shift_row_mixed(nelem, shift_perc, direction): ], rows=nelem, use_threads=False, + seed=0, ) pdf = t.to_pandas() gdf = cudf.from_pandas(pdf) @@ -2687,6 +2690,7 @@ def test_groupby_shift_row_mixed_fill( ], rows=nelem, use_threads=False, + seed=0, ) pdf = t.to_pandas() gdf = cudf.from_pandas(pdf) @@ -2732,6 +2736,7 @@ def test_groupby_shift_row_zero_shift(nelem, fill_value): ], rows=nelem, use_threads=False, + seed=0, ) gdf = cudf.from_pandas(t.to_pandas()) @@ -2782,6 +2787,7 @@ def test_groupby_diff_row_mixed_numerics(nelem, shift_perc, direction): ], rows=nelem, use_threads=False, + seed=0, ) pdf = t.to_pandas() gdf = cudf.from_pandas(pdf) @@ -2815,6 +2821,7 @@ def test_groupby_diff_row_zero_shift(nelem): ], rows=nelem, use_threads=False, + seed=0, ) gdf = cudf.from_pandas(t.to_pandas()) diff --git a/python/dask_cudf/dask_cudf/tests/test_reductions.py b/python/dask_cudf/dask_cudf/tests/test_reductions.py index 88b15718382..d03e92319be 100644 --- a/python/dask_cudf/dask_cudf/tests/test_reductions.py +++ b/python/dask_cudf/dask_cudf/tests/test_reductions.py @@ -13,6 +13,7 @@ def _make_random_frame(nelem, npartitions=2): + np.random.seed(0) df = pd.DataFrame( { "x": np.random.randint(0, 5, size=nelem), @@ -38,7 +39,6 @@ def wrapped(series): @pytest.mark.parametrize("reducer", _reducers) def test_series_reduce(reducer): reducer = _get_reduce_fn(reducer) - np.random.seed(0) size = 10 df, gdf = _make_random_frame(size) From d1b411a273486c0e4205384589d33372b6e32a59 Mon Sep 17 00:00:00 2001 From: Bradley Dice Date: Wed, 25 Sep 2024 17:59:35 -0500 Subject: [PATCH 7/7] Fix 24.10 to 24.12 forward merge (#16876) Fixes forward merge into 24.12. Some 24.10 references were left behind. Authors: - Bradley Dice (https://github.com/bdice) - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Lawrence Mitchell (https://github.com/wence-) - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/16876 --- .github/workflows/pr.yaml | 2 +- ci/release/update-version.sh | 1 + ci/test_cudf_polars_polars_tests.sh | 2 +- docs/dask_cudf/source/best_practices.rst | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 76440403105..a65cae34653 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -262,7 +262,7 @@ jobs: cudf-polars-polars-tests: needs: wheel-build-cudf-polars secrets: inherit - uses: rapidsai/shared-workflows/.github/workflows/wheels-test.yaml@branch-24.10 + uses: rapidsai/shared-workflows/.github/workflows/wheels-test.yaml@branch-24.12 with: # This selects "ARCH=amd64 + the latest supported Python + CUDA". matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))])) diff --git a/ci/release/update-version.sh b/ci/release/update-version.sh index f73e88bc0c8..870901d223b 100755 --- a/ci/release/update-version.sh +++ b/ci/release/update-version.sh @@ -82,6 +82,7 @@ for FILE in .github/workflows/*.yaml .github/workflows/*.yml; do sed_runner "s/dask-cuda.git@branch-[^\"\s]\+/dask-cuda.git@branch-${NEXT_SHORT_TAG}/g" "${FILE}" done sed_runner "s/branch-[0-9]\+\.[0-9]\+/branch-${NEXT_SHORT_TAG}/g" ci/test_wheel_cudf_polars.sh +sed_runner "s/branch-[0-9]\+\.[0-9]\+/branch-${NEXT_SHORT_TAG}/g" ci/test_cudf_polars_polars_tests.sh # Java files NEXT_FULL_JAVA_TAG="${NEXT_SHORT_TAG}.${PATCH_PEP440}-SNAPSHOT" diff --git a/ci/test_cudf_polars_polars_tests.sh b/ci/test_cudf_polars_polars_tests.sh index bfc8fd37565..55399d0371a 100755 --- a/ci/test_cudf_polars_polars_tests.sh +++ b/ci/test_cudf_polars_polars_tests.sh @@ -10,7 +10,7 @@ set -eou pipefail # files in cudf_polars/pylibcudf", rather than "are there changes # between upstream and this branch which touch cudf_polars/pylibcudf" # TODO: is the target branch exposed anywhere in an environment variable? -if [ -n "$(git diff --name-only origin/branch-24.10...HEAD -- python/cudf_polars/ python/cudf/cudf/_lib/pylibcudf/)" ]; +if [ -n "$(git diff --name-only origin/branch-24.12...HEAD -- python/cudf_polars/ python/cudf/cudf/_lib/pylibcudf/)" ]; then HAS_CHANGES=1 rapids-logger "PR has changes in cudf-polars/pylibcudf, test fails treated as failure" diff --git a/docs/dask_cudf/source/best_practices.rst b/docs/dask_cudf/source/best_practices.rst index 142124163af..6cd098da56d 100644 --- a/docs/dask_cudf/source/best_practices.rst +++ b/docs/dask_cudf/source/best_practices.rst @@ -81,7 +81,7 @@ representations, native cuDF spilling may be insufficient. For these cases, `JIT-unspill `__ is likely to produce better protection from out-of-memory (OOM) errors. Please see `Dask-CUDA's spilling documentation -`__ for further details +`__ for further details and guidance. Use RMM