Skip to content

Commit

Permalink
feat(12119): coerce output based upon expand_views_at_output
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedld committed Aug 31, 2024
1 parent eb6e21d commit 081eb9f
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 4 deletions.
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1289,7 +1289,7 @@ impl LogicalPlan {
/// referenced expressions into columns.
///
/// See also: [`crate::utils::columnize_expr`]
pub(crate) fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
match self {
LogicalPlan::Aggregate(aggregate) => Ok(aggregate
.output_expressions()?
Expand Down
79 changes: 76 additions & 3 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::Arc;

use itertools::izip;

use arrow::datatypes::{DataType, Field, IntervalUnit};
use arrow::datatypes::{DataType, Field, IntervalUnit, Schema};

use crate::analyzer::AnalyzerRule;
use crate::utils::NamePreserver;
Expand Down Expand Up @@ -66,19 +66,39 @@ impl TypeCoercion {
}
}

/// Coerce output schema based upon optimizer config.
fn coerce_output(plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan> {
if !config.optimizer.expand_views_at_output {
return Ok(plan);
}

let outer_refs = plan.expressions();
if outer_refs.is_empty() {
return Ok(plan);
}

if let Some(dfschema) = transform_schema_to_nonview(plan.schema()) {
coerce_plan_expr_for_schema(plan, &dfschema?)
} else {
Ok(plan)
}
}

impl AnalyzerRule for TypeCoercion {
fn name(&self) -> &str {
"type_coercion"
}

fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan> {
let empty_schema = DFSchema::empty();

// recurse
let transformed_plan = plan
.transform_up_with_subqueries(|plan| analyze_internal(&empty_schema, plan))?
.data;

Ok(transformed_plan)
// finish
coerce_output(transformed_plan, config)
}
}

Expand Down Expand Up @@ -515,6 +535,59 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
}
}

/// Transform a schema to use non-view types for Utf8View and BinaryView
fn transform_schema_to_nonview(dfschema: &DFSchemaRef) -> Option<Result<DFSchema>> {
let metadata = dfschema.as_arrow().metadata.clone();
let len = dfschema.fields().len();
let mut transformed = false;

let (qualifiers, transformed_fields) = dfschema
.iter()
.map(|(qualifier, field)| match field.data_type() {
DataType::Utf8View => {
transformed = true;
(
qualifier,
Arc::new(Field::new(
field.name(),
DataType::LargeUtf8,
field.is_nullable(),
)),
)
}
DataType::BinaryView => {
transformed = true;
(
qualifier,
Arc::new(Field::new(
field.name(),
DataType::LargeBinary,
field.is_nullable(),
)),
)
}
_ => (qualifier, Arc::clone(field)),
})
.fold(
(Vec::with_capacity(len), Vec::with_capacity(len)),
|(mut qs, mut fs), (q, f)| {
qs.push(q.cloned());
fs.push(f);
(qs, fs)
},
);

if !transformed {
return None;
}

let schema = Schema::new_with_metadata(transformed_fields, metadata);
Some(DFSchema::from_field_specific_qualified_schema(
qualifiers,
&Arc::new(schema),
))
}

/// Casts the given `value` to `target_type`. Note that this function
/// only considers `Null` or `Utf8` values.
fn coerce_scalar(target_type: &DataType, value: &ScalarValue) -> Result<ScalarValue> {
Expand Down

0 comments on commit 081eb9f

Please sign in to comment.