Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Executor for polars logical plans #15504

Merged
merged 58 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
b3d0e06
Give pylibcudf DataTypes a __hash__
wence- May 16, 2024
22f6a4f
WIP: Translate polars IR to ours
wence- May 8, 2024
8ac4347
Add some container objects
wence- May 8, 2024
4ab983e
WIP: really, fleshing out some evaluation
wence- May 8, 2024
1981a3d
Flesh out more container stuff
wence- May 9, 2024
700f075
WIP: More fleshing out evaluation
wence- May 9, 2024
9c303bc
WIP: More fleshing out
wence- May 9, 2024
688d8ef
WIP: more implementation
wence- May 10, 2024
f56525a
WIP: simplify
wence- May 10, 2024
2cb6f50
WIP: Maybe done with eval of plan nodes
wence- May 10, 2024
c3e0a92
WIP: expression evaluation
wence- May 13, 2024
ec4562c
WIP: some more
wence- May 13, 2024
f21cd57
WIP: some agg expr stuff
wence- May 14, 2024
1f5a490
Bla
wence- May 14, 2024
31a3d5e
More fixes
wence- May 15, 2024
cda34e0
WIP: More working
wence- May 16, 2024
235575d
Expr objects are no longer dataclasses
wence- May 17, 2024
e158de6
No recursive nvtx annotations
wence- May 17, 2024
b400391
Testing infrastructure
wence- May 17, 2024
7f04985
Add basic tests
wence- May 17, 2024
233c1be
All tests passing (or at least xfailing appropriately)
wence- May 17, 2024
3a3ad2d
Handle string functions and boolean functions and add some docs
wence- May 21, 2024
dd6efaa
Flesh out more boolean functions
wence- May 21, 2024
e279a2f
More fixes
wence- May 21, 2024
bdd6ee3
Simplify
wence- May 21, 2024
c06b980
More fixes
wence- May 21, 2024
3b17c71
xfail strict in cudf_polars tests
wence- May 21, 2024
19db751
Overview doc, simplify callback
wence- May 21, 2024
146327c
Docstrings for plan nodes.
wence- May 21, 2024
e81a1e1
ClosedInterval will be a string
wence- May 21, 2024
98281e8
Small fixes from code review
wence- May 22, 2024
3a1ac86
Dedent some assertions
wence- May 22, 2024
f0686a2
More fixes in review
wence- May 23, 2024
8d25f3a
Singledispatch for translation
wence- May 23, 2024
90fca6d
Spell out DSL
wence- May 23, 2024
0f82d0f
Avoid double import
wence- May 23, 2024
f5683e7
Docs fixes
wence- May 23, 2024
b774e0e
Merge remote-tracking branch 'upstream/branch-24.08' into wence/fea/c…
wence- May 24, 2024
34aac9a
Split scan tests out into separate file
wence- May 24, 2024
74e3824
Build out groupby test and fix one bug
wence- May 24, 2024
b77c573
Split out a few more tests
wence- May 24, 2024
4b7dd6e
Move expression tests to subdirectory
wence- May 24, 2024
3aefc56
Migrate agg tests
wence- May 24, 2024
22805a6
Joins and sorts already test elsewhere
wence- May 24, 2024
d8745f6
Better distinct test and fix bug
wence- May 24, 2024
eb6626e
More exhaustive binop tests
wence- May 24, 2024
246ff6a
Migrate basic gather test
wence- May 24, 2024
26c5994
Basic tests now covered elsewhere, or unimplemented functionality
wence- May 24, 2024
00628b0
Update join for new names
wence- May 28, 2024
47df8e2
Dataframe copy
wence- May 29, 2024
2157323
Fix handling of CSE in Select and HStack
wence- May 29, 2024
6d324cb
Adapt to polars-side changes
wence- May 30, 2024
786730a
A few more tests
wence- May 30, 2024
810a8b8
Merge remote-tracking branch 'upstream/branch-24.08' into wence/fea/c…
wence- May 30, 2024
2773b0b
Update for rapids-build-backend
wence- May 30, 2024
62f6455
Rename with_sorted to sorted_like
wence- May 30, 2024
a1f579f
Column.copy takes an optional new_name argument
wence- May 30, 2024
1240b62
Expand docstrings
wence- May 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/types.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ cdef class DataType:
self.c_obj == (<DataType>other).c_obj
)

def __hash__(self):
    """Return a hash built from the underlying type id and scale."""
    # NOTE(review): presumably chosen to agree with __eq__, which compares
    # the wrapped c_obj values -- confirm against libcudf's data_type equality.
    key = (self.c_obj.id(), self.c_obj.scale())
    return hash(key)

@staticmethod
cdef DataType from_libcudf(data_type dt):
"""Create a DataType from a libcudf data_type.
Expand Down
56 changes: 56 additions & 0 deletions python/cudf_polars/cudf_polars/callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

"""Callback for the polars collect function to execute on device."""

from __future__ import annotations

from functools import partial
from typing import TYPE_CHECKING

import nvtx

from cudf_polars.dsl.translate import translate_ir

if TYPE_CHECKING:
import polars as pl

from cudf_polars.dsl.ir import IR

__all__: list[str] = ["execute_with_cudf"]


def _callback(
    ir: IR,
    with_columns: list[str] | None,
    pyarrow_predicate: str | None,
    n_rows: int | None,
) -> pl.DataFrame:
    """
    Evaluate a translated plan and hand the result back as a polars frame.

    Parameters
    ----------
    ir
        The translated plan to evaluate.
    with_columns
        Must be None (asserted).
    pyarrow_predicate
        Must be None (asserted).
    n_rows
        Must be None (asserted).

    Returns
    -------
    The evaluated plan converted with ``to_polars``.
    """
    # None of these options are handled here, so anything but None is a
    # programming error.
    assert with_columns is None
    assert pyarrow_predicate is None
    assert n_rows is None
    with nvtx.annotate(message="ExecuteIR", domain="cudf_polars"):
        # Each execution starts from an empty cache.
        result = ir.evaluate(cache={})
        return result.to_polars()


def execute_with_cudf(nt, *, raise_on_fail: bool = False) -> None:
    """
    A post optimization callback that attempts to execute the plan with cudf.

    Parameters
    ----------
    nt
        NodeTraverser
    raise_on_fail
        Should conversion raise an exception rather than continuing
        without setting a callback.

    Notes
    -----
    The NodeTraverser is mutated if the libcudf executor can handle the
    plan. If translation raises ``NotImplementedError`` and
    ``raise_on_fail`` is false, the error is suppressed and no callback
    is installed.
    """
    try:
        with nvtx.annotate(message="ConvertIR", domain="cudf_polars"):
            install = nt.set_udf
            install(partial(_callback, translate_ir(nt)))
    except NotImplementedError:
        if not raise_on_fail:
            # Leave the traverser untouched when the plan is unsupported.
            return
        raise
12 changes: 12 additions & 0 deletions python/cudf_polars/cudf_polars/containers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

"""Containers of concrete data."""

from __future__ import annotations

__all__: list[str] = ["DataFrame", "Column", "Scalar"]

from cudf_polars.containers.column import Column
from cudf_polars.containers.dataframe import DataFrame
from cudf_polars.containers.scalar import Scalar
95 changes: 95 additions & 0 deletions python/cudf_polars/cudf_polars/containers/column.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

"""A column, with some properties."""

from __future__ import annotations

import functools
from typing import TYPE_CHECKING

import cudf._lib.pylibcudf as plc

if TYPE_CHECKING:
from typing_extensions import Self

__all__: list[str] = ["Column"]


class Column:
    """A pylibcudf column bundled with a name and sortedness metadata."""

    obj: plc.Column
    name: str
    is_sorted: plc.types.Sorted
    order: plc.types.Order
    null_order: plc.types.NullOrder

    def __init__(self, column: plc.Column, name: str):
        self.obj = column
        self.name = name
        # New columns start out flagged as not sorted; order and null_order
        # are given default values.
        self.is_sorted = plc.types.Sorted.NO
        self.order = plc.types.Order.ASCENDING
        self.null_order = plc.types.NullOrder.BEFORE

    def rename(self, name: str) -> Column:
        """Return a new column with the given name that shares this column's data."""
        renamed = type(self)(self.obj, name)
        return renamed.with_sorted(like=self)

    def with_sorted(self, *, like: Column) -> Self:
        """Overwrite this column's sortedness metadata with that of ``like``."""
        flag, order, null_order = like.is_sorted, like.order, like.null_order
        return self.set_sorted(is_sorted=flag, order=order, null_order=null_order)

    def set_sorted(
        self,
        *,
        is_sorted: plc.types.Sorted,
        order: plc.types.Order,
        null_order: plc.types.NullOrder,
    ) -> Self:
        """
        Update the sortedness metadata of this column in place.

        Parameters
        ----------
        is_sorted
            Is the column sorted
        order
            The order if sorted
        null_order
            Where nulls sort, if sorted

        Returns
        -------
        This same column, after the metadata has been updated.
        """
        self.is_sorted, self.order, self.null_order = is_sorted, order, null_order
        return self

    def copy(self) -> Self:
        """Return a shallow copy: same data and name, same sortedness."""
        duplicate = type(self)(self.obj, self.name)
        return duplicate.with_sorted(like=self)

    def mask_nans(self) -> Self:
        """
        Return a copy of self with nans masked out.

        Raises
        ------
        NotImplementedError
            If the column contains any NaN values.
        """
        if not self.nan_count > 0:
            return self.copy()
        raise NotImplementedError

    @functools.cached_property
    def nan_count(self) -> int:
        """Return how many NaN values the column holds (0 for non-float types)."""
        type_id = self.obj.type().id()
        if type_id not in (plc.TypeId.FLOAT32, plc.TypeId.FLOAT64):
            return 0
        nan_mask = plc.unary.is_nan(self.obj)
        total = plc.reduce.reduce(
            nan_mask,
            plc.aggregation.sum(),
            # TODO: pylibcudf needs to have a SizeType DataType singleton
            plc.DataType(plc.TypeId.INT32),
        )
        return plc.interop.to_arrow(total).as_py()
171 changes: 171 additions & 0 deletions python/cudf_polars/cudf_polars/containers/dataframe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

"""A dataframe, with some properties."""

from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING

import polars as pl

import cudf._lib.pylibcudf as plc

from cudf_polars.containers.column import Column

if TYPE_CHECKING:
from collections.abc import Mapping, Sequence, Set

from typing_extensions import Self

import cudf
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's mark all the places we're importing/using cudf rather than pylibcudf with TODOs or something so that we can find them easily as we work to remove them.


from cudf_polars.containers.scalar import Scalar


__all__: list[str] = ["DataFrame"]


class DataFrame:
    """
    A representation of a dataframe.

    Holds an ordered list of named columns plus a list of scalars. A
    pylibcudf table over the columns is only built when there are no
    scalars; otherwise ``table`` is None.
    """

    columns: list[Column]
    scalars: list[Scalar]
    table: plc.Table | None

    def __init__(self, columns: Sequence[Column], scalars: Sequence[Scalar]) -> None:
        self.columns = list(columns)
        # Name -> column lookup used by ``select``.
        self._column_map = {c.name: c for c in self.columns}
        self.scalars = list(scalars)
        if len(scalars) == 0:
            self.table = plc.Table([c.obj for c in columns])
        else:
            self.table = None

    def to_polars(self) -> pl.DataFrame:
        """Convert to a polars DataFrame (only valid when there are no scalars)."""
        assert len(self.scalars) == 0
        return pl.from_arrow(
            plc.interop.to_arrow(
                self.table,
                [plc.interop.ColumnMetadata(name=c.name) for c in self.columns],
            )
        )

    @cached_property
    def column_names_set(self) -> frozenset[str]:
        """Return the column names as a set."""
        return frozenset(c.name for c in self.columns)

    @cached_property
    def column_names(self) -> list[str]:
        """Return a list of the column names."""
        return [c.name for c in self.columns]

    @cached_property
    def num_columns(self) -> int:
        """Number of columns."""
        return len(self.columns)

    @cached_property
    def num_rows(self) -> int:
        """
        Number of rows.

        Raises
        ------
        ValueError
            If the frame carries scalars (and hence has no table).
        """
        if self.table is None:
            raise ValueError("Number of rows of frame with scalars makes no sense")
        return self.table.num_rows()

    @classmethod
    def from_cudf(cls, df: cudf.DataFrame) -> Self:
        """Create from a cudf dataframe."""
        return cls(
            [Column(c.to_pylibcudf(mode="read"), name) for name, c in df._data.items()],
            [],
        )

    @classmethod
    def from_table(cls, table: plc.Table, names: Sequence[str]) -> Self:
        """
        Create from a pylibcudf table.

        Raises
        ------
        ValueError
            If the number of names does not match the number of columns.
        """
        # TODO: strict=True when we drop py39
        if table.num_columns() != len(names):
            raise ValueError("Mismatching name and table length.")
        return cls([Column(c, name) for c, name in zip(table.columns(), names)], [])

    def with_sorted(self, *, like: DataFrame, subset: Set[str] | None = None) -> Self:
        """
        Copy sortedness from a dataframe onto self.

        Parameters
        ----------
        like
            Frame to copy sortedness from; must have identical column names.
        subset
            Names of the columns to update. If None, all columns are updated.

        Returns
        -------
        Self, with the selected columns' sortedness metadata updated in place.

        Raises
        ------
        ValueError
            If the column names of the two frames differ.
        """
        if like.column_names != self.column_names:
            raise ValueError("Can only copy from identically named frame")
        subset = self.column_names_set if subset is None else subset
        self.columns = [
            c.with_sorted(like=other) if c.name in subset else c
            for c, other in zip(self.columns, like.columns)
        ]
        return self

    def with_columns(self, columns: Sequence[Column]) -> Self:
        """
        Return a new dataframe with extra columns.

        Data is shared.
        """
        return type(self)([*self.columns, *columns], self.scalars)

    def discard_columns(self, names: Set[str]) -> Self:
        """Drop columns by name."""
        return type(self)(
            [c for c in self.columns if c.name not in names], self.scalars
        )

    def select(self, names: Sequence[str]) -> Self:
        """
        Select columns by name returning DataFrame.

        Raises
        ------
        ValueError
            If any requested name is not a column of this frame.
        """
        want = set(names)
        if not want.issubset(self.column_names_set):
            raise ValueError("Can't select missing names")
        return type(self)([self._column_map[name] for name in names], self.scalars)

    def replace_columns(self, *columns: Column) -> Self:
        """
        Return a new dataframe with columns replaced by name.

        Raises
        ------
        ValueError
            If any replacement column's name is not already in the frame.
        """
        new = {c.name: c for c in columns}
        if not set(new).issubset(self.column_names_set):
            raise ValueError("Cannot replace with non-existing names")
        return type(self)([new.get(c.name, c) for c in self.columns], self.scalars)

    def rename_columns(self, mapping: Mapping[str, str]) -> Self:
        """Rename some columns; names missing from ``mapping`` are kept."""
        return type(self)(
            [c.rename(mapping.get(c.name, c.name)) for c in self.columns], self.scalars
        )

    def select_columns(self, names: Set[str]) -> list[Column]:
        """Select columns by name, in frame order."""
        return [c for c in self.columns if c.name in names]

    def filter(self, mask: Column) -> Self:
        """Return a filtered table given a mask."""
        table = plc.stream_compaction.apply_boolean_mask(self.table, mask.obj)
        return type(self).from_table(table, self.column_names).with_sorted(like=self)

    @staticmethod
    def _slice_bounds(start: int, length: int, num_rows: int) -> tuple[int, int]:
        """Convert a (start, length) request into in-range [start, end) bounds."""
        if start < 0:
            # Negative start counts from the end, as for python indexing.
            start += num_rows
        end = start + length
        # Clamp both bounds so that a start before the beginning or past the
        # end of the frame yields a valid (possibly empty) range.
        return max(0, min(start, num_rows)), max(0, min(end, num_rows))

    def slice(self, zlice: tuple[int, int] | None) -> Self:
        """
        Slice a dataframe.

        Parameters
        ----------
        zlice
            optional, tuple of start and length, negative values of start
            treated as for python indexing. If not provided, returns self.
            Bounds that fall outside the frame are clamped to it.

        Returns
        -------
        New dataframe (if zlice is not None) otherwise self (if it is)
        """
        if zlice is None:
            return self
        start, length = zlice
        start, end = self._slice_bounds(start, length, self.num_rows)
        (table,) = plc.copying.slice(self.table, [start, end])
        return type(self).from_table(table, self.column_names).with_sorted(like=self)
23 changes: 23 additions & 0 deletions python/cudf_polars/cudf_polars/containers/scalar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

"""A scalar, with some properties."""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
import cudf._lib.pylibcudf as plc

__all__: list[str] = ["Scalar"]


class Scalar:
    """
    A scalar, and a name.

    Wraps a pylibcudf scalar in ``obj``. A ``name`` slot is declared, but
    ``__init__`` does not assign it.
    """

    obj: plc.Scalar
    __slots__ = ("obj", "name")

    def __init__(self, scalar: plc.Scalar):
        self.obj = scalar
8 changes: 8 additions & 0 deletions python/cudf_polars/cudf_polars/dsl/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

"""The domain-specific language (DSL) for the polars executor."""

from __future__ import annotations

__all__: list[str] = []
Loading
Loading