diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index c7706f3458d0..402cc106492e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -314,6 +314,7 @@ fn prune_pages_in_one_row_group( col_page_indexes, col_offset_indexes, target_type: &target_type, + num_rows_in_row_group: group.num_rows(), }; match predicate.prune(&pruning_stats) { @@ -385,6 +386,7 @@ struct PagesPruningStatistics<'a> { // target_type means the logical type in schema: like 'DECIMAL' is the logical type, but the // real physical type in parquet file may be `INT32, INT64, FIXED_LEN_BYTE_ARRAY` target_type: &'a Option, + num_rows_in_row_group: i64, } // Extract the min or max value calling `func` from page idex @@ -548,7 +550,19 @@ impl<'a> PruningStatistics for PagesPruningStatistics<'a> { } fn row_counts(&self, _column: &datafusion_common::Column) -> Option { - None + // see https://github.com/apache/arrow-rs/blob/91f0b1771308609ca27db0fb1d2d49571b3980d8/parquet/src/file/metadata.rs#L979-L982 + + let row_count_per_page = self.col_offset_indexes.windows(2).map(|location| { + Some(location[1].first_row_index - location[0].first_row_index) + }); + + // append the last page row count + let row_count_per_page = row_count_per_page.chain(std::iter::once(Some( + self.num_rows_in_row_group + - self.col_offset_indexes.last().unwrap().first_row_index, + ))); + + Some(Arc::new(Int64Array::from_iter(row_count_per_page))) } fn contained( diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index ebb811408fb3..d8a3814d77e1 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -335,7 +335,7 @@ pub trait PruningStatistics { /// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN 
false ELSE x_max < 5 END` /// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN false ELSE y_min <= 10 AND 10 <= y_max END` /// `x IS NULL` | `x_null_count > 0` -/// `x IS NOT NULL` | `x_null_count = 0` +/// `x IS NOT NULL` | `x_null_count != row_count` /// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END` /// /// ## Predicate Evaluation @@ -1240,10 +1240,10 @@ fn build_single_column_expr( /// returns a pruning expression in terms of IsNull that will evaluate to true /// if the column may contain null, and false if definitely does not /// contain null. -/// If set `with_not` to true: which means is not null -/// Given an expression reference to `expr`, if `expr` is a column expression, -/// returns a pruning expression in terms of IsNotNull that will evaluate to true -/// if the column not contain any null, and false if definitely contain null. +/// If `with_not` is true, build a pruning expression for `col IS NOT NULL`: `col_count != col_null_count` +/// The pruning expression evaluates to true ONLY if the column definitely CONTAINS +/// at least one NULL value. 
In this case we can know that `IS NOT NULL` can not be true and +/// thus can prune the row group / value fn build_is_null_column_expr( expr: &Arc, schema: &Schema, @@ -1254,26 +1254,37 @@ fn build_is_null_column_expr( let field = schema.field_with_name(col.name()).ok()?; let null_count_field = &Field::new(field.name(), DataType::UInt64, true); - required_columns - .null_count_column_expr(col, expr, null_count_field) - .map(|null_count_column_expr| { - if with_not { - // IsNotNull(column) => null_count = 0 - Arc::new(phys_expr::BinaryExpr::new( - null_count_column_expr, - Operator::Eq, - Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))), - )) as _ - } else { + if with_not { + if let Ok(row_count_expr) = + required_columns.row_count_column_expr(col, expr, null_count_field) + { + required_columns + .null_count_column_expr(col, expr, null_count_field) + .map(|null_count_column_expr| { + // IsNotNull(column) => null_count != row_count + Arc::new(phys_expr::BinaryExpr::new( + null_count_column_expr, + Operator::NotEq, + row_count_expr, + )) as _ + }) + .ok() + } else { + None + } + } else { + required_columns + .null_count_column_expr(col, expr, null_count_field) + .map(|null_count_column_expr| { // IsNull(column) => null_count > 0 Arc::new(phys_expr::BinaryExpr::new( null_count_column_expr, Operator::Gt, Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))), )) as _ - } - }) - .ok() + }) + .ok() + } } else { None } diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index f36afe1976b1..f90d0e8afb4c 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -28,7 +28,7 @@ use arrow::{ record_batch::RecordBatch, util::pretty::pretty_format_batches, }; -use arrow_array::new_null_array; +use arrow_array::make_array; use chrono::{Datelike, Duration, TimeDelta}; use datafusion::{ datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider}, @@ -77,6 +77,7 @@ enum 
Scenario { ByteArray, PeriodsInColumnNames, WithNullValues, + WithNullValuesPageLevel, } enum Unit { @@ -632,8 +633,13 @@ fn make_names_batch(name: &str, service_name_values: Vec<&str>) -> RecordBatch { RecordBatch::try_new(schema, vec![Arc::new(name), Arc::new(service_name)]).unwrap() } -/// Return record batch with i8, i16, i32, and i64 sequences with all Null values -fn make_all_null_values() -> RecordBatch { +/// Return record batch with i8, i16, i32, and i64 sequences with Null values +/// here 5 rows in page when using Unit::Page +fn make_int_batches_with_null( + null_values: usize, + no_null_values_start: usize, + no_null_values_end: usize, +) -> RecordBatch { let schema = Arc::new(Schema::new(vec![ Field::new("i8", DataType::Int8, true), Field::new("i16", DataType::Int16, true), @@ -641,13 +647,46 @@ fn make_all_null_values() -> RecordBatch { Field::new("i64", DataType::Int64, true), ])); + let v8: Vec = (no_null_values_start as _..no_null_values_end as _).collect(); + let v16: Vec = (no_null_values_start as _..no_null_values_end as _).collect(); + let v32: Vec = (no_null_values_start as _..no_null_values_end as _).collect(); + let v64: Vec = (no_null_values_start as _..no_null_values_end as _).collect(); + RecordBatch::try_new( schema, vec![ - new_null_array(&DataType::Int8, 5), - new_null_array(&DataType::Int16, 5), - new_null_array(&DataType::Int32, 5), - new_null_array(&DataType::Int64, 5), + make_array( + Int8Array::from_iter( + v8.into_iter() + .map(Some) + .chain(std::iter::repeat(None).take(null_values)), + ) + .to_data(), + ), + make_array( + Int16Array::from_iter( + v16.into_iter() + .map(Some) + .chain(std::iter::repeat(None).take(null_values)), + ) + .to_data(), + ), + make_array( + Int32Array::from_iter( + v32.into_iter() + .map(Some) + .chain(std::iter::repeat(None).take(null_values)), + ) + .to_data(), + ), + make_array( + Int64Array::from_iter( + v64.into_iter() + .map(Some) + .chain(std::iter::repeat(None).take(null_values)), + ) + 
.to_data(), + ), ], ) .unwrap() @@ -824,9 +863,17 @@ fn create_data_batch(scenario: Scenario) -> Vec { } Scenario::WithNullValues => { vec![ - make_all_null_values(), + make_int_batches_with_null(5, 0, 0), make_int_batches(1, 6), - make_all_null_values(), + make_int_batches_with_null(5, 0, 0), + ] + } + Scenario::WithNullValuesPageLevel => { + vec![ + make_int_batches_with_null(5, 1, 6), + make_int_batches(1, 11), + make_int_batches_with_null(1, 1, 10), + make_int_batches_with_null(5, 1, 6), + ] } } diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index da9617f13ee9..1615a1c5766a 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -871,6 +871,57 @@ async fn without_pushdown_filter() { assert!(bytes_scanned_with_filter > bytes_scanned_without_filter); } +#[tokio::test] +// Data layout like this: +// row_group1: page1(1~5), page2(All Null) +// row_group2: page1(1~5), page2(6~10) +// row_group3: page1(1~5), page2(6~9 + Null) +// row_group4: page1(1~5), page2(All Null) +// total 40 rows +async fn test_pages_with_null_values() { + test_prune( + Scenario::WithNullValuesPageLevel, + "SELECT * FROM t where i8 <= 6", + Some(0), + // expect prune pages with all null or pages that have only values greater than 6 + // (row_group1, page2), (row_group4, page2) + Some(10), + 22, + ) + .await; + + test_prune( + Scenario::WithNullValuesPageLevel, + "SELECT * FROM t where \"i16\" is not null", + Some(0), + // expect prune (row_group1, page2) and (row_group4, page2) = 10 rows + Some(10), + 29, + ) + .await; + + test_prune( + Scenario::WithNullValuesPageLevel, + "SELECT * FROM t where \"i32\" is null", + Some(0), + // expect prune (row_group1, page1), (row_group2, page1+2), (row_group3, page1), (row_group4, page1) = 25 rows + Some(25), + 11, + ) + .await; + + test_prune( + Scenario::WithNullValuesPageLevel, + "SELECT * FROM t where \"i64\" > 6", + Some(0), + // expect
to prune pages where i is all null, or where always <= 5 + // (row_group1, page1+2), (row_group2, page1), (row_group3, page1) (row_group4, page1+2) = 30 rows + Some(30), + 7, + ) + .await; +} + fn cast_count_metric(metric: MetricValue) -> Option { match metric { MetricValue::Count { count, .. } => Some(count.value()), diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index 29bf1ef0a8d4..b3f1fec1753b 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -1296,7 +1296,7 @@ async fn test_row_group_with_null_values() { .test_row_group_prune() .await; - // After pruning, only row group 2should be selected + // After pruning, only row group 2 should be selected RowGroupPruningTest::new() .with_scenario(Scenario::WithNullValues) .with_query("SELECT * FROM t WHERE \"i16\" is Not Null")