Skip to content

Commit

Permalink
Add support for Bloom filters on unsigned integer columns in Parquet …
Browse files Browse the repository at this point in the history
…tables (apache#9770)

* Add support for Bloom filters on unsigned integer columns in Parquet tables

* Add Scenario::UInt

* Add tests for Bloom filters on unsigned integer columns in Parquet tables

* Fix _scalar_fun_and_eq to actually call a function

* Add prune_uint32_eq_large_in_list

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
progval and alamb authored Apr 2, 2024
1 parent e8ab555 commit e9c5dc9
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ impl PruningStatistics for BloomFilterStatistics {
ScalarValue::Int32(Some(v)) => sbbf.check(v),
ScalarValue::Int16(Some(v)) => sbbf.check(v),
ScalarValue::Int8(Some(v)) => sbbf.check(v),
ScalarValue::UInt64(Some(v)) => sbbf.check(v),
ScalarValue::UInt32(Some(v)) => sbbf.check(v),
ScalarValue::UInt16(Some(v)) => sbbf.check(v),
ScalarValue::UInt8(Some(v)) => sbbf.check(v),
ScalarValue::Decimal128(Some(v), p, s) => match parquet_type {
Type::INT32 => {
//https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/Encodings.md?plain=1#L35-L42
Expand Down
11 changes: 11 additions & 0 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ enum Scenario {
Int,
Int32Range,
UInt,
UInt32Range,
Float64,
Decimal,
DecimalBloomFilterInt32,
Expand Down Expand Up @@ -455,6 +456,13 @@ fn make_int32_range(start: i32, end: i32) -> RecordBatch {
RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
}

fn make_uint32_range(start: u32, end: u32) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![Field::new("u", DataType::UInt32, true)]));
let v = vec![start, end];
let array = Arc::new(UInt32Array::from(v)) as ArrayRef;
RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
}

/// Return record batch with f64 vector
///
/// Columns are named
Expand Down Expand Up @@ -659,6 +667,9 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
make_uint_batches(250, 255),
]
}
Scenario::UInt32Range => {
vec![make_uint32_range(0, 10), make_uint32_range(200000, 300000)]
}
Scenario::Float64 => {
vec![
make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
Expand Down
162 changes: 161 additions & 1 deletion datafusion/core/tests/parquet/row_group_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ macro_rules! int_tests {
async fn [<prune_int $bits _scalar_fun_and_eq >]() {
RowGroupPruningTest::new()
.with_scenario(Scenario::Int)
.with_query(&format!("SELECT * FROM t where i{} = 1", $bits))
.with_query(&format!("SELECT * FROM t where abs(i{}) = 1 and i{} = 1", $bits, $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(3))
Expand Down Expand Up @@ -452,6 +452,144 @@ int_tests!(16, correct_bloom_filters: false);
int_tests!(32, correct_bloom_filters: true);
int_tests!(64, correct_bloom_filters: true);

// $bits: number of bits of the integer to test (8, 16, 32, 64)
// $correct_bloom_filters: if false, replicates the
// https://github.com/apache/arrow-datafusion/issues/9779 bug so that tests pass
// if and only if Bloom filters on UInt8 and UInt16 columns are still buggy.
macro_rules! uint_tests {
($bits:expr, correct_bloom_filters: $correct_bloom_filters:expr) => {
paste::item! {
#[tokio::test]
async fn [<prune_uint $bits _lt >]() {
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt)
.with_query(&format!("SELECT * FROM t where u{} < 6", $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(3))
.with_pruned_by_stats(Some(1))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(11)
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn [<prune_uint $bits _eq >]() {
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt)
.with_query(&format!("SELECT * FROM t where u{} = 6", $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(3))
.with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 }))
.with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 }))
.with_expected_rows(if $correct_bloom_filters { 1 } else { 0 })
.test_row_group_prune()
.await;
}
#[tokio::test]
async fn [<prune_uint $bits _scalar_fun_and_eq >]() {
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt)
.with_query(&format!("SELECT * FROM t where power(u{}, 2) = 36 and u{} = 6", $bits, $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(3))
.with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 }))
.with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 }))
.with_expected_rows(if $correct_bloom_filters { 1 } else { 0 })
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn [<prune_uint $bits _scalar_fun >]() {
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt)
.with_query(&format!("SELECT * FROM t where power(u{}, 2) = 25", $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(0))
.with_pruned_by_stats(Some(0))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(2)
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn [<prune_uint $bits _complex_expr >]() {
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt)
.with_query(&format!("SELECT * FROM t where u{}+1 = 6", $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(0))
.with_pruned_by_stats(Some(0))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(2)
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn [<prune_uint $bits _eq_in_list >]() {
// result of sql "SELECT * FROM t where in (1)"
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt)
.with_query(&format!("SELECT * FROM t where u{} in (6)", $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(3))
.with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 }))
.with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 }))
.with_expected_rows(if $correct_bloom_filters { 1 } else { 0 })
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn [<prune_uint $bits _eq_in_list_2 >]() {
// result of sql "SELECT * FROM t where in (1000)", prune all
// test whether statistics works
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt)
.with_query(&format!("SELECT * FROM t where u{} in (100)", $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(0))
.with_pruned_by_stats(Some(4))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(0)
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn [<prune_uint $bits _eq_in_list_negated >]() {
// result of sql "SELECT * FROM t where not in (1)" prune nothing
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt)
.with_query(&format!("SELECT * FROM t where u{} not in (6)", $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(4))
.with_pruned_by_stats(Some(0))
.with_matched_by_bloom_filter(Some(4))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(19)
.test_row_group_prune()
.await;
}
}
};
}

uint_tests!(8, correct_bloom_filters: false);
uint_tests!(16, correct_bloom_filters: false);
uint_tests!(32, correct_bloom_filters: true);
uint_tests!(64, correct_bloom_filters: true);

#[tokio::test]
async fn prune_int32_eq_large_in_list() {
// result of sql "SELECT * FROM t where i in (2050...2582)", prune all
Expand All @@ -474,6 +612,28 @@ async fn prune_int32_eq_large_in_list() {
.await;
}

#[tokio::test]
async fn prune_uint32_eq_large_in_list() {
// result of sql "SELECT * FROM t where i in (2050...2582)", prune all
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt32Range)
.with_query(
format!(
"SELECT * FROM t where u in ({})",
(200050..200082).join(",")
)
.as_str(),
)
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(0))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(1))
.with_expected_rows(0)
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn prune_f64_lt() {
RowGroupPruningTest::new()
Expand Down

0 comments on commit e9c5dc9

Please sign in to comment.