From c6b7caa2ec246a59260e900710fde289e86f1021 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 18 Dec 2024 14:39:49 +0800 Subject: [PATCH] feat: do not remove time filters in ScanRegion (#5180) * feat: do not remove time filters * chore: remove `time_range` from parquet reader * chore: print more message in the check script * chore: fix unused error --- scripts/check-snafu.py | 6 +- src/mito2/src/error.rs | 8 -- src/mito2/src/read/scan_region.rs | 11 +-- src/mito2/src/sst/parquet/reader.rs | 90 +------------------ src/query/src/tests/time_range_filter_test.rs | 4 +- src/table/src/predicate.rs | 16 ++-- 6 files changed, 17 insertions(+), 118 deletions(-) diff --git a/scripts/check-snafu.py b/scripts/check-snafu.py index d44edfeb8c45..b91950692bd8 100644 --- a/scripts/check-snafu.py +++ b/scripts/check-snafu.py @@ -58,8 +58,10 @@ def main(): if not check_snafu_in_files(branch_name, other_rust_files) ] - for name in unused_snafu: - print(name) + if unused_snafu: + print("Unused error variants:") + for name in unused_snafu: + print(name) if unused_snafu: raise SystemExit(1) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index f6d1dbafeec9..82b86a21554c 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -756,13 +756,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to build time range filters for value: {:?}", timestamp))] - BuildTimeRangeFilter { - timestamp: Timestamp, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to open region"))] OpenRegion { #[snafu(implicit)] @@ -1023,7 +1016,6 @@ impl ErrorExt for Error { ChecksumMismatch { .. } => StatusCode::Unexpected, RegionStopped { .. } => StatusCode::RegionNotReady, TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments, - BuildTimeRangeFilter { .. } => StatusCode::Unexpected, UnsupportedOperation { .. } => StatusCode::Unsupported, RemoteCompaction { .. } => StatusCode::Unexpected, diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 946ef2884132..091b9bc48c14 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -355,8 +355,8 @@ impl ScanRegion { Ok(input) } - /// Build time range predicate from filters, also remove time filters from request. - fn build_time_range_predicate(&mut self) -> TimestampRange { + /// Build time range predicate from filters. + fn build_time_range_predicate(&self) -> TimestampRange { let time_index = self.version.metadata.time_index_column(); let unit = time_index .column_schema @@ -364,11 +364,7 @@ impl ScanRegion { .as_timestamp() .expect("Time index must have timestamp-compatible type") .unit(); - build_time_range_predicate( - &time_index.column_schema.name, - unit, - &mut self.request.filters, - ) + build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters) } /// Remove field filters if the merge mode is [MergeMode::LastNonNull]. @@ -695,7 +691,6 @@ impl ScanInput { .access_layer .read_sst(file.clone()) .predicate(self.predicate.clone()) - .time_range(self.time_range) .projection(Some(self.mapper.column_ids().to_vec())) .cache(self.cache_manager.clone()) .inverted_index_applier(self.inverted_index_applier.clone()) diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 335b09426eca..39153fce8d96 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -23,11 +23,7 @@ use api::v1::SemanticType; use async_trait::async_trait; use common_recordbatch::filter::SimpleFilterEvaluator; use common_telemetry::{debug, warn}; -use common_time::range::TimestampRange; -use common_time::timestamp::TimeUnit; -use common_time::Timestamp; -use datafusion_common::ScalarValue; -use datafusion_expr::{Expr, Operator}; +use datafusion_expr::Expr; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; use itertools::Itertools; @@ -42,7 +38,6 @@ use store_api::storage::ColumnId; use table::predicate::Predicate; use crate::cache::CacheManagerRef; -use crate::error; use crate::error::{ ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadParquetSnafu, Result, }; @@ -74,8 +69,6 @@ pub struct ParquetReaderBuilder { object_store: ObjectStore, /// Predicate to push down. predicate: Option, - /// Time range to filter. - time_range: Option, /// Metadata of columns to read. /// /// `None` reads all columns. Due to schema change, the projection @@ -104,7 +97,6 @@ impl ParquetReaderBuilder { file_handle, object_store, predicate: None, - time_range: None, projection: None, cache_manager: None, inverted_index_applier: None, @@ -120,13 +112,6 @@ impl ParquetReaderBuilder { self } - /// Attaches the time range to the builder. - #[must_use] - pub fn time_range(mut self, time_range: Option) -> ParquetReaderBuilder { - self.time_range = time_range; - self - } - /// Attaches the projection to the builder. /// /// The reader only applies the projection to fields. @@ -238,7 +223,7 @@ impl ParquetReaderBuilder { cache_manager: self.cache_manager.clone(), }; - let mut filters = if let Some(predicate) = &self.predicate { + let filters = if let Some(predicate) = &self.predicate { predicate .exprs() .iter() @@ -254,10 +239,6 @@ impl ParquetReaderBuilder { vec![] }; - if let Some(time_range) = &self.time_range { - filters.extend(time_range_to_predicate(*time_range, ®ion_meta)?); - } - let codec = McmpRowCodec::new( read_format .metadata() @@ -678,59 +659,6 @@ impl ParquetReaderBuilder { } } -/// Transforms time range into [SimpleFilterEvaluator]. -fn time_range_to_predicate( - time_range: TimestampRange, - metadata: &RegionMetadataRef, -) -> Result> { - let ts_col = metadata.time_index_column(); - let ts_col_id = ts_col.column_id; - - let ts_to_filter = |op: Operator, timestamp: &Timestamp| { - let value = match timestamp.unit() { - TimeUnit::Second => ScalarValue::TimestampSecond(Some(timestamp.value()), None), - TimeUnit::Millisecond => { - ScalarValue::TimestampMillisecond(Some(timestamp.value()), None) - } - TimeUnit::Microsecond => { - ScalarValue::TimestampMicrosecond(Some(timestamp.value()), None) - } - TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(timestamp.value()), None), - }; - let evaluator = SimpleFilterEvaluator::new(ts_col.column_schema.name.clone(), value, op) - .context(error::BuildTimeRangeFilterSnafu { - timestamp: *timestamp, - })?; - Ok(SimpleFilterContext::new( - evaluator, - ts_col_id, - SemanticType::Timestamp, - ts_col.column_schema.data_type.clone(), - )) - }; - - let predicates = match (time_range.start(), time_range.end()) { - (Some(start), Some(end)) => { - vec![ - ts_to_filter(Operator::GtEq, start)?, - ts_to_filter(Operator::Lt, end)?, - ] - } - - (Some(start), None) => { - vec![ts_to_filter(Operator::GtEq, start)?] - } - - (None, Some(end)) => { - vec![ts_to_filter(Operator::Lt, end)?] - } - (None, None) => { - vec![] - } - }; - Ok(predicates) -} - /// Metrics of filtering rows groups and rows. #[derive(Debug, Default, Clone, Copy)] pub(crate) struct ReaderFilterMetrics { @@ -939,20 +867,6 @@ pub(crate) struct SimpleFilterContext { } impl SimpleFilterContext { - fn new( - filter: SimpleFilterEvaluator, - column_id: ColumnId, - semantic_type: SemanticType, - data_type: ConcreteDataType, - ) -> Self { - Self { - filter, - column_id, - semantic_type, - data_type, - } - } - /// Creates a context for the `expr`. /// /// Returns None if the column to filter doesn't exist in the SST metadata or the diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index edb404220943..e141c99fa562 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -115,9 +115,9 @@ struct TimeRangeTester { impl TimeRangeTester { async fn check(&self, sql: &str, expect: TimestampRange) { let _ = exec_selection(self.engine.clone(), sql).await; - let mut filters = self.take_filters(); + let filters = self.take_filters(); - let range = build_time_range_predicate("ts", TimeUnit::Millisecond, &mut filters); + let range = build_time_range_predicate("ts", TimeUnit::Millisecond, &filters); assert_eq!(expect, range); } diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index 267f60b10834..1fd5cdcbd362 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -135,21 +135,17 @@ impl Predicate { // since it requires query engine to convert sql to filters. /// `build_time_range_predicate` extracts time range from logical exprs to facilitate fast /// time range pruning. -pub fn build_time_range_predicate<'a>( - ts_col_name: &'a str, +pub fn build_time_range_predicate( + ts_col_name: &str, ts_col_unit: TimeUnit, - filters: &'a mut Vec, + filters: &[Expr], ) -> TimestampRange { let mut res = TimestampRange::min_to_max(); - let mut filters_remain = vec![]; - for expr in std::mem::take(filters) { - if let Some(range) = extract_time_range_from_expr(ts_col_name, ts_col_unit, &expr) { + for expr in filters { + if let Some(range) = extract_time_range_from_expr(ts_col_name, ts_col_unit, expr) { res = res.and(&range); - } else { - filters_remain.push(expr); } } - *filters = filters_remain; res } @@ -392,7 +388,7 @@ mod tests { fn check_build_predicate(expr: Expr, expect: TimestampRange) { assert_eq!( expect, - build_time_range_predicate("ts", TimeUnit::Millisecond, &mut vec![expr]) + build_time_range_predicate("ts", TimeUnit::Millisecond, &[expr]) ); }