diff --git a/Cargo.lock b/Cargo.lock index 099d0ead5708..2e8621b0fe66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1866,7 +1866,7 @@ dependencies = [ "datatypes", "serde", "snafu", - "sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0fbae07d0c46dc18e3381c406d8b9b8abef6b1fd)", + "sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef)", "sqlparser_derive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "statrs", "tokio", @@ -3269,7 +3269,7 @@ dependencies = [ "session", "snafu", "sql", - "sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0fbae07d0c46dc18e3381c406d8b9b8abef6b1fd)", + "sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef)", "store-api", "strfmt", "substrait 0.4.4", @@ -5562,7 +5562,7 @@ dependencies = [ "session", "snafu", "sql", - "sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0fbae07d0c46dc18e3381c406d8b9b8abef6b1fd)", + "sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef)", "store-api", "substrait 0.4.4", "table", @@ -8562,7 +8562,7 @@ dependencies = [ "once_cell", "regex", "snafu", - "sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0fbae07d0c46dc18e3381c406d8b9b8abef6b1fd)", + "sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef)", "sqlparser_derive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "table", ] @@ -8625,13 +8625,13 @@ dependencies = [ [[package]] name = "sqlparser" version = "0.38.0" -source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0fbae07d0c46dc18e3381c406d8b9b8abef6b1fd#0fbae07d0c46dc18e3381c406d8b9b8abef6b1fd" +source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef#6a93567ae38d42be5c8d08b13c8ff4dde26502ef" dependencies = [ "lazy_static", "log", "regex", "sqlparser 0.38.0 (registry+https://github.com/rust-lang/crates.io-index)", - "sqlparser_derive 0.1.1 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0fbae07d0c46dc18e3381c406d8b9b8abef6b1fd)", + "sqlparser_derive 0.1.1 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef)", ] [[package]] @@ -8648,7 +8648,7 @@ dependencies = [ [[package]] name = "sqlparser_derive" version = "0.1.1" -source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0fbae07d0c46dc18e3381c406d8b9b8abef6b1fd#0fbae07d0c46dc18e3381c406d8b9b8abef6b1fd" +source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef#6a93567ae38d42be5c8d08b13c8ff4dde26502ef" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 449e17298310..d32e6c188ea2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -118,7 +118,7 @@ serde_json = "1.0" smallvec = "1" snafu = "0.7" # on branch v0.38.x -sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "0fbae07d0c46dc18e3381c406d8b9b8abef6b1fd", features = [ +sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "6a93567ae38d42be5c8d08b13c8ff4dde26502ef", features = [ "visitor", ] } strum = { version = "0.25", features = ["derive"] } diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 54506f6c93bd..6ed714a50d6c 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![feature(let_chains)] +#![feature(int_roundings)] pub mod dataframe; pub mod datafusion; diff --git a/src/query/src/range_select/plan.rs b/src/query/src/range_select/plan.rs index fd6920cc4b9c..bedb3242a976 100644 --- a/src/query/src/range_select/plan.rs +++ b/src/query/src/range_select/plan.rs @@ -52,9 +52,9 @@ use datatypes::arrow::record_batch::RecordBatch; use datatypes::arrow::row::{OwnedRow, RowConverter, SortField}; use futures::{ready, Stream}; use futures_util::StreamExt; -use snafu::ResultExt; +use snafu::{ensure, ResultExt}; -use crate::error::{DataFusionSnafu, Result}; +use crate::error::{DataFusionSnafu, RangeQuerySnafu, Result}; type Millisecond = ::Native; @@ -147,7 +147,7 @@ impl Fill { #[derive(Eq, Clone, Debug)] pub struct RangeFn { - /// with format like `max(a) 300s null` + /// with format like `max(a) RANGE 300s FILL NULL` pub name: String, pub data_type: DataType, pub expr: Expr, @@ -197,6 +197,7 @@ pub struct RangeSelect { /// all range expressions pub range_expr: Vec, pub align: Duration, + pub align_to: i64, pub time_index: String, pub by: Vec, pub schema: DFSchemaRef, @@ -216,10 +217,28 @@ impl RangeSelect { input: Arc, range_expr: Vec, align: Duration, + align_to: i64, time_index: Expr, by: Vec, projection_expr: &[Expr], ) -> Result { + ensure!( + align.as_millis() != 0, + RangeQuerySnafu { + msg: "Can't use 0 as align in Range Query" + } + ); + for expr in &range_expr { + ensure!( + expr.range.as_millis() != 0, + RangeQuerySnafu { + msg: format!( + "Invalid Range expr `{}`, Can't use 0 as range in Range Query", + expr.name + ) + } + ); + } let mut fields = range_expr .iter() .map( @@ -289,6 +308,7 @@ impl RangeSelect { input, range_expr, align, + align_to, time_index: time_index_name, schema, by_schema, @@ -322,13 +342,19 @@ impl UserDefinedLogicalNodeCore for RangeSelect { fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!( f, - "RangeSelect: range_exprs=[{}], align={}s time_index={}", + "RangeSelect: range_exprs=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}", self.range_expr .iter() .map(ToString::to_string) .collect::>() .join(", "), - self.align.as_secs(), + self.align.as_millis(), + self.align_to, + self.by + .iter() + .map(ToString::to_string) + .collect::>() + .join(", "), self.time_index ) } @@ -338,6 +364,7 @@ impl UserDefinedLogicalNodeCore for RangeSelect { Self { align: self.align, + align_to: self.align_to, range_expr: self.range_expr.clone(), input: Arc::new(inputs[0].clone()), time_index: self.time_index.clone(), @@ -463,6 +490,7 @@ impl RangeSelect { input: exec_input, range_exec, align: self.align.as_millis() as Millisecond, + align_to: self.align_to, by: self.create_physical_expr_list( &self.by, input_dfschema, @@ -493,6 +521,7 @@ pub struct RangeSelectExec { input: Arc, range_exec: Vec, align: Millisecond, + align_to: i64, time_index: String, by: Vec>, schema: SchemaRef, @@ -510,16 +539,24 @@ impl DisplayAs for RangeSelectExec { let range_expr_strs: Vec = self .range_exec .iter() - .map(|e| format!("RangeFnExec{{ {}, range: {:?}}}", e.expr.name(), e.range)) + .map(|e| { + format!( + "{} RANGE {}s FILL {}", + e.expr.name(), + e.range / 1000, + e.fill + ) + }) .collect(); let by: Vec = self.by.iter().map(|e| e.to_string()).collect(); write!( f, - "range_expr=[{}], align={}, time_index={}, by=[{}]", + "range_expr=[{}], align={}ms, align_to={}ms, align_by=[{}], time_index={}", range_expr_strs.join(", "), self.align, + self.align_to, + by.join(", "), self.time_index, - by.join(", ") )?; } } @@ -563,6 +600,7 @@ impl ExecutionPlan for RangeSelectExec { time_index: self.time_index.clone(), by: self.by.clone(), align: self.align, + align_to: self.align_to, schema: self.schema.clone(), by_schema: self.by_schema.clone(), metric: self.metric.clone(), @@ -599,6 +637,7 @@ impl ExecutionPlan for RangeSelectExec { random_state: RandomState::new(), time_index, align: self.align, + align_to: self.align_to, by: self.by.clone(), series_map: HashMap::new(), exec_state: ExecutionState::ReadingInput, @@ -629,6 +668,7 @@ struct RangeSelectStream { time_index: usize, /// the unit of `align` is millisecond align: Millisecond, + align_to: i64, by: Vec>, exec_state: ExecutionState, /// Converter for the by values @@ -657,11 +697,13 @@ struct SeriesState { align_ts_accumulator: HashMap>>, } -/// According to `align`, produces a calendar-based aligned time. +/// Use `align_to` as time origin. +/// According to `align` as time interval, produces aligned time. /// Combining the parameters related to the range query, /// determine for each `Accumulator` `(hash, align_ts)` define, /// which rows of data will be applied to it. -fn align_to_calendar( +fn produce_align_time( + align_to: i64, range: Millisecond, align: Millisecond, ts_column: &TimestampMillisecondArray, @@ -672,7 +714,8 @@ fn align_to_calendar( // make modify_map for range_fn[i] for (row, hash) in by_columns_hash.iter().enumerate() { let ts = ts_column.value(row); - let mut align_ts = ((ts + align - 1) / align) * align; + let ith_slot = (ts - align_to).div_ceil(align); + let mut align_ts = ith_slot * align + align_to; while align_ts - range < ts && ts <= align_ts { modify_map .entry((*hash, align_ts)) @@ -733,7 +776,8 @@ impl RangeSelectStream { for i in 0..self.range_exec.len() { let args = self.evaluate_many(&batch, &self.range_exec[i].args)?; // use self.modify_map record (hash, align_ts) => [row_nums] - align_to_calendar( + produce_align_time( + self.align_to, self.range_exec[i].range, self.align, ts_column_ref, @@ -1065,6 +1109,7 @@ mod test { }, ], align, + align_to: 0, by: vec![Arc::new(Column::new("host", 2))], time_index: TIME_INDEX_COLUMN.to_string(), schema: schema.clone(), diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs index fb1872c37580..980de4c3579e 100644 --- a/src/query/src/range_select/plan_rewrite.rs +++ b/src/query/src/range_select/plan_rewrite.rs @@ -13,12 +13,16 @@ // limitations under the License. use std::collections::BTreeSet; +use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use arrow_schema::DataType; use async_recursion::async_recursion; use catalog::table_source::DfTableSourceProvider; +use common_time::interval::NANOS_PER_MILLI; +use common_time::timestamp::TimeUnit; +use common_time::{Interval, Timestamp}; use datafusion::datasource::DefaultTableSource; use datafusion::prelude::Column; use datafusion::scalar::ScalarValue; @@ -47,25 +51,13 @@ use crate::range_select::plan::{RangeFn, RangeSelect}; pub struct RangeExprRewriter<'a> { input_plan: &'a Arc, align: Duration, + align_to: i64, by: Vec, /// Use `BTreeSet` to avoid in case like `avg(a) RANGE '5m' + avg(a) RANGE '5m'`, duplicate range expr `avg(a) RANGE '5m'` be calculate twice range_fn: BTreeSet, sub_aggr: &'a Aggregate, } -#[inline] -fn dispose_parse_error(expr: Option<&Expr>) -> DataFusionError { - DataFusionError::Plan( - expr.map(|x| { - format!( - "Illegal argument `{}` in range select query", - x.display_name().unwrap_or_default() - ) - }) - .unwrap_or("Missing argument in range select query".into()), - ) -} - impl<'a> RangeExprRewriter<'a> { pub fn get_range_expr(&self, args: &[Expr], i: usize) -> DFResult { match args.get(i) { @@ -85,6 +77,19 @@ impl<'a> RangeExprRewriter<'a> { } } +#[inline] +fn dispose_parse_error(expr: Option<&Expr>) -> DataFusionError { + DataFusionError::Plan( + expr.map(|x| { + format!( + "Illegal argument `{}` in range select query", + x.display_name().unwrap_or_default() + ) + }) + .unwrap_or("Missing argument in range select query".into()), + ) +} + fn parse_str_expr(args: &[Expr], i: usize) -> DFResult<&str> { match args.get(i) { Some(Expr::Literal(ScalarValue::Utf8(Some(str)))) => Ok(str.as_str()), @@ -92,6 +97,64 @@ fn parse_str_expr(args: &[Expr], i: usize) -> DFResult<&str> { } } +fn parse_expr_to_string(args: &[Expr], i: usize) -> DFResult { + match args.get(i) { + Some(Expr::Literal(ScalarValue::Utf8(Some(str)))) => Ok(str.to_string()), + Some(expr) => Ok(expr.display_name().unwrap_or_default()), + None => Err(dispose_parse_error(None)), + } +} + +/// Parse a duraion expr: +/// 1. duration string (e.g. `'1h'`) +/// 2. Interval expr (e.g. `INTERVAL '1 year 3 hours 20 minutes'`) +fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult { + let interval_to_duration = |interval: Interval| -> Duration { + Duration::from_millis((interval.to_nanosecond() / NANOS_PER_MILLI as i128) as u64) + }; + match args.get(i) { + Some(Expr::Literal(ScalarValue::Utf8(Some(str)))) => { + parse_duration(str).map_err(DataFusionError::Plan) + } + Some(Expr::Literal(ScalarValue::IntervalYearMonth(Some(i)))) => { + Ok(interval_to_duration(Interval::from_i32(*i))) + } + Some(Expr::Literal(ScalarValue::IntervalDayTime(Some(i)))) => { + Ok(interval_to_duration(Interval::from_i64(*i))) + } + Some(Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(i)))) => { + Ok(interval_to_duration(Interval::from_i128(*i))) + } + other => Err(dispose_parse_error(other)), + } +} + +/// Parse the `align to` clause and return a UTC timestamp with unit of millisecond, +/// which is used as the basis for dividing time slot during the align operation. +/// 1. NOW: align to current execute time +/// 2. CALENDAR (as Default Option): align to timestamp `0` +/// 2. Timestamp string: align to specific timestamp +fn parse_align_to(args: &[Expr], i: usize) -> DFResult { + let s = parse_str_expr(args, i)?; + let upper = s.to_uppercase(); + match upper.as_str() { + "NOW" => return Ok(Timestamp::current_millis().value()), + "CALENDAR" | "" => return Ok(0), + _ => (), + } + Timestamp::from_str(s) + .map_err(|e| { + DataFusionError::Plan(format!( + "Illegal `align to` argument `{}` in range select query, can't be parse as NOW/CALENDAR/Timestamp, error: {}", + s, e + )) + })?.convert_to(TimeUnit::Millisecond).map(|x|x.value()).ok_or(DataFusionError::Plan(format!( + "Illegal `align to` argument `{}` in range select query, can't be convert to a valid Timestamp", + s + )) + ) +} + fn parse_expr_list(args: &[Expr], start: usize, len: usize) -> DFResult> { let mut outs = Vec::with_capacity(len); for i in start..start + len { @@ -111,21 +174,38 @@ fn parse_expr_list(args: &[Expr], start: usize, len: usize) -> DFResult { + if $cond && $self.$name != $name { + return Err(DataFusionError::Plan( + concat!( + "Inconsistent ", + stringify!($name), + " given in Range Function Rewrite" + ) + .into(), + )); + } else { + $self.$name = $name; + } + }; +} + impl<'a> TreeNodeRewriter for RangeExprRewriter<'a> { type N = Expr; fn mutate(&mut self, node: Expr) -> DFResult { if let Expr::ScalarUDF(func) = &node { if func.fun.name == "range_fn" { - // `range_fn(func, range, fill, byc, [byv], align)` + // `range_fn(func, range, fill, byc, [byv], align, to)` // `[byv]` are variadic arguments, byc indicate the length of arguments let range_expr = self.get_range_expr(&func.args, 0)?; - let range_str = parse_str_expr(&func.args, 1)?; + let range = parse_duration_expr(&func.args, 1)?; let byc = str::parse::(parse_str_expr(&func.args, 3)?) .map_err(|e| DataFusionError::Plan(e.to_string()))?; let by = parse_expr_list(&func.args, 4, byc)?; - let align = parse_duration(parse_str_expr(&func.args, byc + 4)?) - .map_err(DataFusionError::Plan)?; + let align = parse_duration_expr(&func.args, byc + 4)?; + let align_to = parse_align_to(&func.args, byc + 5)?; let mut data_type = range_expr.get_type(self.input_plan.schema())?; let mut need_cast = false; let fill = Fill::try_from_str(parse_str_expr(&func.args, 2)?, &data_type)?; @@ -133,30 +213,19 @@ impl<'a> TreeNodeRewriter for RangeExprRewriter<'a> { data_type = DataType::Float64; need_cast = true; } - if !self.by.is_empty() && self.by != by { - return Err(DataFusionError::Plan( - "Inconsistent by given in Range Function Rewrite".into(), - )); - } else { - self.by = by; - } - if self.align != Duration::default() && self.align != align { - return Err(DataFusionError::Plan( - "Inconsistent align given in Range Function Rewrite".into(), - )); - } else { - self.align = align; - } + inconsistent_check!(self.by, !self.by.is_empty()); + inconsistent_check!(self.align, self.align != Duration::default()); + inconsistent_check!(self.align_to, self.align_to != 0); let range_fn = RangeFn { name: format!( "{} RANGE {} FILL {}", range_expr.display_name()?, - range_str, + parse_expr_to_string(&func.args, 1)?, fill ), data_type, expr: range_expr, - range: parse_duration(range_str).map_err(DataFusionError::Plan)?, + range, fill, need_cast, }; @@ -221,6 +290,7 @@ impl RangePlanRewriter { let mut range_rewriter = RangeExprRewriter { input_plan: &input, align: Duration::default(), + align_to: 0, by: vec![], range_fn: BTreeSet::new(), sub_aggr: aggr_plan, @@ -237,6 +307,7 @@ impl RangePlanRewriter { input.clone(), range_rewriter.range_fn.into_iter().collect(), range_rewriter.align, + range_rewriter.align_to, time_index, range_rewriter.by, &new_expr, @@ -468,7 +539,7 @@ mod test { async fn range_no_project() { let query = r#"SELECT timestamp, tag_0, tag_1, avg(field_0 + field_1) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#; let expected = String::from( - "RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N]\ + "RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N]\ \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" ); query_plan_compare(query, expected).await; @@ -479,7 +550,7 @@ mod test { let query = r#"SELECT (avg(field_0 + field_1)/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#; let expected = String::from( "Projection: AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL / Int64(4) [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\ - \n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ + \n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" ); query_plan_compare(query, expected).await; @@ -491,7 +562,7 @@ mod test { r#"SELECT (covar(field_0 + field_1, field_1)/4) RANGE '5m' FROM test ALIGN '1h';"#; let expected = String::from( "Projection: COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL / Int64(4) [COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\ - \n RangeSelect: range_exprs=[COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\ + \n RangeSelect: range_exprs=[COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1, test.tag_2, test.tag_3, test.tag_4], time_index=timestamp [COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\ \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" ); query_plan_compare(query, expected).await; @@ -502,7 +573,7 @@ mod test { let query = r#"SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL;"#; let expected = String::from( "Projection: (AVG(test.field_0) RANGE 5m FILL NULL + SUM(test.field_1) RANGE 5m FILL NULL) / Int64(4) [AVG(test.field_0) RANGE 5m FILL NULL + SUM(test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\ - \n RangeSelect: range_exprs=[AVG(test.field_0) RANGE 5m FILL NULL, SUM(test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(test.field_0) RANGE 5m FILL NULL:Float64;N, SUM(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ + \n RangeSelect: range_exprs=[AVG(test.field_0) RANGE 5m FILL NULL, SUM(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0) RANGE 5m FILL NULL:Float64;N, SUM(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" ); query_plan_compare(query, expected).await; @@ -515,7 +586,7 @@ mod test { "Projection: foo + Int64(1) [foo + Int64(1):Float64;N]\ \n Filter: foo > Int64(1) [foo:Float64;N]\ \n Projection: (AVG(test.field_0) RANGE 5m FILL NULL + SUM(test.field_1) RANGE 5m FILL NULL) / Int64(4) AS foo [foo:Float64;N]\ - \n RangeSelect: range_exprs=[AVG(test.field_0) RANGE 5m FILL NULL, SUM(test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(test.field_0) RANGE 5m FILL NULL:Float64;N, SUM(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ + \n RangeSelect: range_exprs=[AVG(test.field_0) RANGE 5m FILL NULL, SUM(test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0) RANGE 5m FILL NULL:Float64;N, SUM(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" ); query_plan_compare(query, expected).await; @@ -526,7 +597,7 @@ mod test { let query = r#"SELECT ((avg(a)+sum(b))/4) RANGE '5m' FROM (SELECT field_0 as a, field_1 as b, tag_0 as c, tag_1 as d, timestamp from test where field_0 > 1.0) ALIGN '1h' by (c, d) FILL NULL;"#; let expected = String::from( "Projection: (AVG(a) RANGE 5m FILL NULL + SUM(b) RANGE 5m FILL NULL) / Int64(4) [AVG(a) RANGE 5m FILL NULL + SUM(b) RANGE 5m FILL NULL / Int64(4):Float64;N]\ - \n RangeSelect: range_exprs=[AVG(a) RANGE 5m FILL NULL, SUM(b) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(a) RANGE 5m FILL NULL:Float64;N, SUM(b) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), c:Utf8, d:Utf8]\ + \n RangeSelect: range_exprs=[AVG(a) RANGE 5m FILL NULL, SUM(b) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[c, d], time_index=timestamp [AVG(a) RANGE 5m FILL NULL:Float64;N, SUM(b) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), c:Utf8, d:Utf8]\ \n Projection: test.field_0 AS a, test.field_1 AS b, test.tag_0 AS c, test.tag_1 AS d, test.timestamp [a:Float64;N, b:Float64;N, c:Utf8, d:Utf8, timestamp:Timestamp(Millisecond, None)]\ \n Filter: test.field_0 > Float64(1) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]\ \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" @@ -539,7 +610,7 @@ mod test { let query = r#"SELECT sin(avg(field_0 + field_1) RANGE '5m' + 1) FROM test ALIGN '1h' by (tag_0,tag_1);"#; let expected = String::from( "Projection: sin(AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL + Int64(1)) [sin(AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL + Int64(1)):Float64;N]\ - \n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ + \n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" ); query_plan_compare(query, expected).await; @@ -550,7 +621,7 @@ mod test { let query = r#"SELECT avg(field_0) RANGE '5m' FILL 6.0 + avg(field_0) RANGE '5m' FILL 6.0 FROM test ALIGN '1h' by (tag_0,tag_1);"#; let expected = String::from( "Projection: AVG(test.field_0) RANGE 5m FILL 6 + AVG(test.field_0) RANGE 5m FILL 6 [AVG(test.field_0) RANGE 5m FILL 6 + AVG(test.field_0) RANGE 5m FILL 6:Float64]\ - \n RangeSelect: range_exprs=[AVG(test.field_0) RANGE 5m FILL 6], align=3600s time_index=timestamp [AVG(test.field_0) RANGE 5m FILL 6:Float64, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ + \n RangeSelect: range_exprs=[AVG(test.field_0) RANGE 5m FILL 6], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0) RANGE 5m FILL 6:Float64, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" ); query_plan_compare(query, expected).await; @@ -561,7 +632,7 @@ mod test { let query = r#"SELECT round(sin(avg(field_0 + field_1) RANGE '5m' + 1)) FROM test ALIGN '1h' by (tag_0,tag_1);"#; let expected = String::from( "Projection: round(sin(AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL + Int64(1))) [round(sin(AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL + Int64(1))):Float64;N]\ - \n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ + \n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" ); query_plan_compare(query, expected).await; @@ -572,7 +643,7 @@ mod test { let query = r#"SELECT gcd(CAST(max(field_0 + 1) Range '5m' FILL NULL AS Int64), CAST(tag_0 AS Int64)) + round(max(field_2+1) Range '6m' FILL NULL + 1) + max(field_2+3) Range '10m' FILL NULL * CAST(tag_1 AS Float64) + 1 FROM test ALIGN '1h' by (tag_0, tag_1);"#; let expected = String::from( "Projection: gcd(CAST(MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL AS Int64), CAST(test.tag_0 AS Int64)) + round(MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL * CAST(test.tag_1 AS Float64) + Int64(1) [gcd(MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL,test.tag_0) + round(MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL * test.tag_1 + Int64(1):Float64;N]\ - \n RangeSelect: range_exprs=[MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL, MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL, MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL], align=3600s time_index=timestamp [MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL:Float64;N, MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL:Float64;N, MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ + \n RangeSelect: range_exprs=[MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL, MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL, MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL:Float64;N, MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL:Float64;N, MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" ); query_plan_compare(query, expected).await; @@ -582,7 +653,7 @@ mod test { async fn range_linear_on_integer() { let query = r#"SELECT min(CAST(field_0 AS Int64) + CAST(field_1 AS Int64)) RANGE '5m' FILL LINEAR FROM test ALIGN '1h' by (tag_0,tag_1);"#; let expected = String::from( - "RangeSelect: range_exprs=[MIN(test.field_0 + test.field_1) RANGE 5m FILL LINEAR], align=3600s time_index=timestamp [MIN(test.field_0 + test.field_1) RANGE 5m FILL LINEAR:Float64;N]\ + "RangeSelect: range_exprs=[MIN(test.field_0 + test.field_1) RANGE 5m FILL LINEAR], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [MIN(test.field_0 + test.field_1) RANGE 5m FILL LINEAR:Float64;N]\ \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" ); query_plan_compare(query, expected).await; @@ -628,4 +699,68 @@ mod test { "Error during planning: Illegal argument `Int64(5)` in range select query" ) } + + #[test] + fn test_parse_duration_expr() { + let interval_to_ms = |interval: Interval| -> u128 { + (interval.to_nanosecond() / NANOS_PER_MILLI as i128) as u128 + }; + // test IntervalYearMonth + let interval = Interval::from_year_month(10); + let args = vec![Expr::Literal(ScalarValue::IntervalYearMonth(Some( + interval.to_i32(), + )))]; + assert_eq!( + parse_duration_expr(&args, 0).unwrap().as_millis(), + interval_to_ms(interval) + ); + // test IntervalDayTime + let interval = Interval::from_day_time(10, 10); + let args = vec![Expr::Literal(ScalarValue::IntervalDayTime(Some( + interval.to_i64(), + )))]; + assert_eq!( + parse_duration_expr(&args, 0).unwrap().as_millis(), + interval_to_ms(interval) + ); + // test IntervalMonthDayNano + let interval = Interval::from_month_day_nano(10, 10, 10); + let args = vec![Expr::Literal(ScalarValue::IntervalMonthDayNano(Some( + interval.to_i128(), + )))]; + assert_eq!( + parse_duration_expr(&args, 0).unwrap().as_millis(), + interval_to_ms(interval) + ); + // test Duration + let args = vec![Expr::Literal(ScalarValue::Utf8(Some("1y4w".into())))]; + assert_eq!( + parse_duration_expr(&args, 0).unwrap(), + parse_duration("1y4w").unwrap() + ); + // test err + assert!(parse_duration_expr(&args, 10).is_err()); + } + + #[test] + fn test_parse_align_to() { + // test NOW + let args = vec![Expr::Literal(ScalarValue::Utf8(Some("NOW".into())))]; + let epsinon = parse_align_to(&args, 0).unwrap() - Timestamp::current_millis().value(); + assert!(epsinon.abs() < 100); + // test CALENDAR + let args = vec![ + Expr::Literal(ScalarValue::Utf8(Some("".into()))), + Expr::Literal(ScalarValue::Utf8(Some("CALENDAR".into()))), + ]; + assert!( + parse_align_to(&args, 0).unwrap() == parse_align_to(&args, 1).unwrap() + && parse_align_to(&args, 0).unwrap() == 0 + ); + // test CALENDAR + let args = vec![Expr::Literal(ScalarValue::Utf8(Some( + "1970-01-01T00:00:00+08:00".into(), + )))]; + assert!(parse_align_to(&args, 0).unwrap() == -8 * 60 * 60 * 1000); + } } diff --git a/tests/cases/standalone/common/range/error.result b/tests/cases/standalone/common/range/error.result index 556e14846aff..cb86b0c46293 100644 --- a/tests/cases/standalone/common/range/error.result +++ b/tests/cases/standalone/common/range/error.result @@ -81,6 +81,23 @@ SELECT min(val) RANGE '5s' FROM host ALIGN '5s' FILL 3.0; Error: 3000(PlanQuery), DataFusion error: Error during planning: 3.0 is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string '3.0' to value of Int64 type } +-- 2.7 zero align/range +SELECT min(val) RANGE '5s' FROM host ALIGN '0s'; + +Error: 3000(PlanQuery), DataFusion error: Error during planning: duration must be greater than 0 + +SELECT min(val) RANGE '0s' FROM host ALIGN '5s'; + +Error: 3000(PlanQuery), DataFusion error: Error during planning: duration must be greater than 0 + +SELECT min(val) RANGE '5s' FROM host ALIGN (INTERVAL '0' day); + +Error: 2000(InvalidSyntax), Range Query: Can't use 0 as align in Range Query + +SELECT min(val) RANGE (INTERVAL '0' day) FROM host ALIGN '5s'; + +Error: 2000(InvalidSyntax), Range Query: Invalid Range expr `MIN(host.val) RANGE IntervalMonthDayNano("0") FILL NULL`, Can't use 0 as range in Range Query + DROP TABLE host; Affected Rows: 0 diff --git a/tests/cases/standalone/common/range/error.sql b/tests/cases/standalone/common/range/error.sql index 86ceda4ea1ab..cda3569f1fe5 100644 --- a/tests/cases/standalone/common/range/error.sql +++ b/tests/cases/standalone/common/range/error.sql @@ -58,4 +58,14 @@ SELECT min(val) RANGE '5s', min(val) RANGE '5s' FILL NULL FROM host ALIGN '5s'; SELECT min(val) RANGE '5s' FROM host ALIGN '5s' FILL 3.0; +-- 2.7 zero align/range + +SELECT min(val) RANGE '5s' FROM host ALIGN '0s'; + +SELECT min(val) RANGE '0s' FROM host ALIGN '5s'; + +SELECT min(val) RANGE '5s' FROM host ALIGN (INTERVAL '0' day); + +SELECT min(val) RANGE (INTERVAL '0' day) FROM host ALIGN '5s'; + DROP TABLE host; diff --git a/tests/cases/standalone/common/range/interval.result b/tests/cases/standalone/common/range/interval.result new file mode 100644 index 000000000000..bbc524555dce --- /dev/null +++ b/tests/cases/standalone/common/range/interval.result @@ -0,0 +1,46 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, +); + +Affected Rows: 0 + +INSERT INTO TABLE host VALUES + ("1970-01-01T01:00:00+08:00", 'host1', 0), + ("1970-01-01T02:00:00+08:00", 'host1', 1), + ("1971-01-02T03:00:00+08:00", 'host1', 2), + ("1971-01-02T04:00:00+08:00", 'host1', 3), + ("1970-01-01T01:00:00+08:00", 'host2', 4), + ("1970-01-01T02:00:00+08:00", 'host2', 5), + ("1971-01-02T03:00:00+08:00", 'host2', 6), + ("1971-01-02T04:00:00+08:00", 'host2', 7); + +Affected Rows: 8 + +SELECT ts, host, min(val) RANGE (INTERVAL '1 year') FROM host ALIGN (INTERVAL '1 year') ORDER BY host, ts; + ++---------------------+-------+--------------------------------------------------------------------------------------+ +| ts | host | MIN(host.val) RANGE IntervalMonthDayNano("950737950171172051122527404032") FILL NULL | ++---------------------+-------+--------------------------------------------------------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 0 | +| 1971-12-22T00:00:00 | host1 | 2 | +| 1970-01-01T00:00:00 | host2 | 4 | +| 1971-12-22T00:00:00 | host2 | 6 | ++---------------------+-------+--------------------------------------------------------------------------------------+ + +SELECT ts, host, min(val) RANGE (INTERVAL '1' year) FROM host ALIGN (INTERVAL '1' year) ORDER BY host, ts; + ++---------------------+-------+--------------------------------------------------------------------------------------+ +| ts | host | MIN(host.val) RANGE IntervalMonthDayNano("950737950171172051122527404032") FILL NULL | ++---------------------+-------+--------------------------------------------------------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 0 | +| 1971-12-22T00:00:00 | host1 | 2 | +| 1970-01-01T00:00:00 | host2 | 4 | +| 1971-12-22T00:00:00 | host2 | 6 | ++---------------------+-------+--------------------------------------------------------------------------------------+ + +DROP TABLE host; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/range/interval.sql b/tests/cases/standalone/common/range/interval.sql new file mode 100644 index 000000000000..cae339a7a570 --- /dev/null +++ b/tests/cases/standalone/common/range/interval.sql @@ -0,0 +1,21 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, +); + +INSERT INTO TABLE host VALUES + ("1970-01-01T01:00:00+08:00", 'host1', 0), + ("1970-01-01T02:00:00+08:00", 'host1', 1), + ("1971-01-02T03:00:00+08:00", 'host1', 2), + ("1971-01-02T04:00:00+08:00", 'host1', 3), + ("1970-01-01T01:00:00+08:00", 'host2', 4), + ("1970-01-01T02:00:00+08:00", 'host2', 5), + ("1971-01-02T03:00:00+08:00", 'host2', 6), + ("1971-01-02T04:00:00+08:00", 'host2', 7); + +SELECT ts, host, min(val) RANGE (INTERVAL '1 year') FROM host ALIGN (INTERVAL '1 year') ORDER BY host, ts; + +SELECT ts, host, min(val) RANGE (INTERVAL '1' year) FROM host ALIGN (INTERVAL '1' year) ORDER BY host, ts; + +DROP TABLE host; diff --git a/tests/cases/standalone/common/range/nest.result b/tests/cases/standalone/common/range/nest.result index 7675b8384c06..bf749adc8171 100644 --- a/tests/cases/standalone/common/range/nest.result +++ b/tests/cases/standalone/common/range/nest.result @@ -55,9 +55,9 @@ EXPLAIN SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s'; +-+-+ | plan_type_| plan_| +-+-+ -| logical_plan_| RangeSelect: range_exprs=[MIN(host.val) RANGE 5s FILL NULL], align=5s time_index=ts_| +| logical_plan_| RangeSelect: range_exprs=[MIN(host.val) RANGE 5s FILL NULL], align=5000ms, align_to=0ms, align_by=[host.host], time_index=ts | |_|_MergeScan [is_placeholder=false]_| -| physical_plan | RangeSelectExec: range_expr=[RangeFnExec{ MIN(host.val), range: 5000}], align=5000, time_index=ts, by=[host@1] | +| physical_plan | RangeSelectExec: range_expr=[MIN(host.val) RANGE 5s FILL NULL], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts | |_|_MergeScanExec: REDACTED |_|_| +-+-+ @@ -71,7 +71,7 @@ EXPLAIN ANALYZE SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s'; +-+-+ | plan_type_| plan_| +-+-+ -| Plan with Metrics | RangeSelectExec: range_expr=[RangeFnExec{ MIN(host.val), range: 5000}], align=5000, time_index=ts, by=[host@1], REDACTED +| Plan with Metrics | RangeSelectExec: range_expr=[MIN(host.val) RANGE 5s FILL NULL], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts, REDACTED |_|_MergeScanExec: REDACTED |_|_| +-+-+ diff --git a/tests/cases/standalone/common/range/to.result b/tests/cases/standalone/common/range/to.result new file mode 100644 index 000000000000..2666cfe45085 --- /dev/null +++ b/tests/cases/standalone/common/range/to.result @@ -0,0 +1,99 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, +); + +Affected Rows: 0 + +INSERT INTO TABLE host VALUES + ("1970-01-01T23:30:00+00:00", 'host1', 0), + ("1970-01-01T22:30:00+00:00", 'host1', 1), + ("1970-01-02T23:30:00+00:00", 'host1', 2), + ("1970-01-02T22:30:00+00:00", 'host1', 3), + ("1970-01-01T23:30:00+00:00", 'host2', 4), + ("1970-01-01T22:30:00+00:00", 'host2', 5), + ("1970-01-02T23:30:00+00:00", 'host2', 6), + ("1970-01-02T22:30:00+00:00", 'host2', 7); + +Affected Rows: 8 + +SELECT ts, host, min(val) RANGE '1d' FROM host ALIGN '1d' ORDER BY host, ts; + ++---------------------+-------+----------------------------------+ +| ts | host | MIN(host.val) RANGE 1d FILL NULL | ++---------------------+-------+----------------------------------+ +| 1970-01-02T00:00:00 | host1 | 0 | +| 1970-01-03T00:00:00 | host1 | 2 | +| 1970-01-02T00:00:00 | host2 | 4 | +| 1970-01-03T00:00:00 | host2 | 6 | ++---------------------+-------+----------------------------------+ + +SELECT ts, host, min(val) RANGE '1d' FROM host ALIGN '1d' TO CALENDAR ORDER BY host, ts; + ++---------------------+-------+----------------------------------+ +| ts | host | MIN(host.val) RANGE 1d FILL NULL | ++---------------------+-------+----------------------------------+ +| 1970-01-02T00:00:00 | host1 | 0 | +| 1970-01-03T00:00:00 | host1 | 2 | +| 1970-01-02T00:00:00 | host2 | 4 | +| 1970-01-03T00:00:00 | host2 | 6 | ++---------------------+-------+----------------------------------+ + +SELECT ts, host, min(val) RANGE '1d' FROM host ALIGN '1d' TO UNKNOWN ORDER BY host, ts; + +Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal `align to` argument `UNKNOWN` in range select query, can't be parse as NOW/CALENDAR/Timestamp, error: Failed to parse a string into Timestamp, raw string: UNKNOWN + +SELECT ts, host, min(val) RANGE '1d' FROM host ALIGN '1d' TO '1900-01-01T00:00:00+01:00' ORDER BY host, ts; + ++---------------------+-------+----------------------------------+ +| ts | host | MIN(host.val) RANGE 1d FILL NULL | ++---------------------+-------+----------------------------------+ +| 1970-01-01T23:00:00 | host1 | 1 | +| 1970-01-02T23:00:00 | host1 | 0 | +| 1970-01-03T23:00:00 | host1 | 2 | +| 1970-01-01T23:00:00 | host2 | 5 | +| 1970-01-02T23:00:00 | host2 | 4 | +| 1970-01-03T23:00:00 | host2 | 6 | ++---------------------+-------+----------------------------------+ + +SELECT ts, host, min(val) RANGE '1d' FROM host ALIGN '1d' TO '1970-01-01T00:00:00+01:00' ORDER BY host, ts; + ++---------------------+-------+----------------------------------+ +| ts | host | MIN(host.val) RANGE 1d FILL NULL | ++---------------------+-------+----------------------------------+ +| 1970-01-01T23:00:00 | host1 | 1 | +| 1970-01-02T23:00:00 | host1 | 0 | +| 1970-01-03T23:00:00 | host1 | 2 | +| 1970-01-01T23:00:00 | host2 | 5 | +| 1970-01-02T23:00:00 | host2 | 4 | +| 1970-01-03T23:00:00 | host2 | 6 | ++---------------------+-------+----------------------------------+ + +SELECT ts, host, min(val) RANGE '1d' FROM host ALIGN '1d' TO '2023-01-01T00:00:00+01:00' ORDER BY host, ts; + ++---------------------+-------+----------------------------------+ +| ts | host | MIN(host.val) RANGE 1d FILL NULL | ++---------------------+-------+----------------------------------+ +| 1970-01-01T23:00:00 | host1 | 1 | +| 1970-01-02T23:00:00 | host1 | 0 | +| 1970-01-03T23:00:00 | host1 | 2 | +| 1970-01-01T23:00:00 | host2 | 5 | +| 1970-01-02T23:00:00 | host2 | 4 | +| 1970-01-03T23:00:00 | host2 | 6 | ++---------------------+-------+----------------------------------+ + +SELECT ts, min(val) RANGE (INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts; + ++---------------------+----------------------------------------------------------------------------+ +| ts | MIN(host.val) RANGE IntervalMonthDayNano("18446744073709551616") FILL NULL | ++---------------------+----------------------------------------------------------------------------+ +| 1970-01-01T23:00:00 | 1 | +| 1970-01-02T23:00:00 | 0 | +| 1970-01-03T23:00:00 | 2 | ++---------------------+----------------------------------------------------------------------------+ + +DROP TABLE host; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/range/to.sql b/tests/cases/standalone/common/range/to.sql new file mode 100644 index 000000000000..2ec32e3c2e26 --- /dev/null +++ b/tests/cases/standalone/common/range/to.sql @@ -0,0 +1,31 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, +); + +INSERT INTO TABLE host VALUES + ("1970-01-01T23:30:00+00:00", 'host1', 0), + ("1970-01-01T22:30:00+00:00", 'host1', 1), + ("1970-01-02T23:30:00+00:00", 'host1', 2), + ("1970-01-02T22:30:00+00:00", 'host1', 3), + ("1970-01-01T23:30:00+00:00", 'host2', 4), + ("1970-01-01T22:30:00+00:00", 'host2', 5), + ("1970-01-02T23:30:00+00:00", 'host2', 6), + ("1970-01-02T22:30:00+00:00", 'host2', 7); + +SELECT ts, host, min(val) RANGE '1d' FROM host ALIGN '1d' ORDER BY host, ts; + +SELECT ts, host, min(val) RANGE '1d' FROM host ALIGN '1d' TO CALENDAR ORDER BY host, ts; + +SELECT ts, host, min(val) RANGE '1d' FROM host ALIGN '1d' TO UNKNOWN ORDER BY host, ts; + +SELECT ts, host, min(val) RANGE '1d' FROM host ALIGN '1d' TO '1900-01-01T00:00:00+01:00' ORDER BY host, ts; + +SELECT ts, host, min(val) RANGE '1d' FROM host ALIGN '1d' TO '1970-01-01T00:00:00+01:00' ORDER BY host, ts; + +SELECT ts, host, min(val) RANGE '1d' FROM host ALIGN '1d' TO '2023-01-01T00:00:00+01:00' ORDER BY host, ts; + +SELECT ts, min(val) RANGE (INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts; + +DROP TABLE host;