add include_path_column support and test coverage

rjzamora committed Jul 5, 2024
1 parent a3f08d9 · commit 83d1f0a

Showing 2 changed files with 11 additions and 14 deletions.
python/dask_cudf/dask_cudf/io/json.py (21 changes: 9 additions & 12 deletions)
@@ -9,7 +9,7 @@
 from dask.utils import parse_bytes
 
 import cudf
-from cudf.core.column import as_column, build_categorical_column
+from cudf.core.column import as_column
 from cudf.utils.ioutils import _is_local_filesystem
 
 from dask_cudf.backends import _default_backend
@@ -36,20 +36,15 @@ def _read_json_partition(
     if include_path_column:
         # Add "path" column.
         # Must iterate over sources sequentially
-        converted_paths = [
-            path_converter(path) if path_converter else path for path in paths
-        ]
+        converted_paths = (
+            paths
+            if path_converter is None
+            else [path_converter(path) for path in paths]
+        )
         dfs = []
         for i, source in enumerate(sources):
             df = cudf.read_json(source, **kwargs)
-            codes = as_column(i, length=len(df))
-            df["path"] = build_categorical_column(
-                categories=converted_paths,
-                codes=codes,
-                size=codes.size,
-                offset=codes.offset,
-                ordered=False,
-            )
+            df["path"] = as_column(converted_paths[i], length=len(df))
             dfs.append(df)
         return cudf.concat(dfs)
     else:
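
For context: the new branch replaces the hand-built categorical column with a simple broadcast, using as_column(value, length=n) to repeat one path string across all rows of a partition's frame. A minimal standalone sketch of that pattern follows; the file names and frames are made up for illustration.

import cudf
from cudf.core.column import as_column

# Two hypothetical JSON sources, already read into separate frames.
sources = [
    ("part.0.json", cudf.DataFrame({"x": [1, 2]})),
    ("part.1.json", cudf.DataFrame({"x": [3]})),
]

dfs = []
for path, df in sources:
    # Broadcast the source path across every row of this frame,
    # mirroring as_column(converted_paths[i], length=len(df)) above.
    df["path"] = as_column(path, length=len(df))
    dfs.append(df)

out = cudf.concat(dfs)
# out["path"]: part.0.json, part.0.json, part.1.json
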
@@ -179,6 +174,8 @@ def read_json(
         orient=orient,
         lines=lines,
         compression=compression,
+        include_path_column=kwargs.get("include_path_column"),
+        path_converter=kwargs.get("path_converter"),
     )
     if not _is_local_filesystem(fs):
         _kwargs["fs"] = fs
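
With include_path_column and path_converter now forwarded into the partition-reader kwargs, a call along the following lines should carry both options through end to end. The glob pattern and converter are illustrative, not taken from the commit.

import os

import dask_cudf

ddf = dask_cudf.read_json(
    "data/*.json",
    lines=True,
    include_path_column=True,         # adds a "path" column to each partition
    path_converter=os.path.basename,  # optional transform applied to each path
)
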
python/dask_cudf/dask_cudf/io/tests/test_json.py (4 changes: 2 additions & 2 deletions)
@@ -118,9 +118,9 @@ def test_read_json_aggregate_files(tmp_path):
 
     df2 = dask_cudf.read_json(
         json_path,
-        aggregate_files=True,
-        blocksize="1GiB",
+        aggregate_files=2,
+        include_path_column=True,
     )
     assert "path" in df2.columns
     assert len(df2["path"].compute().unique()) == df1.npartitions
     dd.assert_eq(df1, df2.drop(columns=["path"]), check_index=False)
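
The updated test exercises the aggregation case: even when several files are packed into one partition, each row keeps its originating file, so the distinct values in "path" track the number of input files. A hedged sketch of the same check on a made-up three-file dataset:

import dask_cudf

# Suppose data/*.json matches three line-delimited JSON files.
df = dask_cudf.read_json(
    "data/*.json",
    lines=True,
    aggregate_files=2,  # int usage mirrors the test above
    include_path_column=True,
)

# One distinct "path" value per input file, however the files
# were grouped into partitions.
assert df["path"].compute().nunique() == 3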
