-
-
Notifications
You must be signed in to change notification settings - Fork 48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ENH: minimal support for dask.dataframe query planning (dask-expr) #285
ENH: minimal support for dask.dataframe query planning (dask-expr) #285
Conversation
0699da4
to
80976e6
Compare
@jorisvandenbossche can you ping me once you want a review of this? I'll do some reading on dask-expr in the meantime. |
I think this should be ready enough for a review |
I spent a bit of time looking at the split-out failure, but didn't make much progress. I'll take another look later. |
Hi ,thanks for the efforts on supporting |
Some discussion at dask/dask-expr#1024. |
@jorisvandenbossche I can't push to your branch, but this diff fixes the test diff --git a/dask_geopandas/expr.py b/dask_geopandas/expr.py
index 99d71d2..136953d 100644
--- a/dask_geopandas/expr.py
+++ b/dask_geopandas/expr.py
@@ -200,6 +200,7 @@ class _Frame(dx.FrameBase, OperatorMethodMixin):
@classmethod
def _bind_elemwise_operator_method(cls, name, op, original, *args, **kwargs):
"""bind operator method like GeoSeries.distance to this class"""
+
# name must be explicitly passed for div method whose name is truediv
def meth(self, other, *args, **kwargs):
meta = _emulate(op, self, other)
@@ -505,7 +506,6 @@ class _Frame(dx.FrameBase, OperatorMethodMixin):
return distances
def geohash(self, as_string=True, precision=12):
-
"""
Calculate geohash based on the middle points of the geometry bounds
for a given precision.
@@ -842,7 +842,7 @@ def from_dask_dataframe(df, geometry=None):
# it via a keyword-argument due to https://github.com/dask/dask/issues/8308.
# Instead, we assign the geometry column using regular dataframe operations,
# then refer to that column by name in `map_partitions`.
- if isinstance(geometry, dd.core.Series):
+ if isinstance(geometry, dx.Series):
name = geometry.name if geometry.name is not None else "geometry"
return df.assign(**{name: geometry}).map_partitions(
geopandas.GeoDataFrame, geometry=name
diff --git a/dask_geopandas/tests/test_core.py b/dask_geopandas/tests/test_core.py
index b28a0c7..fbde582 100644
--- a/dask_geopandas/tests/test_core.py
+++ b/dask_geopandas/tests/test_core.py
@@ -390,27 +390,44 @@ def test_rename_geometry_error(geodf_points):
dask_obj.rename_geometry("value1")
-# TODO to_dask_dataframe is now defined on the dask-expr collection, converting
-# to an old-style dd.core.DataFrame (so doing something different as we did here)
-@pytest.mark.xfail(
- dask_geopandas.backends.QUERY_PLANNING_ON, reason="Need to update test for expr"
-)
def test_from_dask_dataframe_with_dask_geoseries():
df = pd.DataFrame({"x": [0, 1, 2, 3], "y": [1, 2, 3, 4]})
dask_obj = dd.from_pandas(df, npartitions=2)
dask_obj = dask_geopandas.from_dask_dataframe(
dask_obj, geometry=dask_geopandas.points_from_xy(dask_obj, "x", "y")
)
- # Check that the geometry isn't concatenated and embedded a second time in
- # the high-level graph. cf. https://github.com/geopandas/dask-geopandas/issues/197
- k = next(k for k in dask_obj.dask.dependencies if k.startswith("GeoDataFrame"))
- deps = dask_obj.dask.dependencies[k]
- assert len(deps) == 1
+
+ if dask_geopandas.backends.QUERY_PLANNING_ON:
+ deps = dask_obj.expr.dependencies()
+ assert len(deps) == 1
+ dep = deps[0]
+ other = list(dask_obj.dask.values())[0][3]["geometry"].dependencies()[0]
+ assert dep is other
+
+ else:
+ # Check that the geometry isn't concatenated and embedded a second time in
+ # the high-level graph. cf. https://github.com/geopandas/dask-geopandas/issues/197
+ k = next(k for k in dask_obj.dask.dependencies if k.startswith("GeoDataFrame"))
+ deps = dask_obj.dask.dependencies[k]
+ assert len(deps) == 1
expected = df.set_geometry(geopandas.points_from_xy(df["x"], df["y"]))
+ dask_obj.geometry.compute()
assert_geoseries_equal(dask_obj.geometry.compute(), expected.geometry)
+def test_set_geometry_to_dask_series():
+ df = pd.DataFrame({"x": [0, 1, 2, 3], "y": [1, 2, 3, 4]})
+
+ dask_obj = dd.from_pandas(df, npartitions=2)
+ dask_obj = dask_geopandas.from_dask_dataframe(
+ dask_obj, geometry=dask_geopandas.points_from_xy(dask_obj, "x", "y")
+ )
+ expected = geopandas.GeoDataFrame(df, geometry=geopandas.points_from_xy(df.x, df.y))
+ result = dask_obj.geometry.compute()
+ assert_geoseries_equal(result, expected.geometry)
+
+
def test_from_dask_dataframe_with_column_name():
df = pd.DataFrame({"x": [0, 1, 2, 3], "y": [1, 2, 3, 4]})
df["geoms"] = geopandas.points_from_xy(df["x"], df["y"]) |
PR to your branch at jorisvandenbossche#1 that includes that diff and a fix for a couple more issues. The final xfail with dask-expr is from |
@TomAugspurger thanks for the fixes! |
dask_geopandas/expr.py
Outdated
def to_legacy_dataframe(self, optimize: bool = True, **optimize_kwargs): | ||
def to_dask_dataframe(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@TomAugspurger I renamed this back to the original name, but now this is fine because dask-expr upstream renamed the to_dask_dataframe
they added to to_legacy_dataframe
, so there is no longer a name clash.
to_legacy_dataframe
will convert to a legacy dask.dataframe collection, which is needed for certain implementations that still use the original implementation (e.g. dask still does this for parquet IO right now).
While our to_dask_dataframe
is meant to convert your dask-geopandas object to a dask object (regardless of it being a legacy collection or a new expression).
(and so naming this implementation to_legacy_dataframe
actually broke the parquet tests, because it is not doing what dask is expecting, i.e. it doesn't return a legacy collection, just the same object but where the partitions are pd.DataFrames instead of GeoDataFrames)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, makes sense, thanks.
@martinfleis unless you have time on the short term, I would suggest we already merge this to have a working main branch with latest dask, and further fixes/improvements can be done in follow-ups |
@jorisvandenbossche Go ahead |
Tackling #284, for now in a minimal way, i.e. keeping as much as possible the current code based on "legacy" dask and then turn that into an expression.
For example, for sjoin/clip/file IO, we manually build up the graph to create a collection using HighLevelGraph, for now this graph is just turned into an expression with
from_graph
. Long term, we can turn each of those into proper expression classes.I am not yet fully sure how to best organize the code. For now, the new file
expr.py
is essentially duplicating a lot ofcore.py
. I organized the commits such that the first just does this copy, and then you can look at the second commit to see the actual changes that were needed tocore.py
(inexpr.py
) to make it work with dask-expr -> d28521fThe problem is that I can't do this directly, because we want to keep supporting legacy dask for a while as well (our code is actually also still using this for intermediate results)
Some other remarks:
split_out
keyword indissolve
), this is xfailed for nowspatial_partitions
handling is not yet very robust. For now this just takes the same approach as incore.py
to attach this to the collection. But longer term we should attach this to the underlying expression of the collection. The current version "works" (the tests are passing), as long as you only do an operation that needs them directly after an operation that sets them.map_partitions
like before. Longer term, we should turn those into custom expressions (this can then also allow better optimizations in the expression tree)Closes #287