From e197d72f2daafb2f4804f823019b1ca7810ed560 Mon Sep 17 00:00:00 2001
From: "Richard (Rick) Zamora"
Date: Fri, 16 Aug 2024 09:45:30 -0700
Subject: [PATCH] Replace `NativeFile` dependency in dask-cudf Parquet reader
 (#16569)

Replaces `read_parquet` logic that currently depends on `NativeFile` for
remote-storage access.

**NOTE**: ~It is possible to remove `NativeFile` usage without adding the new
`_prefetch_remote_buffers` logic.~ ~However, I'd like to replace the cudf
data-transfer logic soon anyway.~

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: https://github.com/rapidsai/cudf/pull/16569
---
 python/dask_cudf/dask_cudf/backends.py       |  21 ++++
 python/dask_cudf/dask_cudf/io/parquet.py     | 102 +++++++-----------
 .../dask_cudf/dask_cudf/io/tests/test_s3.py  |  64 +++++++----
 3 files changed, 101 insertions(+), 86 deletions(-)

diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py
index 82ea2ac033a..a65ae819b44 100644
--- a/python/dask_cudf/dask_cudf/backends.py
+++ b/python/dask_cudf/dask_cudf/backends.py
@@ -498,6 +498,25 @@ def _unsupported_kwargs(old, new, kwargs):
     )
 
 
+def _raise_unsupported_parquet_kwargs(
+    open_file_options=None, filesystem=None, **kwargs
+):
+    import fsspec
+
+    if open_file_options is not None:
+        raise ValueError(
+            "The open_file_options argument is no longer supported "
+            "by the 'cudf' backend."
+        )
+
+    if filesystem not in ("fsspec", None) and not isinstance(
+        filesystem, fsspec.AbstractFileSystem
+    ):
+        raise ValueError(
+            f"filesystem={filesystem} is not supported by the 'cudf' backend."
+        )
+
+
 # Register cudf->pandas
 to_pandas_dispatch = PandasBackendEntrypoint.to_backend_dispatch()
 
@@ -573,6 +592,7 @@ def from_dict(
     def read_parquet(*args, engine=None, **kwargs):
         from dask_cudf.io.parquet import CudfEngine
 
+        _raise_unsupported_parquet_kwargs(**kwargs)
         return _default_backend(
             dd.read_parquet,
             *args,
@@ -665,6 +685,7 @@ def read_parquet(*args, engine=None, **kwargs):
 
         from dask_cudf.io.parquet import CudfEngine
 
+        _raise_unsupported_parquet_kwargs(**kwargs)
         return _default_backend(
             dx.read_parquet, *args, engine=CudfEngine, **kwargs
         )
diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py
index f0cab953458..8f52fce7818 100644
--- a/python/dask_cudf/dask_cudf/io/parquet.py
+++ b/python/dask_cudf/dask_cudf/io/parquet.py
@@ -1,7 +1,6 @@
 # Copyright (c) 2019-2024, NVIDIA CORPORATION.
 
 import itertools
 import warnings
-from contextlib import ExitStack
 from functools import partial
 from io import BufferedWriter, BytesIO, IOBase
@@ -22,18 +21,13 @@
 import cudf
 from cudf.core.column import as_column, build_categorical_column
 from cudf.io import write_to_dataset
-from cudf.io.parquet import (
-    _apply_post_filters,
-    _default_open_file_options,
-    _normalize_filters,
-)
+from cudf.io.parquet import _apply_post_filters, _normalize_filters
 from cudf.utils.dtypes import cudf_dtype_from_pa_type
 from cudf.utils.ioutils import (
     _ROW_GROUP_SIZE_BYTES_DEFAULT,
+    _fsspec_data_transfer,
     _is_local_filesystem,
-    _open_remote_files,
 )
-from cudf.utils.utils import maybe_filter_deprecation
 
 
 class CudfEngine(ArrowDatasetEngine):
@@ -98,63 +92,45 @@ def _read_paths(
 
         dataset_kwargs = dataset_kwargs or {}
         dataset_kwargs["partitioning"] = partitioning or "hive"
 
-        with ExitStack() as stack:
-            # Non-local filesystem handling
-            paths_or_fobs = paths
-            if not _is_local_filesystem(fs):
-                paths_or_fobs = _open_remote_files(
-                    paths_or_fobs,
-                    fs,
-                    context_stack=stack,
-                    **_default_open_file_options(
-                        open_file_options, columns, row_groups
-                    ),
-                )
-            # Filter out deprecation warning unless the user
-            # specifies open_file_options and/or use_python_file_object.
-            # Otherwise, the FutureWarning is out of their control.
-            with maybe_filter_deprecation(
-                (
-                    not open_file_options
-                    and "use_python_file_object" not in kwargs
-                ),
-                message="Support for reading pyarrow's NativeFile is deprecated",
-                category=FutureWarning,
-            ):
-                # Use cudf to read in data
-                try:
-                    df = cudf.read_parquet(
-                        paths_or_fobs,
-                        engine="cudf",
-                        columns=columns,
-                        row_groups=row_groups if row_groups else None,
-                        dataset_kwargs=dataset_kwargs,
-                        categorical_partitions=False,
-                        **kwargs,
-                    )
-                except RuntimeError as err:
-                    # TODO: Remove try/except after null-schema issue is resolved
-                    # (See: https://github.com/rapidsai/cudf/issues/12702)
-                    if len(paths_or_fobs) > 1:
-                        df = cudf.concat(
-                            [
-                                cudf.read_parquet(
-                                    pof,
-                                    engine="cudf",
-                                    columns=columns,
-                                    row_groups=row_groups[i]
-                                    if row_groups
-                                    else None,
-                                    dataset_kwargs=dataset_kwargs,
-                                    categorical_partitions=False,
-                                    **kwargs,
-                                )
-                                for i, pof in enumerate(paths_or_fobs)
-                            ]
+        # Non-local filesystem handling
+        paths_or_fobs = paths
+        if not _is_local_filesystem(fs):
+            paths_or_fobs = [
+                _fsspec_data_transfer(fpath, fs=fs) for fpath in paths
+            ]
+
+        # Use cudf to read in data
+        try:
+            df = cudf.read_parquet(
+                paths_or_fobs,
+                engine="cudf",
+                columns=columns,
+                row_groups=row_groups if row_groups else None,
+                dataset_kwargs=dataset_kwargs,
+                categorical_partitions=False,
+                **kwargs,
+            )
+        except RuntimeError as err:
+            # TODO: Remove try/except after null-schema issue is resolved
+            # (See: https://github.com/rapidsai/cudf/issues/12702)
+            if len(paths_or_fobs) > 1:
+                df = cudf.concat(
+                    [
+                        cudf.read_parquet(
+                            pof,
+                            engine="cudf",
+                            columns=columns,
+                            row_groups=row_groups[i] if row_groups else None,
+                            dataset_kwargs=dataset_kwargs,
+                            categorical_partitions=False,
+                            **kwargs,
+                        )
-                    else:
-                        raise err
+                        for i, pof in enumerate(paths_or_fobs)
+                    ]
+                )
+            else:
+                raise err
 
         # Apply filters (if any are defined)
         df = _apply_post_filters(df, filters)
diff --git a/python/dask_cudf/dask_cudf/io/tests/test_s3.py b/python/dask_cudf/dask_cudf/io/tests/test_s3.py
index ac3245b3748..99f19917424 100644
--- a/python/dask_cudf/dask_cudf/io/tests/test_s3.py
+++ b/python/dask_cudf/dask_cudf/io/tests/test_s3.py
@@ -5,8 +5,8 @@ from contextlib import contextmanager
 from io import BytesIO
 
+import fsspec
 import pandas as pd
-import pyarrow.fs as pa_fs
 import pytest
 
 from dask.dataframe import assert_eq
 
@@ -135,35 +135,53 @@ def test_read_csv_warns(s3_base, s3so):
         assert df.a.sum().compute() == 4
 
 
-@pytest.mark.parametrize(
-    "open_file_options",
-    [
-        {"precache_options": {"method": None}},
-        {"precache_options": {"method": "parquet"}},
-        {"open_file_func": None},
-    ],
-)
-def test_read_parquet_open_file_options(s3_base, s3so, open_file_options, pdf):
+def test_read_parquet_open_file_options_raises():
+    with pytest.raises(ValueError):
+        dask_cudf.read_parquet(
+            "s3://my/path",
+            open_file_options={"precache_options": {"method": "parquet"}},
+        )
+
+
+def test_read_parquet_filesystem(s3_base, s3so, pdf):
+    fname = "test_parquet_filesystem.parquet"
+    bucket = "parquet"
     buffer = BytesIO()
     pdf.to_parquet(path=buffer)
     buffer.seek(0)
-    with s3_context(
-        s3_base=s3_base, bucket="daskparquet", files={"file.parq": buffer}
-    ):
-        if "open_file_func" in open_file_options:
-            fs = pa_fs.S3FileSystem(
-                endpoint_override=s3so["client_kwargs"]["endpoint_url"],
+    with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
+        path = f"s3://{bucket}/{fname}"
+
+        # Cannot pass filesystem="arrow"
+        with pytest.raises(ValueError):
+            dask_cudf.read_parquet(
+                path,
+                storage_options=s3so,
+                filesystem="arrow",
             )
-            open_file_options["open_file_func"] = fs.open_input_file
+
+        # Can pass filesystem="fsspec"
         df = dask_cudf.read_parquet(
-            "s3://daskparquet/*.parq",
+            path,
             storage_options=s3so,
-            open_file_options=open_file_options,
+            filesystem="fsspec",
         )
-        with pytest.warns(FutureWarning):
-            assert df.a.sum().compute() == 10
-        with pytest.warns(FutureWarning):
-            assert df.b.sum().compute() == 9
+        assert df.b.sum().compute() == 9
+
+
+def test_read_parquet_filesystem_explicit(s3_base, s3so, pdf):
+    fname = "test_parquet_filesystem_explicit.parquet"
+    bucket = "parquet"
+    buffer = BytesIO()
+    pdf.to_parquet(path=buffer)
+    buffer.seek(0)
+    with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
+        path = f"s3://{bucket}/{fname}"
+        fs = fsspec.core.get_fs_token_paths(
+            path, mode="rb", storage_options=s3so
+        )[0]
+        df = dask_cudf.read_parquet(path, filesystem=fs)
+        assert df.b.sum().compute() == 9
 
 
 def test_read_parquet(s3_base, s3so, pdf):
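# Illustrative usage sketch (not part of the patch): the tests above suggest the
# user-facing behavior after this change. The 'cudf' backend rejects the old
# `open_file_options` argument and routes remote reads through fsspec via the
# `filesystem` argument. The bucket/path and storage options below are
# hypothetical placeholders.
import fsspec

import dask_cudf

path = "s3://my-bucket/data.parquet"  # hypothetical remote dataset
storage_options = {"anon": True}  # hypothetical credentials/settings

# `open_file_options` is no longer supported by the 'cudf' backend and raises.
try:
    dask_cudf.read_parquet(
        path,
        open_file_options={"precache_options": {"method": "parquet"}},
    )
except ValueError:
    pass

# Remote data is now fetched through fsspec; either name the backend...
df = dask_cudf.read_parquet(
    path,
    storage_options=storage_options,
    filesystem="fsspec",
)

# ...or pass an fsspec filesystem instance directly.
fs = fsspec.filesystem("s3", **storage_options)
df = dask_cudf.read_parquet(path, filesystem=fs)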