[WIP] Improve pyarrow-free remote-IO performance #16166

Closed · wants to merge 40 commits

Commits (40)
b5c38bb
start simplifying remote-io optimizations
rjzamora Jun 13, 2024
7361e92
add basic json support
rjzamora Jun 14, 2024
7472446
only use _fsspec_data_transfer for file-like objects
rjzamora Jun 14, 2024
8b51952
add read_text support for byte_range opt
rjzamora Jun 14, 2024
778ceeb
start experimental dask changes
rjzamora Jun 14, 2024
ec5d140
Merge remote-tracking branch 'upstream/branch-24.08' into simplify-re…
rjzamora Jul 1, 2024
b55fce9
simplify
rjzamora Jul 1, 2024
719d422
Merge remote-tracking branch 'upstream/branch-24.08' into simplify-re…
rjzamora Jul 2, 2024
0874388
improve backward compatibility during deprecation cycle (in dask-cudf)
rjzamora Jul 2, 2024
f12b20f
remove deprecation warnings
rjzamora Jul 2, 2024
e265d40
refactor
rjzamora Jul 2, 2024
85a1cf6
update metadata reader
rjzamora Jul 2, 2024
c3f48cc
resolve json behavior
rjzamora Jul 2, 2024
373b37a
json fix
rjzamora Jul 2, 2024
2b00674
remove open_file_options from test_read_parquet_filters
rjzamora Jul 2, 2024
9d3f3f6
Merge remote-tracking branch 'upstream/branch-24.08' into simplify-re…
rjzamora Jul 2, 2024
6658f34
update
rjzamora Jul 2, 2024
8c2a2e2
Merge branch 'branch-24.08' into simplify-remote-io
rjzamora Jul 3, 2024
9791e2d
Merge branch 'branch-24.08' into simplify-remote-io
rjzamora Jul 8, 2024
edf376a
Merge branch 'branch-24.08' into simplify-remote-io
rjzamora Jul 10, 2024
cda3f4b
Merge branch 'branch-24.08' into simplify-remote-io
rjzamora Jul 12, 2024
d6fae35
Merge branch 'branch-24.08' into simplify-remote-io
rjzamora Jul 14, 2024
0db7865
Merge remote-tracking branch 'upstream/branch-24.08' into simplify-re…
rjzamora Jul 15, 2024
9465846
Merge remote-tracking branch 'upstream/branch-24.08' into simplify-re…
rjzamora Jul 16, 2024
26d08fe
Merge remote-tracking branch 'upstream/branch-24.08' into simplify-re…
rjzamora Jul 22, 2024
81f2a7d
fix tests
rjzamora Jul 22, 2024
00c47fa
Merge remote-tracking branch 'upstream/branch-24.10' into simplify-re…
rjzamora Jul 30, 2024
5867f05
register read_parquet function to CudfDXBackendEntrypoint
rjzamora Jul 30, 2024
1f73dbe
Merge remote-tracking branch 'upstream/branch-24.10' into dx-parquet-…
rjzamora Aug 12, 2024
17be6b0
add read_csv def
rjzamora Aug 12, 2024
04aa983
formatting
rjzamora Aug 12, 2024
28fd1f8
Merge branch 'branch-24.10' into dx-parquet-dispatch
rjzamora Aug 12, 2024
5d8c80d
simplify imports
rjzamora Aug 12, 2024
23c0cb3
Merge remote-tracking branch 'upstream/branch-24.10' into dx-parquet-…
rjzamora Aug 12, 2024
8bf252e
Merge branch 'dx-parquet-dispatch' of github.com:rjzamora/cudf into d…
rjzamora Aug 12, 2024
b857c0c
Merge branch 'branch-24.10' into dx-parquet-dispatch
rjzamora Aug 12, 2024
d10e9d7
Merge branch 'branch-24.10' into dx-parquet-dispatch
rjzamora Aug 13, 2024
ffcc137
Apply suggestions from code review
rjzamora Aug 13, 2024
491c140
Merge remote-tracking branch 'upstream/branch-24.10' into simplify-re…
rjzamora Aug 13, 2024
9313d54
Merge branch 'dx-parquet-dispatch' into simplify-remote-io
rjzamora Aug 13, 2024
23 changes: 23 additions & 0 deletions python/cudf/cudf/io/csv.py
@@ -53,6 +53,7 @@ def read_csv(
use_python_file_object=None,
storage_options=None,
bytes_per_thread=None,
prefetch_read_ahead=None,
rjzamora (Member Author) commented:
I'm not sure how I feel about this prefetch_read_ahead option. It basically determines how many bytes we read beyond byte_range to make sure we capture a trailing delimiter, for example. (See the sketch after this diff.)

):
"""{docstring}"""

@@ -81,9 +82,31 @@ def read_csv(
"`read_csv` does not yet support reading multiple files"
)

# Extract filesystem up front
fs, paths = ioutils._get_filesystem_and_paths(
path_or_data=filepath_or_buffer, storage_options=storage_options
)

# Prefetch remote data if possible
if fs and paths and not use_python_file_object:
filepath_or_buffer, info = ioutils.prefetch_remote_buffers(
paths,
fs,
bytes_per_thread=bytes_per_thread,
prefetcher="contiguous",
prefetcher_options={
"byte_range": byte_range,
"read_ahead": prefetch_read_ahead,
},
)
assert len(filepath_or_buffer) == 1
filepath_or_buffer = filepath_or_buffer[0]
byte_range = info.get("byte_range", byte_range)

filepath_or_buffer, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=filepath_or_buffer,
compression=compression,
fs=fs,
iotypes=(BytesIO, StringIO, NativeFile),
use_python_file_object=use_python_file_object,
storage_options=storage_options,
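The prefetch_read_ahead comment above concerns how far past the requested byte_range the prefetcher should read so that a record cut off at the range boundary can still be parsed. A minimal sketch of that idea, using a hypothetical expand_byte_range helper (not the cudf implementation):

```python
# Hypothetical helper illustrating read-ahead: pad an (offset, size) byte
# range so that a delimiter falling just past the boundary is still captured.
def expand_byte_range(byte_range, file_size, read_ahead=1024 * 1024):
    offset, size = byte_range
    if size == 0:
        # By convention, size == 0 means "read to the end of the file".
        size = file_size - offset
    end = min(offset + size + read_ahead, file_size)
    return (offset, end - offset)


# Example: the first 5 MiB of a 10 MiB file, padded by 1 MiB of read-ahead
# so a row straddling the 5 MiB boundary is not truncated.
print(expand_byte_range((0, 5 * 2**20), file_size=10 * 2**20))  # (0, 6291456)
```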
27 changes: 21 additions & 6 deletions python/cudf/cudf/io/json.py
@@ -28,6 +28,7 @@ def read_json(
mixed_types_as_string=False,
prune_columns=False,
on_bad_lines="error",
prefetch_read_ahead=None,
*args,
**kwargs,
):
@@ -67,16 +68,30 @@ def read_json(
if not is_list_like(path_or_buf):
path_or_buf = [path_or_buf]

# Extract filesystem up front
fs, paths = ioutils._get_filesystem_and_paths(
path_or_data=path_or_buf, storage_options=storage_options
)

# Prefetch remote data if possible
if fs and paths:
path_or_buf, info = ioutils.prefetch_remote_buffers(
paths,
fs,
expand_paths="*.json",
prefetcher="contiguous",
prefetcher_options={
"byte_range": byte_range,
"read_ahead": prefetch_read_ahead,
},
)
byte_range = info.get("byte_range", byte_range)

filepaths_or_buffers = []
for source in path_or_buf:
if ioutils.is_directory(
path_or_data=source, storage_options=storage_options
path_or_data=source, storage_options=storage_options, fs=fs
):
fs = ioutils._ensure_filesystem(
passed_filesystem=None,
path=source,
storage_options=storage_options,
)
source = ioutils.stringify_pathlike(source)
source = fs.sep.join([source, "*.json"])

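For context, a hedged usage sketch of the byte_range path this prefetch logic serves: a caller such as dask-cudf can split one remote JSON-lines file into byte ranges and read each range independently. The S3 path and sizes below are placeholders, not from the PR.

```python
import cudf

# Hypothetical remote file split into 2 MiB ranges (sizes are illustrative).
path = "s3://my-bucket/data.jsonl"
range_size = 2 * 2**20
file_size = 10 * 2**20

parts = [
    cudf.read_json(
        path,
        lines=True,
        byte_range=(offset, range_size),
        storage_options={"anon": True},
    )
    for offset in range(0, file_size, range_size)
]
df = cudf.concat(parts, ignore_index=True)
```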
24 changes: 18 additions & 6 deletions python/cudf/cudf/io/orc.py
@@ -320,25 +320,37 @@ def read_orc(
"A list of stripes must be provided for each input source"
)

# Extract filesystem up front
fs, paths = ioutils._get_filesystem_and_paths(
path_or_data=filepath_or_buffer, storage_options=storage_options
)

# Prefetch remote data if possible
if fs and paths and not use_python_file_object:
# TODO: Add prefetcher for partial IO
filepath_or_buffer, _ = ioutils.prefetch_remote_buffers(
paths,
fs,
bytes_per_thread=bytes_per_thread,
expand_paths="*.orc",
prefetcher="contiguous",
)

filepaths_or_buffers = []
have_nativefile = any(
isinstance(source, pa.NativeFile) for source in filepath_or_buffer
)
for source in filepath_or_buffer:
if ioutils.is_directory(
path_or_data=source, storage_options=storage_options
path_or_data=source, storage_options=storage_options, fs=fs
):
fs = ioutils._ensure_filesystem(
passed_filesystem=None,
path=source,
storage_options=storage_options,
)
source = stringify_path(source)
source = fs.sep.join([source, "*.orc"])

tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=source,
compression=None,
fs=fs,
use_python_file_object=use_python_file_object,
storage_options=storage_options,
bytes_per_thread=bytes_per_thread,
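The "contiguous" prefetcher used here appears to read each remote file in full into host memory before handing it to the reader (the TODO notes that a partial-IO prefetcher for ORC is still missing). A rough sketch of what such a whole-file prefetch could look like with plain fsspec, splitting each read into bytes_per_thread-sized ranges; prefetch_contiguous is a hypothetical name, not the cudf implementation:

```python
import numpy as np


def prefetch_contiguous(paths, fs, bytes_per_thread=256 * 2**20):
    """Read each remote file fully into a host buffer, range by range."""
    buffers = []
    for path in paths:
        size = fs.size(path)
        starts = list(range(0, size, bytes_per_thread))
        ends = [min(s + bytes_per_thread, size) for s in starts]
        # fs.cat_ranges issues the per-range reads (concurrently on async
        # filesystems such as s3fs) and returns a list of bytes objects.
        chunks = fs.cat_ranges([path] * len(starts), starts, ends)
        buffers.append(np.frombuffer(b"".join(chunks), dtype="u1"))
    return buffers
```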
31 changes: 28 additions & 3 deletions python/cudf/cudf/io/parquet.py
@@ -341,8 +341,18 @@ def read_parquet_metadata(filepath_or_buffer):
path_or_data=filepath_or_buffer, storage_options=None
)

# Check if filepath or buffer
filepath_or_buffer = paths if paths else filepath_or_buffer
if fs and paths:
filepath_or_buffer, _ = ioutils.prefetch_remote_buffers(
paths,
fs,
prefetcher="parquet",
prefetcher_options={
"columns": [],
"row_groups": [],
},
)
else:
filepath_or_buffer = paths if paths else filepath_or_buffer

# List of filepaths or buffers
filepaths_or_buffers = []
@@ -609,7 +619,22 @@ def read_parquet(
categorical_partitions=categorical_partitions,
dataset_kwargs=dataset_kwargs,
)
filepath_or_buffer = paths if paths else filepath_or_buffer

# Prefetch remote data if possible
if fs and paths and not use_python_file_object:
filepath_or_buffer, _ = ioutils.prefetch_remote_buffers(
paths,
fs,
bytes_per_thread=bytes_per_thread,
prefetcher="parquet",
prefetcher_options={
"columns": columns,
# All paths must have the same row-group selection
"row_groups": row_groups[0] if row_groups else None,
},
)
else:
filepath_or_buffer = paths if paths else filepath_or_buffer

filepaths_or_buffers = []
if use_python_file_object:
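The "parquet" prefetcher is given columns and row_groups options, suggesting it transfers only the byte ranges needed for the projected columns and selected row groups. As an illustration only (not necessarily what ioutils.prefetch_remote_buffers does internally), fsspec's parquet module exposes a comparable capability; the S3 path and column names are placeholders:

```python
import cudf
import fsspec
from fsspec.parquet import open_parquet_file

# open_parquet_file fetches the footer plus only the byte ranges required
# for the requested columns into a local cache, so the subsequent read does
# not re-download the whole file.
path = "s3://my-bucket/data.parquet"
fs = fsspec.filesystem("s3", anon=True)

with open_parquet_file(path, fs=fs, columns=["x", "y"], engine="auto") as f:
    df = cudf.read_parquet(f, columns=["x", "y"])
```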
22 changes: 22 additions & 0 deletions python/cudf/cudf/io/text.py
@@ -18,15 +18,37 @@ def read_text(
compression=None,
compression_offsets=None,
storage_options=None,
prefetch_read_ahead=None,
):
"""{docstring}"""

if delimiter is None:
raise ValueError("delimiter needs to be provided")

# Extract filesystem up front
fs, paths = ioutils._get_filesystem_and_paths(
path_or_data=filepath_or_buffer, storage_options=storage_options
)

# Prefetch remote data if possible
if fs and paths:
filepath_or_buffer, info = ioutils.prefetch_remote_buffers(
paths,
fs,
prefetcher="contiguous",
prefetcher_options={
"byte_range": byte_range,
"read_ahead": prefetch_read_ahead,
},
)
assert len(filepath_or_buffer) == 1
filepath_or_buffer = filepath_or_buffer[0]
byte_range = info.get("byte_range", byte_range)

filepath_or_buffer, _ = ioutils.get_reader_filepath_or_buffer(
path_or_data=filepath_or_buffer,
compression=None,
fs=fs,
iotypes=(BytesIO, StringIO),
storage_options=storage_options,
)
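A hedged usage sketch of the path this prefetch serves: reading one byte range of a remote newline-delimited file with read_text. The S3 URL and range size are placeholders.

```python
import cudf

# Read roughly the first 1 MB of a remote text file. The prefetcher's
# read-ahead lets the record spanning the range boundary be completed
# rather than truncated.
lines = cudf.read_text(
    "s3://my-bucket/logs.txt",
    delimiter="\n",
    byte_range=(0, 1_000_000),
    strip_delimiters=True,
    storage_options={"anon": True},
)
```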
9 changes: 9 additions & 0 deletions python/cudf/cudf/tests/test_s3.py
@@ -387,6 +387,15 @@ def test_read_parquet_filters(s3_base, s3so, pdf_ext, precache):
open_file_options={"precache_options": {"method": precache}},
)

# Check that default case doesn't warn and is correct
if precache is None:
default = cudf.read_parquet(
f"s3://{bucket}/{fname}",
storage_options=s3so,
filters=filters,
)
assert_eq(pdf_ext.iloc[:0], default.reset_index(drop=True))

# All row-groups should be filtered out
assert_eq(pdf_ext.iloc[:0], got.reset_index(drop=True))
