From 67efbc584622f70aa0d0e1a2e4c789858a7de27c Mon Sep 17 00:00:00 2001
From: Nga Tran
Date: Thu, 13 Jun 2024 07:38:37 -0400
Subject: [PATCH] refactor: fetch statistics for a given ParquetMetaData
 (#10880)

* refactor: fetch statistics for a given ParquetMetaData

* test: add tests for fetch_statistics_from_parquet_meta

* Rename function and improve docs

* Simplify the test

---------

Co-authored-by: Andrew Lamb
---
 .../src/datasource/file_format/parquet.rs | 73 +++++++++++++++++++
 1 file changed, 73 insertions(+)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 99c38d3f0980..572904254fd7 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -455,6 +455,8 @@ async fn fetch_schema(
 }
 
 /// Read and parse the statistics of the Parquet file at location `path`
+///
+/// See [`statistics_from_parquet_meta`] for more details
 async fn fetch_statistics(
     store: &dyn ObjectStore,
     table_schema: SchemaRef,
@@ -462,6 +464,17 @@ async fn fetch_statistics(
     metadata_size_hint: Option<usize>,
 ) -> Result<Statistics> {
     let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
+    statistics_from_parquet_meta(&metadata, table_schema).await
+}
+
+/// Convert statistics in [`ParquetMetaData`] into [`Statistics`]
+///
+/// The statistics are calculated for each column in the table schema
+/// using the row group statistics in the parquet metadata.
+pub async fn statistics_from_parquet_meta(
+    metadata: &ParquetMetaData,
+    table_schema: SchemaRef,
+) -> Result<Statistics> {
     let file_metadata = metadata.file_metadata();
 
     let file_schema = parquet_to_arrow_schema(
@@ -1402,6 +1415,66 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_statistics_from_parquet_metadata() -> Result<()> {
+        // Data for column c1: ["Foo", null, "bar"]
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+        let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
+
+        // Data for column c2: [1, 2, null]
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
+
+        // Use store_parquet to write each batch to its own file
+        // . batch1 written into first file and includes:
+        //   - column c1 that has 3 rows with one null. Stats min and max of the string column are missing for this test even though the column has values
+        // . batch2 written into second file and includes:
+        //   - column c2 that has 3 rows with one null. Stats min and max of the int column are available: 1 and 2, respectively
+        let store = Arc::new(LocalFileSystem::new()) as _;
+        let (files, _file_names) = store_parquet(vec![batch1, batch2], false).await?;
+
+        let state = SessionContext::new().state();
+        let format = ParquetFormat::default();
+        let schema = format.infer_schema(&state, &store, &files).await.unwrap();
+
+        let null_i64 = ScalarValue::Int64(None);
+        let null_utf8 = ScalarValue::Utf8(None);
+
+        // Fetch statistics for first file
+        let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
+        let stats = statistics_from_parquet_meta(&pq_meta, schema.clone()).await?;
+        // the file contains 3 rows in total
+        assert_eq!(stats.num_rows, Precision::Exact(3));
+        // column c1
+        let c1_stats = &stats.column_statistics[0];
+        assert_eq!(c1_stats.null_count, Precision::Exact(1));
+        assert_eq!(c1_stats.max_value, Precision::Absent);
+        assert_eq!(c1_stats.min_value, Precision::Absent);
+        // column c2: missing from the file so the table treats all 3 rows as null
+        let c2_stats = &stats.column_statistics[1];
+        assert_eq!(c2_stats.null_count, Precision::Exact(3));
+        assert_eq!(c2_stats.max_value, Precision::Exact(null_i64.clone()));
+        assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));
+
+        // Fetch statistics for second file
+        let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[1], None).await?;
+        let stats = statistics_from_parquet_meta(&pq_meta, schema.clone()).await?;
+        assert_eq!(stats.num_rows, Precision::Exact(3));
+        // column c1: missing from the file so the table treats all 3 rows as null
+        let c1_stats = &stats.column_statistics[0];
+        assert_eq!(c1_stats.null_count, Precision::Exact(3));
+        assert_eq!(c1_stats.max_value, Precision::Exact(null_utf8.clone()));
+        assert_eq!(c1_stats.min_value, Precision::Exact(null_utf8.clone()));
+        // column c2
+        let c2_stats = &stats.column_statistics[1];
+        assert_eq!(c2_stats.null_count, Precision::Exact(1));
+        assert_eq!(c2_stats.max_value, Precision::Exact(2i64.into()));
+        assert_eq!(c2_stats.min_value, Precision::Exact(1i64.into()));
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
         let config = SessionConfig::new().with_batch_size(2);
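
Reviewer note: the sketch below is not part of the patch; it illustrates how a caller might use the newly public `statistics_from_parquet_meta` to avoid reading the Parquet footer twice. The helper name `stats_without_refetch` and the exact import paths are assumptions for illustration, and it presumes `fetch_parquet_metadata` is also exported from this module, as the test above suggests.

```rust
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::{Result, Statistics};
use datafusion::datasource::file_format::parquet::{
    fetch_parquet_metadata, statistics_from_parquet_meta,
};
use object_store::{ObjectMeta, ObjectStore};

/// Hypothetical helper (not in the patch): fetch the footer once and
/// reuse the in-memory `ParquetMetaData` for statistics extraction.
async fn stats_without_refetch(
    store: &dyn ObjectStore,
    file: &ObjectMeta,
    table_schema: SchemaRef,
) -> Result<Statistics> {
    // One round trip to object storage to read the file footer
    let metadata = fetch_parquet_metadata(store, file, None).await?;
    // No further I/O: statistics come from the already-parsed metadata,
    // which is what splitting fetch_statistics into two steps enables
    statistics_from_parquet_meta(&metadata, table_schema).await
}
```

Because the conversion is factored out of `fetch_statistics`, callers that already hold a `ParquetMetaData` (for example, from a metadata cache) can compute `Statistics` without touching object storage again.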