diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs
index 503839c66ce7..77f0d37dcb89 100644
--- a/src/mito2/src/engine/prune_test.rs
+++ b/src/mito2/src/engine/prune_test.rs
@@ -17,7 +17,7 @@ use common_query::logical_plan::DfExpr;
 use common_query::prelude::Expr;
 use common_recordbatch::RecordBatches;
 use datafusion_common::ScalarValue;
-use datafusion_expr::lit;
+use datafusion_expr::{col, lit};
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::RegionRequest;
 use store_api::storage::{RegionId, ScanRequest};
@@ -46,7 +46,7 @@ async fn check_prune_row_groups(expr: DfExpr, expected: &str) {
         region_id,
         Rows {
             schema: column_schemas.clone(),
-            rows: build_rows(0, 10),
+            rows: build_rows(0, 15),
         },
     )
     .await;
@@ -76,6 +76,16 @@ async fn test_read_parquet_stats() {
 +-------+---------+---------------------+
 | tag_0 | field_0 | ts                  |
 +-------+---------+---------------------+
+| 0     | 0.0     | 1970-01-01T00:00:00 |
+| 1     | 1.0     | 1970-01-01T00:00:01 |
+| 10    | 10.0    | 1970-01-01T00:00:10 |
+| 11    | 11.0    | 1970-01-01T00:00:11 |
+| 12    | 12.0    | 1970-01-01T00:00:12 |
+| 13    | 13.0    | 1970-01-01T00:00:13 |
+| 14    | 14.0    | 1970-01-01T00:00:14 |
+| 2     | 2.0     | 1970-01-01T00:00:02 |
+| 3     | 3.0     | 1970-01-01T00:00:03 |
+| 4     | 4.0     | 1970-01-01T00:00:04 |
 | 5     | 5.0     | 1970-01-01T00:00:05 |
 | 6     | 6.0     | 1970-01-01T00:00:06 |
 | 7     | 7.0     | 1970-01-01T00:00:07 |
@@ -84,7 +94,11 @@ async fn test_read_parquet_stats() {
 +-------+---------+---------------------+",
     )
     .await;
+}
 
+#[tokio::test]
+async fn test_prune_tag() {
+    // prune result: only row groups 1 & 2
     check_prune_row_groups(
         datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))),
         "\
@@ -100,3 +114,25 @@ async fn test_read_parquet_stats() {
     )
     .await;
 }
+
+#[tokio::test]
+async fn test_prune_tag_and_field() {
+    common_telemetry::init_default_ut_logging();
+    // prune result: only row group 1
+    check_prune_row_groups(
+        col("tag_0")
+            .gt(lit(ScalarValue::Utf8(Some("4".to_string()))))
+            .and(col("field_0").lt(lit(8.0))),
+        "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| 5     | 5.0     | 1970-01-01T00:00:05 |
+| 6     | 6.0     | 1970-01-01T00:00:06 |
+| 7     | 7.0     | 1970-01-01T00:00:07 |
+| 8     | 8.0     | 1970-01-01T00:00:08 |
+| 9     | 9.0     | 1970-01-01T00:00:09 |
++-------+---------+---------------------+",
+    )
+    .await;
+}
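Editor's note: the tests above exercise row-group pruning end to end. Writing 15 rows before the flush makes the SST span several row groups, and the "only row group ..." comments name the groups expected to survive. Since `tag_0` is a string column, `tag_0 > '4'` compares lexicographically: `"10"` through `"14"` sort before `"4"` and are pruned along with rows 0-4, leaving rows 5-9. Pruning also works at row-group granularity, which is why `test_prune_tag_and_field` still returns rows 8 and 9 despite `field_0 < 8.0`: statistics cannot eliminate their row group, and the scan here only prunes, it does not filter rows. A sketch of how these filters are composed with DataFusion's logical-expr builders (the standalone function wrapper is added purely for illustration):

```rust
use datafusion_common::ScalarValue;
use datafusion_expr::{col, lit, Expr};

/// Builds the combined filter used by `test_prune_tag_and_field`.
fn tag_and_field_filter() -> Expr {
    // Lexicographic comparison on the string tag: keeps "5".."9",
    // rejects "0".."4" as well as "10".."14" (which sort before "4").
    col("tag_0")
        .gt(lit(ScalarValue::Utf8(Some("4".to_string()))))
        // Numeric comparison on the field targets values below 8.0.
        .and(col("field_0").lt(lit(8.0)))
}
```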
diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs
index 6ab6f984ad05..efa6bc9506d6 100644
--- a/src/mito2/src/memtable/time_series.rs
+++ b/src/mito2/src/memtable/time_series.rs
@@ -20,8 +20,11 @@ use std::sync::{Arc, RwLock};
 
 use api::v1::OpType;
 use common_telemetry::debug;
+use datafusion::physical_plan::PhysicalExpr;
+use datafusion_common::ScalarValue;
+use datafusion_expr::ColumnarValue;
 use datatypes::arrow;
-use datatypes::arrow::array::ArrayRef;
+use datatypes::arrow::array::{ArrayRef, BooleanArray};
 use datatypes::arrow::record_batch::RecordBatch;
 use datatypes::data_type::DataType;
 use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef};
@@ -300,12 +303,16 @@ impl SeriesSet {
         let (primary_key_builders, primary_key_schema) =
             primary_key_builders(&self.region_metadata, 1);
 
+        let physical_exprs: Vec<_> = predicate
+            .and_then(|p| p.to_physical_exprs(&primary_key_schema).ok())
+            .unwrap_or_default();
+
         Iter {
             metadata: self.region_metadata.clone(),
             series: self.series.clone(),
             projection,
             last_key: None,
-            predicate,
+            predicate: physical_exprs,
             pk_schema: primary_key_schema,
             primary_key_builders,
             codec: self.codec.clone(),
@@ -341,7 +348,7 @@ struct Iter {
     series: Arc<SeriesRwLockMap>,
     projection: HashSet<ColumnId>,
     last_key: Option<Vec<u8>>,
-    predicate: Option<Predicate>,
+    predicate: Vec<Arc<dyn PhysicalExpr>>,
     pk_schema: arrow::datatypes::SchemaRef,
     primary_key_builders: Vec<Box<dyn MutableVector>>,
     codec: Arc<McmpRowCodec>,
@@ -362,18 +369,18 @@ impl Iterator for Iter {
             // TODO(hl): maybe yield more than one time series to amortize range overhead.
             for (primary_key, series) in range {
                 let mut series = series.write().unwrap();
-                if let Some(predicate) = &self.predicate {
-                    if !prune_primary_key(
+                if !self.predicate.is_empty()
+                    && !prune_primary_key(
                         &self.codec,
                         primary_key.as_slice(),
                         &mut series,
                         &mut self.primary_key_builders,
                         self.pk_schema.clone(),
-                        predicate,
-                    ) {
-                        // read next series
-                        continue;
-                    }
+                        &self.predicate,
+                    )
+                {
+                    // read next series
+                    continue;
                 }
 
                 self.last_key = Some(primary_key.clone());
@@ -392,7 +399,7 @@ fn prune_primary_key(
     series: &mut Series,
     builders: &mut Vec<Box<dyn MutableVector>>,
     pk_schema: arrow::datatypes::SchemaRef,
-    predicate: &Predicate,
+    predicate: &[Arc<dyn PhysicalExpr>],
 ) -> bool {
     // no primary key, we simply return true.
     if pk_schema.fields().is_empty() {
@@ -400,20 +407,52 @@ fn prune_primary_key(
     }
 
     if let Some(rb) = series.pk_cache.as_ref() {
-        let res = predicate.prune_primary_key(rb).unwrap_or(true);
+        let res = prune_inner(predicate, rb).unwrap_or(true);
         debug!("Prune primary key: {:?}, res: {:?}", rb, res);
         res
     } else {
         let Ok(rb) = pk_to_record_batch(codec, pk, builders, pk_schema) else {
            return true;
         };
-        let res = predicate.prune_primary_key(&rb).unwrap_or(true);
+        let res = prune_inner(predicate, &rb).unwrap_or(true);
         debug!("Prune primary key: {:?}, res: {:?}", rb, res);
         series.update_pk_cache(rb);
         res
     }
 }
 
+fn prune_inner(predicates: &[Arc<dyn PhysicalExpr>], primary_key: &RecordBatch) -> Result<bool> {
+    for expr in predicates {
+        // evaluate every filter against the primary key
+        let Ok(eva) = expr.evaluate(primary_key) else {
+            continue;
+        };
+        let result = match eva {
+            // the result was a column
+            ColumnarValue::Array(array) => {
+                let predicate_array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+                predicate_array
+                    .into_iter()
+                    .map(|x| x.unwrap_or(true))
+                    .next()
+                    .unwrap_or(true)
+            }
+            // the result was a scalar
+            ColumnarValue::Scalar(ScalarValue::Boolean(v)) => v.unwrap_or(true),
+            _ => {
+                unreachable!(
+                    "Unexpected primary key record batch evaluation result: {:?}, primary key: {:?}",
+                    eva, primary_key
+                );
+            }
+        };
+        debug!(
+            "Evaluate primary key {:?} against filter: {:?}, result: {:?}",
+            primary_key, expr, result
+        );
+        if !result {
+            return Ok(false);
+        }
+    }
+    Ok(true)
+}
+
 fn pk_to_record_batch(
     codec: &Arc<McmpRowCodec>,
     bytes: &[u8],
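Editor's note: in the memtable path the predicate is bound to the primary-key schema once, when the iterator is created; if the conversion fails, `unwrap_or_default` leaves the filter list empty and every series is scanned. The keep/skip rule that `prune_inner` applies per evaluated expression can be distilled as follows (a sketch using the same crates the patch imports; `keep_series` is a hypothetical helper, and unlike `prune_inner` it treats non-boolean results as a conservative keep instead of panicking):

```rust
use std::sync::Arc;

use datafusion::physical_plan::PhysicalExpr;
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::record_batch::RecordBatch;

/// Evaluates one physical expr against the one-row primary-key batch.
/// Only a definite `false` skips the series; errors and NULLs keep it.
fn keep_series(expr: &Arc<dyn PhysicalExpr>, pk: &RecordBatch) -> bool {
    match expr.evaluate(pk) {
        Ok(ColumnarValue::Array(array)) => array
            .as_any()
            .downcast_ref::<BooleanArray>()
            .and_then(|b| b.iter().next().flatten())
            .unwrap_or(true),
        Ok(ColumnarValue::Scalar(ScalarValue::Boolean(v))) => v.unwrap_or(true),
        // Evaluation failures and non-boolean results keep the series.
        _ => true,
    }
}
```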
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index 403c38b7fa81..6c6415b6f2cd 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -17,13 +17,12 @@
 use common_recordbatch::SendableRecordBatchStream;
 use common_telemetry::debug;
 use common_time::range::TimestampRange;
-use snafu::ResultExt;
 use store_api::storage::ScanRequest;
 use table::predicate::{Predicate, TimeRangePredicateBuilder};
 
 use crate::access_layer::AccessLayerRef;
 use crate::cache::CacheManagerRef;
-use crate::error::{BuildPredicateSnafu, Result};
+use crate::error::Result;
 use crate::read::projection::ProjectionMapper;
 use crate::read::seq_scan::SeqScan;
 use crate::region::version::VersionRef;
@@ -173,11 +172,7 @@ impl ScanRegion {
             total_ssts
         );
 
-        let predicate = Predicate::try_new(
-            self.request.filters.clone(),
-            self.version.metadata.schema.clone(),
-        )
-        .context(BuildPredicateSnafu)?;
+        let predicate = Predicate::new(self.request.filters.clone());
         let mapper = match &self.request.projection {
             Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
             None => ProjectionMapper::all(&self.version.metadata)?,
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 3eade74a4c62..0e40a909a364 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -188,8 +188,9 @@ impl ParquetReaderBuilder {
             &read_format,
             column_ids,
         );
+
         let pruned_row_groups = predicate
-            .prune_with_stats(&stats)
+            .prune_with_stats(&stats, read_format.metadata().schema.arrow_schema())
             .into_iter()
             .enumerate()
             .filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
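Editor's note: both call sites above follow the same new pattern: `Predicate` is built from logical exprs alone and the concrete arrow schema is supplied only at the prune site. A hypothetical helper showing the shape of the new API (everything except `Predicate`, `prune_with_stats`, and `Expr` is invented for illustration):

```rust
use common_query::prelude::Expr;
use datafusion::physical_optimizer::pruning::PruningStatistics;
use datatypes::arrow::datatypes::SchemaRef;
use table::predicate::Predicate;

/// Builds a keep/skip mask over the row groups described by `stats`.
fn prune_mask<S: PruningStatistics>(
    filters: Vec<Expr>,
    stats: &S,
    schema: &SchemaRef,
) -> Vec<bool> {
    // No schema is needed up front, so the fallible `try_new` (and the
    // BuildPredicateSnafu plumbing) disappears from the call sites.
    let predicate = Predicate::new(filters);
    // The schema is bound lazily; a failed conversion degrades to
    // "keep every row group" rather than failing the scan.
    predicate.prune_with_stats(stats, schema)
}
```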
diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs
index 8cef41dea647..e8fc4a555c83 100644
--- a/src/storage/src/chunk.rs
+++ b/src/storage/src/chunk.rs
@@ -245,11 +245,7 @@ impl ChunkReaderBuilder {
             reader_builder = reader_builder.push_batch_iter(iter);
         }
 
-        let predicate = Predicate::try_new(
-            self.filters.clone(),
-            self.schema.store_schema().schema().clone(),
-        )
-        .context(error::BuildPredicateSnafu)?;
+        let predicate = Predicate::new(self.filters.clone());
 
         let read_opts = ReadOptions {
             batch_size: self.iter_ctx.batch_size,
diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs
index d989f674ded2..fa0cb9c56e0e 100644
--- a/src/storage/src/sst/parquet.rs
+++ b/src/storage/src/sst/parquet.rs
@@ -277,7 +277,10 @@ impl ParquetReader {
 
         let pruned_row_groups = self
             .predicate
-            .prune_row_groups(builder.metadata().row_groups())
+            .prune_row_groups(
+                builder.metadata().row_groups(),
+                store_schema.schema().clone(),
+            )
             .into_iter()
             .enumerate()
             .filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
@@ -549,12 +552,11 @@ mod tests {
         let operator = create_object_store(dir.path().to_str().unwrap());
 
         let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
-        let user_schema = projected_schema.projected_user_schema().clone();
         let reader = ParquetReader::new(
             sst_file_handle,
             operator,
             projected_schema,
-            Predicate::empty(user_schema),
+            Predicate::empty(),
             TimestampRange::min_to_max(),
         );
 
@@ -636,12 +638,11 @@ mod tests {
         let operator = create_object_store(dir.path().to_str().unwrap());
 
         let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
-        let user_schema = projected_schema.projected_user_schema().clone();
         let reader = ParquetReader::new(
             file_handle,
             operator,
             projected_schema,
-            Predicate::empty(user_schema),
+            Predicate::empty(),
             TimestampRange::min_to_max(),
         );
 
@@ -665,14 +666,8 @@ mod tests {
         range: TimestampRange,
         expect: Vec<i64>,
     ) {
-        let store_schema = schema.schema_to_read().clone();
-        let reader = ParquetReader::new(
-            file_handle,
-            object_store,
-            schema,
-            Predicate::empty(store_schema.schema().clone()),
-            range,
-        );
+        let reader =
+            ParquetReader::new(file_handle, object_store, schema, Predicate::empty(), range);
 
         let mut stream = reader.chunk_stream().await.unwrap();
         let result = stream.next_batch().await;
diff --git a/src/storage/src/sst/pruning.rs b/src/storage/src/sst/pruning.rs
index a6c2080be81f..577ecd2e431b 100644
--- a/src/storage/src/sst/pruning.rs
+++ b/src/storage/src/sst/pruning.rs
@@ -29,9 +29,11 @@
 use datatypes::prelude::ConcreteDataType;
 use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
 use parquet::arrow::ProjectionMask;
 use parquet::schema::types::SchemaDescriptor;
+use snafu::ResultExt;
 use table::predicate::Predicate;
 
 use crate::error;
+use crate::error::BuildPredicateSnafu;
 use crate::schema::StoreSchema;
 
@@ -80,7 +82,11 @@ pub(crate) fn build_row_filter(
         Box::new(PlainTimestampRowFilter::new(time_range, ts_col_projection)) as _
     };
     let mut predicates = vec![time_range_row_filter];
-    if let Ok(datafusion_filters) = predicate_to_row_filter(predicate, projection_mask) {
+    if let Ok(datafusion_filters) = predicate_to_row_filter(
+        predicate,
+        projection_mask,
+        store_schema.schema().arrow_schema(),
+    ) {
         predicates.extend(datafusion_filters);
     }
     let filter = RowFilter::new(predicates);
@@ -90,9 +96,13 @@ pub(crate) fn build_row_filter(
 fn predicate_to_row_filter(
     predicate: &Predicate,
     projection_mask: ProjectionMask,
+    schema: &arrow::datatypes::SchemaRef,
 ) -> error::Result<Vec<Box<dyn ArrowPredicate>>> {
-    let mut datafusion_predicates = Vec::with_capacity(predicate.exprs().len());
-    for expr in predicate.exprs() {
+    let physical_exprs = predicate
+        .to_physical_exprs(schema)
+        .context(BuildPredicateSnafu)?;
+    let mut datafusion_predicates = Vec::with_capacity(physical_exprs.len());
+    for expr in &physical_exprs {
         datafusion_predicates.push(Box::new(DatafusionArrowPredicate {
             projection_mask: projection_mask.clone(),
             physical_expr: expr.clone(),
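Editor's note: the row-filter path keeps its fail-open behavior. `build_row_filter` always installs the mandatory time-range filter, and attaches the DataFusion predicates only when they convert cleanly against the store schema. Paraphrased as a sketch (the function is hypothetical; its arguments stand in for the values built in the real code, and the `Result` mimics the fallible `predicate_to_row_filter` changed above):

```rust
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};

/// Assembles the parquet RowFilter: the time-range filter always applies;
/// user predicates are skipped wholesale if physical-expr conversion fails.
fn assemble_row_filter<E>(
    time_range_row_filter: Box<dyn ArrowPredicate>,
    datafusion_filters: Result<Vec<Box<dyn ArrowPredicate>>, E>,
) -> RowFilter {
    let mut predicates = vec![time_range_row_filter];
    if let Ok(filters) = datafusion_filters {
        predicates.extend(filters);
    }
    RowFilter::new(predicates)
}
```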
diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs
index cfb3b066a8d6..cfd643a44df0 100644
--- a/src/table/src/predicate.rs
+++ b/src/table/src/predicate.rs
@@ -27,6 +27,7 @@ use datafusion_expr::expr::InList;
 use datafusion_expr::{Between, BinaryExpr, ColumnarValue, Operator};
 use datafusion_physical_expr::execution_props::ExecutionProps;
 use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
+use datatypes::arrow;
 use datatypes::arrow::array::BooleanArray;
 use datatypes::schema::SchemaRef;
 use datatypes::value::scalar_value_to_timestamp;
@@ -39,19 +40,24 @@ mod stats;
 
 #[derive(Clone)]
 pub struct Predicate {
-    /// The schema of the table that the expressions being applied.
-    schema: SchemaRef,
-    /// Physical expressions of this predicate.
-    exprs: Vec<Arc<dyn PhysicalExpr>>,
+    /// Logical exprs of this predicate.
+    exprs: Vec<Expr>,
 }
 
 impl Predicate {
     /// Creates a new `Predicate` by converting logical exprs to physical exprs that can be
     /// evaluated against record batches.
     /// Returns error when failed to convert exprs.
-    pub fn try_new(exprs: Vec<Expr>, schema: SchemaRef) -> error::Result<Self> {
-        let arrow_schema = schema.arrow_schema();
-        let df_schema = arrow_schema
+    pub fn new(exprs: Vec<Expr>) -> Self {
+        Self { exprs }
+    }
+
+    /// Builds physical exprs according to the provided schema.
+    pub fn to_physical_exprs(
+        &self,
+        schema: &arrow::datatypes::SchemaRef,
+    ) -> error::Result<Vec<Arc<dyn PhysicalExpr>>> {
+        let df_schema = schema
             .clone()
             .to_dfschema_ref()
             .context(error::DatafusionSnafu)?;
@@ -61,47 +67,38 @@ impl Predicate {
         // registering variables.
         let execution_props = &ExecutionProps::new();
 
-        let physical_exprs = exprs
+        self.exprs
             .iter()
             .map(|expr| {
-                create_physical_expr(
-                    expr.df_expr(),
-                    df_schema.as_ref(),
-                    arrow_schema.as_ref(),
-                    execution_props,
-                )
+                create_physical_expr(expr.df_expr(), df_schema.as_ref(), schema, execution_props)
             })
             .collect::<Result<Vec<_>, _>>()
-            .context(error::DatafusionSnafu)?;
-
-        Ok(Self {
-            schema,
-            exprs: physical_exprs,
-        })
-    }
-
-    #[inline]
-    pub fn exprs(&self) -> &[Arc<dyn PhysicalExpr>] {
-        &self.exprs
+            .context(error::DatafusionSnafu)
     }
 
     /// Builds an empty predicate from given schema.
-    pub fn empty(schema: SchemaRef) -> Self {
-        Self {
-            schema,
-            exprs: vec![],
-        }
+    pub fn empty() -> Self {
+        Self { exprs: vec![] }
     }
 
     /// Evaluates the predicate against row group metadata.
     /// Returns a vector of boolean values, among which `false` means the row group can be skipped.
-    pub fn prune_row_groups(&self, row_groups: &[RowGroupMetaData]) -> Vec<bool> {
+    pub fn prune_row_groups(
+        &self,
+        row_groups: &[RowGroupMetaData],
+        schema: SchemaRef,
+    ) -> Vec<bool> {
         let mut res = vec![true; row_groups.len()];
-        let arrow_schema = self.schema.arrow_schema();
-        for expr in &self.exprs {
+
+        let Ok(physical_exprs) = self.to_physical_exprs(schema.arrow_schema()) else {
+            return res;
+        };
+
+        let arrow_schema = schema.arrow_schema();
+        for expr in &physical_exprs {
             match PruningPredicate::try_new(expr.clone(), arrow_schema.clone()) {
                 Ok(p) => {
-                    let stat = RowGroupPruningStatistics::new(row_groups, &self.schema);
+                    let stat = RowGroupPruningStatistics::new(row_groups, &schema);
                     match p.prune(&stat) {
                         Ok(r) => {
                             for (curr_val, res) in r.into_iter().zip(res.iter_mut()) {
@@ -123,7 +120,9 @@ impl Predicate {
     /// Prunes primary keys
     pub fn prune_primary_key(&self, primary_key: &RecordBatch) -> error::Result<bool> {
-        for expr in &self.exprs {
+        let pk_schema = primary_key.schema();
+        let physical_exprs = self.to_physical_exprs(&pk_schema)?;
+        for expr in &physical_exprs {
             // evaluate every filter against primary key
             let Ok(eva) = expr.evaluate(primary_key) else {
                 continue;
             };
@@ -156,11 +155,22 @@ impl Predicate {
 
     /// Evaluates the predicate against the `stats`.
     /// Returns a vector of boolean values, among which `false` means the row group can be skipped.
-    pub fn prune_with_stats<S: PruningStatistics>(&self, stats: &S) -> Vec<bool> {
+    pub fn prune_with_stats<S: PruningStatistics>(
+        &self,
+        stats: &S,
+        schema: &arrow::datatypes::SchemaRef,
+    ) -> Vec<bool> {
         let mut res = vec![true; stats.num_containers()];
-        let arrow_schema = self.schema.arrow_schema();
-        for expr in &self.exprs {
-            match PruningPredicate::try_new(expr.clone(), arrow_schema.clone()) {
+        let physical_exprs = match self.to_physical_exprs(schema) {
+            Ok(expr) => expr,
+            Err(e) => {
+                warn!(e; "Failed to build physical expr from predicates: {:?}", &self.exprs);
+                return res;
+            }
+        };
+
+        for expr in &physical_exprs {
+            match PruningPredicate::try_new(expr.clone(), schema.clone()) {
                 Ok(p) => match p.prune(stats) {
                     Ok(r) => {
                         for (curr_val, res) in r.into_iter().zip(res.iter_mut()) {
@@ -641,18 +651,18 @@ mod tests {
         let dir = create_temp_dir("prune_parquet");
         let (path, schema) = gen_test_parquet_file(&dir, array_cnt).await;
         let schema = Arc::new(datatypes::schema::Schema::try_from(schema).unwrap());
-        let arrow_predicate = Predicate::try_new(filters, schema.clone()).unwrap();
+        let arrow_predicate = Predicate::new(filters);
         let builder = ParquetRecordBatchStreamBuilder::new(
             tokio::fs::OpenOptions::new()
                 .read(true)
                 .open(path)
                 .await
                 .unwrap(),
         )
         .await
         .unwrap();
         let metadata = builder.metadata().clone();
         let row_groups = metadata.row_groups();
-        let res = arrow_predicate.prune_row_groups(row_groups);
+        let res = arrow_predicate.prune_row_groups(row_groups, schema);
         assert_eq!(expect, res);
     }
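Editor's note: the payoff of this refactor is that one `Predicate` can now be bound to different schemas at different sites: the SST path binds against the store schema, while the memtable path binds the same filters against the narrow primary-key schema. A sketch (function and variable names are hypothetical; `to_physical_exprs` is the new method above):

```rust
use common_query::prelude::Expr;
use datatypes::arrow::datatypes::SchemaRef;
use table::predicate::Predicate;

fn bind_twice(scan_filters: Vec<Expr>, sst_schema: &SchemaRef, pk_schema: &SchemaRef) {
    let predicate = Predicate::new(scan_filters);
    // Row-group pruning binds the filters against the full SST schema...
    let sst_exprs = predicate.to_physical_exprs(sst_schema);
    // ...while the memtable binds the same filters to the primary-key-only
    // schema. A filter on a column absent from that schema fails the whole
    // conversion, and the memtable then skips pruning for that scan (see the
    // `unwrap_or_default` in time_series.rs above).
    let pk_exprs = predicate.to_physical_exprs(pk_schema);
    let _ = (sst_exprs, pk_exprs);
}
```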
diff --git a/tests/cases/standalone/common/select/prune.result b/tests/cases/standalone/common/select/prune.result
new file mode 100644
index 000000000000..c6884bddccd5
--- /dev/null
+++ b/tests/cases/standalone/common/select/prune.result
@@ -0,0 +1,83 @@
+create table demo(ts timestamp time index, `value` double, host string,idc string, collector string, primary key(host, idc, collector));
+
+Affected Rows: 0
+
+insert into demo values(1,2,'test1', 'idc1', 'disk') ,(2,3,'test2', 'idc1', 'disk'), (3,4,'test3', 'idc2','memory');
+
+Affected Rows: 3
+
+select * from demo where host='test1';
+
++-------------------------+-------+-------+------+-----------+
+| ts                      | value | host  | idc  | collector |
++-------------------------+-------+-------+------+-----------+
+| 1970-01-01T00:00:00.001 | 2.0   | test1 | idc1 | disk      |
++-------------------------+-------+-------+------+-----------+
+
+select * from demo where host='test2';
+
++-------------------------+-------+-------+------+-----------+
+| ts                      | value | host  | idc  | collector |
++-------------------------+-------+-------+------+-----------+
+| 1970-01-01T00:00:00.002 | 3.0   | test2 | idc1 | disk      |
++-------------------------+-------+-------+------+-----------+
+
+select * from demo where host='test3';
+
++-------------------------+-------+-------+------+-----------+
+| ts                      | value | host  | idc  | collector |
++-------------------------+-------+-------+------+-----------+
+| 1970-01-01T00:00:00.003 | 4.0   | test3 | idc2 | memory    |
++-------------------------+-------+-------+------+-----------+
+
+select * from demo where host='test2' and idc='idc1';
+
++-------------------------+-------+-------+------+-----------+
+| ts                      | value | host  | idc  | collector |
++-------------------------+-------+-------+------+-----------+
+| 1970-01-01T00:00:00.002 | 3.0   | test2 | idc1 | disk      |
++-------------------------+-------+-------+------+-----------+
+
+select * from demo where host='test2' and idc='idc1' and collector='disk';
+
++-------------------------+-------+-------+------+-----------+
+| ts                      | value | host  | idc  | collector |
++-------------------------+-------+-------+------+-----------+
+| 1970-01-01T00:00:00.002 | 3.0   | test2 | idc1 | disk      |
++-------------------------+-------+-------+------+-----------+
+
+select * from demo where host='test2' and idc='idc2';
+
+++
+++
+
+select * from demo where host='test3' and idc>'idc1';
+
++-------------------------+-------+-------+------+-----------+
+| ts                      | value | host  | idc  | collector |
++-------------------------+-------+-------+------+-----------+
+| 1970-01-01T00:00:00.003 | 4.0   | test3 | idc2 | memory    |
++-------------------------+-------+-------+------+-----------+
+
+select * from demo where idc='idc1' order by ts;
+
++-------------------------+-------+-------+------+-----------+
+| ts                      | value | host  | idc  | collector |
++-------------------------+-------+-------+------+-----------+
+| 1970-01-01T00:00:00.001 | 2.0   | test1 | idc1 | disk      |
+| 1970-01-01T00:00:00.002 | 3.0   | test2 | idc1 | disk      |
++-------------------------+-------+-------+------+-----------+
+
+select * from demo where collector='disk' order by ts;
+
++-------------------------+-------+-------+------+-----------+
+| ts                      | value | host  | idc  | collector |
++-------------------------+-------+-------+------+-----------+
+| 1970-01-01T00:00:00.001 | 2.0   | test1 | idc1 | disk      |
+| 1970-01-01T00:00:00.002 | 3.0   | test2 | idc1 | disk      |
++-------------------------+-------+-------+------+-----------+
+
+drop table demo;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/select/prune.sql b/tests/cases/standalone/common/select/prune.sql
new file mode 100644
index 000000000000..fb007105ed58
--- /dev/null
+++ b/tests/cases/standalone/common/select/prune.sql
@@ -0,0 +1,23 @@
+create table demo(ts timestamp time index, `value` double, host string,idc string, collector string, primary key(host, idc, collector));
+
+insert into demo values(1,2,'test1', 'idc1', 'disk') ,(2,3,'test2', 'idc1', 'disk'), (3,4,'test3', 'idc2','memory');
+
+select * from demo where host='test1';
+
+select * from demo where host='test2';
+
+select * from demo where host='test3';
+
+select * from demo where host='test2' and idc='idc1';
+
+select * from demo where host='test2' and idc='idc1' and collector='disk';
+
+select * from demo where host='test2' and idc='idc2';
+
+select * from demo where host='test3' and idc>'idc1';
+
+select * from demo where idc='idc1' order by ts;
+
+select * from demo where collector='disk' order by ts;
+
+drop table demo;