Multi-file and Parquet-aware prefetching from remote storage #16657

Merged 12 commits on Sep 4, 2024
15 changes: 15 additions & 0 deletions python/cudf/cudf/io/parquet.py
@@ -577,11 +577,26 @@ def read_parquet(
        )
    filepath_or_buffer = paths if paths else filepath_or_buffer

    # Prepare remote-IO options
    prefetch_options = kwargs.pop("prefetch_options", {})
    if not ioutils._is_local_filesystem(fs):
        method = prefetch_options.get("method", "parquet")
        if method == "parquet":
            prefetch_options.update(
                {
                    "method": method,
                    "columns": columns,
                    # All paths must have the same row-group selection
                    "row_groups": row_groups[0] if row_groups else None,
                }
            )

    filepaths_or_buffers = ioutils.get_reader_filepath_or_buffer(
        path_or_data=filepath_or_buffer,
        fs=fs,
        storage_options=storage_options,
        bytes_per_thread=bytes_per_thread,
        prefetch_options=prefetch_options,
    )

    # Warn user if they are not using cudf for IO
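For orientation, a minimal usage sketch of the new keyword added by this PR (the path and credentials below are hypothetical; method may be "parquet" or "all"):

import cudf

# "parquet" prefetches only the byte ranges needed for the requested
# columns/row-groups; "all" prefetches whole files.
df = cudf.read_parquet(
    "s3://my-bucket/data.parquet",
    columns=["a", "b"],
    storage_options={"anon": True},
    prefetch_options={"method": "parquet"},
)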
27 changes: 27 additions & 0 deletions python/cudf/cudf/tests/test_s3.py
@@ -229,6 +229,33 @@ def test_read_parquet(
    assert_eq(expect, got2)


@pytest.mark.parametrize("method", [None, "all", "parquet"])
def test_read_parquet_prefetch_options(
s3_base,
s3so,
pdf,
method,
):
fname = "test_parquet_reader_prefetch_options.parquet"
bucket = "parquet"
buffer = BytesIO()
pdf.to_parquet(path=buffer)
buffer.seek(0)
with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
if method is None:
prefetch_options = {}
else:
prefetch_options = {"method": method}
got = cudf.read_parquet(
f"s3://{bucket}/{fname}",
storage_options=s3so,
prefetch_options=prefetch_options,
columns=["String"],
)
expect = pdf[["String"]]
assert_eq(expect, got)


@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
@pytest.mark.parametrize("columns", [None, ["List", "Struct"]])
@pytest.mark.parametrize("index", [None, "Integer"])
121 changes: 105 additions & 16 deletions python/cudf/cudf/utils/ioutils.py
@@ -18,6 +18,12 @@
from cudf.core._compat import PANDAS_LT_300
from cudf.utils.docutils import docfmt_partial

try:
    import fsspec.parquet as fsspec_parquet

except ImportError:
    fsspec_parquet = None

_BYTES_PER_THREAD_DEFAULT = 256 * 1024 * 1024
_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024

@@ -1620,6 +1626,7 @@ def get_reader_filepath_or_buffer(
    warn_on_raw_text_input=None,
    warn_meta=None,
    expand_dir_pattern=None,
    prefetch_options=None,
):
    """{docstring}"""

@@ -1690,26 +1697,15 @@
                raw_text_input = True

        elif fs is not None:
            # TODO: We can use cat_ranges and/or parquet-aware logic
            # to copy all remote data into host memory at once here.
            # The current solution iterates over files, and copies
            # ALL data from each file (even when we are performing
            # partial IO, and don't need the entire file)
            if len(paths) == 0:
                raise FileNotFoundError(
                    f"{input_sources} could not be resolved to any files"
                )
            filepaths_or_buffers = [
                BytesIO(
                    _fsspec_data_transfer(
                        fpath,
                        fs=fs,
                        mode=mode,
                        bytes_per_thread=bytes_per_thread,
                    )
                )
                for fpath in paths
            ]
            filepaths_or_buffers = _prefetch_remote_buffers(
                paths,
                fs,
                **(prefetch_options or {}),
            )
        else:
            raw_text_input = True

@@ -2099,3 +2095,96 @@ def _read_byte_ranges(

    for worker in workers:
        worker.join()


def _get_remote_bytes_all(
    remote_paths, fs, *, blocksize=_BYTES_PER_THREAD_DEFAULT
):
    if (
        len(remote_paths) >= 8  # Heuristic to avoid fs.sizes
        or max(sizes := fs.sizes(remote_paths)) <= blocksize
    ):
        # Don't bother breaking up individual files
        return fs.cat_ranges(remote_paths, None, None)
    else:
        # Construct list of paths, starts, and ends
        paths, starts, ends = [], [], []
        for i, remote_path in enumerate(remote_paths):
            for j in range(0, sizes[i], blocksize):
                paths.append(remote_path)
                starts.append(j)
                ends.append(min(j + blocksize, sizes[i]))

        # Collect the byte ranges
        chunks = fs.cat_ranges(paths, starts, ends)

        # Construct local byte buffers
        buffers = []
        path_counts = np.unique(paths, return_counts=True)[1]
        for i, remote_path in enumerate(remote_paths):
            bytes_arr = bytearray()
            for j in range(path_counts[i]):
                bytes_arr.extend(chunks.pop(0))
            buffers.append(bytes(bytes_arr))
        return buffers
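As a rough illustration of the chunking heuristic above (made-up sizes, not part of the diff): with the default 256 MiB blocksize, a 600 MiB object is split into three byte ranges, which fs.cat_ranges fetches and the loop above stitches back together per path:

blocksize = 256 * 1024 * 1024
size = 600 * 1024 * 1024  # hypothetical remote object size
ranges = [(j, min(j + blocksize, size)) for j in range(0, size, blocksize)]
# three (start, end) pairs: 0-256 MiB, 256-512 MiB, 512-600 MiB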


def _get_remote_bytes_parquet(
    remote_paths,
    fs,
    *,
    columns=None,
    row_groups=None,
    blocksize=_BYTES_PER_THREAD_DEFAULT,
):
    if fsspec_parquet is None or (columns is None and row_groups is None):
        return _get_remote_bytes_all(remote_paths, fs, blocksize=blocksize)

    sizes = fs.sizes(remote_paths)
    data = fsspec_parquet._get_parquet_byte_ranges(
        remote_paths,
        fs,
        columns=columns,
        row_groups=row_groups,
        max_block=blocksize,
    )

    buffers = []
    for size, path in zip(sizes, remote_paths):
        path_data = data.pop(path)
        buf = np.zeros(size, dtype="b")
        for range_offset in list(path_data.keys()):
            chunk = path_data.pop(range_offset)
            buf[range_offset[0] : range_offset[1]] = np.frombuffer(
                chunk, dtype="b"
            )
        buffers.append(buf.tobytes())
    return buffers
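For context, a hedged sketch of the public fsspec counterpart of the private helper used above: fsspec.parquet.open_parquet_file performs the same footer-driven, column/row-group-aware byte-range transfer before handing back a file-like object. The path and credentials are hypothetical:

import fsspec.parquet

with fsspec.parquet.open_parquet_file(
    "s3://bucket/file.parquet",      # hypothetical object
    columns=["a", "b"],
    row_groups=[0],
    storage_options={"anon": True},  # hypothetical credentials
) as f:
    magic = f.read(4)  # Parquet files begin with b"PAR1"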


def _prefetch_remote_buffers(
    paths,
    fs,
    *,
    method="all",
    **prefetch_options,
):
    # Gather bytes ahead of time for remote filesystems
    if fs and paths and not _is_local_filesystem(fs):
        try:
            prefetcher = {
                "parquet": _get_remote_bytes_parquet,
                "all": _get_remote_bytes_all,
            }[method]
        except KeyError:
            raise NotImplementedError(
                f"{method} is not a supported remote-data prefetcher"
            )
        return prefetcher(
            paths,
            fs,
            **prefetch_options,
        )

    else:
        return paths

Review discussion on the error handling above:

Contributor: nit: Since this is an internal function I wouldn't bother with exception handling. The only callers should be internal, so if we provide an invalid method we can be responsible for tracking down the problem when the KeyError is observed. Alternatively, convert the method to an enum.

Member (author): The user can technically pass in prefetch_options={"method": "foo"}, and it's probably best to return a clear error message. (Though, ValueError seems better than NotImplementedError in this case.)
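A minimal sketch of the ValueError variant suggested in the author's reply (a hypothetical rewrite, not part of the merged diff; it reuses the helpers defined above):

def _prefetch_remote_buffers_alt(paths, fs, *, method="all", **prefetch_options):
    # Same dispatch as the merged function, but validating the user-supplied
    # method up front and raising ValueError for unknown values.
    prefetchers = {
        "parquet": _get_remote_bytes_parquet,
        "all": _get_remote_bytes_all,
    }
    if method not in prefetchers:
        raise ValueError(f"{method} is not a supported remote-data prefetcher")
    if fs and paths and not _is_local_filesystem(fs):
        return prefetchers[method](paths, fs, **prefetch_options)
    return paths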