Skip to content

Commit

Permalink
feat: support evaluate expr in range query param (#3823)
Browse files Browse the repository at this point in the history
* feat: support evaluate expr in range query param

* chore: fix comment

* chore: fix code comment

* fix: disbale now in duration param
  • Loading branch information
Taylor-lagrange authored May 17, 2024
1 parent f696f41 commit e2a1cb5
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 29 deletions.
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sysinfo = "0.30"
# on branch v0.44.x
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "c919990bf62ad38d2b0c0a3bc90b26ad919d51b0", features = [
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "e4e496b8d62416ad50ce70a1b460c7313610cf5d", features = [
"visitor",
] }
strum = { version = "0.25", features = ["derive"] }
Expand Down
179 changes: 162 additions & 17 deletions src/query/src/range_select/plan_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::time::Duration;
use arrow_schema::DataType;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use chrono::Utc;
use common_time::interval::NANOS_PER_MILLI;
use common_time::timestamp::TimeUnit;
use common_time::{Interval, Timestamp, Timezone};
Expand All @@ -27,10 +28,13 @@ use datafusion::prelude::Column;
use datafusion::scalar::ScalarValue;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
use datafusion_common::{DFSchema, DataFusionError, Result as DFResult};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{
Aggregate, Analyze, Explain, Expr, ExprSchemable, Extension, LogicalPlan, LogicalPlanBuilder,
Projection,
};
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
use datatypes::prelude::ConcreteDataType;
use promql_parser::util::parse_duration;
use session::context::QueryContextRef;
Expand Down Expand Up @@ -108,34 +112,84 @@ fn parse_expr_to_string(args: &[Expr], i: usize) -> DFResult<String> {
/// Parse a duraion expr:
/// 1. duration string (e.g. `'1h'`)
/// 2. Interval expr (e.g. `INTERVAL '1 year 3 hours 20 minutes'`)
/// 3. An interval expr can be evaluated at the logical plan stage (e.g. `INTERVAL '2' day - INTERVAL '1' day`)
fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult<Duration> {
let interval_to_duration = |interval: Interval| -> Duration {
Duration::from_millis((interval.to_nanosecond() / NANOS_PER_MILLI as i128) as u64)
};
match args.get(i) {
Some(Expr::Literal(ScalarValue::Utf8(Some(str)))) => {
parse_duration(str).map_err(DataFusionError::Plan)
}
Some(Expr::Literal(ScalarValue::IntervalYearMonth(Some(i)))) => {
Ok(interval_to_duration(Interval::from_i32(*i)))
Some(expr) => {
let ms = evaluate_expr_to_millisecond(args, i, true)?;
if ms <= 0 {
return Err(dispose_parse_error(Some(expr)));
}
Ok(Duration::from_millis(ms as u64))
}
None => Err(dispose_parse_error(None)),
}
}

/// Evaluate a time calculation expr, case like:
/// 1. `INTERVAL '1' day + INTERVAL '1 year 2 hours 3 minutes'`
/// 2. `now() - INTERVAL '1' day` (when `interval_only==false`)
///
/// Output a millisecond timestamp
///
/// if `interval_only==true`, only accept expr with all interval type (case 2 will return a error)
fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) -> DFResult<i64> {
let Some(expr) = args.get(i) else {
return Err(dispose_parse_error(None));
};
if interval_only && !interval_only_in_expr(expr) {
return Err(dispose_parse_error(Some(expr)));
}
let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now());
let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(DFSchema::empty()));
let interval_to_ms =
|interval: Interval| -> i64 { (interval.to_nanosecond() / NANOS_PER_MILLI as i128) as i64 };
let simplify_expr = ExprSimplifier::new(info).simplify(expr.clone())?;
match simplify_expr {
Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _))
| Expr::Literal(ScalarValue::DurationNanosecond(ts_nanos)) => {
ts_nanos.map(|v| v / 1_000_000)
}
Expr::Literal(ScalarValue::TimestampMicrosecond(ts_micros, _))
| Expr::Literal(ScalarValue::DurationMicrosecond(ts_micros)) => {
ts_micros.map(|v| v / 1_000)
}
Some(Expr::Literal(ScalarValue::IntervalDayTime(Some(i)))) => {
Ok(interval_to_duration(Interval::from_i64(*i)))
Expr::Literal(ScalarValue::TimestampMillisecond(ts_millis, _))
| Expr::Literal(ScalarValue::DurationMillisecond(ts_millis)) => ts_millis,
Expr::Literal(ScalarValue::TimestampSecond(ts_secs, _))
| Expr::Literal(ScalarValue::DurationSecond(ts_secs)) => ts_secs.map(|v| v * 1_000),
Expr::Literal(ScalarValue::IntervalYearMonth(interval)) => {
interval.map(|v| interval_to_ms(Interval::from_i32(v)))
}
Some(Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(i)))) => {
Ok(interval_to_duration(Interval::from_i128(*i)))
Expr::Literal(ScalarValue::IntervalDayTime(interval)) => {
interval.map(|v| interval_to_ms(Interval::from_i64(v)))
}
other => Err(dispose_parse_error(other)),
Expr::Literal(ScalarValue::IntervalMonthDayNano(interval)) => {
interval.map(|v| interval_to_ms(Interval::from_i128(v)))
}
_ => None,
}
.ok_or_else(|| {
DataFusionError::Plan(format!(
"{} is not a expr can be evaluate and use in range query",
expr.display_name().unwrap_or_default()
))
})
}

/// Parse the `align to` clause and return a UTC timestamp with unit of millisecond,
/// which is used as the basis for dividing time slot during the align operation.
/// 1. NOW: align to current execute time
/// 2. Timestamp string: align to specific timestamp
/// 3. leave empty (as Default Option): align to unix epoch 0 (timezone aware)
/// 3. An expr can be evaluated at the logical plan stage (e.g. `now() - INTERVAL '1' day`)
/// 4. leave empty (as Default Option): align to unix epoch 0 (timezone aware)
fn parse_align_to(args: &[Expr], i: usize, timezone: Option<&Timezone>) -> DFResult<i64> {
let s = parse_str_expr(args, i)?;
let Ok(s) = parse_str_expr(args, i) else {
return evaluate_expr_to_millisecond(args, i, false);
};
let upper = s.to_uppercase();
match upper.as_str() {
"NOW" => return Ok(Timestamp::current_millis().value()),
Expand Down Expand Up @@ -469,6 +523,25 @@ fn have_range_in_exprs(exprs: &[Expr]) -> bool {
})
}

fn interval_only_in_expr(expr: &Expr) -> bool {
let mut all_interval = true;
let _ = expr.apply(&mut |expr| {
if !matches!(
expr,
Expr::Literal(ScalarValue::IntervalDayTime(_))
| Expr::Literal(ScalarValue::IntervalMonthDayNano(_))
| Expr::Literal(ScalarValue::IntervalYearMonth(_))
| Expr::BinaryExpr(_)
) {
all_interval = false;
Ok(TreeNodeRecursion::Stop)
} else {
Ok(TreeNodeRecursion::Continue)
}
});
all_interval
}

#[cfg(test)]
mod test {

Expand All @@ -477,6 +550,7 @@ mod test {
use catalog::memory::MemoryCatalogManager;
use catalog::RegisterTableRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datafusion_expr::{BinaryExpr, Operator};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use session::context::QueryContext;
Expand Down Expand Up @@ -754,8 +828,42 @@ mod test {
parse_duration_expr(&args, 0).unwrap(),
parse_duration("1y4w").unwrap()
);
// test err
// test index err
assert!(parse_duration_expr(&args, 10).is_err());
// test evaluate expr
let args = vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
op: Operator::Plus,
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
})];
assert_eq!(
parse_duration_expr(&args, 0).unwrap().as_millis(),
interval_to_ms(Interval::from_year_month(20))
);
let args = vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
op: Operator::Minus,
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
})];
// test zero interval error
assert!(parse_duration_expr(&args, 0).is_err());
// test must all be interval
let args = vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
op: Operator::Minus,
right: Box::new(Expr::Literal(ScalarValue::Time64Microsecond(Some(0)))),
})];
assert!(parse_duration_expr(&args, 0).is_err());
}

#[test]
Expand Down Expand Up @@ -787,19 +895,56 @@ mod test {
let args = vec![Expr::Literal(ScalarValue::Utf8(Some(
"1970-01-01T00:00:00+08:00".into(),
)))];
assert!(parse_align_to(&args, 0, None).unwrap() == -8 * 60 * 60 * 1000);
assert_eq!(parse_align_to(&args, 0, None).unwrap(), -8 * 60 * 60 * 1000);
// timezone
let args = vec![Expr::Literal(ScalarValue::Utf8(Some(
"1970-01-01T00:00:00".into(),
)))];
assert!(
assert_eq!(
parse_align_to(
&args,
0,
Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
)
.unwrap()
== -8 * 60 * 60 * 1000
.unwrap(),
-8 * 60 * 60 * 1000
);
// test evaluate expr
let args = vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
op: Operator::Plus,
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
})];
assert_eq!(
parse_align_to(&args, 0, None).unwrap(),
// 20 month
20 * 30 * 24 * 60 * 60 * 1000
);
}

#[test]
fn test_interval_only() {
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::DurationMillisecond(Some(20)))),
op: Operator::Minus,
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
});
assert!(!interval_only_in_expr(&expr));
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
op: Operator::Minus,
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
});
assert!(interval_only_in_expr(&expr));
}
}
4 changes: 2 additions & 2 deletions tests/cases/standalone/common/range/error.result
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ Error: 3000(PlanQuery), DataFusion error: Error during planning: duration must b

SELECT min(val) RANGE '5s' FROM host ALIGN (INTERVAL '0' day);

Error: 2000(InvalidSyntax), Range Query: Can't use 0 as align in Range Query
Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `IntervalMonthDayNano("0")` in range select query

SELECT min(val) RANGE (INTERVAL '0' day) FROM host ALIGN '5s';

Error: 2000(InvalidSyntax), Range Query: Invalid Range expr `MIN(host.val) RANGE IntervalMonthDayNano("0")`, Can't use 0 as range in Range Query
Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `IntervalMonthDayNano("0")` in range select query

DROP TABLE host;

Expand Down
24 changes: 24 additions & 0 deletions tests/cases/standalone/common/range/to.result
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,30 @@ SELECT ts, min(val) RANGE (INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day)
| 2024-01-24T23:00:00 | 3 |
+---------------------+------------------------------------------------------------------+

SELECT ts, min(val) RANGE (INTERVAL '2' day - INTERVAL '1' day) FROM host ALIGN (INTERVAL '2' day - INTERVAL '1' day) TO (now() - (now() + INTERVAL '1' hour)) by (1) ORDER BY ts;

+---------------------+-----------------------------------------------------------------------------------------------------------------+
| ts | MIN(host.val) RANGE IntervalMonthDayNano("36893488147419103232") - IntervalMonthDayNano("18446744073709551616") |
+---------------------+-----------------------------------------------------------------------------------------------------------------+
| 2024-01-22T23:00:00 | 0 |
| 2024-01-23T23:00:00 | 1 |
| 2024-01-24T23:00:00 | 3 |
+---------------------+-----------------------------------------------------------------------------------------------------------------+

-- non-positive duration
SELECT ts, min(val) RANGE (INTERVAL '1' day - INTERVAL '2' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts;

Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `IntervalMonthDayNano("18446744073709551616") - IntervalMonthDayNano("36893488147419103232")` in range select query

SELECT ts, min(val) RANGE (INTERVAL '1' day - INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts;

Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `IntervalMonthDayNano("18446744073709551616") - IntervalMonthDayNano("18446744073709551616")` in range select query

-- duration not all interval
SELECT ts, min(val) RANGE (now() - INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts;

Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `now() - IntervalMonthDayNano("18446744073709551616")` in range select query

--- ALIGN TO with time zone ---
set time_zone='Asia/Shanghai';

Expand Down
Loading

0 comments on commit e2a1cb5

Please sign in to comment.