From 60853f61cb3eb56b09296c9dc035f81b670d47ef Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Tue, 29 Oct 2024 16:31:23 -0700
Subject: [PATCH] fix patching

---
 python/dask_cudf/dask_cudf/__init__.py |   2 +
 python/dask_cudf/dask_cudf/_expr.py    | 248 +++++++++++++------------
 2 files changed, 130 insertions(+), 120 deletions(-)

diff --git a/python/dask_cudf/dask_cudf/__init__.py b/python/dask_cudf/dask_cudf/__init__.py
index c40b36e52cf..496d5d39dea 100644
--- a/python/dask_cudf/dask_cudf/__init__.py
+++ b/python/dask_cudf/dask_cudf/__init__.py
@@ -46,10 +46,12 @@ def inner_func(*args, **kwargs):
 
 if QUERY_PLANNING_ON:
     from ._collection import DataFrame, Index, Series  # noqa: E402
+    from ._expr import _patch_dask_expr
 
     groupby_agg = raise_not_implemented_error("groupby_agg")
     read_text = DataFrame.read_text
     to_orc = raise_not_implemented_error("to_orc")
+    _patch_dask_expr()
 
 else:
     from .legacy.core import DataFrame, Index, Series  # noqa: F401
diff --git a/python/dask_cudf/dask_cudf/_expr.py b/python/dask_cudf/dask_cudf/_expr.py
index 7f2b8a9f7e7..f1e681a00c5 100644
--- a/python/dask_cudf/dask_cudf/_expr.py
+++ b/python/dask_cudf/dask_cudf/_expr.py
@@ -62,134 +62,142 @@ def _simplify_down(self):
 ##
 
-# This can be removed after cudf#15176 is addressed.
-# See: https://github.com/rapidsai/cudf/issues/15176
-class PatchCumulativeBlockwise(CumulativeBlockwise):
-    @property
-    def _args(self) -> list:
-        return self.operands[:1]
-
-    @property
-    def _kwargs(self) -> dict:
-        # Must pass axis and skipna as kwargs in cudf
-        return {"axis": self.axis, "skipna": self.skipna}
-
-
-CumulativeBlockwise._args = PatchCumulativeBlockwise._args
-CumulativeBlockwise._kwargs = PatchCumulativeBlockwise._kwargs
-
-
-# The upstream Var code uses `Series.values`, and relies on numpy
-# for most of the logic. Unfortunately, cudf -> cupy conversion
-# is not supported for data containing null values. Therefore,
-# we must implement our own version of Var for now. This logic
-# is mostly copied from dask-cudf.
-
-
-class VarCudf(Reduction):
-    # Uses the parallel version of Welford's online algorithm (Chan '79)
-    # (http://i.stanford.edu/pub/cstr/reports/cs/tr/79/773/CS-TR-79-773.pdf)
-    _parameters = ["frame", "skipna", "ddof", "numeric_only", "split_every"]
-    _defaults = {
-        "skipna": True,
-        "ddof": 1,
-        "numeric_only": False,
-        "split_every": False,
-    }
-
-    @functools.cached_property
-    def _meta(self):
-        return make_meta(
-            meta_nonempty(self.frame._meta).var(
-                skipna=self.skipna, numeric_only=self.numeric_only
-            )
-        )
-
-    @property
-    def chunk_kwargs(self):
-        return dict(skipna=self.skipna, numeric_only=self.numeric_only)
-
-    @property
-    def combine_kwargs(self):
-        return {}
-
-    @property
-    def aggregate_kwargs(self):
-        return dict(ddof=self.ddof)
-
-    @classmethod
-    def reduction_chunk(cls, x, skipna=True, numeric_only=False):
-        kwargs = {"numeric_only": numeric_only} if is_dataframe_like(x) else {}
-        if skipna or numeric_only:
-            n = x.count(**kwargs)
-            kwargs["skipna"] = skipna
-            avg = x.mean(**kwargs)
-        else:
-            # Not skipping nulls, so might as well
-            # avoid the full `count` operation
-            n = len(x)
-            kwargs["skipna"] = skipna
-            avg = x.sum(**kwargs) / n
-        if numeric_only:
-            # Workaround for cudf bug
-            # (see: https://github.com/rapidsai/cudf/issues/13731)
-            x = x[n.index]
-        m2 = ((x - avg) ** 2).sum(**kwargs)
-        return n, avg, m2
-
-    @classmethod
-    def reduction_combine(cls, parts):
-        n, avg, m2 = parts[0]
-        for i in range(1, len(parts)):
-            n_a, avg_a, m2_a = n, avg, m2
-            n_b, avg_b, m2_b = parts[i]
-            n = n_a + n_b
-            avg = (n_a * avg_a + n_b * avg_b) / n
-            delta = avg_b - avg_a
-            m2 = m2_a + m2_b + delta**2 * n_a * n_b / n
-        return n, avg, m2
-
-    @classmethod
-    def reduction_aggregate(cls, vals, ddof=1):
-        vals = cls.reduction_combine(vals)
-        n, _, m2 = vals
-        return m2 / (n - ddof)
-
-
-def _patched_var(
-    self, axis=0, skipna=True, ddof=1, numeric_only=False, split_every=False
-):
-    if axis == 0:
-        if hasattr(self._meta, "to_pandas"):
-            return VarCudf(self, skipna, ddof, numeric_only, split_every)
-        else:
-            return Var(self, skipna, ddof, numeric_only, split_every)
-    elif axis == 1:
-        return VarColumns(self, skipna, ddof, numeric_only)
-    else:
-        raise ValueError(f"axis={axis} not supported. Please specify 0 or 1")
-
-
-Expr.var = _patched_var
-
-
-# Temporary work-around for missing cudf + categorical support
-# See: https://github.com/rapidsai/cudf/issues/11795
-# TODO: Fix RepartitionQuantiles and remove this in cudf>24.06
-
-_original_get_divisions = _shuffle_module._get_divisions
-
-
-def _patched_get_divisions(frame, other, *args, **kwargs):
-    # NOTE: The following two lines contains the "patch"
-    # (we simply convert the partitioning column to pandas)
-    if is_categorical_dtype(other._meta.dtype) and hasattr(
-        other.frame._meta, "to_pandas"
-    ):
-        other = new_collection(other).to_backend("pandas")._expr
-
-    # Call "original" function
-    return _original_get_divisions(frame, other, *args, **kwargs)
-
-
-_shuffle_module._get_divisions = _patched_get_divisions
+def _patch_dask_expr():
+    # This can be removed after cudf#15176 is addressed.
+    # See: https://github.com/rapidsai/cudf/issues/15176
+    class PatchCumulativeBlockwise(CumulativeBlockwise):
+        @property
+        def _args(self) -> list:
+            return self.operands[:1]
+
+        @property
+        def _kwargs(self) -> dict:
+            # Must pass axis and skipna as kwargs in cudf
+            return {"axis": self.axis, "skipna": self.skipna}
+
+    CumulativeBlockwise._args = PatchCumulativeBlockwise._args
+    CumulativeBlockwise._kwargs = PatchCumulativeBlockwise._kwargs
+
+    # The upstream Var code uses `Series.values`, and relies on numpy
+    # for most of the logic. Unfortunately, cudf -> cupy conversion
+    # is not supported for data containing null values. Therefore,
+    # we must implement our own version of Var for now. This logic
+    # is mostly copied from dask-cudf.
+
+    class VarCudf(Reduction):
+        # Uses the parallel version of Welford's online algorithm (Chan '79)
+        # (http://i.stanford.edu/pub/cstr/reports/cs/tr/79/773/CS-TR-79-773.pdf)
+        _parameters = [
+            "frame",
+            "skipna",
+            "ddof",
+            "numeric_only",
+            "split_every",
+        ]
+        _defaults = {
+            "skipna": True,
+            "ddof": 1,
+            "numeric_only": False,
+            "split_every": False,
+        }
+
+        @functools.cached_property
+        def _meta(self):
+            return make_meta(
+                meta_nonempty(self.frame._meta).var(
+                    skipna=self.skipna, numeric_only=self.numeric_only
+                )
+            )
+
+        @property
+        def chunk_kwargs(self):
+            return dict(skipna=self.skipna, numeric_only=self.numeric_only)
+
+        @property
+        def combine_kwargs(self):
+            return {}
+
+        @property
+        def aggregate_kwargs(self):
+            return dict(ddof=self.ddof)
+
+        @classmethod
+        def reduction_chunk(cls, x, skipna=True, numeric_only=False):
+            kwargs = (
+                {"numeric_only": numeric_only} if is_dataframe_like(x) else {}
+            )
+            if skipna or numeric_only:
+                n = x.count(**kwargs)
+                kwargs["skipna"] = skipna
+                avg = x.mean(**kwargs)
+            else:
+                # Not skipping nulls, so might as well
+                # avoid the full `count` operation
+                n = len(x)
+                kwargs["skipna"] = skipna
+                avg = x.sum(**kwargs) / n
+            if numeric_only:
+                # Workaround for cudf bug
+                # (see: https://github.com/rapidsai/cudf/issues/13731)
+                x = x[n.index]
+            m2 = ((x - avg) ** 2).sum(**kwargs)
+            return n, avg, m2
+
+        @classmethod
+        def reduction_combine(cls, parts):
+            n, avg, m2 = parts[0]
+            for i in range(1, len(parts)):
+                n_a, avg_a, m2_a = n, avg, m2
+                n_b, avg_b, m2_b = parts[i]
+                n = n_a + n_b
+                avg = (n_a * avg_a + n_b * avg_b) / n
+                delta = avg_b - avg_a
+                m2 = m2_a + m2_b + delta**2 * n_a * n_b / n
+            return n, avg, m2
+
+        @classmethod
+        def reduction_aggregate(cls, vals, ddof=1):
+            vals = cls.reduction_combine(vals)
+            n, _, m2 = vals
+            return m2 / (n - ddof)
+
+    def _patched_var(
+        self,
+        axis=0,
+        skipna=True,
+        ddof=1,
+        numeric_only=False,
+        split_every=False,
+    ):
+        if axis == 0:
+            if hasattr(self._meta, "to_pandas"):
+                return VarCudf(self, skipna, ddof, numeric_only, split_every)
+            else:
+                return Var(self, skipna, ddof, numeric_only, split_every)
+        elif axis == 1:
+            return VarColumns(self, skipna, ddof, numeric_only)
+        else:
+            raise ValueError(
+                f"axis={axis} not supported. Please specify 0 or 1"
+            )
+
+    Expr.var = _patched_var
+
+    # Temporary work-around for missing cudf + categorical support
+    # See: https://github.com/rapidsai/cudf/issues/11795
+    # TODO: Fix RepartitionQuantiles and remove this in cudf>24.06
+
+    _original_get_divisions = _shuffle_module._get_divisions
+
+    def _patched_get_divisions(frame, other, *args, **kwargs):
+        # NOTE: The following two lines contains the "patch"
+        # (we simply convert the partitioning column to pandas)
+        if is_categorical_dtype(other._meta.dtype) and hasattr(
+            other.frame._meta, "to_pandas"
+        ):
+            other = new_collection(other).to_backend("pandas")._expr
+
+        # Call "original" function
+        return _original_get_divisions(frame, other, *args, **kwargs)
+
+    _shuffle_module._get_divisions = _patched_get_divisions
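
The substance of this commit is that the monkey-patches in _expr.py are no
longer applied as an import side effect; they run only when __init__.py
explicitly calls _patch_dask_expr() under QUERY_PLANNING_ON. Below is a toy,
self-contained illustration of that opt-in patching pattern; math.sin is a
hypothetical stand-in for the dask-expr internals being patched, not anything
the patch itself touches:

import math

def _patch_math():
    # Save the original, then install a wrapper that delegates to it,
    # mirroring how _patched_get_divisions wraps _get_divisions above.
    original_sin = math.sin

    def patched_sin(x):
        # A real patch would adjust behavior for the cudf backend here.
        return original_sin(x)

    math.sin = patched_sin

# Importing a module written this way changes nothing; the patch is applied
# only when a caller opts in (as __init__.py does under QUERY_PLANNING_ON):
_patch_math()
assert math.sin(0.0) == 0.0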
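
For reference, the VarCudf reduction summarizes each partition as a triple
(n, avg, m2), where m2 is the sum of squared deviations from the partition
mean, and merges triples pairwise (Chan et al. '79). The standalone sketch
below (not part of the patch) mirrors reduction_chunk and reduction_combine
on plain Python lists and checks the merged result against a direct
computation:

def combine(parts):
    # Pairwise merge of (n, avg, m2) triples, as in VarCudf.reduction_combine.
    n, avg, m2 = parts[0]
    for n_b, avg_b, m2_b in parts[1:]:
        n_a, avg_a, m2_a = n, avg, m2
        n = n_a + n_b
        avg = (n_a * avg_a + n_b * avg_b) / n
        delta = avg_b - avg_a
        m2 = m2_a + m2_b + delta**2 * n_a * n_b / n
    return n, avg, m2

def summarize(xs):
    # Per-partition summary, as in VarCudf.reduction_chunk (nulls aside).
    n = len(xs)
    avg = sum(xs) / n
    m2 = sum((x - avg) ** 2 for x in xs)
    return n, avg, m2

parts = [summarize([1.0, 2.0, 3.0]), summarize([4.0, 5.0, 6.0, 7.0])]
n, _, m2 = combine(parts)
ddof = 1
var = m2 / (n - ddof)  # the aggregate step: m2 / (n - ddof)
assert abs(var - summarize([1, 2, 3, 4, 5, 6, 7])[2] / (7 - ddof)) < 1e-12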