You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
We're seeing some consistent unit test failures with dask-expr 1.0.12. See the following example
tests/tape_tests/test_ensemble.py:1944:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_collection.py:417: in __repr__
data = self._repr_data().to_string(max_rows=5)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_collection.py:3993: in _repr_data
index = self._repr_divisions
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_collection.py:2572: in _repr_divisions
name = f"npartitions={self.npartitions}"
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_collection.py:344: in npartitions
return self.expr.npartitions
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:397: in npartitions
return len(self.divisions) - 1
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/functools.py:1001: in __get__
val = self.func(instance)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:381: in divisions
return tuple(self._divisions())
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:647: in _divisions
return _get_divisions_map_partitions(
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask/dataframe/core.py:7330: in _get_divisions_map_partitions
divisions = max((d.divisions for d in dfs), key=len)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask/dataframe/core.py:7330: in <genexpr>
divisions = max((d.divisions for d in dfs), key=len)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/functools.py:1001: in __get__
val = self.func(instance)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:381: in divisions
return tuple(self._divisions())
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:528: in _divisions
if not self._broadcast_dep(arg):
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:519: in _broadcast_dep
return dep.npartitions == 1 and dep.ndim < self.ndim
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:397: in npartitions
return len(self.divisions) - 1
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/functools.py:1001: in __get__
val = self.func(instance)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:381: in divisions
return tuple(self._divisions())
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_groupby.py:901: in _divisions
if self.need_to_shuffle:
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/functools.py:1001: in __get__
val = self.func(instance)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_groupby.py:920: in need_to_shuffle
if any(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.0 = <set_iterator object at 0x7f43fc3c36c0>
if any(
> set(self._by_columns) >= set(cols)
for cols in self.frame.unique_partition_mapping_columns_from_shuffle
):
E TypeError: 'NoneType' object is not iterable
We're also seeing more intermittent failures on earlier versions, but it is less clear if this is tied to specific versions of dask-expr or if dask-expr is involved. Example
________________________ test_batch_by_band[bounds-on1] ________________________
parquet_ensemble = <tape.ensemble.Ensemble object at 0x7fbac85f8250>
func_label = 'bounds', on = ['ps1_objid', 'filterName']
@pytest.mark.parametrize("on", [None, ["ps1_objid", "filterName"], ["filterName", "ps1_objid"]])
@pytest.mark.parametrize("func_label", ["mean", "bounds"])
def test_batch_by_band(parquet_ensemble, func_label, on):
"""
Test that ensemble.batch(by_band=True) works as intended.
"""
if func_label == "mean":
def my_mean(flux):
"""returns a single value"""
return np.mean(flux)
res = parquet_ensemble.batch(my_mean, parquet_ensemble._flux_col, on=on, by_band=True)
parquet_ensemble.source.query(f"{parquet_ensemble._band_col}=='g'").update_ensemble()
filter_res = parquet_ensemble.batch(my_mean, parquet_ensemble._flux_col, on=on, by_band=False)
# An EnsembleFrame should be returned
assert isinstance(res, EnsembleFrame)
# Make sure we get all the expected columns
assert all([col in res.columns for col in ["result_g", "result_r"]])
# These should be equivalent
# [expr] need this TODO: investigate typing issue
filter_res.index = filter_res.index.astype("int")
assert (
        res.loc[88472935274829959]["result_g"]
.compute()
.equals(filter_res.loc[88472935274829959]["result"].compute())
)
elif func_label == "bounds":
def my_bounds(flux):
"""returns a series"""
return pd.Series({"min": np.min(flux), "max": np.max(flux)})
res = parquet_ensemble.batch(
my_bounds, "psFlux", on=on, by_band=True, meta={"min": float, "max": float}
)
parquet_ensemble.source.query(f"{parquet_ensemble._band_col}=='g'").update_ensemble()
filter_res = parquet_ensemble.batch(
my_bounds, "psFlux", on=on, by_band=False, meta={"min": float, "max": float}
)
# An EnsembleFrame should be returned
assert isinstance(res, EnsembleFrame)
# Make sure we get all the expected columns
assert all([col in res.columns for col in ["max_g", "max_r", "min_g", "min_r"]])
# These should be equivalent
# [expr] need this TODO: investigate typing issue
filter_res.index = filter_res.index.astype("int")
> assert (
        res.loc[88472935274829959]["max_g"]
.compute()
.equals(filter_res.loc[88472935274829959]["max"].compute())
)
E AssertionError: assert False
E + where False = <bound method NDFrame.equals of ps1_objid\n88472935274829959 0.0\nName: max_g, dtype: float32>(ps1_objid\n88472935274829959 NaN\nName: max, dtype: float32)
E + where <bound method NDFrame.equals of ps1_objid\n88472935274829959 0.0\nName: max_g, dtype: float32> = ps1_objid\n88472935274829959 0.0\nName: max_g, dtype: float32.equals
E + where ps1_objid\n88472935274829959 0.0\nName: max_g, dtype: float32 = <bound method FrameBase.compute of Dask Series Structure:\nnpartitions=1\n object\n ...\nDask Name: getitem, 11 e...Expr=(LocUnknown(frame=MapPartitions(TapeFrame), iindexer=slice(88472935274829959, 88472935274829959, None)))['max_g']>()
E + where <bound method FrameBase.compute of Dask Series Structure:\nnpartitions=1\n object\n ...\nDask Name: getitem, 11 e...Expr=(LocUnknown(frame=MapPartitions(TapeFrame), iindexer=slice(88472935274829959, 88472935274829959, None)))['max_g']> = Dask Series Structure:\nnpartitions=1\n object\n ...\nDask Name: getitem, 11 expressions\nExpr=(LocUnknown(frame=MapPartitions(TapeFrame), iindexer=slice(88472935274829959, 88472935274829959, None)))['max_g'].compute
E + and ps1_objid\n88472935274829959 NaN\nName: max, dtype: float32 = <bound method FrameBase.compute of Dask Series Structure:\nnpartitions=1\n object\n ...\nDask Name: getitem, 11 e...s(TapeFrame), other='ps1_objid')), dtypes='int')), iindexer=slice(88472935274829959, 88472935274829959, None)))['max']>()
E + where <bound method FrameBase.compute of Dask Series Structure:\nnpartitions=1\n object\n ...\nDask Name: getitem, 11 e...s(TapeFrame), other='ps1_objid')), dtypes='int')), iindexer=slice(88472935274829959, 88472935274829959, None)))['max']> = Dask Series Structure:\nnpartitions=1\n object\n ...\nDask Name: getitem, 11 expressions\nExpr=(LocUnknown(frame=A...ns(TapeFrame), other='ps1_objid')), dtypes='int')), iindexer=slice(88472935274829959, 88472935274829959, None)))['max'].compute
The text was updated successfully, but these errors were encountered:
So #433 prevents the first error from showing up in our project but the latter error seems to have been present since the initial PR that added dask-expr to TAPE. Some useful examples
On the current build, if I remove all tests except for the failing test, test_batch_by_band, we don't typically see failures. An example of this is https://github.com/lincc-frameworks/tape/pull/435/files. Whatever flakiness is happening therefore seems to be related to test ordering or to tests running concurrently.
We're seeing some consistent unit test failures with dask-expr 1.0.12. See the following example
We're also seeing more intermittent failures on earlier versions, but it is less clear if this is tied to specific versions of dask-expr or if dask-expr is involved. Example
The text was updated successfully, but these errors were encountered: