Replace NativeFile dependency in dask-cudf Parquet reader (#16569)
Replaces `read_parquet` logic that currently depends on `NativeFile` for remote-storage access.

**NOTE**: ~~It is possible to remove `NativeFile` usage without adding the new `_prefetch_remote_buffers` logic. However, I'd like to replace the cudf data-transfer logic soon anyway.~~
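
For context, a minimal sketch of the fsspec-based pattern that replaces `NativeFile` here (not part of this diff; the real transfer goes through cudf's internal `_fsspec_data_transfer` helper, and `s3fs` is assumed for the S3 example): remote bytes are prefetched into host memory and handed to cudf as an in-memory buffer.

```python
from io import BytesIO

import cudf
import fsspec

# Prefetch the remote file into host memory with fsspec, then let cudf
# parse the in-memory buffer; no pyarrow NativeFile is involved.
fs = fsspec.filesystem("s3")  # assumes s3fs is installed
with fs.open("s3://bucket/data.parquet", "rb") as f:
    buf = BytesIO(f.read())
df = cudf.read_parquet(buf)
```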

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: https://github.com/rapidsai/cudf/pull/16569
rjzamora authored Aug 16, 2024
1 parent e690d9d commit e197d72
Showing 3 changed files with 101 additions and 86 deletions.
python/dask_cudf/dask_cudf/backends.py (21 additions, 0 deletions)
@@ -498,6 +498,25 @@ def _unsupported_kwargs(old, new, kwargs):
         )
 
 
+def _raise_unsupported_parquet_kwargs(
+    open_file_options=None, filesystem=None, **kwargs
+):
+    import fsspec
+
+    if open_file_options is not None:
+        raise ValueError(
+            "The open_file_options argument is no longer supported "
+            "by the 'cudf' backend."
+        )
+
+    if filesystem not in ("fsspec", None) and not isinstance(
+        filesystem, fsspec.AbstractFileSystem
+    ):
+        raise ValueError(
+            f"filesystem={filesystem} is not supported by the 'cudf' backend."
+        )
+
+
 # Register cudf->pandas
 to_pandas_dispatch = PandasBackendEntrypoint.to_backend_dispatch()
 
@@ -573,6 +592,7 @@ def from_dict(
     def read_parquet(*args, engine=None, **kwargs):
         from dask_cudf.io.parquet import CudfEngine
 
+        _raise_unsupported_parquet_kwargs(**kwargs)
         return _default_backend(
             dd.read_parquet,
             *args,
@@ -665,6 +685,7 @@ def read_parquet(*args, engine=None, **kwargs):
 
         from dask_cudf.io.parquet import CudfEngine
 
+        _raise_unsupported_parquet_kwargs(**kwargs)
         return _default_backend(
             dx.read_parquet, *args, engine=CudfEngine, **kwargs
         )
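
To illustrate the validation added above, here is a usage sketch (not part of the commit; the local path is a placeholder). The 'cudf' backend now accepts `filesystem=None`, `filesystem="fsspec"`, or an `fsspec.AbstractFileSystem` instance, and raises `ValueError` for anything else, including any use of `open_file_options`.

```python
import cudf
import dask_cudf
import fsspec

cudf.DataFrame({"a": [1, 2, 3]}).to_parquet("tmp.parquet")

fs = fsspec.filesystem("file")
dask_cudf.read_parquet("tmp.parquet", filesystem=fs)        # accepted
dask_cudf.read_parquet("tmp.parquet", filesystem="fsspec")  # accepted

try:
    # Rejected: only fsspec-compatible filesystems are supported now.
    dask_cudf.read_parquet("tmp.parquet", filesystem="arrow")
except ValueError as err:
    print(err)
```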
python/dask_cudf/dask_cudf/io/parquet.py (39 additions, 63 deletions)
@@ -1,7 +1,6 @@
 # Copyright (c) 2019-2024, NVIDIA CORPORATION.
 import itertools
 import warnings
-from contextlib import ExitStack
 from functools import partial
 from io import BufferedWriter, BytesIO, IOBase
 
@@ -22,18 +21,13 @@
 import cudf
 from cudf.core.column import as_column, build_categorical_column
 from cudf.io import write_to_dataset
-from cudf.io.parquet import (
-    _apply_post_filters,
-    _default_open_file_options,
-    _normalize_filters,
-)
+from cudf.io.parquet import _apply_post_filters, _normalize_filters
 from cudf.utils.dtypes import cudf_dtype_from_pa_type
 from cudf.utils.ioutils import (
     _ROW_GROUP_SIZE_BYTES_DEFAULT,
+    _fsspec_data_transfer,
     _is_local_filesystem,
-    _open_remote_files,
 )
-from cudf.utils.utils import maybe_filter_deprecation
 
 
 class CudfEngine(ArrowDatasetEngine):
@@ -98,63 +92,45 @@ def _read_paths(
 
         dataset_kwargs = dataset_kwargs or {}
         dataset_kwargs["partitioning"] = partitioning or "hive"
-        with ExitStack() as stack:
-            # Non-local filesystem handling
-            paths_or_fobs = paths
-            if not _is_local_filesystem(fs):
-                paths_or_fobs = _open_remote_files(
-                    paths_or_fobs,
-                    fs,
-                    context_stack=stack,
-                    **_default_open_file_options(
-                        open_file_options, columns, row_groups
-                    ),
-                )
-
-            # Filter out deprecation warning unless the user
-            # specifies open_file_options and/or use_python_file_object.
-            # Otherwise, the FutureWarning is out of their control.
-            with maybe_filter_deprecation(
-                (
-                    not open_file_options
-                    and "use_python_file_object" not in kwargs
-                ),
-                message="Support for reading pyarrow's NativeFile is deprecated",
-                category=FutureWarning,
-            ):
-                # Use cudf to read in data
-                try:
-                    df = cudf.read_parquet(
-                        paths_or_fobs,
-                        engine="cudf",
-                        columns=columns,
-                        row_groups=row_groups if row_groups else None,
-                        dataset_kwargs=dataset_kwargs,
-                        categorical_partitions=False,
-                        **kwargs,
-                    )
-                except RuntimeError as err:
-                    # TODO: Remove try/except after null-schema issue is resolved
-                    # (See: https://github.com/rapidsai/cudf/issues/12702)
-                    if len(paths_or_fobs) > 1:
-                        df = cudf.concat(
-                            [
-                                cudf.read_parquet(
-                                    pof,
-                                    engine="cudf",
-                                    columns=columns,
-                                    row_groups=row_groups[i]
-                                    if row_groups
-                                    else None,
-                                    dataset_kwargs=dataset_kwargs,
-                                    categorical_partitions=False,
-                                    **kwargs,
-                                )
-                                for i, pof in enumerate(paths_or_fobs)
-                            ]
-                        )
-                    else:
-                        raise err
+
+        # Non-local filesystem handling
+        paths_or_fobs = paths
+        if not _is_local_filesystem(fs):
+            paths_or_fobs = [
+                _fsspec_data_transfer(fpath, fs=fs) for fpath in paths
+            ]
+
+        # Use cudf to read in data
+        try:
+            df = cudf.read_parquet(
+                paths_or_fobs,
+                engine="cudf",
+                columns=columns,
+                row_groups=row_groups if row_groups else None,
+                dataset_kwargs=dataset_kwargs,
+                categorical_partitions=False,
+                **kwargs,
+            )
+        except RuntimeError as err:
+            # TODO: Remove try/except after null-schema issue is resolved
+            # (See: https://github.com/rapidsai/cudf/issues/12702)
+            if len(paths_or_fobs) > 1:
+                df = cudf.concat(
+                    [
+                        cudf.read_parquet(
+                            pof,
+                            engine="cudf",
+                            columns=columns,
+                            row_groups=row_groups[i] if row_groups else None,
+                            dataset_kwargs=dataset_kwargs,
+                            categorical_partitions=False,
+                            **kwargs,
+                        )
+                        for i, pof in enumerate(paths_or_fobs)
+                    ]
+                )
+            else:
+                raise err
 
         # Apply filters (if any are defined)
         df = _apply_post_filters(df, filters)
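
The retained try/except works around the null-schema issue (rapidsai/cudf#12702). As a standalone sketch (`_read_with_fallback` is a hypothetical helper, not part of the codebase), the fallback pattern amounts to:

```python
import cudf

def _read_with_fallback(sources, **kwargs):
    # Try a single multi-source read first; on the null-schema
    # RuntimeError (rapidsai/cudf#12702), fall back to per-source
    # reads followed by a cudf.concat.
    try:
        return cudf.read_parquet(sources, **kwargs)
    except RuntimeError:
        if len(sources) > 1:
            return cudf.concat(
                [cudf.read_parquet(s, **kwargs) for s in sources]
            )
        raise
```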
python/dask_cudf/dask_cudf/io/tests/test_s3.py (41 additions, 23 deletions)
@@ -5,8 +5,8 @@
 from contextlib import contextmanager
 from io import BytesIO
 
+import fsspec
 import pandas as pd
-import pyarrow.fs as pa_fs
 import pytest
 
 from dask.dataframe import assert_eq
@@ -135,35 +135,53 @@ def test_read_csv_warns(s3_base, s3so):
         assert df.a.sum().compute() == 4
 
 
-@pytest.mark.parametrize(
-    "open_file_options",
-    [
-        {"precache_options": {"method": None}},
-        {"precache_options": {"method": "parquet"}},
-        {"open_file_func": None},
-    ],
-)
-def test_read_parquet_open_file_options(s3_base, s3so, open_file_options, pdf):
+def test_read_parquet_open_file_options_raises():
+    with pytest.raises(ValueError):
+        dask_cudf.read_parquet(
+            "s3://my/path",
+            open_file_options={"precache_options": {"method": "parquet"}},
+        )
+
+
+def test_read_parquet_filesystem(s3_base, s3so, pdf):
+    fname = "test_parquet_filesystem.parquet"
+    bucket = "parquet"
     buffer = BytesIO()
     pdf.to_parquet(path=buffer)
     buffer.seek(0)
-    with s3_context(
-        s3_base=s3_base, bucket="daskparquet", files={"file.parq": buffer}
-    ):
-        if "open_file_func" in open_file_options:
-            fs = pa_fs.S3FileSystem(
-                endpoint_override=s3so["client_kwargs"]["endpoint_url"],
-            )
-            open_file_options["open_file_func"] = fs.open_input_file
+    with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
+        path = f"s3://{bucket}/{fname}"
+
+        # Cannot pass filesystem="arrow"
+        with pytest.raises(ValueError):
+            dask_cudf.read_parquet(
+                path,
+                storage_options=s3so,
+                filesystem="arrow",
+            )
+
+        # Can pass filesystem="fsspec"
         df = dask_cudf.read_parquet(
-            "s3://daskparquet/*.parq",
+            path,
             storage_options=s3so,
-            open_file_options=open_file_options,
+            filesystem="fsspec",
         )
-        with pytest.warns(FutureWarning):
-            assert df.a.sum().compute() == 10
-        with pytest.warns(FutureWarning):
-            assert df.b.sum().compute() == 9
+        assert df.b.sum().compute() == 9
+
+
+def test_read_parquet_filesystem_explicit(s3_base, s3so, pdf):
+    fname = "test_parquet_filesystem_explicit.parquet"
+    bucket = "parquet"
+    buffer = BytesIO()
+    pdf.to_parquet(path=buffer)
+    buffer.seek(0)
+    with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
+        path = f"s3://{bucket}/{fname}"
+        fs = fsspec.core.get_fs_token_paths(
+            path, mode="rb", storage_options=s3so
+        )[0]
+        df = dask_cudf.read_parquet(path, filesystem=fs)
+        assert df.b.sum().compute() == 9
 
 
 def test_read_parquet(s3_base, s3so, pdf):
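
Outside the test suite, passing an explicit fsspec filesystem (as `test_read_parquet_filesystem_explicit` does above) might look like the sketch below; the bucket, glob pattern, and credentials are placeholders, and `s3fs` is assumed to be installed.

```python
import dask_cudf
import fsspec

# Construct the filesystem once, e.g. to pin credentials or endpoint
# settings, then pass it directly to dask_cudf.
fs = fsspec.filesystem("s3", anon=False)
df = dask_cudf.read_parquet("s3://my-bucket/data/*.parquet", filesystem=fs)
print(df.head())
```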
