[FEA] Add filesystem argument to cudf.read_parquet (#16577)
This PR adds a `filesystem` kwarg to `cudf.read_parquet` (in alignment with [the pandas API](https://pandas.pydata.org/docs/reference/api/pandas.read_parquet.html)).

When a user has already constructed an `fsspec.AbstractFileSystem` object outside of cudf, they can now pass that object to `read_parquet` to avoid redundant (and possibly inconsistent) filesystem inference. This PR also makes it possible for us to remove [explicit remote-IO handling from dask-cudf](https://github.com/rapidsai/cudf/blob/623dfceb42eb3e73b352b295898ff3e6cfe7c865/python/dask_cudf/dask_cudf/io/parquet.py#L100) (and consolidate the logic in cudf/ioutils).
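As a quick illustration (a minimal sketch, not taken from this PR; the bucket name and the anonymous-access flag are placeholders), a filesystem constructed once outside of cudf can be forwarded directly:

```python
import fsspec

import cudf

# Build the filesystem once, outside of cudf (the "s3" protocol requires s3fs).
# The anon=True flag and bucket name below are illustrative only.
fs = fsspec.filesystem("s3", anon=True)

# Hand the existing filesystem to read_parquet so cudf does not
# re-infer one from storage_options.
df = cudf.read_parquet(
    "s3://my-bucket/data.*.parquet",
    filesystem=fs,
)
```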

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #16577
rjzamora authored Aug 22, 2024
1 parent 1fd9675 commit 00ff2ee
Showing 4 changed files with 75 additions and 29 deletions.
5 changes: 4 additions & 1 deletion python/cudf/cudf/io/parquet.py
@@ -527,6 +527,7 @@ def read_parquet(
engine="cudf",
columns=None,
storage_options=None,
filesystem=None,
filters=None,
row_groups=None,
use_pandas_metadata=True,
@@ -567,7 +568,9 @@
    # Start by trying to construct a filesystem object, so we
# can apply filters on remote file-systems
fs, paths = ioutils._get_filesystem_and_paths(
path_or_data=filepath_or_buffer, storage_options=storage_options
path_or_data=filepath_or_buffer,
storage_options=storage_options,
filesystem=filesystem,
)

# Normalize and validate filters
22 changes: 22 additions & 0 deletions python/cudf/cudf/tests/test_s3.py
@@ -269,6 +269,28 @@ def test_read_parquet_ext(
assert_eq(expect, got1)


def test_read_parquet_filesystem(s3_base, s3so, pdf):
fname = "data.0.parquet"
# NOTE: Need a unique bucket name when a glob pattern
# is used, otherwise fsspec seems to cache the bucket
# contents, and later tests using the same bucket name
# will fail.
bucket = "test_read_parquet_filesystem"
buffer = BytesIO()
pdf.to_parquet(path=buffer)
buffer.seek(0)
fs = get_fs_token_paths("s3://", mode="rb", storage_options=s3so)[0]
with s3_context(
s3_base=s3_base,
bucket=bucket,
files={fname: buffer},
):
# Check that a glob pattern works
path = f"s3://{bucket}/{'data.*.parquet'}"
got = cudf.read_parquet(path, filesystem=fs)
assert_eq(pdf, got)


def test_read_parquet_multi_file(s3_base, s3so, pdf):
fname_1 = "test_parquet_reader_multi_file_1.parquet"
buffer_1 = BytesIO()
54 changes: 42 additions & 12 deletions python/cudf/cudf/utils/ioutils.py
@@ -12,7 +12,7 @@
import fsspec.implementations.local
import numpy as np
import pandas as pd
from fsspec.core import get_fs_token_paths
from fsspec.core import expand_paths_if_needed, get_fs_token_paths

from cudf.core._compat import PANDAS_LT_300
from cudf.utils.docutils import docfmt_partial
@@ -139,6 +139,9 @@
For other URLs (e.g. starting with "s3://", and "gcs://") the key-value
pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and
``urllib`` for more details.
filesystem : fsspec.AbstractFileSystem, default None
Filesystem object to use when reading the parquet data. This argument
should not be used at the same time as `storage_options`.
filters : list of tuple, list of lists of tuples, default None
If not None, specifies a filter predicate used to filter out row groups
using statistics stored for each row group as Parquet metadata. Row groups
@@ -1536,11 +1539,18 @@ def is_directory(path_or_data, storage_options=None):
return False


def _get_filesystem_and_paths(path_or_data, storage_options):
def _get_filesystem_and_paths(
path_or_data,
storage_options,
*,
filesystem=None,
):
# Returns a filesystem object and the filesystem-normalized
# paths. If `path_or_data` does not correspond to a path or
# list of paths (or if the protocol is not supported), the
# return will be `None` for the fs and `[]` for the paths.
# If a filesystem object is already available, it can be
# passed with the `filesystem` argument.

fs = None
return_paths = path_or_data
@@ -1557,16 +1567,36 @@ def _get_filesystem_and_paths(path_or_data, storage_options):
else:
path_or_data = [path_or_data]

try:
fs, _, fs_paths = get_fs_token_paths(
path_or_data, mode="rb", storage_options=storage_options
)
return_paths = fs_paths
except ValueError as e:
if str(e).startswith("Protocol not known"):
return None, []
else:
raise e
if filesystem is None:
try:
fs, _, fs_paths = get_fs_token_paths(
path_or_data, mode="rb", storage_options=storage_options
)
return_paths = fs_paths
except ValueError as e:
if str(e).startswith("Protocol not known"):
return None, []
else:
raise e
else:
if not isinstance(filesystem, fsspec.AbstractFileSystem):
raise ValueError(
f"Expected fsspec.AbstractFileSystem. Got {filesystem}"
)

if storage_options:
raise ValueError(
f"Cannot specify storage_options when an explicit "
f"filesystem object is specified. Got: {storage_options}"
)

fs = filesystem
return_paths = [
fs._strip_protocol(u)
for u in expand_paths_if_needed(
path_or_data, "rb", 1, fs, None
)
]

return fs, return_paths

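For reference, the following is a minimal sketch (not part of this commit; the in-memory filesystem and file names are stand-ins for remote storage) of how the new explicit-filesystem branch normalizes paths, expanding glob patterns with `expand_paths_if_needed` and stripping the protocol prefix from each result.

```python
import fsspec
from fsspec.core import expand_paths_if_needed

# Hypothetical in-memory files standing in for remote objects.
fs = fsspec.filesystem("memory")
for name in ["data.0.parquet", "data.1.parquet"]:
    with fs.open(f"/bucket/{name}", "wb") as f:
        f.write(b"\x00")

# Mirror the new branch: expand the glob against the user-supplied
# filesystem, then strip the protocol from each expanded path.
paths = [
    fs._strip_protocol(p)
    for p in expand_paths_if_needed(
        ["memory://bucket/data.*.parquet"], "rb", 1, fs, None
    )
]
print(paths)  # e.g. ['/bucket/data.0.parquet', '/bucket/data.1.parquet']
```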
23 changes: 7 additions & 16 deletions python/dask_cudf/dask_cudf/io/parquet.py
@@ -23,11 +23,7 @@
from cudf.io import write_to_dataset
from cudf.io.parquet import _apply_post_filters, _normalize_filters
from cudf.utils.dtypes import cudf_dtype_from_pa_type
from cudf.utils.ioutils import (
_ROW_GROUP_SIZE_BYTES_DEFAULT,
_fsspec_data_transfer,
_is_local_filesystem,
)
from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT


class CudfEngine(ArrowDatasetEngine):
@@ -93,40 +89,35 @@ def _read_paths(
dataset_kwargs = dataset_kwargs or {}
dataset_kwargs["partitioning"] = partitioning or "hive"

# Non-local filesystem handling
paths_or_fobs = paths
if not _is_local_filesystem(fs):
paths_or_fobs = [
_fsspec_data_transfer(fpath, fs=fs) for fpath in paths
]

# Use cudf to read in data
try:
df = cudf.read_parquet(
paths_or_fobs,
paths,
engine="cudf",
columns=columns,
row_groups=row_groups if row_groups else None,
dataset_kwargs=dataset_kwargs,
categorical_partitions=False,
filesystem=fs,
**kwargs,
)
except RuntimeError as err:
# TODO: Remove try/except after null-schema issue is resolved
# (See: https://github.com/rapidsai/cudf/issues/12702)
if len(paths_or_fobs) > 1:
if len(paths) > 1:
df = cudf.concat(
[
cudf.read_parquet(
pof,
path,
engine="cudf",
columns=columns,
row_groups=row_groups[i] if row_groups else None,
dataset_kwargs=dataset_kwargs,
categorical_partitions=False,
filesystem=fs,
**kwargs,
)
for i, pof in enumerate(paths_or_fobs)
for i, path in enumerate(paths)
]
)
else:
