Revise get_reader_filepath_or_buffer to handle a list of data sources #16613

Merged
Commits (24 total; the diff below shows changes from 6 commits)
8d8dee6
revise get_reader_filepath_or_buffer
rjzamora Aug 20, 2024
ac2bf31
Merge branch 'branch-24.10' into revise-get_reader_filepath_or_buffer
rjzamora Aug 20, 2024
58637fc
Merge branch 'branch-24.10' into revise-get_reader_filepath_or_buffer
rjzamora Aug 21, 2024
ecd149c
Merge remote-tracking branch 'upstream/branch-24.10' into revise-get_…
rjzamora Aug 22, 2024
ca6eed2
always pass back a list
rjzamora Aug 22, 2024
7553e59
Merge remote-tracking branch 'upstream/branch-24.10' into revise-get_…
rjzamora Aug 22, 2024
825aff7
Merge remote-tracking branch 'upstream/branch-24.10' into revise-get_…
rjzamora Aug 22, 2024
f7045db
use _maybe_expand_directories
rjzamora Aug 22, 2024
14bf5fb
add missing read_text change
rjzamora Aug 22, 2024
e2d8ee6
Merge remote-tracking branch 'upstream/branch-24.10' into revise-get_…
rjzamora Aug 22, 2024
9f3d66d
consolidate logic to check single source
rjzamora Aug 22, 2024
8ab3a1c
format
rjzamora Aug 22, 2024
997f30c
avro fix
rjzamora Aug 22, 2024
4c48e65
remove compression arg
rjzamora Aug 22, 2024
a286ffe
remove remaining call to ensure_single_filepath_or_buffer
rjzamora Aug 22, 2024
693836a
Merge branch 'branch-24.10' into revise-get_reader_filepath_or_buffer
rjzamora Aug 22, 2024
8ad1ba5
remove leftover compression arg
rjzamora Aug 23, 2024
f5a9be0
Merge branch 'branch-24.10' into revise-get_reader_filepath_or_buffer
rjzamora Aug 23, 2024
f037bfc
Apply suggestions from code review
rjzamora Aug 23, 2024
40f0692
Update python/cudf/cudf/utils/ioutils.py
rjzamora Aug 23, 2024
151a71c
Merge branch 'branch-24.10' into revise-get_reader_filepath_or_buffer
rjzamora Aug 23, 2024
d11a1ff
Merge branch 'branch-24.10' into revise-get_reader_filepath_or_buffer
rjzamora Aug 23, 2024
a807b2c
Merge branch 'branch-24.10' into revise-get_reader_filepath_or_buffer
rjzamora Aug 24, 2024
e966219
Merge branch 'branch-24.10' into revise-get_reader_filepath_or_buffer
rjzamora Aug 26, 2024
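Taken together, these commits make one behavioral change: `ioutils.get_reader_filepath_or_buffer` accepts a single source or a list of sources and now always returns a list, optionally expanding directory inputs with a glob pattern. A minimal caller-side sketch of the revised contract, using only keyword arguments that appear in the diffs below (paths are illustrative):

```python
from cudf.utils import ioutils

# The helper now normalizes everything to a list: a single path, a list of
# paths, or a directory (expanded here to <dir>/*.orc) all come back the same way.
sources, compression = ioutils.get_reader_filepath_or_buffer(
    path_or_data=["part-0.orc", "part-1.orc", "more-orc-files/"],
    compression=None,
    storage_options=None,
    expand_dir_pattern="*.orc",
)
assert isinstance(sources, list)  # "always pass back a list"
```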
9 changes: 7 additions & 2 deletions python/cudf/cudf/io/avro.py
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2022, NVIDIA CORPORATION.
# Copyright (c) 2019-2024, NVIDIA CORPORATION.

import cudf
from cudf import _lib as libcudf
@@ -29,11 +29,16 @@ def read_avro(
compression=None,
storage_options=storage_options,
)
if len(filepath_or_buffer) > 1:
raise ValueError(
"read_avro does not support multiple sources,"
f" got: {filepath_or_buffer}"
)
if compression is not None:
ValueError("URL content-encoding decompression is not supported")

return cudf.DataFrame._from_data(
*libcudf.avro.read_avro(
filepath_or_buffer, columns, skiprows, num_rows
filepath_or_buffer[0], columns, skiprows, num_rows
)
)
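The guard added above means `read_avro` keeps its single-source contract: it fails fast on a list of inputs instead of passing the whole list through to libcudf. An illustrative snippet (file names are placeholders):

```python
import cudf

df = cudf.read_avro("readings.avro")  # single source: behavior unchanged

try:
    cudf.read_avro(["part-0.avro", "part-1.avro"])
except ValueError as err:
    # "read_avro does not support multiple sources, got: [...]"
    print(err)
```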
58 changes: 18 additions & 40 deletions python/cudf/cudf/io/json.py
@@ -9,7 +9,6 @@

import cudf
from cudf._lib import json as libjson
from cudf.api.types import is_list_like
from cudf.utils import ioutils
from cudf.utils.dtypes import _maybe_convert_to_default_type

@@ -62,37 +61,18 @@ def read_json(
f"following positional arguments: {list(args)}"
)

# Multiple sources are passed as a list. If a single source is passed,
# wrap it in a list for unified processing downstream.
if not is_list_like(path_or_buf):
path_or_buf = [path_or_buf]

filepaths_or_buffers = []
for source in path_or_buf:
if ioutils.is_directory(
path_or_data=source, storage_options=storage_options
):
fs = ioutils._ensure_filesystem(
passed_filesystem=None,
path=source,
storage_options=storage_options,
)
source = ioutils.stringify_pathlike(source)
source = fs.sep.join([source, "*.json"])

tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=source,
filepaths_or_buffers, compression = (
ioutils.get_reader_filepath_or_buffer(
path_or_buf,
compression=compression,
iotypes=(BytesIO, StringIO),
allow_raw_text_input=True,
storage_options=storage_options,
warn_on_raw_text_input=True,
warn_meta=("json", "read_json"),
expand_dir_pattern="*.json",
)
if isinstance(tmp_source, list):
filepaths_or_buffers.extend(tmp_source)
else:
filepaths_or_buffers.append(tmp_source)
)

df = libjson.read_json(
filepaths_or_buffers=filepaths_or_buffers,
@@ -111,25 +91,23 @@
"be GPU accelerated in the future"
)

if not ioutils.ensure_single_filepath_or_buffer(
path_or_data=path_or_buf,
storage_options=storage_options,
):
raise NotImplementedError(
"`read_json` does not yet support reading "
"multiple files via pandas"
filepaths_or_buffers, compression = (
ioutils.get_reader_filepath_or_buffer(
path_or_data=path_or_buf,
compression=compression,
iotypes=(BytesIO, StringIO),
allow_raw_text_input=True,
storage_options=storage_options,
)

path_or_buf, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=path_or_buf,
compression=compression,
iotypes=(BytesIO, StringIO),
allow_raw_text_input=True,
storage_options=storage_options,
)
if len(filepaths_or_buffers) > 1:
raise ValueError(
"read_json does not support multiple sources via pandas,"
f" got: {filepaths_or_buffers}"
)

pd_value = pd.read_json(
path_or_buf,
filepaths_or_buffers[0],
lines=lines,
dtype=dtype,
compression=compression,
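With the per-source loop removed, directory expansion is delegated to the helper via `expand_dir_pattern="*.json"`, and lists of sources flow straight through on the cudf engine, while the pandas engine path now raises if more than one source remains. A hedged usage sketch (paths are illustrative):

```python
import cudf

# cudf engine: files and directories can be mixed in one list; each directory
# is expanded to <dir>/*.json inside get_reader_filepath_or_buffer.
df = cudf.read_json(
    ["logs/day-1.json", "logs/day-2.json", "logs/archive/"],
    lines=True,
)
```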
44 changes: 15 additions & 29 deletions python/cudf/cudf/io/orc.py
@@ -4,7 +4,6 @@
import warnings

import pyarrow as pa
from fsspec.utils import stringify_path

import cudf
from cudf._lib import orc as liborc
@@ -173,11 +172,16 @@ def read_orc_statistics(
path_or_buf, _ = ioutils.get_reader_filepath_or_buffer(
path_or_data=source, compression=None, **kwargs
)
if len(path_or_buf) > 1:
raise ValueError(
"read_orc_statistics does not support multiple sources,"
f" got: {path_or_buf}"
)
(
column_names,
parsed_file_statistics,
parsed_stripes_statistics,
) = liborc.read_parsed_orc_statistics(path_or_buf)
) = liborc.read_parsed_orc_statistics(path_or_buf[0])

# Parse column names
column_names = [
@@ -318,33 +322,15 @@ def read_orc(
"A list of stripes must be provided for each input source"
)

filepaths_or_buffers = []
for source in filepath_or_buffer:
if ioutils.is_directory(
path_or_data=source, storage_options=storage_options
):
fs = ioutils._ensure_filesystem(
passed_filesystem=None,
path=source,
storage_options=storage_options,
)
source = stringify_path(source)
source = fs.sep.join([source, "*.orc"])

tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=source,
compression=None,
storage_options=storage_options,
bytes_per_thread=bytes_per_thread,
)
if compression is not None:
raise ValueError(
"URL content-encoding decompression is not supported"
)
if isinstance(tmp_source, list):
filepaths_or_buffers.extend(tmp_source)
else:
filepaths_or_buffers.append(tmp_source)
filepaths_or_buffers, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=filepath_or_buffer,
compression=None,
storage_options=storage_options,
bytes_per_thread=bytes_per_thread,
expand_dir_pattern="*.orc",
)
if compression is not None:
raise ValueError("URL content-encoding decompression is not supported")

if filters is not None:
selected_stripes = _filter_stripes(
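The retained `stripes` check still requires one stripe list per input source, so a multi-source read pairs each file with its own selection. A sketch under that assumption (file names are placeholders):

```python
import cudf

# One stripe list per source: stripe 0 of the first file, stripes 0 and 1
# of the second. Mismatched lengths raise the ValueError noted above.
df = cudf.read_orc(
    ["part-0.orc", "part-1.orc"],
    stripes=[[0], [0, 1]],
)
```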
64 changes: 16 additions & 48 deletions python/cudf/cudf/io/parquet.py
@@ -329,39 +329,16 @@ def write_to_dataset(
@_performance_tracking
def read_parquet_metadata(filepath_or_buffer):
"""{docstring}"""
# Multiple sources are passed as a list. If a single source is passed,
# wrap it in a list for unified processing downstream.
if not is_list_like(filepath_or_buffer):
filepath_or_buffer = [filepath_or_buffer]

# Start by trying to construct a filesystem object
fs, paths = ioutils._get_filesystem_and_paths(
path_or_data=filepath_or_buffer, storage_options=None
)

# Check if filepath or buffer
filepath_or_buffer = paths if paths else filepath_or_buffer

# List of filepaths or buffers
filepaths_or_buffers = []
filepaths_or_buffers, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=filepath_or_buffer,
compression=None,
bytes_per_thread=None,
)

for source in filepath_or_buffer:
tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=source,
compression=None,
fs=fs,
storage_options=None,
bytes_per_thread=None,
)

if compression is not None:
raise ValueError(
"URL content-encoding decompression is not supported"
)
if isinstance(tmp_source, list):
filepath_or_buffer.extend(tmp_source)
else:
filepaths_or_buffers.append(tmp_source)
if compression is not None:
raise ValueError("URL content-encoding decompression is not supported")

return libparquet.read_parquet_metadata(filepaths_or_buffers)

@@ -598,24 +575,15 @@ def read_parquet(
)
filepath_or_buffer = paths if paths else filepath_or_buffer

filepaths_or_buffers = []
for source in filepath_or_buffer:
tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=source,
compression=None,
fs=fs,
storage_options=storage_options,
bytes_per_thread=bytes_per_thread,
)

if compression is not None:
raise ValueError(
"URL content-encoding decompression is not supported"
)
if isinstance(tmp_source, list):
filepath_or_buffer.extend(tmp_source)
else:
filepaths_or_buffers.append(tmp_source)
filepaths_or_buffers, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=filepath_or_buffer,
compression=None,
fs=fs,
storage_options=storage_options,
bytes_per_thread=bytes_per_thread,
)
if compression is not None:
raise ValueError("URL content-encoding decompression is not supported")

# Warn user if they are not using cudf for IO
# (There is a good chance this was not the intention)
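After this change, `read_parquet_metadata` and `read_parquet` both hand a single path or a list straight to the shared helper instead of looping over sources themselves. An illustrative call with placeholder paths:

```python
import cudf

files = ["dataset/part-0.parquet", "dataset/part-1.parquet"]

metadata = cudf.io.parquet.read_parquet_metadata(files)  # accepts the list directly
df = cudf.read_parquet(files, columns=["id", "value"])
```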