Skip to content

Commit

Permalink
Fix data page statistics when all rows are null in a data page (apach…
Browse files Browse the repository at this point in the history
…e#11295)

* Adds tests for data page statistics when all values on the page are null. Fixes most of the failing tests for iterators not handling this situation correctly.

* Fix handling of data page statistics for FixedBinaryArray using a builder.

* Fix data page all nulls stats test for Dictionary DataType.

* Fixes handling of None statistics for Decimal128 and Decimal256.

* Consolidate make_data_page_stats_iterator uses.

* Fix linting error.

* Remove unnecessary collect.

---------

Co-authored-by: Eric Fredine <[email protected]>
  • Loading branch information
efredine and Eric Fredine authored Jul 7, 2024
1 parent 229c139 commit 6f330c9
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 102 deletions.
128 changes: 68 additions & 60 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328

use arrow::array::builder::FixedSizeBinaryBuilder;
use arrow::datatypes::i256;
use arrow::{array::ArrayRef, datatypes::DataType};
use arrow_array::{
Expand Down Expand Up @@ -600,6 +601,31 @@ make_data_page_stats_iterator!(
Index::DOUBLE,
f64
);
make_data_page_stats_iterator!(
MinByteArrayDataPageStatsIterator,
|x: &PageIndex<ByteArray>| { x.min.clone() },
Index::BYTE_ARRAY,
ByteArray
);
make_data_page_stats_iterator!(
MaxByteArrayDataPageStatsIterator,
|x: &PageIndex<ByteArray>| { x.max.clone() },
Index::BYTE_ARRAY,
ByteArray
);
make_data_page_stats_iterator!(
MaxFixedLenByteArrayDataPageStatsIterator,
|x: &PageIndex<FixedLenByteArray>| { x.max.clone() },
Index::FIXED_LEN_BYTE_ARRAY,
FixedLenByteArray
);

make_data_page_stats_iterator!(
MinFixedLenByteArrayDataPageStatsIterator,
|x: &PageIndex<FixedLenByteArray>| { x.min.clone() },
Index::FIXED_LEN_BYTE_ARRAY,
FixedLenByteArray
);

macro_rules! get_decimal_page_stats_iterator {
($iterator_type: ident, $func: ident, $stat_value_type: ident, $convert_func: ident) => {
Expand Down Expand Up @@ -634,9 +660,7 @@ macro_rules! get_decimal_page_stats_iterator {
.indexes
.iter()
.map(|x| {
Some($stat_value_type::from(
x.$func.unwrap_or_default(),
))
x.$func.and_then(|x| Some($stat_value_type::from(x)))
})
.collect::<Vec<_>>(),
),
Expand All @@ -645,9 +669,7 @@ macro_rules! get_decimal_page_stats_iterator {
.indexes
.iter()
.map(|x| {
Some($stat_value_type::from(
x.$func.unwrap_or_default(),
))
x.$func.and_then(|x| Some($stat_value_type::from(x)))
})
.collect::<Vec<_>>(),
),
Expand All @@ -656,9 +678,9 @@ macro_rules! get_decimal_page_stats_iterator {
.indexes
.iter()
.map(|x| {
Some($convert_func(
x.clone().$func.unwrap_or_default().data(),
))
x.clone()
.$func
.and_then(|x| Some($convert_func(x.data())))
})
.collect::<Vec<_>>(),
),
Expand All @@ -667,9 +689,9 @@ macro_rules! get_decimal_page_stats_iterator {
.indexes
.iter()
.map(|x| {
Some($convert_func(
x.clone().$func.unwrap_or_default().data(),
))
x.clone()
.$func
.and_then(|x| Some($convert_func(x.data())))
})
.collect::<Vec<_>>(),
),
Expand Down Expand Up @@ -713,32 +735,6 @@ get_decimal_page_stats_iterator!(
i256,
from_bytes_to_i256
);
make_data_page_stats_iterator!(
MinByteArrayDataPageStatsIterator,
|x: &PageIndex<ByteArray>| { x.min.clone() },
Index::BYTE_ARRAY,
ByteArray
);
make_data_page_stats_iterator!(
MaxByteArrayDataPageStatsIterator,
|x: &PageIndex<ByteArray>| { x.max.clone() },
Index::BYTE_ARRAY,
ByteArray
);

make_data_page_stats_iterator!(
MaxFixedLenByteArrayDataPageStatsIterator,
|x: &PageIndex<FixedLenByteArray>| { x.max.clone() },
Index::FIXED_LEN_BYTE_ARRAY,
FixedLenByteArray
);

make_data_page_stats_iterator!(
MinFixedLenByteArrayDataPageStatsIterator,
|x: &PageIndex<FixedLenByteArray>| { x.min.clone() },
Index::FIXED_LEN_BYTE_ARRAY,
FixedLenByteArray
);

macro_rules! get_data_page_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
Expand All @@ -757,7 +753,7 @@ macro_rules! get_data_page_statistics {
UInt8Array::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
x.into_iter().filter_map(|x| {
x.into_iter().map(|x| {
x.and_then(|x| u8::try_from(x).ok())
})
})
Expand All @@ -768,7 +764,7 @@ macro_rules! get_data_page_statistics {
UInt16Array::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
x.into_iter().filter_map(|x| {
x.into_iter().map(|x| {
x.and_then(|x| u16::try_from(x).ok())
})
})
Expand All @@ -779,7 +775,7 @@ macro_rules! get_data_page_statistics {
UInt32Array::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
x.into_iter().filter_map(|x| {
x.into_iter().map(|x| {
x.and_then(|x| Some(x as u32))
})
})
Expand All @@ -789,7 +785,7 @@ macro_rules! get_data_page_statistics {
UInt64Array::from_iter(
[<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator)
.map(|x| {
x.into_iter().filter_map(|x| {
x.into_iter().map(|x| {
x.and_then(|x| Some(x as u64))
})
})
Expand All @@ -799,7 +795,7 @@ macro_rules! get_data_page_statistics {
Int8Array::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
x.into_iter().filter_map(|x| {
x.into_iter().map(|x| {
x.and_then(|x| i8::try_from(x).ok())
})
})
Expand All @@ -810,7 +806,7 @@ macro_rules! get_data_page_statistics {
Int16Array::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
x.into_iter().filter_map(|x| {
x.into_iter().map(|x| {
x.and_then(|x| i16::try_from(x).ok())
})
})
Expand All @@ -823,8 +819,8 @@ macro_rules! get_data_page_statistics {
Float16Array::from_iter(
[<$stat_type_prefix Float16DataPageStatsIterator>]::new($iterator)
.map(|x| {
x.into_iter().filter_map(|x| {
x.and_then(|x| Some(from_bytes_to_f16(x.data())))
x.into_iter().map(|x| {
x.and_then(|x| from_bytes_to_f16(x.data()))
})
})
.flatten()
Expand All @@ -836,7 +832,7 @@ macro_rules! get_data_page_statistics {
Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Utf8) => Ok(Arc::new(StringArray::from(
[<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| {
x.into_iter().filter_map(|x| {
x.into_iter().map(|x| {
x.and_then(|x| {
let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok();
if res.is_none() {
Expand All @@ -849,7 +845,7 @@ macro_rules! get_data_page_statistics {
))),
Some(DataType::LargeUtf8) => Ok(Arc::new(LargeStringArray::from(
[<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| {
x.into_iter().filter_map(|x| {
x.into_iter().map(|x| {
x.and_then(|x| {
let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok();
if res.is_none() {
Expand Down Expand Up @@ -878,10 +874,10 @@ macro_rules! get_data_page_statistics {
Date64Array::from([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
x.into_iter()
.filter_map(|x| {
.map(|x| {
x.and_then(|x| i64::try_from(x).ok())
.map(|x| x * 24 * 60 * 60 * 1000)
})
.map(|x| x * 24 * 60 * 60 * 1000)
}).flatten().collect::<Vec<_>>()
)
)
Expand Down Expand Up @@ -919,16 +915,28 @@ macro_rules! get_data_page_statistics {
})
},
Some(DataType::FixedSizeBinary(size)) => {
Ok(Arc::new(
FixedSizeBinaryArray::try_from_iter(
[<$stat_type_prefix FixedLenByteArrayDataPageStatsIterator>]::new($iterator)
.flat_map(|x| x.into_iter())
.filter_map(|x| x)
).unwrap_or_else(|e| {
log::debug!("FixedSizeBinary statistics is invalid: {}", e);
FixedSizeBinaryArray::new(*size, vec![].into(), None)
})
))
let mut builder = FixedSizeBinaryBuilder::new(*size);
let iterator = [<$stat_type_prefix FixedLenByteArrayDataPageStatsIterator>]::new($iterator);
for x in iterator {
for x in x.into_iter() {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

if x.len() == *size as usize {
let _ = builder.append_value(x.data());
} else {
log::debug!(
"FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
size,
x.len(),
);
builder.append_null();
}
}
}
Ok(Arc::new(builder.finish()))
},
_ => unimplemented!()
}
Expand Down
Loading

0 comments on commit 6f330c9

Please sign in to comment.