diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 2bdaed479655..9466ff6dd459 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -26,6 +26,7 @@ use crate::aggregates::{ topk_stream::GroupedTopKAggregateStream, }; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use crate::projection::get_field_metadata; use crate::windows::get_ordered_partition_by_indices; use crate::{ DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, @@ -795,14 +796,17 @@ fn create_schema( ) -> Result { let mut fields = Vec::with_capacity(group_expr.len() + aggr_expr.len()); for (index, (expr, name)) in group_expr.iter().enumerate() { - fields.push(Field::new( - name, - expr.data_type(input_schema)?, - // In cases where we have multiple grouping sets, we will use NULL expressions in - // order to align the grouping sets. So the field must be nullable even if the underlying - // schema field is not. - group_expr_nullable[index] || expr.nullable(input_schema)?, - )) + fields.push( + Field::new( + name, + expr.data_type(input_schema)?, + // In cases where we have multiple grouping sets, we will use NULL expressions in + // order to align the grouping sets. So the field must be nullable even if the underlying + // schema field is not. + group_expr_nullable[index] || expr.nullable(input_schema)?, + ) + .with_metadata(get_field_metadata(expr, input_schema).unwrap_or_default()), + ) } match mode { @@ -823,7 +827,10 @@ fn create_schema( } } - Ok(Schema::new(fields)) + Ok(Schema::new_with_metadata( + fields, + input_schema.metadata().clone(), + )) } fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef { diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index f1b9cdaf728f..4c889d1fc88c 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -237,7 +237,7 @@ impl ExecutionPlan for ProjectionExec { /// If e is a direct column reference, returns the field level /// metadata for that field, if any. Otherwise returns None -fn get_field_metadata( +pub(crate) fn get_field_metadata( e: &Arc, input_schema: &Schema, ) -> Option> { diff --git a/datafusion/sqllogictest/test_files/metadata.slt b/datafusion/sqllogictest/test_files/metadata.slt index 0f505a7d7b53..f38281abc5ab 100644 --- a/datafusion/sqllogictest/test_files/metadata.slt +++ b/datafusion/sqllogictest/test_files/metadata.slt @@ -61,15 +61,19 @@ WHERE "data"."id" = "samples"."id"; # Regression test: prevent field metadata loss per https://github.com/apache/datafusion/issues/12687 -statement error DataFusion error: Internal error: Physical input schema should be the same as the one converted from logical input schema.. +query I select count(distinct name) from table_with_metadata; +---- +2 # Regression test: prevent field metadata loss per https://github.com/apache/datafusion/issues/12687 -statement error DataFusion error: Internal error: Physical input schema should be the same as the one converted from logical input schema.. +query I select approx_median(distinct id) from table_with_metadata; +---- +2 # Regression test: prevent field metadata loss per https://github.com/apache/datafusion/issues/12687 -statement error DataFusion error: Internal error: Physical input schema should be the same as the one converted from logical input schema.. +statement ok select array_agg(distinct id) from table_with_metadata; query I