From b5c38bb6a7c69de1d7ed5506333e415deb1e9eb8 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 13 Jun 2024 14:10:40 -0700 Subject: [PATCH 01/20] start simplifying remote-io optimizations --- python/cudf/cudf/io/csv.py | 18 ++++ python/cudf/cudf/io/json.py | 1 + python/cudf/cudf/io/parquet.py | 16 +++- python/cudf/cudf/utils/ioutils.py | 137 ++++++++++++++++++++++++++++++ 4 files changed, 171 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/io/csv.py b/python/cudf/cudf/io/csv.py index f07764e2ce4..63083b2c067 100644 --- a/python/cudf/cudf/io/csv.py +++ b/python/cudf/cudf/io/csv.py @@ -81,9 +81,27 @@ def read_csv( "`read_csv` does not yet support reading multiple files" ) + # Start by trying construct a filesystem object, so we + # can check if this is a remote file + fs, paths = ioutils._get_filesystem_and_paths( + path_or_data=filepath_or_buffer, storage_options=storage_options + ) + # For remote data, we can transfer the necessary + # bytes directly into host memory + if paths and not ( + ioutils._is_local_filesystem(fs) or use_python_file_object + ): + filepath_or_buffer, byte_range = ioutils._get_remote_bytes_csv( + paths, + fs, + byte_range=byte_range, + bytes_per_thread=bytes_per_thread, + ) + filepath_or_buffer, compression = ioutils.get_reader_filepath_or_buffer( path_or_data=filepath_or_buffer, compression=compression, + fs=fs, iotypes=(BytesIO, StringIO, NativeFile), use_python_file_object=use_python_file_object, storage_options=storage_options, diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py index fc3387d5117..d66b40d027a 100644 --- a/python/cudf/cudf/io/json.py +++ b/python/cudf/cudf/io/json.py @@ -83,6 +83,7 @@ def read_json( tmp_source, compression = ioutils.get_reader_filepath_or_buffer( path_or_data=source, compression=compression, + # use_python_file_object=True, iotypes=(BytesIO, StringIO), allow_raw_text_input=True, storage_options=storage_options, diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index dbdb2093b72..d41b7d09ca8 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -597,7 +597,21 @@ def read_parquet( categorical_partitions=categorical_partitions, dataset_kwargs=dataset_kwargs, ) - filepath_or_buffer = paths if paths else filepath_or_buffer + + # For remote data, we can transfer the necessary + # bytes directly into host memory + if paths and not ( + ioutils._is_local_filesystem(fs) or use_python_file_object + ): + filepath_or_buffer = ioutils._get_remote_bytes_parquet( + paths, + fs, + bytes_per_thread=bytes_per_thread, + columns=columns, + row_groups=row_groups, + ) + else: + filepath_or_buffer = paths if paths else filepath_or_buffer filepaths_or_buffers = [] if use_python_file_object: diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 0209c692935..b4d0fd15dd3 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -27,6 +27,7 @@ _BYTES_PER_THREAD_DEFAULT = 256 * 1024 * 1024 _ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024 +_READ_AHEAD_DEFAULT = 1024 * 1024 _docstring_remote_sources = """ - cuDF supports local and remote data stores. 
See configuration details for @@ -1599,6 +1600,142 @@ def _set_context(obj, stack): return stack.enter_context(obj) +def _get_remote_bytes_parquet( + remote_paths, + fs, + *, + columns=None, + row_groups=None, + bytes_per_thread=None, +): + if fsspec_parquet is None or (columns is None and row_groups is None): + return _get_remote_bytes( + remote_paths, fs, bytes_per_thread=bytes_per_thread + ) + + sizes = fs.sizes(remote_paths) + data = fsspec_parquet._get_parquet_byte_ranges( + remote_paths, + fs, + columns=columns, + row_groups=row_groups, + max_block=bytes_per_thread or _BYTES_PER_THREAD_DEFAULT, + ) + buffers = [] + for size, path in zip(sizes, remote_paths): + path_data = data.pop(path) + buf = np.zeros(size, dtype="b") + for range_offset in list(path_data.keys()): + chunk = path_data.pop(range_offset) + buf[range_offset[0] : range_offset[1]] = np.frombuffer( + chunk, dtype="b" + ) + buffers.append(buf.tobytes()) + return buffers + + +def _get_remote_bytes_csv( + remote_paths, + fs, + *, + byte_range=None, + read_ahead=_READ_AHEAD_DEFAULT, + bytes_per_thread=None, +): + # CSV reader only supports single file + assert len(remote_paths) == 1 + + # Use byte_range to set remote_starts and remote_ends + remote_starts = None + remote_ends = None + offset = 0 + if byte_range: + start, stop = byte_range + remote_starts = [start] + if start: + offset = 1 + if stop: + remote_ends = [start + stop] + + # Collect buffers + buffers = _get_remote_bytes( + remote_paths, + fs, + remote_starts=remote_starts, + remote_ends=remote_ends, + read_ahead=read_ahead, + bytes_per_thread=bytes_per_thread, + use_proxy_files=False, + offset=offset, + ) + + # Adjust byte_range to trim unnecessary bytes. + # Note that we keep the byte-range shifted by one + # byte so that the libcudf reader still follows the + # correct code path + if offset: + byte_range = (offset, byte_range[1]) + + return buffers[0], byte_range + + +def _get_remote_bytes( + remote_paths, + fs, + *, + remote_starts=None, + remote_ends=None, + read_ahead=_READ_AHEAD_DEFAULT, + bytes_per_thread=None, + use_proxy_files=True, + offset=0, +): + if isinstance(remote_paths, str): + remote_paths = [remote_paths] + + if remote_starts: + assert len(remote_starts) == len(remote_paths) + else: + remote_starts = [0] * len(remote_paths) + + sizes = fs.sizes(remote_paths) + if remote_ends: + assert len(remote_ends) == len(remote_paths) + for i in range(len(remote_ends)): + remote_ends[i] = min(remote_ends[i] + read_ahead, sizes[i]) + else: + remote_ends = sizes + + # Construct list of paths, starts, and stops + paths, starts, ends = [], [], [] + blocksize = bytes_per_thread or _BYTES_PER_THREAD_DEFAULT + for i, remote_path in enumerate(remote_paths): + for j in range(remote_starts[i], remote_ends[i], blocksize): + paths.append(remote_path) + starts.append(j) + ends.append(min(j + blocksize, remote_ends[i])) + + # Collect the byte ranges + chunks = fs.cat_ranges(paths, starts, ends) + + # Construct local byte buffers + buffers = [] + path_counts = np.unique(paths, return_counts=True)[1] + for i, remote_path in enumerate(remote_paths): + size = ( + sizes[i] if use_proxy_files else remote_ends[i] - remote_starts[i] + ) + buf = np.zeros(size + offset, dtype="b") + for j in range(path_counts[i]): + start, end = starts[i + j] + offset, ends[i + j] + offset + if not use_proxy_files: + start -= remote_starts[i] + end -= remote_starts[i] + buf[start:end] = np.frombuffer(chunks.pop(0), dtype="b") + buffers.append(buf.tobytes()) + return buffers + + def 
_open_remote_files( paths, fs, From 7361e92900081bf5de246c579d91bed985f0d0e0 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 13 Jun 2024 20:36:37 -0700 Subject: [PATCH 02/20] add basic json support --- python/cudf/cudf/io/csv.py | 2 ++ python/cudf/cudf/io/json.py | 24 +++++++++++++++++++++++- python/cudf/cudf/utils/ioutils.py | 23 ++++++++++++----------- 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/python/cudf/cudf/io/csv.py b/python/cudf/cudf/io/csv.py index 63083b2c067..c0c9627f183 100644 --- a/python/cudf/cudf/io/csv.py +++ b/python/cudf/cudf/io/csv.py @@ -97,6 +97,8 @@ def read_csv( byte_range=byte_range, bytes_per_thread=bytes_per_thread, ) + assert len(filepath_or_buffer) == 1 + filepath_or_buffer = filepath_or_buffer[0] filepath_or_buffer, compression = ioutils.get_reader_filepath_or_buffer( path_or_data=filepath_or_buffer, diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py index d66b40d027a..3cd659acc8b 100644 --- a/python/cudf/cudf/io/json.py +++ b/python/cudf/cudf/io/json.py @@ -67,6 +67,28 @@ def read_json( if not is_list_like(path_or_buf): path_or_buf = [path_or_buf] + # Start by trying construct a filesystem object, so we + # can check if this is remote data + fs, paths = ioutils._get_filesystem_and_paths( + path_or_data=path_or_buf, storage_options=storage_options + ) + # For remote data, we can transfer the necessary + # bytes directly into host memory + if paths and not ioutils._is_local_filesystem(fs): + expanded_paths = [] + for path in paths: + if ioutils.is_directory(path_or_data=path, fs=fs): + expanded_paths.extend( + fs.glob(fs.sep.join([path, "*.json"])) + ) + else: + expanded_paths.append(path) + path_or_buf, byte_range = ioutils._get_remote_bytes_lines( + expanded_paths, + fs, + byte_range=byte_range, + ) + filepaths_or_buffers = [] for source in path_or_buf: if ioutils.is_directory( @@ -83,7 +105,7 @@ def read_json( tmp_source, compression = ioutils.get_reader_filepath_or_buffer( path_or_data=source, compression=compression, - # use_python_file_object=True, + fs=fs, iotypes=(BytesIO, StringIO), allow_raw_text_input=True, storage_options=storage_options, diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index b4d0fd15dd3..80ed4eec086 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1538,15 +1538,18 @@ def ensure_single_filepath_or_buffer(path_or_data, storage_options=None): return True -def is_directory(path_or_data, storage_options=None): +def is_directory(path_or_data, storage_options=None, fs=None): """Returns True if the provided filepath is a directory""" path_or_data = stringify_pathlike(path_or_data) if isinstance(path_or_data, str): path_or_data = os.path.expanduser(path_or_data) try: - fs = get_fs_token_paths( - path_or_data, mode="rb", storage_options=storage_options - )[0] + fs = ( + fs + or get_fs_token_paths( + path_or_data, mode="rb", storage_options=storage_options + )[0] + ) except ValueError as e: if str(e).startswith("Protocol not known"): return False @@ -1634,7 +1637,7 @@ def _get_remote_bytes_parquet( return buffers -def _get_remote_bytes_csv( +def _get_remote_bytes_lines( remote_paths, fs, *, @@ -1642,20 +1645,18 @@ def _get_remote_bytes_csv( read_ahead=_READ_AHEAD_DEFAULT, bytes_per_thread=None, ): - # CSV reader only supports single file - assert len(remote_paths) == 1 - # Use byte_range to set remote_starts and remote_ends remote_starts = None remote_ends = None offset = 0 if byte_range: + assert len(remote_paths) == 1 
start, stop = byte_range - remote_starts = [start] + remote_starts = [start] * len(remote_paths) if start: offset = 1 if stop: - remote_ends = [start + stop] + remote_ends = [start + stop] * len(remote_paths) # Collect buffers buffers = _get_remote_bytes( @@ -1676,7 +1677,7 @@ def _get_remote_bytes_csv( if offset: byte_range = (offset, byte_range[1]) - return buffers[0], byte_range + return buffers, byte_range def _get_remote_bytes( From 7472446742716a9da45c7d38009431553f225cb0 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 13 Jun 2024 21:01:15 -0700 Subject: [PATCH 03/20] only use _fsspec_data_transfer for file-like objects --- python/cudf/cudf/utils/ioutils.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 80ed4eec086..50c330a7942 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1943,17 +1943,20 @@ def get_reader_filepath_or_buffer( **(open_file_options or {}), ) else: - path_or_data = [ - BytesIO( - _fsspec_data_transfer( - fpath, - fs=fs, - mode=mode, - bytes_per_thread=bytes_per_thread, - ) - ) - for fpath in paths - ] + path_or_data = _get_remote_bytes( + paths, fs, bytes_per_thread=bytes_per_thread + ) + # path_or_data = [ + # BytesIO( + # _fsspec_data_transfer( + # fpath, + # fs=fs, + # mode=mode, + # bytes_per_thread=bytes_per_thread, + # ) + # ) + # for fpath in paths + # ] if len(path_or_data) == 1: path_or_data = path_or_data[0] From 8b51952c0d7e864b13b8ffbbbf99274d33161926 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 13 Jun 2024 21:17:03 -0700 Subject: [PATCH 04/20] add read_text support for byte_range opt --- python/cudf/cudf/io/csv.py | 7 ++----- python/cudf/cudf/io/json.py | 5 +---- python/cudf/cudf/io/text.py | 16 +++++++++++++++- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/python/cudf/cudf/io/csv.py b/python/cudf/cudf/io/csv.py index c0c9627f183..2f122cd2a60 100644 --- a/python/cudf/cudf/io/csv.py +++ b/python/cudf/cudf/io/csv.py @@ -81,17 +81,14 @@ def read_csv( "`read_csv` does not yet support reading multiple files" ) - # Start by trying construct a filesystem object, so we - # can check if this is a remote file + # Check if this is a remote file fs, paths = ioutils._get_filesystem_and_paths( path_or_data=filepath_or_buffer, storage_options=storage_options ) - # For remote data, we can transfer the necessary - # bytes directly into host memory if paths and not ( ioutils._is_local_filesystem(fs) or use_python_file_object ): - filepath_or_buffer, byte_range = ioutils._get_remote_bytes_csv( + filepath_or_buffer, byte_range = ioutils._get_remote_bytes_lines( paths, fs, byte_range=byte_range, diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py index 3cd659acc8b..c70abf11c19 100644 --- a/python/cudf/cudf/io/json.py +++ b/python/cudf/cudf/io/json.py @@ -67,13 +67,10 @@ def read_json( if not is_list_like(path_or_buf): path_or_buf = [path_or_buf] - # Start by trying construct a filesystem object, so we - # can check if this is remote data + # Check if this is remote data fs, paths = ioutils._get_filesystem_and_paths( path_or_data=path_or_buf, storage_options=storage_options ) - # For remote data, we can transfer the necessary - # bytes directly into host memory if paths and not ioutils._is_local_filesystem(fs): expanded_paths = [] for path in paths: diff --git a/python/cudf/cudf/io/text.py b/python/cudf/cudf/io/text.py index 0e19972f6e0..fa9848cc879 100644 --- 
a/python/cudf/cudf/io/text.py +++ b/python/cudf/cudf/io/text.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018-2023, NVIDIA CORPORATION. +# Copyright (c) 2018-2024, NVIDIA CORPORATION. from io import BytesIO, StringIO @@ -24,9 +24,23 @@ def read_text( if delimiter is None: raise ValueError("delimiter needs to be provided") + # Check if this is a remote file + fs, paths = ioutils._get_filesystem_and_paths( + path_or_data=filepath_or_buffer, storage_options=storage_options + ) + if paths and not ioutils._is_local_filesystem(fs): + filepath_or_buffer, byte_range = ioutils._get_remote_bytes_lines( + paths, + fs, + byte_range=byte_range, + ) + assert len(filepath_or_buffer) == 1 + filepath_or_buffer = filepath_or_buffer[0] + filepath_or_buffer, _ = ioutils.get_reader_filepath_or_buffer( path_or_data=filepath_or_buffer, compression=None, + fs=fs, iotypes=(BytesIO, StringIO), storage_options=storage_options, ) From 778ceeb51dd1f8eb1c42c7c82da95e0c1b34c98d Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 14 Jun 2024 12:39:07 -0700 Subject: [PATCH 05/20] start experimental dask changes --- python/cudf/cudf/io/csv.py | 8 +- python/cudf/cudf/io/json.py | 2 +- python/cudf/cudf/io/parquet.py | 20 ++- python/cudf/cudf/io/text.py | 2 +- python/cudf/cudf/utils/ioutils.py | 56 +++------ python/dask_cudf/dask_cudf/io/parquet.py | 150 +++++++++++++++++------ 6 files changed, 150 insertions(+), 88 deletions(-) diff --git a/python/cudf/cudf/io/csv.py b/python/cudf/cudf/io/csv.py index 2f122cd2a60..0ca15fb844a 100644 --- a/python/cudf/cudf/io/csv.py +++ b/python/cudf/cudf/io/csv.py @@ -50,7 +50,7 @@ def read_csv( comment=None, delim_whitespace=False, byte_range=None, - use_python_file_object=True, + use_python_file_object=False, storage_options=None, bytes_per_thread=None, ): @@ -85,8 +85,10 @@ def read_csv( fs, paths = ioutils._get_filesystem_and_paths( path_or_data=filepath_or_buffer, storage_options=storage_options ) - if paths and not ( - ioutils._is_local_filesystem(fs) or use_python_file_object + if ( + fs + and paths + and not (ioutils._is_local_filesystem(fs) or use_python_file_object) ): filepath_or_buffer, byte_range = ioutils._get_remote_bytes_lines( paths, diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py index c70abf11c19..87a566772c9 100644 --- a/python/cudf/cudf/io/json.py +++ b/python/cudf/cudf/io/json.py @@ -71,7 +71,7 @@ def read_json( fs, paths = ioutils._get_filesystem_and_paths( path_or_data=path_or_buf, storage_options=storage_options ) - if paths and not ioutils._is_local_filesystem(fs): + if fs and paths and not ioutils._is_local_filesystem(fs): expanded_paths = [] for path in paths: if ioutils.is_directory(path_or_data=path, fs=fs): diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index d41b7d09ca8..b92dc85c7f8 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -331,8 +331,15 @@ def read_parquet_metadata(filepath_or_buffer): path_or_data=filepath_or_buffer, storage_options=None ) - # Check if filepath or buffer - filepath_or_buffer = paths if paths else filepath_or_buffer + if paths and not ioutils._is_local_filesystem(fs): + filepath_or_buffer = ioutils._get_remote_bytes_parquet( + paths, + fs, + columns=[], + row_groups=[], + ) + else: + filepath_or_buffer = paths if paths else filepath_or_buffer # List of filepaths or buffers filepaths_or_buffers = [] @@ -342,7 +349,6 @@ def read_parquet_metadata(filepath_or_buffer): path_or_data=source, compression=None, fs=fs, - use_python_file_object=True, 
open_file_options=None, storage_options=None, bytes_per_thread=None, @@ -524,7 +530,7 @@ def read_parquet( filters=None, row_groups=None, use_pandas_metadata=True, - use_python_file_object=True, + use_python_file_object=False, categorical_partitions=True, open_file_options=None, bytes_per_thread=None, @@ -600,8 +606,10 @@ def read_parquet( # For remote data, we can transfer the necessary # bytes directly into host memory - if paths and not ( - ioutils._is_local_filesystem(fs) or use_python_file_object + if ( + fs + and paths + and not (ioutils._is_local_filesystem(fs) or use_python_file_object) ): filepath_or_buffer = ioutils._get_remote_bytes_parquet( paths, diff --git a/python/cudf/cudf/io/text.py b/python/cudf/cudf/io/text.py index fa9848cc879..c68d3261d34 100644 --- a/python/cudf/cudf/io/text.py +++ b/python/cudf/cudf/io/text.py @@ -28,7 +28,7 @@ def read_text( fs, paths = ioutils._get_filesystem_and_paths( path_or_data=filepath_or_buffer, storage_options=storage_options ) - if paths and not ioutils._is_local_filesystem(fs): + if fs and paths and not ioutils._is_local_filesystem(fs): filepath_or_buffer, byte_range = ioutils._get_remote_bytes_lines( paths, fs, diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 50c330a7942..e41da2aed7f 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -27,7 +27,7 @@ _BYTES_PER_THREAD_DEFAULT = 256 * 1024 * 1024 _ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024 -_READ_AHEAD_DEFAULT = 1024 * 1024 +_OVER_READ_DEFAULT = 1024 * 1024 _docstring_remote_sources = """ - cuDF supports local and remote data stores. See configuration details for @@ -1642,7 +1642,7 @@ def _get_remote_bytes_lines( fs, *, byte_range=None, - read_ahead=_READ_AHEAD_DEFAULT, + over_read=_OVER_READ_DEFAULT, bytes_per_thread=None, ): # Use byte_range to set remote_starts and remote_ends @@ -1664,7 +1664,7 @@ def _get_remote_bytes_lines( fs, remote_starts=remote_starts, remote_ends=remote_ends, - read_ahead=read_ahead, + over_read=over_read, bytes_per_thread=bytes_per_thread, use_proxy_files=False, offset=offset, @@ -1686,7 +1686,7 @@ def _get_remote_bytes( *, remote_starts=None, remote_ends=None, - read_ahead=_READ_AHEAD_DEFAULT, + over_read=_OVER_READ_DEFAULT, bytes_per_thread=None, use_proxy_files=True, offset=0, @@ -1703,7 +1703,7 @@ def _get_remote_bytes( if remote_ends: assert len(remote_ends) == len(remote_paths) for i in range(len(remote_ends)): - remote_ends[i] = min(remote_ends[i] + read_ahead, sizes[i]) + remote_ends[i] = min(remote_ends[i] + over_read, sizes[i]) else: remote_ends = sizes @@ -1850,6 +1850,13 @@ def get_reader_filepath_or_buffer( ): """{docstring}""" + if use_python_file_object: + warnings.warn( + "The 'use_python_file_object' parameter is deprecated and " + "will be removed in a future version of cudf.", + FutureWarning, + ) + path_or_data = stringify_pathlike(path_or_data) if isinstance(path_or_data, str): @@ -1946,17 +1953,6 @@ def get_reader_filepath_or_buffer( path_or_data = _get_remote_bytes( paths, fs, bytes_per_thread=bytes_per_thread ) - # path_or_data = [ - # BytesIO( - # _fsspec_data_transfer( - # fpath, - # fs=fs, - # mode=mode, - # bytes_per_thread=bytes_per_thread, - # ) - # ) - # for fpath in paths - # ] if len(path_or_data) == 1: path_or_data = path_or_data[0] @@ -1966,11 +1962,8 @@ def get_reader_filepath_or_buffer( if use_python_file_object: path_or_data = ArrowPythonFile(path_or_data) else: - path_or_data = BytesIO( - _fsspec_data_transfer( - path_or_data, mode=mode, 
bytes_per_thread=bytes_per_thread - ) - ) + # Remote file is already open - Just read it + path_or_data = BytesIO(path_or_data.read()) return path_or_data, compression @@ -2263,27 +2256,6 @@ def _fsspec_data_transfer( return buf.tobytes() -def _merge_ranges(byte_ranges, max_block=256_000_000, max_gap=64_000): - # Simple utility to merge small/adjacent byte ranges - new_ranges = [] - if not byte_ranges: - # Early return - return new_ranges - - offset, size = byte_ranges[0] - for new_offset, new_size in byte_ranges[1:]: - gap = new_offset - (offset + size) - if gap > max_gap or (size + new_size + gap) > max_block: - # Gap is too large or total read is too large - new_ranges.append((offset, size)) - offset = new_offset - size = new_size - continue - size += new_size + gap - new_ranges.append((offset, size)) - return new_ranges - - def _assign_block(fs, path_or_fob, local_buffer, offset, nbytes): if fs is None: # We have an open fsspec file object diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index ba8b1e89721..5e8ca6c37a0 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -4,6 +4,7 @@ from contextlib import ExitStack from functools import partial from io import BufferedWriter, BytesIO, IOBase +from operator import getitem import numpy as np import pandas as pd @@ -97,21 +98,75 @@ def _read_paths( dataset_kwargs = dataset_kwargs or {} dataset_kwargs["partitioning"] = partitioning or "hive" - with ExitStack() as stack: - # Non-local filesystem handling - paths_or_fobs = paths - if not _is_local_filesystem(fs): - paths_or_fobs = _open_remote_files( - paths_or_fobs, - fs, - context_stack=stack, - **_default_open_file_options( - open_file_options, columns, row_groups - ), - ) - # Use cudf to read in data - try: + if False: + from cudf.utils.ioutils import _get_remote_bytes_parquet + + if False: + paths_or_fobs = paths + if not _is_local_filesystem(fs): + from concurrent.futures import ( + ThreadPoolExecutor as PoolExecutor, + ) + + from dask import config + from dask.base import tokenize + from dask.distributed import get_worker + from dask.threaded import get + from dask.utils import apply + + # from dask.multiprocessing import get + # from concurrent.futures import ProcessPoolExecutor as PoolExecutor + + token = tokenize(paths) + chunk_name = f"read-chunk-{token}" + dsk = { + (chunk_name, i): ( + getitem, + ( + apply, + _get_remote_bytes_parquet, + [[path], fs], + dict( + columns=columns, row_groups=row_groups[i] + ), + ), + 0, + ) + for i, path in enumerate(paths) + } + dsk[chunk_name] = ( + partial( + cudf.read_parquet, + columns=columns, + row_groups=row_groups if row_groups else None, + dataset_kwargs=dataset_kwargs, + categorical_partitions=False, + **kwargs, + ), + list(dsk.keys()), + ) + + try: + worker = get_worker() + if not hasattr(worker, "_rapids_executor"): + num_threads = 10 + worker._rapids_executor = PoolExecutor(num_threads) + with config.set(pool=worker._rapids_executor): + df = get(dsk, chunk_name) + except ValueError: + df = get(dsk, chunk_name) + + else: + paths_or_fobs = paths + if not _is_local_filesystem(fs): + paths_or_fobs = _get_remote_bytes_parquet( + paths, + fs, + columns=columns, + row_groups=row_groups[0], + ) + df = cudf.read_parquet( paths_or_fobs, engine="cudf", @@ -121,28 +176,53 @@ def _read_paths( categorical_partitions=False, **kwargs, ) - except RuntimeError as err: - # TODO: Remove try/except after null-schema issue is resolved - # (See: 
https://github.com/rapidsai/cudf/issues/12702) - if len(paths_or_fobs) > 1: - df = cudf.concat( - [ - cudf.read_parquet( - pof, - engine="cudf", - columns=columns, - row_groups=row_groups[i] - if row_groups - else None, - dataset_kwargs=dataset_kwargs, - categorical_partitions=False, - **kwargs, - ) - for i, pof in enumerate(paths_or_fobs) - ] + else: + with ExitStack() as stack: + # Non-local filesystem handling + paths_or_fobs = paths + if not _is_local_filesystem(fs): + paths_or_fobs = _open_remote_files( + paths_or_fobs, + fs, + context_stack=stack, + **_default_open_file_options( + open_file_options, columns, row_groups + ), + ) + + # Use cudf to read in data + try: + df = cudf.read_parquet( + paths_or_fobs, + engine="cudf", + columns=columns, + row_groups=row_groups if row_groups else None, + dataset_kwargs=dataset_kwargs, + categorical_partitions=False, + **kwargs, ) - else: - raise err + except RuntimeError as err: + # TODO: Remove try/except after null-schema issue is resolved + # (See: https://github.com/rapidsai/cudf/issues/12702) + if len(paths_or_fobs) > 1: + df = cudf.concat( + [ + cudf.read_parquet( + pof, + engine="cudf", + columns=columns, + row_groups=row_groups[i] + if row_groups + else None, + dataset_kwargs=dataset_kwargs, + categorical_partitions=False, + **kwargs, + ) + for i, pof in enumerate(paths_or_fobs) + ] + ) + else: + raise err # Apply filters (if any are defined) df = _apply_post_filters(df, filters) From b55fce9e6adf222b0a522e454748e9c620176673 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 1 Jul 2024 12:29:21 -0700 Subject: [PATCH 06/20] simplify --- python/dask_cudf/dask_cudf/io/parquet.py | 147 +++-------------------- 1 file changed, 18 insertions(+), 129 deletions(-) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index f9bb380e780..08d95e64d8a 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -1,10 +1,8 @@ # Copyright (c) 2019-2024, NVIDIA CORPORATION. 
import itertools import warnings -from contextlib import ExitStack from functools import partial from io import BufferedWriter, BytesIO, IOBase -from operator import getitem import numpy as np import pandas as pd @@ -23,16 +21,11 @@ import cudf from cudf.core.column import as_column, build_categorical_column from cudf.io import write_to_dataset -from cudf.io.parquet import ( - _apply_post_filters, - _default_open_file_options, - _normalize_filters, -) +from cudf.io.parquet import _apply_post_filters, _normalize_filters from cudf.utils.dtypes import cudf_dtype_from_pa_type from cudf.utils.ioutils import ( _ROW_GROUP_SIZE_BYTES_DEFAULT, _is_local_filesystem, - _open_remote_files, ) @@ -99,130 +92,26 @@ def _read_paths( dataset_kwargs = dataset_kwargs or {} dataset_kwargs["partitioning"] = partitioning or "hive" - if False: + paths_or_fobs = paths + if not _is_local_filesystem(fs): from cudf.utils.ioutils import _get_remote_bytes_parquet - if False: - paths_or_fobs = paths - if not _is_local_filesystem(fs): - from concurrent.futures import ( - ThreadPoolExecutor as PoolExecutor, - ) - - from dask import config - from dask.base import tokenize - from dask.distributed import get_worker - from dask.threaded import get - from dask.utils import apply - - # from dask.multiprocessing import get - # from concurrent.futures import ProcessPoolExecutor as PoolExecutor - - token = tokenize(paths) - chunk_name = f"read-chunk-{token}" - dsk = { - (chunk_name, i): ( - getitem, - ( - apply, - _get_remote_bytes_parquet, - [[path], fs], - dict( - columns=columns, row_groups=row_groups[i] - ), - ), - 0, - ) - for i, path in enumerate(paths) - } - dsk[chunk_name] = ( - partial( - cudf.read_parquet, - columns=columns, - row_groups=row_groups if row_groups else None, - dataset_kwargs=dataset_kwargs, - categorical_partitions=False, - **kwargs, - ), - list(dsk.keys()), - ) - - try: - worker = get_worker() - if not hasattr(worker, "_rapids_executor"): - num_threads = 10 - worker._rapids_executor = PoolExecutor(num_threads) - with config.set(pool=worker._rapids_executor): - df = get(dsk, chunk_name) - except ValueError: - df = get(dsk, chunk_name) - - else: - paths_or_fobs = paths - if not _is_local_filesystem(fs): - paths_or_fobs = _get_remote_bytes_parquet( - paths, - fs, - columns=columns, - row_groups=row_groups[0], - ) - - df = cudf.read_parquet( - paths_or_fobs, - engine="cudf", - columns=columns, - row_groups=row_groups if row_groups else None, - dataset_kwargs=dataset_kwargs, - categorical_partitions=False, - **kwargs, - ) - else: - with ExitStack() as stack: - # Non-local filesystem handling - paths_or_fobs = paths - if not _is_local_filesystem(fs): - paths_or_fobs = _open_remote_files( - paths_or_fobs, - fs, - context_stack=stack, - **_default_open_file_options( - open_file_options, columns, row_groups - ), - ) + paths_or_fobs = _get_remote_bytes_parquet( + paths, + fs, + columns=columns, + row_groups=row_groups[0], + ) - # Use cudf to read in data - try: - df = cudf.read_parquet( - paths_or_fobs, - engine="cudf", - columns=columns, - row_groups=row_groups if row_groups else None, - dataset_kwargs=dataset_kwargs, - categorical_partitions=False, - **kwargs, - ) - except RuntimeError as err: - # TODO: Remove try/except after null-schema issue is resolved - # (See: https://github.com/rapidsai/cudf/issues/12702) - if len(paths_or_fobs) > 1: - df = cudf.concat( - [ - cudf.read_parquet( - pof, - engine="cudf", - columns=columns, - row_groups=row_groups[i] - if row_groups - else None, - 
dataset_kwargs=dataset_kwargs, - categorical_partitions=False, - **kwargs, - ) - for i, pof in enumerate(paths_or_fobs) - ] - ) - else: - raise err + df = cudf.read_parquet( + paths_or_fobs, + engine="cudf", + columns=columns, + row_groups=row_groups if row_groups else None, + dataset_kwargs=dataset_kwargs, + categorical_partitions=False, + **kwargs, + ) # Apply filters (if any are defined) df = _apply_post_filters(df, filters) From 087438861428a100f35490debfd6d3232690b758 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 2 Jul 2024 07:43:34 -0700 Subject: [PATCH 07/20] improve backward compatibility during deprecation cycle (in dask-cudf) --- python/dask_cudf/dask_cudf/io/parquet.py | 58 ++++++++++++++++-------- 1 file changed, 39 insertions(+), 19 deletions(-) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 08d95e64d8a..22f6e9efe3a 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -1,6 +1,7 @@ # Copyright (c) 2019-2024, NVIDIA CORPORATION. import itertools import warnings +from contextlib import ExitStack from functools import partial from io import BufferedWriter, BytesIO, IOBase @@ -21,11 +22,17 @@ import cudf from cudf.core.column import as_column, build_categorical_column from cudf.io import write_to_dataset -from cudf.io.parquet import _apply_post_filters, _normalize_filters +from cudf.io.parquet import ( + _apply_post_filters, + _default_open_file_options, + _normalize_filters, +) from cudf.utils.dtypes import cudf_dtype_from_pa_type from cudf.utils.ioutils import ( _ROW_GROUP_SIZE_BYTES_DEFAULT, + _get_remote_bytes_parquet, _is_local_filesystem, + _open_remote_files, ) @@ -91,28 +98,41 @@ def _read_paths( dataset_kwargs = dataset_kwargs or {} dataset_kwargs["partitioning"] = partitioning or "hive" + with ExitStack() as stack: + # Non-local filesystem handling + paths_or_fobs = paths + if not _is_local_filesystem(fs): + if kwargs.get("use_python_file_object", False): + # Use deprecated NativeFile code path in cudf + paths_or_fobs = _open_remote_files( + paths_or_fobs, + fs, + context_stack=stack, + **_default_open_file_options( + open_file_options, columns, row_groups + ), + ) + else: + # Use fsspec to collect byte ranges for all + # files ahead of time + paths_or_fobs = _get_remote_bytes_parquet( + paths, + fs, + columns=columns, + row_groups=row_groups[0], + ) - paths_or_fobs = paths - if not _is_local_filesystem(fs): - from cudf.utils.ioutils import _get_remote_bytes_parquet - - paths_or_fobs = _get_remote_bytes_parquet( - paths, - fs, + # Use cudf to read in data + df = cudf.read_parquet( + paths_or_fobs, + engine="cudf", columns=columns, - row_groups=row_groups[0], + row_groups=row_groups if row_groups else None, + dataset_kwargs=dataset_kwargs, + categorical_partitions=False, + **kwargs, ) - df = cudf.read_parquet( - paths_or_fobs, - engine="cudf", - columns=columns, - row_groups=row_groups if row_groups else None, - dataset_kwargs=dataset_kwargs, - categorical_partitions=False, - **kwargs, - ) - # Apply filters (if any are defined) df = _apply_post_filters(df, filters) From f12b20f93fca3322819b64785e635b0efe42061f Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 2 Jul 2024 08:51:45 -0700 Subject: [PATCH 08/20] remove deprecation warnings --- python/cudf/cudf/utils/ioutils.py | 7 ---- python/dask_cudf/dask_cudf/io/parquet.py | 41 ++++++++++++++++++------ 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/python/cudf/cudf/utils/ioutils.py 
b/python/cudf/cudf/utils/ioutils.py index e41da2aed7f..f7223720b91 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1850,13 +1850,6 @@ def get_reader_filepath_or_buffer( ): """{docstring}""" - if use_python_file_object: - warnings.warn( - "The 'use_python_file_object' parameter is deprecated and " - "will be removed in a future version of cudf.", - FutureWarning, - ) - path_or_data = stringify_pathlike(path_or_data) if isinstance(path_or_data, str): diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 22f6e9efe3a..e4167132598 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -123,15 +123,38 @@ def _read_paths( ) # Use cudf to read in data - df = cudf.read_parquet( - paths_or_fobs, - engine="cudf", - columns=columns, - row_groups=row_groups if row_groups else None, - dataset_kwargs=dataset_kwargs, - categorical_partitions=False, - **kwargs, - ) + try: + df = cudf.read_parquet( + paths_or_fobs, + engine="cudf", + columns=columns, + row_groups=row_groups if row_groups else None, + dataset_kwargs=dataset_kwargs, + categorical_partitions=False, + **kwargs, + ) + except RuntimeError as err: + # TODO: Remove try/except after null-schema issue is resolved + # (See: https://github.com/rapidsai/cudf/issues/12702) + if len(paths_or_fobs) > 1: + df = cudf.concat( + [ + cudf.read_parquet( + pof, + engine="cudf", + columns=columns, + row_groups=row_groups[i] + if row_groups + else None, + dataset_kwargs=dataset_kwargs, + categorical_partitions=False, + **kwargs, + ) + for i, pof in enumerate(paths_or_fobs) + ] + ) + else: + raise err # Apply filters (if any are defined) df = _apply_post_filters(df, filters) From e265d40188d20db4db514677fc975c9cee944075 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 2 Jul 2024 12:12:17 -0700 Subject: [PATCH 09/20] refactor --- python/cudf/cudf/io/csv.py | 19 ++++--- python/cudf/cudf/io/json.py | 34 +++++------- python/cudf/cudf/io/orc.py | 24 ++++++--- python/cudf/cudf/io/parquet.py | 19 ++++--- python/cudf/cudf/utils/ioutils.py | 67 ++++++++++++++++++++---- python/dask_cudf/dask_cudf/io/parquet.py | 14 +++-- 6 files changed, 120 insertions(+), 57 deletions(-) diff --git a/python/cudf/cudf/io/csv.py b/python/cudf/cudf/io/csv.py index 7e4917597bb..6be9e2c5fbe 100644 --- a/python/cudf/cudf/io/csv.py +++ b/python/cudf/cudf/io/csv.py @@ -53,6 +53,7 @@ def read_csv( use_python_file_object=False, storage_options=None, bytes_per_thread=None, + prefetch_read_ahead=None, ): """{docstring}""" @@ -81,20 +82,22 @@ def read_csv( "`read_csv` does not yet support reading multiple files" ) - # Check if this is a remote file + # Extract filesystem up front fs, paths = ioutils._get_filesystem_and_paths( path_or_data=filepath_or_buffer, storage_options=storage_options ) - if ( - fs - and paths - and not (ioutils._is_local_filesystem(fs) or use_python_file_object) - ): - filepath_or_buffer, byte_range = ioutils._get_remote_bytes_lines( + + # Prefetch remote data if possible + if not use_python_file_object: + filepath_or_buffer, byte_range = ioutils.prefetch_remote_buffers( paths, fs, - byte_range=byte_range, bytes_per_thread=bytes_per_thread, + prefetcher="contiguous", + prefetcher_options={ + "byte_range": byte_range, + "read_ahead": prefetch_read_ahead, + }, ) assert len(filepath_or_buffer) == 1 filepath_or_buffer = filepath_or_buffer[0] diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py index 87a566772c9..cd5299009ce 
100644 --- a/python/cudf/cudf/io/json.py +++ b/python/cudf/cudf/io/json.py @@ -28,6 +28,7 @@ def read_json( mixed_types_as_string=False, prune_columns=False, on_bad_lines="error", + prefetch_read_ahead=None, *args, **kwargs, ): @@ -67,35 +68,28 @@ def read_json( if not is_list_like(path_or_buf): path_or_buf = [path_or_buf] - # Check if this is remote data + # Extract filesystem up front fs, paths = ioutils._get_filesystem_and_paths( path_or_data=path_or_buf, storage_options=storage_options ) - if fs and paths and not ioutils._is_local_filesystem(fs): - expanded_paths = [] - for path in paths: - if ioutils.is_directory(path_or_data=path, fs=fs): - expanded_paths.extend( - fs.glob(fs.sep.join([path, "*.json"])) - ) - else: - expanded_paths.append(path) - path_or_buf, byte_range = ioutils._get_remote_bytes_lines( - expanded_paths, - fs, - byte_range=byte_range, - ) + + # Prefetch remote data if possible + path_or_buf, byte_range = ioutils.prefetch_remote_buffers( + paths, + fs, + expand_paths="*.json", + prefetcher="contiguous", + prefetcher_options={ + "byte_range": byte_range, + "read_ahead": prefetch_read_ahead, + }, + ) filepaths_or_buffers = [] for source in path_or_buf: if ioutils.is_directory( path_or_data=source, storage_options=storage_options ): - fs = ioutils._ensure_filesystem( - passed_filesystem=None, - path=source, - storage_options=storage_options, - ) source = ioutils.stringify_pathlike(source) source = fs.sep.join([source, "*.json"]) diff --git a/python/cudf/cudf/io/orc.py b/python/cudf/cudf/io/orc.py index 7082a85237a..aa37dc70cc9 100644 --- a/python/cudf/cudf/io/orc.py +++ b/python/cudf/cudf/io/orc.py @@ -280,7 +280,7 @@ def read_orc( num_rows=None, use_index=True, timestamp_type=None, - use_python_file_object=True, + use_python_file_object=False, storage_options=None, bytes_per_thread=None, ): @@ -319,22 +319,34 @@ def read_orc( "A list of stripes must be provided for each input source" ) + # Extract filesystem up front + fs, paths = ioutils._get_filesystem_and_paths( + path_or_data=filepath_or_buffer, storage_options=storage_options + ) + + # Prefetch remote data if possible + if not use_python_file_object: + # TODO: Add prefetcher="orc" + filepath_or_buffer, _ = ioutils.prefetch_remote_buffers( + paths, + fs, + bytes_per_thread=bytes_per_thread, + expand_paths="*.orc", + prefetcher="contiguous", + ) + filepaths_or_buffers = [] for source in filepath_or_buffer: if ioutils.is_directory( path_or_data=source, storage_options=storage_options ): - fs = ioutils._ensure_filesystem( - passed_filesystem=None, - path=source, - storage_options=storage_options, - ) source = stringify_path(source) source = fs.sep.join([source, "*.orc"]) tmp_source, compression = ioutils.get_reader_filepath_or_buffer( path_or_data=source, compression=None, + fs=fs, use_python_file_object=use_python_file_object, storage_options=storage_options, bytes_per_thread=bytes_per_thread, diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index baf54f3b965..134140f0884 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -604,19 +604,18 @@ def read_parquet( dataset_kwargs=dataset_kwargs, ) - # For remote data, we can transfer the necessary - # bytes directly into host memory - if ( - fs - and paths - and not (ioutils._is_local_filesystem(fs) or use_python_file_object) - ): - filepath_or_buffer = ioutils._get_remote_bytes_parquet( + # Prefetch remote data if possible + if not use_python_file_object: + filepath_or_buffer, _ = ioutils.prefetch_remote_buffers( 
paths, fs, bytes_per_thread=bytes_per_thread, - columns=columns, - row_groups=row_groups, + prefetcher="parquet", + prefetcher_options={ + "columns": columns, + # All paths must have the same row-group selection + "row_groups": row_groups[0] if row_groups else None, + }, ) else: filepath_or_buffer = paths if paths else filepath_or_buffer diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index f7223720b91..2be291afe8f 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -27,7 +27,7 @@ _BYTES_PER_THREAD_DEFAULT = 256 * 1024 * 1024 _ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024 -_OVER_READ_DEFAULT = 1024 * 1024 +_READ_AHEAD_DEFAULT = 1024 * 1024 _docstring_remote_sources = """ - cuDF supports local and remote data stores. See configuration details for @@ -1614,7 +1614,7 @@ def _get_remote_bytes_parquet( if fsspec_parquet is None or (columns is None and row_groups is None): return _get_remote_bytes( remote_paths, fs, bytes_per_thread=bytes_per_thread - ) + ), None sizes = fs.sizes(remote_paths) data = fsspec_parquet._get_parquet_byte_ranges( @@ -1634,15 +1634,15 @@ def _get_remote_bytes_parquet( chunk, dtype="b" ) buffers.append(buf.tobytes()) - return buffers + return buffers, None -def _get_remote_bytes_lines( +def _get_remote_bytes_contiguous( remote_paths, fs, *, byte_range=None, - over_read=_OVER_READ_DEFAULT, + read_ahead=None, bytes_per_thread=None, ): # Use byte_range to set remote_starts and remote_ends @@ -1664,7 +1664,7 @@ def _get_remote_bytes_lines( fs, remote_starts=remote_starts, remote_ends=remote_ends, - over_read=over_read, + read_ahead=read_ahead, bytes_per_thread=bytes_per_thread, use_proxy_files=False, offset=offset, @@ -1673,7 +1673,8 @@ def _get_remote_bytes_lines( # Adjust byte_range to trim unnecessary bytes. # Note that we keep the byte-range shifted by one # byte so that the libcudf reader still follows the - # correct code path + # correct code path. + # TODO: Get rid of this strange workaround! 
if offset: byte_range = (offset, byte_range[1]) @@ -1686,7 +1687,7 @@ def _get_remote_bytes( *, remote_starts=None, remote_ends=None, - over_read=_OVER_READ_DEFAULT, + read_ahead=None, bytes_per_thread=None, use_proxy_files=True, offset=0, @@ -1694,6 +1695,9 @@ def _get_remote_bytes( if isinstance(remote_paths, str): remote_paths = [remote_paths] + if read_ahead is None: + read_ahead = _READ_AHEAD_DEFAULT + if remote_starts: assert len(remote_starts) == len(remote_paths) else: @@ -1703,7 +1707,7 @@ def _get_remote_bytes( if remote_ends: assert len(remote_ends) == len(remote_paths) for i in range(len(remote_ends)): - remote_ends[i] = min(remote_ends[i] + over_read, sizes[i]) + remote_ends[i] = min(remote_ends[i] + read_ahead, sizes[i]) else: remote_ends = sizes @@ -1833,6 +1837,51 @@ def _open_remote_files( ] +def prefetch_remote_buffers( + paths, + fs, + *, + expand_paths=False, + bytes_per_thread=_BYTES_PER_THREAD_DEFAULT, + prefetcher=None, + prefetcher_options=None, +): + # TODO: Add Docstring + # Gather bytes ahead of time for remote filesystems + if fs and paths and not _is_local_filesystem(fs): + prefetchers = { + "parquet": _get_remote_bytes_parquet, + "contiguous": _get_remote_bytes_contiguous, + } + if prefetcher not in prefetchers: + raise NotImplementedError( + f"{prefetcher} is not a supported remote-data prefetcher" + ) + + if expand_paths: + expanded_paths = [] + if expand_paths is True: + expand_paths = "*" + for path in paths: + if is_directory(path_or_data=path, fs=fs): + expanded_paths.extend( + fs.glob(fs.sep.join([path, expand_paths])) + ) + else: + expanded_paths.append(path) + else: + expanded_paths = paths + + return prefetchers.get(prefetcher)( + expanded_paths, + fs, + bytes_per_thread=bytes_per_thread, + **(prefetcher_options or {}), + ) + else: + return paths, None + + @doc_get_reader_filepath_or_buffer() def get_reader_filepath_or_buffer( path_or_data, diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index e4167132598..441b6ac40fc 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -30,9 +30,9 @@ from cudf.utils.dtypes import cudf_dtype_from_pa_type from cudf.utils.ioutils import ( _ROW_GROUP_SIZE_BYTES_DEFAULT, - _get_remote_bytes_parquet, _is_local_filesystem, _open_remote_files, + prefetch_remote_buffers, ) @@ -115,11 +115,17 @@ def _read_paths( else: # Use fsspec to collect byte ranges for all # files ahead of time - paths_or_fobs = _get_remote_bytes_parquet( + paths_or_fobs, _ = prefetch_remote_buffers( paths, fs, - columns=columns, - row_groups=row_groups[0], + prefetcher="parquet", + prefetcher_options={ + "columns": columns, + # All paths must have the same row-group selection + "row_groups": row_groups[0] + if row_groups + else None, + }, ) # Use cudf to read in data From 85a1cf6b867b841d0aadf36d60872a9cd2552956 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 2 Jul 2024 12:19:20 -0700 Subject: [PATCH 10/20] update metadata reader --- python/cudf/cudf/io/parquet.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 134140f0884..6fe4d68ed6e 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -332,11 +332,14 @@ def read_parquet_metadata(filepath_or_buffer): ) if paths and not ioutils._is_local_filesystem(fs): - filepath_or_buffer = ioutils._get_remote_bytes_parquet( + filepath_or_buffer, _ = ioutils.prefetch_remote_buffers( paths, 
fs, - columns=[], - row_groups=[], + prefetcher="parquet", + prefetcher_options={ + "columns": [], + "row_groups": [], + }, ) else: filepath_or_buffer = paths if paths else filepath_or_buffer From c3f48cc7e56cfb3980939eb7d9c1c81d42194505 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 2 Jul 2024 12:31:26 -0700 Subject: [PATCH 11/20] resolve json behavior --- python/cudf/cudf/io/json.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py index cd5299009ce..55403389f5f 100644 --- a/python/cudf/cudf/io/json.py +++ b/python/cudf/cudf/io/json.py @@ -73,22 +73,22 @@ def read_json( path_or_data=path_or_buf, storage_options=storage_options ) - # Prefetch remote data if possible - path_or_buf, byte_range = ioutils.prefetch_remote_buffers( - paths, - fs, - expand_paths="*.json", - prefetcher="contiguous", - prefetcher_options={ - "byte_range": byte_range, - "read_ahead": prefetch_read_ahead, - }, - ) + # # Prefetch remote data if possible + # path_or_buf, byte_range = ioutils.prefetch_remote_buffers( + # paths, + # fs, + # expand_paths="*.json", + # prefetcher="contiguous", + # prefetcher_options={ + # "byte_range": byte_range, + # "read_ahead": prefetch_read_ahead, + # }, + # ) filepaths_or_buffers = [] for source in path_or_buf: if ioutils.is_directory( - path_or_data=source, storage_options=storage_options + path_or_data=source, storage_options=storage_options, fs=fs ): source = ioutils.stringify_pathlike(source) source = fs.sep.join([source, "*.json"]) @@ -96,7 +96,6 @@ def read_json( tmp_source, compression = ioutils.get_reader_filepath_or_buffer( path_or_data=source, compression=compression, - fs=fs, iotypes=(BytesIO, StringIO), allow_raw_text_input=True, storage_options=storage_options, From 373b37a6ffb0d7eb406559d2fbc552c14a9cecd7 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 2 Jul 2024 13:00:14 -0700 Subject: [PATCH 12/20] json fix --- python/cudf/cudf/io/csv.py | 5 +++-- python/cudf/cudf/io/json.py | 24 +++++++++++++----------- python/cudf/cudf/io/orc.py | 2 +- python/cudf/cudf/io/parquet.py | 4 ++-- python/cudf/cudf/utils/ioutils.py | 8 ++++---- 5 files changed, 23 insertions(+), 20 deletions(-) diff --git a/python/cudf/cudf/io/csv.py b/python/cudf/cudf/io/csv.py index 6be9e2c5fbe..e40ac0476bc 100644 --- a/python/cudf/cudf/io/csv.py +++ b/python/cudf/cudf/io/csv.py @@ -88,8 +88,8 @@ def read_csv( ) # Prefetch remote data if possible - if not use_python_file_object: - filepath_or_buffer, byte_range = ioutils.prefetch_remote_buffers( + if fs and paths and not use_python_file_object: + filepath_or_buffer, info = ioutils.prefetch_remote_buffers( paths, fs, bytes_per_thread=bytes_per_thread, @@ -101,6 +101,7 @@ def read_csv( ) assert len(filepath_or_buffer) == 1 filepath_or_buffer = filepath_or_buffer[0] + byte_range = info.get("byte_range", byte_range) filepath_or_buffer, compression = ioutils.get_reader_filepath_or_buffer( path_or_data=filepath_or_buffer, diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py index 55403389f5f..b56c12781f0 100644 --- a/python/cudf/cudf/io/json.py +++ b/python/cudf/cudf/io/json.py @@ -73,17 +73,19 @@ def read_json( path_or_data=path_or_buf, storage_options=storage_options ) - # # Prefetch remote data if possible - # path_or_buf, byte_range = ioutils.prefetch_remote_buffers( - # paths, - # fs, - # expand_paths="*.json", - # prefetcher="contiguous", - # prefetcher_options={ - # "byte_range": byte_range, - # "read_ahead": 
prefetch_read_ahead, - # }, - # ) + # Prefetch remote data if possible + if fs and paths: + path_or_buf, info = ioutils.prefetch_remote_buffers( + paths, + fs, + expand_paths="*.json", + prefetcher="contiguous", + prefetcher_options={ + "byte_range": byte_range, + "read_ahead": prefetch_read_ahead, + }, + ) + byte_range = info.get("byte_range", byte_range) filepaths_or_buffers = [] for source in path_or_buf: diff --git a/python/cudf/cudf/io/orc.py b/python/cudf/cudf/io/orc.py index aa37dc70cc9..b594717b3cf 100644 --- a/python/cudf/cudf/io/orc.py +++ b/python/cudf/cudf/io/orc.py @@ -325,7 +325,7 @@ def read_orc( ) # Prefetch remote data if possible - if not use_python_file_object: + if fs and paths and not use_python_file_object: # TODO: Add prefetcher="orc" filepath_or_buffer, _ = ioutils.prefetch_remote_buffers( paths, diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 6fe4d68ed6e..9d00d3a638d 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -331,7 +331,7 @@ def read_parquet_metadata(filepath_or_buffer): path_or_data=filepath_or_buffer, storage_options=None ) - if paths and not ioutils._is_local_filesystem(fs): + if fs and paths: filepath_or_buffer, _ = ioutils.prefetch_remote_buffers( paths, fs, @@ -608,7 +608,7 @@ def read_parquet( ) # Prefetch remote data if possible - if not use_python_file_object: + if fs and paths and not use_python_file_object: filepath_or_buffer, _ = ioutils.prefetch_remote_buffers( paths, fs, diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 2be291afe8f..ed46be55ff1 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1614,7 +1614,7 @@ def _get_remote_bytes_parquet( if fsspec_parquet is None or (columns is None and row_groups is None): return _get_remote_bytes( remote_paths, fs, bytes_per_thread=bytes_per_thread - ), None + ), {} sizes = fs.sizes(remote_paths) data = fsspec_parquet._get_parquet_byte_ranges( @@ -1634,7 +1634,7 @@ def _get_remote_bytes_parquet( chunk, dtype="b" ) buffers.append(buf.tobytes()) - return buffers, None + return buffers, {} def _get_remote_bytes_contiguous( @@ -1678,7 +1678,7 @@ def _get_remote_bytes_contiguous( if offset: byte_range = (offset, byte_range[1]) - return buffers, byte_range + return buffers, {"byte_range": byte_range} def _get_remote_bytes( @@ -1879,7 +1879,7 @@ def prefetch_remote_buffers( **(prefetcher_options or {}), ) else: - return paths, None + return paths, {} @doc_get_reader_filepath_or_buffer() From 2b00674142f8516defbb955878855477f4d5e4b4 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 2 Jul 2024 13:16:06 -0700 Subject: [PATCH 13/20] remove open_file_options from test_read_parquet_filters --- python/cudf/cudf/tests/test_s3.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index a44bf791767..ea815323787 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -363,8 +363,7 @@ def test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns): assert_eq(expect, got) -@pytest.mark.parametrize("precache", [None, "parquet"]) -def test_read_parquet_filters(s3_base, s3so, pdf_ext, precache): +def test_read_parquet_filters(s3_base, s3so, pdf_ext): fname = "test_parquet_reader_filters.parquet" bucket = "parquet" buffer = BytesIO() @@ -376,7 +375,6 @@ def test_read_parquet_filters(s3_base, s3so, pdf_ext, precache): f"s3://{bucket}/{fname}", 
storage_options=s3so, filters=filters, - open_file_options={"precache_options": {"method": precache}}, ) # All row-groups should be filtered out From 6658f34669709a8437169b6aba05d27855914b93 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 2 Jul 2024 13:39:17 -0700 Subject: [PATCH 14/20] update --- python/cudf/cudf/io/orc.py | 4 ++-- python/cudf/cudf/io/text.py | 16 ++++++++++++---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/python/cudf/cudf/io/orc.py b/python/cudf/cudf/io/orc.py index b594717b3cf..2ec99dd5f7d 100644 --- a/python/cudf/cudf/io/orc.py +++ b/python/cudf/cudf/io/orc.py @@ -326,7 +326,7 @@ def read_orc( # Prefetch remote data if possible if fs and paths and not use_python_file_object: - # TODO: Add prefetcher="orc" + # TODO: Add prefetcher for partial IO filepath_or_buffer, _ = ioutils.prefetch_remote_buffers( paths, fs, @@ -338,7 +338,7 @@ def read_orc( filepaths_or_buffers = [] for source in filepath_or_buffer: if ioutils.is_directory( - path_or_data=source, storage_options=storage_options + path_or_data=source, storage_options=storage_options, fs=fs ): source = stringify_path(source) source = fs.sep.join([source, "*.orc"]) diff --git a/python/cudf/cudf/io/text.py b/python/cudf/cudf/io/text.py index 6a17535a6a9..2bad7f26730 100644 --- a/python/cudf/cudf/io/text.py +++ b/python/cudf/cudf/io/text.py @@ -18,24 +18,32 @@ def read_text( compression=None, compression_offsets=None, storage_options=None, + prefetch_read_ahead=None, ): """{docstring}""" if delimiter is None: raise ValueError("delimiter needs to be provided") - # Check if this is a remote file + # Extract filesystem up front fs, paths = ioutils._get_filesystem_and_paths( path_or_data=filepath_or_buffer, storage_options=storage_options ) - if fs and paths and not ioutils._is_local_filesystem(fs): - filepath_or_buffer, byte_range = ioutils._get_remote_bytes_lines( + + # Prefetch remote data if possible + if fs and paths: + filepath_or_buffer, info = ioutils.prefetch_remote_buffers( paths, fs, - byte_range=byte_range, + prefetcher="contiguous", + prefetcher_options={ + "byte_range": byte_range, + "read_ahead": prefetch_read_ahead, + }, ) assert len(filepath_or_buffer) == 1 filepath_or_buffer = filepath_or_buffer[0] + byte_range = info.get("byte_range", byte_range) filepath_or_buffer, _ = ioutils.get_reader_filepath_or_buffer( path_or_data=filepath_or_buffer, From 81f2a7dd02576beafd4e1d1c1d799568d1d7d750 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 22 Jul 2024 09:44:54 -0700 Subject: [PATCH 15/20] fix tests --- python/cudf/cudf/tests/test_s3.py | 9 +++++++++ python/dask_cudf/dask_cudf/io/parquet.py | 4 +++- python/dask_cudf/dask_cudf/io/tests/test_s3.py | 18 +++++++++++++++++- 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 3ae318d3bf5..25cf4bdaf2d 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -387,6 +387,15 @@ def test_read_parquet_filters(s3_base, s3so, pdf_ext, precache): open_file_options={"precache_options": {"method": precache}}, ) + # Check that default case doesn't warn and is correct + if precache is None: + default = cudf.read_parquet( + f"s3://{bucket}/{fname}", + storage_options=s3so, + filters=filters, + ) + assert_eq(pdf_ext.iloc[:0], default.reset_index(drop=True)) + # All row-groups should be filtered out assert_eq(pdf_ext.iloc[:0], got.reset_index(drop=True)) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py 
b/python/dask_cudf/dask_cudf/io/parquet.py index 441b6ac40fc..058b68f16a6 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -102,7 +102,9 @@ def _read_paths( # Non-local filesystem handling paths_or_fobs = paths if not _is_local_filesystem(fs): - if kwargs.get("use_python_file_object", False): + if open_file_options or kwargs.get( + "use_python_file_object", False + ): # Use deprecated NativeFile code path in cudf paths_or_fobs = _open_remote_files( paths_or_fobs, diff --git a/python/dask_cudf/dask_cudf/io/tests/test_s3.py b/python/dask_cudf/dask_cudf/io/tests/test_s3.py index 3947c69aaa5..3ee03edf8f5 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_s3.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_s3.py @@ -120,7 +120,7 @@ def test_read_csv(s3_base, s3so): {"open_file_func": None}, ], ) -def test_read_parquet(s3_base, s3so, open_file_options): +def test_read_parquet_deprecated(s3_base, s3so, open_file_options): pdf = pd.DataFrame({"a": [1, 2, 3, 4], "b": [2.1, 2.2, 2.3, 2.4]}) buffer = BytesIO() pdf.to_parquet(path=buffer) @@ -142,3 +142,19 @@ def test_read_parquet(s3_base, s3so, open_file_options): assert df.a.sum().compute() == 10 with pytest.warns(FutureWarning): assert df.b.sum().compute() == 9 + + +def test_read_parquet_remote(s3_base, s3so): + pdf = pd.DataFrame({"a": [1, 2, 3, 4], "b": [2.1, 2.2, 2.3, 2.4]}) + buffer = BytesIO() + pdf.to_parquet(path=buffer) + buffer.seek(0) + with s3_context( + s3_base=s3_base, bucket="daskparquet", files={"file.parq": buffer} + ): + df = dask_cudf.read_parquet( + "s3://daskparquet/*.parq", + storage_options=s3so, + ) + assert df.a.sum().compute() == 10 + assert df.b.sum().compute() == 9 From 5867f052ea38ec2d387f810cc51258e691edb9a8 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 30 Jul 2024 14:57:29 -0700 Subject: [PATCH 16/20] register read_parquet function to CudfDXBackendEntrypoint --- python/dask_cudf/dask_cudf/backends.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index 4bdb5d921ec..ac5ef865288 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -666,6 +666,16 @@ def from_dict( constructor=constructor, ) + @classmethod + def read_parquet(cls, *args, engine=None, **kwargs): + import dask_expr as dx + + from dask_cudf.io.parquet import CudfEngine + + return _default_backend( + dx.read_parquet, *args, engine=CudfEngine, **kwargs + ) + @staticmethod def read_json(*args, **kwargs): from dask_cudf.io.json import read_json as read_json_impl From 17be6b0c784774c27726657ee2ebfa9a626123a3 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 12 Aug 2024 12:10:13 -0700 Subject: [PATCH 17/20] add read_csv def --- python/dask_cudf/dask_cudf/backends.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index d2f677035fd..65b26758cee 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -677,6 +677,32 @@ def read_parquet(cls, *args, engine=None, **kwargs): dx.read_parquet, *args, engine=CudfEngine, **kwargs ) + @staticmethod + def read_csv( + path, + *args, + header="infer", + dtype_backend=None, + storage_options=None, + **kwargs, + ): + from fsspec.utils import stringify_path + from dask_expr._collection import new_collection + from dask_expr.io.csv import ReadCSV + + if not isinstance(path, str): 
+ path = stringify_path(path) + return new_collection( + ReadCSV( + path, + dtype_backend=dtype_backend, + storage_options=storage_options, + kwargs=kwargs, + header=header, + dataframe_backend="cudf", + ) + ) + @staticmethod def read_json(*args, **kwargs): from dask_cudf.io.json import read_json as read_json_impl From 04aa983ac3050af932c3a27f73d2f407bd439abe Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 12 Aug 2024 12:12:43 -0700 Subject: [PATCH 18/20] formatting --- python/dask_cudf/dask_cudf/backends.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index 65b26758cee..5b86108dbd4 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -686,9 +686,9 @@ def read_csv( storage_options=None, **kwargs, ): - from fsspec.utils import stringify_path from dask_expr._collection import new_collection from dask_expr.io.csv import ReadCSV + from fsspec.utils import stringify_path if not isinstance(path, str): path = stringify_path(path) From 5d8c80d864cfc768db9fe3438ffede7f0c8b7295 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 12 Aug 2024 14:25:40 -0700 Subject: [PATCH 19/20] simplify imports --- python/dask_cudf/dask_cudf/backends.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index 5b86108dbd4..50548972cb0 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -686,14 +686,13 @@ def read_csv( storage_options=None, **kwargs, ): - from dask_expr._collection import new_collection - from dask_expr.io.csv import ReadCSV + import dask_expr as dx from fsspec.utils import stringify_path if not isinstance(path, str): path = stringify_path(path) - return new_collection( - ReadCSV( + return dx.new_collection( + dx.io.csv.ReadCSV( path, dtype_backend=dtype_backend, storage_options=storage_options, From ffcc137aada3c2d283570f791e2b1ba8b71ee44f Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Tue, 13 Aug 2024 10:22:41 -0500 Subject: [PATCH 20/20] Apply suggestions from code review Co-authored-by: Mads R. B. Kristensen --- python/dask_cudf/dask_cudf/backends.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index 50548972cb0..01bab30190a 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -667,8 +667,8 @@ def from_dict( constructor=constructor, ) - @classmethod - def read_parquet(cls, *args, engine=None, **kwargs): + @staticmethod + def read_parquet(*args, engine=None, **kwargs): import dask_expr as dx from dask_cudf.io.parquet import CudfEngine
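
Illustrative usage sketch (not part of the patch series): with the use_python_file_object=False default introduced above, remote reads prefetch the required byte ranges into host memory via fsspec before parsing. The bucket, file paths, and storage options below are placeholders.

import cudf

# Remote Parquet: the column/row-group selection drives the "parquet"
# prefetcher, so only the needed byte ranges are transferred
# (placeholder bucket/path and credentials).
df = cudf.read_parquet(
    "s3://my-bucket/data.parquet",
    columns=["a", "b"],
    storage_options={"anon": True},
)

# Remote CSV: byte_range reads go through the "contiguous" prefetcher
# (again, path and options are placeholders).
chunk = cudf.read_csv(
    "s3://my-bucket/data.csv",
    byte_range=(0, 1024 * 1024),
    storage_options={"anon": True},
)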