diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index 2e1c04d5dfae..a8377d382ed0 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -15,27 +15,23 @@ use std::sync::Arc; use common_query::logical_plan::{DfExpr, Expr}; -use common_telemetry::{debug, error, warn}; +use common_telemetry::{error, warn}; use common_time::range::TimestampRange; use common_time::timestamp::TimeUnit; use common_time::Timestamp; -use datafusion::arrow::record_batch::RecordBatch; -use datafusion::parquet::file::metadata::RowGroupMetaData; use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; -use datafusion_common::{ScalarValue, ToDFSchema}; +use datafusion_common::ToDFSchema; use datafusion_expr::expr::InList; -use datafusion_expr::{Between, BinaryExpr, ColumnarValue, Operator}; +use datafusion_expr::{Between, BinaryExpr, Operator}; use datafusion_physical_expr::execution_props::ExecutionProps; use datafusion_physical_expr::{create_physical_expr, PhysicalExpr}; use datatypes::arrow; -use datatypes::arrow::array::BooleanArray; -use datatypes::schema::SchemaRef; use datatypes::value::scalar_value_to_timestamp; use snafu::ResultExt; use crate::error; -use crate::predicate::stats::RowGroupPruningStatistics; +#[cfg(test)] mod stats; #[derive(Debug, Clone)] @@ -77,83 +73,6 @@ impl Predicate { .collect::>()) } - /// Builds an empty predicate from given schema. - pub fn empty() -> Self { - Self { exprs: vec![] } - } - - /// Evaluates the predicate against row group metadata. - /// Returns a vector of boolean values, among which `false` means the row group can be skipped. - pub fn prune_row_groups( - &self, - row_groups: &[RowGroupMetaData], - schema: SchemaRef, - ) -> Vec { - let mut res = vec![true; row_groups.len()]; - - let Ok(physical_exprs) = self.to_physical_exprs(schema.arrow_schema()) else { - return res; - }; - - let arrow_schema = schema.arrow_schema(); - for expr in &physical_exprs { - match PruningPredicate::try_new(expr.clone(), arrow_schema.clone()) { - Ok(p) => { - let stat = RowGroupPruningStatistics::new(row_groups, &schema); - match p.prune(&stat) { - Ok(r) => { - for (curr_val, res) in r.into_iter().zip(res.iter_mut()) { - *res &= curr_val - } - } - Err(e) => { - warn!("Failed to prune row groups, error: {:?}", e); - } - } - } - Err(e) => { - error!("Failed to create predicate for expr, error: {:?}", e); - } - } - } - res - } - - /// Prunes primary keys - pub fn prune_primary_key(&self, primary_key: &RecordBatch) -> error::Result { - let pk_schema = primary_key.schema(); - let physical_exprs = self.to_physical_exprs(&pk_schema)?; - for expr in &physical_exprs { - // evaluate every filter against primary key - let Ok(eva) = expr.evaluate(primary_key) else { - continue; - }; - let result = match eva { - ColumnarValue::Array(array) => { - let predicate_array = array.as_any().downcast_ref::().unwrap(); - predicate_array - .into_iter() - .map(|x| x.unwrap_or(true)) - .next() - .unwrap_or(true) - } - // result was a column - ColumnarValue::Scalar(ScalarValue::Boolean(v)) => v.unwrap_or(true), - _ => { - unreachable!("Unexpected primary key record batch evaluation result: {:?}, primary key: {:?}", eva, primary_key); - } - }; - debug!( - "Evaluate primary key {:?} against filter: {:?}, result: {:?}", - primary_key, expr, result - ); - if !result { - return Ok(false); - } - } - Ok(true) - } - /// Evaluates the predicate against the `stats`. /// Returns a vector of boolean values, among which `false` means the row group can be skipped. pub fn prune_with_stats( @@ -443,6 +362,7 @@ mod tests { use parquet::file::properties::WriterProperties; use super::*; + use crate::predicate::stats::RowGroupPruningStatistics; fn check_build_predicate(expr: DfExpr, expect: TimestampRange) { assert_eq!( @@ -568,6 +488,7 @@ mod tests { TimestampRange::until_end(Timestamp::new_millisecond(1000), false), ); } + #[test] fn test_lt_eq() { // ts <= 1ms @@ -651,8 +572,8 @@ mod tests { expect: Vec, ) { let dir = create_temp_dir("prune_parquet"); - let (path, schema) = gen_test_parquet_file(&dir, array_cnt).await; - let schema = Arc::new(datatypes::schema::Schema::try_from(schema).unwrap()); + let (path, arrow_schema) = gen_test_parquet_file(&dir, array_cnt).await; + let schema = Arc::new(datatypes::schema::Schema::try_from(arrow_schema.clone()).unwrap()); let arrow_predicate = Predicate::new(filters); let builder = ParquetRecordBatchStreamBuilder::new( tokio::fs::OpenOptions::new() @@ -665,7 +586,9 @@ mod tests { .unwrap(); let metadata = builder.metadata().clone(); let row_groups = metadata.row_groups(); - let res = arrow_predicate.prune_row_groups(row_groups, schema); + + let stats = RowGroupPruningStatistics::new(row_groups, &schema); + let res = arrow_predicate.prune_with_stats(&stats, &arrow_schema); assert_eq!(expect, res); }