From bea9daae95bb6f385abbbbb4bf25097cac50169d Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Tue, 1 Oct 2024 15:10:52 -0700 Subject: [PATCH] Write null counts in Parquet statistics when they are known (#6490) * Write null counts in parquet files when they are present * continue to treat None as Some(0) on read --------- Co-authored-by: Andrew Lamb --- parquet/src/file/statistics.rs | 103 +++++++++++++++++++++++++-- parquet/tests/arrow_writer_layout.rs | 52 +++++++------- 2 files changed, 122 insertions(+), 33 deletions(-) diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs index 854900f1edb9..50ed06436d86 100644 --- a/parquet/src/file/statistics.rs +++ b/parquet/src/file/statistics.rs @@ -248,20 +248,21 @@ pub fn from_thrift( pub fn to_thrift(stats: Option<&Statistics>) -> Option<TStatistics> { let stats = stats?; - // record null counts if greater than zero. - // - // TODO: This should be Some(0) if there are no nulls. - // see https://github.com/apache/arrow-rs/pull/6216/files + // record null count if it can fit in i64 let null_count = stats .null_count_opt() - .map(|value| value as i64) - .filter(|&x| x > 0); + .and_then(|value| i64::try_from(value).ok()); + + // record distinct count if it can fit in i64 + let distinct_count = stats + .distinct_count_opt() + .and_then(|value| i64::try_from(value).ok()); let mut thrift_stats = TStatistics { max: None, min: None, null_count, - distinct_count: stats.distinct_count_opt().map(|value| value as i64), + distinct_count, max_value: None, min_value: None, is_max_value_exact: None, @@ -1052,4 +1053,92 @@ mod tests { true, )); } + + #[test] + fn test_count_encoding() { + statistics_count_test(None, None); + statistics_count_test(Some(0), Some(0)); + statistics_count_test(Some(100), Some(2000)); + statistics_count_test(Some(1), None); + statistics_count_test(None, Some(1)); + } + + #[test] + fn test_count_encoding_distinct_too_large() { + // statistics are stored using i64, so test trying to store larger values + 
let statistics = make_bool_stats(Some(u64::MAX), Some(100)); + let thrift_stats = to_thrift(Some(&statistics)).unwrap(); + assert_eq!(thrift_stats.distinct_count, None); // can't store u64 max --> null + assert_eq!(thrift_stats.null_count, Some(100)); + } + + #[test] + fn test_count_encoding_null_too_large() { + // statistics are stored using i64, so test trying to store larger values + let statistics = make_bool_stats(Some(100), Some(u64::MAX)); + let thrift_stats = to_thrift(Some(&statistics)).unwrap(); + assert_eq!(thrift_stats.distinct_count, Some(100)); + assert_eq!(thrift_stats.null_count, None); // can't store u64 max --> null + } + + #[test] + fn test_count_decoding_null_invalid() { + let tstatistics = TStatistics { + null_count: Some(-42), + ..Default::default() + }; + let err = from_thrift(Type::BOOLEAN, Some(tstatistics)).unwrap_err(); + assert_eq!( + err.to_string(), + "Parquet error: Statistics null count is negative -42" + ); + } + + /// Writes statistics to thrift and reads them back and ensures: + /// - The statistics are the same + /// - The statistics written to thrift are the same as the original statistics + fn statistics_count_test(distinct_count: Option<u64>, null_count: Option<u64>) { + let statistics = make_bool_stats(distinct_count, null_count); + + let thrift_stats = to_thrift(Some(&statistics)).unwrap(); + assert_eq!(thrift_stats.null_count.map(|c| c as u64), null_count); + assert_eq!( + thrift_stats.distinct_count.map(|c| c as u64), + distinct_count + ); + + let round_tripped = from_thrift(Type::BOOLEAN, Some(thrift_stats)) + .unwrap() + .unwrap(); + // TODO: remove branch when we no longer support assuming null_count==None in the thrift + // means null_count = Some(0) + if null_count.is_none() { + assert_ne!(round_tripped, statistics); + assert!(round_tripped.null_count_opt().is_some()); + assert_eq!(round_tripped.null_count_opt(), Some(0)); + assert_eq!(round_tripped.min_bytes_opt(), statistics.min_bytes_opt()); + 
assert_eq!(round_tripped.max_bytes_opt(), statistics.max_bytes_opt()); + assert_eq!( + round_tripped.distinct_count_opt(), + statistics.distinct_count_opt() + ); + } else { + assert_eq!(round_tripped, statistics); + } + } + + fn make_bool_stats(distinct_count: Option<u64>, null_count: Option<u64>) -> Statistics { + let min = Some(true); + let max = Some(false); + let is_min_max_deprecated = false; + + // test is about the counts, so we aren't really testing the min/max values + Statistics::Boolean(ValueStatistics::new( + min, + max, + distinct_count, + null_count, + is_min_max_deprecated, + )) + } } diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs index 3e0f6ce3a8b3..9a66d13f84d7 100644 --- a/parquet/tests/arrow_writer_layout.rs +++ b/parquet/tests/arrow_writer_layout.rs @@ -189,7 +189,7 @@ fn test_primitive() { pages: (0..8) .map(|_| Page { rows: 250, - page_header_size: 36, + page_header_size: 38, compressed_size: 1000, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, @@ -218,14 +218,14 @@ fn test_primitive() { pages: vec![ Page { rows: 250, - page_header_size: 36, + page_header_size: 38, compressed_size: 258, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 1750, - page_header_size: 36, + page_header_size: 38, compressed_size: 7000, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, @@ -233,7 +233,7 @@ fn test_primitive() { ], dictionary_page: Some(Page { rows: 250, - page_header_size: 36, + page_header_size: 38, compressed_size: 1000, encoding: Encoding::PLAIN, page_type: PageType::DICTIONARY_PAGE, @@ -260,42 +260,42 @@ fn test_primitive() { pages: vec![ Page { rows: 400, - page_header_size: 36, + page_header_size: 38, compressed_size: 452, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 370, - page_header_size: 36, + page_header_size: 38, compressed_size: 472, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 
330, - page_header_size: 36, + page_header_size: 38, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 36, + page_header_size: 38, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 36, + page_header_size: 38, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 240, - page_header_size: 36, + page_header_size: 38, compressed_size: 332, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, @@ -303,7 +303,7 @@ fn test_primitive() { ], dictionary_page: Some(Page { rows: 2000, - page_header_size: 36, + page_header_size: 38, compressed_size: 8000, encoding: Encoding::PLAIN, page_type: PageType::DICTIONARY_PAGE, @@ -329,7 +329,7 @@ fn test_primitive() { pages: (0..20) .map(|_| Page { rows: 100, - page_header_size: 36, + page_header_size: 38, compressed_size: 400, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, @@ -364,14 +364,14 @@ fn test_string() { pages: (0..15) .map(|_| Page { rows: 130, - page_header_size: 36, + page_header_size: 38, compressed_size: 1040, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, }) .chain(std::iter::once(Page { rows: 50, - page_header_size: 35, + page_header_size: 37, compressed_size: 400, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, @@ -400,21 +400,21 @@ fn test_string() { pages: vec![ Page { rows: 130, - page_header_size: 36, + page_header_size: 38, compressed_size: 138, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 1250, - page_header_size: 38, + page_header_size: 40, compressed_size: 10000, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, }, Page { rows: 620, - page_header_size: 36, + page_header_size: 38, compressed_size: 4960, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, @@ -422,7 +422,7 @@ fn test_string() { ], 
dictionary_page: Some(Page { rows: 130, - page_header_size: 36, + page_header_size: 38, compressed_size: 1040, encoding: Encoding::PLAIN, page_type: PageType::DICTIONARY_PAGE, @@ -449,42 +449,42 @@ fn test_string() { pages: vec![ Page { rows: 400, - page_header_size: 36, + page_header_size: 38, compressed_size: 452, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 370, - page_header_size: 36, + page_header_size: 38, compressed_size: 472, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 36, + page_header_size: 38, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 36, + page_header_size: 38, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 330, - page_header_size: 36, + page_header_size: 38, compressed_size: 464, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { rows: 240, - page_header_size: 36, + page_header_size: 38, compressed_size: 332, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, @@ -492,7 +492,7 @@ fn test_string() { ], dictionary_page: Some(Page { rows: 2000, - page_header_size: 36, + page_header_size: 38, compressed_size: 16000, encoding: Encoding::PLAIN, page_type: PageType::DICTIONARY_PAGE, @@ -532,7 +532,7 @@ fn test_list() { pages: (0..10) .map(|_| Page { rows: 20, - page_header_size: 36, + page_header_size: 38, compressed_size: 672, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE,