Commit
add support for include_path_column
rjzamora committed Jul 5, 2024
1 parent b494d3a commit a3f08d9
Showing 2 changed files with 49 additions and 21 deletions.
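In practical terms, this change lets dask_cudf.read_json attach a categorical "path" column recording which file each row came from. A minimal usage sketch, assuming a set of line-delimited JSON files (the file pattern and blocksize below are illustrative, not taken from this commit):

import dask_cudf

# Read several line-delimited JSON files; include_path_column=True asks
# for a categorical "path" column recording the source file of each row.
df = dask_cudf.read_json(
    "data/records.*.json",   # hypothetical input files
    blocksize="256 MiB",     # assumed to route through the custom partition reader
    include_path_column=True,
)
print(df.columns)  # expected to include "path"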
61 changes: 40 additions & 21 deletions python/dask_cudf/dask_cudf/io/json.py
@@ -9,26 +9,52 @@
from dask.utils import parse_bytes

import cudf
+from cudf.core.column import as_column, build_categorical_column
from cudf.utils.ioutils import _is_local_filesystem

from dask_cudf.backends import _default_backend


-def _read_json_partition(paths, fs=None, **kwargs):
-    if fs is None:
-        # Pass list of paths directly to cudf
-        return cudf.read_json(paths, **kwargs)
-    else:
-        # Use fs to copy the bytes into host memory.
-        # It only makes sense to do this for remote data.
-        return cudf.read_json(
-            fs.cat_ranges(
-                paths,
-                [0] * len(paths),
-                fs.sizes(paths),
-            ),
-            **kwargs,
-        )
+def _read_json_partition(
+    paths,
+    fs=None,
+    include_path_column=None,
+    path_converter=None,
+    **kwargs,
+):
+    # Transfer all data up front for remote storage
+    sources = (
+        paths
+        if fs is None
+        else fs.cat_ranges(
+            paths,
+            [0] * len(paths),
+            fs.sizes(paths),
+        )
+    )

+    if include_path_column:
+        # Add "path" column.
+        # Must iterate over sources sequentially
+        converted_paths = [
+            path_converter(path) if path_converter else path for path in paths
+        ]
+        dfs = []
+        for i, source in enumerate(sources):
+            df = cudf.read_json(source, **kwargs)
+            codes = as_column(i, length=len(df))
+            df["path"] = build_categorical_column(
+                categories=converted_paths,
+                codes=codes,
+                size=codes.size,
+                offset=codes.offset,
+                ordered=False,
+            )
+            dfs.append(df)
+        return cudf.concat(dfs)
+    else:
+        # Pass sources directly to cudf
+        return cudf.read_json(sources, **kwargs)


def read_json(
@@ -145,13 +171,6 @@ def read_json(
        # Use custom _read_json_partition function
        # to generate each partition.

-        if kwargs.get("include_path_column"):
-            # TODO: Support include_path_column
-            raise NotImplementedError(
-                "aggregate_files is not currently supported with "
-                "include_path_column. Please specify"
-            )

        compression = get_compression(
            url_path[0] if isinstance(url_path, list) else url_path,
            compression,
9 changes: 9 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_json.py
@@ -115,3 +115,12 @@ def test_read_json_aggregate_files(tmp_path):
    )
    assert df2.npartitions == 1
    dd.assert_eq(df1, df2, check_index=False)

+    df2 = dask_cudf.read_json(
+        json_path,
+        aggregate_files=True,
+        blocksize="1GiB",
+        include_path_column=True,
+    )
+    assert "path" in df2.columns
+    dd.assert_eq(df1, df2.drop(columns=["path"]), check_index=False)

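As an aside, a hedged sketch of how the new column might be consumed downstream; the remote dataset, the path_converter usage, and the groupby are illustrative assumptions mirroring the dask.dataframe.read_json signature, not something this commit demonstrates:

import os

import dask_cudf

# path_converter (forwarded to _read_json_partition above) lets callers
# normalize the recorded paths, e.g. keep only the file name.
df = dask_cudf.read_json(
    "s3://bucket/logs/*.json",   # hypothetical remote files
    include_path_column=True,
    path_converter=os.path.basename,
)
rows_per_file = df.groupby("path").size().compute()  # rows contributed by each file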