diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index e3f4f04eb85..344b03c631d 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -373,22 +373,37 @@ def percentile_cudf(a, q, interpolation="linear"): @pyarrow_schema_dispatch.register((cudf.DataFrame,)) -def _get_pyarrow_schema_cudf(obj, preserve_index=True, **kwargs): +def _get_pyarrow_schema_cudf(obj, preserve_index=None, **kwargs): if kwargs: warnings.warn( "Ignoring the following arguments to " f"`pyarrow_schema_dispatch`: {list(kwargs)}" ) - return meta_nonempty(obj).to_arrow(preserve_index=preserve_index).schema + + return _cudf_to_table( + meta_nonempty(obj), preserve_index=preserve_index + ).schema @to_pyarrow_table_dispatch.register(cudf.DataFrame) -def _cudf_to_table(obj, preserve_index=True, **kwargs): +def _cudf_to_table(obj, preserve_index=None, **kwargs): if kwargs: warnings.warn( "Ignoring the following arguments to " f"`to_pyarrow_table_dispatch`: {list(kwargs)}" ) + + # TODO: Remove this logic when cudf#14159 is resolved + # (see: https://github.com/rapidsai/cudf/issues/14159) + if preserve_index and isinstance(obj.index, cudf.RangeIndex): + obj = obj.copy() + obj.index.name = ( + obj.index.name + if obj.index.name is not None + else "__index_level_0__" + ) + obj.index = obj.index._as_int_index() + return obj.to_arrow(preserve_index=preserve_index) @@ -401,7 +416,15 @@ def _table_to_cudf(obj, table, self_destruct=None, **kwargs): f"Ignoring the following arguments to " f"`from_pyarrow_table_dispatch`: {list(kwargs)}" ) - return obj.from_arrow(table) + result = obj.from_arrow(table) + + # TODO: Remove this logic when cudf#14159 is resolved + # (see: https://github.com/rapidsai/cudf/issues/14159) + if "__index_level_0__" in result.index.names: + assert len(result.index.names) == 1 + result.index.name = None + + return result @union_categoricals_dispatch.register((cudf.Series, cudf.BaseIndex)) diff --git a/python/dask_cudf/dask_cudf/sorting.py b/python/dask_cudf/dask_cudf/sorting.py index e841f2d8830..d6c9c1be73c 100644 --- a/python/dask_cudf/dask_cudf/sorting.py +++ b/python/dask_cudf/dask_cudf/sorting.py @@ -6,7 +6,7 @@ import numpy as np import tlz as toolz -import dask +from dask import config from dask.base import tokenize from dask.dataframe import methods from dask.dataframe.core import DataFrame, Index, Series @@ -18,6 +18,8 @@ from cudf.api.types import is_categorical_dtype from cudf.utils.utils import _dask_cudf_nvtx_annotate +_SHUFFLE_SUPPORT = ("tasks", "p2p") # "disk" not supported + @_dask_cudf_nvtx_annotate def set_index_post(df, index_name, drop, column_dtype): @@ -307,15 +309,25 @@ def sort_values( return df4 +def get_default_shuffle_method(): + # Note that `dask.utils.get_default_shuffle_method` + # will return "p2p" by default when a distributed + # client is present. Dask-cudf supports "p2p", but + # will not use it by default (yet) + default = config.get("dataframe.shuffle.method", "tasks") + if default not in _SHUFFLE_SUPPORT: + default = "tasks" + return default + + def _get_shuffle_type(shuffle): # Utility to set the shuffle-kwarg default - # and to validate user-specified options. - # The only supported options is currently "tasks" - shuffle = shuffle or dask.config.get("shuffle", "tasks") - if shuffle != "tasks": + # and to validate user-specified options + shuffle = shuffle or get_default_shuffle_method() + if shuffle not in _SHUFFLE_SUPPORT: raise ValueError( - f"Dask-cudf only supports in-memory shuffling with " - f"'tasks'. Got shuffle={shuffle}" + "Dask-cudf only supports the following shuffle " + f"methods: {_SHUFFLE_SUPPORT}. Got shuffle={shuffle}" ) return shuffle diff --git a/python/dask_cudf/dask_cudf/tests/test_dispatch.py b/python/dask_cudf/dask_cudf/tests/test_dispatch.py index cf49b1df4f4..c64e25fd437 100644 --- a/python/dask_cudf/dask_cudf/tests/test_dispatch.py +++ b/python/dask_cudf/dask_cudf/tests/test_dispatch.py @@ -22,18 +22,25 @@ def test_is_categorical_dispatch(): assert is_categorical_dtype(cudf.Index([1, 2, 3], dtype="category")) -def test_pyarrow_conversion_dispatch(): +@pytest.mark.parametrize("preserve_index", [True, False]) +def test_pyarrow_conversion_dispatch(preserve_index): from dask.dataframe.dispatch import ( from_pyarrow_table_dispatch, to_pyarrow_table_dispatch, ) df1 = cudf.DataFrame(np.random.randn(10, 3), columns=list("abc")) - df2 = from_pyarrow_table_dispatch(df1, to_pyarrow_table_dispatch(df1)) + df2 = from_pyarrow_table_dispatch( + df1, to_pyarrow_table_dispatch(df1, preserve_index=preserve_index) + ) assert type(df1) == type(df2) assert_eq(df1, df2) + # Check that preserve_index does not produce a RangeIndex + if preserve_index: + assert not isinstance(df2.index, cudf.RangeIndex) + @pytest.mark.parametrize("index", [None, [1, 2] * 5]) def test_deterministic_tokenize(index): diff --git a/python/dask_cudf/dask_cudf/tests/test_distributed.py b/python/dask_cudf/dask_cudf/tests/test_distributed.py index e24feaa2ea4..db3f3695648 100644 --- a/python/dask_cudf/dask_cudf/tests/test_distributed.py +++ b/python/dask_cudf/dask_cudf/tests/test_distributed.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2022, NVIDIA CORPORATION. +# Copyright (c) 2020-2023, NVIDIA CORPORATION. import numba.cuda import pytest @@ -77,3 +77,23 @@ def test_str_series_roundtrip(): actual = dask_series.compute() assert_eq(actual, expected) + + +def test_p2p_shuffle(): + # Check that we can use `shuffle="p2p"` + with dask_cuda.LocalCUDACluster(n_workers=1) as cluster: + with Client(cluster): + ddf = ( + dask.datasets.timeseries( + start="2000-01-01", + end="2000-01-08", + dtypes={"x": int}, + ) + .reset_index(drop=True) + .to_backend("cudf") + ) + dd.assert_eq( + ddf.sort_values("x", shuffle="p2p").compute(), + ddf.compute().sort_values("x"), + check_index=False, + )