From 30c14abd3340735e5e4ab9375628f8d8ba7223b4 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Thu, 5 Dec 2024 15:05:29 -0800 Subject: [PATCH] Optionally coerce names of maps and lists to match Parquet specification (#6828) * optionally coerce names of maps and lists to match Parquet spec * less verbose * add ArrowWriter round trip test * move documentation to builder * use create_random_array for map and list arrays --- parquet/src/arrow/arrow_writer/mod.rs | 51 ++++++++++++ parquet/src/arrow/schema/mod.rs | 110 ++++++++++++++++++++++++-- parquet/src/bin/parquet-rewrite.rs | 7 ++ parquet/src/file/properties.rs | 28 ++++--- 4 files changed, 179 insertions(+), 17 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 222d86131e0a..bb6ebf75ec8e 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1088,6 +1088,7 @@ mod tests { use arrow::datatypes::ToByteSlice; use arrow::datatypes::{DataType, Schema}; use arrow::error::Result as ArrowResult; + use arrow::util::data_gen::create_random_array; use arrow::util::pretty::pretty_format_batches; use arrow::{array::*, buffer::Buffer}; use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer}; @@ -2491,6 +2492,56 @@ mod tests { one_column_roundtrip(values, false); } + #[test] + fn list_and_map_coerced_names() { + // Create map and list with non-Parquet naming + let list_field = + Field::new_list("my_list", Field::new("item", DataType::Int32, false), false); + let map_field = Field::new_map( + "my_map", + "entries", + Field::new("keys", DataType::Int32, false), + Field::new("values", DataType::Int32, true), + false, + true, + ); + + let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap(); + let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap(); + + let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field])); + + // Write data to Parquet but coerce names to match spec + let props = Some(WriterProperties::builder().set_coerce_types(true).build()); + let file = tempfile::tempfile().unwrap(); + let mut writer = + ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap(); + + let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap(); + writer.write(&batch).unwrap(); + let file_metadata = writer.close().unwrap(); + + // Coerced name of "item" should be "element" + assert_eq!(file_metadata.schema[3].name, "element"); + // Coerced name of "entries" should be "key_value" + assert_eq!(file_metadata.schema[5].name, "key_value"); + // Coerced name of "keys" should be "key" + assert_eq!(file_metadata.schema[6].name, "key"); + // Coerced name of "values" should be "value" + assert_eq!(file_metadata.schema[7].name, "value"); + + // Double check schema after reading from the file + let reader = SerializedFileReader::new(file).unwrap(); + let file_schema = reader.metadata().file_metadata().schema(); + let fields = file_schema.get_fields(); + let list_field = &fields[0].get_fields()[0]; + assert_eq!(list_field.get_fields()[0].name(), "element"); + let map_field = &fields[1].get_fields()[0]; + assert_eq!(map_field.name(), "key_value"); + assert_eq!(map_field.get_fields()[0].name(), "key"); + assert_eq!(map_field.get_fields()[1].name(), "value"); + } + #[test] fn fallback_flush_data_page() { //tests if the Fallback::flush_data_page clears all buffers correctly diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index ec34840d858f..0fbcb4856e46 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -303,6 +303,11 @@ pub fn decimal_length_from_precision(precision: u8) -> usize { /// Convert an arrow field to a parquet `Type` fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result { + const PARQUET_LIST_ELEMENT_NAME: &str = "element"; + const PARQUET_MAP_STRUCT_NAME: &str = "key_value"; + const PARQUET_KEY_FIELD_NAME: &str = "key"; + const PARQUET_VALUE_FIELD_NAME: &str = "value"; + let name = field.name().as_str(); let repetition = if field.is_nullable() { Repetition::OPTIONAL @@ -527,10 +532,18 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result { .with_id(id) .build(), DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => { + let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME { + // Ensure proper naming per the Parquet specification + let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME); + Arc::new(arrow_to_parquet_type(&ff, coerce_types)?) + } else { + Arc::new(arrow_to_parquet_type(f, coerce_types)?) + }; + Type::group_type_builder(name) .with_fields(vec![Arc::new( Type::group_type_builder("list") - .with_fields(vec![Arc::new(arrow_to_parquet_type(f, coerce_types)?)]) + .with_fields(vec![field_ref]) .with_repetition(Repetition::REPEATED) .build()?, )]) @@ -559,13 +572,29 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result { } DataType::Map(field, _) => { if let DataType::Struct(struct_fields) = field.data_type() { + // If coercing then set inner struct name to "key_value" + let map_struct_name = if coerce_types { + PARQUET_MAP_STRUCT_NAME + } else { + field.name() + }; + + // If coercing then ensure struct fields are named "key" and "value" + let fix_map_field = |name: &str, fld: &Arc| -> Result> { + if coerce_types && fld.name() != name { + let f = fld.as_ref().clone().with_name(name); + Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?)) + } else { + Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?)) + } + }; + let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?; + let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?; + Type::group_type_builder(name) .with_fields(vec![Arc::new( - Type::group_type_builder(field.name()) - .with_fields(vec![ - Arc::new(arrow_to_parquet_type(&struct_fields[0], coerce_types)?), - Arc::new(arrow_to_parquet_type(&struct_fields[1], coerce_types)?), - ]) + Type::group_type_builder(map_struct_name) + .with_fields(vec![key_field, val_field]) .with_repetition(Repetition::REPEATED) .build()?, )]) @@ -1420,6 +1449,75 @@ mod tests { assert_eq!(arrow_fields, converted_arrow_fields); } + #[test] + fn test_coerced_map_list() { + // Create Arrow schema with non-Parquet naming + let arrow_fields = vec![ + Field::new_list( + "my_list", + Field::new("item", DataType::Boolean, true), + false, + ), + Field::new_map( + "my_map", + "entries", + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Int32, true), + false, + true, + ), + ]; + let arrow_schema = Schema::new(arrow_fields); + + // Create Parquet schema with coerced names + let message_type = " + message parquet_schema { + REQUIRED GROUP my_list (LIST) { + REPEATED GROUP list { + OPTIONAL BOOLEAN element; + } + } + OPTIONAL GROUP my_map (MAP) { + REPEATED GROUP key_value { + REQUIRED BINARY key (STRING); + OPTIONAL INT32 value; + } + } + } + "; + let parquet_group_type = parse_message_type(message_type).unwrap(); + let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); + let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true).unwrap(); + assert_eq!( + parquet_schema.columns().len(), + converted_arrow_schema.columns().len() + ); + + // Create Parquet schema without coerced names + let message_type = " + message parquet_schema { + REQUIRED GROUP my_list (LIST) { + REPEATED GROUP list { + OPTIONAL BOOLEAN item; + } + } + OPTIONAL GROUP my_map (MAP) { + REPEATED GROUP entries { + REQUIRED BINARY keys (STRING); + OPTIONAL INT32 values; + } + } + } + "; + let parquet_group_type = parse_message_type(message_type).unwrap(); + let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); + let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap(); + assert_eq!( + parquet_schema.columns().len(), + converted_arrow_schema.columns().len() + ); + } + #[test] fn test_field_to_column_desc() { let message_type = " diff --git a/parquet/src/bin/parquet-rewrite.rs b/parquet/src/bin/parquet-rewrite.rs index ad0f7ae0df7d..eaecda50375d 100644 --- a/parquet/src/bin/parquet-rewrite.rs +++ b/parquet/src/bin/parquet-rewrite.rs @@ -199,6 +199,10 @@ struct Args { /// Sets writer version. #[clap(long)] writer_version: Option, + + /// Sets whether to coerce Arrow types to match Parquet specification + #[clap(long)] + coerce_types: Option, } fn main() { @@ -262,6 +266,9 @@ fn main() { if let Some(value) = args.writer_version { writer_properties_builder = writer_properties_builder.set_writer_version(value.into()); } + if let Some(value) = args.coerce_types { + writer_properties_builder = writer_properties_builder.set_coerce_types(value); + } let writer_properties = writer_properties_builder.build(); let mut parquet_writer = ArrowWriter::try_new( File::create(&args.output).expect("Unable to open output file"), diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 1e8a4868dfc3..aac450acd82f 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -287,15 +287,7 @@ impl WriterProperties { self.statistics_truncate_length } - /// Returns `coerce_types` boolean - /// - /// Some Arrow types do not have a corresponding Parquet logical type. - /// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`. - /// Writers have the option to coerce these into native Parquet types. Type - /// coercion allows for meaningful representations that do not require - /// downstream readers to consider the embedded Arrow schema. However, type - /// coercion also prevents the data from being losslessly round-tripped. This method - /// returns `true` if type coercion enabled. + /// Returns `true` if type coercion is enabled. pub fn coerce_types(&self) -> bool { self.coerce_types } @@ -788,8 +780,22 @@ impl WriterPropertiesBuilder { self } - /// Sets flag to enable/disable type coercion. - /// Takes precedence over globally defined settings. + /// Sets flag to control if type coercion is enabled (defaults to `false`). + /// + /// # Notes + /// Some Arrow types do not have a corresponding Parquet logical type. + /// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`. + /// Also, for [`List`] and [`Map`] types, Parquet expects certain schema elements + /// to have specific names to be considered fully compliant. + /// Writers have the option to coerce these types and names to match those required + /// by the Parquet specification. + /// This type coercion allows for meaningful representations that do not require + /// downstream readers to consider the embedded Arrow schema, and can allow for greater + /// compatibility with other Parquet implementations. However, type + /// coercion also prevents the data from being losslessly round-tripped. + /// + /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists + /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps pub fn set_coerce_types(mut self, coerce_types: bool) -> Self { self.coerce_types = coerce_types; self