Skip to content

Commit

Permalink
Prune pages are all null in ParquetExec by row_counts
Browse files Browse the repository at this point in the history
and fix NOT NULL prune
  • Loading branch information
Ted-Jiang committed Apr 12, 2024
1 parent 58e0b59 commit dee9265
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use arrow_schema::Schema;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
use itertools::Itertools;
use log::{debug, trace};
use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
use parquet::{
Expand Down Expand Up @@ -314,6 +315,7 @@ fn prune_pages_in_one_row_group(
col_page_indexes,
col_offset_indexes,
target_type: &target_type,
num_rows_in_row_group: group.num_rows(),
};

match predicate.prune(&pruning_stats) {
Expand Down Expand Up @@ -385,6 +387,7 @@ struct PagesPruningStatistics<'a> {
// target_type means the logical type in schema: like 'DECIMAL' is the logical type, but the
// real physical type in parquet file may be `INT32, INT64, FIXED_LEN_BYTE_ARRAY`
target_type: &'a Option<DataType>,
num_rows_in_row_group: i64,
}

// Extract the min or max value calling `func` from page idex
Expand Down Expand Up @@ -548,7 +551,20 @@ impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
}

fn row_counts(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
None
// see https://github.com/apache/arrow-rs/blob/91f0b1771308609ca27db0fb1d2d49571b3980d8/parquet/src/file/metadata.rs#L979-L982
let mut first_row_index = self
.col_offset_indexes
.iter()
.map(|i| i.first_row_index)
.collect_vec();
first_row_index.push(self.num_rows_in_row_group);

let row_count_per_page: Vec<_> = first_row_index
.windows(2)
.map(|window| Some(window[1] - window[0]))
.collect();

Some(Arc::new(Int64Array::from_iter(row_count_per_page)))
}

fn contained(
Expand Down
47 changes: 31 additions & 16 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ pub trait PruningStatistics {
/// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 END`
/// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN false ELSE y_min <= 10 AND 10 <= y_max END`
/// `x IS NULL` | `x_null_count > 0`
/// `x IS NOT NULL` | `x_null_count = 0`
/// `x IS NOT NULL` | `!(x_null_count = row_count)`
/// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END`
///
/// ## Predicate Evaluation
Expand Down Expand Up @@ -1241,9 +1241,11 @@ fn build_single_column_expr(
/// if the column may contain null, and false if definitely does not
/// contain null.
/// If set `with_not` to true: which means is not null
/// because datafusion use false flag of expr result to prune unit (row group, page ..)
/// Given an expression reference to `expr`, if `expr` is a column expression,
/// returns a pruning expression in terms of IsNotNull that will evaluate to true
/// if the column not contain any null, and false if definitely contain null.
/// if the column may contain any non-null values, and false if definitely does not contain
/// non-null values null as all null values.
fn build_is_null_column_expr(
expr: &Arc<dyn PhysicalExpr>,
schema: &Schema,
Expand All @@ -1254,26 +1256,39 @@ fn build_is_null_column_expr(
let field = schema.field_with_name(col.name()).ok()?;

let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
required_columns
.null_count_column_expr(col, expr, null_count_field)
.map(|null_count_column_expr| {
if with_not {
// IsNotNull(column) => null_count = 0
Arc::new(phys_expr::BinaryExpr::new(
null_count_column_expr,
Operator::Eq,
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
)) as _
} else {
if with_not {
if let Ok(row_count_expr) =
required_columns.row_count_column_expr(col, expr, null_count_field)
{
required_columns
.null_count_column_expr(col, expr, null_count_field)
.map(|null_count_column_expr| {
// IsNotNull(column) => null_count == row_count
// but use false to prune the whole unit so need add the negate
let equal_expr = Arc::new(phys_expr::BinaryExpr::new(
null_count_column_expr,
Operator::Eq,
row_count_expr,
));
Arc::new(phys_expr::NotExpr::new(equal_expr)) as _
})
.ok()
} else {
return None;
}
} else {
required_columns
.null_count_column_expr(col, expr, null_count_field)
.map(|null_count_column_expr| {
// IsNull(column) => null_count > 0
Arc::new(phys_expr::BinaryExpr::new(
null_count_column_expr,
Operator::Gt,
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
)) as _
}
})
.ok()
})
.ok()
}
} else {
None
}
Expand Down
65 changes: 56 additions & 9 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use arrow::{
record_batch::RecordBatch,
util::pretty::pretty_format_batches,
};
use arrow_array::new_null_array;
use arrow_array::make_array;
use chrono::{Datelike, Duration, TimeDelta};
use datafusion::{
datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider},
Expand Down Expand Up @@ -77,6 +77,7 @@ enum Scenario {
ByteArray,
PeriodsInColumnNames,
WithNullValues,
WithNullValuesPageLevel,
}

enum Unit {
Expand Down Expand Up @@ -632,22 +633,60 @@ fn make_names_batch(name: &str, service_name_values: Vec<&str>) -> RecordBatch {
RecordBatch::try_new(schema, vec![Arc::new(name), Arc::new(service_name)]).unwrap()
}

/// Return record batch with i8, i16, i32, and i64 sequences with all Null values
fn make_all_null_values() -> RecordBatch {
/// Return record batch with i8, i16, i32, and i64 sequences with Null values
/// here 5 rows in page when using Unit::Page
fn make_int_batches_with_null(
null_values: usize,
no_null_values_start: usize,
no_null_values_end: usize,
) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("i8", DataType::Int8, true),
Field::new("i16", DataType::Int16, true),
Field::new("i32", DataType::Int32, true),
Field::new("i64", DataType::Int64, true),
]));

let v8: Vec<i8> = (no_null_values_start as _..no_null_values_end as _).collect();
let v16: Vec<i16> = (no_null_values_start as _..no_null_values_end as _).collect();
let v32: Vec<i32> = (no_null_values_start as _..no_null_values_end as _).collect();
let v64: Vec<i64> = (no_null_values_start as _..no_null_values_end as _).collect();

RecordBatch::try_new(
schema,
vec![
new_null_array(&DataType::Int8, 5),
new_null_array(&DataType::Int16, 5),
new_null_array(&DataType::Int32, 5),
new_null_array(&DataType::Int64, 5),
make_array(
Int8Array::from_iter(
v8.into_iter()
.map(Some)
.chain(std::iter::repeat(None).take(null_values)),
)
.to_data(),
),
make_array(
Int16Array::from_iter(
v16.into_iter()
.map(Some)
.chain(std::iter::repeat(None).take(null_values)),
)
.to_data(),
),
make_array(
Int32Array::from_iter(
v32.into_iter()
.map(Some)
.chain(std::iter::repeat(None).take(null_values)),
)
.to_data(),
),
make_array(
Int64Array::from_iter(
v64.into_iter()
.map(Some)
.chain(std::iter::repeat(None).take(null_values)),
)
.to_data(),
),
],
)
.unwrap()
Expand Down Expand Up @@ -824,9 +863,17 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
}
Scenario::WithNullValues => {
vec![
make_all_null_values(),
make_int_batches_with_null(5, 0, 0),
make_int_batches(1, 6),
make_all_null_values(),
make_int_batches_with_null(5, 0, 0),
]
}
Scenario::WithNullValuesPageLevel => {
vec![
make_int_batches_with_null(5, 1, 6),
make_int_batches(1, 11),
make_int_batches_with_null(1, 1, 10),
make_int_batches_with_null(5, 1, 6),
]
}
}
Expand Down
49 changes: 49 additions & 0 deletions datafusion/core/tests/parquet/page_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,55 @@ async fn without_pushdown_filter() {
assert!(bytes_scanned_with_filter > bytes_scanned_without_filter);
}

#[tokio::test]
// Data layout like this:
// row_group1: page1(1~5), page2(All Null)
// row_group2: page1(1~5), page2(6~10)
// row_group3: page1(1~5), page2(6~9 + Null)
// row_group4: page1(1~5), page2(All Null)
// total 40 rows
async fn test_pages_with_null_values() {
test_prune(
Scenario::WithNullValuesPageLevel,
"SELECT * FROM t where i8 <= 6",
Some(0),
// expect prune two pages which 10 rows
Some(10),
22,
)
.await;

test_prune(
Scenario::WithNullValuesPageLevel,
"SELECT * FROM t where \"i16\" is not null",
Some(0),
// expect prune two pages which 10 rows
Some(10),
29,
)
.await;

test_prune(
Scenario::WithNullValuesPageLevel,
"SELECT * FROM t where \"i32\" is null",
Some(0),
// expect prune 5 pages which 25 rows
Some(25),
11,
)
.await;

test_prune(
Scenario::WithNullValuesPageLevel,
"SELECT * FROM t where \"i64\" > 6",
Some(0),
// 6 pages will be pruned which 30 rows
Some(30),
7,
)
.await;
}

fn cast_count_metric(metric: MetricValue) -> Option<usize> {
match metric {
MetricValue::Count { count, .. } => Some(count.value()),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/parquet/row_group_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1296,7 +1296,7 @@ async fn test_row_group_with_null_values() {
.test_row_group_prune()
.await;

// After pruning, only row group 2should be selected
// After pruning, only row group 2 should be selected
RowGroupPruningTest::new()
.with_scenario(Scenario::WithNullValues)
.with_query("SELECT * FROM t WHERE \"i16\" is Not Null")
Expand Down

0 comments on commit dee9265

Please sign in to comment.