From b2cd442f812edd08d653e8ee046e7ffddf84542c Mon Sep 17 00:00:00 2001 From: maco Date: Tue, 21 May 2024 10:41:23 +0800 Subject: [PATCH] refactor: replace Expr with datafusion::Expr --- Cargo.lock | 2 + .../src/information_schema/predicate.rs | 76 +++++++++--------- src/common/query/src/logical_plan.rs | 1 - src/common/query/src/logical_plan/expr.rs | 51 +++++------- src/common/query/src/prelude.rs | 2 +- src/file-engine/src/query.rs | 4 +- src/file-engine/src/query/file_stream.rs | 7 +- src/metric-engine/Cargo.toml | 1 + src/metric-engine/src/engine/read.rs | 4 +- src/metric-engine/src/metadata_region.rs | 4 +- src/mito2/benches/memtable_bench.rs | 2 +- src/mito2/src/engine/prune_test.rs | 29 +++---- src/mito2/src/memtable/partition_tree.rs | 2 +- src/mito2/src/memtable/partition_tree/tree.rs | 2 +- src/mito2/src/memtable/time_series.rs | 2 +- src/mito2/src/sst/index/applier/builder.rs | 51 ++++++------ src/mito2/src/sst/index/creator.rs | 2 +- src/mito2/src/sst/parquet.rs | 6 +- src/mito2/src/sst/parquet/reader.rs | 2 +- src/operator/src/statement/copy_table_to.rs | 1 - src/operator/src/tests/partition_manager.rs | 36 ++++----- src/partition/src/error.rs | 2 +- src/partition/src/manager.rs | 18 ++--- src/query/src/dummy_catalog.rs | 5 +- src/query/src/tests/time_range_filter_test.rs | 2 +- src/store-api/Cargo.toml | 1 + src/store-api/src/storage/requests.rs | 2 +- src/table/src/predicate.rs | 80 ++++++++----------- src/table/src/table.rs | 2 +- src/table/src/table/adapter.rs | 12 +-- 30 files changed, 184 insertions(+), 227 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 84941ae3a226..6f73625f2275 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5701,6 +5701,7 @@ dependencies = [ "common-test-util", "common-time", "datafusion 37.0.0", + "datafusion-expr 37.0.0", "datatypes", "itertools 0.10.5", "lazy_static", @@ -10169,6 +10170,7 @@ dependencies = [ "common-query", "common-recordbatch", "common-wal", + "datafusion-expr 37.0.0", "datafusion-physical-plan 37.0.0", "datatypes", "derive_builder 0.12.0", diff --git a/src/catalog/src/information_schema/predicate.rs b/src/catalog/src/information_schema/predicate.rs index 243ff7053321..f6b377dcc03e 100644 --- a/src/catalog/src/information_schema/predicate.rs +++ b/src/catalog/src/information_schema/predicate.rs @@ -14,10 +14,10 @@ use arrow::array::StringArray; use arrow::compute::kernels::comparison; -use common_query::logical_plan::DfExpr; use datafusion::common::ScalarValue; use datafusion::logical_expr::expr::Like; use datafusion::logical_expr::Operator; +use datafusion::prelude::Expr; use datatypes::value::Value; use store_api::storage::ScanRequest; @@ -118,12 +118,12 @@ impl Predicate { } /// Try to create a predicate from datafusion [`Expr`], return None if fails. - fn from_expr(expr: DfExpr) -> Option { + fn from_expr(expr: Expr) -> Option { match expr { // NOT expr - DfExpr::Not(expr) => Some(Predicate::Not(Box::new(Self::from_expr(*expr)?))), + Expr::Not(expr) => Some(Predicate::Not(Box::new(Self::from_expr(*expr)?))), // expr LIKE pattern - DfExpr::Like(Like { + Expr::Like(Like { negated, expr, pattern, @@ -131,10 +131,10 @@ impl Predicate { .. }) if is_column(&expr) && is_string_literal(&pattern) => { // Safety: ensured by gurad - let DfExpr::Column(c) = *expr else { + let Expr::Column(c) = *expr else { unreachable!(); }; - let DfExpr::Literal(ScalarValue::Utf8(Some(pattern))) = *pattern else { + let Expr::Literal(ScalarValue::Utf8(Some(pattern))) = *pattern else { unreachable!(); }; @@ -147,10 +147,10 @@ impl Predicate { } } // left OP right - DfExpr::BinaryExpr(bin) => match (*bin.left, bin.op, *bin.right) { + Expr::BinaryExpr(bin) => match (*bin.left, bin.op, *bin.right) { // left == right - (DfExpr::Literal(scalar), Operator::Eq, DfExpr::Column(c)) - | (DfExpr::Column(c), Operator::Eq, DfExpr::Literal(scalar)) => { + (Expr::Literal(scalar), Operator::Eq, Expr::Column(c)) + | (Expr::Column(c), Operator::Eq, Expr::Literal(scalar)) => { let Ok(v) = Value::try_from(scalar) else { return None; }; @@ -158,8 +158,8 @@ impl Predicate { Some(Predicate::Eq(c.name, v)) } // left != right - (DfExpr::Literal(scalar), Operator::NotEq, DfExpr::Column(c)) - | (DfExpr::Column(c), Operator::NotEq, DfExpr::Literal(scalar)) => { + (Expr::Literal(scalar), Operator::NotEq, Expr::Column(c)) + | (Expr::Column(c), Operator::NotEq, Expr::Literal(scalar)) => { let Ok(v) = Value::try_from(scalar) else { return None; }; @@ -183,14 +183,14 @@ impl Predicate { _ => None, }, // [NOT] IN (LIST) - DfExpr::InList(list) => { + Expr::InList(list) => { match (*list.expr, list.list, list.negated) { // column [NOT] IN (v1, v2, v3, ...) - (DfExpr::Column(c), list, negated) if is_all_scalars(&list) => { + (Expr::Column(c), list, negated) if is_all_scalars(&list) => { let mut values = Vec::with_capacity(list.len()); for scalar in list { // Safety: checked by `is_all_scalars` - let DfExpr::Literal(scalar) = scalar else { + let Expr::Literal(scalar) = scalar else { unreachable!(); }; @@ -237,12 +237,12 @@ fn like_utf8(s: &str, pattern: &str, case_insensitive: &bool) -> Option { Some(booleans.value(0)) } -fn is_string_literal(expr: &DfExpr) -> bool { - matches!(expr, DfExpr::Literal(ScalarValue::Utf8(Some(_)))) +fn is_string_literal(expr: &Expr) -> bool { + matches!(expr, Expr::Literal(ScalarValue::Utf8(Some(_)))) } -fn is_column(expr: &DfExpr) -> bool { - matches!(expr, DfExpr::Column(_)) +fn is_column(expr: &Expr) -> bool { + matches!(expr, Expr::Column(_)) } /// A list of predicate @@ -257,7 +257,7 @@ impl Predicates { let mut predicates = Vec::with_capacity(request.filters.len()); for filter in &request.filters { - if let Some(predicate) = Predicate::from_expr(filter.df_expr().clone()) { + if let Some(predicate) = Predicate::from_expr(filter.clone()) { predicates.push(predicate); } } @@ -286,8 +286,8 @@ impl Predicates { } /// Returns true when the values are all [`DfExpr::Literal`]. -fn is_all_scalars(list: &[DfExpr]) -> bool { - list.iter().all(|v| matches!(v, DfExpr::Literal(_))) +fn is_all_scalars(list: &[Expr]) -> bool { + list.iter().all(|v| matches!(v, Expr::Literal(_))) } #[cfg(test)] @@ -376,7 +376,7 @@ mod tests { #[test] fn test_predicate_like() { // case insensitive - let expr = DfExpr::Like(Like { + let expr = Expr::Like(Like { negated: false, expr: Box::new(column("a")), pattern: Box::new(string_literal("%abc")), @@ -403,7 +403,7 @@ mod tests { assert!(p.eval(&[]).is_none()); // case sensitive - let expr = DfExpr::Like(Like { + let expr = Expr::Like(Like { negated: false, expr: Box::new(column("a")), pattern: Box::new(string_literal("%abc")), @@ -423,7 +423,7 @@ mod tests { assert!(p.eval(&[]).is_none()); // not like - let expr = DfExpr::Like(Like { + let expr = Expr::Like(Like { negated: true, expr: Box::new(column("a")), pattern: Box::new(string_literal("%abc")), @@ -437,15 +437,15 @@ mod tests { assert!(p.eval(&[]).is_none()); } - fn column(name: &str) -> DfExpr { - DfExpr::Column(Column { + fn column(name: &str) -> Expr { + Expr::Column(Column { relation: None, name: name.to_string(), }) } - fn string_literal(v: &str) -> DfExpr { - DfExpr::Literal(ScalarValue::Utf8(Some(v.to_string()))) + fn string_literal(v: &str) -> Expr { + Expr::Literal(ScalarValue::Utf8(Some(v.to_string()))) } fn match_string_value(v: &Value, expected: &str) -> bool { @@ -463,14 +463,14 @@ mod tests { result } - fn mock_exprs() -> (DfExpr, DfExpr) { - let expr1 = DfExpr::BinaryExpr(BinaryExpr { + fn mock_exprs() -> (Expr, Expr) { + let expr1 = Expr::BinaryExpr(BinaryExpr { left: Box::new(column("a")), op: Operator::Eq, right: Box::new(string_literal("a_value")), }); - let expr2 = DfExpr::BinaryExpr(BinaryExpr { + let expr2 = Expr::BinaryExpr(BinaryExpr { left: Box::new(column("b")), op: Operator::NotEq, right: Box::new(string_literal("b_value")), @@ -491,17 +491,17 @@ mod tests { assert!(matches!(&p2, Predicate::NotEq(column, v) if column == "b" && match_string_value(v, "b_value"))); - let and_expr = DfExpr::BinaryExpr(BinaryExpr { + let and_expr = Expr::BinaryExpr(BinaryExpr { left: Box::new(expr1.clone()), op: Operator::And, right: Box::new(expr2.clone()), }); - let or_expr = DfExpr::BinaryExpr(BinaryExpr { + let or_expr = Expr::BinaryExpr(BinaryExpr { left: Box::new(expr1.clone()), op: Operator::Or, right: Box::new(expr2.clone()), }); - let not_expr = DfExpr::Not(Box::new(expr1.clone())); + let not_expr = Expr::Not(Box::new(expr1.clone())); let and_p = Predicate::from_expr(and_expr).unwrap(); assert!(matches!(and_p, Predicate::And(left, right) if *left == p1 && *right == p2)); @@ -510,7 +510,7 @@ mod tests { let not_p = Predicate::from_expr(not_expr).unwrap(); assert!(matches!(not_p, Predicate::Not(p) if *p == p1)); - let inlist_expr = DfExpr::InList(InList { + let inlist_expr = Expr::InList(InList { expr: Box::new(column("a")), list: vec![string_literal("a1"), string_literal("a2")], negated: false, @@ -520,7 +520,7 @@ mod tests { assert!(matches!(&inlist_p, Predicate::InList(c, values) if c == "a" && match_string_values(values, &["a1", "a2"]))); - let inlist_expr = DfExpr::InList(InList { + let inlist_expr = Expr::InList(InList { expr: Box::new(column("a")), list: vec![string_literal("a1"), string_literal("a2")], negated: true, @@ -540,7 +540,7 @@ mod tests { let (expr1, expr2) = mock_exprs(); let request = ScanRequest { - filters: vec![expr1.into(), expr2.into()], + filters: vec![expr1, expr2], ..Default::default() }; let predicates = Predicates::from_scan_request(&Some(request)); @@ -578,7 +578,7 @@ mod tests { let (expr1, expr2) = mock_exprs(); let request = ScanRequest { - filters: vec![expr1.into(), expr2.into()], + filters: vec![expr1, expr2], ..Default::default() }; let predicates = Predicates::from_scan_request(&Some(request)); diff --git a/src/common/query/src/logical_plan.rs b/src/common/query/src/logical_plan.rs index 5a100fc6131e..6705a63e4251 100644 --- a/src/common/query/src/logical_plan.rs +++ b/src/common/query/src/logical_plan.rs @@ -23,7 +23,6 @@ use datatypes::prelude::ConcreteDataType; pub use expr::build_filter_from_timestamp; pub use self::accumulator::{Accumulator, AggregateFunctionCreator, AggregateFunctionCreatorRef}; -pub use self::expr::{DfExpr, Expr}; pub use self::udaf::AggregateFunction; pub use self::udf::ScalarUdf; use crate::function::{ReturnTypeFunction, ScalarFunctionImplementation}; diff --git a/src/common/query/src/logical_plan/expr.rs b/src/common/query/src/logical_plan/expr.rs index c63de87f6147..2d30bee2afb2 100644 --- a/src/common/query/src/logical_plan/expr.rs +++ b/src/common/query/src/logical_plan/expr.rs @@ -16,28 +16,9 @@ use common_time::range::TimestampRange; use common_time::timestamp::TimeUnit; use common_time::Timestamp; use datafusion_common::{Column, ScalarValue}; -pub use datafusion_expr::expr::Expr as DfExpr; +use datafusion_expr::expr::Expr; use datafusion_expr::{and, binary_expr, Operator}; -/// Central struct of query API. -/// Represent logical expressions such as `A + 1`, or `CAST(c1 AS int)`. -#[derive(Clone, PartialEq, Eq, Hash, Debug)] -pub struct Expr { - df_expr: DfExpr, -} - -impl Expr { - pub fn df_expr(&self) -> &DfExpr { - &self.df_expr - } -} - -impl From for Expr { - fn from(df_expr: DfExpr) -> Self { - Self { df_expr } - } -} - /// Builds an `Expr` that filters timestamp column from given timestamp range. /// Returns [None] if time range is [None] or full time range. pub fn build_filter_from_timestamp( @@ -45,12 +26,12 @@ pub fn build_filter_from_timestamp( time_range: Option<&TimestampRange>, ) -> Option { let time_range = time_range?; - let ts_col_expr = DfExpr::Column(Column { + let ts_col_expr = Expr::Column(Column { relation: None, name: ts_col_name.to_string(), }); - let df_expr = match (time_range.start(), time_range.end()) { + match (time_range.start(), time_range.end()) { (None, None) => None, (Some(start), None) => Some(binary_expr( ts_col_expr, @@ -70,20 +51,18 @@ pub fn build_filter_from_timestamp( ), binary_expr(ts_col_expr, Operator::Lt, timestamp_to_literal(end)), )), - }; - - df_expr.map(Expr::from) + } } /// Converts a [Timestamp] to datafusion literal value. -fn timestamp_to_literal(timestamp: &Timestamp) -> DfExpr { +fn timestamp_to_literal(timestamp: &Timestamp) -> Expr { let scalar_value = match timestamp.unit() { TimeUnit::Second => ScalarValue::TimestampSecond(Some(timestamp.value()), None), TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(timestamp.value()), None), TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(timestamp.value()), None), TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(timestamp.value()), None), }; - DfExpr::Literal(scalar_value) + Expr::Literal(scalar_value) } #[cfg(test)] @@ -91,11 +70,21 @@ mod tests { use super::*; #[test] - fn test_from_df_expr() { - let df_expr = DfExpr::Wildcard { qualifier: None }; + fn test_timestamp_to_literal() { + let timestamp = Timestamp::new(123456789, TimeUnit::Second); + let expected = Expr::Literal(ScalarValue::TimestampSecond(Some(123456789), None)); + assert_eq!(timestamp_to_literal(×tamp), expected); + + let timestamp = Timestamp::new(123456789, TimeUnit::Millisecond); + let expected = Expr::Literal(ScalarValue::TimestampMillisecond(Some(123456789), None)); + assert_eq!(timestamp_to_literal(×tamp), expected); - let expr: Expr = df_expr.into(); + let timestamp = Timestamp::new(123456789, TimeUnit::Microsecond); + let expected = Expr::Literal(ScalarValue::TimestampMicrosecond(Some(123456789), None)); + assert_eq!(timestamp_to_literal(×tamp), expected); - assert_eq!(DfExpr::Wildcard { qualifier: None }, *expr.df_expr()); + let timestamp = Timestamp::new(123456789, TimeUnit::Nanosecond); + let expected = Expr::Literal(ScalarValue::TimestampNanosecond(Some(123456789), None)); + assert_eq!(timestamp_to_literal(×tamp), expected); } } diff --git a/src/common/query/src/prelude.rs b/src/common/query/src/prelude.rs index de71ee107979..8cfc125583db 100644 --- a/src/common/query/src/prelude.rs +++ b/src/common/query/src/prelude.rs @@ -16,7 +16,7 @@ pub use datafusion_common::ScalarValue; pub use crate::columnar_value::ColumnarValue; pub use crate::function::*; -pub use crate::logical_plan::{create_udf, AggregateFunction, Expr, ScalarUdf}; +pub use crate::logical_plan::{create_udf, AggregateFunction, ScalarUdf}; pub use crate::signature::{Signature, TypeSignature, Volatility}; /// Default timestamp column name for Prometheus metrics. diff --git a/src/file-engine/src/query.rs b/src/file-engine/src/query.rs index 31e14a989839..679c9112358d 100644 --- a/src/file-engine/src/query.rs +++ b/src/file-engine/src/query.rs @@ -21,11 +21,11 @@ use std::task::{Context, Poll}; use common_datasource::object_store::build_backend; use common_error::ext::BoxedError; -use common_query::prelude::Expr; use common_recordbatch::adapter::RecordBatchMetrics; use common_recordbatch::error::{CastVectorSnafu, ExternalSnafu, Result as RecordBatchResult}; use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use datafusion::logical_expr::utils as df_logical_expr_utils; +use datafusion_expr::expr::Expr; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::VectorRef; @@ -113,7 +113,7 @@ impl FileRegion { let mut aux_column_set = HashSet::new(); for scan_filter in scan_filters { - df_logical_expr_utils::expr_to_columns(scan_filter.df_expr(), &mut aux_column_set) + df_logical_expr_utils::expr_to_columns(scan_filter, &mut aux_column_set) .context(ExtractColumnFromFilterSnafu)?; let all_file_columns = aux_column_set diff --git a/src/file-engine/src/query/file_stream.rs b/src/file-engine/src/query/file_stream.rs index 2fb7f3dc6dee..8f35edd34403 100644 --- a/src/file-engine/src/query/file_stream.rs +++ b/src/file-engine/src/query/file_stream.rs @@ -19,7 +19,6 @@ use common_datasource::file_format::json::{JsonFormat, JsonOpener}; use common_datasource::file_format::orc::{OrcFormat, OrcOpener}; use common_datasource::file_format::parquet::{DefaultParquetFileReaderFactory, ParquetFormat}; use common_datasource::file_format::Format; -use common_query::prelude::Expr; use common_recordbatch::adapter::RecordBatchStreamAdapter; use common_recordbatch::SendableRecordBatchStream; use datafusion::common::{Statistics, ToDFSchema}; @@ -32,6 +31,7 @@ use datafusion::physical_expr::execution_props::ExecutionProps; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; +use datafusion_expr::expr::Expr; use datafusion_expr::utils::conjunction; use datatypes::arrow::datatypes::Schema as ArrowSchema; use datatypes::schema::SchemaRef; @@ -182,10 +182,7 @@ fn new_parquet_stream_with_exec_plan( }; // build predicate filter - let filters = filters - .iter() - .map(|f| f.df_expr().clone()) - .collect::>(); + let filters = filters.to_vec(); let filters = if let Some(expr) = conjunction(filters) { let df_schema = file_schema .clone() diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml index 13aa59fe8b30..fbb2f11d2087 100644 --- a/src/metric-engine/Cargo.toml +++ b/src/metric-engine/Cargo.toml @@ -19,6 +19,7 @@ common-recordbatch.workspace = true common-telemetry.workspace = true common-time.workspace = true datafusion.workspace = true +datafusion-expr.workspace = true datatypes.workspace = true itertools.workspace = true lazy_static = "1.4" diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index ed4d6b6e4f7a..1193f7a999b1 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -15,10 +15,10 @@ use std::sync::Arc; use api::v1::SemanticType; -use common_query::logical_plan::Expr; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{error, info, tracing}; use datafusion::logical_expr; +use datafusion_expr::expr::Expr; use snafu::{OptionExt, ResultExt}; use store_api::metadata::{RegionMetadataBuilder, RegionMetadataRef}; use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; @@ -158,7 +158,6 @@ impl MetricEngineInner { fn table_id_filter(&self, logical_region_id: RegionId) -> Expr { logical_expr::col(DATA_SCHEMA_TABLE_ID_COLUMN_NAME) .eq(logical_expr::lit(logical_region_id.table_id())) - .into() } /// Transform the projection from logical region to physical region. @@ -309,7 +308,6 @@ mod test { scan_req.filters[0], logical_expr::col(DATA_SCHEMA_TABLE_ID_COLUMN_NAME) .eq(logical_expr::lit(logical_region_id.table_id())) - .into() ); // check default projection diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 6c869c6e3f2f..a246977649ea 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -402,7 +402,7 @@ impl MetadataRegion { ScanRequest { projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]), - filters: vec![filter_expr.into()], + filters: vec![filter_expr], output_ordering: None, limit: None, } @@ -562,7 +562,7 @@ mod test { let expected_filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).eq(lit(key)); let expected_scan_request = ScanRequest { projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]), - filters: vec![expected_filter_expr.into()], + filters: vec![expected_filter_expr], output_ordering: None, limit: None, }; diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs index 4c2c127e246f..f50d3e32dd5e 100644 --- a/src/mito2/benches/memtable_bench.rs +++ b/src/mito2/benches/memtable_bench.rs @@ -273,7 +273,7 @@ impl CpuDataGenerator { fn random_host_filter(&self) -> Predicate { let host = self.random_hostname(); let expr = Expr::Column(Column::from_name("hostname")).eq(lit(host)); - Predicate::new(vec![expr.into()]) + Predicate::new(vec![expr]) } fn random_hostname(&self) -> String { diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs index 7c20dd8a5d10..b297af136a95 100644 --- a/src/mito2/src/engine/prune_test.rs +++ b/src/mito2/src/engine/prune_test.rs @@ -13,10 +13,9 @@ // limitations under the License. use api::v1::Rows; -use common_query::logical_plan::DfExpr; -use common_query::prelude::Expr; use common_recordbatch::RecordBatches; use datafusion_common::ScalarValue; +use datafusion_expr::expr::Expr; use datafusion_expr::{col, lit}; use store_api::region_engine::RegionEngine; use store_api::region_request::RegionRequest; @@ -27,7 +26,7 @@ use crate::test_util::{ build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv, }; -async fn check_prune_row_groups(expr: DfExpr, expected: &str) { +async fn check_prune_row_groups(expr: Expr, expected: &str) { let mut env = TestEnv::new(); let engine = env.create_engine(MitoConfig::default()).await; @@ -56,7 +55,7 @@ async fn check_prune_row_groups(expr: DfExpr, expected: &str) { .scan_to_stream( region_id, ScanRequest { - filters: vec![Expr::from(expr)], + filters: vec![expr], ..Default::default() }, ) @@ -134,17 +133,15 @@ async fn test_prune_tag_and_field() { /// Creates a time range `[start_sec, end_sec)` fn time_range_expr(start_sec: i64, end_sec: i64) -> Expr { - Expr::from( - col("ts") - .gt_eq(lit(ScalarValue::TimestampMillisecond( - Some(start_sec * 1000), - None, - ))) - .and(col("ts").lt(lit(ScalarValue::TimestampMillisecond( - Some(end_sec * 1000), - None, - )))), - ) + col("ts") + .gt_eq(lit(ScalarValue::TimestampMillisecond( + Some(start_sec * 1000), + None, + ))) + .and(col("ts").lt(lit(ScalarValue::TimestampMillisecond( + Some(end_sec * 1000), + None, + )))) } #[tokio::test] @@ -235,7 +232,7 @@ async fn test_prune_memtable_complex_expr() { .await; // ts filter will be ignored when pruning time series in memtable. - let filters = vec![time_range_expr(4, 7), Expr::from(col("tag_0").lt(lit("6")))]; + let filters = vec![time_range_expr(4, 7), col("tag_0").lt(lit("6"))]; let stream = engine .scan_to_stream( diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index 7db9a4877b81..280442968d32 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -553,7 +553,7 @@ mod tests { right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))), }); let iter = memtable - .iter(None, Some(Predicate::new(vec![expr.into()]))) + .iter(None, Some(Predicate::new(vec![expr]))) .unwrap(); let read = collect_iter_timestamps(iter); assert_eq!(timestamps, read); diff --git a/src/mito2/src/memtable/partition_tree/tree.rs b/src/mito2/src/memtable/partition_tree/tree.rs index 3f3dd236c423..110032a68ed5 100644 --- a/src/mito2/src/memtable/partition_tree/tree.rs +++ b/src/mito2/src/memtable/partition_tree/tree.rs @@ -215,7 +215,7 @@ impl PartitionTree { predicate .exprs() .iter() - .filter_map(|f| SimpleFilterEvaluator::try_new(f.df_expr())) + .filter_map(SimpleFilterEvaluator::try_new) .collect::>() }) .unwrap_or_default(); diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index fe93882681a2..79bd74b9ef84 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -434,7 +434,7 @@ impl Iter { predicate .exprs() .iter() - .filter_map(|f| SimpleFilterEvaluator::try_new(f.df_expr())) + .filter_map(SimpleFilterEvaluator::try_new) .collect::>() }) .unwrap_or_default(); diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs index 104085a7c5f7..c414e91deb48 100644 --- a/src/mito2/src/sst/index/applier/builder.rs +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -21,10 +21,9 @@ mod regex_match; use std::collections::{HashMap, HashSet}; use api::v1::SemanticType; -use common_query::logical_plan::Expr; use common_telemetry::warn; use datafusion_common::ScalarValue; -use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; +use datafusion_expr::{BinaryExpr, Expr, Operator}; use datatypes::data_type::ConcreteDataType; use datatypes::value::Value; use index::inverted_index::search::index_apply::PredicatesIndexApplier; @@ -84,7 +83,7 @@ impl<'a> SstIndexApplierBuilder<'a> { /// the expressions provided. If no predicates match, returns `None`. pub fn build(mut self, exprs: &[Expr]) -> Result> { for expr in exprs { - self.traverse_and_collect(expr.df_expr()); + self.traverse_and_collect(expr); } if self.output.is_empty() { @@ -108,12 +107,12 @@ impl<'a> SstIndexApplierBuilder<'a> { /// Recursively traverses expressions to collect predicates. /// Results are stored in `self.output`. - fn traverse_and_collect(&mut self, expr: &DfExpr) { + fn traverse_and_collect(&mut self, expr: &Expr) { let res = match expr { - DfExpr::Between(between) => self.collect_between(between), + Expr::Between(between) => self.collect_between(between), - DfExpr::InList(in_list) => self.collect_inlist(in_list), - DfExpr::BinaryExpr(BinaryExpr { left, op, right }) => match op { + Expr::InList(in_list) => self.collect_inlist(in_list), + Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op { Operator::And => { self.traverse_and_collect(left); self.traverse_and_collect(right); @@ -170,17 +169,17 @@ impl<'a> SstIndexApplierBuilder<'a> { } /// Helper function to get a non-null literal. - fn nonnull_lit(expr: &DfExpr) -> Option<&ScalarValue> { + fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> { match expr { - DfExpr::Literal(lit) if !lit.is_null() => Some(lit), + Expr::Literal(lit) if !lit.is_null() => Some(lit), _ => None, } } /// Helper function to get the column name of a column expression. - fn column_name(expr: &DfExpr) -> Option<&str> { + fn column_name(expr: &Expr) -> Option<&str> { match expr { - DfExpr::Column(column) => Some(&column.name), + Expr::Column(column) => Some(&column.name), _ => None, } } @@ -247,40 +246,40 @@ mod tests { ObjectStore::new(Memory::default()).unwrap().finish() } - pub(crate) fn tag_column() -> DfExpr { - DfExpr::Column(Column { + pub(crate) fn tag_column() -> Expr { + Expr::Column(Column { relation: None, name: "a".to_string(), }) } - pub(crate) fn tag_column2() -> DfExpr { - DfExpr::Column(Column { + pub(crate) fn tag_column2() -> Expr { + Expr::Column(Column { relation: None, name: "b".to_string(), }) } - pub(crate) fn field_column() -> DfExpr { - DfExpr::Column(Column { + pub(crate) fn field_column() -> Expr { + Expr::Column(Column { relation: None, name: "c".to_string(), }) } - pub(crate) fn nonexistent_column() -> DfExpr { - DfExpr::Column(Column { + pub(crate) fn nonexistent_column() -> Expr { + Expr::Column(Column { relation: None, name: "nonexistent".to_string(), }) } - pub(crate) fn string_lit(s: impl Into) -> DfExpr { - DfExpr::Literal(ScalarValue::Utf8(Some(s.into()))) + pub(crate) fn string_lit(s: impl Into) -> Expr { + Expr::Literal(ScalarValue::Utf8(Some(s.into()))) } - pub(crate) fn int64_lit(i: impl Into) -> DfExpr { - DfExpr::Literal(ScalarValue::Int64(Some(i.into()))) + pub(crate) fn int64_lit(i: impl Into) -> Expr { + Expr::Literal(ScalarValue::Int64(Some(i.into()))) } pub(crate) fn encoded_string(s: impl Into) -> Vec { @@ -316,14 +315,14 @@ mod tests { HashSet::default(), ); - let expr = DfExpr::BinaryExpr(BinaryExpr { - left: Box::new(DfExpr::BinaryExpr(BinaryExpr { + let expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::BinaryExpr(BinaryExpr { left: Box::new(tag_column()), op: Operator::RegexMatch, right: Box::new(string_lit("bar")), })), op: Operator::And, - right: Box::new(DfExpr::Between(Between { + right: Box::new(Expr::Between(Between { expr: Box::new(tag_column2()), negated: false, low: Box::new(int64_lit(123)), diff --git a/src/mito2/src/sst/index/creator.rs b/src/mito2/src/sst/index/creator.rs index 0e6fdc6125ad..45b58f858eca 100644 --- a/src/mito2/src/sst/index/creator.rs +++ b/src/mito2/src/sst/index/creator.rs @@ -439,7 +439,7 @@ mod tests { ®ion_metadata, Default::default(), ) - .build(&[expr.into()]) + .build(&[expr]) .unwrap() .unwrap(); Box::pin(async move { diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 3a49d84a2d8e..2a2bc065fe21 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -311,8 +311,7 @@ mod tests { })), op: Operator::Eq, right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))), - }) - .into()])); + })])); let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store) .predicate(predicate); @@ -400,8 +399,7 @@ mod tests { })), op: Operator::GtEq, right: Box::new(Expr::Literal(ScalarValue::UInt64(Some(150)))), - }) - .into()])); + })])); let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store) .predicate(predicate); diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index dff995b76ba6..b56d776e6722 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -233,7 +233,7 @@ impl ParquetReaderBuilder { SimpleFilterContext::new_opt( ®ion_meta, self.expected_metadata.as_deref(), - expr.df_expr(), + expr, ) }) .collect::>() diff --git a/src/operator/src/statement/copy_table_to.rs b/src/operator/src/statement/copy_table_to.rs index 282bb0397984..f8450e31b3b9 100644 --- a/src/operator/src/statement/copy_table_to.rs +++ b/src/operator/src/statement/copy_table_to.rs @@ -110,7 +110,6 @@ impl StatementExecutor { req.timestamp_range.as_ref(), ) }) - .map(|filter| filter.df_expr().clone()) .into_iter() .collect::>(); diff --git a/src/operator/src/tests/partition_manager.rs b/src/operator/src/tests/partition_manager.rs index 4a30928dc8c5..bc7907903681 100644 --- a/src/operator/src/tests/partition_manager.rs +++ b/src/operator/src/tests/partition_manager.rs @@ -23,7 +23,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; -use common_query::prelude::Expr; +use datafusion_expr::expr::Expr; use datafusion_expr::expr_fn::{and, binary_expr, col, or}; use datafusion_expr::{lit, Operator}; use datatypes::prelude::ConcreteDataType; @@ -285,50 +285,50 @@ async fn test_find_regions() { // test simple filter test( - vec![binary_expr(col("a"), Operator::Lt, lit(10)).into()], // a < 10 + vec![binary_expr(col("a"), Operator::Lt, lit(10))], // a < 10 vec![0], ); test( - vec![binary_expr(col("a"), Operator::LtEq, lit(10)).into()], // a <= 10 + vec![binary_expr(col("a"), Operator::LtEq, lit(10))], // a <= 10 vec![0, 1], ); test( - vec![binary_expr(lit(20), Operator::Gt, col("a")).into()], // 20 > a + vec![binary_expr(lit(20), Operator::Gt, col("a"))], // 20 > a vec![0, 1], ); test( - vec![binary_expr(lit(20), Operator::GtEq, col("a")).into()], // 20 >= a + vec![binary_expr(lit(20), Operator::GtEq, col("a"))], // 20 >= a vec![0, 1, 2], ); test( - vec![binary_expr(lit(45), Operator::Eq, col("a")).into()], // 45 == a + vec![binary_expr(lit(45), Operator::Eq, col("a"))], // 45 == a vec![2], ); test( - vec![binary_expr(col("a"), Operator::NotEq, lit(45)).into()], // a != 45 + vec![binary_expr(col("a"), Operator::NotEq, lit(45))], // a != 45 vec![0, 1, 2, 3], ); test( - vec![binary_expr(col("a"), Operator::Gt, lit(50)).into()], // a > 50 + vec![binary_expr(col("a"), Operator::Gt, lit(50))], // a > 50 vec![3], ); // test multiple filters test( vec![ - binary_expr(col("a"), Operator::Gt, lit(10)).into(), - binary_expr(col("a"), Operator::Gt, lit(50)).into(), + binary_expr(col("a"), Operator::Gt, lit(10)), + binary_expr(col("a"), Operator::Gt, lit(50)), ], // [a > 10, a > 50] vec![3], ); // test finding all regions when provided with not supported filters or not partition column test( - vec![binary_expr(col("row_id"), Operator::LtEq, lit(123)).into()], // row_id <= 123 + vec![binary_expr(col("row_id"), Operator::LtEq, lit(123))], // row_id <= 123 vec![0, 1, 2, 3], ); test( - vec![binary_expr(col("c"), Operator::Gt, lit(123)).into()], // c > 789 + vec![binary_expr(col("c"), Operator::Gt, lit(123))], // c > 789 vec![0, 1, 2, 3], ); @@ -340,24 +340,21 @@ async fn test_find_regions() { binary_expr(col("row_id"), Operator::Lt, lit(1)), binary_expr(col("a"), Operator::Lt, lit(1)), ), - ) - .into()], // row_id < 1 OR (row_id < 1 AND a > 1) + )], // row_id < 1 OR (row_id < 1 AND a > 1) vec![0, 1, 2, 3], ); test( vec![or( binary_expr(col("a"), Operator::Lt, lit(20)), binary_expr(col("a"), Operator::GtEq, lit(20)), - ) - .into()], // a < 20 OR a >= 20 + )], // a < 20 OR a >= 20 vec![0, 1, 2, 3], ); test( vec![and( binary_expr(col("a"), Operator::Lt, lit(20)), binary_expr(col("a"), Operator::Lt, lit(50)), - ) - .into()], // a < 20 AND a < 50 + )], // a < 20 AND a < 50 vec![0, 1], ); @@ -367,8 +364,7 @@ async fn test_find_regions() { vec![and( binary_expr(col("a"), Operator::Lt, lit(20)), binary_expr(col("a"), Operator::GtEq, lit(20)), - ) - .into()] + )] .as_slice(), ); // a < 20 AND a >= 20 assert!(matches!( diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs index f5a245dba335..ff0d668b23aa 100644 --- a/src/partition/src/error.rs +++ b/src/partition/src/error.rs @@ -17,8 +17,8 @@ use std::any::Any; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; -use common_query::prelude::Expr; use datafusion_common::ScalarValue; +use datafusion_expr::expr::Expr; use snafu::{Location, Snafu}; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 3d192fc97700..87ce4167a007 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -21,8 +21,7 @@ use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteManager}; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; use common_meta::rpc::router::{self, RegionRoute}; -use common_query::prelude::Expr; -use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; +use datafusion_expr::{BinaryExpr, Expr, Operator}; use datatypes::prelude::Value; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{RegionId, RegionNumber}; @@ -327,12 +326,11 @@ fn create_partitions_from_region_routes( } fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result> { - let expr = filter.df_expr(); - match expr { - DfExpr::BinaryExpr(BinaryExpr { left, op, right }) if op.is_comparison_operator() => { + match filter { + Expr::BinaryExpr(BinaryExpr { left, op, right }) if op.is_comparison_operator() => { let column_op_value = match (left.as_ref(), right.as_ref()) { - (DfExpr::Column(c), DfExpr::Literal(v)) => Some((&c.name, *op, v)), - (DfExpr::Literal(v), DfExpr::Column(c)) => Some(( + (Expr::Column(c), Expr::Literal(v)) => Some((&c.name, *op, v)), + (Expr::Literal(v), Expr::Column(c)) => Some(( &c.name, // Safety: previous branch ensures this is a comparison operator op.swap().unwrap(), @@ -352,11 +350,11 @@ fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result>()); } } - DfExpr::BinaryExpr(BinaryExpr { left, op, right }) + Expr::BinaryExpr(BinaryExpr { left, op, right }) if matches!(op, Operator::And | Operator::Or) => { - let left_regions = find_regions0(partition_rule.clone(), &(*left.clone()).into())?; - let right_regions = find_regions0(partition_rule.clone(), &(*right.clone()).into())?; + let left_regions = find_regions0(partition_rule.clone(), &left.clone())?; + let right_regions = find_regions0(partition_rule.clone(), &right.clone())?; let regions = match op { Operator::And => left_regions .intersection(&right_regions) diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index c39b6bad59ee..bc7c94e12abb 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -162,10 +162,7 @@ impl TableProvider for DummyTableProvider { Some(x) if !x.is_empty() => Some(x.clone()), _ => None, }; - request.filters = filters - .iter() - .map(|e| common_query::logical_plan::Expr::from(e.clone())) - .collect(); + request.filters = filters.to_vec(); request.limit = limit; let scanner = self diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index 9c19b1f9b15a..87b69c6588a9 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -18,11 +18,11 @@ use catalog::memory::new_memory_catalog_manager; use catalog::RegisterTableRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::BoxedError; -use common_query::prelude::Expr; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use common_time::range::TimestampRange; use common_time::timestamp::TimeUnit; use common_time::Timestamp; +use datafusion_expr::expr::Expr; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::{Int64Vector, TimestampMillisecondVector}; diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index 7068f6686d1b..86d1f0597c42 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -18,6 +18,7 @@ common-query.workspace = true common-recordbatch.workspace = true common-wal.workspace = true datafusion-physical-plan.workspace = true +datafusion-expr.workspace = true datatypes.workspace = true derive_builder.workspace = true futures.workspace = true diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index ad7edc897b91..5e43a2ab0dbc 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_query::logical_plan::Expr; use common_recordbatch::OrderOption; +use datafusion_expr::expr::Expr; #[derive(Default, Clone, Debug, PartialEq, Eq)] pub struct ScanRequest { diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index aecb96f267ee..079022925ff7 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -14,7 +14,6 @@ use std::sync::Arc; -use common_query::logical_plan::{DfExpr, Expr}; use common_telemetry::{error, warn}; use common_time::range::TimestampRange; use common_time::timestamp::TimeUnit; @@ -22,7 +21,7 @@ use common_time::Timestamp; use datafusion::common::ScalarValue; use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use datafusion_common::ToDFSchema; -use datafusion_expr::expr::InList; +use datafusion_expr::expr::{Expr, InList}; use datafusion_expr::{Between, BinaryExpr, Operator}; use datafusion_physical_expr::execution_props::ExecutionProps; use datafusion_physical_expr::{create_physical_expr, PhysicalExpr}; @@ -88,9 +87,7 @@ impl Predicate { Ok(self .exprs .iter() - .filter_map(|expr| { - create_physical_expr(expr.df_expr(), df_schema.as_ref(), execution_props).ok() - }) + .filter_map(|expr| create_physical_expr(expr, df_schema.as_ref(), execution_props).ok()) .collect::>()) } @@ -154,7 +151,7 @@ impl<'a> TimeRangePredicateBuilder<'a> { let mut res = TimestampRange::min_to_max(); for expr in self.filters { let range = self - .extract_time_range_from_expr(expr.df_expr()) + .extract_time_range_from_expr(expr) .unwrap_or_else(TimestampRange::min_to_max); res = res.and(&range); } @@ -163,18 +160,18 @@ impl<'a> TimeRangePredicateBuilder<'a> { /// Extract time range filter from `WHERE`/`IN (...)`/`BETWEEN` clauses. /// Return None if no time range can be found in expr. - fn extract_time_range_from_expr(&self, expr: &DfExpr) -> Option { + fn extract_time_range_from_expr(&self, expr: &Expr) -> Option { match expr { - DfExpr::BinaryExpr(BinaryExpr { left, op, right }) => { + Expr::BinaryExpr(BinaryExpr { left, op, right }) => { self.extract_from_binary_expr(left, op, right) } - DfExpr::Between(Between { + Expr::Between(Between { expr, negated, low, high, }) => self.extract_from_between_expr(expr, negated, low, high), - DfExpr::InList(InList { + Expr::InList(InList { expr, list, negated, @@ -185,9 +182,9 @@ impl<'a> TimeRangePredicateBuilder<'a> { fn extract_from_binary_expr( &self, - left: &DfExpr, + left: &Expr, op: &Operator, - right: &DfExpr, + right: &Expr, ) -> Option { match op { Operator::Eq => self @@ -291,10 +288,10 @@ impl<'a> TimeRangePredicateBuilder<'a> { } } - fn get_timestamp_filter(&self, left: &DfExpr, right: &DfExpr) -> Option<(Timestamp, bool)> { + fn get_timestamp_filter(&self, left: &Expr, right: &Expr) -> Option<(Timestamp, bool)> { let (col, lit, reverse) = match (left, right) { - (DfExpr::Column(column), DfExpr::Literal(scalar)) => (column, scalar, false), - (DfExpr::Literal(scalar), DfExpr::Column(column)) => (column, scalar, true), + (Expr::Column(column), Expr::Literal(scalar)) => (column, scalar, false), + (Expr::Literal(scalar), Expr::Column(column)) => (column, scalar, true), _ => { return None; } @@ -309,12 +306,12 @@ impl<'a> TimeRangePredicateBuilder<'a> { fn extract_from_between_expr( &self, - expr: &DfExpr, + expr: &Expr, negated: &bool, - low: &DfExpr, - high: &DfExpr, + low: &Expr, + high: &Expr, ) -> Option { - let DfExpr::Column(col) = expr else { + let Expr::Column(col) = expr else { return None; }; if col.name != self.ts_col_name { @@ -326,7 +323,7 @@ impl<'a> TimeRangePredicateBuilder<'a> { } match (low, high) { - (DfExpr::Literal(low), DfExpr::Literal(high)) => { + (Expr::Literal(low), Expr::Literal(high)) => { return_none_if_utf8!(low); return_none_if_utf8!(high); @@ -343,14 +340,14 @@ impl<'a> TimeRangePredicateBuilder<'a> { /// Extract time range filter from `IN (...)` expr. fn extract_from_in_list_expr( &self, - expr: &DfExpr, + expr: &Expr, negated: bool, - list: &[DfExpr], + list: &[Expr], ) -> Option { if negated { return None; } - let DfExpr::Column(col) = expr else { + let Expr::Column(col) = expr else { return None; }; if col.name != self.ts_col_name { @@ -362,7 +359,7 @@ impl<'a> TimeRangePredicateBuilder<'a> { } let mut init_range = TimestampRange::empty(); for expr in list { - if let DfExpr::Literal(scalar) = expr { + if let Expr::Literal(scalar) = expr { return_none_if_utf8!(scalar); if let Some(timestamp) = scalar_value_to_timestamp(scalar, None) { init_range = init_range.or(&TimestampRange::single(timestamp)) @@ -395,11 +392,10 @@ mod tests { use super::*; use crate::predicate::stats::RowGroupPruningStatistics; - fn check_build_predicate(expr: DfExpr, expect: TimestampRange) { + fn check_build_predicate(expr: Expr, expect: TimestampRange) { assert_eq!( expect, - TimeRangePredicateBuilder::new("ts", TimeUnit::Millisecond, &[Expr::from(expr)]) - .build() + TimeRangePredicateBuilder::new("ts", TimeUnit::Millisecond, &[expr]).build() ); } @@ -598,11 +594,7 @@ mod tests { (path, schema) } - async fn assert_prune( - array_cnt: usize, - filters: Vec, - expect: Vec, - ) { + async fn assert_prune(array_cnt: usize, filters: Vec, expect: Vec) { let dir = create_temp_dir("prune_parquet"); let (path, arrow_schema) = gen_test_parquet_file(&dir, array_cnt).await; let schema = Arc::new(datatypes::schema::Schema::try_from(arrow_schema.clone()).unwrap()); @@ -624,16 +616,14 @@ mod tests { assert_eq!(expect, res); } - fn gen_predicate(max_val: i32, op: Operator) -> Vec { - vec![common_query::logical_plan::Expr::from( - datafusion_expr::Expr::BinaryExpr(BinaryExpr { - left: Box::new(datafusion_expr::Expr::Column(Column::from_name("cnt"))), - op, - right: Box::new(datafusion_expr::Expr::Literal(ScalarValue::Int32(Some( - max_val, - )))), - }), - )] + fn gen_predicate(max_val: i32, op: Operator) -> Vec { + vec![datafusion_expr::Expr::BinaryExpr(BinaryExpr { + left: Box::new(datafusion_expr::Expr::Column(Column::from_name("cnt"))), + op, + right: Box::new(datafusion_expr::Expr::Literal(ScalarValue::Int32(Some( + max_val, + )))), + })] } #[tokio::test] @@ -702,14 +692,14 @@ mod tests { let e = datafusion_expr::Expr::Column(Column::from_name("cnt")) .gt(30.lit()) .or(datafusion_expr::Expr::Column(Column::from_name("cnt")).lt(20.lit())); - assert_prune(40, vec![e.into()], vec![true, true, false, true]).await; + assert_prune(40, vec![e], vec![true, true, false, true]).await; } #[tokio::test] async fn test_to_physical_expr() { let predicate = Predicate::new(vec![ - Expr::from(col("host").eq(lit("host_a"))), - Expr::from(col("ts").gt(lit(ScalarValue::TimestampMicrosecond(Some(123), None)))), + col("host").eq(lit("host_a")), + col("ts").gt(lit(ScalarValue::TimestampMicrosecond(Some(123), None))), ]); let schema = Arc::new(arrow::datatypes::Schema::new(vec![Field::new( diff --git a/src/table/src/table.rs b/src/table/src/table.rs index ae07a905b92f..d0c8ede323bd 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -15,8 +15,8 @@ use std::collections::HashSet; use std::sync::Arc; -use common_query::logical_plan::Expr; use common_recordbatch::SendableRecordBatchStream; +use datafusion_expr::expr::Expr; use datatypes::schema::{ColumnSchema, SchemaRef}; use snafu::ResultExt; use store_api::data_source::DataSourceRef; diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index d00988958531..565d3d7e621b 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -15,14 +15,13 @@ use std::any::Any; use std::sync::{Arc, Mutex}; -use common_query::logical_plan::Expr; use common_recordbatch::OrderOption; use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; use datafusion::datasource::{TableProvider, TableType as DfTableType}; use datafusion::error::Result as DfResult; use datafusion::execution::context::SessionState; use datafusion::physical_plan::ExecutionPlan; -use datafusion_expr::expr::Expr as DfExpr; +use datafusion_expr::expr::Expr; use datafusion_expr::TableProviderFilterPushDown as DfTableProviderFilterPushDown; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::PhysicalSortExpr; @@ -82,7 +81,7 @@ impl TableProvider for DfTableProviderAdapter { &self, _ctx: &SessionState, projection: Option<&Vec>, - filters: &[DfExpr], + filters: &[Expr], limit: Option, ) -> DfResult> { let filters: Vec = filters.iter().map(Clone::clone).map(Into::into).collect(); @@ -121,12 +120,9 @@ impl TableProvider for DfTableProviderAdapter { fn supports_filters_pushdown( &self, - filters: &[&DfExpr], + filters: &[&Expr], ) -> DfResult> { - let filters = filters - .iter() - .map(|&x| x.clone().into()) - .collect::>(); + let filters = filters.iter().map(|&x| x.clone()).collect::>(); Ok(self .table .supports_filters_pushdown(&filters.iter().collect::>())