From 1ff0833eeaac85465c7d5f897d25b6f7c31f1b4f Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Wed, 4 Dec 2024 13:12:21 -0800 Subject: [PATCH] Move cudf._lib.aggregation to cudf.core._internals --- python/cudf/cudf/_lib/CMakeLists.txt | 1 - python/cudf/cudf/_lib/aggregation.pyx | 245 --------------- python/cudf/cudf/_lib/groupby.pyx | 2 +- python/cudf/cudf/_lib/reduce.pyx | 2 +- .../cudf/cudf/core/_internals/aggregation.py | 288 ++++++++++++++++++ python/cudf/cudf/core/window/rolling.py | 2 +- 6 files changed, 291 insertions(+), 249 deletions(-) delete mode 100644 python/cudf/cudf/_lib/aggregation.pyx create mode 100644 python/cudf/cudf/core/_internals/aggregation.py diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index dd27aae7133..ebd6f3b6f0d 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -13,7 +13,6 @@ # ============================================================================= set(cython_sources - aggregation.pyx column.pyx copying.pyx csv.pyx diff --git a/python/cudf/cudf/_lib/aggregation.pyx b/python/cudf/cudf/_lib/aggregation.pyx deleted file mode 100644 index 3c96b90f0a1..00000000000 --- a/python/cudf/cudf/_lib/aggregation.pyx +++ /dev/null @@ -1,245 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. - -import pandas as pd -from numba.np import numpy_support - -import pylibcudf - -import cudf -from cudf._lib.types import SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES -from cudf.utils import cudautils - -_agg_name_map = { - "COUNT_VALID": "COUNT", - "COUNT_ALL": "SIZE", - "VARIANCE": "VAR", - "NTH_ELEMENT": "NTH", - "COLLECT_LIST": "COLLECT", - "COLLECT_SET": "UNIQUE", -} - - -class Aggregation: - def __init__(self, agg): - self.c_obj = agg - - @property - def kind(self): - name = self.c_obj.kind().name - return _agg_name_map.get(name, name) - - @classmethod - def sum(cls): - return cls(pylibcudf.aggregation.sum()) - - @classmethod - def min(cls): - return cls(pylibcudf.aggregation.min()) - - @classmethod - def max(cls): - return cls(pylibcudf.aggregation.max()) - - @classmethod - def idxmin(cls): - return cls(pylibcudf.aggregation.argmin()) - - @classmethod - def idxmax(cls): - return cls(pylibcudf.aggregation.argmax()) - - @classmethod - def mean(cls): - return cls(pylibcudf.aggregation.mean()) - - @classmethod - def count(cls, dropna=True): - return cls(pylibcudf.aggregation.count( - pylibcudf.types.NullPolicy.EXCLUDE - if dropna else pylibcudf.types.NullPolicy.INCLUDE - )) - - @classmethod - def ewma(cls, com=1.0, adjust=True): - return cls(pylibcudf.aggregation.ewma( - com, - pylibcudf.aggregation.EWMHistory.INFINITE - if adjust else pylibcudf.aggregation.EWMHistory.FINITE - )) - - @classmethod - def size(cls): - return cls(pylibcudf.aggregation.count(pylibcudf.types.NullPolicy.INCLUDE)) - - @classmethod - def collect(cls): - return cls( - pylibcudf.aggregation.collect_list(pylibcudf.types.NullPolicy.INCLUDE) - ) - - @classmethod - def nunique(cls, dropna=True): - return cls(pylibcudf.aggregation.nunique( - pylibcudf.types.NullPolicy.EXCLUDE - if dropna else pylibcudf.types.NullPolicy.INCLUDE - )) - - @classmethod - def nth(cls, size): - return cls(pylibcudf.aggregation.nth_element(size)) - - @classmethod - def product(cls): - return cls(pylibcudf.aggregation.product()) - prod = product - - @classmethod - def sum_of_squares(cls): - return cls(pylibcudf.aggregation.sum_of_squares()) - - @classmethod - def var(cls, ddof=1): - return cls(pylibcudf.aggregation.variance(ddof)) - - @classmethod - def std(cls, ddof=1): - return cls(pylibcudf.aggregation.std(ddof)) - - @classmethod - def median(cls): - return cls(pylibcudf.aggregation.median()) - - @classmethod - def quantile(cls, q=0.5, interpolation="linear"): - if not pd.api.types.is_list_like(q): - q = [q] - - return cls(pylibcudf.aggregation.quantile( - q, pylibcudf.types.Interpolation[interpolation.upper()] - )) - - @classmethod - def unique(cls): - return cls(pylibcudf.aggregation.collect_set( - pylibcudf.types.NullPolicy.INCLUDE, - pylibcudf.types.NullEquality.EQUAL, - pylibcudf.types.NanEquality.ALL_EQUAL, - - )) - - @classmethod - def first(cls): - return cls( - pylibcudf.aggregation.nth_element(0, pylibcudf.types.NullPolicy.EXCLUDE) - ) - - @classmethod - def last(cls): - return cls( - pylibcudf.aggregation.nth_element(-1, pylibcudf.types.NullPolicy.EXCLUDE) - ) - - @classmethod - def corr(cls, method, min_periods): - return cls(pylibcudf.aggregation.correlation( - pylibcudf.aggregation.CorrelationType[method.upper()], - min_periods - - )) - - @classmethod - def cov(cls, min_periods, ddof=1): - return cls(pylibcudf.aggregation.covariance( - min_periods, - ddof - )) - - # scan aggregations - @classmethod - def cumcount(cls): - return cls.count(False) - - cumsum = sum - cummin = min - cummax = max - cumprod = product - - @classmethod - def rank(cls, method, ascending, na_option, pct): - return cls(pylibcudf.aggregation.rank( - pylibcudf.aggregation.RankMethod[method.upper()], - (pylibcudf.types.Order.ASCENDING if ascending else - pylibcudf.types.Order.DESCENDING), - (pylibcudf.types.NullPolicy.EXCLUDE if na_option == "keep" else - pylibcudf.types.NullPolicy.INCLUDE), - (pylibcudf.types.NullOrder.BEFORE - if (na_option == "top") == ascending else - pylibcudf.types.NullOrder.AFTER), - (pylibcudf.aggregation.RankPercentage.ZERO_NORMALIZED - if pct else - pylibcudf.aggregation.RankPercentage.NONE) - - )) - - # Reduce aggregations - @classmethod - def any(cls): - return cls(pylibcudf.aggregation.any()) - - @classmethod - def all(cls): - return cls(pylibcudf.aggregation.all()) - - # Rolling aggregations - @classmethod - def from_udf(cls, op, *args, **kwargs): - # Handling UDF type - nb_type = numpy_support.from_dtype(kwargs['dtype']) - type_signature = (nb_type[:],) - ptx_code, output_dtype = cudautils.compile_udf(op, type_signature) - output_np_dtype = cudf.dtype(output_dtype) - if output_np_dtype not in SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES: - raise TypeError(f"Result of window function has unsupported dtype {op[1]}") - - return cls( - pylibcudf.aggregation.udf( - ptx_code, - pylibcudf.DataType(SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES[output_np_dtype]), - ) - ) - - -def make_aggregation(op, kwargs=None): - r""" - Parameters - ---------- - op : str or callable - If callable, must meet one of the following requirements: - - * Is of the form lambda x: x.agg(*args, **kwargs), where - `agg` is the name of a supported aggregation. Used to - to specify aggregations that take arguments, e.g., - `lambda x: x.quantile(0.5)`. - * Is a user defined aggregation function that operates on - group values. In this case, the output dtype must be - specified in the `kwargs` dictionary. - \*\*kwargs : dict, optional - Any keyword arguments to be passed to the op. - - Returns - ------- - Aggregation - """ - if kwargs is None: - kwargs = {} - - if isinstance(op, str): - return getattr(Aggregation, op)(**kwargs) - elif callable(op): - if op is list: - return Aggregation.collect() - elif "dtype" in kwargs: - return Aggregation.from_udf(op, **kwargs) - else: - return op(Aggregation) - raise TypeError(f"Unknown aggregation {op}") diff --git a/python/cudf/cudf/_lib/groupby.pyx b/python/cudf/cudf/_lib/groupby.pyx index 4e712be6738..80a77ef2267 100644 --- a/python/cudf/cudf/_lib/groupby.pyx +++ b/python/cudf/cudf/_lib/groupby.pyx @@ -20,7 +20,7 @@ from cudf._lib.scalar import as_device_scalar import pylibcudf -from cudf._lib.aggregation import make_aggregation +from cudf.core._internals.aggregation import make_aggregation # The sets below define the possible aggregations that can be performed on # different dtypes. These strings must be elements of the AggregationKind enum. diff --git a/python/cudf/cudf/_lib/reduce.pyx b/python/cudf/cudf/_lib/reduce.pyx index 944753d28b8..2850cab93a1 100644 --- a/python/cudf/cudf/_lib/reduce.pyx +++ b/python/cudf/cudf/_lib/reduce.pyx @@ -10,7 +10,7 @@ from cudf._lib.types cimport dtype_to_pylibcudf_type, is_decimal_type_id import pylibcudf -from cudf._lib.aggregation import make_aggregation +from cudf.core._internals.aggregation import make_aggregation @acquire_spill_lock() diff --git a/python/cudf/cudf/core/_internals/aggregation.py b/python/cudf/cudf/core/_internals/aggregation.py new file mode 100644 index 00000000000..fe8ea5a947a --- /dev/null +++ b/python/cudf/cudf/core/_internals/aggregation.py @@ -0,0 +1,288 @@ +# Copyright (c) 2020-2024, NVIDIA CORPORATION. +from __future__ import annotations + +from typing import TYPE_CHECKING, Literal + +from numba.np import numpy_support + +import pylibcudf as plc + +import cudf +from cudf._lib.types import SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES +from cudf.api.types import is_scalar +from cudf.utils import cudautils + +if TYPE_CHECKING: + from collections.abc import Callable + + from typing_extensions import Self + +_agg_name_map = { + "COUNT_VALID": "COUNT", + "COUNT_ALL": "SIZE", + "VARIANCE": "VAR", + "NTH_ELEMENT": "NTH", + "COLLECT_LIST": "COLLECT", + "COLLECT_SET": "UNIQUE", +} + + +class Aggregation: + def __init__(self, agg: plc.aggregation.Aggregation) -> None: + self.c_obj = agg + + @property + def kind(self) -> str: + name = self.c_obj.kind().name + return _agg_name_map.get(name, name) + + @classmethod + def sum(cls) -> Self: + return cls(plc.aggregation.sum()) + + @classmethod + def min(cls) -> Self: + return cls(plc.aggregation.min()) + + @classmethod + def max(cls) -> Self: + return cls(plc.aggregation.max()) + + @classmethod + def idxmin(cls) -> Self: + return cls(plc.aggregation.argmin()) + + @classmethod + def idxmax(cls) -> Self: + return cls(plc.aggregation.argmax()) + + @classmethod + def mean(cls) -> Self: + return cls(plc.aggregation.mean()) + + @classmethod + def count(cls, dropna: bool = True) -> Self: + return cls( + plc.aggregation.count( + plc.types.NullPolicy.EXCLUDE + if dropna + else plc.types.NullPolicy.INCLUDE + ) + ) + + @classmethod + def ewma(cls, com: float = 1.0, adjust: bool = True) -> Self: + return cls( + plc.aggregation.ewma( + com, + plc.aggregation.EWMHistory.INFINITE + if adjust + else plc.aggregation.EWMHistory.FINITE, + ) + ) + + @classmethod + def size(cls) -> Self: + return cls(plc.aggregation.count(plc.types.NullPolicy.INCLUDE)) + + @classmethod + def collect(cls) -> Self: + return cls(plc.aggregation.collect_list(plc.types.NullPolicy.INCLUDE)) + + @classmethod + def nunique(cls, dropna: bool = True) -> Self: + return cls( + plc.aggregation.nunique( + plc.types.NullPolicy.EXCLUDE + if dropna + else plc.types.NullPolicy.INCLUDE + ) + ) + + @classmethod + def nth(cls, size: int) -> Self: + return cls(plc.aggregation.nth_element(size)) + + @classmethod + def product(cls) -> Self: + return cls(plc.aggregation.product()) + + prod = product + + @classmethod + def sum_of_squares(cls) -> Self: + return cls(plc.aggregation.sum_of_squares()) + + @classmethod + def var(cls, ddof: int = 1) -> Self: + return cls(plc.aggregation.variance(ddof)) + + @classmethod + def std(cls, ddof: int = 1) -> Self: + return cls(plc.aggregation.std(ddof)) + + @classmethod + def median(cls) -> Self: + return cls(plc.aggregation.median()) + + @classmethod + def quantile( + cls, + q: float | list[float] = 0.5, + interpolation: Literal[ + "linear", "lower", "higher", "midpoint", "nearest" + ] = "linear", + ) -> Self: + return cls( + plc.aggregation.quantile( + [q] if is_scalar(q) else q, + plc.types.Interpolation[interpolation.upper()], + ) + ) + + @classmethod + def unique(cls) -> Self: + return cls( + plc.aggregation.collect_set( + plc.types.NullPolicy.INCLUDE, + plc.types.NullEquality.EQUAL, + plc.types.NanEquality.ALL_EQUAL, + ) + ) + + @classmethod + def first(cls) -> Self: + return cls( + plc.aggregation.nth_element(0, plc.types.NullPolicy.EXCLUDE) + ) + + @classmethod + def last(cls) -> Self: + return cls( + plc.aggregation.nth_element(-1, plc.types.NullPolicy.EXCLUDE) + ) + + @classmethod + def corr(cls, method, min_periods) -> Self: + return cls( + plc.aggregation.correlation( + plc.aggregation.CorrelationType[method.upper()], min_periods + ) + ) + + @classmethod + def cov(cls, min_periods: int, ddof: int = 1) -> Self: + return cls(plc.aggregation.covariance(min_periods, ddof)) + + # scan aggregations + @classmethod + def cumcount(cls) -> Self: + return cls.count(False) + + cumsum = sum + cummin = min + cummax = max + cumprod = product + + @classmethod + def rank( + cls, + method: Literal["first", "average", "min", "max", "dense"], + ascending: bool, + na_option: Literal["keep", "top", "bottom"], + pct: bool, + ) -> Self: + return cls( + plc.aggregation.rank( + plc.aggregation.RankMethod[method.upper()], + ( + plc.types.Order.ASCENDING + if ascending + else plc.types.Order.DESCENDING + ), + ( + plc.types.NullPolicy.EXCLUDE + if na_option == "keep" + else plc.types.NullPolicy.INCLUDE + ), + ( + plc.types.NullOrder.BEFORE + if (na_option == "top") == ascending + else plc.types.NullOrder.AFTER + ), + ( + plc.aggregation.RankPercentage.ZERO_NORMALIZED + if pct + else plc.aggregation.RankPercentage.NONE + ), + ) + ) + + # Reduce aggregations + @classmethod + def any(cls) -> Self: + return cls(plc.aggregation.any()) + + @classmethod + def all(cls) -> Self: + return cls(plc.aggregation.all()) + + # Rolling aggregations + @classmethod + def from_udf(cls, op, *args, **kwargs) -> Self: + # Handling UDF type + nb_type = numpy_support.from_dtype(kwargs["dtype"]) + type_signature = (nb_type[:],) + ptx_code, output_dtype = cudautils.compile_udf(op, type_signature) + output_np_dtype = cudf.dtype(output_dtype) + if output_np_dtype not in SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES: + raise TypeError( + f"Result of window function has unsupported dtype {op[1]}" + ) + + return cls( + plc.aggregation.udf( + ptx_code, + plc.DataType( + SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES[output_np_dtype] + ), + ) + ) + + +def make_aggregation( + op: str | Callable, kwargs: dict | None = None +) -> Aggregation: + r""" + Parameters + ---------- + op : str or callable + If callable, must meet one of the following requirements: + + * Is of the form lambda x: x.agg(*args, **kwargs), where + `agg` is the name of a supported aggregation. Used to + to specify aggregations that take arguments, e.g., + `lambda x: x.quantile(0.5)`. + * Is a user defined aggregation function that operates on + group values. In this case, the output dtype must be + specified in the `kwargs` dictionary. + \*\*kwargs : dict, optional + Any keyword arguments to be passed to the op. + + Returns + ------- + Aggregation + """ + if kwargs is None: + kwargs = {} + + if isinstance(op, str): + return getattr(Aggregation, op)(**kwargs) + elif callable(op): + if op is list: + return Aggregation.collect() + elif "dtype" in kwargs: + return Aggregation.from_udf(op, **kwargs) + else: + return op(Aggregation) + raise TypeError(f"Unknown aggregation {op}") diff --git a/python/cudf/cudf/core/window/rolling.py b/python/cudf/cudf/core/window/rolling.py index d2cb5e8c190..a580c35ccbf 100644 --- a/python/cudf/cudf/core/window/rolling.py +++ b/python/cudf/cudf/core/window/rolling.py @@ -12,8 +12,8 @@ import cudf from cudf import _lib as libcudf -from cudf._lib.aggregation import make_aggregation from cudf.api.types import is_integer, is_number +from cudf.core._internals.aggregation import make_aggregation from cudf.core.buffer import acquire_spill_lock from cudf.core.column.column import as_column from cudf.core.mixins import Reducible