From ae268249f948e9d053ad7faf8bfa1a246b237dd9 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 31 Jul 2024 12:29:36 -0700 Subject: [PATCH 1/4] add new ToBackend expr --- dask_expr/_backends.py | 14 +++++++++++++- dask_expr/_expr.py | 11 +++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py index 9913f372..450397b5 100644 --- a/dask_expr/_backends.py +++ b/dask_expr/_backends.py @@ -5,7 +5,9 @@ from dask.backends import CreationDispatch from dask.dataframe.backends import DataFrameBackendEntrypoint +from dask_expr._collection import new_collection from dask_expr._dispatch import get_collection_type +from dask_expr._expr import ToBackend try: import sparse @@ -32,6 +34,14 @@ ) +class ToPandasBackend(ToBackend): + @staticmethod + def operation(df, options): + from dask.dataframe.dispatch import to_pandas_dispatch + + return to_pandas_dispatch(df, **options) + + class PandasBackendEntrypoint(DataFrameBackendEntrypoint): """Pandas-Backend Entrypoint Class for Dask-Expressions @@ -50,7 +60,9 @@ def to_backend(cls, data, **kwargs): if isinstance(data._meta, (pd.DataFrame, pd.Series, pd.Index)): # Already a pandas-backed collection return data - return data.map_partitions(cls.to_backend_dispatch(), **kwargs) + return new_collection( + ToPandasBackend(data, kwargs) + ) # data.map_partitions(cls.to_backend_dispatch(), **kwargs) dataframe_creation_dispatch.register_backend("pandas", PandasBackendEntrypoint()) diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index c0366432..9d342d41 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -1303,6 +1303,17 @@ def operation(df): return df.copy(deep=True) +class ToBackend(Elemwise): + _parameters = ["frame", "options"] + _projection_passthrough = True + _filter_passthrough = True + _preserves_partitioning_information = True + + @staticmethod + def operation(df, options): + raise NotImplementedError() + + class RenameSeries(Elemwise): _parameters = ["frame", "index", "sorted_index"] _defaults = {"sorted_index": False} From 96e2681bdd991b9a7a1fb78199bbb7b6968a721b Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 31 Jul 2024 13:37:44 -0700 Subject: [PATCH 2/4] add test coverage --- dask_expr/_backends.py | 21 ++++++++------------- dask_expr/tests/test_collection.py | 9 +++++++++ 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py index 450397b5..c0608b4e 100644 --- a/dask_expr/_backends.py +++ b/dask_expr/_backends.py @@ -5,7 +5,6 @@ from dask.backends import CreationDispatch from dask.dataframe.backends import DataFrameBackendEntrypoint -from dask_expr._collection import new_collection from dask_expr._dispatch import get_collection_type from dask_expr._expr import ToBackend @@ -41,6 +40,11 @@ def operation(df, options): return to_pandas_dispatch(df, **options) + def _simplify_down(self): + if isinstance(self.frame._meta, (pd.DataFrame, pd.Series, pd.Index)): + # We already have pandas data + return self.frame + class PandasBackendEntrypoint(DataFrameBackendEntrypoint): """Pandas-Backend Entrypoint Class for Dask-Expressions @@ -49,20 +53,11 @@ class PandasBackendEntrypoint(DataFrameBackendEntrypoint): and registered 'in-place'. """ - @classmethod - def to_backend_dispatch(cls): - from dask.dataframe.dispatch import to_pandas_dispatch - - return to_pandas_dispatch - @classmethod def to_backend(cls, data, **kwargs): - if isinstance(data._meta, (pd.DataFrame, pd.Series, pd.Index)): - # Already a pandas-backed collection - return data - return new_collection( - ToPandasBackend(data, kwargs) - ) # data.map_partitions(cls.to_backend_dispatch(), **kwargs) + from dask_expr._collection import new_collection + + return new_collection(ToPandasBackend(data, kwargs)) dataframe_creation_dispatch.register_backend("pandas", PandasBackendEntrypoint()) diff --git a/dask_expr/tests/test_collection.py b/dask_expr/tests/test_collection.py index f59559ee..1d5a341a 100644 --- a/dask_expr/tests/test_collection.py +++ b/dask_expr/tests/test_collection.py @@ -2665,3 +2665,12 @@ def test_empty_from_pandas_projection(): df["foo"] = from_pandas(foo, npartitions=1) pdf["foo"] = foo assert_eq(df["foo"], pdf["foo"]) + + +def test_to_backend_simplify(): + with dask.config.set({"dataframe.backend": "pandas"}): + df = from_dict({"x": [1, 2, 3], "y": [4, 5, 6]}, npartitions=2) + df2 = df.to_backend("pandas")[["y"]] + assert str(df2.expr) != str(df[["y"]].expr) + df3 = df2.simplify() + assert str(df3.expr) == str(df[["y"]].expr) From 5af626a745c692fe9be6212577b4c6e498372bad Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 13 Aug 2024 07:17:47 -0700 Subject: [PATCH 3/4] address code review --- dask_expr/_backends.py | 3 +-- dask_expr/_expr.py | 5 +---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/dask_expr/_backends.py b/dask_expr/_backends.py index c0608b4e..299f3eaf 100644 --- a/dask_expr/_backends.py +++ b/dask_expr/_backends.py @@ -4,6 +4,7 @@ import pandas as pd from dask.backends import CreationDispatch from dask.dataframe.backends import DataFrameBackendEntrypoint +from dask.dataframe.dispatch import to_pandas_dispatch from dask_expr._dispatch import get_collection_type from dask_expr._expr import ToBackend @@ -36,8 +37,6 @@ class ToPandasBackend(ToBackend): @staticmethod def operation(df, options): - from dask.dataframe.dispatch import to_pandas_dispatch - return to_pandas_dispatch(df, **options) def _simplify_down(self): diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index 9d342d41..986e65ad 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -1304,15 +1304,12 @@ def operation(df): class ToBackend(Elemwise): + operation = None _parameters = ["frame", "options"] _projection_passthrough = True _filter_passthrough = True _preserves_partitioning_information = True - @staticmethod - def operation(df, options): - raise NotImplementedError() - class RenameSeries(Elemwise): _parameters = ["frame", "index", "sorted_index"] From 6399a28caa6afe2734b03ceb0458dc94abd8b589 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 13 Aug 2024 07:19:23 -0700 Subject: [PATCH 4/4] remove redundant operation def --- dask_expr/_expr.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py index 986e65ad..ba470fa6 100644 --- a/dask_expr/_expr.py +++ b/dask_expr/_expr.py @@ -1304,7 +1304,6 @@ def operation(df): class ToBackend(Elemwise): - operation = None _parameters = ["frame", "options"] _projection_passthrough = True _filter_passthrough = True