From 911d0a69cc4d02e787121d12c75efe4fe16eab7d Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Wed, 15 May 2024 16:40:23 +0800 Subject: [PATCH 1/5] feat: convert timestamp range filters to predicates --- src/common/recordbatch/src/filter.rs | 24 +- src/mito2/src/error.rs | 8 + src/mito2/src/read/scan_region.rs | 9 +- src/mito2/src/sst/parquet/reader.rs | 74 +++- src/query/src/tests/time_range_filter_test.rs | 4 +- src/table/src/predicate.rs | 418 +++++++++--------- 6 files changed, 317 insertions(+), 220 deletions(-) diff --git a/src/common/recordbatch/src/filter.rs b/src/common/recordbatch/src/filter.rs index 7a5e361138c5..195abb118135 100644 --- a/src/common/recordbatch/src/filter.rs +++ b/src/common/recordbatch/src/filter.rs @@ -14,7 +14,7 @@ //! Util record batch stream wrapper that can perform precise filter. -use datafusion::logical_expr::{Expr, Operator}; +use datafusion::logical_expr::{Expr, Literal, Operator}; use datafusion_common::arrow::array::{ArrayRef, Datum, Scalar}; use datafusion_common::arrow::buffer::BooleanBuffer; use datafusion_common::arrow::compute::kernels::cmp; @@ -43,6 +43,28 @@ pub struct SimpleFilterEvaluator { } impl SimpleFilterEvaluator { + pub fn new(column_name: String, lit: T, op: Operator) -> Option { + match op { + Operator::Eq + | Operator::NotEq + | Operator::Lt + | Operator::LtEq + | Operator::Gt + | Operator::GtEq => {} + _ => return None, + } + + let Expr::Literal(val) = lit.lit() else { + return None; + }; + + Some(Self { + column_name, + literal: val.to_scalar().ok()?, + op, + }) + } + pub fn try_new(predicate: &Expr) -> Option { match predicate { Expr::BinaryExpr(binary) => { diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index f81052623c43..99083c5c3b54 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -20,6 +20,7 @@ use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_runtime::JoinError; +use common_time::Timestamp; use datatypes::arrow::error::ArrowError; use datatypes::prelude::ConcreteDataType; use object_store::ErrorKind; @@ -693,6 +694,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to build time range filters for value: {:?}", timestamp))] + BuildTimeRangeFilter { + timestamp: Timestamp, + location: Location, + }, } pub type Result = std::result::Result; @@ -802,6 +809,7 @@ impl ErrorExt for Error { EncodeMemtable { .. } | ReadDataPart { .. } => StatusCode::Internal, ChecksumMismatch { .. } => StatusCode::Unexpected, RegionStopped { .. } => StatusCode::RegionNotReady, + BuildTimeRangeFilter { .. } => StatusCode::Unexpected, } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index f619744dc0c9..bcf835b6263a 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -22,7 +22,7 @@ use common_telemetry::{debug, error, warn}; use common_time::range::TimestampRange; use store_api::region_engine::{RegionScannerRef, SinglePartitionScanner}; use store_api::storage::ScanRequest; -use table::predicate::{Predicate, TimeRangePredicateBuilder}; +use table::predicate::{build_time_range_predicate, Predicate}; use tokio::sync::{mpsc, Semaphore}; use tokio_stream::wrappers::ReceiverStream; @@ -235,7 +235,7 @@ impl ScanRegion { } /// Creates a scan input. - fn scan_input(self, filter_deleted: bool) -> Result { + fn scan_input(mut self, filter_deleted: bool) -> Result { let time_range = self.build_time_range_predicate(); let ssts = &self.version.ssts; @@ -300,7 +300,7 @@ impl ScanRegion { } /// Build time range predicate from filters. - fn build_time_range_predicate(&self) -> TimestampRange { + fn build_time_range_predicate(&mut self) -> TimestampRange { let time_index = self.version.metadata.time_index_column(); let unit = time_index .column_schema @@ -308,8 +308,7 @@ impl ScanRegion { .as_timestamp() .expect("Time index must have timestamp-compatible type") .unit(); - TimeRangePredicateBuilder::new(&time_index.column_schema.name, unit, &self.request.filters) - .build() + build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters) } /// Use the latest schema to build the index applier. diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index b56d776e6722..82de1780c57a 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -23,6 +23,11 @@ use async_trait::async_trait; use common_recordbatch::filter::SimpleFilterEvaluator; use common_telemetry::{debug, warn}; use common_time::range::TimestampRange; +use common_time::timestamp::TimeUnit; +use datafusion_common::arrow::array::BooleanArray; +use datafusion_common::arrow::buffer::BooleanBuffer; +use datafusion_common::ScalarValue; +use datafusion_expr::Operator; use datafusion_expr::Expr; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; @@ -38,6 +43,7 @@ use store_api::storage::ColumnId; use table::predicate::Predicate; use crate::cache::CacheManagerRef; +use crate::error; use crate::error::{ ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadParquetSnafu, Result, }; @@ -225,7 +231,7 @@ impl ParquetReaderBuilder { metrics.build_cost = start.elapsed(); - let filters = if let Some(predicate) = &self.predicate { + let mut filters = if let Some(predicate) = &self.predicate { predicate .exprs() .iter() @@ -240,6 +246,11 @@ impl ParquetReaderBuilder { } else { vec![] }; + + if let Some(time_range) = &self.time_range { + predicate.extend(time_range_to_predicate(*time_range, ®ion_meta)?); + } + let codec = McmpRowCodec::new( read_format .metadata() @@ -449,6 +460,67 @@ impl ParquetReaderBuilder { } } +/// Transforms time range into [SimpleFilterEvaluator]. +fn time_range_to_predicate( + time_range: TimestampRange, + metadata: &RegionMetadataRef, +) -> Result> { + let ts_col = metadata.time_index_column(); + + let ts_lit = |val: i64| match ts_col + .column_schema + .data_type + .as_timestamp() + .unwrap() + .unit() + { + TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None), + TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None), + TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None), + TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None), + }; + + let predicates = match (time_range.start(), time_range.end()) { + (Some(start), Some(end)) => { + vec![ + SimpleFilterEvaluator::new( + ts_col.column_schema.name.clone(), + ts_lit(start.value()), + Operator::GtEq, + ) + .context(error::BuildTimeRangeFilterSnafu { timestamp: *start })?, + SimpleFilterEvaluator::new( + ts_col.column_schema.name.clone(), + ts_lit(end.value()), + Operator::Lt, + ) + .context(error::BuildTimeRangeFilterSnafu { timestamp: *end })?, + ] + } + + (Some(start), None) => { + vec![SimpleFilterEvaluator::new( + ts_col.column_schema.name.clone(), + ts_lit(start.value()), + Operator::GtEq, + ) + .context(error::BuildTimeRangeFilterSnafu { timestamp: *start })?] + } + (None, Some(end)) => { + vec![SimpleFilterEvaluator::new( + ts_col.column_schema.name.clone(), + ts_lit(end.value()), + Operator::Lt, + ) + .context(error::BuildTimeRangeFilterSnafu { timestamp: *end })?] + } + (None, None) => { + vec![] + } + }; + Ok(predicates) +} + /// Parquet reader metrics. #[derive(Debug, Default)] struct Metrics { diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index 87b69c6588a9..aae65e371a2e 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -29,7 +29,7 @@ use datatypes::vectors::{Int64Vector, TimestampMillisecondVector}; use store_api::data_source::{DataSource, DataSourceRef}; use store_api::storage::ScanRequest; use table::metadata::FilterPushDownType; -use table::predicate::TimeRangePredicateBuilder; +use table::predicate::build_time_range_predicate; use table::test_util::MemTable; use table::{Table, TableRef}; @@ -116,7 +116,7 @@ impl TimeRangeTester { let _ = exec_selection(self.engine.clone(), sql).await; let filters = self.get_filters(); - let range = TimeRangePredicateBuilder::new("ts", TimeUnit::Millisecond, &filters).build(); + let range = build_time_range_predicate("ts", TimeUnit::Millisecond, &filters); assert_eq!(expect, range); } diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index 079022925ff7..8a3069ce55a4 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -128,250 +128,246 @@ impl Predicate { } } -// tests for `TimeRangePredicateBuilder` locates in src/query/tests/time_range_filter_test.rs +// tests for `build_time_range_predicate` locates in src/query/tests/time_range_filter_test.rs // since it requires query engine to convert sql to filters. -/// `TimeRangePredicateBuilder` extracts time range from logical exprs to facilitate fast +/// `build_time_range_predicate` extracts time range from logical exprs to facilitate fast /// time range pruning. -pub struct TimeRangePredicateBuilder<'a> { +pub fn build_time_range_predicate<'a>( ts_col_name: &'a str, ts_col_unit: TimeUnit, filters: &'a [Expr], -} - -impl<'a> TimeRangePredicateBuilder<'a> { - pub fn new(ts_col_name: &'a str, ts_col_unit: TimeUnit, filters: &'a [Expr]) -> Self { - Self { - ts_col_name, - ts_col_unit, - filters, - } - } +) -> TimestampRange { + let mut res = TimestampRange::min_to_max(); - pub fn build(&self) -> TimestampRange { - let mut res = TimestampRange::min_to_max(); - for expr in self.filters { - let range = self - .extract_time_range_from_expr(expr) - .unwrap_or_else(TimestampRange::min_to_max); + for expr in filters { + if let Some(range) = extract_time_range_from_expr(ts_col_name, ts_col_unit, expr.df_expr()) + { res = res.and(&range); } - res } + res +} - /// Extract time range filter from `WHERE`/`IN (...)`/`BETWEEN` clauses. - /// Return None if no time range can be found in expr. - fn extract_time_range_from_expr(&self, expr: &Expr) -> Option { - match expr { - Expr::BinaryExpr(BinaryExpr { left, op, right }) => { - self.extract_from_binary_expr(left, op, right) - } - Expr::Between(Between { - expr, - negated, - low, - high, - }) => self.extract_from_between_expr(expr, negated, low, high), - Expr::InList(InList { - expr, - list, - negated, - }) => self.extract_from_in_list_expr(expr, *negated, list), - _ => None, +/// Extract time range filter from `WHERE`/`IN (...)`/`BETWEEN` clauses. +/// Return None if no time range can be found in expr. +fn extract_time_range_from_expr( + ts_col_name: &str, + ts_col_unit: TimeUnit, + expr: &Expr, +) -> Option { + match expr { + Expr::BinaryExpr(BinaryExpr { left, op, right }) => { + extract_from_binary_expr(ts_col_name, ts_col_unit, left, op, right) } + Expr::Between(Between { + expr, + negated, + low, + high, + }) => extract_from_between_expr(ts_col_name, ts_col_unit, expr, negated, low, high), + Expr::InList(InList { + expr, + list, + negated, + }) => extract_from_in_list_expr(ts_col_name, expr, *negated, list), + _ => None, } +} - fn extract_from_binary_expr( - &self, - left: &Expr, - op: &Operator, - right: &Expr, - ) -> Option { - match op { - Operator::Eq => self - .get_timestamp_filter(left, right) - .and_then(|(ts, _)| ts.convert_to(self.ts_col_unit)) - .map(TimestampRange::single), - Operator::Lt => { - let (ts, reverse) = self.get_timestamp_filter(left, right)?; - if reverse { - // [lit] < ts_col - let ts_val = ts.convert_to(self.ts_col_unit)?.value(); - Some(TimestampRange::from_start(Timestamp::new( - ts_val + 1, - self.ts_col_unit, - ))) - } else { - // ts_col < [lit] - ts.convert_to_ceil(self.ts_col_unit) - .map(|ts| TimestampRange::until_end(ts, false)) - } - } - Operator::LtEq => { - let (ts, reverse) = self.get_timestamp_filter(left, right)?; - if reverse { - // [lit] <= ts_col - ts.convert_to_ceil(self.ts_col_unit) - .map(TimestampRange::from_start) - } else { - // ts_col <= [lit] - ts.convert_to(self.ts_col_unit) - .map(|ts| TimestampRange::until_end(ts, true)) - } - } - Operator::Gt => { - let (ts, reverse) = self.get_timestamp_filter(left, right)?; - if reverse { - // [lit] > ts_col - ts.convert_to_ceil(self.ts_col_unit) - .map(|t| TimestampRange::until_end(t, false)) - } else { - // ts_col > [lit] - let ts_val = ts.convert_to(self.ts_col_unit)?.value(); - Some(TimestampRange::from_start(Timestamp::new( - ts_val + 1, - self.ts_col_unit, - ))) - } +fn extract_from_binary_expr( + ts_col_name: &str, + ts_col_unit: TimeUnit, + left: &Expr, + op: &Operator, + right: &Expr, +) -> Option { + match op { + Operator::Eq => get_timestamp_filter(ts_col_name, left, right) + .and_then(|(ts, _)| ts.convert_to(ts_col_unit)) + .map(TimestampRange::single), + Operator::Lt => { + let (ts, reverse) = get_timestamp_filter(ts_col_name, left, right)?; + if reverse { + // [lit] < ts_col + let ts_val = ts.convert_to(ts_col_unit)?.value(); + Some(TimestampRange::from_start(Timestamp::new( + ts_val + 1, + ts_col_unit, + ))) + } else { + // ts_col < [lit] + ts.convert_to_ceil(ts_col_unit) + .map(|ts| TimestampRange::until_end(ts, false)) } - Operator::GtEq => { - let (ts, reverse) = self.get_timestamp_filter(left, right)?; - if reverse { - // [lit] >= ts_col - ts.convert_to(self.ts_col_unit) - .map(|t| TimestampRange::until_end(t, true)) - } else { - // ts_col >= [lit] - ts.convert_to_ceil(self.ts_col_unit) - .map(TimestampRange::from_start) - } + } + Operator::LtEq => { + let (ts, reverse) = get_timestamp_filter(ts_col_name, left, right)?; + if reverse { + // [lit] <= ts_col + ts.convert_to_ceil(ts_col_unit) + .map(TimestampRange::from_start) + } else { + // ts_col <= [lit] + ts.convert_to(ts_col_unit) + .map(|ts| TimestampRange::until_end(ts, true)) } - Operator::And => { - // instead of return none when failed to extract time range from left/right, we unwrap the none into - // `TimestampRange::min_to_max`. - let left = self - .extract_time_range_from_expr(left) - .unwrap_or_else(TimestampRange::min_to_max); - let right = self - .extract_time_range_from_expr(right) - .unwrap_or_else(TimestampRange::min_to_max); - Some(left.and(&right)) + } + Operator::Gt => { + let (ts, reverse) = get_timestamp_filter(ts_col_name, left, right)?; + if reverse { + // [lit] > ts_col + ts.convert_to_ceil(ts_col_unit) + .map(|t| TimestampRange::until_end(t, false)) + } else { + // ts_col > [lit] + let ts_val = ts.convert_to(ts_col_unit)?.value(); + Some(TimestampRange::from_start(Timestamp::new( + ts_val + 1, + ts_col_unit, + ))) } - Operator::Or => { - let left = self.extract_time_range_from_expr(left)?; - let right = self.extract_time_range_from_expr(right)?; - Some(left.or(&right)) + } + Operator::GtEq => { + let (ts, reverse) = get_timestamp_filter(ts_col_name, left, right)?; + if reverse { + // [lit] >= ts_col + ts.convert_to(ts_col_unit) + .map(|t| TimestampRange::until_end(t, true)) + } else { + // ts_col >= [lit] + ts.convert_to_ceil(ts_col_unit) + .map(TimestampRange::from_start) } - Operator::NotEq - | Operator::Plus - | Operator::Minus - | Operator::Multiply - | Operator::Divide - | Operator::Modulo - | Operator::IsDistinctFrom - | Operator::IsNotDistinctFrom - | Operator::RegexMatch - | Operator::RegexIMatch - | Operator::RegexNotMatch - | Operator::RegexNotIMatch - | Operator::BitwiseAnd - | Operator::BitwiseOr - | Operator::BitwiseXor - | Operator::BitwiseShiftRight - | Operator::BitwiseShiftLeft - | Operator::StringConcat - | Operator::ArrowAt - | Operator::AtArrow - | Operator::LikeMatch - | Operator::ILikeMatch - | Operator::NotLikeMatch - | Operator::NotILikeMatch => None, } + Operator::And => { + // instead of return none when failed to extract time range from left/right, we unwrap the none into + // `TimestampRange::min_to_max`. + let left = extract_time_range_from_expr(ts_col_name, ts_col_unit, left) + .unwrap_or_else(TimestampRange::min_to_max); + let right = extract_time_range_from_expr(ts_col_name, ts_col_unit, right) + .unwrap_or_else(TimestampRange::min_to_max); + Some(left.and(&right)) + } + Operator::Or => { + let left = extract_time_range_from_expr(ts_col_name, ts_col_unit, left)?; + let right = extract_time_range_from_expr(ts_col_name, ts_col_unit, right)?; + Some(left.or(&right)) + } + Operator::NotEq + | Operator::Plus + | Operator::Minus + | Operator::Multiply + | Operator::Divide + | Operator::Modulo + | Operator::IsDistinctFrom + | Operator::IsNotDistinctFrom + | Operator::RegexMatch + | Operator::RegexIMatch + | Operator::RegexNotMatch + | Operator::RegexNotIMatch + | Operator::BitwiseAnd + | Operator::BitwiseOr + | Operator::BitwiseXor + | Operator::BitwiseShiftRight + | Operator::BitwiseShiftLeft + | Operator::StringConcat + | Operator::ArrowAt + | Operator::AtArrow + | Operator::LikeMatch + | Operator::ILikeMatch + | Operator::NotLikeMatch + | Operator::NotILikeMatch => None, } +} - fn get_timestamp_filter(&self, left: &Expr, right: &Expr) -> Option<(Timestamp, bool)> { - let (col, lit, reverse) = match (left, right) { - (Expr::Column(column), Expr::Literal(scalar)) => (column, scalar, false), - (Expr::Literal(scalar), Expr::Column(column)) => (column, scalar, true), - _ => { - return None; - } - }; - if col.name != self.ts_col_name { +fn get_timestamp_filter( + ts_col_name: &str, + left: &Expr, + right: &Expr, +) -> Option<(Timestamp, bool)> { + let (col, lit, reverse) = match (left, right) { + (Expr::Column(column), Expr::Literal(scalar)) => (column, scalar, false), + (Expr::Literal(scalar), Expr::Column(column)) => (column, scalar, true), + _ => { return None; } - - return_none_if_utf8!(lit); - scalar_value_to_timestamp(lit, None).map(|t| (t, reverse)) + }; + if col.name != ts_col_name { + return None; } - fn extract_from_between_expr( - &self, - expr: &Expr, - negated: &bool, - low: &Expr, - high: &Expr, - ) -> Option { - let Expr::Column(col) = expr else { - return None; - }; - if col.name != self.ts_col_name { - return None; - } + return_none_if_utf8!(lit); + scalar_value_to_timestamp(lit, None).map(|t| (t, reverse)) +} - if *negated { - return None; - } +fn extract_from_between_expr( + ts_col_name: &str, + ts_col_unit: TimeUnit, + expr: &Expr, + negated: &bool, + low: &Expr, + high: &Expr, +) -> Option { + let Expr::Column(col) = expr else { + return None; + }; + if col.name != ts_col_name { + return None; + } - match (low, high) { - (Expr::Literal(low), Expr::Literal(high)) => { - return_none_if_utf8!(low); - return_none_if_utf8!(high); + if *negated { + return None; + } - let low_opt = scalar_value_to_timestamp(low, None) - .and_then(|ts| ts.convert_to(self.ts_col_unit)); - let high_opt = scalar_value_to_timestamp(high, None) - .and_then(|ts| ts.convert_to_ceil(self.ts_col_unit)); - Some(TimestampRange::new_inclusive(low_opt, high_opt)) - } - _ => None, + match (low, high) { + (Expr::Literal(low), Expr::Literal(high)) => { + return_none_if_utf8!(low); + return_none_if_utf8!(high); + + let low_opt = + scalar_value_to_timestamp(low, None).and_then(|ts| ts.convert_to(ts_col_unit)); + let high_opt = scalar_value_to_timestamp(high, None) + .and_then(|ts| ts.convert_to_ceil(ts_col_unit)); + Some(TimestampRange::new_inclusive(low_opt, high_opt)) } + _ => None, } +} - /// Extract time range filter from `IN (...)` expr. - fn extract_from_in_list_expr( - &self, - expr: &Expr, - negated: bool, - list: &[Expr], - ) -> Option { - if negated { - return None; - } - let Expr::Column(col) = expr else { - return None; - }; - if col.name != self.ts_col_name { - return None; - } +/// Extract time range filter from `IN (...)` expr. +fn extract_from_in_list_expr( + ts_col_name: &str, + expr: &Expr, + negated: bool, + list: &[Expr], +) -> Option { + if negated { + return None; + } + let Expr::Column(col) = expr else { + return None; + }; + if col.name != ts_col_name { + return None; + } - if list.is_empty() { - return Some(TimestampRange::empty()); - } - let mut init_range = TimestampRange::empty(); - for expr in list { - if let Expr::Literal(scalar) = expr { - return_none_if_utf8!(scalar); - if let Some(timestamp) = scalar_value_to_timestamp(scalar, None) { - init_range = init_range.or(&TimestampRange::single(timestamp)) - } else { - // TODO(hl): maybe we should raise an error here since cannot parse - // timestamp value from in list expr - return None; - } + if list.is_empty() { + return Some(TimestampRange::empty()); + } + let mut init_range = TimestampRange::empty(); + for expr in list { + if let Expr::Literal(scalar) = expr { + return_none_if_utf8!(scalar); + if let Some(timestamp) = scalar_value_to_timestamp(scalar, None) { + init_range = init_range.or(&TimestampRange::single(timestamp)) + } else { + // TODO(hl): maybe we should raise an error here since cannot parse + // timestamp value from in list expr + return None; } } - Some(init_range) } + Some(init_range) } #[cfg(test)] @@ -395,7 +391,7 @@ mod tests { fn check_build_predicate(expr: Expr, expect: TimestampRange) { assert_eq!( expect, - TimeRangePredicateBuilder::new("ts", TimeUnit::Millisecond, &[expr]).build() + build_time_range_predicate("ts", TimeUnit::Millisecond, &[Expr::from(expr)]) ); } From 6f62c902d4d1c89c474aa74dd004679e8ad3f667 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Wed, 15 May 2024 17:16:56 +0800 Subject: [PATCH 2/5] chore: rebase main --- src/mito2/src/error.rs | 1 + src/mito2/src/sst/parquet/reader.rs | 86 +++++++++++++++-------------- 2 files changed, 46 insertions(+), 41 deletions(-) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 99083c5c3b54..cebb9a97a2b3 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -698,6 +698,7 @@ pub enum Error { #[snafu(display("Failed to build time range filters for value: {:?}", timestamp))] BuildTimeRangeFilter { timestamp: Timestamp, + #[snafu(implicit)] location: Location, }, } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 82de1780c57a..fb41923677ba 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -24,11 +24,9 @@ use common_recordbatch::filter::SimpleFilterEvaluator; use common_telemetry::{debug, warn}; use common_time::range::TimestampRange; use common_time::timestamp::TimeUnit; -use datafusion_common::arrow::array::BooleanArray; -use datafusion_common::arrow::buffer::BooleanBuffer; +use common_time::Timestamp; use datafusion_common::ScalarValue; -use datafusion_expr::Operator; -use datafusion_expr::Expr; +use datafusion_expr::{Expr, Operator}; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; use itertools::Itertools; @@ -248,7 +246,7 @@ impl ParquetReaderBuilder { }; if let Some(time_range) = &self.time_range { - predicate.extend(time_range_to_predicate(*time_range, ®ion_meta)?); + filters.extend(time_range_to_predicate(*time_range, ®ion_meta)?); } let codec = McmpRowCodec::new( @@ -464,55 +462,47 @@ impl ParquetReaderBuilder { fn time_range_to_predicate( time_range: TimestampRange, metadata: &RegionMetadataRef, -) -> Result> { +) -> Result> { let ts_col = metadata.time_index_column(); + let ts_col_id = ts_col.column_id; - let ts_lit = |val: i64| match ts_col - .column_schema - .data_type - .as_timestamp() - .unwrap() - .unit() - { - TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None), - TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None), - TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None), - TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None), + let ts_to_filter = |op: Operator, timestamp: &Timestamp| { + let value = match timestamp.unit() { + TimeUnit::Second => ScalarValue::TimestampSecond(Some(timestamp.value()), None), + TimeUnit::Millisecond => { + ScalarValue::TimestampMillisecond(Some(timestamp.value()), None) + } + TimeUnit::Microsecond => { + ScalarValue::TimestampMicrosecond(Some(timestamp.value()), None) + } + TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(timestamp.value()), None), + }; + let evaluator = SimpleFilterEvaluator::new(ts_col.column_schema.name.clone(), value, op) + .context(error::BuildTimeRangeFilterSnafu { + timestamp: *timestamp, + })?; + Ok(SimpleFilterContext::new( + evaluator, + ts_col_id, + SemanticType::Timestamp, + ts_col.column_schema.data_type.clone(), + )) }; let predicates = match (time_range.start(), time_range.end()) { (Some(start), Some(end)) => { vec![ - SimpleFilterEvaluator::new( - ts_col.column_schema.name.clone(), - ts_lit(start.value()), - Operator::GtEq, - ) - .context(error::BuildTimeRangeFilterSnafu { timestamp: *start })?, - SimpleFilterEvaluator::new( - ts_col.column_schema.name.clone(), - ts_lit(end.value()), - Operator::Lt, - ) - .context(error::BuildTimeRangeFilterSnafu { timestamp: *end })?, + ts_to_filter(Operator::GtEq, start)?, + ts_to_filter(Operator::Lt, end)?, ] } (Some(start), None) => { - vec![SimpleFilterEvaluator::new( - ts_col.column_schema.name.clone(), - ts_lit(start.value()), - Operator::GtEq, - ) - .context(error::BuildTimeRangeFilterSnafu { timestamp: *start })?] + vec![ts_to_filter(Operator::GtEq, start)?] } + (None, Some(end)) => { - vec![SimpleFilterEvaluator::new( - ts_col.column_schema.name.clone(), - ts_lit(end.value()), - Operator::Lt, - ) - .context(error::BuildTimeRangeFilterSnafu { timestamp: *end })?] + vec![ts_to_filter(Operator::Lt, end)?] } (None, None) => { vec![] @@ -642,6 +632,20 @@ pub(crate) struct SimpleFilterContext { } impl SimpleFilterContext { + fn new( + filter: SimpleFilterEvaluator, + column_id: ColumnId, + semantic_type: SemanticType, + data_type: ConcreteDataType, + ) -> Self { + Self { + filter, + column_id, + semantic_type, + data_type, + } + } + /// Creates a context for the `expr`. /// /// Returns None if the column to filter doesn't exist in the SST metadata or the From 6c836372cc7953fa9be2dc10b886510a895b84d0 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Fri, 17 May 2024 16:01:25 +0800 Subject: [PATCH 3/5] fix: remove prediactes once they have been added to timestamp filters to avoid duplicate filtering --- src/mito2/src/engine/prune_test.rs | 19 ++++++++++--------- src/mito2/src/read/scan_region.rs | 9 +++++++-- src/query/src/tests/time_range_filter_test.rs | 4 ++-- src/table/src/predicate.rs | 11 +++++++---- src/table/src/table/adapter.rs | 2 ++ 5 files changed, 28 insertions(+), 17 deletions(-) diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs index b297af136a95..687172417d14 100644 --- a/src/mito2/src/engine/prune_test.rs +++ b/src/mito2/src/engine/prune_test.rs @@ -26,7 +26,7 @@ use crate::test_util::{ build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv, }; -async fn check_prune_row_groups(expr: Expr, expected: &str) { +async fn check_prune_row_groups(exprs: Vec, expected: &str) { let mut env = TestEnv::new(); let engine = env.create_engine(MitoConfig::default()).await; @@ -55,7 +55,7 @@ async fn check_prune_row_groups(expr: Expr, expected: &str) { .scan_to_stream( region_id, ScanRequest { - filters: vec![expr], + filters: exprs, ..Default::default() }, ) @@ -70,7 +70,9 @@ async fn test_read_parquet_stats() { common_telemetry::init_default_ut_logging(); check_prune_row_groups( - datafusion_expr::col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(4000), None))), + vec![ + datafusion_expr::col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(4000), None))) + ], "\ +-------+---------+---------------------+ | tag_0 | field_0 | ts | @@ -94,7 +96,7 @@ async fn test_read_parquet_stats() { async fn test_prune_tag() { // prune result: only row group 1&2 check_prune_row_groups( - datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))), + vec![datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string()))))], "\ +-------+---------+---------------------+ | tag_0 | field_0 | ts | @@ -114,9 +116,10 @@ async fn test_prune_tag_and_field() { common_telemetry::init_default_ut_logging(); // prune result: only row group 1 check_prune_row_groups( - col("tag_0") - .gt(lit(ScalarValue::Utf8(Some("4".to_string())))) - .and(col("field_0").lt(lit(8.0))), + vec![ + col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))), + col("field_0").lt(lit(8.0)), + ], "\ +-------+---------+---------------------+ | tag_0 | field_0 | ts | @@ -124,8 +127,6 @@ async fn test_prune_tag_and_field() { | 5 | 5.0 | 1970-01-01T00:00:05 | | 6 | 6.0 | 1970-01-01T00:00:06 | | 7 | 7.0 | 1970-01-01T00:00:07 | -| 8 | 8.0 | 1970-01-01T00:00:08 | -| 9 | 9.0 | 1970-01-01T00:00:09 | +-------+---------+---------------------+", ) .await; diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index bcf835b6263a..eda3fc39745f 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use std::time::Instant; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::{debug, error, warn}; +use common_telemetry::{debug, error, info, warn}; use common_time::range::TimestampRange; use store_api::region_engine::{RegionScannerRef, SinglePartitionScanner}; use store_api::storage::ScanRequest; @@ -278,6 +278,7 @@ impl ScanRegion { ); let index_applier = self.build_index_applier(); + info!("Request filters: {:?}", self.request.filters); let predicate = Predicate::new(self.request.filters.clone()); // The mapper always computes projected column ids as the schema of SSTs may change. let mapper = match &self.request.projection { @@ -308,7 +309,11 @@ impl ScanRegion { .as_timestamp() .expect("Time index must have timestamp-compatible type") .unit(); - build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters) + build_time_range_predicate( + &time_index.column_schema.name, + unit, + &mut self.request.filters, + ) } /// Use the latest schema to build the index applier. diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index aae65e371a2e..c7784092aca4 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -114,9 +114,9 @@ struct TimeRangeTester { impl TimeRangeTester { async fn check(&self, sql: &str, expect: TimestampRange) { let _ = exec_selection(self.engine.clone(), sql).await; - let filters = self.get_filters(); + let mut filters = self.get_filters(); - let range = build_time_range_predicate("ts", TimeUnit::Millisecond, &filters); + let range = build_time_range_predicate("ts", TimeUnit::Millisecond, &mut filters); assert_eq!(expect, range); } diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index 8a3069ce55a4..e0d17932b5e7 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -135,16 +135,19 @@ impl Predicate { pub fn build_time_range_predicate<'a>( ts_col_name: &'a str, ts_col_unit: TimeUnit, - filters: &'a [Expr], + filters: &'a mut Vec, ) -> TimestampRange { let mut res = TimestampRange::min_to_max(); - - for expr in filters { + let mut filters_remain = vec![]; + for expr in std::mem::take(filters) { if let Some(range) = extract_time_range_from_expr(ts_col_name, ts_col_unit, expr.df_expr()) { res = res.and(&range); + } else { + filters_remain.push(expr); } } + *filters = filters_remain; res } @@ -391,7 +394,7 @@ mod tests { fn check_build_predicate(expr: Expr, expect: TimestampRange) { assert_eq!( expect, - build_time_range_predicate("ts", TimeUnit::Millisecond, &[Expr::from(expr)]) + build_time_range_predicate("ts", TimeUnit::Millisecond, &mut vec![Expr::from(expr)]) ); } diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index 565d3d7e621b..e21b4b96bd85 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -16,6 +16,7 @@ use std::any::Any; use std::sync::{Arc, Mutex}; use common_recordbatch::OrderOption; +use common_telemetry::info; use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; use datafusion::datasource::{TableProvider, TableType as DfTableType}; use datafusion::error::Result as DfResult; @@ -85,6 +86,7 @@ impl TableProvider for DfTableProviderAdapter { limit: Option, ) -> DfResult> { let filters: Vec = filters.iter().map(Clone::clone).map(Into::into).collect(); + info!("Filters: {:?}", filters); let request = { let mut request = self.scan_req.lock().unwrap(); request.filters = filters; From f7da38820d4a5b30acce5da32ca602999555c0b0 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 21 May 2024 17:33:05 +0800 Subject: [PATCH 4/5] fix: some comments --- src/mito2/src/read/scan_region.rs | 3 +-- src/query/src/tests/time_range_filter_test.rs | 6 +++--- src/table/src/table/adapter.rs | 2 -- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index eda3fc39745f..08bc5bc30d32 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use std::time::Instant; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::{debug, error, info, warn}; +use common_telemetry::{debug, error, warn}; use common_time::range::TimestampRange; use store_api::region_engine::{RegionScannerRef, SinglePartitionScanner}; use store_api::storage::ScanRequest; @@ -278,7 +278,6 @@ impl ScanRegion { ); let index_applier = self.build_index_applier(); - info!("Request filters: {:?}", self.request.filters); let predicate = Predicate::new(self.request.filters.clone()); // The mapper always computes projected column ids as the schema of SSTs may change. let mapper = match &self.request.projection { diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index c7784092aca4..8c52d327c892 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -114,14 +114,14 @@ struct TimeRangeTester { impl TimeRangeTester { async fn check(&self, sql: &str, expect: TimestampRange) { let _ = exec_selection(self.engine.clone(), sql).await; - let mut filters = self.get_filters(); + let mut filters = self.take_filters(); let range = build_time_range_predicate("ts", TimeUnit::Millisecond, &mut filters); assert_eq!(expect, range); } - fn get_filters(&self) -> Vec { - self.filter.write().unwrap().drain(..).collect() + fn take_filters(&self) -> Vec { + std::mem::take(&mut self.filter.write().unwrap()) } } diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index e21b4b96bd85..565d3d7e621b 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -16,7 +16,6 @@ use std::any::Any; use std::sync::{Arc, Mutex}; use common_recordbatch::OrderOption; -use common_telemetry::info; use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; use datafusion::datasource::{TableProvider, TableType as DfTableType}; use datafusion::error::Result as DfResult; @@ -86,7 +85,6 @@ impl TableProvider for DfTableProviderAdapter { limit: Option, ) -> DfResult> { let filters: Vec = filters.iter().map(Clone::clone).map(Into::into).collect(); - info!("Filters: {:?}", filters); let request = { let mut request = self.scan_req.lock().unwrap(); request.filters = filters; From 6defa142ab544208872f278dd883d0ea8aed16fd Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 21 May 2024 20:28:48 +0800 Subject: [PATCH 5/5] fix: resolve conflicts --- src/table/src/predicate.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index e0d17932b5e7..a9365004cbc8 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -140,8 +140,7 @@ pub fn build_time_range_predicate<'a>( let mut res = TimestampRange::min_to_max(); let mut filters_remain = vec![]; for expr in std::mem::take(filters) { - if let Some(range) = extract_time_range_from_expr(ts_col_name, ts_col_unit, expr.df_expr()) - { + if let Some(range) = extract_time_range_from_expr(ts_col_name, ts_col_unit, &expr) { res = res.and(&range); } else { filters_remain.push(expr); @@ -283,11 +282,7 @@ fn extract_from_binary_expr( } } -fn get_timestamp_filter( - ts_col_name: &str, - left: &Expr, - right: &Expr, -) -> Option<(Timestamp, bool)> { +fn get_timestamp_filter(ts_col_name: &str, left: &Expr, right: &Expr) -> Option<(Timestamp, bool)> { let (col, lit, reverse) = match (left, right) { (Expr::Column(column), Expr::Literal(scalar)) => (column, scalar, false), (Expr::Literal(scalar), Expr::Column(column)) => (column, scalar, true), @@ -394,7 +389,7 @@ mod tests { fn check_build_predicate(expr: Expr, expect: TimestampRange) { assert_eq!( expect, - build_time_range_predicate("ts", TimeUnit::Millisecond, &mut vec![Expr::from(expr)]) + build_time_range_predicate("ts", TimeUnit::Millisecond, &mut vec![expr]) ); }