From 2d70413d69f0ccb878e5a76d6f8bc9b937318653 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 15 Aug 2024 12:26:00 -0400 Subject: [PATCH] Write null counts in parquet files when they are present --- parquet/src/arrow/arrow_reader/statistics.rs | 27 +++- parquet/src/file/statistics.rs | 160 ++++++++++++++++--- parquet/tests/arrow_reader/statistics.rs | 67 +++++++- parquet/tests/arrow_writer_layout.rs | 52 +++--- 4 files changed, 252 insertions(+), 54 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index 602a9ad5e506..c8fa80d1c596 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -1179,6 +1179,8 @@ pub struct StatisticsConverter<'a> { parquet_column_index: Option, /// The field (with data type) of the column in the Arrow schema arrow_field: &'a Field, + /// treat missing null_counts as 0 nulls + missing_null_counts_as_zero: bool, } impl<'a> StatisticsConverter<'a> { @@ -1195,6 +1197,18 @@ impl<'a> StatisticsConverter<'a> { self.arrow_field } + /// Set the statistics converter to treat missing null counts as missing + /// + /// By default, the converter will treat missing null counts as 0 nulls. + /// + /// Due to , prior to version + /// 53.0.0, parquet files written by parquet-rs did not store null counts + /// when there were zero nulls. + pub fn with_missing_null_counts_as_zero(mut self, missing_null_counts_as_zero: bool) -> Self { + self.missing_null_counts_as_zero = missing_null_counts_as_zero; + self + } + /// Returns a [`UInt64Array`] with row counts for each row group /// /// # Return Value @@ -1288,6 +1302,7 @@ impl<'a> StatisticsConverter<'a> { Ok(Self { parquet_column_index: parquet_index, arrow_field, + missing_null_counts_as_zero: true, }) } @@ -1386,7 +1401,15 @@ impl<'a> StatisticsConverter<'a> { let null_counts = metadatas .into_iter() .map(|x| x.column(parquet_index).statistics()) - .map(|s| s.and_then(|s| s.null_count_opt())); + .map(|s| { + s.and_then(|s| { + if self.missing_null_counts_as_zero { + Some(s.null_count_opt().unwrap_or(0)) + } else { + s.null_count_opt() + } + }) + }); Ok(UInt64Array::from_iter(null_counts)) } @@ -1597,3 +1620,5 @@ impl<'a> StatisticsConverter<'a> { new_null_array(data_type, num_row_groups) } } + +// See tests in parquet/tests/arrow_reader/statistics.rs diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs index 4134685ffcfb..2da86b5fa69c 100644 --- a/parquet/src/file/statistics.rs +++ b/parquet/src/file/statistics.rs @@ -125,22 +125,35 @@ pub fn from_thrift( ) -> Result> { Ok(match thrift_stats { Some(stats) => { - // Number of nulls recorded, when it is not available, we just mark it as 0. - // TODO this should be `None` if there is no information about NULLS. - // see https://github.com/apache/arrow-rs/pull/6216/files - let null_count = stats.null_count.unwrap_or(0); - - if null_count < 0 { - return Err(ParquetError::General(format!( - "Statistics null count is negative {}", - null_count - ))); - } + // transform null count to u64 + let null_count = stats + .null_count + .map(|null_count| { + if null_count < 0 { + return Err(ParquetError::General(format!( + "Statistics null count is negative {}", + null_count + ))); + } + Ok(null_count as u64) + }) + .transpose()?; - // Generic null count. - let null_count = Some(null_count as u64); // Generic distinct count (count of distinct values occurring) - let distinct_count = stats.distinct_count.map(|value| value as u64); + let distinct_count = stats + .distinct_count + .map(|distinct_count| { + if distinct_count < 0 { + return Err(ParquetError::General(format!( + "Statistics distinct count is negative {}", + distinct_count + ))); + } + + Ok(distinct_count as u64) + }) + .transpose()?; + // Whether or not statistics use deprecated min/max fields. let old_format = stats.min_value.is_none() && stats.max_value.is_none(); // Generic min value as bytes. @@ -244,20 +257,21 @@ pub fn from_thrift( pub fn to_thrift(stats: Option<&Statistics>) -> Option { let stats = stats?; - // record null counts if greater than zero. - // - // TODO: This should be Some(0) if there are no nulls. - // see https://github.com/apache/arrow-rs/pull/6216/files + // record null count if it can fit in i64 let null_count = stats .null_count_opt() - .map(|value| value as i64) - .filter(|&x| x > 0); + .and_then(|value| i64::try_from(value).ok()); + + // record distinct count if it can fit in i64 + let distinct_count = stats + .distinct_count() + .and_then(|value| i64::try_from(value).ok()); let mut thrift_stats = TStatistics { max: None, min: None, null_count, - distinct_count: stats.distinct_count().map(|value| value as i64), + distinct_count, max_value: None, min_value: None, is_max_value_exact: None, @@ -404,9 +418,20 @@ impl Statistics { /// Returns number of null values for the column, if known. /// Note that this includes all nulls when column is part of the complex type. /// - /// Note this API returns Some(0) even if the null count was not present - /// in the statistics. - /// See + /// Note: Versions of this library prior to `53.0.0` returned 0 if the null count was + /// not available. This method returns `None` in that case. + /// + /// Also, versions of this library prior to `53.0.0` did not store the null count in the + /// statistics if the null count was `0`. + /// + /// To preserve the prior behavior and read null counts properly from older files + /// you should default to zero: + /// + /// ```no_run + /// # use parquet::file::statistics::Statistics; + /// # let statistics: Statistics = todo!(); + /// let null_count = statistics.null_count_opt().unwrap_or(0); + /// ``` pub fn null_count_opt(&self) -> Option { statistics_enum_func![self, null_count_opt] } @@ -1041,4 +1066,91 @@ mod tests { true, )); } + + #[test] + fn test_count_encoding() { + statistics_count_test(None, None); + statistics_count_test(Some(0), Some(0)); + statistics_count_test(Some(100), Some(2000)); + statistics_count_test(Some(1), None); + statistics_count_test(None, Some(1)); + } + + #[test] + fn test_count_encoding_distinct_too_large() { + // statistics are stored using i64, so test trying to store larger values + let statistics = make_bool_stats(Some(u64::MAX), Some(100)); + let thrift_stats = to_thrift(Some(&statistics)).unwrap(); + assert_eq!(thrift_stats.distinct_count, None); // can't store u64 max --> null + assert_eq!(thrift_stats.null_count, Some(100)); + } + + #[test] + fn test_count_encoding_null_too_large() { + // statistics are stored using i64, so test trying to store larger values + let statistics = make_bool_stats(Some(100), Some(u64::MAX)); + let thrift_stats = to_thrift(Some(&statistics)).unwrap(); + assert_eq!(thrift_stats.distinct_count, Some(100)); + assert_eq!(thrift_stats.null_count, None); // can' store u64 max --> null + } + + #[test] + fn test_count_decoding_distinct_invalid() { + let tstatistics = TStatistics { + distinct_count: Some(-42), + ..Default::default() + }; + let err = from_thrift(Type::BOOLEAN, Some(tstatistics)).unwrap_err(); + assert_eq!( + err.to_string(), + "Parquet error: Statistics distinct count is negative -42" + ); + } + + #[test] + fn test_count_decoding_null_invalid() { + let tstatistics = TStatistics { + null_count: Some(-42), + ..Default::default() + }; + let err = from_thrift(Type::BOOLEAN, Some(tstatistics)).unwrap_err(); + assert_eq!( + err.to_string(), + "Parquet error: Statistics null count is negative -42" + ); + } + + /// Writes statistics to thrift and reads them back and ensures: + /// - The statistics are the same + /// - The statistics written to thrift are the same as the original statistics + fn statistics_count_test(distinct_count: Option, null_count: Option) { + let statistics = make_bool_stats(distinct_count, null_count); + + let thrift_stats = to_thrift(Some(&statistics)).unwrap(); + assert_eq!(thrift_stats.null_count.map(|c| c as u64), null_count); + assert_eq!( + thrift_stats.distinct_count.map(|c| c as u64), + distinct_count + ); + + let round_tripped = from_thrift(Type::BOOLEAN, Some(thrift_stats)) + .unwrap() + .unwrap(); + assert_eq!(round_tripped, statistics); + } + + fn make_bool_stats(distinct_count: Option, null_count: Option) -> Statistics { + let min = Some(true); + let max = Some(false); + let is_min_max_deprecated = false; + + // test is about the counts, so we aren't really testing the min/max values + Statistics::Boolean(ValueStatistics::new( + min, + max, + distinct_count, + null_count, + is_min_max_deprecated, + )) + } } diff --git a/parquet/tests/arrow_reader/statistics.rs b/parquet/tests/arrow_reader/statistics.rs index 384be83d30e3..29adbbb4f6fe 100644 --- a/parquet/tests/arrow_reader/statistics.rs +++ b/parquet/tests/arrow_reader/statistics.rs @@ -22,6 +22,7 @@ use std::default::Default; use std::fs::File; use std::sync::Arc; +use super::make_test_file_rg; use super::{struct_array, Scenario}; use arrow::compute::kernels::cast_utils::Parser; use arrow::datatypes::{ @@ -37,16 +38,17 @@ use arrow_array::{ TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }; -use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; use half::f16; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::arrow_reader::{ ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, }; use parquet::arrow::ArrowWriter; +use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData}; use parquet::file::properties::{EnabledStatistics, WriterProperties}; - -use super::make_test_file_rg; +use parquet::file::statistics::{Statistics, ValueStatistics}; +use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor}; #[derive(Debug, Default, Clone)] struct Int64Case { @@ -2139,6 +2141,65 @@ async fn test_missing_statistics() { .run(); } +#[test] +fn missing_null_counts_as_zero() { + let min = None; + let max = None; + let distinct_count = None; + let null_count = None; // NB: no null count + let is_min_max_deprecated = false; + let stats = Statistics::Boolean(ValueStatistics::new( + min, + max, + distinct_count, + null_count, + is_min_max_deprecated, + )); + let (arrow_schema, parquet_schema) = bool_arrow_and_parquet_schema(); + + let column_chunk = ColumnChunkMetaData::builder(parquet_schema.column(0)) + .set_statistics(stats) + .build() + .unwrap(); + let metadata = RowGroupMetaData::builder(parquet_schema.clone()) + .set_column_metadata(vec![column_chunk]) + .build() + .unwrap(); + + let converter = StatisticsConverter::try_new("b", &arrow_schema, &parquet_schema).unwrap(); + + // by default null count should be 0 + assert_eq!( + converter.row_group_null_counts([&metadata]).unwrap(), + UInt64Array::from_iter(vec![Some(0)]) + ); + + // if we disable missing null counts as zero flag null count will be None + let converter = converter.with_missing_null_counts_as_zero(false); + assert_eq!( + converter.row_group_null_counts([&metadata]).unwrap(), + UInt64Array::from_iter(vec![None]) + ); +} + +/// return an Arrow schema and corresponding Parquet SchemaDescriptor for +/// a schema with a single boolean column "b" +fn bool_arrow_and_parquet_schema() -> (SchemaRef, SchemaDescPtr) { + let arrow_schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Boolean, true)])); + use parquet::schema::types::Type as ParquetType; + let parquet_schema = ParquetType::group_type_builder("schema") + .with_fields(vec![Arc::new( + ParquetType::primitive_type_builder("a", parquet::basic::Type::INT32) + .build() + .unwrap(), + )]) + .build() + .unwrap(); + + let parquet_schema = Arc::new(SchemaDescriptor::new(Arc::new(parquet_schema))); + (arrow_schema, parquet_schema) +} + /////// NEGATIVE TESTS /////// // column not found #[tokio::test] diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs index 3e0f6ce3a8b3..9a66d13f84d7 100644 --- a/parquet/tests/arrow_writer_layout.rs +++ b/parquet/tests/arrow_writer_layout.rs @@ -189,7 +189,7 @@ fn test_primitive() { pages: (0..8) .map(|_| Page { rows: 250, - page_header_size: 36, + page_header_size: 38, compressed_size: 1000, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, @@ -218,14 +218,14 @@ fn test_primitive() { pages: vec![ Page { rows: 250, - page_header_size: 36, + page_header_size: 38, compressed_size: 258, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 1750, - page_header_size: 36, + page_header_size: 38, compressed_size: 7000, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, @@ -233,7 +233,7 @@ fn test_primitive() { ], dictionary_page: Some(Page { rows: 250, - page_header_size: 36, + page_header_size: 38, compressed_size: 1000, encoding: Encoding::PLAIN, page_type: PageType::DICTIONARY_PAGE, @@ -260,42 +260,42 @@ fn test_primitive() { pages: vec![ Page { rows: 400, - page_header_size: 36, + page_header_size: 38, compressed_size: 452, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 370, - page_header_size: 36, + page_header_size: 38, compressed_size: 472, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 36, + page_header_size: 38, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 36, + page_header_size: 38, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 36, + page_header_size: 38, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 240, - page_header_size: 36, + page_header_size: 38, compressed_size: 332, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, @@ -303,7 +303,7 @@ fn test_primitive() { ], dictionary_page: Some(Page { rows: 2000, - page_header_size: 36, + page_header_size: 38, compressed_size: 8000, encoding: Encoding::PLAIN, page_type: PageType::DICTIONARY_PAGE, @@ -329,7 +329,7 @@ fn test_primitive() { pages: (0..20) .map(|_| Page { rows: 100, - page_header_size: 36, + page_header_size: 38, compressed_size: 400, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, @@ -364,14 +364,14 @@ fn test_string() { pages: (0..15) .map(|_| Page { rows: 130, - page_header_size: 36, + page_header_size: 38, compressed_size: 1040, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, }) .chain(std::iter::once(Page { rows: 50, - page_header_size: 35, + page_header_size: 37, compressed_size: 400, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, @@ -400,21 +400,21 @@ fn test_string() { pages: vec![ Page { rows: 130, - page_header_size: 36, + page_header_size: 38, compressed_size: 138, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 1250, - page_header_size: 38, + page_header_size: 40, compressed_size: 10000, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, }, Page { rows: 620, - page_header_size: 36, + page_header_size: 38, compressed_size: 4960, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, @@ -422,7 +422,7 @@ fn test_string() { ], dictionary_page: Some(Page { rows: 130, - page_header_size: 36, + page_header_size: 38, compressed_size: 1040, encoding: Encoding::PLAIN, page_type: PageType::DICTIONARY_PAGE, @@ -449,42 +449,42 @@ fn test_string() { pages: vec![ Page { rows: 400, - page_header_size: 36, + page_header_size: 38, compressed_size: 452, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 370, - page_header_size: 36, + page_header_size: 38, compressed_size: 472, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 36, + page_header_size: 38, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 36, + page_header_size: 38, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 36, + page_header_size: 38, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 240, - page_header_size: 36, + page_header_size: 38, compressed_size: 332, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, @@ -492,7 +492,7 @@ fn test_string() { ], dictionary_page: Some(Page { rows: 2000, - page_header_size: 36, + page_header_size: 38, compressed_size: 16000, encoding: Encoding::PLAIN, page_type: PageType::DICTIONARY_PAGE, @@ -532,7 +532,7 @@ fn test_list() { pages: (0..10) .map(|_| Page { rows: 20, - page_header_size: 36, + page_header_size: 38, compressed_size: 672, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE,