From d1c78c2aabd6c4295fc4c419091b2d65ffe25532 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 5 Dec 2024 15:22:06 -0500 Subject: [PATCH] Add ArrowToParquetSchemaConverter, deprecate `arrow_to_parquet_schema` et al --- parquet/src/arrow/arrow_writer/mod.rs | 16 +-- parquet/src/arrow/mod.rs | 8 +- parquet/src/arrow/schema/mod.rs | 135 ++++++++++++++++++++++---- parquet/src/file/properties.rs | 19 ++-- 4 files changed, 137 insertions(+), 41 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 222d86131e0a..1533a64bdfc3 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -30,12 +30,10 @@ use arrow_array::types::*; use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter}; use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef}; -use super::schema::{ - add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema, - arrow_to_parquet_schema_with_root, decimal_length_from_precision, -}; +use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision}; use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder; +use crate::arrow::ArrowToParquetSchemaConverter; use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter}; use crate::column::writer::encoder::ColumnValueEncoder; use crate::column::writer::{ @@ -181,10 +179,12 @@ impl ArrowWriter { options: ArrowWriterOptions, ) -> Result { let mut props = options.properties; - let schema = match options.schema_root { - Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s, props.coerce_types())?, - None => arrow_to_parquet_schema(&arrow_schema, props.coerce_types())?, - }; + let mut converter = ArrowToParquetSchemaConverter::new(&arrow_schema) + .with_coerce_types(props.coerce_types()); + if let Some(s) = &options.schema_root { + converter = converter.schema_root(s); + } + let schema = converter.build()?; if !options.skip_arrow_metadata { // add serialized arrow schema add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props); diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 2d09cd19203f..df8e2d74c026 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -116,9 +116,13 @@ pub use self::async_writer::AsyncArrowWriter; use crate::schema::types::SchemaDescriptor; use arrow_schema::{FieldRef, Schema}; +// continue to until functions are removed +#[allow(deprecated)] +pub use self::schema::arrow_to_parquet_schema; + pub use self::schema::{ - arrow_to_parquet_schema, parquet_to_arrow_field_levels, parquet_to_arrow_schema, - parquet_to_arrow_schema_by_columns, FieldLevels, + parquet_to_arrow_field_levels, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, + ArrowToParquetSchemaConverter, FieldLevels, }; /// Schema metadata key used to store serialized Arrow IPC schema diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index ec34840d858f..856445840cae 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -225,29 +225,121 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut } } +/// Converter for arrow schema to parquet schema +/// +/// Example: +/// ``` +/// # use std::sync::Arc; +/// use arrow_schema::{Field, Schema, DataType}; +/// use parquet::arrow::ArrowToParquetSchemaConverter; +/// use parquet::schema::types::{SchemaDescriptor, Type}; +/// use parquet::basic; +/// let arrow_schema = Schema::new(vec![ +/// Field::new("a", DataType::Int64, true), +/// Field::new("b", DataType::Date32, true), +/// ]); +/// +/// let parquet_schema = ArrowToParquetSchemaConverter::new(&arrow_schema) +/// .build() +/// .unwrap(); +/// // +/// let expected_parquet_schema = SchemaDescriptor::new( +/// Arc::new( +/// Type::group_type_builder("arrow_schema") +/// .with_fields(vec![ +/// Arc::new( +/// Type::primitive_type_builder("a", basic::Type::INT64) +/// .build().unwrap() +/// ), +/// Arc::new( +/// Type::primitive_type_builder("b", basic::Type::INT32) +/// .with_converted_type(basic::ConvertedType::DATE) +/// .with_logical_type(Some(basic::LogicalType::Date)) +/// .build().unwrap() +/// ), +/// ]) +/// .build().unwrap() +/// ) +/// ); +/// +/// assert_eq!(parquet_schema, expected_parquet_schema); +/// ``` +#[derive(Debug)] +pub struct ArrowToParquetSchemaConverter<'a> { + /// The schema to convert + schema: &'a Schema, + /// Name of the root schema in Parquet + schema_root: &'a str, + /// Should we Coerce arrow types to compatible Parquet types? + /// + /// See docs on [Self::with_coerce_types]` + coerce_types: bool +} + +impl <'a> ArrowToParquetSchemaConverter<'a> { + /// Create a new converter + pub fn new(schema: &'a Schema) -> Self { + Self { + schema, + schema_root: "arrow_schema", + coerce_types: false, + } + } + + /// Should arrow types be coerced into parquet native types (default false). + /// + /// Setting this option to `true` will result in parquet files that can be + /// read by more readers, but may lose precision for arrow types such as + /// [`DataType::Date64`] which have no direct corresponding Parquet type. + /// + /// # Discussion + /// + /// Some Arrow types such as `Date64`, `Timestamp` and `Interval` have no + /// corresponding Parquet logical type. Thus, they can not be losslessly + /// round-tripped when stored using the appropriate Parquet logical type. + /// + /// For example, some Date64 values may be truncated when stored with + /// parquet's native 32 bit date type. + /// + /// By default, the arrow writer does not coerce to native parquet types. It + /// writes data in such a way that it can be lossless round tripped. + /// However, this means downstream readers must be aware of and correctly + /// interpret the embedded Arrow schema. + pub fn with_coerce_types(mut self, coerce_types: bool) -> Self { + self.coerce_types = coerce_types; + self + } + + /// Set the root schema element name (defaults to `"arrow_schema"`). + pub fn schema_root(mut self, schema_root: &'a str) -> Self { + self.schema_root = schema_root; + self + } + + /// Build the desired parquet [`SchemaDescriptor`] + pub fn build(self) -> Result { + let Self { schema, schema_root: root_schema_name, coerce_types } = self; + let fields = schema + .fields() + .iter() + .map(|field| arrow_to_parquet_type(field, coerce_types).map(Arc::new)) + .collect::>()?; + let group = Type::group_type_builder(root_schema_name).with_fields(fields).build()?; + Ok(SchemaDescriptor::new(Arc::new(group))) + } +} + /// Convert arrow schema to parquet schema /// /// The name of the root schema element defaults to `"arrow_schema"`, this can be /// overridden with [`arrow_to_parquet_schema_with_root`] -pub fn arrow_to_parquet_schema(schema: &Schema, coerce_types: bool) -> Result { - arrow_to_parquet_schema_with_root(schema, "arrow_schema", coerce_types) -} +#[deprecated(since = "54.0.0", note = "Use `ArrowToParquetSchemaConverter` instead")] +pub fn arrow_to_parquet_schema(schema: &Schema) -> Result { -/// Convert arrow schema to parquet schema specifying the name of the root schema element -pub fn arrow_to_parquet_schema_with_root( - schema: &Schema, - root: &str, - coerce_types: bool, -) -> Result { - let fields = schema - .fields() - .iter() - .map(|field| arrow_to_parquet_type(field, coerce_types).map(Arc::new)) - .collect::>()?; - let group = Type::group_type_builder(root).with_fields(fields).build()?; - Ok(SchemaDescriptor::new(Arc::new(group))) + ArrowToParquetSchemaConverter::new(schema).build() } + fn parse_key_value_metadata( key_value_metadata: Option<&Vec>, ) -> Option> { @@ -1569,7 +1661,7 @@ mod tests { Field::new("decimal256", DataType::Decimal256(39, 2), false), ]; let arrow_schema = Schema::new(arrow_fields); - let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap(); + let converted_arrow_schema = ArrowToParquetSchemaConverter::new(&arrow_schema).build().unwrap(); assert_eq!( parquet_schema.columns().len(), @@ -1606,9 +1698,10 @@ mod tests { false, )]; let arrow_schema = Schema::new(arrow_fields); - let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true); + let converted_arrow_schema = ArrowToParquetSchemaConverter::new(&arrow_schema) + .with_coerce_types(true) + .build(); - assert!(converted_arrow_schema.is_err()); converted_arrow_schema.unwrap(); } @@ -1878,7 +1971,9 @@ mod tests { // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?; - let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema, true)?; + let parq_schema_descr = crate::arrow::ArrowToParquetSchemaConverter::new(&arrow_schema) + .with_coerce_types(true) + .build()?; let parq_fields = parq_schema_descr.root_schema().get_fields(); assert_eq!(parq_fields.len(), 2); assert_eq!(parq_fields[0].get_basic_info().id(), 1); diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 1e8a4868dfc3..55ce6794507b 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -16,14 +16,13 @@ // under the License. //! Configuration via [`WriterProperties`] and [`ReaderProperties`] -use std::str::FromStr; -use std::{collections::HashMap, sync::Arc}; - use crate::basic::{Compression, Encoding}; use crate::compression::{CodecOptions, CodecOptionsBuilder}; use crate::file::metadata::KeyValue; use crate::format::SortingColumn; use crate::schema::types::ColumnPath; +use std::str::FromStr; +use std::{collections::HashMap, sync::Arc}; /// Default value for [`WriterProperties::data_page_size_limit`] pub const DEFAULT_PAGE_SIZE: usize = 1024 * 1024; @@ -287,15 +286,13 @@ impl WriterProperties { self.statistics_truncate_length } - /// Returns `coerce_types` boolean + /// Should the writer coerce types to parquet native types. + /// + /// Setting this option to `true` will result in parquet files that can be + /// read by more readers, but may lose precision for arrow types such as + /// [`DataType::Date64`] which have no direct corresponding Parquet type. /// - /// Some Arrow types do not have a corresponding Parquet logical type. - /// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`. - /// Writers have the option to coerce these into native Parquet types. Type - /// coercion allows for meaningful representations that do not require - /// downstream readers to consider the embedded Arrow schema. However, type - /// coercion also prevents the data from being losslessly round-tripped. This method - /// returns `true` if type coercion enabled. + /// See [`ArrowToParquetSchemaConverter::with_coerce_types`] for more details pub fn coerce_types(&self) -> bool { self.coerce_types }