diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 9ff7710f968b2..433dda870defa 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -468,7 +468,9 @@ pub fn can_interleave>>( } fn union_schema(inputs: &[Arc]) -> SchemaRef { - let fields: Vec = (0..inputs[0].schema().fields().len()) + let first_schema = inputs[0].schema(); + + let fields = (0..first_schema.fields().len()) .map(|i| { inputs .iter() @@ -477,25 +479,30 @@ fn union_schema(inputs: &[Arc]) -> SchemaRef { let field = input.schema().field(i).clone(); let mut metadata = field.metadata().clone(); - let other_side_metdata = inputs - .get(input_idx ^ (1 << 0)) - .map(|other_input| { - other_input.schema().field(i).metadata().clone() - }) - .unwrap_or_default(); + let other_metadatas = inputs + .iter() + .enumerate() + .filter(|(other_idx, _)| *other_idx != input_idx) + .flat_map(|(_, other_input)| { + other_input.schema().field(i).metadata().clone().into_iter() + }); - metadata.extend(other_side_metdata); + metadata.extend(other_metadatas); field.with_metadata(metadata) }) - .find_or_first(|f| f.is_nullable()) + .find_or_first(Field::is_nullable) + // We can unwrap this because if inputs was empty, this would've already panic'ed when we + // indexed into inputs[0]. .unwrap() }) + .collect::>(); + + let all_metadata_merged = inputs + .iter() + .flat_map(|i| i.schema().metadata().clone().into_iter()) .collect(); - Arc::new(Schema::new_with_metadata( - fields, - inputs[0].schema().metadata().clone(), - )) + Arc::new(Schema::new_with_metadata(fields, all_metadata_merged)) } /// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one