diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 359de2d30a57..9a328d04773d 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1289,7 +1289,7 @@ impl LogicalPlan { /// referenced expressions into columns. /// /// See also: [`crate::utils::columnize_expr`] - pub(crate) fn columnized_output_exprs(&self) -> Result> { + pub fn columnized_output_exprs(&self) -> Result> { match self { LogicalPlan::Aggregate(aggregate) => Ok(aggregate .output_expressions()? diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index a6b9bad6c5d9..a7691987550e 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use itertools::izip; -use arrow::datatypes::{DataType, Field, IntervalUnit}; +use arrow::datatypes::{DataType, Field, IntervalUnit, Schema}; use crate::analyzer::AnalyzerRule; use crate::utils::NamePreserver; @@ -66,19 +66,39 @@ impl TypeCoercion { } } +/// Coerce output schema based upon optimizer config. +fn coerce_output(plan: LogicalPlan, config: &ConfigOptions) -> Result { + if !config.optimizer.expand_views_at_output { + return Ok(plan); + } + + let outer_refs = plan.expressions(); + if outer_refs.is_empty() { + return Ok(plan); + } + + if let Some(dfschema) = transform_schema_to_nonview(plan.schema()) { + coerce_plan_expr_for_schema(plan, &dfschema?) + } else { + Ok(plan) + } +} + impl AnalyzerRule for TypeCoercion { fn name(&self) -> &str { "type_coercion" } - fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result { + fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result { let empty_schema = DFSchema::empty(); + // recurse let transformed_plan = plan .transform_up_with_subqueries(|plan| analyze_internal(&empty_schema, plan))? .data; - Ok(transformed_plan) + // finish + coerce_output(transformed_plan, config) } } @@ -515,6 +535,59 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { } } +/// Transform a schema to use non-view types for Utf8View and BinaryView +fn transform_schema_to_nonview(dfschema: &DFSchemaRef) -> Option> { + let metadata = dfschema.as_arrow().metadata.clone(); + let len = dfschema.fields().len(); + let mut transformed = false; + + let (qualifiers, transformed_fields) = dfschema + .iter() + .map(|(qualifier, field)| match field.data_type() { + DataType::Utf8View => { + transformed = true; + ( + qualifier, + Arc::new(Field::new( + field.name(), + DataType::LargeUtf8, + field.is_nullable(), + )), + ) + } + DataType::BinaryView => { + transformed = true; + ( + qualifier, + Arc::new(Field::new( + field.name(), + DataType::LargeBinary, + field.is_nullable(), + )), + ) + } + _ => (qualifier, Arc::clone(field)), + }) + .fold( + (Vec::with_capacity(len), Vec::with_capacity(len)), + |(mut qs, mut fs), (q, f)| { + qs.push(q.cloned()); + fs.push(f); + (qs, fs) + }, + ); + + if !transformed { + return None; + } + + let schema = Schema::new_with_metadata(transformed_fields, metadata); + Some(DFSchema::from_field_specific_qualified_schema( + qualifiers, + &Arc::new(schema), + )) +} + /// Casts the given `value` to `target_type`. Note that this function /// only considers `Null` or `Utf8` values. fn coerce_scalar(target_type: &DataType, value: &ScalarValue) -> Result {