Skip to content

Commit

Permalink
Merge commit 'b9759b9810a05b7993c0357a5346197395cfd4cc' into chunchun…
Browse files Browse the repository at this point in the history
…/update-df-apr-week-2
  • Loading branch information
appletreeisyellow committed Apr 24, 2024
2 parents d7aaeeb + b9759b9 commit 41c7f3d
Show file tree
Hide file tree
Showing 14 changed files with 762 additions and 546 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,10 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
scalar.to_array().ok()
}

fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
None
/// Return the number of values in this row group for `column` as a
/// single-element `UInt64` array, or `None` if the column is not present
/// or the scalar cannot be converted to an array.
fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
// Look up the parquet column chunk metadata by name; propagate `None`
// if this row group has no column with that name.
let (c, _) = self.column(&column.name)?;
// NOTE(review): `num_values()` counts physical values in the column
// chunk; for flat (non-nested) columns this equals the row count, but
// for repeated/nested fields it can differ — confirm pruning only sees
// flat columns here.
let scalar = ScalarValue::UInt64(Some(c.num_values() as u64));
scalar.to_array().ok()
}

fn contained(
Expand Down Expand Up @@ -1022,15 +1024,17 @@ mod tests {
column_statistics: Vec<ParquetStatistics>,
) -> RowGroupMetaData {
let mut columns = vec![];
let number_row = 1000;
for (i, s) in column_statistics.iter().enumerate() {
let column = ColumnChunkMetaData::builder(schema_descr.column(i))
.set_statistics(s.clone())
.set_num_values(number_row)
.build()
.unwrap();
columns.push(column);
}
RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(1000)
.set_num_rows(number_row)
.set_total_byte_size(2000)
.set_column_metadata(columns)
.build()
Expand Down
38 changes: 31 additions & 7 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ pub trait PruningStatistics {
/// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 END`
/// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN false ELSE y_min <= 10 AND 10 <= y_max END`
/// `x IS NULL` | `x_null_count > 0`
/// `x IS NOT NULL` | `x_null_count = 0`
/// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END`
///
/// ## Predicate Evaluation
Expand Down Expand Up @@ -1239,10 +1240,15 @@ fn build_single_column_expr(
/// returns a pruning expression in terms of IsNull that will evaluate to true
/// if the column may contain null, and false if definitely does not
/// contain null.
/// If `with_not` is true, the predicate is `IS NOT NULL` instead:
/// given an expression reference to `expr`, if `expr` is a column expression,
/// returns a pruning expression in terms of IsNotNull that evaluates to true
/// only if the column definitely contains no nulls (null_count = 0).
/// NOTE(review): a row group with a mix of null and non-null values still
/// contains rows matching `IS NOT NULL`; verify that pruning such groups
/// (null_count > 0 but null_count < row_count) is actually sound here.
fn build_is_null_column_expr(
expr: &Arc<dyn PhysicalExpr>,
schema: &Schema,
required_columns: &mut RequiredColumns,
with_not: bool,
) -> Option<Arc<dyn PhysicalExpr>> {
if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
let field = schema.field_with_name(col.name()).ok()?;
Expand All @@ -1251,12 +1257,21 @@ fn build_is_null_column_expr(
required_columns
.null_count_column_expr(col, expr, null_count_field)
.map(|null_count_column_expr| {
// IsNull(column) => null_count > 0
Arc::new(phys_expr::BinaryExpr::new(
null_count_column_expr,
Operator::Gt,
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
)) as _
if with_not {
// IsNotNull(column) => null_count = 0
Arc::new(phys_expr::BinaryExpr::new(
null_count_column_expr,
Operator::Eq,
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
)) as _
} else {
// IsNull(column) => null_count > 0
Arc::new(phys_expr::BinaryExpr::new(
null_count_column_expr,
Operator::Gt,
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
)) as _
}
})
.ok()
} else {
Expand Down Expand Up @@ -1287,9 +1302,18 @@ fn build_predicate_expression(
// predicate expression can only be a binary expression
let expr_any = expr.as_any();
if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
return build_is_null_column_expr(is_null.arg(), schema, required_columns)
return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
.unwrap_or(unhandled);
}
if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
return build_is_null_column_expr(
is_not_null.arg(),
schema,
required_columns,
true,
)
.unwrap_or(unhandled);
}
if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
return build_single_column_expr(col, schema, required_columns, false)
.unwrap_or(unhandled);
Expand Down
30 changes: 30 additions & 0 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use arrow::{
record_batch::RecordBatch,
util::pretty::pretty_format_batches,
};
use arrow_array::new_null_array;
use chrono::{Datelike, Duration, TimeDelta};
use datafusion::{
datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider},
Expand Down Expand Up @@ -75,6 +76,7 @@ enum Scenario {
DecimalLargePrecisionBloomFilter,
ByteArray,
PeriodsInColumnNames,
WithNullValues,
}

enum Unit {
Expand Down Expand Up @@ -630,6 +632,27 @@ fn make_names_batch(name: &str, service_name_values: Vec<&str>) -> RecordBatch {
RecordBatch::try_new(schema, vec![Arc::new(name), Arc::new(service_name)]).unwrap()
}

/// Return record batch with i8, i16, i32, and i64 sequences with all Null values
fn make_all_null_values() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("i8", DataType::Int8, true),
Field::new("i16", DataType::Int16, true),
Field::new("i32", DataType::Int32, true),
Field::new("i64", DataType::Int64, true),
]));

RecordBatch::try_new(
schema,
vec![
new_null_array(&DataType::Int8, 5),
new_null_array(&DataType::Int16, 5),
new_null_array(&DataType::Int32, 5),
new_null_array(&DataType::Int64, 5),
],
)
.unwrap()
}

fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
match scenario {
Scenario::Timestamps => {
Expand Down Expand Up @@ -799,6 +822,13 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
),
]
}
Scenario::WithNullValues => {
vec![
make_all_null_values(),
make_int_batches(1, 6),
make_all_null_values(),
]
}
}
}

Expand Down
60 changes: 60 additions & 0 deletions datafusion/core/tests/parquet/row_group_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1262,3 +1262,63 @@ async fn prune_periods_in_column_names() {
.test_row_group_prune()
.await;
}

/// Verify statistics-based row-group pruning against a file with three row
/// groups: (1) all nulls, (2) integers 1..=5, (3) all nulls
/// (see `Scenario::WithNullValues`).
#[tokio::test]
async fn test_row_group_with_null_values() {
// Three row groups:
// 1. all Null values
// 2. values from 1 to 5
// 3. all Null values

// After pruning, only row group 2 should be selected
// (groups 1 and 3 have no non-null values, so `i8 <= 5` can never match)
RowGroupPruningTest::new()
.with_scenario(Scenario::WithNullValues)
.with_query("SELECT * FROM t WHERE \"i8\" <= 5")
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(2))
.with_expected_rows(5)
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.test_row_group_prune()
.await;

// After pruning, only row groups 1 and 3 should be selected
// (group 2 has null_count = 0, so `IS NULL` cannot match there)
RowGroupPruningTest::new()
.with_scenario(Scenario::WithNullValues)
.with_query("SELECT * FROM t WHERE \"i8\" is Null")
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(2))
.with_pruned_by_stats(Some(1))
.with_expected_rows(10)
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.test_row_group_prune()
.await;

// After pruning, only row group 2 should be selected
// (groups 1 and 3 are entirely null, so `IS NOT NULL` cannot match there)
RowGroupPruningTest::new()
.with_scenario(Scenario::WithNullValues)
.with_query("SELECT * FROM t WHERE \"i16\" is Not Null")
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(2))
.with_expected_rows(5)
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.test_row_group_prune()
.await;

// All row groups should be pruned: the only non-null values are 1..=5,
// none of which satisfies `i32 > 7`
RowGroupPruningTest::new()
.with_scenario(Scenario::WithNullValues)
.with_query("SELECT * FROM t WHERE \"i32\" > 7")
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(0))
.with_pruned_by_stats(Some(3))
.with_expected_rows(0)
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.test_row_group_prune()
.await;
}
Loading

0 comments on commit 41c7f3d

Please sign in to comment.