Add multi-partition Scan support to cuDF-Polars #17494

Merged: 10 commits, Dec 19, 2024
3 changes: 2 additions & 1 deletion python/cudf_polars/cudf_polars/callback.py
@@ -231,7 +231,8 @@ def validate_config_options(config: dict) -> None:
     executor = config.get("executor", "pylibcudf")
     if executor == "dask-experimental":
         unsupported = config.get("executor_options", {}).keys() - {
-            "max_rows_per_partition"
+            "max_rows_per_partition",
+            "parquet_blocksize",
         }
     else:
         unsupported = config.get("executor_options", {}).keys()
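For reference, a minimal usage sketch of how these executor options are passed in practice (mirroring the tests added below; the path and numeric values are illustrative only):

```python
import polars as pl

# Illustrative values; "data/" is a placeholder path.
engine = pl.GPUEngine(
    raise_on_fail=True,
    executor="dask-experimental",
    executor_options={
        "max_rows_per_partition": 1_000_000,  # previously supported option
        "parquet_blocksize": 1024**3,  # option added in this PR (bytes per partition)
    },
)
result = pl.scan_parquet("data/").collect(engine=engine)
```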
227 changes: 223 additions & 4 deletions python/cudf_polars/cudf_polars/experimental/io.py
@@ -5,17 +5,20 @@
 from __future__ import annotations
 
 import math
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 
-from cudf_polars.dsl.ir import DataFrameScan, Union
+import pylibcudf as plc
+
+from cudf_polars.dsl.ir import IR, DataFrameScan, Scan, Union
 from cudf_polars.experimental.base import PartitionInfo
 from cudf_polars.experimental.dispatch import lower_ir_node
 
 if TYPE_CHECKING:
-    from collections.abc import MutableMapping
+    from collections.abc import Hashable, MutableMapping
 
-    from cudf_polars.dsl.ir import IR
+    from cudf_polars.dsl.expr import NamedExpr
     from cudf_polars.experimental.dispatch import LowerIRTransformer
+    from cudf_polars.typing import Schema
 
 
 @lower_ir_node.register(DataFrameScan)
@@ -47,3 +50,219 @@ def _(
        }

    return ir, {ir: PartitionInfo(count=1)}


class SplitScan(IR):
    """Input from a split file."""

    __slots__ = (
        "base_scan",
        "schema",
        "split_index",
        "total_splits",
    )
    _non_child = (
        "base_scan",
        "split_index",
        "total_splits",
    )
    base_scan: Scan
    """Scan operation this node is based on."""
    split_index: int
    """Index of the current split."""
    total_splits: int
    """Total number of splits."""

    def __init__(self, base_scan: Scan, split_index: int, total_splits: int):
        self.schema = base_scan.schema
        self.base_scan = base_scan
        self.split_index = split_index
        self.total_splits = total_splits
        self._non_child_args = (
            split_index,
            total_splits,
            *base_scan._non_child_args,
        )
        self.children = ()
        if base_scan.typ not in ("parquet",):  # pragma: no cover
            raise NotImplementedError(
                f"Unhandled Scan type for file splitting: {base_scan.typ}"
            )

    def get_hashable(self) -> Hashable:
        """Hashable representation of node."""
        return (
            hash(self.base_scan),
            self.split_index,
            self.total_splits,
        )

    @classmethod
    def do_evaluate(
        cls,
        split_index: int,
        total_splits: int,
        schema: Schema,
        typ: str,
        reader_options: dict[str, Any],
        config_options: dict[str, Any],
        paths: list[str],
        with_columns: list[str] | None,
        skip_rows: int,
        n_rows: int,
        row_index: tuple[str, int] | None,
        predicate: NamedExpr | None,
    ):
        """Evaluate and return a dataframe."""
        if typ not in ("parquet",):  # pragma: no cover
            raise NotImplementedError(f"Unhandled Scan type for file splitting: {typ}")

        rowgroup_metadata = plc.io.parquet_metadata.read_parquet_metadata(
            plc.io.SourceInfo(paths)
        ).rowgroup_metadata()
        total_row_groups = len(rowgroup_metadata)
        if total_splits > total_row_groups:
            # Don't bother aligning on row-groups
            total_rows = sum(rg["num_rows"] for rg in rowgroup_metadata)
            n_rows = int(total_rows / total_splits)
            skip_rows = n_rows * split_index
        else:
            # Align split with row-groups
            rg_stride = int(total_row_groups / total_splits)
            skip_rgs = rg_stride * split_index
            skip_rows = (
                sum(rg["num_rows"] for rg in rowgroup_metadata[:skip_rgs])
                if skip_rgs
                else 0
            )
            n_rows = sum(
                rg["num_rows"]
                for rg in rowgroup_metadata[skip_rgs : skip_rgs + rg_stride]
            )

        # Last split should always read to end of file
        if split_index == (total_splits - 1):
            n_rows = -1

        return Scan.do_evaluate(
            schema,
            typ,
            reader_options,
            config_options,
            paths,
            with_columns,
            skip_rows,
            n_rows,
            row_index,
            predicate,
        )


def _sample_pq_statistics(ir: Scan) -> dict[str, float]:
    import numpy as np
    import pyarrow.dataset as pa_ds

    # Use average total_uncompressed_size of three files
    # TODO: Use plc.io.parquet_metadata.read_parquet_metadata
    n_sample = 3
    column_sizes = {}
    ds = pa_ds.dataset(ir.paths[:n_sample], format="parquet")
    for i, frag in enumerate(ds.get_fragments()):
        md = frag.metadata
        for rg in range(md.num_row_groups):
            row_group = md.row_group(rg)
            for col in range(row_group.num_columns):
                column = row_group.column(col)
                name = column.path_in_schema
                if name not in column_sizes:
                    column_sizes[name] = np.zeros(n_sample, dtype="int64")
                column_sizes[name][i] += column.total_uncompressed_size

    return {name: np.mean(sizes) for name, sizes in column_sizes.items()}


def _scan_partitioning(ir: Scan) -> tuple[int, int]:
    split, stride = 1, 1
    if ir.typ == "parquet":
        file_size: float = 0
        # TODO: Use system info to set default blocksize
        parallel_options = ir.config_options.get("executor_options", {})
        blocksize: int = parallel_options.get("parquet_blocksize", 1024**3)
Contributor:

    Is 1GiB a good size, or should we pick something larger?

Member Author:

    My experience tells me that 1GiB is a good default, but that most users with datacenter-class GPUs will usually want to go bigger. In Dask cuDF we use pynvml to query the "real" device size. The details of this can get sticky, so I'd rather revisit this kind of improvement after we start benchmarking.
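Not part of this PR, but for illustration: a rough sketch of the kind of pynvml-based default alluded to above. The device fraction is an arbitrary example value, and `default_parquet_blocksize` is a hypothetical helper, not an existing API.

```python
import pynvml


def default_parquet_blocksize(device_index: int = 0, fraction: float = 0.125) -> int:
    # Query total device memory and take a fraction of it as the blocksize,
    # keeping the 1 GiB default used in this PR as a floor.
    pynvml.nvmlInit()
    try:
        handle = pynvml.nvmlDeviceGetHandleByIndex(device_index)
        total = pynvml.nvmlDeviceGetMemoryInfo(handle).total
    finally:
        pynvml.nvmlShutdown()
    return max(int(total * fraction), 1024**3)
```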

        stats = _sample_pq_statistics(ir)
        columns: list = ir.with_columns or list(stats.keys())
        for name in columns:
            file_size += float(stats[name])
        if file_size > 0:
            if file_size > blocksize:
                # Split large files
                split = math.ceil(file_size / blocksize)
            else:
                # Aggregate small files
                stride = max(int(blocksize / file_size), 1)

    # TODO: Use file sizes for csv and json
    return (split, stride)


@lower_ir_node.register(Scan)
def _(
    ir: Scan, rec: LowerIRTransformer
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
    partition_info: MutableMapping[IR, PartitionInfo]
    if ir.typ in ("csv", "parquet", "ndjson") and ir.n_rows == -1 and ir.skip_rows == 0:
        split, stride = _scan_partitioning(ir)
        paths = list(ir.paths)
        if split > 1:
            # Disable chunked reader when splitting files
            config_options = ir.config_options.copy()
            config_options["parquet_options"] = config_options.get(
                "parquet_options", {}
            ).copy()
            config_options["parquet_options"]["chunked"] = False
Comment on lines +275 to +278

Contributor:

    Since we require py 3.10 now, I think this is simpler as:

    Suggested change:
    -            config_options["parquet_options"] = config_options.get(
    -                "parquet_options", {}
    -            ).copy()
    -            config_options["parquet_options"]["chunked"] = False
    +            config_options["parquet_options"] |= {"chunked": False}

Member Author:

    I don't think this works if the "parquet_options" key is missing?

Contributor:

    Oh yeah, sorry

            slices: list[SplitScan] = []
            for path in paths:
                base_scan = Scan(
                    ir.schema,
                    ir.typ,
                    ir.reader_options,
                    ir.cloud_options,
                    config_options,
                    [path],
                    ir.with_columns,
                    ir.skip_rows,
                    ir.n_rows,
                    ir.row_index,
                    ir.predicate,
                )
                slices.extend(
                    SplitScan(base_scan, sindex, split) for sindex in range(split)
                )
            new_node = Union(ir.schema, None, *slices)
            partition_info = {slice: PartitionInfo(count=1) for slice in slices} | {
                new_node: PartitionInfo(count=len(slices))
            }
        else:
            groups: list[Scan] = [
                Scan(
                    ir.schema,
                    ir.typ,
                    ir.reader_options,
                    ir.cloud_options,
                    ir.config_options,
                    paths[i : i + stride],
                    ir.with_columns,
                    ir.skip_rows,
                    ir.n_rows,
                    ir.row_index,
                    ir.predicate,
                )
                for i in range(0, len(paths), stride)
            ]
            new_node = Union(ir.schema, None, *groups)
            partition_info = {group: PartitionInfo(count=1) for group in groups} | {
                new_node: PartitionInfo(count=len(groups))
            }
        return new_node, partition_info

    return ir, {ir: PartitionInfo(count=1)}  # pragma: no cover
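For intuition, a small self-contained sketch of the splitting arithmetic implemented by `SplitScan.do_evaluate` above, using made-up row-group metadata (ten row-groups of 100 rows each, four splits):

```python
# Made-up metadata: one parquet file with 10 row-groups of 100 rows each.
rowgroup_metadata = [{"num_rows": 100} for _ in range(10)]
total_row_groups = len(rowgroup_metadata)
total_splits = 4  # comes from _scan_partitioning (ceil(file_size / blocksize))

for split_index in range(total_splits):
    # total_splits <= total_row_groups, so splits align with row-group boundaries.
    rg_stride = int(total_row_groups / total_splits)  # 2 row-groups per split
    skip_rgs = rg_stride * split_index
    skip_rows = sum(rg["num_rows"] for rg in rowgroup_metadata[:skip_rgs])
    n_rows = sum(
        rg["num_rows"] for rg in rowgroup_metadata[skip_rgs : skip_rgs + rg_stride]
    )
    if split_index == total_splits - 1:
        n_rows = -1  # the last split always reads to the end of the file
    print(split_index, skip_rows, n_rows)
# 0 0 200
# 1 200 200
# 2 400 200
# 3 600 -1   (reads the remaining 400 rows)
```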
80 changes: 80 additions & 0 deletions python/cudf_polars/tests/experimental/test_scan.py
@@ -0,0 +1,80 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import pytest

import polars as pl

from cudf_polars import Translator
from cudf_polars.experimental.parallel import lower_ir_graph
from cudf_polars.testing.asserts import assert_gpu_result_equal


@pytest.fixture(scope="module")
def df():
    return pl.DataFrame(
        {
            "x": range(3_000),
            "y": ["cat", "dog", "fish"] * 1_000,
            "z": [1.0, 2.0, 3.0, 4.0, 5.0] * 600,
        }
    )


def make_source(df, path, fmt, n_files=3):
    n_rows = len(df)
    stride = int(n_rows / n_files)
    for i in range(n_files):
        offset = stride * i
        part = df.slice(offset, stride)
        if fmt == "csv":
            part.write_csv(path / f"part.{i}.csv")
        elif fmt == "ndjson":
            part.write_ndjson(path / f"part.{i}.ndjson")
        else:
            part.write_parquet(
                path / f"part.{i}.parquet",
                row_group_size=int(stride / 2),
            )


@pytest.mark.parametrize(
    "fmt, scan_fn",
    [
        ("csv", pl.scan_csv),
        ("ndjson", pl.scan_ndjson),
        ("parquet", pl.scan_parquet),
    ],
)
def test_parallel_scan(tmp_path, df, fmt, scan_fn):
    make_source(df, tmp_path, fmt)
    q = scan_fn(tmp_path)
    engine = pl.GPUEngine(
        raise_on_fail=True,
        executor="dask-experimental",
    )
    assert_gpu_result_equal(q, engine=engine)


@pytest.mark.parametrize("blocksize", [1_000, 10_000, 1_000_000])
def test_parquet_blocksize(tmp_path, df, blocksize):
    n_files = 3
    make_source(df, tmp_path, "parquet", n_files)
    q = pl.scan_parquet(tmp_path)
    engine = pl.GPUEngine(
        raise_on_fail=True,
        executor="dask-experimental",
        executor_options={"parquet_blocksize": blocksize},
    )
    assert_gpu_result_equal(q, engine=engine)

    # Check partitioning
    qir = Translator(q._ldf.visit(), engine).translate_ir()
    ir, info = lower_ir_graph(qir)
    count = info[ir].count
    if blocksize <= 12_000:
        assert count > n_files
    else:
        assert count < n_files