Expose unnest feature (#641)
* Expose unnest feature

* Update dataframe operation name to match Rust implementation
timsaucer authored Apr 23, 2024
1 parent 84415dd commit 7366f89
Showing 5 changed files with 135 additions and 0 deletions.
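
Taken together, the changes below expose DataFrame.unnest_column in the Python bindings. A minimal usage sketch based on the new tests in this commit (imports assumed to match test_dataframe.py):

import pyarrow as pa
from datafusion import SessionContext

ctx = SessionContext()
# A list column "a" alongside a flat column "b"
batch = pa.RecordBatch.from_arrays(
    [pa.array([[1], [2, 3], None]), pa.array([7, 8, 9])],
    names=["a", "b"],
)
df = ctx.create_dataframe([[batch]])

# Flatten "a"; values in "b" are repeated for each produced row
flat = df.unnest_column("a")
# Drop null list entries instead of emitting a null row for them
flat_no_nulls = df.unnest_column("a", preserve_nulls=False)
print(flat.collect()[0])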
34 changes: 34 additions & 0 deletions datafusion/tests/test_dataframe.py
@@ -62,6 +62,20 @@ def struct_df():
return ctx.create_dataframe([[batch]])


@pytest.fixture
def nested_df():
ctx = SessionContext()

# create a RecordBatch and a new DataFrame from it
# Intentionally make each array of different length
batch = pa.RecordBatch.from_arrays(
[pa.array([[1], [2, 3], [4, 5, 6], None]), pa.array([7, 8, 9, 10])],
names=["a", "b"],
)

return ctx.create_dataframe([[batch]])


@pytest.fixture
def aggregate_df():
ctx = SessionContext()
@@ -160,6 +174,26 @@ def test_with_column_renamed(df):
assert result.schema.field(2).name == "sum"


def test_unnest(nested_df):
nested_df = nested_df.unnest_column("a")

# execute and collect the first (and only) batch
result = nested_df.collect()[0]

assert result.column(0) == pa.array([1, 2, 3, 4, 5, 6, None])
assert result.column(1) == pa.array([7, 8, 8, 9, 9, 9, 10])


def test_unnest_without_nulls(nested_df):
nested_df = nested_df.unnest_column("a", preserve_nulls=False)

# execute and collect the first (and only) batch
result = nested_df.collect()[0]

assert result.column(0) == pa.array([1, 2, 3, 4, 5, 6])
assert result.column(1) == pa.array([7, 8, 8, 9, 9, 9])


def test_udf(df):
# is_null is a pa function over arrays
is_null = udf(
12 changes: 12 additions & 0 deletions src/dataframe.rs
@@ -25,6 +25,7 @@ use datafusion::execution::SendableRecordBatchStream;
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::prelude::*;
use datafusion_common::UnnestOptions;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyTuple;
@@ -293,6 +294,17 @@ impl PyDataFrame {
Ok(Self::new(new_df))
}

#[pyo3(signature = (column, preserve_nulls=true))]
fn unnest_column(&self, column: &str, preserve_nulls: bool) -> PyResult<Self> {
let unnest_options = UnnestOptions { preserve_nulls };
let df = self
.df
.as_ref()
.clone()
.unnest_column_with_options(column, unnest_options)?;
Ok(Self::new(df))
}

/// Calculate the intersection of two `DataFrame`s. The two `DataFrame`s must have exactly the same schema
fn intersect(&self, py_df: PyDataFrame) -> PyResult<Self> {
let new_df = self
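
The #[pyo3(signature = (column, preserve_nulls=true))] attribute gives the Python method a default of preserve_nulls=True, so nulls in the list column are kept as null rows unless the caller opts out, as exercised by the two tests above. A quick sketch of the equivalence (df here is any DataFrame with a list column "a"):

# Equivalent calls, thanks to the pyo3 default:
df.unnest_column("a")
df.unnest_column("a", preserve_nulls=True)

# Opt out to drop rows coming from null list entries:
df.unnest_column("a", preserve_nulls=False)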
2 changes: 2 additions & 0 deletions src/expr.rs
@@ -89,6 +89,7 @@ pub mod subquery;
pub mod subquery_alias;
pub mod table_scan;
pub mod union;
pub mod unnest;
pub mod window;

/// A PyExpr that can be used on a DataFrame
@@ -684,6 +685,7 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
m.add_class::<join::PyJoinConstraint>()?;
m.add_class::<cross_join::PyCrossJoin>()?;
m.add_class::<union::PyUnion>()?;
m.add_class::<unnest::PyUnnest>()?;
m.add_class::<extension::PyExtension>()?;
m.add_class::<filter::PyFilter>()?;
m.add_class::<projection::PyProjection>()?;
85 changes: 85 additions & 0 deletions src/expr/unnest.rs
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_expr::logical_plan::Unnest;
use pyo3::prelude::*;
use std::fmt::{self, Display, Formatter};

use crate::common::df_schema::PyDFSchema;
use crate::expr::logical_node::LogicalNode;
use crate::sql::logical::PyLogicalPlan;

#[pyclass(name = "Unnest", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyUnnest {
unnest_: Unnest,
}

impl From<Unnest> for PyUnnest {
fn from(unnest_: Unnest) -> PyUnnest {
PyUnnest { unnest_ }
}
}

impl From<PyUnnest> for Unnest {
fn from(unnest_: PyUnnest) -> Self {
unnest_.unnest_
}
}

impl Display for PyUnnest {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
f,
"Unnest
Inputs: {:?}
Schema: {:?}",
&self.unnest_.input, &self.unnest_.schema,
)
}
}

#[pymethods]
impl PyUnnest {
/// Retrieves the input `LogicalPlan` to this `Unnest` node
fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
Ok(Self::inputs(self))
}

/// Resulting Schema for this `Unnest` node instance
fn schema(&self) -> PyResult<PyDFSchema> {
Ok(self.unnest_.schema.as_ref().clone().into())
}

fn __repr__(&self) -> PyResult<String> {
Ok(format!("Unnest({})", self))
}

fn __name__(&self) -> PyResult<String> {
Ok("Unnest".to_string())
}
}

impl LogicalNode for PyUnnest {
fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![PyLogicalPlan::from((*self.unnest_.input).clone())]
}

fn to_variant(&self, py: Python) -> PyResult<PyObject> {
Ok(self.clone().into_py(py))
}
}
2 changes: 2 additions & 0 deletions src/sql/logical.rs
@@ -33,6 +33,7 @@ use crate::expr::sort::PySort;
use crate::expr::subquery::PySubquery;
use crate::expr::subquery_alias::PySubqueryAlias;
use crate::expr::table_scan::PyTableScan;
use crate::expr::unnest::PyUnnest;
use crate::expr::window::PyWindow;
use datafusion_expr::LogicalPlan;
use pyo3::prelude::*;
@@ -78,6 +79,7 @@ impl PyLogicalPlan {
LogicalPlan::TableScan(plan) => PyTableScan::from(plan.clone()).to_variant(py),
LogicalPlan::Subquery(plan) => PySubquery::from(plan.clone()).to_variant(py),
LogicalPlan::SubqueryAlias(plan) => PySubqueryAlias::from(plan.clone()).to_variant(py),
LogicalPlan::Unnest(plan) => PyUnnest::from(plan.clone()).to_variant(py),
LogicalPlan::Window(plan) => PyWindow::from(plan.clone()).to_variant(py),
other => Err(py_unsupported_variant_err(format!(
"Cannot convert this plan to a LogicalNode: {:?}",
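
With PyUnnest registered in init_module and matched in to_variant above, an Unnest node in a logical plan can be inspected from Python. A rough sketch, assuming Unnest is re-exported from datafusion.expr like the other plan nodes and that DataFrame.logical_plan() is available (both are assumptions about the surrounding bindings, not part of this diff):

from datafusion.expr import Unnest

plan = df.unnest_column("a").logical_plan()  # df as in the earlier sketch
node = plan.to_variant()
if isinstance(node, Unnest):
    print(node.schema())  # resulting DFSchema of the unnest
    print(node.input())   # single-element list holding the child LogicalPlan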
