[WIP] Improve pyarrow-free remote-IO performance #16166

Closed — wants to merge 40 commits

Changes from 1 commit

Commits (40)
b5c38bb
start simplifying remote-io optimizations
rjzamora Jun 13, 2024
7361e92
add basic json support
rjzamora Jun 14, 2024
7472446
only use _fsspec_data_transfer for file-like objects
rjzamora Jun 14, 2024
8b51952
add read_text support for byte_range opt
rjzamora Jun 14, 2024
778ceeb
start experimental dask changes
rjzamora Jun 14, 2024
ec5d140
Merge remote-tracking branch 'upstream/branch-24.08' into simplify-re…
rjzamora Jul 1, 2024
b55fce9
simplify
rjzamora Jul 1, 2024
719d422
Merge remote-tracking branch 'upstream/branch-24.08' into simplify-re…
rjzamora Jul 2, 2024
0874388
improve backward compatibility during deprecation cycle (in dask-cudf)
rjzamora Jul 2, 2024
f12b20f
remove deprecation warnings
rjzamora Jul 2, 2024
e265d40
refactor
rjzamora Jul 2, 2024
85a1cf6
update metadata reader
rjzamora Jul 2, 2024
c3f48cc
resolve json behavior
rjzamora Jul 2, 2024
373b37a
json fix
rjzamora Jul 2, 2024
2b00674
remove open_file_options from test_read_parquet_filters
rjzamora Jul 2, 2024
9d3f3f6
Merge remote-tracking branch 'upstream/branch-24.08' into simplify-re…
rjzamora Jul 2, 2024
6658f34
update
rjzamora Jul 2, 2024
8c2a2e2
Merge branch 'branch-24.08' into simplify-remote-io
rjzamora Jul 3, 2024
9791e2d
Merge branch 'branch-24.08' into simplify-remote-io
rjzamora Jul 8, 2024
edf376a
Merge branch 'branch-24.08' into simplify-remote-io
rjzamora Jul 10, 2024
cda3f4b
Merge branch 'branch-24.08' into simplify-remote-io
rjzamora Jul 12, 2024
d6fae35
Merge branch 'branch-24.08' into simplify-remote-io
rjzamora Jul 14, 2024
0db7865
Merge remote-tracking branch 'upstream/branch-24.08' into simplify-re…
rjzamora Jul 15, 2024
9465846
Merge remote-tracking branch 'upstream/branch-24.08' into simplify-re…
rjzamora Jul 16, 2024
26d08fe
Merge remote-tracking branch 'upstream/branch-24.08' into simplify-re…
rjzamora Jul 22, 2024
81f2a7d
fix tests
rjzamora Jul 22, 2024
00c47fa
Merge remote-tracking branch 'upstream/branch-24.10' into simplify-re…
rjzamora Jul 30, 2024
5867f05
register read_parquet function to CudfDXBackendEntrypoint
rjzamora Jul 30, 2024
1f73dbe
Merge remote-tracking branch 'upstream/branch-24.10' into dx-parquet-…
rjzamora Aug 12, 2024
17be6b0
add read_csv def
rjzamora Aug 12, 2024
04aa983
formatting
rjzamora Aug 12, 2024
28fd1f8
Merge branch 'branch-24.10' into dx-parquet-dispatch
rjzamora Aug 12, 2024
5d8c80d
simplify imports
rjzamora Aug 12, 2024
23c0cb3
Merge remote-tracking branch 'upstream/branch-24.10' into dx-parquet-…
rjzamora Aug 12, 2024
8bf252e
Merge branch 'dx-parquet-dispatch' of github.com:rjzamora/cudf into d…
rjzamora Aug 12, 2024
b857c0c
Merge branch 'branch-24.10' into dx-parquet-dispatch
rjzamora Aug 12, 2024
d10e9d7
Merge branch 'branch-24.10' into dx-parquet-dispatch
rjzamora Aug 13, 2024
ffcc137
Apply suggestions from code review
rjzamora Aug 13, 2024
491c140
Merge remote-tracking branch 'upstream/branch-24.10' into simplify-re…
rjzamora Aug 13, 2024
9313d54
Merge branch 'dx-parquet-dispatch' into simplify-remote-io
rjzamora Aug 13, 2024
refactor
rjzamora committed Jul 2, 2024
commit e265d40188d20db4db514677fc975c9cee944075
19 changes: 11 additions & 8 deletions python/cudf/cudf/io/csv.py
@@ -53,6 +53,7 @@ def read_csv(
use_python_file_object=False,
storage_options=None,
bytes_per_thread=None,
prefetch_read_ahead=None,
Comment by rjzamora (Member Author):

I'm not sure how I feel about this prefetch_read_ahead option. This basically determines how many bytes we will read beyond byte_range to make sure we capture a delimiter (for example).
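
To make the intent concrete, here is a rough sketch (not code from this PR; `fetch_byte_range` and its defaults are illustrative only) of why a read-ahead is needed when only a byte range of a delimited file is fetched:

```python
# Illustrative sketch: over-read past the requested byte range so that
# a record straddling the end of the range is still terminated by a
# delimiter. Works with any fsspec-compatible filesystem object.
def fetch_byte_range(fs, path, start, length, read_ahead=1024 * 1024):
    size = fs.size(path)
    end = min(start + length + read_ahead, size)
    with fs.open(path, "rb") as f:
        f.seek(start)
        data = f.read(end - start)
    # The CSV/JSON parser later ignores everything after the first
    # delimiter beyond `start + length`, so over-reading does not
    # duplicate rows; it only guarantees the last row is complete.
    return data
```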

):
"""{docstring}"""

@@ -81,20 +82,22 @@ def read_csv(
"`read_csv` does not yet support reading multiple files"
)

# Check if this is a remote file
# Extract filesystem up front
fs, paths = ioutils._get_filesystem_and_paths(
path_or_data=filepath_or_buffer, storage_options=storage_options
)
if (
fs
and paths
and not (ioutils._is_local_filesystem(fs) or use_python_file_object)
):
filepath_or_buffer, byte_range = ioutils._get_remote_bytes_lines(

# Prefetch remote data if possible
if not use_python_file_object:
filepath_or_buffer, byte_range = ioutils.prefetch_remote_buffers(
paths,
fs,
byte_range=byte_range,
bytes_per_thread=bytes_per_thread,
prefetcher="contiguous",
prefetcher_options={
"byte_range": byte_range,
"read_ahead": prefetch_read_ahead,
},
)
assert len(filepath_or_buffer) == 1
filepath_or_buffer = filepath_or_buffer[0]
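As a usage illustration (hypothetical path; `prefetch_read_ahead` is the keyword added by this diff), a remote byte-range read might be issued like this:

```python
import cudf

# Hypothetical example: read 128 MiB starting at a 128 MiB offset from a
# remote CSV. prefetch_read_ahead bounds how far past the byte range the
# prefetcher reads in order to find the closing delimiter.
df = cudf.read_csv(
    "s3://bucket/data.csv",                      # hypothetical path
    byte_range=(128 * 1024 * 1024, 128 * 1024 * 1024),
    prefetch_read_ahead=1024 * 1024,             # up to 1 MiB of over-read
    storage_options={"anon": True},
)
```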
34 changes: 14 additions & 20 deletions python/cudf/cudf/io/json.py
@@ -28,6 +28,7 @@ def read_json(
mixed_types_as_string=False,
prune_columns=False,
on_bad_lines="error",
prefetch_read_ahead=None,
*args,
**kwargs,
):
@@ -67,35 +68,28 @@ def read_json(
if not is_list_like(path_or_buf):
path_or_buf = [path_or_buf]

# Check if this is remote data
# Extract filesystem up front
fs, paths = ioutils._get_filesystem_and_paths(
path_or_data=path_or_buf, storage_options=storage_options
)
if fs and paths and not ioutils._is_local_filesystem(fs):
expanded_paths = []
for path in paths:
if ioutils.is_directory(path_or_data=path, fs=fs):
expanded_paths.extend(
fs.glob(fs.sep.join([path, "*.json"]))
)
else:
expanded_paths.append(path)
path_or_buf, byte_range = ioutils._get_remote_bytes_lines(
expanded_paths,
fs,
byte_range=byte_range,
)

# Prefetch remote data if possible
path_or_buf, byte_range = ioutils.prefetch_remote_buffers(
paths,
fs,
expand_paths="*.json",
prefetcher="contiguous",
prefetcher_options={
"byte_range": byte_range,
"read_ahead": prefetch_read_ahead,
},
)

filepaths_or_buffers = []
for source in path_or_buf:
if ioutils.is_directory(
path_or_data=source, storage_options=storage_options
):
fs = ioutils._ensure_filesystem(
passed_filesystem=None,
path=source,
storage_options=storage_options,
)
source = ioutils.stringify_pathlike(source)
source = fs.sep.join([source, "*.json"])

24 changes: 18 additions & 6 deletions python/cudf/cudf/io/orc.py
@@ -280,7 +280,7 @@ def read_orc(
num_rows=None,
use_index=True,
timestamp_type=None,
use_python_file_object=True,
use_python_file_object=False,
storage_options=None,
bytes_per_thread=None,
):
@@ -319,22 +319,34 @@ def read_orc(
"A list of stripes must be provided for each input source"
)

# Extract filesystem up front
fs, paths = ioutils._get_filesystem_and_paths(
path_or_data=filepath_or_buffer, storage_options=storage_options
)

# Prefetch remote data if possible
if not use_python_file_object:
# TODO: Add prefetcher="orc"
filepath_or_buffer, _ = ioutils.prefetch_remote_buffers(
paths,
fs,
bytes_per_thread=bytes_per_thread,
expand_paths="*.orc",
prefetcher="contiguous",
)

filepaths_or_buffers = []
for source in filepath_or_buffer:
if ioutils.is_directory(
path_or_data=source, storage_options=storage_options
):
fs = ioutils._ensure_filesystem(
passed_filesystem=None,
path=source,
storage_options=storage_options,
)
source = stringify_path(source)
source = fs.sep.join([source, "*.orc"])

tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=source,
compression=None,
fs=fs,
use_python_file_object=use_python_file_object,
storage_options=storage_options,
bytes_per_thread=bytes_per_thread,
19 changes: 9 additions & 10 deletions python/cudf/cudf/io/parquet.py
@@ -604,19 +604,18 @@ def read_parquet(
dataset_kwargs=dataset_kwargs,
)

# For remote data, we can transfer the necessary
# bytes directly into host memory
if (
fs
and paths
and not (ioutils._is_local_filesystem(fs) or use_python_file_object)
):
filepath_or_buffer = ioutils._get_remote_bytes_parquet(
# Prefetch remote data if possible
if not use_python_file_object:
filepath_or_buffer, _ = ioutils.prefetch_remote_buffers(
paths,
fs,
bytes_per_thread=bytes_per_thread,
columns=columns,
row_groups=row_groups,
prefetcher="parquet",
prefetcher_options={
"columns": columns,
# All paths must have the same row-group selection
"row_groups": row_groups[0] if row_groups else None,
},
)
else:
filepath_or_buffer = paths if paths else filepath_or_buffer
67 changes: 58 additions & 9 deletions python/cudf/cudf/utils/ioutils.py
@@ -27,7 +27,7 @@

_BYTES_PER_THREAD_DEFAULT = 256 * 1024 * 1024
_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024
_OVER_READ_DEFAULT = 1024 * 1024
_READ_AHEAD_DEFAULT = 1024 * 1024

_docstring_remote_sources = """
- cuDF supports local and remote data stores. See configuration details for
@@ -1614,7 +1614,7 @@ def _get_remote_bytes_parquet(
if fsspec_parquet is None or (columns is None and row_groups is None):
return _get_remote_bytes(
remote_paths, fs, bytes_per_thread=bytes_per_thread
)
), None

sizes = fs.sizes(remote_paths)
data = fsspec_parquet._get_parquet_byte_ranges(
@@ -1634,15 +1634,15 @@
chunk, dtype="b"
)
buffers.append(buf.tobytes())
return buffers
return buffers, None


def _get_remote_bytes_lines(
def _get_remote_bytes_contiguous(
remote_paths,
fs,
*,
byte_range=None,
over_read=_OVER_READ_DEFAULT,
read_ahead=None,
bytes_per_thread=None,
):
# Use byte_range to set remote_starts and remote_ends
@@ -1664,7 +1664,7 @@ def _get_remote_bytes_lines(
fs,
remote_starts=remote_starts,
remote_ends=remote_ends,
over_read=over_read,
read_ahead=read_ahead,
bytes_per_thread=bytes_per_thread,
use_proxy_files=False,
offset=offset,
@@ -1673,7 +1673,8 @@
# Adjust byte_range to trim unnecessary bytes.
# Note that we keep the byte-range shifted by one
# byte so that the libcudf reader still follows the
# correct code path
# correct code path.
# TODO: Get rid of this strange workaround!
if offset:
byte_range = (offset, byte_range[1])

@@ -1686,14 +1686,17 @@ def _get_remote_bytes(
*,
remote_starts=None,
remote_ends=None,
over_read=_OVER_READ_DEFAULT,
read_ahead=None,
bytes_per_thread=None,
use_proxy_files=True,
offset=0,
):
if isinstance(remote_paths, str):
remote_paths = [remote_paths]

if read_ahead is None:
read_ahead = _READ_AHEAD_DEFAULT

if remote_starts:
assert len(remote_starts) == len(remote_paths)
else:
@@ -1703,7 +1707,7 @@
if remote_ends:
assert len(remote_ends) == len(remote_paths)
for i in range(len(remote_ends)):
remote_ends[i] = min(remote_ends[i] + over_read, sizes[i])
remote_ends[i] = min(remote_ends[i] + read_ahead, sizes[i])
else:
remote_ends = sizes

@@ -1833,6 +1837,51 @@ def _open_remote_files(
]


def prefetch_remote_buffers(
paths,
fs,
*,
expand_paths=False,
bytes_per_thread=_BYTES_PER_THREAD_DEFAULT,
prefetcher=None,
prefetcher_options=None,
):
# TODO: Add Docstring
# Gather bytes ahead of time for remote filesystems
if fs and paths and not _is_local_filesystem(fs):
prefetchers = {
"parquet": _get_remote_bytes_parquet,
"contiguous": _get_remote_bytes_contiguous,
}
if prefetcher not in prefetchers:
raise NotImplementedError(
f"{prefetcher} is not a supported remote-data prefetcher"
)

if expand_paths:
expanded_paths = []
if expand_paths is True:
expand_paths = "*"
for path in paths:
if is_directory(path_or_data=path, fs=fs):
expanded_paths.extend(
fs.glob(fs.sep.join([path, expand_paths]))
)
else:
expanded_paths.append(path)
else:
expanded_paths = paths

return prefetchers.get(prefetcher)(
expanded_paths,
fs,
bytes_per_thread=bytes_per_thread,
**(prefetcher_options or {}),
)
else:
return paths, None


@doc_get_reader_filepath_or_buffer()
def get_reader_filepath_or_buffer(
path_or_data,
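For orientation, a minimal usage sketch of the new `prefetch_remote_buffers` helper as the readers above drive it (the S3 paths are hypothetical, and `fsspec.filesystem` is used here only to obtain a filesystem object):

```python
import fsspec

from cudf.utils import ioutils

# Hypothetical remote dataset; any fsspec-compatible URLs work.
paths = ["s3://bucket/part.0.parquet", "s3://bucket/part.1.parquet"]
fs = fsspec.filesystem("s3", anon=True)

# "parquet" prefetcher: transfer only the byte ranges needed for the
# selected columns/row-groups (falls back to whole-file transfers when
# fsspec.parquet is unavailable).
buffers, _ = ioutils.prefetch_remote_buffers(
    paths,
    fs,
    prefetcher="parquet",
    prefetcher_options={"columns": ["a", "b"], "row_groups": None},
)

# "contiguous" prefetcher: transfer whole files, or a byte range plus a
# read-ahead, as read_csv/read_json/read_orc do in this PR. expand_paths
# may be set to a glob pattern (e.g. "*.json") to expand directories.
buffers, byte_range = ioutils.prefetch_remote_buffers(
    paths,
    fs,
    prefetcher="contiguous",
    prefetcher_options={"byte_range": None, "read_ahead": None},
)
```

For local filesystems the helper is a no-op and returns the original paths unchanged.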
14 changes: 10 additions & 4 deletions python/dask_cudf/dask_cudf/io/parquet.py
@@ -30,9 +30,9 @@
from cudf.utils.dtypes import cudf_dtype_from_pa_type
from cudf.utils.ioutils import (
_ROW_GROUP_SIZE_BYTES_DEFAULT,
_get_remote_bytes_parquet,
_is_local_filesystem,
_open_remote_files,
prefetch_remote_buffers,
)


@@ -115,11 +115,17 @@ def _read_paths(
else:
# Use fsspec to collect byte ranges for all
# files ahead of time
paths_or_fobs = _get_remote_bytes_parquet(
paths_or_fobs, _ = prefetch_remote_buffers(
paths,
fs,
columns=columns,
row_groups=row_groups[0],
prefetcher="parquet",
prefetcher_options={
"columns": columns,
# All paths must have the same row-group selection
"row_groups": row_groups[0]
if row_groups
else None,
},
)

# Use cudf to read in data