From e2a1cb58401f7b98c1d4081bc06963e6741f8f74 Mon Sep 17 00:00:00 2001 From: WU Jingdi Date: Fri, 17 May 2024 16:31:55 +0800 Subject: [PATCH] feat: support evaluate expr in range query param (#3823) * feat: support evaluate expr in range query param * chore: fix comment * chore: fix code comment * fix: disbale now in duration param --- Cargo.lock | 18 +- Cargo.toml | 2 +- src/query/src/range_select/plan_rewrite.rs | 179 ++++++++++++++++-- .../standalone/common/range/error.result | 4 +- tests/cases/standalone/common/range/to.result | 24 +++ tests/cases/standalone/common/range/to.sql | 12 ++ 6 files changed, 210 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ea8de8ee8bae..23bf8f1200ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2043,7 +2043,7 @@ dependencies = [ "datatypes", "serde", "snafu 0.8.2", - "sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)", + "sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d)", "sqlparser_derive 0.1.1", "statrs", "tokio", @@ -3947,7 +3947,7 @@ dependencies = [ "session", "snafu 0.8.2", "sql", - "sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)", + "sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d)", "store-api", "strfmt", "table", @@ -6670,7 +6670,7 @@ dependencies = [ "session", "snafu 0.8.2", "sql", - "sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)", + "sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d)", "store-api", "substrait 0.7.2", "table", @@ -6932,7 +6932,7 @@ dependencies = [ "serde_json", "snafu 0.8.2", "sql", - "sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)", + "sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d)", "store-api", "table", ] @@ -9903,7 +9903,7 @@ dependencies = [ "lazy_static", "regex", "snafu 0.8.2", - "sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)", + "sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d)", "sqlparser_derive 0.1.1", "table", ] @@ -9967,13 +9967,13 @@ dependencies = [ [[package]] name = "sqlparser" version = "0.44.0" -source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0#c919990bf62ad38d2b0c0a3bc90b26ad919d51b0" +source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d#e4e496b8d62416ad50ce70a1b460c7313610cf5d" dependencies = [ "lazy_static", "log", "regex", "sqlparser 0.44.0 (registry+https://github.com/rust-lang/crates.io-index)", - "sqlparser_derive 0.2.2 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)", + "sqlparser_derive 0.2.2 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d)", ] [[package]] @@ -10001,7 +10001,7 @@ dependencies = [ [[package]] name = "sqlparser_derive" version = "0.2.2" -source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0#c919990bf62ad38d2b0c0a3bc90b26ad919d51b0" +source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d#e4e496b8d62416ad50ce70a1b460c7313610cf5d" dependencies = [ "proc-macro2", "quote", @@ -10645,7 +10645,7 @@ dependencies = [ "serde_json", "snafu 0.8.2", "sql", - "sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)", + "sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=e4e496b8d62416ad50ce70a1b460c7313610cf5d)", "sqlx", "tinytemplate", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 1ece9e77fabc..0cf22de319cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -159,7 +159,7 @@ smallvec = { version = "1", features = ["serde"] } snafu = "0.8" sysinfo = "0.30" # on branch v0.44.x -sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "c919990bf62ad38d2b0c0a3bc90b26ad919d51b0", features = [ +sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "e4e496b8d62416ad50ce70a1b460c7313610cf5d", features = [ "visitor", ] } strum = { version = "0.25", features = ["derive"] } diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs index 4035f20a63e2..087fa2c9010a 100644 --- a/src/query/src/range_select/plan_rewrite.rs +++ b/src/query/src/range_select/plan_rewrite.rs @@ -19,6 +19,7 @@ use std::time::Duration; use arrow_schema::DataType; use async_recursion::async_recursion; use catalog::table_source::DfTableSourceProvider; +use chrono::Utc; use common_time::interval::NANOS_PER_MILLI; use common_time::timestamp::TimeUnit; use common_time::{Interval, Timestamp, Timezone}; @@ -27,10 +28,13 @@ use datafusion::prelude::Column; use datafusion::scalar::ScalarValue; use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter}; use datafusion_common::{DFSchema, DataFusionError, Result as DFResult}; +use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{ Aggregate, Analyze, Explain, Expr, ExprSchemable, Extension, LogicalPlan, LogicalPlanBuilder, Projection, }; +use datafusion_optimizer::simplify_expressions::ExprSimplifier; use datatypes::prelude::ConcreteDataType; use promql_parser::util::parse_duration; use session::context::QueryContextRef; @@ -108,34 +112,84 @@ fn parse_expr_to_string(args: &[Expr], i: usize) -> DFResult { /// Parse a duraion expr: /// 1. duration string (e.g. `'1h'`) /// 2. Interval expr (e.g. `INTERVAL '1 year 3 hours 20 minutes'`) +/// 3. An interval expr can be evaluated at the logical plan stage (e.g. `INTERVAL '2' day - INTERVAL '1' day`) fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult { - let interval_to_duration = |interval: Interval| -> Duration { - Duration::from_millis((interval.to_nanosecond() / NANOS_PER_MILLI as i128) as u64) - }; match args.get(i) { Some(Expr::Literal(ScalarValue::Utf8(Some(str)))) => { parse_duration(str).map_err(DataFusionError::Plan) } - Some(Expr::Literal(ScalarValue::IntervalYearMonth(Some(i)))) => { - Ok(interval_to_duration(Interval::from_i32(*i))) + Some(expr) => { + let ms = evaluate_expr_to_millisecond(args, i, true)?; + if ms <= 0 { + return Err(dispose_parse_error(Some(expr))); + } + Ok(Duration::from_millis(ms as u64)) + } + None => Err(dispose_parse_error(None)), + } +} + +/// Evaluate a time calculation expr, case like: +/// 1. `INTERVAL '1' day + INTERVAL '1 year 2 hours 3 minutes'` +/// 2. `now() - INTERVAL '1' day` (when `interval_only==false`) +/// +/// Output a millisecond timestamp +/// +/// if `interval_only==true`, only accept expr with all interval type (case 2 will return a error) +fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) -> DFResult { + let Some(expr) = args.get(i) else { + return Err(dispose_parse_error(None)); + }; + if interval_only && !interval_only_in_expr(expr) { + return Err(dispose_parse_error(Some(expr))); + } + let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now()); + let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(DFSchema::empty())); + let interval_to_ms = + |interval: Interval| -> i64 { (interval.to_nanosecond() / NANOS_PER_MILLI as i128) as i64 }; + let simplify_expr = ExprSimplifier::new(info).simplify(expr.clone())?; + match simplify_expr { + Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _)) + | Expr::Literal(ScalarValue::DurationNanosecond(ts_nanos)) => { + ts_nanos.map(|v| v / 1_000_000) + } + Expr::Literal(ScalarValue::TimestampMicrosecond(ts_micros, _)) + | Expr::Literal(ScalarValue::DurationMicrosecond(ts_micros)) => { + ts_micros.map(|v| v / 1_000) } - Some(Expr::Literal(ScalarValue::IntervalDayTime(Some(i)))) => { - Ok(interval_to_duration(Interval::from_i64(*i))) + Expr::Literal(ScalarValue::TimestampMillisecond(ts_millis, _)) + | Expr::Literal(ScalarValue::DurationMillisecond(ts_millis)) => ts_millis, + Expr::Literal(ScalarValue::TimestampSecond(ts_secs, _)) + | Expr::Literal(ScalarValue::DurationSecond(ts_secs)) => ts_secs.map(|v| v * 1_000), + Expr::Literal(ScalarValue::IntervalYearMonth(interval)) => { + interval.map(|v| interval_to_ms(Interval::from_i32(v))) } - Some(Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(i)))) => { - Ok(interval_to_duration(Interval::from_i128(*i))) + Expr::Literal(ScalarValue::IntervalDayTime(interval)) => { + interval.map(|v| interval_to_ms(Interval::from_i64(v))) } - other => Err(dispose_parse_error(other)), + Expr::Literal(ScalarValue::IntervalMonthDayNano(interval)) => { + interval.map(|v| interval_to_ms(Interval::from_i128(v))) + } + _ => None, } + .ok_or_else(|| { + DataFusionError::Plan(format!( + "{} is not a expr can be evaluate and use in range query", + expr.display_name().unwrap_or_default() + )) + }) } /// Parse the `align to` clause and return a UTC timestamp with unit of millisecond, /// which is used as the basis for dividing time slot during the align operation. /// 1. NOW: align to current execute time /// 2. Timestamp string: align to specific timestamp -/// 3. leave empty (as Default Option): align to unix epoch 0 (timezone aware) +/// 3. An expr can be evaluated at the logical plan stage (e.g. `now() - INTERVAL '1' day`) +/// 4. leave empty (as Default Option): align to unix epoch 0 (timezone aware) fn parse_align_to(args: &[Expr], i: usize, timezone: Option<&Timezone>) -> DFResult { - let s = parse_str_expr(args, i)?; + let Ok(s) = parse_str_expr(args, i) else { + return evaluate_expr_to_millisecond(args, i, false); + }; let upper = s.to_uppercase(); match upper.as_str() { "NOW" => return Ok(Timestamp::current_millis().value()), @@ -469,6 +523,25 @@ fn have_range_in_exprs(exprs: &[Expr]) -> bool { }) } +fn interval_only_in_expr(expr: &Expr) -> bool { + let mut all_interval = true; + let _ = expr.apply(&mut |expr| { + if !matches!( + expr, + Expr::Literal(ScalarValue::IntervalDayTime(_)) + | Expr::Literal(ScalarValue::IntervalMonthDayNano(_)) + | Expr::Literal(ScalarValue::IntervalYearMonth(_)) + | Expr::BinaryExpr(_) + ) { + all_interval = false; + Ok(TreeNodeRecursion::Stop) + } else { + Ok(TreeNodeRecursion::Continue) + } + }); + all_interval +} + #[cfg(test)] mod test { @@ -477,6 +550,7 @@ mod test { use catalog::memory::MemoryCatalogManager; use catalog::RegisterTableRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use datafusion_expr::{BinaryExpr, Operator}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; use session::context::QueryContext; @@ -754,8 +828,42 @@ mod test { parse_duration_expr(&args, 0).unwrap(), parse_duration("1y4w").unwrap() ); - // test err + // test index err assert!(parse_duration_expr(&args, 10).is_err()); + // test evaluate expr + let args = vec![Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some( + Interval::from_year_month(10).to_i32(), + )))), + op: Operator::Plus, + right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some( + Interval::from_year_month(10).to_i32(), + )))), + })]; + assert_eq!( + parse_duration_expr(&args, 0).unwrap().as_millis(), + interval_to_ms(Interval::from_year_month(20)) + ); + let args = vec![Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some( + Interval::from_year_month(10).to_i32(), + )))), + op: Operator::Minus, + right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some( + Interval::from_year_month(10).to_i32(), + )))), + })]; + // test zero interval error + assert!(parse_duration_expr(&args, 0).is_err()); + // test must all be interval + let args = vec![Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some( + Interval::from_year_month(10).to_i32(), + )))), + op: Operator::Minus, + right: Box::new(Expr::Literal(ScalarValue::Time64Microsecond(Some(0)))), + })]; + assert!(parse_duration_expr(&args, 0).is_err()); } #[test] @@ -787,19 +895,56 @@ mod test { let args = vec![Expr::Literal(ScalarValue::Utf8(Some( "1970-01-01T00:00:00+08:00".into(), )))]; - assert!(parse_align_to(&args, 0, None).unwrap() == -8 * 60 * 60 * 1000); + assert_eq!(parse_align_to(&args, 0, None).unwrap(), -8 * 60 * 60 * 1000); // timezone let args = vec![Expr::Literal(ScalarValue::Utf8(Some( "1970-01-01T00:00:00".into(), )))]; - assert!( + assert_eq!( parse_align_to( &args, 0, Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap()) ) - .unwrap() - == -8 * 60 * 60 * 1000 + .unwrap(), + -8 * 60 * 60 * 1000 ); + // test evaluate expr + let args = vec![Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some( + Interval::from_year_month(10).to_i32(), + )))), + op: Operator::Plus, + right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some( + Interval::from_year_month(10).to_i32(), + )))), + })]; + assert_eq!( + parse_align_to(&args, 0, None).unwrap(), + // 20 month + 20 * 30 * 24 * 60 * 60 * 1000 + ); + } + + #[test] + fn test_interval_only() { + let expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Literal(ScalarValue::DurationMillisecond(Some(20)))), + op: Operator::Minus, + right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some( + Interval::from_year_month(10).to_i32(), + )))), + }); + assert!(!interval_only_in_expr(&expr)); + let expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some( + Interval::from_year_month(10).to_i32(), + )))), + op: Operator::Minus, + right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some( + Interval::from_year_month(10).to_i32(), + )))), + }); + assert!(interval_only_in_expr(&expr)); } } diff --git a/tests/cases/standalone/common/range/error.result b/tests/cases/standalone/common/range/error.result index 01beca591f2a..ecdaac7d782e 100644 --- a/tests/cases/standalone/common/range/error.result +++ b/tests/cases/standalone/common/range/error.result @@ -98,11 +98,11 @@ Error: 3000(PlanQuery), DataFusion error: Error during planning: duration must b SELECT min(val) RANGE '5s' FROM host ALIGN (INTERVAL '0' day); -Error: 2000(InvalidSyntax), Range Query: Can't use 0 as align in Range Query +Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `IntervalMonthDayNano("0")` in range select query SELECT min(val) RANGE (INTERVAL '0' day) FROM host ALIGN '5s'; -Error: 2000(InvalidSyntax), Range Query: Invalid Range expr `MIN(host.val) RANGE IntervalMonthDayNano("0")`, Can't use 0 as range in Range Query +Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `IntervalMonthDayNano("0")` in range select query DROP TABLE host; diff --git a/tests/cases/standalone/common/range/to.result b/tests/cases/standalone/common/range/to.result index a2bfb3de4cd8..e7e0445316b3 100644 --- a/tests/cases/standalone/common/range/to.result +++ b/tests/cases/standalone/common/range/to.result @@ -82,6 +82,30 @@ SELECT ts, min(val) RANGE (INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) | 2024-01-24T23:00:00 | 3 | +---------------------+------------------------------------------------------------------+ +SELECT ts, min(val) RANGE (INTERVAL '2' day - INTERVAL '1' day) FROM host ALIGN (INTERVAL '2' day - INTERVAL '1' day) TO (now() - (now() + INTERVAL '1' hour)) by (1) ORDER BY ts; + ++---------------------+-----------------------------------------------------------------------------------------------------------------+ +| ts | MIN(host.val) RANGE IntervalMonthDayNano("36893488147419103232") - IntervalMonthDayNano("18446744073709551616") | ++---------------------+-----------------------------------------------------------------------------------------------------------------+ +| 2024-01-22T23:00:00 | 0 | +| 2024-01-23T23:00:00 | 1 | +| 2024-01-24T23:00:00 | 3 | ++---------------------+-----------------------------------------------------------------------------------------------------------------+ + +-- non-positive duration +SELECT ts, min(val) RANGE (INTERVAL '1' day - INTERVAL '2' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts; + +Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `IntervalMonthDayNano("18446744073709551616") - IntervalMonthDayNano("36893488147419103232")` in range select query + +SELECT ts, min(val) RANGE (INTERVAL '1' day - INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts; + +Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `IntervalMonthDayNano("18446744073709551616") - IntervalMonthDayNano("18446744073709551616")` in range select query + +-- duration not all interval +SELECT ts, min(val) RANGE (now() - INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts; + +Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `now() - IntervalMonthDayNano("18446744073709551616")` in range select query + --- ALIGN TO with time zone --- set time_zone='Asia/Shanghai'; diff --git a/tests/cases/standalone/common/range/to.sql b/tests/cases/standalone/common/range/to.sql index 29610ca16558..70b6849c0d03 100644 --- a/tests/cases/standalone/common/range/to.sql +++ b/tests/cases/standalone/common/range/to.sql @@ -26,6 +26,18 @@ SELECT ts, host, min(val) RANGE '1d' FROM host ALIGN '1d' TO '2023-01-01T00:00:0 SELECT ts, min(val) RANGE (INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts; +SELECT ts, min(val) RANGE (INTERVAL '2' day - INTERVAL '1' day) FROM host ALIGN (INTERVAL '2' day - INTERVAL '1' day) TO (now() - (now() + INTERVAL '1' hour)) by (1) ORDER BY ts; + +-- non-positive duration + +SELECT ts, min(val) RANGE (INTERVAL '1' day - INTERVAL '2' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts; + +SELECT ts, min(val) RANGE (INTERVAL '1' day - INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts; + +-- duration not all interval + +SELECT ts, min(val) RANGE (now() - INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts; + --- ALIGN TO with time zone --- set time_zone='Asia/Shanghai';