Potential dask-expr compatibility errors #434

Open
wilsonbb opened this issue Apr 24, 2024 · 1 comment

@wilsonbb (Collaborator) commented:

We're seeing consistent unit test failures with dask-expr 1.0.12. See the following example:

tests/tape_tests/test_ensemble.py:1944: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_collection.py:417: in __repr__
    data = self._repr_data().to_string(max_rows=5)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_collection.py:3993: in _repr_data
    index = self._repr_divisions
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_collection.py:2572: in _repr_divisions
    name = f"npartitions={self.npartitions}"
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_collection.py:344: in npartitions
    return self.expr.npartitions
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:397: in npartitions
    return len(self.divisions) - 1
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/functools.py:1001: in __get__
    val = self.func(instance)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:381: in divisions
    return tuple(self._divisions())
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:647: in _divisions
    return _get_divisions_map_partitions(
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask/dataframe/core.py:7330: in _get_divisions_map_partitions
    divisions = max((d.divisions for d in dfs), key=len)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask/dataframe/core.py:7330: in <genexpr>
    divisions = max((d.divisions for d in dfs), key=len)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/functools.py:1001: in __get__
    val = self.func(instance)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:381: in divisions
    return tuple(self._divisions())
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:528: in _divisions
    if not self._broadcast_dep(arg):
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:519: in _broadcast_dep
    return dep.npartitions == 1 and dep.ndim < self.ndim
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:397: in npartitions
    return len(self.divisions) - 1
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/functools.py:1001: in __get__
    val = self.func(instance)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_expr.py:381: in divisions
    return tuple(self._divisions())
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_groupby.py:901: in _divisions
    if self.need_to_shuffle:
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/functools.py:1001: in __get__
    val = self.func(instance)
/opt/hostedtoolcache/Python/3.11.9/x64/lib/python3.11/site-packages/dask_expr/_groupby.py:920: in need_to_shuffle
    if any(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

.0 = <set_iterator object at 0x7f43fc3c36c0>

    if any(
>       set(self._by_columns) >= set(cols)
        for cols in self.frame.unique_partition_mapping_columns_from_shuffle
    ):
E   TypeError: 'NoneType' object is not iterable
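
For context, the check that blows up reduces to iterating over `unique_partition_mapping_columns_from_shuffle`. A minimal sketch of that failure mode (the class below is hypothetical, standing in for whatever expression in the graph surfaces `None` instead of an iterable):

```python
# Hypothetical sketch of the failure mode in dask_expr/_groupby.py:920.
# dask-expr expects unique_partition_mapping_columns_from_shuffle to be an
# iterable of column collections; if an expression in the graph yields None
# instead, consuming the generator raises the TypeError seen above.
class HypotheticalFrameExpr:
    unique_partition_mapping_columns_from_shuffle = None  # should be e.g. set()


frame = HypotheticalFrameExpr()
by_columns = ["ps1_objid"]

# Same shape as the failing check in need_to_shuffle:
any(
    set(by_columns) >= set(cols)
    for cols in frame.unique_partition_mapping_columns_from_shuffle
)  # -> TypeError: 'NoneType' object is not iterable
```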

We're also seeing more intermittent failures on earlier versions, but it is less clear whether these are tied to specific dask-expr versions or whether dask-expr is involved at all. Example:


________________________ test_batch_by_band[bounds-on1] ________________________

parquet_ensemble = <tape.ensemble.Ensemble object at 0x7fbac85f8250>
func_label = 'bounds', on = ['ps1_objid', 'filterName']

    @pytest.mark.parametrize("on", [None, ["ps1_objid", "filterName"], ["filterName", "ps1_objid"]])
    @pytest.mark.parametrize("func_label", ["mean", "bounds"])
    def test_batch_by_band(parquet_ensemble, func_label, on):
        """
        Test that ensemble.batch(by_band=True) works as intended.
        """
    
        if func_label == "mean":
    
            def my_mean(flux):
                """returns a single value"""
                return np.mean(flux)
    
            res = parquet_ensemble.batch(my_mean, parquet_ensemble._flux_col, on=on, by_band=True)
    
            parquet_ensemble.source.query(f"{parquet_ensemble._band_col}=='g'").update_ensemble()
            filter_res = parquet_ensemble.batch(my_mean, parquet_ensemble._flux_col, on=on, by_band=False)
    
            # An EnsembleFrame should be returned
            assert isinstance(res, EnsembleFrame)
    
            # Make sure we get all the expected columns
            assert all([col in res.columns for col in ["result_g", "result_r"]])
    
            # These should be equivalent
            # [expr] need this TODO: investigate typing issue
            filter_res.index = filter_res.index.astype("int")
            assert (
                res.loc[88472935274829959]["result_g"]
                .compute()
                .equals(filter_res.loc[88472935274829959]["result"].compute())
            )
    
        elif func_label == "bounds":
    
            def my_bounds(flux):
                """returns a series"""
                return pd.Series({"min": np.min(flux), "max": np.max(flux)})
    
            res = parquet_ensemble.batch(
                my_bounds, "psFlux", on=on, by_band=True, meta={"min": float, "max": float}
            )
    
            parquet_ensemble.source.query(f"{parquet_ensemble._band_col}=='g'").update_ensemble()
            filter_res = parquet_ensemble.batch(
                my_bounds, "psFlux", on=on, by_band=False, meta={"min": float, "max": float}
            )
    
            # An EnsembleFrame should be returned
            assert isinstance(res, EnsembleFrame)
    
            # Make sure we get all the expected columns
            assert all([col in res.columns for col in ["max_g", "max_r", "min_g", "min_r"]])
    
            # These should be equivalent
    
            # [expr] need this TODO: investigate typing issue
            filter_res.index = filter_res.index.astype("int")
    
>           assert (
                res.loc[88472935274829959]["max_g"]
                .compute()
                .equals(filter_res.loc[88472935274829959]["max"].compute())
            )
E           AssertionError: assert False
E            +  where False = <bound method NDFrame.equals of ps1_objid\n88472935274829959    0.0\nName: max_g, dtype: float32>(ps1_objid\n88472935274829959   NaN\nName: max, dtype: float32)
E            +    where <bound method NDFrame.equals of ps1_objid\n88472935274829959    0.0\nName: max_g, dtype: float32> = ps1_objid\n88472935274829959    0.0\nName: max_g, dtype: float32.equals
E            +      where ps1_objid\n88472935274829959    0.0\nName: max_g, dtype: float32 = <bound method FrameBase.compute of Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 e...Expr=(LocUnknown(frame=MapPartitions(TapeFrame), iindexer=slice(88472935274829959, 88472935274829959, None)))['max_g']>()
E            +        where <bound method FrameBase.compute of Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 e...Expr=(LocUnknown(frame=MapPartitions(TapeFrame), iindexer=slice(88472935274829959, 88472935274829959, None)))['max_g']> = Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 expressions\nExpr=(LocUnknown(frame=MapPartitions(TapeFrame), iindexer=slice(88472935274829959, 88472935274829959, None)))['max_g'].compute
E            +    and   ps1_objid\n88472935274829959   NaN\nName: max, dtype: float32 = <bound method FrameBase.compute of Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 e...s(TapeFrame), other='ps1_objid')), dtypes='int')), iindexer=slice(88472935274829959, 88472935274829959, None)))['max']>()
E            +      where <bound method FrameBase.compute of Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 e...s(TapeFrame), other='ps1_objid')), dtypes='int')), iindexer=slice(88472935274829959, 88472935274829959, None)))['max']> = Dask Series Structure:\nnpartitions=1\n    object\n       ...\nDask Name: getitem, 11 expressions\nExpr=(LocUnknown(frame=A...ns(TapeFrame), other='ps1_objid')), dtypes='int')), iindexer=slice(88472935274829959, 88472935274829959, None)))['max'].compute
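
Note the shape of the failure: the `by_band=True` path computes `max_g == 0.0` while the filtered `by_band=False` path computes `NaN` for the same `ps1_objid`. Given the `index.astype("int")` workaround already in the test, one unconfirmed hypothesis is an index-dtype mismatch causing the label lookup/alignment to miss the row. A toy pandas illustration (not TAPE code):

```python
# Toy illustration only -- not confirmed as the actual cause. If the filtered
# result's index comes back with a string (or object) dtype, an integer label
# no longer matches it, and alignment quietly produces NaN.
import pandas as pd

s = pd.Series([1.5], name="max",
              index=pd.Index(["88472935274829959"], name="ps1_objid"))
s.reindex([88472935274829959])  # integer label vs. string index -> NaN
```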

@wilsonbb (Collaborator, Author) commented:

So #433 prevents the first error from showing up in our project, but the latter error seems to have been present since the initial PR that added dask-expr to TAPE. Some useful data points:

  1. Pin to old dask and dask-expr versions #436 is built on an old branch ending at the commit where we merged dask-expr into TAPE, with changes to pin dask and dask-expr to the versions we used at the time (as seen from the installed dependencies listed here: https://github.com/lincc-frameworks/tape/actions/runs/8442515644/job/23124089726) and to pin python<=3.11.8. We still see the above test_batch_by_band failures there: https://github.com/lincc-frameworks/tape/actions/runs/8853784507/job/24315393322?pr=436
  2. Test without dask-expr and without < python 3.11.9 #439 applies the same python<=3.11.8 pin but is built on the final PR to TAPE before we added dask-expr. It runs without the above failures: https://github.com/lincc-frameworks/tape/actions/runs/8853992787/job/24316049932?pr=439
  3. On the current build, if I remove all tests except the failing test_batch_by_band, we don't typically see failures (example: https://github.com/lincc-frameworks/tape/pull/435/files). Whatever flakiness is happening seems related to test ordering or concurrently running tests; a quick isolation check is sketched below.
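
A rough isolation sketch for checking whether the failure reproduces outside the full suite (repetition count is arbitrary; `pytest.main` is pytest's standard Python entry point):

```python
# Run only the flaky test, repeatedly and in isolation. If it never fails here
# but does fail in the full suite, shared state or test ordering is implicated.
import pytest

for _ in range(10):
    pytest.main(["tests/tape_tests/test_ensemble.py::test_batch_by_band", "-q"])
```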
