diff --git a/python/cudf/cudf/io/csv.py b/python/cudf/cudf/io/csv.py
index 0f2820a01e9..4a06672e065 100644
--- a/python/cudf/cudf/io/csv.py
+++ b/python/cudf/cudf/io/csv.py
@@ -53,6 +53,7 @@ def read_csv(
     use_python_file_object=None,
     storage_options=None,
     bytes_per_thread=None,
+    prefetch_read_ahead=None,
 ):
     """{docstring}"""
 
@@ -81,9 +82,31 @@ def read_csv(
             "`read_csv` does not yet support reading multiple files"
         )
 
+    # Extract filesystem up front
+    fs, paths = ioutils._get_filesystem_and_paths(
+        path_or_data=filepath_or_buffer, storage_options=storage_options
+    )
+
+    # Prefetch remote data if possible
+    if fs and paths and not use_python_file_object:
+        filepath_or_buffer, info = ioutils.prefetch_remote_buffers(
+            paths,
+            fs,
+            bytes_per_thread=bytes_per_thread,
+            prefetcher="contiguous",
+            prefetcher_options={
+                "byte_range": byte_range,
+                "read_ahead": prefetch_read_ahead,
+            },
+        )
+        assert len(filepath_or_buffer) == 1
+        filepath_or_buffer = filepath_or_buffer[0]
+        byte_range = info.get("byte_range", byte_range)
+
     filepath_or_buffer, compression = ioutils.get_reader_filepath_or_buffer(
         path_or_data=filepath_or_buffer,
         compression=compression,
+        fs=fs,
         iotypes=(BytesIO, StringIO, NativeFile),
         use_python_file_object=use_python_file_object,
         storage_options=storage_options,
diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py
index fc3387d5117..b56c12781f0 100644
--- a/python/cudf/cudf/io/json.py
+++ b/python/cudf/cudf/io/json.py
@@ -28,6 +28,7 @@ def read_json(
     mixed_types_as_string=False,
     prune_columns=False,
     on_bad_lines="error",
+    prefetch_read_ahead=None,
     *args,
     **kwargs,
 ):
@@ -67,16 +68,30 @@ def read_json(
         if not is_list_like(path_or_buf):
             path_or_buf = [path_or_buf]
 
+        # Extract filesystem up front
+        fs, paths = ioutils._get_filesystem_and_paths(
+            path_or_data=path_or_buf, storage_options=storage_options
+        )
+
+        # Prefetch remote data if possible
+        if fs and paths:
+            path_or_buf, info = ioutils.prefetch_remote_buffers(
+                paths,
+                fs,
+                expand_paths="*.json",
+                prefetcher="contiguous",
+                prefetcher_options={
+                    "byte_range": byte_range,
+                    "read_ahead": prefetch_read_ahead,
+                },
+            )
+            byte_range = info.get("byte_range", byte_range)
+
         filepaths_or_buffers = []
         for source in path_or_buf:
             if ioutils.is_directory(
-                path_or_data=source, storage_options=storage_options
+                path_or_data=source, storage_options=storage_options, fs=fs
             ):
-                fs = ioutils._ensure_filesystem(
-                    passed_filesystem=None,
-                    path=source,
-                    storage_options=storage_options,
-                )
                 source = ioutils.stringify_pathlike(source)
                 source = fs.sep.join([source, "*.json"])
diff --git a/python/cudf/cudf/io/orc.py b/python/cudf/cudf/io/orc.py
index 289292b5182..277a8e96853 100644
--- a/python/cudf/cudf/io/orc.py
+++ b/python/cudf/cudf/io/orc.py
@@ -320,25 +320,37 @@ def read_orc(
                 "A list of stripes must be provided for each input source"
             )
 
+    # Extract filesystem up front
+    fs, paths = ioutils._get_filesystem_and_paths(
+        path_or_data=filepath_or_buffer, storage_options=storage_options
+    )
+
+    # Prefetch remote data if possible
+    if fs and paths and not use_python_file_object:
+        # TODO: Add prefetcher for partial IO
+        filepath_or_buffer, _ = ioutils.prefetch_remote_buffers(
+            paths,
+            fs,
+            bytes_per_thread=bytes_per_thread,
+            expand_paths="*.orc",
+            prefetcher="contiguous",
+        )
+
     filepaths_or_buffers = []
     have_nativefile = any(
         isinstance(source, pa.NativeFile) for source in filepath_or_buffer
     )
     for source in filepath_or_buffer:
         if ioutils.is_directory(
-            path_or_data=source, storage_options=storage_options
+            path_or_data=source, storage_options=storage_options, fs=fs
         ):
-            fs = ioutils._ensure_filesystem(
-                passed_filesystem=None,
-                path=source,
-                storage_options=storage_options,
-            )
             source = stringify_path(source)
             source = fs.sep.join([source, "*.orc"])
 
         tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
             path_or_data=source,
             compression=None,
+            fs=fs,
             use_python_file_object=use_python_file_object,
             storage_options=storage_options,
             bytes_per_thread=bytes_per_thread,
diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index 4a419a2fbb6..b09e603560b 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -341,8 +341,18 @@ def read_parquet_metadata(filepath_or_buffer):
         path_or_data=filepath_or_buffer, storage_options=None
     )
 
-    # Check if filepath or buffer
-    filepath_or_buffer = paths if paths else filepath_or_buffer
+    if fs and paths:
+        filepath_or_buffer, _ = ioutils.prefetch_remote_buffers(
+            paths,
+            fs,
+            prefetcher="parquet",
+            prefetcher_options={
+                "columns": [],
+                "row_groups": [],
+            },
+        )
+    else:
+        filepath_or_buffer = paths if paths else filepath_or_buffer
 
     # List of filepaths or buffers
     filepaths_or_buffers = []
@@ -609,7 +619,22 @@ def read_parquet(
             categorical_partitions=categorical_partitions,
             dataset_kwargs=dataset_kwargs,
         )
-    filepath_or_buffer = paths if paths else filepath_or_buffer
+
+    # Prefetch remote data if possible
+    if fs and paths and not use_python_file_object:
+        filepath_or_buffer, _ = ioutils.prefetch_remote_buffers(
+            paths,
+            fs,
+            bytes_per_thread=bytes_per_thread,
+            prefetcher="parquet",
+            prefetcher_options={
+                "columns": columns,
+                # All paths must have the same row-group selection
+                "row_groups": row_groups[0] if row_groups else None,
+            },
+        )
+    else:
+        filepath_or_buffer = paths if paths else filepath_or_buffer
 
     filepaths_or_buffers = []
     if use_python_file_object:
diff --git a/python/cudf/cudf/io/text.py b/python/cudf/cudf/io/text.py
index 4329480bb2c..2bad7f26730 100644
--- a/python/cudf/cudf/io/text.py
+++ b/python/cudf/cudf/io/text.py
@@ -18,15 +18,37 @@ def read_text(
     compression=None,
     compression_offsets=None,
     storage_options=None,
+    prefetch_read_ahead=None,
 ):
     """{docstring}"""
 
     if delimiter is None:
         raise ValueError("delimiter needs to be provided")
 
+    # Extract filesystem up front
+    fs, paths = ioutils._get_filesystem_and_paths(
+        path_or_data=filepath_or_buffer, storage_options=storage_options
+    )
+
+    # Prefetch remote data if possible
+    if fs and paths:
+        filepath_or_buffer, info = ioutils.prefetch_remote_buffers(
+            paths,
+            fs,
+            prefetcher="contiguous",
+            prefetcher_options={
+                "byte_range": byte_range,
+                "read_ahead": prefetch_read_ahead,
+            },
+        )
+        assert len(filepath_or_buffer) == 1
+        filepath_or_buffer = filepath_or_buffer[0]
+        byte_range = info.get("byte_range", byte_range)
+
     filepath_or_buffer, _ = ioutils.get_reader_filepath_or_buffer(
         path_or_data=filepath_or_buffer,
         compression=None,
+        fs=fs,
         iotypes=(BytesIO, StringIO),
         storage_options=storage_options,
     )
diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py
index 3ae318d3bf5..25cf4bdaf2d 100644
--- a/python/cudf/cudf/tests/test_s3.py
+++ b/python/cudf/cudf/tests/test_s3.py
@@ -387,6 +387,15 @@ def test_read_parquet_filters(s3_base, s3so, pdf_ext, precache):
         open_file_options={"precache_options": {"method": precache}},
     )
 
+    # Check that default case doesn't warn and is correct
+    if precache is None:
+        default = cudf.read_parquet(
+            f"s3://{bucket}/{fname}",
+            storage_options=s3so,
+            filters=filters,
+        )
+        assert_eq(pdf_ext.iloc[:0], default.reset_index(drop=True))
+
     # All row-groups should be filtered out
     assert_eq(pdf_ext.iloc[:0], got.reset_index(drop=True))
diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py
index 448a815fe1b..3ccf34409f3 100644
--- a/python/cudf/cudf/utils/ioutils.py
+++ b/python/cudf/cudf/utils/ioutils.py
@@ -28,6 +28,7 @@
 
 _BYTES_PER_THREAD_DEFAULT = 256 * 1024 * 1024
 _ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024
+_READ_AHEAD_DEFAULT = 1024 * 1024
 
 _docstring_remote_sources = """
 - cuDF supports local and remote data stores. See configuration details for
@@ -1580,15 +1581,18 @@ def ensure_single_filepath_or_buffer(path_or_data, storage_options=None):
     return True
 
 
-def is_directory(path_or_data, storage_options=None):
+def is_directory(path_or_data, storage_options=None, fs=None):
     """Returns True if the provided filepath is a directory"""
     path_or_data = stringify_pathlike(path_or_data)
     if isinstance(path_or_data, str):
         path_or_data = os.path.expanduser(path_or_data)
         try:
-            fs = get_fs_token_paths(
-                path_or_data, mode="rb", storage_options=storage_options
-            )[0]
+            fs = (
+                fs
+                or get_fs_token_paths(
+                    path_or_data, mode="rb", storage_options=storage_options
+                )[0]
+            )
         except ValueError as e:
             if str(e).startswith("Protocol not known"):
                 return False
@@ -1642,6 +1646,144 @@ def _set_context(obj, stack):
     return stack.enter_context(obj)
 
 
+def _get_remote_bytes_parquet(
+    remote_paths,
+    fs,
+    *,
+    columns=None,
+    row_groups=None,
+    bytes_per_thread=None,
+):
+    if fsspec_parquet is None or (columns is None and row_groups is None):
+        return _get_remote_bytes(
+            remote_paths, fs, bytes_per_thread=bytes_per_thread
+        ), {}
+
+    sizes = fs.sizes(remote_paths)
+    data = fsspec_parquet._get_parquet_byte_ranges(
+        remote_paths,
+        fs,
+        columns=columns,
+        row_groups=row_groups,
+        max_block=bytes_per_thread or _BYTES_PER_THREAD_DEFAULT,
+    )
+    buffers = []
+    for size, path in zip(sizes, remote_paths):
+        path_data = data.pop(path)
+        buf = np.zeros(size, dtype="b")
+        for range_offset in list(path_data.keys()):
+            chunk = path_data.pop(range_offset)
+            buf[range_offset[0] : range_offset[1]] = np.frombuffer(
+                chunk, dtype="b"
+            )
+        buffers.append(buf.tobytes())
+    return buffers, {}
+
+
+def _get_remote_bytes_contiguous(
+    remote_paths,
+    fs,
+    *,
+    byte_range=None,
+    read_ahead=None,
+    bytes_per_thread=None,
+):
+    # Use byte_range to set remote_starts and remote_ends
+    remote_starts = None
+    remote_ends = None
+    offset = 0
+    if byte_range:
+        assert len(remote_paths) == 1
+        start, stop = byte_range
+        remote_starts = [start] * len(remote_paths)
+        if start:
+            offset = 1
+        if stop:
+            remote_ends = [start + stop] * len(remote_paths)
+
+    # Collect buffers
+    buffers = _get_remote_bytes(
+        remote_paths,
+        fs,
+        remote_starts=remote_starts,
+        remote_ends=remote_ends,
+        read_ahead=read_ahead,
+        bytes_per_thread=bytes_per_thread,
+        use_proxy_files=False,
+        offset=offset,
+    )
+
+    # Adjust byte_range to trim unnecessary bytes.
+    # Note that we keep the byte-range shifted by one
+    # byte so that the libcudf reader still follows the
+    # correct code path.
+    # TODO: Get rid of this strange workaround!
+    if offset:
+        byte_range = (offset, byte_range[1])
+
+    return buffers, {"byte_range": byte_range}
+
+
+def _get_remote_bytes(
+    remote_paths,
+    fs,
+    *,
+    remote_starts=None,
+    remote_ends=None,
+    read_ahead=None,
+    bytes_per_thread=None,
+    use_proxy_files=True,
+    offset=0,
+):
+    if isinstance(remote_paths, str):
+        remote_paths = [remote_paths]
+
+    if read_ahead is None:
+        read_ahead = _READ_AHEAD_DEFAULT
+
+    if remote_starts:
+        assert len(remote_starts) == len(remote_paths)
+    else:
+        remote_starts = [0] * len(remote_paths)
+
+    sizes = fs.sizes(remote_paths)
+    if remote_ends:
+        assert len(remote_ends) == len(remote_paths)
+        for i in range(len(remote_ends)):
+            remote_ends[i] = min(remote_ends[i] + read_ahead, sizes[i])
+    else:
+        remote_ends = sizes
+
+    # Construct list of paths, starts, and stops
+    paths, starts, ends = [], [], []
+    blocksize = bytes_per_thread or _BYTES_PER_THREAD_DEFAULT
+    for i, remote_path in enumerate(remote_paths):
+        for j in range(remote_starts[i], remote_ends[i], blocksize):
+            paths.append(remote_path)
+            starts.append(j)
+            ends.append(min(j + blocksize, remote_ends[i]))
+
+    # Collect the byte ranges
+    chunks = fs.cat_ranges(paths, starts, ends)
+
+    # Construct local byte buffers
+    buffers = []
+    path_counts = np.unique(paths, return_counts=True)[1]
+    for i, remote_path in enumerate(remote_paths):
+        size = (
+            sizes[i] if use_proxy_files else remote_ends[i] - remote_starts[i]
+        )
+        buf = np.zeros(size + offset, dtype="b")
+        for j in range(path_counts[i]):
+            start, end = starts[i + j] + offset, ends[i + j] + offset
+            if not use_proxy_files:
+                start -= remote_starts[i]
+                end -= remote_starts[i]
+            buf[start:end] = np.frombuffer(chunks.pop(0), dtype="b")
+        buffers.append(buf.tobytes())
+    return buffers
+
+
 def _open_remote_files(
     paths,
     fs,
@@ -1738,6 +1880,51 @@ def _open_remote_files(
     ]
 
 
+def prefetch_remote_buffers(
+    paths,
+    fs,
+    *,
+    expand_paths=False,
+    bytes_per_thread=_BYTES_PER_THREAD_DEFAULT,
+    prefetcher=None,
+    prefetcher_options=None,
+):
+    # TODO: Add Docstring
+    # Gather bytes ahead of time for remote filesystems
+    if fs and paths and not _is_local_filesystem(fs):
+        prefetchers = {
+            "parquet": _get_remote_bytes_parquet,
+            "contiguous": _get_remote_bytes_contiguous,
+        }
+        if prefetcher not in prefetchers:
+            raise NotImplementedError(
+                f"{prefetcher} is not a supported remote-data prefetcher"
+            )
+
+        if expand_paths:
+            expanded_paths = []
+            if expand_paths is True:
+                expand_paths = "*"
+            for path in paths:
+                if is_directory(path_or_data=path, fs=fs):
+                    expanded_paths.extend(
+                        fs.glob(fs.sep.join([path, expand_paths]))
+                    )
+                else:
+                    expanded_paths.append(path)
+        else:
+            expanded_paths = paths
+
+        return prefetchers.get(prefetcher)(
+            expanded_paths,
+            fs,
+            bytes_per_thread=bytes_per_thread,
+            **(prefetcher_options or {}),
+        )
+    else:
+        return paths, {}
+
+
 @doc_get_reader_filepath_or_buffer()
 def get_reader_filepath_or_buffer(
     path_or_data,
@@ -1873,17 +2060,9 @@ def get_reader_filepath_or_buffer(
                     **(open_file_options or {}),
                 )
             else:
-                path_or_data = [
-                    BytesIO(
-                        _fsspec_data_transfer(
-                            fpath,
-                            fs=fs,
-                            mode=mode,
-                            bytes_per_thread=bytes_per_thread,
-                        )
-                    )
-                    for fpath in paths
-                ]
+                path_or_data = _get_remote_bytes(
+                    paths, fs, bytes_per_thread=bytes_per_thread
+                )
 
             if len(path_or_data) == 1:
                 path_or_data = path_or_data[0]
@@ -1893,11 +2072,8 @@
         if use_python_file_object:
             path_or_data = ArrowPythonFile(path_or_data)
         else:
-            path_or_data = BytesIO(
-                _fsspec_data_transfer(
-                    path_or_data, mode=mode, bytes_per_thread=bytes_per_thread
-                )
-            )
+            # Remote file is already open - Just read it
+            path_or_data = BytesIO(path_or_data.read())
 
     return path_or_data, compression
@@ -2190,27 +2366,6 @@ def _fsspec_data_transfer(
     return buf.tobytes()
 
 
-def _merge_ranges(byte_ranges, max_block=256_000_000, max_gap=64_000):
-    # Simple utility to merge small/adjacent byte ranges
-    new_ranges = []
-    if not byte_ranges:
-        # Early return
-        return new_ranges
-
-    offset, size = byte_ranges[0]
-    for new_offset, new_size in byte_ranges[1:]:
-        gap = new_offset - (offset + size)
-        if gap > max_gap or (size + new_size + gap) > max_block:
-            # Gap is too large or total read is too large
-            new_ranges.append((offset, size))
-            offset = new_offset
-            size = new_size
-            continue
-        size += new_size + gap
-    new_ranges.append((offset, size))
-    return new_ranges
-
-
 def _assign_block(fs, path_or_fob, local_buffer, offset, nbytes):
     if fs is None:
         # We have an open fsspec file object
diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py
index 2b1f745fc04..01bab30190a 100644
--- a/python/dask_cudf/dask_cudf/backends.py
+++ b/python/dask_cudf/dask_cudf/backends.py
@@ -667,6 +667,41 @@ def from_dict(
             constructor=constructor,
         )
 
+    @staticmethod
+    def read_parquet(*args, engine=None, **kwargs):
+        import dask_expr as dx
+
+        from dask_cudf.io.parquet import CudfEngine
+
+        return _default_backend(
+            dx.read_parquet, *args, engine=CudfEngine, **kwargs
+        )
+
+    @staticmethod
+    def read_csv(
+        path,
+        *args,
+        header="infer",
+        dtype_backend=None,
+        storage_options=None,
+        **kwargs,
+    ):
+        import dask_expr as dx
+        from fsspec.utils import stringify_path
+
+        if not isinstance(path, str):
+            path = stringify_path(path)
+        return dx.new_collection(
+            dx.io.csv.ReadCSV(
+                path,
+                dtype_backend=dtype_backend,
+                storage_options=storage_options,
+                kwargs=kwargs,
+                header=header,
+                dataframe_backend="cudf",
+            )
+        )
+
     @staticmethod
     def read_json(*args, **kwargs):
         from dask_cudf.io.json import read_json as read_json_impl
diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py
index f0cab953458..cdcc5ff8a97 100644
--- a/python/dask_cudf/dask_cudf/io/parquet.py
+++ b/python/dask_cudf/dask_cudf/io/parquet.py
@@ -32,6 +32,7 @@
     _ROW_GROUP_SIZE_BYTES_DEFAULT,
     _is_local_filesystem,
     _open_remote_files,
+    prefetch_remote_buffers,
 )
 from cudf.utils.utils import maybe_filter_deprecation
 
@@ -102,14 +103,33 @@ def _read_paths(
         # Non-local filesystem handling
         paths_or_fobs = paths
         if not _is_local_filesystem(fs):
-            paths_or_fobs = _open_remote_files(
-                paths_or_fobs,
-                fs,
-                context_stack=stack,
-                **_default_open_file_options(
-                    open_file_options, columns, row_groups
-                ),
-            )
+            if open_file_options or kwargs.get(
+                "use_python_file_object", False
+            ):
+                # Use deprecated NativeFile code path in cudf
+                paths_or_fobs = _open_remote_files(
+                    paths_or_fobs,
+                    fs,
+                    context_stack=stack,
+                    **_default_open_file_options(
+                        open_file_options, columns, row_groups
+                    ),
+                )
+            else:
+                # Use fsspec to collect byte ranges for all
+                # files ahead of time
+                paths_or_fobs, _ = prefetch_remote_buffers(
+                    paths,
+                    fs,
+                    prefetcher="parquet",
+                    prefetcher_options={
+                        "columns": columns,
+                        # All paths must have the same row-group selection
+                        "row_groups": row_groups[0]
+                        if row_groups
+                        else None,
+                    },
+                )
 
         # Filter out deprecation warning unless the user
         # specifies open_file_options and/or use_python_file_object.
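
Usage sketch: the snippet below shows how the `prefetch_remote_buffers` helper added to `ioutils.py` could be exercised directly, mirroring what the patched `cudf.read_parquet` now does internally for remote paths. The bucket URL, `storage_options`, and column name are hypothetical placeholders; only `_get_filesystem_and_paths`, `prefetch_remote_buffers`, and the "parquet"/"contiguous" prefetcher names come from this diff.

    from io import BytesIO

    import cudf
    from cudf.utils import ioutils

    # Hypothetical remote dataset; any fsspec-compatible URL should work.
    source = "s3://bucket/data.parquet"

    # Resolve the filesystem and concrete paths up front.
    fs, paths = ioutils._get_filesystem_and_paths(
        path_or_data=source, storage_options={"anon": True}
    )

    # Prefetch only the byte ranges needed for the requested columns,
    # then hand the in-memory buffers to the regular cudf reader.
    buffers, _ = ioutils.prefetch_remote_buffers(
        paths,
        fs,
        prefetcher="parquet",
        prefetcher_options={"columns": ["x"], "row_groups": None},
    )
    df = cudf.read_parquet([BytesIO(b) for b in buffers], columns=["x"])

Calling `cudf.read_parquet("s3://bucket/data.parquet", columns=["x"])` directly should trigger the same prefetch path, since the reader now routes remote paths through `prefetch_remote_buffers` before handing the data to libcudf.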