From 37a51160b42614ef4341718c5d6a30e81d222b5e Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Tue, 13 Aug 2024 08:02:08 -0700 Subject: [PATCH] Introduce `ToBackend` expression (#1115) --- dask_expr/_backends.py | 26 ++++++++++++++++---------- dask_expr/_expr.py | 7 +++++++ dask_expr/tests/test_collection.py | 9 +++++++++ 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py index 9913f372..299f3eaf 100644 --- a/dask_expr/_backends.py +++ b/dask_expr/_backends.py @@ -4,8 +4,10 @@ import pandas as pd from dask.backends import CreationDispatch from dask.dataframe.backends import DataFrameBackendEntrypoint +from dask.dataframe.dispatch import to_pandas_dispatch from dask_expr._dispatch import get_collection_type +from dask_expr._expr import ToBackend try: import sparse @@ -32,6 +34,17 @@ ) +class ToPandasBackend(ToBackend): + @staticmethod + def operation(df, options): + return to_pandas_dispatch(df, **options) + + def _simplify_down(self): + if isinstance(self.frame._meta, (pd.DataFrame, pd.Series, pd.Index)): + # We already have pandas data + return self.frame + + class PandasBackendEntrypoint(DataFrameBackendEntrypoint): """Pandas-Backend Entrypoint Class for Dask-Expressions @@ -39,18 +52,11 @@ class PandasBackendEntrypoint(DataFrameBackendEntrypoint): and registered 'in-place'. """ - @classmethod - def to_backend_dispatch(cls): - from dask.dataframe.dispatch import to_pandas_dispatch - - return to_pandas_dispatch - @classmethod def to_backend(cls, data, **kwargs): - if isinstance(data._meta, (pd.DataFrame, pd.Series, pd.Index)): - # Already a pandas-backed collection - return data - return data.map_partitions(cls.to_backend_dispatch(), **kwargs) + from dask_expr._collection import new_collection + + return new_collection(ToPandasBackend(data, kwargs)) dataframe_creation_dispatch.register_backend("pandas", PandasBackendEntrypoint()) diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index c0366432..ba470fa6 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -1303,6 +1303,13 @@ def operation(df): return df.copy(deep=True) +class ToBackend(Elemwise): + _parameters = ["frame", "options"] + _projection_passthrough = True + _filter_passthrough = True + _preserves_partitioning_information = True + + class RenameSeries(Elemwise): _parameters = ["frame", "index", "sorted_index"] _defaults = {"sorted_index": False} diff --git a/dask_expr/tests/test_collection.py b/dask_expr/tests/test_collection.py index f59559ee..1d5a341a 100644 --- a/dask_expr/tests/test_collection.py +++ b/dask_expr/tests/test_collection.py @@ -2665,3 +2665,12 @@ def test_empty_from_pandas_projection(): df["foo"] = from_pandas(foo, npartitions=1) pdf["foo"] = foo assert_eq(df["foo"], pdf["foo"]) + + +def test_to_backend_simplify(): + with dask.config.set({"dataframe.backend": "pandas"}): + df = from_dict({"x": [1, 2, 3], "y": [4, 5, 6]}, npartitions=2) + df2 = df.to_backend("pandas")[["y"]] + assert str(df2.expr) != str(df[["y"]].expr) + df3 = df2.simplify() + assert str(df3.expr) == str(df[["y"]].expr)