From f3149b16c9aac6d2beb1ce06947dee46cede6721 Mon Sep 17 00:00:00 2001
From: Eduardo Vega
Date: Sun, 23 Jun 2024 04:14:40 -0600
Subject: [PATCH 1/2] Support dictionary data type in array_to_string (#10908)

* Support dictionary data type in array_to_string

* Fix import

* Some tests

* Update datafusion/functions-array/src/string.rs

Co-authored-by: Alex Huang

* Add some tests showing incorrect results

* Get logical array

* apply rust fmt

* Simplify implementation, avoid panics

---------

Co-authored-by: Alex Huang
Co-authored-by: Andrew Lamb
---
 datafusion/functions-array/src/string.rs     | 29 ++++++++++--
 datafusion/sqllogictest/test_files/array.slt | 48 ++++++++++++++++++++
 2 files changed, 73 insertions(+), 4 deletions(-)

diff --git a/datafusion/functions-array/src/string.rs b/datafusion/functions-array/src/string.rs
index 04832b4b1259..d02c863db8b7 100644
--- a/datafusion/functions-array/src/string.rs
+++ b/datafusion/functions-array/src/string.rs
@@ -26,12 +26,15 @@ use arrow::array::{
 use arrow::datatypes::{DataType, Field};
 use datafusion_expr::TypeSignature;
 
-use datafusion_common::{plan_err, DataFusionError, Result};
+use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
 use std::any::{type_name, Any};
 
 use crate::utils::{downcast_arg, make_scalar_function};
-use arrow_schema::DataType::{FixedSizeList, LargeList, LargeUtf8, List, Null, Utf8};
+use arrow::compute::cast;
+use arrow_schema::DataType::{
+    Dictionary, FixedSizeList, LargeList, LargeUtf8, List, Null, Utf8,
+};
 use datafusion_common::cast::{
     as_generic_string_array, as_large_list_array, as_list_array, as_string_array,
 };
@@ -76,7 +79,7 @@ macro_rules! call_array_function {
             DataType::UInt16 => array_function!(UInt16Array),
             DataType::UInt32 => array_function!(UInt32Array),
             DataType::UInt64 => array_function!(UInt64Array),
-            _ => unreachable!(),
+            dt => not_impl_err!("Unsupported data type in array_to_string: {dt}"),
         }
     };
     ($DATATYPE:expr, $INCLUDE_LIST:expr) => {{
@@ -95,7 +98,7 @@ macro_rules! call_array_function {
             DataType::UInt16 => array_function!(UInt16Array),
             DataType::UInt32 => array_function!(UInt32Array),
             DataType::UInt64 => array_function!(UInt64Array),
-            _ => unreachable!(),
+            dt => not_impl_err!("Unsupported data type in array_to_string: {dt}"),
         }
     }};
 }
@@ -245,6 +248,8 @@ pub(super) fn array_to_string_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
         with_null_string = true;
     }
 
+    /// Creates a single string from a single element of a ListArray (which is
+    /// itself another Array)
     fn compute_array_to_string(
         arg: &mut String,
         arr: ArrayRef,
@@ -281,6 +286,22 @@ pub(super) fn array_to_string_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
                 Ok(arg)
             }
+            Dictionary(_key_type, value_type) => {
+                // Call cast to unwrap the dictionary. This could be optimized if we wanted
+                // to accept the overhead of extra code
+                let values = cast(&arr, value_type.as_ref()).map_err(|e| {
+                    DataFusionError::from(e).context(
+                        "Casting dictionary to values in compute_array_to_string",
+                    )
+                })?;
+                compute_array_to_string(
+                    arg,
+                    values,
+                    delimiter,
+                    null_string,
+                    with_null_string,
+                )
+            }
             Null => Ok(arg),
             data_type => {
                 macro_rules! array_function {
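The heart of the string.rs change is the new `Dictionary` arm: rather than adding a code path per key type, it casts the dictionary array down to its value type and recurses. A minimal standalone sketch of that unwrapping step, assuming only the arrow crate (the array contents are illustrative):

    use std::sync::Arc;
    use arrow::array::{ArrayRef, DictionaryArray, StringArray};
    use arrow::compute::cast;
    use arrow::datatypes::{DataType, Int32Type};

    fn main() -> Result<(), arrow::error::ArrowError> {
        // Dictionary-encoded strings: values ["foo", "bar"], keys [0, 1, 0]
        let dict: DictionaryArray<Int32Type> =
            vec!["foo", "bar", "foo"].into_iter().collect();
        let arr: ArrayRef = Arc::new(dict);

        // `cast` materializes one value per key, so repeated keys yield
        // repeated values, which is exactly what array_to_string needs to see
        let values = cast(&arr, &DataType::Utf8)?;
        let strings = values.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.value(2), "foo");
        Ok(())
    }

As the in-code comment notes, this trades some casting overhead for a much smaller implementation than dispatching on every possible key type.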
diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt
index 55a430767c76..77d1a9da1f55 100644
--- a/datafusion/sqllogictest/test_files/array.slt
+++ b/datafusion/sqllogictest/test_files/array.slt
@@ -3769,6 +3769,54 @@ select array_to_string(make_array(), ',')
 ----
 (empty)
 
+# array to string dictionary
+statement ok
+CREATE TABLE table1 AS VALUES
+  (1, 'foo'),
+  (3, 'bar'),
+  (1, 'foo'),
+  (2, NULL),
+  (NULL, 'baz')
+;
+
+# expect 1-3-1-2 (dictionary values should be repeated)
+query T
+SELECT array_to_string(array_agg(column1),'-')
+FROM (
+  SELECT arrow_cast(column1, 'Dictionary(Int32, Int32)') as column1
+  FROM table1
+);
+----
+1-3-1-2
+
+# expect foo,bar,foo,baz (dictionary values should be repeated)
+query T
+SELECT array_to_string(array_agg(column2),',')
+FROM (
+  SELECT arrow_cast(column2, 'Dictionary(Int64, Utf8)') as column2
+  FROM table1
+);
+----
+foo,bar,foo,baz
+
+# Expect only values that are in the group
+query I?T
+SELECT column1, array_agg(column2), array_to_string(array_agg(column2),',')
+FROM (
+  SELECT column1, arrow_cast(column2, 'Dictionary(Int32, Utf8)') as column2
+  FROM table1
+)
+GROUP BY column1
+ORDER BY column1;
+----
+1 [foo, foo] foo,foo
+2 [] (empty)
+3 [bar] bar
+NULL [baz] baz
+
+statement ok
+drop table table1;
+
 
 ## array_union (aliases: `list_union`)
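For readers who want to try the new behavior outside of sqllogictest, a rough sketch against DataFusion's Rust API follows. The inline VALUES table and aliases are made up for illustration, and `array_agg` ordering is only predictable here because the input is one small partition:

    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        let ctx = SessionContext::new();
        // Mirrors the slt tests: dictionary-encode a column, aggregate it
        // into an array, then join the elements with a delimiter
        let df = ctx
            .sql(
                "SELECT array_to_string(array_agg(column1), ',') \
                 FROM (SELECT arrow_cast(column1, 'Dictionary(Int32, Utf8)') AS column1 \
                       FROM (VALUES ('foo'), ('bar'), ('foo')) AS t) AS v",
            )
            .await?;
        df.show().await?; // expected output: foo,bar,foo
        Ok(())
    }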
From bdea76ccfb8c3a02d19815cfe95e8fc67473ee99 Mon Sep 17 00:00:00 2001
From: Xin Li <33629085+xinlifoobar@users.noreply.github.com>
Date: Tue, 25 Jun 2024 05:45:14 +0800
Subject: [PATCH 2/2] Update ListingTable to use StatisticsConverter (#11068)

* Update ListingTable to use StatisticsConverter

* complete support for all types parquet

* Fix misc

* Fix misc

* fix test

* fix misc

* fix none
---
 .../src/datasource/file_format/parquet.rs | 254 +++++++-----------
 1 file changed, 103 insertions(+), 151 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 572904254fd7..4204593eba96 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -25,25 +25,20 @@ use std::sync::Arc;
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
 use super::{FileFormat, FileScanConfig};
-use crate::arrow::array::{
-    BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch,
-};
-use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
+use crate::arrow::array::RecordBatch;
+use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig};
-use crate::datasource::schema_adapter::{
-    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
-};
 use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::execution::context::SessionState;
-use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
 use crate::physical_plan::insert::{DataSink, DataSinkExec};
 use crate::physical_plan::{
     Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
     Statistics,
 };
 
+use arrow::compute::sum;
 use datafusion_common::config::TableParquetOptions;
 use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
 use datafusion_common::stats::Precision;
 use datafusion_common::{ };
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 use datafusion_physical_plan::metrics::MetricsSet;
 
 use async_trait::async_trait;
 use bytes::{BufMut, BytesMut};
+use log::debug;
 use object_store::buffered::BufWriter;
 use parquet::arrow::arrow_writer::{
     compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
@@ -66,16 +63,17 @@
 use parquet::arrow::{
     arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
 };
 use parquet::file::footer::{decode_footer, decode_metadata};
-use parquet::file::metadata::ParquetMetaData;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use parquet::file::properties::WriterProperties;
-use parquet::file::statistics::Statistics as ParquetStatistics;
 use parquet::file::writer::SerializedFileWriter;
 use parquet::format::FileMetaData;
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio::task::JoinSet;
 
-use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
+use crate::datasource::physical_plan::parquet::{
+    ParquetExecBuilder, StatisticsConverter,
+};
 use futures::{StreamExt, TryStreamExt};
 use hashbrown::HashMap;
 use object_store::path::Path;
@@ -295,86 +293,6 @@ impl FileFormat for ParquetFormat {
     }
 }
 
-fn summarize_min_max(
-    max_values: &mut [Option<MaxAccumulator>],
-    min_values: &mut [Option<MinAccumulator>],
-    fields: &Fields,
-    i: usize,
-    stat: &ParquetStatistics,
-) {
-    if !stat.has_min_max_set() {
-        max_values[i] = None;
-        min_values[i] = None;
-        return;
-    }
-    match stat {
-        ParquetStatistics::Boolean(s) if DataType::Boolean == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(BooleanArray::from(vec![*s.max()]))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(BooleanArray::from(vec![*s.min()]))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Int32(s) if DataType::Int32 == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Int32Array::from_value(*s.max(), 1))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Int32Array::from_value(*s.min(), 1))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Int64(s) if DataType::Int64 == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Int64Array::from_value(*s.max(), 1))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Int64Array::from_value(*s.min(), 1))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Float(s) if DataType::Float32 == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Float32Array::from(vec![*s.max()]))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Float32Array::from(vec![*s.min()]))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Double(s) if DataType::Float64 == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Float64Array::from(vec![*s.max()]))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Float64Array::from(vec![*s.min()]))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        _ => {
-            max_values[i] = None;
-            min_values[i] = None;
-        }
-    }
-}
-
 /// Fetches parquet metadata from ObjectStore for given object
 ///
 /// This component is a subject to **change** in near future and is exposed for low level integrations
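The deleted `summarize_min_max` above special-cased each parquet statistics type by hand. Its replacement leans on `MaxAccumulator`/`MinAccumulator` (re-imported from `datafusion_physical_expr` in the import hunk), which fold whole arrays of candidate values into a single scalar for any supported type. A minimal sketch of that folding, with made-up values:

    use std::sync::Arc;
    use arrow::array::{ArrayRef, Int64Array};
    use arrow::datatypes::DataType;
    use datafusion_expr::Accumulator;
    use datafusion_physical_expr::expressions::MaxAccumulator;

    fn main() -> datafusion_common::Result<()> {
        let mut max_acc = MaxAccumulator::try_new(&DataType::Int64)?;
        // One array of per-row-group maxes for a column, illustrative values
        let maxes: ArrayRef = Arc::new(Int64Array::from(vec![10, 42, 7]));
        max_acc.update_batch(&[maxes])?;
        // Evaluates to ScalarValue::Int64(Some(42)), the file-level max
        println!("{:?}", max_acc.evaluate()?);
        Ok(())
    }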
@@ -467,7 +385,7 @@ async fn fetch_statistics(
     statistics_from_parquet_meta(&metadata, table_schema).await
 }
 
-/// Convert statistics in [`ParquetMetaData`] into [`Statistics`]
+/// Convert statistics in [`ParquetMetaData`] into [`Statistics`] using [`StatisticsConverter`]
 ///
 /// The statistics are calculated for each column in the table schema
 /// using the row group statistics in the parquet metadata.
@@ -475,80 +393,107 @@ pub async fn statistics_from_parquet_meta(
     metadata: &ParquetMetaData,
     table_schema: SchemaRef,
 ) -> Result<Statistics> {
-    let file_metadata = metadata.file_metadata();
+    let row_groups_metadata = metadata.row_groups();
+
+    let mut statistics = Statistics::new_unknown(&table_schema);
+    let mut has_statistics = false;
+    let mut num_rows = 0_usize;
+    let mut total_byte_size = 0_usize;
+    for row_group_meta in row_groups_metadata {
+        num_rows += row_group_meta.num_rows() as usize;
+        total_byte_size += row_group_meta.total_byte_size() as usize;
+
+        if !has_statistics {
+            row_group_meta.columns().iter().for_each(|column| {
+                has_statistics = column.statistics().is_some();
+            });
+        }
+    }
+    statistics.num_rows = Precision::Exact(num_rows);
+    statistics.total_byte_size = Precision::Exact(total_byte_size);
 
+    let file_metadata = metadata.file_metadata();
     let file_schema = parquet_to_arrow_schema(
         file_metadata.schema_descr(),
         file_metadata.key_value_metadata(),
     )?;
 
-    let num_fields = table_schema.fields().len();
-    let fields = table_schema.fields();
-
-    let mut num_rows = 0;
-    let mut total_byte_size = 0;
-    let mut null_counts = vec![Precision::Exact(0); num_fields];
-    let mut has_statistics = false;
-
-    let schema_adapter =
-        DefaultSchemaAdapterFactory::default().create(table_schema.clone());
+    statistics.column_statistics = if has_statistics {
+        let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
+        let mut null_counts_array =
+            vec![Precision::Exact(0); table_schema.fields().len()];
 
-    let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);
-
-    for row_group_meta in metadata.row_groups() {
-        num_rows += row_group_meta.num_rows();
-        total_byte_size += row_group_meta.total_byte_size();
-
-        let mut column_stats: HashMap<usize, (u64, &ParquetStatistics)> = HashMap::new();
-
-        for (i, column) in row_group_meta.columns().iter().enumerate() {
-            if let Some(stat) = column.statistics() {
-                has_statistics = true;
-                column_stats.insert(i, (stat.null_count(), stat));
-            }
-        }
-
-        if has_statistics {
-            for (table_idx, null_cnt) in null_counts.iter_mut().enumerate() {
-                if let Some(file_idx) =
-                    schema_adapter.map_column_index(table_idx, &file_schema)
-                {
-                    if let Some((null_count, stats)) = column_stats.get(&file_idx) {
-                        *null_cnt = null_cnt.add(&Precision::Exact(*null_count as usize));
-                        summarize_min_max(
-                            &mut max_values,
-                            &mut min_values,
-                            fields,
-                            table_idx,
-                            stats,
+        table_schema
+            .fields()
+            .iter()
+            .enumerate()
+            .for_each(|(idx, field)| {
+                match StatisticsConverter::try_new(
+                    field.name(),
+                    &file_schema,
+                    file_metadata.schema_descr(),
+                ) {
+                    Ok(stats_converter) => {
+                        summarize_min_max_null_counts(
+                            &mut min_accs,
+                            &mut max_accs,
+                            &mut null_counts_array,
+                            idx,
+                            num_rows,
+                            &stats_converter,
+                            row_groups_metadata,
                         )
-                    } else {
-                        // If none statistics of current column exists, set the Max/Min Accumulator to None.
-                        max_values[table_idx] = None;
-                        min_values[table_idx] = None;
+                        .ok();
+                    }
+                    Err(e) => {
+                        debug!("Failed to create statistics converter: {}", e);
+                        null_counts_array[idx] = Precision::Exact(num_rows);
                     }
-                } else {
-                    *null_cnt = null_cnt.add(&Precision::Exact(num_rows as usize));
                 }
-            }
-        }
-    }
+            });
 
-    let column_stats = if has_statistics {
-        get_col_stats(&table_schema, null_counts, &mut max_values, &mut min_values)
+        get_col_stats(
+            &table_schema,
+            null_counts_array,
+            &mut max_accs,
+            &mut min_accs,
+        )
     } else {
         Statistics::unknown_column(&table_schema)
    };
 
-    let statistics = Statistics {
-        num_rows: Precision::Exact(num_rows as usize),
-        total_byte_size: Precision::Exact(total_byte_size as usize),
-        column_statistics: column_stats,
-    };
-
     Ok(statistics)
 }
 
+fn summarize_min_max_null_counts(
+    min_accs: &mut [Option<MinAccumulator>],
+    max_accs: &mut [Option<MaxAccumulator>],
+    null_counts_array: &mut [Precision<usize>],
+    arrow_schema_index: usize,
+    num_rows: usize,
+    stats_converter: &StatisticsConverter,
+    row_groups_metadata: &[RowGroupMetaData],
+) -> Result<()> {
+    let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
+    let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
+    let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?;
+
+    if let Some(max_acc) = &mut max_accs[arrow_schema_index] {
+        max_acc.update_batch(&[max_values])?;
+    }
+
+    if let Some(min_acc) = &mut min_accs[arrow_schema_index] {
+        min_acc.update_batch(&[min_values])?;
+    }
+
+    null_counts_array[arrow_schema_index] = Precision::Exact(match sum(&null_counts) {
+        Some(null_count) => null_count as usize,
+        None => num_rows,
+    });
+
+    Ok(())
+}
+
 /// Implements [`DataSink`] for writing to a parquet file.
 pub struct ParquetSink {
     /// Config options for writing data
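One subtlety in `summarize_min_max_null_counts`: `arrow::compute::sum` returns `None` when its input is empty or entirely null, and the `None => num_rows` arm uses that to fall back to the pessimistic "every row may be null" count when no row group carried the statistic. A small sketch of that behavior, with illustrative counts:

    use arrow::array::UInt64Array;
    use arrow::compute::sum;

    fn main() {
        // Per-row-group null counts for one column
        let counts = UInt64Array::from(vec![Some(1u64), Some(0), Some(2)]);
        assert_eq!(sum(&counts), Some(3));

        // No row group recorded the statistic: sum yields None, and the
        // caller records all rows as possibly null instead
        let missing = UInt64Array::from(vec![None::<u64>, None]);
        assert_eq!(sum(&missing), None);
    }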
@@ -1126,7 +1071,8 @@ mod tests {
     use crate::physical_plan::metrics::MetricValue;
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{Array, ArrayRef, StringArray};
-    use arrow_schema::Field;
+    use arrow_array::Int64Array;
+    use arrow_schema::{DataType, Field};
     use async_trait::async_trait;
     use datafusion_common::cast::{
         as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
@@ -1449,8 +1395,14 @@
         // column c1
         let c1_stats = &stats.column_statistics[0];
         assert_eq!(c1_stats.null_count, Precision::Exact(1));
-        assert_eq!(c1_stats.max_value, Precision::Absent);
-        assert_eq!(c1_stats.min_value, Precision::Absent);
+        assert_eq!(
+            c1_stats.max_value,
+            Precision::Exact(ScalarValue::Utf8(Some("bar".to_string())))
+        );
+        assert_eq!(
+            c1_stats.min_value,
+            Precision::Exact(ScalarValue::Utf8(Some("Foo".to_string())))
+        );
         // column c2: missing from the file so the table treats all 3 rows as null
         let c2_stats = &stats.column_statistics[1];
         assert_eq!(c2_stats.null_count, Precision::Exact(3));
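A closing note on the `Precision` values asserted in these tests: `Precision` (from `datafusion_common::stats`) tracks whether a statistic is exact, an estimate, or absent, and arithmetic on it degrades precision conservatively. A small sketch of those semantics; the exact-plus-inexact case reflects my reading of `Precision::add` and should be treated as an assumption:

    use datafusion_common::stats::Precision;

    fn main() {
        let a = Precision::Exact(3_usize);
        let b = Precision::Exact(2_usize);
        // Exact + Exact stays exact, as in the null-count accounting above
        assert_eq!(a.add(&b), Precision::Exact(5));

        // Mixing in an inexact operand degrades the sum to inexact
        let c = Precision::Inexact(4_usize);
        assert_eq!(a.add(&c), Precision::Inexact(7));
    }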