Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add with_columns #909

Merged
merged 5 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion python/datafusion/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from __future__ import annotations

from typing import Any, List, TYPE_CHECKING, Literal
from typing import Any, Iterable, List, TYPE_CHECKING
timsaucer marked this conversation as resolved.
Show resolved Hide resolved
from datafusion.record_batch import RecordBatchStream
from typing_extensions import deprecated
from datafusion.plan import LogicalPlan, ExecutionPlan
Expand Down Expand Up @@ -171,6 +171,51 @@
"""
return DataFrame(self.df.with_column(name, expr.expr))

def with_columns(
    self, *exprs: Expr | Iterable[Expr], **named_exprs: Expr
) -> DataFrame:
    """Add columns to the DataFrame.

    Columns may be given as expressions, iterables of expressions, or named
    expressions. To pass named expressions use the form ``name=Expr``.

    Example usage: The following will add 4 columns labeled a, b, c, and d::

        df = df.with_columns(
            lit(0).alias('a'),
            [lit(1).alias('b'), lit(2).alias('c')],
            d=lit(3)
        )

    Args:
        exprs: Either a single expression or an iterable of expressions to add.
        named_exprs: Named expressions in the form of ``name=expr``

    Returns:
        DataFrame with the new columns added.

    Raises:
        NotImplementedError: If a positional argument is neither an ``Expr``
            nor an iterable.
    """
    # Flatten all arguments into one list of the ``.expr`` objects held by
    # each ``Expr``; that list is what ``self.df.with_columns`` receives.
    raw_exprs = []
    for arg in exprs:
        if isinstance(arg, Expr):
            raw_exprs.append(arg.expr)
        elif isinstance(arg, Iterable):
            raw_exprs.extend(inner.expr for inner in arg)
        else:
            raise NotImplementedError(
                "with_columns does not support arguments of type "
                f"{type(arg).__name__}"
            )

    # Keyword arguments are aliased to their keyword name and are appended
    # after every positional expression.
    raw_exprs.extend(expr.alias(name).expr for name, expr in named_exprs.items())

    return DataFrame(self.df.with_columns(raw_exprs))

def with_column_renamed(self, old_name: str, new_name: str) -> DataFrame:
r"""Rename one column by applying a new projection.

Expand Down Expand Up @@ -308,7 +353,7 @@
self,
right: DataFrame,
*on_exprs: Expr,
how: Literal["inner", "left", "right", "full", "semi", "anti"] = "inner",

Check failure on line 356 in python/datafusion/dataframe.py

View workflow job for this annotation

GitHub Actions / build

Ruff (F821)

python/datafusion/dataframe.py:356:14: F821 Undefined name `Literal`

Check failure on line 356 in python/datafusion/dataframe.py

View workflow job for this annotation

GitHub Actions / build

Ruff (F821)

python/datafusion/dataframe.py:356:23: F821 Undefined name `inner`

Check failure on line 356 in python/datafusion/dataframe.py

View workflow job for this annotation

GitHub Actions / build

Ruff (F821)

python/datafusion/dataframe.py:356:32: F821 Undefined name `left`

Check failure on line 356 in python/datafusion/dataframe.py

View workflow job for this annotation

GitHub Actions / build

Ruff (F821)

python/datafusion/dataframe.py:356:40: F821 Undefined name `right`

Check failure on line 356 in python/datafusion/dataframe.py

View workflow job for this annotation

GitHub Actions / build

Ruff (F821)

python/datafusion/dataframe.py:356:49: F821 Undefined name `full`

Check failure on line 356 in python/datafusion/dataframe.py

View workflow job for this annotation

GitHub Actions / build

Ruff (F821)

python/datafusion/dataframe.py:356:57: F821 Undefined name `semi`

Check failure on line 356 in python/datafusion/dataframe.py

View workflow job for this annotation

GitHub Actions / build

Ruff (F821)

python/datafusion/dataframe.py:356:65: F821 Undefined name `anti`
) -> DataFrame:
"""Join two :py:class:`DataFrame`using the specified expressions.

Expand Down
31 changes: 31 additions & 0 deletions python/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,37 @@ def test_with_column(df):
assert result.column(2) == pa.array([5, 7, 9])


def test_with_columns(df):
    """Check with_columns with positional, list, and keyword expressions."""

    def a_plus_b():
        # Build a fresh expression on every call so each new column gets its own.
        return column("a") + column("b")

    df = df.with_columns(
        a_plus_b().alias("c"),
        a_plus_b().alias("d"),
        [a_plus_b().alias("e"), a_plus_b().alias("f")],
        g=a_plus_b(),
    )

    # execute and collect the first (and only) batch
    batch = df.collect()[0]

    # Existing columns are expected first, then the new ones in argument order.
    expected_names = ["a", "b", "c", "d", "e", "f", "g"]
    for index, name in enumerate(expected_names):
        assert batch.schema.field(index).name == name

    # Columns a and b keep their data; every added column is the sum a + b.
    expected_values = [[1, 2, 3], [4, 5, 6]] + [[5, 7, 9]] * 5
    for index, values in enumerate(expected_values):
        assert batch.column(index) == pa.array(values)


def test_with_column_renamed(df):
df = df.with_column("c", column("a") + column("b")).with_column_renamed("c", "sum")

Expand Down
10 changes: 10 additions & 0 deletions src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,16 @@ impl PyDataFrame {
Ok(Self::new(df))
}

/// Add several columns, one per expression in `exprs`.
///
/// Each expression is passed to `with_column` under the name produced by its
/// `schema_name()`. Expressions are applied in the order given, and the first
/// error from `with_column` is returned immediately.
fn with_columns(&self, exprs: Vec<PyExpr>) -> PyResult<Self> {
    let mut df = self.df.as_ref().clone();
    for expr in exprs {
        let expr: Expr = expr.into();
        // Materialize the name first so the borrow of `expr` ends before it is moved.
        let name = expr.schema_name().to_string();
        df = df.with_column(&name, expr)?;
    }
    Ok(Self::new(df))
}

/// Rename one column by applying a new projection. This is a no-op if the column to be
/// renamed does not exist.
fn with_column_renamed(&self, old_name: &str, new_name: &str) -> PyResult<Self> {
Expand Down
Loading