Multi-file and Parquet-aware prefetching from remote storage #16657

Merged (12 commits) on Sep 4, 2024
40 changes: 40 additions & 0 deletions python/cudf/cudf/io/parquet.py
@@ -577,11 +577,51 @@ def read_parquet(
)
filepath_or_buffer = paths if paths else filepath_or_buffer

# Prepare remote-IO options
prefetch_options = kwargs.pop("prefetch_options", {})
if not ioutils._is_local_filesystem(fs):
# The default prefetch method depends on the
# `row_groups` argument. In most cases we will use
# method="all" by default, because it is fastest
# when we need to read most of the file(s).
# If a (simple) `row_groups` selection is made, we
# use method="parquet" to avoid transferring the
# entire file over the network.
method = prefetch_options.get("method")
_row_groups = None
if method in (None, "parquet"):
if row_groups is None:
# Don't use 'parquet' prefetcher for column
# projection alone.
method = method or "all"
elif all(r == row_groups[0] for r in row_groups):
# Row group selection means we are probably
# reading half the file or less. We should
# avoid a full file transfer by default.
method = "parquet"
_row_groups = row_groups[0]
elif (method := method or "all") == "parquet":
raise ValueError(
"The 'parquet' prefetcher requires a uniform "
"row-group selection for all paths within the "
"same `read_parquet` call. "
"Got: {row_groups}"
)
if method == "parquet":
prefetch_options.update(
{
"method": method,
"columns": columns,
"row_groups": _row_groups,
}
)

filepaths_or_buffers = ioutils.get_reader_filepath_or_buffer(
path_or_data=filepath_or_buffer,
fs=fs,
storage_options=storage_options,
bytes_per_thread=bytes_per_thread,
prefetch_options=prefetch_options,
)

# Warn user if they are not using cudf for IO
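
For context, a minimal usage sketch (not part of the diff) of the new keyword; the bucket, file names, and storage options below are placeholders:

import cudf

# Hypothetical remote paths and credentials; replace with real values.
storage_options = {"anon": False}
df = cudf.read_parquet(
    [
        "s3://my-bucket/part.0.parquet",
        "s3://my-bucket/part.1.parquet",
    ],
    storage_options=storage_options,
    columns=["String", "Integer"],
    # Experimental in 24.10: with method="parquet", only the footer and the
    # byte ranges for the selected columns are transferred; the default
    # method="all" copies whole files in blocksize-sized ranges.
    prefetch_options={"method": "parquet", "blocksize": 1024 * 1024},
)
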
47 changes: 47 additions & 0 deletions python/cudf/cudf/tests/test_s3.py
@@ -229,6 +229,53 @@ def test_read_parquet(
assert_eq(expect, got2)


@pytest.mark.parametrize("method", ["all", "parquet"])
@pytest.mark.parametrize("blocksize", [1024 * 1024, 1024])
def test_read_parquet_prefetch_options(
s3_base,
s3so,
pdf,
method,
blocksize,
):
bucket = "parquet"
fname_1 = "test_parquet_reader_prefetch_options_1.parquet"
buffer_1 = BytesIO()
pdf.to_parquet(path=buffer_1)
buffer_1.seek(0)

fname_2 = "test_parquet_reader_prefetch_options_2.parquet"
buffer_2 = BytesIO()
pdf_2 = pdf.copy()
pdf_2["Integer"] += 1
pdf_2.to_parquet(path=buffer_2)
buffer_2.seek(0)

with s3_context(
s3_base=s3_base,
bucket=bucket,
files={
fname_1: buffer_1,
fname_2: buffer_2,
},
):
got = cudf.read_parquet(
[
f"s3://{bucket}/{fname_1}",
f"s3://{bucket}/{fname_2}",
],
storage_options=s3so,
prefetch_options={
"method": method,
"blocksize": blocksize,
},
columns=["String", "Integer"],
)

expect = pd.concat([pdf, pdf_2], ignore_index=True)[["String", "Integer"]]
assert_eq(expect, got)


@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
@pytest.mark.parametrize("columns", [None, ["List", "Struct"]])
@pytest.mark.parametrize("index", [None, "Integer"])
138 changes: 122 additions & 16 deletions python/cudf/cudf/utils/ioutils.py
@@ -1,6 +1,8 @@
# Copyright (c) 2019-2024, NVIDIA CORPORATION.

import datetime
import functools
import operator
import os
import urllib
import warnings
@@ -18,6 +20,12 @@
from cudf.core._compat import PANDAS_LT_300
from cudf.utils.docutils import docfmt_partial

try:
import fsspec.parquet as fsspec_parquet

except ImportError:
fsspec_parquet = None

_BYTES_PER_THREAD_DEFAULT = 256 * 1024 * 1024
_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024

@@ -187,6 +195,10 @@
allow_mismatched_pq_schemas : boolean, default False
If True, enables reading (matching) columns specified in `columns` and `filters`
options from the input files with otherwise mismatched schemas.
prefetch_options : dict, default None
WARNING: This is an experimental feature (added in 24.10).
Dictionary of options to use to prefetch bytes from remote storage.
These options are passed through to `get_reader_filepath_or_buffer`.

Returns
-------
@@ -1439,6 +1451,13 @@
Glob pattern to use when expanding directories into file paths
(e.g. "*.json"). If this parameter is not specified, directories
will not be expanded.
prefetch_options : dict, default None
WARNING: This is an experimental feature (added in 24.10).
Dictionary of options to use to prefetch bytes from remote storage.
These options are only used when `path_or_data` is a list of remote
paths. If 'method' is set to 'all' (the default), the only supported
option is 'blocksize' (default 256 MB). If method is set to 'parquet',
'columns' and 'row_groups' are also supported (default None).

Returns
-------
@@ -1620,6 +1639,7 @@ def get_reader_filepath_or_buffer(
warn_on_raw_text_input=None,
warn_meta=None,
expand_dir_pattern=None,
prefetch_options=None,
):
"""{docstring}"""

@@ -1690,26 +1710,15 @@ def get_reader_filepath_or_buffer(
raw_text_input = True

elif fs is not None:
# TODO: We can use cat_ranges and/or parquet-aware logic
# to copy all remote data into host memory at once here.
# The current solution iterates over files, and copies
# ALL data from each file (even when we are performing
# partial IO, and don't need the entire file)
if len(paths) == 0:
raise FileNotFoundError(
f"{input_sources} could not be resolved to any files"
)
filepaths_or_buffers = [
BytesIO(
_fsspec_data_transfer(
fpath,
fs=fs,
mode=mode,
bytes_per_thread=bytes_per_thread,
)
)
for fpath in paths
]
filepaths_or_buffers = _prefetch_remote_buffers(
paths,
fs,
**(prefetch_options or {}),
)
else:
raw_text_input = True

@@ -2099,3 +2108,100 @@ def _read_byte_ranges(

for worker in workers:
worker.join()


def _get_remote_bytes_all(
remote_paths, fs, *, blocksize=_BYTES_PER_THREAD_DEFAULT
):
# TODO: Experiment with a heuristic to avoid the fs.sizes
# call when we are reading many files at once (the latency
# of collecting the file sizes is unnecessary in this case)
if max(sizes := fs.sizes(remote_paths)) <= blocksize:
# Don't bother breaking up individual files
return fs.cat_ranges(remote_paths, None, None)
else:
# Construct list of paths, starts, and ends
paths, starts, ends = map(
list,
zip(
*(
(r, j, min(j + blocksize, s))
for r, s in zip(remote_paths, sizes)
for j in range(0, s, blocksize)
)
),
)

# Collect the byte ranges
chunks = fs.cat_ranges(paths, starts, ends)

# Construct local byte buffers
# (Need to make sure path offsets are ordered correctly)
unique_count = dict(zip(*np.unique(paths, return_counts=True)))
offset = np.cumsum([0] + [unique_count[p] for p in remote_paths])
buffers = [
functools.reduce(operator.add, chunks[offset[i] : offset[i + 1]])
Contributor:
nit (non-blocking): I thought reduce(add, foo) is just sum(foo), what am I missing?

Member Author:
Yeah, this had me a bit confused as well. It turns out that operator.add will effectively join byte strings, but sum will require the intermediate values to be numeric values:

import operator
assert operator.add(b"asdf", b"jkl;") == b'asdfjkl;'  # Assertion passes

assert sum([b"asdf", b"jkl;"]) == b'asdfjkl;'  # Raises
TypeError: unsupported operand type(s) for +: 'int' and 'bytes'

for i in range(len(remote_paths))
]
return buffers
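
To make the offset bookkeeping above concrete, here is a small self-contained sketch (not from the PR) using made-up paths and chunks; it also shows why functools.reduce(operator.add, ...) is used rather than sum (bytes concatenation):

import functools
import operator

import numpy as np

# Two fake files: the first was split into three ranges, the second into two.
remote_paths = ["s3://bkt/a.parquet", "s3://bkt/b.parquet"]
paths = [remote_paths[0]] * 3 + [remote_paths[1]] * 2
chunks = [b"aa", b"bb", b"c", b"dd", b"e"]

# Count chunks per unique path, then turn the counts into offsets into `chunks`.
unique_count = dict(zip(*np.unique(paths, return_counts=True)))
offset = np.cumsum([0] + [unique_count[p] for p in remote_paths])
buffers = [
    functools.reduce(operator.add, chunks[offset[i] : offset[i + 1]])
    for i in range(len(remote_paths))
]
assert buffers == [b"aabbc", b"dde"]  # bytes are concatenated per file
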


def _get_remote_bytes_parquet(
remote_paths,
fs,
*,
columns=None,
row_groups=None,
blocksize=_BYTES_PER_THREAD_DEFAULT,
):
if fsspec_parquet is None or (columns is None and row_groups is None):
return _get_remote_bytes_all(remote_paths, fs, blocksize=blocksize)

sizes = fs.sizes(remote_paths)
data = fsspec_parquet._get_parquet_byte_ranges(
remote_paths,
fs,
columns=columns,
row_groups=row_groups,
max_block=blocksize,
)

buffers = []
for size, path in zip(sizes, remote_paths):
path_data = data[path]
buf = np.empty(size, dtype="b")
for range_offset in path_data.keys():
chunk = path_data[range_offset]
buf[range_offset[0] : range_offset[1]] = np.frombuffer(
chunk, dtype="b"
)
buffers.append(buf.tobytes())
return buffers
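
A toy illustration (not part of the diff) of the buffer-filling loop above: only the byte ranges returned by fsspec (the footer plus the selected column chunks) are copied into the local buffer, and the gaps stay uninitialized because the parquet reader never touches them.

import numpy as np

size = 16  # pretend file size
# Fake range -> bytes mapping, standing in for data[path] above.
path_data = {(0, 4): b"PAR1", (12, 16): b"PAR1"}

buf = np.empty(size, dtype="b")
for (start, stop), chunk in path_data.items():
    buf[start:stop] = np.frombuffer(chunk, dtype="b")
local_buffer = buf.tobytes()

assert local_buffer[:4] == b"PAR1" and local_buffer[-4:] == b"PAR1"
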


def _prefetch_remote_buffers(
paths,
fs,
*,
method="all",
**prefetch_options,
):
# Gather bytes ahead of time for remote filesystems
if fs and paths and not _is_local_filesystem(fs):
try:
prefetcher = {
"parquet": _get_remote_bytes_parquet,
"all": _get_remote_bytes_all,
}[method]
except KeyError:
raise NotImplementedError(
Contributor:
nit: Since this is an internal function I wouldn't bother with exception handling. The only callers should be internal, so if we provide an invalid method we can be responsible for tracking down the problem when the KeyError is observed. Alternatively, convert the method to an enum.

Member Author:
The user can technically pass in prefetch_options={"method": "foo"}, and it's probably best to return a clear error message. (Though, ValueError seems better than NotImplementedError in this case)

f"{method} is not a supported remote-data prefetcher"
)
return prefetcher(
paths,
fs,
**prefetch_options,
)

else:
return paths
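
A hedged example (not in the PR) of exercising the dispatcher directly. It assumes fsspec's in-memory filesystem, which counts as remote for _is_local_filesystem, and relies only on standard fsspec methods (open, sizes, cat_ranges):

import fsspec

from cudf.utils.ioutils import _prefetch_remote_buffers

fs = fsspec.filesystem("memory")
with fs.open("/data.bin", "wb") as f:
    f.write(b"0123456789")

# A blocksize smaller than the file forces the multi-range path above.
bufs = _prefetch_remote_buffers(["/data.bin"], fs, method="all", blocksize=4)
assert bufs == [b"0123456789"]

# An unrecognized method (e.g. prefetch_options={"method": "foo"}) raises the
# NotImplementedError discussed in the review thread above.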