From 46e106bcc3a09ad8a8186d139a53a273384b8278 Mon Sep 17 00:00:00 2001 From: WU Jingdi Date: Wed, 18 Oct 2023 02:03:26 -0500 Subject: [PATCH] feat: allow nest range expr in Range Query (#2557) * feat: eable range expr nest * fix: change range expr rewrite format * chore: organize range query tests * chore: change range expr name(e.g. MAX(v) RANGE 5s FILL 6) * chore: add range query test * chore: fix code advice * chore: fix ca --- Cargo.lock | 6 +- Cargo.toml | 2 +- src/common/function/src/scalars/math.rs | 12 +- src/query/src/error.rs | 7 +- src/query/src/planner.rs | 2 +- src/query/src/range_select/plan.rs | 642 ++++++++++++++---- src/query/src/range_select/plan_rewrite.rs | 314 ++++++--- src/servers/src/mysql/writer.rs | 10 +- tests/cases/standalone/common/range/by.result | 54 ++ tests/cases/standalone/common/range/by.sql | 27 + .../standalone/common/range/calculate.result | 194 ++++++ .../standalone/common/range/calculate.sql | 43 ++ .../standalone/common/range/error.result | 82 +++ tests/cases/standalone/common/range/error.sql | 59 ++ .../cases/standalone/common/range/fill.result | 112 +++ tests/cases/standalone/common/range/fill.sql | 31 + .../cases/standalone/common/range/nest.result | 51 ++ tests/cases/standalone/common/range/nest.sql | 25 + .../standalone/common/range/precisions.result | 44 ++ .../standalone/common/range/precisions.sql | 23 + .../common/select/range_select.result | 324 --------- .../standalone/common/select/range_select.sql | 96 --- 22 files changed, 1499 insertions(+), 661 deletions(-) create mode 100644 tests/cases/standalone/common/range/by.result create mode 100644 tests/cases/standalone/common/range/by.sql create mode 100644 tests/cases/standalone/common/range/calculate.result create mode 100644 tests/cases/standalone/common/range/calculate.sql create mode 100644 tests/cases/standalone/common/range/error.result create mode 100644 tests/cases/standalone/common/range/error.sql create mode 100644 tests/cases/standalone/common/range/fill.result create mode 100644 tests/cases/standalone/common/range/fill.sql create mode 100644 tests/cases/standalone/common/range/nest.result create mode 100644 tests/cases/standalone/common/range/nest.sql create mode 100644 tests/cases/standalone/common/range/precisions.result create mode 100644 tests/cases/standalone/common/range/precisions.sql delete mode 100644 tests/cases/standalone/common/select/range_select.result delete mode 100644 tests/cases/standalone/common/select/range_select.sql diff --git a/Cargo.lock b/Cargo.lock index e9b0ffd779b7..507fbf6601a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9266,13 +9266,13 @@ dependencies = [ [[package]] name = "sqlparser" version = "0.34.0" -source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=296a4f6c73b129d6f565a42a2e5e53c6bc2b9da4#296a4f6c73b129d6f565a42a2e5e53c6bc2b9da4" +source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6cf9d23d5b8fbecd65efc1d9afb7e80ad7a424da#6cf9d23d5b8fbecd65efc1d9afb7e80ad7a424da" dependencies = [ "lazy_static", "log", "regex", "sqlparser 0.35.0", - "sqlparser_derive 0.1.1 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=296a4f6c73b129d6f565a42a2e5e53c6bc2b9da4)", + "sqlparser_derive 0.1.1 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6cf9d23d5b8fbecd65efc1d9afb7e80ad7a424da)", ] [[package]] @@ -9299,7 +9299,7 @@ dependencies = [ [[package]] name = "sqlparser_derive" version = "0.1.1" -source = 
"git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=296a4f6c73b129d6f565a42a2e5e53c6bc2b9da4#296a4f6c73b129d6f565a42a2e5e53c6bc2b9da4" +source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6cf9d23d5b8fbecd65efc1d9afb7e80ad7a424da#6cf9d23d5b8fbecd65efc1d9afb7e80ad7a424da" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index cfb8eb0b06e5..49f275710d4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -103,7 +103,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" smallvec = "1" snafu = { version = "0.7", features = ["backtraces"] } -sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "296a4f6c73b129d6f565a42a2e5e53c6bc2b9da4", features = [ +sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "6cf9d23d5b8fbecd65efc1d9afb7e80ad7a424da", features = [ "visitor", ] } strum = { version = "0.25", features = ["derive"] } diff --git a/src/common/function/src/scalars/math.rs b/src/common/function/src/scalars/math.rs index cf68b8ff3733..b38bf553b804 100644 --- a/src/common/function/src/scalars/math.rs +++ b/src/common/function/src/scalars/math.rs @@ -58,9 +58,15 @@ impl Function for RangeFunction { "range_fn" } - // range_fn will never been used, return_type could be arbitrary value, is not important - fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result { - Ok(ConcreteDataType::float64_datatype()) + // The first argument to range_fn is the expression to be evaluated + fn return_type(&self, input_types: &[ConcreteDataType]) -> Result { + input_types + .first() + .cloned() + .ok_or(DataFusionError::Internal( + "No expr found in range_fn".into(), + )) + .context(GeneralDataFusionSnafu) } /// `range_fn` will never been used. As long as a legal signature is returned, the specific content of the signature does not matter. diff --git a/src/query/src/error.rs b/src/query/src/error.rs index 5449ece0b1f7..ff2842f20820 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -274,6 +274,9 @@ pub enum Error { #[snafu(display("Missing table mutation handler"))] MissingTableMutationHandler { location: Location }, + + #[snafu(display("Range Query: {}", msg))] + RangeQuery { msg: String, location: Location }, } impl ErrorExt for Error { @@ -281,7 +284,9 @@ impl ErrorExt for Error { use Error::*; match self { - QueryParse { .. } | MultipleStatements { .. } => StatusCode::InvalidSyntax, + QueryParse { .. } | MultipleStatements { .. } | RangeQuery { .. } => { + StatusCode::InvalidSyntax + } UnsupportedExpr { .. } | Unimplemented { .. } | CatalogNotFound { .. 
} diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index 5fe7949948d9..20749446ec96 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -79,7 +79,7 @@ impl DfLogicalPlanner { let result = sql_to_rel .statement_to_plan(df_stmt) .context(PlanSqlSnafu)?; - let plan = RangePlanRewriter::new(table_provider, context_provider) + let plan = RangePlanRewriter::new(table_provider) .rewrite(result) .await?; Ok(LogicalPlan::DfPlan(plan)) diff --git a/src/query/src/range_select/plan.rs b/src/query/src/range_select/plan.rs index 299d2949a593..601161c529ce 100644 --- a/src/query/src/range_select/plan.rs +++ b/src/query/src/range_select/plan.rs @@ -21,7 +21,7 @@ use std::task::{Context, Poll}; use std::time::Duration; use ahash::RandomState; -use arrow::compute; +use arrow::compute::{self, cast_with_options, CastOptions}; use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; use common_query::DfPhysicalPlan; use common_recordbatch::DfSendableRecordBatchStream; @@ -33,6 +33,7 @@ use datafusion::physical_plan::udaf::create_aggregate_expr as create_aggr_udf_ex use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, }; +use datafusion::physical_planner::create_physical_sort_expr; use datafusion_common::utils::get_arrayref_at_indices; use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError, ScalarValue}; use datafusion_expr::utils::exprlist_to_fields; @@ -54,22 +55,135 @@ use crate::error::{DataFusionSnafu, Result}; type Millisecond = ::Native; -#[derive(PartialEq, Eq, Hash, Clone, Debug)] +#[derive(PartialEq, Eq, Debug, Hash, Clone)] +pub enum Fill { + Null, + Prev, + Linear, + Const(ScalarValue), +} + +impl Display for Fill { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Fill::Null => write!(f, "NULL"), + Fill::Prev => write!(f, "PREV"), + Fill::Linear => write!(f, "LINEAR"), + Fill::Const(x) => write!(f, "{}", x), + } + } +} + +impl Fill { + pub fn try_from_str(value: &str, datatype: &DataType) -> DfResult { + let s = value.to_uppercase(); + match s.as_str() { + "NULL" | "" => Ok(Self::Null), + "PREV" => Ok(Self::Prev), + "LINEAR" => { + if datatype.is_numeric() { + Ok(Self::Linear) + } else { + Err(DataFusionError::Plan(format!( + "Use FILL LINEAR on Non-numeric DataType {}", + datatype + ))) + } + } + _ => ScalarValue::try_from_string(s.clone(), datatype) + .map_err(|err| { + DataFusionError::Plan(format!( + "{} is not a valid fill option, fail to convert to a const value. {{ {} }}", + s, err + )) + }) + .map(Fill::Const), + } + } + + /// The input `data` contains the data of a complete time series. + /// If the filling strategy is `PREV` or `LINEAR`, the caller must ensure that the incoming `data` is in ascending time order.
+ pub fn apply_fill_strategy(&self, data: &mut [ScalarValue]) -> DfResult<()> { + let len = data.len(); + for i in 0..len { + if data[i].is_null() { + match self { + Fill::Null => continue, + Fill::Prev => { + if i != 0 { + data[i] = data[i - 1].clone() + } + } + Fill::Linear => { + if 0 < i && i < len - 1 { + match (&data[i - 1], &data[i + 1]) { + (ScalarValue::Float64(Some(a)), ScalarValue::Float64(Some(b))) => { + data[i] = ScalarValue::Float64(Some((a + b) / 2.0)); + } + (ScalarValue::Float32(Some(a)), ScalarValue::Float32(Some(b))) => { + data[i] = ScalarValue::Float32(Some((a + b) / 2.0)); + } + (a, b) => { + if !a.is_null() && !b.is_null() { + return Err(DataFusionError::Execution( + "RangePlan: Apply Fill LINEAR strategy on Non-floating type".to_string())); + } else { + continue; + } + } + } + } + } + Fill::Const(v) => data[i] = v.clone(), + } + } + } + Ok(()) + } +} + +#[derive(Eq, Clone, Debug)] pub struct RangeFn { + /// with a format like `MAX(a) RANGE 300s FILL NULL` + pub name: String, + pub data_type: DataType, pub expr: Expr, pub range: Duration, - pub fill: String, + pub fill: Fill, + /// If the `Fill` strategy is `Linear` and the output is an integer, + /// it is possible that a floating point number is calculated as the fill value. + /// So for `FILL==LINEAR`, the entire data will be implicitly converted to a Float type. + /// If `need_cast==true`, `data_type` may not be consistent with the type that `expr` generates. + pub need_cast: bool, +} + +impl PartialEq for RangeFn { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + } +} + +impl PartialOrd for RangeFn { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for RangeFn { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.name.cmp(&other.name) + } +} + +impl std::hash::Hash for RangeFn { + fn hash(&self, state: &mut H) { + self.name.hash(state); + } } impl Display for RangeFn { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "RangeFn {{ expr:{} range:{}s fill:{} }}", - self.expr.display_name().unwrap_or("?".into()), - self.range.as_secs(), - self.fill, - ) + write!(f, "{}", self.name) } } @@ -105,16 +219,21 @@ impl RangeSelect { ) -> Result { let mut fields = range_expr .iter() - .map(|RangeFn { expr, .. }| { - Ok(DFField::new_unqualified( - &expr.display_name()?, - expr.get_type(input.schema())?, - // TODO(Taylor-lagrange): We have not implemented fill currently, - // it is possible that some columns may not be able to aggregate data, - // so we temporarily set that all data is nullable - true, - )) - }) + .map( + |RangeFn { + name, + data_type, + fill, + .. + }| { + Ok(DFField::new_unqualified( + name, + data_type.clone(), + // Only when the data is filled with the Const option can the column be non-null + !matches!(fill, Fill::Const(..)), + )) + }, + ) .collect::>>() .context(DataFusionSnafu)?; // add align_ts @@ -135,10 +254,8 @@ impl RangeSelect { DFSchema::new_with_metadata(by_fields, input.schema().metadata().clone()) .context(DataFusionSnafu)?, ); - // If the result of the project plan happens to be the schema of the range plan, no project plan is required - // that need project is identical to range plan schema. - // 1. all exprs in project must belong to range schema - // 2. range schema and project exprs must have same size + // If the results of the project plan can be obtained directly from the range plan without any additional calculation, no project plan is required. + // We can simply project the final output of the range plan to produce the final result.
let schema_project = projection_expr .iter() .map(|project_expr| { @@ -268,52 +385,68 @@ impl RangeSelect { .range_expr .iter() .map(|range_fn| { - let (expr, args) = match &range_fn.expr { + let expr = match &range_fn.expr { Expr::AggregateFunction(aggr) => { - let args = self.create_physical_expr_list( - &aggr.args, - input_dfschema, - &input_schema, - session_state, - )?; - Ok(( - create_aggr_expr( - &aggr.fun, - false, - &args, - &[], + let order_by = if let Some(exprs) = &aggr.order_by { + exprs + .iter() + .map(|x| { + create_physical_sort_expr( + x, + input_dfschema, + &input_schema, + session_state.execution_props(), + ) + }) + .collect::>>()? + } else { + vec![] + }; + let expr = create_aggr_expr( + &aggr.fun, + false, + &self.create_physical_expr_list( + &aggr.args, + input_dfschema, &input_schema, - range_fn.expr.display_name()?, + session_state, )?, - args, - )) - } - Expr::AggregateUDF(aggr_udf) => { - let args = self.create_physical_expr_list( - &aggr_udf.args, - input_dfschema, + &order_by, &input_schema, - session_state, + range_fn.expr.display_name()?, )?; - Ok(( - create_aggr_udf_expr( - &aggr_udf.fun, - &args, + Ok(expr) + } + Expr::AggregateUDF(aggr_udf) => { + let expr = create_aggr_udf_expr( + &aggr_udf.fun, + &self.create_physical_expr_list( + &aggr_udf.args, + input_dfschema, &input_schema, - range_fn.expr.display_name()?, + session_state, )?, - args, - )) + &input_schema, + range_fn.expr.display_name()?, + )?; + Ok(expr) } _ => Err(DataFusionError::Plan(format!( "Unexpected Expr:{} in RangeSelect", range_fn.expr.display_name()? ))), }?; + let args = expr.expressions(); Ok(RangeFnExec { expr, args, range: range_fn.range.as_millis() as Millisecond, + fill: range_fn.fill.clone(), + need_cast: if range_fn.need_cast { + Some(range_fn.data_type.clone()) + } else { + None + }, }) }) .collect::>>()?; @@ -348,6 +481,8 @@ struct RangeFnExec { pub expr: Arc, pub args: Vec>, pub range: Millisecond, + pub fill: Fill, + pub need_cast: Option, } #[derive(Debug)] @@ -540,6 +675,15 @@ fn align_to_calendar( } } +fn cast_scalar_values(values: &mut [ScalarValue], data_type: &DataType) -> DfResult<()> { + let array = ScalarValue::iter_to_array(values.to_vec())?; + let cast_array = cast_with_options(&array, data_type, &CastOptions::default())?; + for (i, value) in values.iter_mut().enumerate() { + *value = ScalarValue::try_from_array(&cast_array, i)?; + } + Ok(()) +} + impl RangeSelectStream { fn evaluate_many( &self, @@ -648,20 +792,57 @@ impl RangeSelectStream { let mut columns: Vec> = Vec::with_capacity(1 + self.range_exec.len() + self.by.len()); let mut ts_builder = TimestampMillisecondBuilder::with_capacity(self.output_num_rows); - let mut all_scalar = vec![vec![]; self.range_exec.len()]; + let mut all_scalar = vec![Vec::with_capacity(self.output_num_rows); self.range_exec.len()]; let mut by_rows = Vec::with_capacity(self.output_num_rows); + let mut start_index = 0; + // RangePlan is calculated on a row basis. If a column uses the PREV or LINEAR filling strategy, + // we must arrange the data in the entire data row to determine the NULL filling value. 
+ let need_sort_output = self + .range_exec + .iter() + .any(|range| range.fill == Fill::Linear || range.fill == Fill::Prev); for SeriesState { row, align_ts_accumulator, } in self.series_map.values() { - for (ts, accumulators) in align_ts_accumulator { - for (i, accumulator) in accumulators.iter().enumerate() { - all_scalar[i].push(accumulator.evaluate()?); + // collect data on time series + if !need_sort_output { + for (ts, accumulators) in align_ts_accumulator { + for (i, accumulator) in accumulators.iter().enumerate() { + all_scalar[i].push(accumulator.evaluate()?); + } + ts_builder.append_value(*ts); } - by_rows.push(row.row()); - ts_builder.append_value(*ts); + } else { + let mut keys = align_ts_accumulator.keys().copied().collect::>(); + keys.sort(); + for key in &keys { + for (i, accumulator) in + align_ts_accumulator.get(key).unwrap().iter().enumerate() + { + all_scalar[i].push(accumulator.evaluate()?); + } + } + ts_builder.append_slice(&keys); + } + // apply fill strategy on time series + for ( + i, + RangeFnExec { + fill, need_cast, .. + }, + ) in self.range_exec.iter().enumerate() + { + let time_series_data = + &mut all_scalar[i][start_index..start_index + align_ts_accumulator.len()]; + if let Some(data_type) = need_cast { + cast_scalar_values(time_series_data, data_type)?; + } + fill.apply_fill_strategy(time_series_data)?; } + by_rows.resize(by_rows.len() + align_ts_accumulator.len(), row.row()); + start_index += align_ts_accumulator.len(); } for column_scalar in all_scalar { columns.push(ScalarValue::iter_to_array(column_scalar)?); @@ -720,15 +901,15 @@ impl Stream for RangeSelectStream { } ExecutionState::ProducingOutput => { let result = self.generate_output(); - match result { + return match result { // made output Ok(batch) => { self.exec_state = ExecutionState::Done; - return Poll::Ready(Some(Ok(batch))); + Poll::Ready(Some(Ok(batch))) } // error making output - Err(error) => return Poll::Ready(Some(Err(error))), - } + Err(error) => Poll::Ready(Some(Err(error))), + }; } ExecutionState::Done => return Poll::Ready(None), } @@ -738,6 +919,34 @@ impl Stream for RangeSelectStream { #[cfg(test)] mod test { + macro_rules! nullable_array { + ($builder:ident,) => { + }; + ($array_type:ident ; $($tail:tt)*) => { + paste::item! 
{ + { + let mut builder = arrow::array::[<$array_type Builder>]::new(); + nullable_array!(builder, $($tail)*); + builder.finish() + } + } + }; + ($builder:ident, null) => { + $builder.append_null(); + }; + ($builder:ident, null, $($tail:tt)*) => { + $builder.append_null(); + nullable_array!($builder, $($tail)*); + }; + ($builder:ident, $value:literal) => { + $builder.append_value($value); + }; + ($builder:ident, $value:literal, $($tail:tt)*) => { + $builder.append_value($value); + nullable_array!($builder, $($tail)*); + }; + } + use arrow_schema::SortOptions; use datafusion::arrow::datatypes::{ ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType, @@ -747,33 +956,45 @@ mod test { use datafusion::prelude::SessionContext; use datafusion_physical_expr::expressions::{self, Column}; use datafusion_physical_expr::PhysicalSortExpr; - use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray}; + use datatypes::arrow::array::TimestampMillisecondArray; use datatypes::arrow_array::StringArray; use super::*; const TIME_INDEX_COLUMN: &str = "timestamp"; - fn prepare_test_data() -> MemoryExec { + fn prepare_test_data(is_float: bool) -> MemoryExec { let schema = Arc::new(Schema::new(vec![ Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), - Field::new("value", DataType::Int64, true), + Field::new( + "value", + if is_float { + DataType::Float64 + } else { + DataType::Int64 + }, + true, + ), Field::new("host", DataType::Utf8, true), ])); - let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![ - // host 1 every 5s - 0, 5_000, 10_000, 15_000, 20_000, 25_000, 30_000, 35_000, 40_000, - // host 2 every 5s - 0, 5_000, 10_000, 15_000, 20_000, 25_000, 30_000, 35_000, 40_000, + let timestamp_column: Arc = Arc::new(TimestampMillisecondArray::from(vec![ + 0, 5_000, 10_000, 15_000, 20_000, // host 1 every 5s + 0, 5_000, 10_000, 15_000, 20_000, // host 2 every 5s ])) as _; - let values = vec![ - 0, 1, 2, 3, 4, 5, 6, 7, 8, // data for host 1 - 9, 10, 11, 12, 13, 14, 15, 16, 17, // data for host 2 - ]; - let mut host = vec!["host1"; 9]; - host.extend(vec!["host2"; 9]); - let value_column = Arc::new(Int64Array::from(values)) as _; - let host_column = Arc::new(StringArray::from(host)) as _; + let mut host = vec!["host1"; 5]; + host.extend(vec!["host2"; 5]); + let value_column: Arc = if is_float { + Arc::new(nullable_array!(Float64; + 0.0, null, 1.0, null, 2.0, // data for host 1 + 3.0, null, 4.0, null, 5.0 // data for host 2 + )) as _ + } else { + Arc::new(nullable_array!(Int64; + 0, null, 1, null, 2, // data for host 1 + 3, null, 4, null, 5 // data for host 2 + )) as _ + }; + let host_column: Arc = Arc::new(StringArray::from(host)) as _; let data = RecordBatch::try_new( schema.clone(), vec![timestamp_column, value_column, host_column], @@ -787,12 +1008,25 @@ mod test { range1: Millisecond, range2: Millisecond, align: Millisecond, + fill: Fill, + is_float: bool, expected: String, ) { - let memory_exec = Arc::new(prepare_test_data()); + let data_type = if is_float { + DataType::Float64 + } else { + DataType::Int64 + }; + let (need_cast, schema_data_type) = if !is_float && fill == Fill::Linear { + // data_type = DataType::Float64; + (Some(DataType::Float64), DataType::Float64) + } else { + (None, data_type.clone()) + }; + let memory_exec = Arc::new(prepare_test_data(is_float)); let schema = Arc::new(Schema::new(vec![ - Field::new("MIN(value)", DataType::Int64, true), - Field::new("MAX(value)", DataType::Int64, true), + Field::new("MIN(value)", 
schema_data_type.clone(), true), + Field::new("MAX(value)", schema_data_type, true), Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), Field::new("host", DataType::Utf8, true), ])); @@ -803,19 +1037,23 @@ mod test { expr: Arc::new(expressions::Min::new( Arc::new(Column::new("value", 1)), "MIN(value)", - DataType::Int64, + data_type.clone(), )), args: vec![Arc::new(Column::new("value", 1))], range: range1, + fill: fill.clone(), + need_cast: need_cast.clone(), }, RangeFnExec { expr: Arc::new(expressions::Max::new( Arc::new(Column::new("value", 1)), "MAX(value)", - DataType::Int64, + data_type, )), args: vec![Arc::new(Column::new("value", 1))], range: range2, + fill, + need_cast, }, ], align, @@ -852,7 +1090,7 @@ mod test { .await .unwrap(); - let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result) + let result_literal = arrow::util::pretty::pretty_format_batches(&result) .unwrap() .to_string(); @@ -860,77 +1098,217 @@ mod test { } #[tokio::test] - async fn range_10s_align_5s() { + async fn range_10s_align_1000s() { let expected = String::from( "+------------+------------+---------------------+-------+\ \n| MIN(value) | MAX(value) | timestamp | host |\ \n+------------+------------+---------------------+-------+\ - \n| 0 | 0 | 1970-01-01T00:00:00 | host1 |\ - \n| 0 | 1 | 1970-01-01T00:00:05 | host1 |\ - \n| 1 | 2 | 1970-01-01T00:00:10 | host1 |\ - \n| 2 | 3 | 1970-01-01T00:00:15 | host1 |\ - \n| 3 | 4 | 1970-01-01T00:00:20 | host1 |\ - \n| 4 | 5 | 1970-01-01T00:00:25 | host1 |\ - \n| 5 | 6 | 1970-01-01T00:00:30 | host1 |\ - \n| 6 | 7 | 1970-01-01T00:00:35 | host1 |\ - \n| 7 | 8 | 1970-01-01T00:00:40 | host1 |\ - \n| 8 | 8 | 1970-01-01T00:00:45 | host1 |\ - \n| 9 | 9 | 1970-01-01T00:00:00 | host2 |\ - \n| 9 | 10 | 1970-01-01T00:00:05 | host2 |\ - \n| 10 | 11 | 1970-01-01T00:00:10 | host2 |\ - \n| 11 | 12 | 1970-01-01T00:00:15 | host2 |\ - \n| 12 | 13 | 1970-01-01T00:00:20 | host2 |\ - \n| 13 | 14 | 1970-01-01T00:00:25 | host2 |\ - \n| 14 | 15 | 1970-01-01T00:00:30 | host2 |\ - \n| 15 | 16 | 1970-01-01T00:00:35 | host2 |\ - \n| 16 | 17 | 1970-01-01T00:00:40 | host2 |\ - \n| 17 | 17 | 1970-01-01T00:00:45 | host2 |\ + \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\ + \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\ \n+------------+------------+---------------------+-------+", ); - do_range_select_test(10_000, 10_000, 5_000, expected).await; + do_range_select_test(10_000, 10_000, 1_000_000, Fill::Null, true, expected).await; } #[tokio::test] - async fn range_10s_align_1000s() { + async fn range_fill_null() { let expected = String::from( "+------------+------------+---------------------+-------+\ \n| MIN(value) | MAX(value) | timestamp | host |\ \n+------------+------------+---------------------+-------+\ - \n| 0 | 0 | 1970-01-01T00:00:00 | host1 |\ - \n| 9 | 9 | 1970-01-01T00:00:00 | host2 |\ + \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\ + \n| 0.0 | | 1970-01-01T00:00:05 | host1 |\ + \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\ + \n| 1.0 | | 1970-01-01T00:00:15 | host1 |\ + \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\ + \n| 2.0 | | 1970-01-01T00:00:25 | host1 |\ + \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\ + \n| 3.0 | | 1970-01-01T00:00:05 | host2 |\ + \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\ + \n| 4.0 | | 1970-01-01T00:00:15 | host2 |\ + \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\ + \n| 5.0 | | 1970-01-01T00:00:25 | host2 |\ \n+------------+------------+---------------------+-------+", ); - do_range_select_test(10_000, 10_000, 
1_000_000, expected).await; + do_range_select_test(10_000, 5_000, 5_000, Fill::Null, true, expected).await; } #[tokio::test] - async fn range_10s_5s_align_5s() { + async fn range_fill_prev() { + let expected = String::from( + "+------------+------------+---------------------+-------+\ + \n| MIN(value) | MAX(value) | timestamp | host |\ + \n+------------+------------+---------------------+-------+\ + \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\ + \n| 0.0 | 0.0 | 1970-01-01T00:00:05 | host1 |\ + \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\ + \n| 1.0 | 1.0 | 1970-01-01T00:00:15 | host1 |\ + \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\ + \n| 2.0 | 2.0 | 1970-01-01T00:00:25 | host1 |\ + \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\ + \n| 3.0 | 3.0 | 1970-01-01T00:00:05 | host2 |\ + \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\ + \n| 4.0 | 4.0 | 1970-01-01T00:00:15 | host2 |\ + \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\ + \n| 5.0 | 5.0 | 1970-01-01T00:00:25 | host2 |\ + \n+------------+------------+---------------------+-------+", + ); + do_range_select_test(10_000, 5_000, 5_000, Fill::Prev, true, expected).await; + } + + #[tokio::test] + async fn range_fill_linear() { + let expected = String::from( + "+------------+------------+---------------------+-------+\ + \n| MIN(value) | MAX(value) | timestamp | host |\ + \n+------------+------------+---------------------+-------+\ + \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\ + \n| 0.0 | 0.5 | 1970-01-01T00:00:05 | host1 |\ + \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\ + \n| 1.0 | 1.5 | 1970-01-01T00:00:15 | host1 |\ + \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\ + \n| 2.0 | | 1970-01-01T00:00:25 | host1 |\ + \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\ + \n| 3.0 | 3.5 | 1970-01-01T00:00:05 | host2 |\ + \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\ + \n| 4.0 | 4.5 | 1970-01-01T00:00:15 | host2 |\ + \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\ + \n| 5.0 | | 1970-01-01T00:00:25 | host2 |\ + \n+------------+------------+---------------------+-------+", + ); + do_range_select_test(10_000, 5_000, 5_000, Fill::Linear, true, expected).await; + } + + #[tokio::test] + async fn range_fill_integer_null() { let expected = String::from( "+------------+------------+---------------------+-------+\ \n| MIN(value) | MAX(value) | timestamp | host |\ \n+------------+------------+---------------------+-------+\ \n| 0 | 0 | 1970-01-01T00:00:00 | host1 |\ - \n| 0 | 1 | 1970-01-01T00:00:05 | host1 |\ - \n| 1 | 2 | 1970-01-01T00:00:10 | host1 |\ - \n| 2 | 3 | 1970-01-01T00:00:15 | host1 |\ - \n| 3 | 4 | 1970-01-01T00:00:20 | host1 |\ - \n| 4 | 5 | 1970-01-01T00:00:25 | host1 |\ - \n| 5 | 6 | 1970-01-01T00:00:30 | host1 |\ - \n| 6 | 7 | 1970-01-01T00:00:35 | host1 |\ - \n| 7 | 8 | 1970-01-01T00:00:40 | host1 |\ - \n| 8 | | 1970-01-01T00:00:45 | host1 |\ - \n| 9 | 9 | 1970-01-01T00:00:00 | host2 |\ - \n| 9 | 10 | 1970-01-01T00:00:05 | host2 |\ - \n| 10 | 11 | 1970-01-01T00:00:10 | host2 |\ - \n| 11 | 12 | 1970-01-01T00:00:15 | host2 |\ - \n| 12 | 13 | 1970-01-01T00:00:20 | host2 |\ - \n| 13 | 14 | 1970-01-01T00:00:25 | host2 |\ - \n| 14 | 15 | 1970-01-01T00:00:30 | host2 |\ - \n| 15 | 16 | 1970-01-01T00:00:35 | host2 |\ - \n| 16 | 17 | 1970-01-01T00:00:40 | host2 |\ - \n| 17 | | 1970-01-01T00:00:45 | host2 |\ + \n| 0 | | 1970-01-01T00:00:05 | host1 |\ + \n| 1 | 1 | 1970-01-01T00:00:10 | host1 |\ + \n| 1 | | 1970-01-01T00:00:15 | host1 |\ + \n| 2 | 2 | 1970-01-01T00:00:20 | host1 |\ + \n| 2 | | 1970-01-01T00:00:25 | host1 |\ + \n| 3 | 3 | 
1970-01-01T00:00:00 | host2 |\ + \n| 3 | | 1970-01-01T00:00:05 | host2 |\ + \n| 4 | 4 | 1970-01-01T00:00:10 | host2 |\ + \n| 4 | | 1970-01-01T00:00:15 | host2 |\ + \n| 5 | 5 | 1970-01-01T00:00:20 | host2 |\ + \n| 5 | | 1970-01-01T00:00:25 | host2 |\ \n+------------+------------+---------------------+-------+", ); - do_range_select_test(10_000, 5_000, 5_000, expected).await; + do_range_select_test(10_000, 5_000, 5_000, Fill::Null, false, expected).await; + } + + #[tokio::test] + async fn range_fill_integer_linear() { + let expected = String::from( + "+------------+------------+---------------------+-------+\ + \n| MIN(value) | MAX(value) | timestamp | host |\ + \n+------------+------------+---------------------+-------+\ + \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\ + \n| 0.0 | 0.5 | 1970-01-01T00:00:05 | host1 |\ + \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\ + \n| 1.0 | 1.5 | 1970-01-01T00:00:15 | host1 |\ + \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\ + \n| 2.0 | | 1970-01-01T00:00:25 | host1 |\ + \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\ + \n| 3.0 | 3.5 | 1970-01-01T00:00:05 | host2 |\ + \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\ + \n| 4.0 | 4.5 | 1970-01-01T00:00:15 | host2 |\ + \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\ + \n| 5.0 | | 1970-01-01T00:00:25 | host2 |\ + \n+------------+------------+---------------------+-------+", + ); + do_range_select_test(10_000, 5_000, 5_000, Fill::Linear, false, expected).await; + } + + #[tokio::test] + async fn range_fill_const() { + let expected = String::from( + "+------------+------------+---------------------+-------+\ + \n| MIN(value) | MAX(value) | timestamp | host |\ + \n+------------+------------+---------------------+-------+\ + \n| 0.0 | 0.0 | 1970-01-01T00:00:00 | host1 |\ + \n| 0.0 | 6.6 | 1970-01-01T00:00:05 | host1 |\ + \n| 1.0 | 1.0 | 1970-01-01T00:00:10 | host1 |\ + \n| 1.0 | 6.6 | 1970-01-01T00:00:15 | host1 |\ + \n| 2.0 | 2.0 | 1970-01-01T00:00:20 | host1 |\ + \n| 2.0 | 6.6 | 1970-01-01T00:00:25 | host1 |\ + \n| 3.0 | 3.0 | 1970-01-01T00:00:00 | host2 |\ + \n| 3.0 | 6.6 | 1970-01-01T00:00:05 | host2 |\ + \n| 4.0 | 4.0 | 1970-01-01T00:00:10 | host2 |\ + \n| 4.0 | 6.6 | 1970-01-01T00:00:15 | host2 |\ + \n| 5.0 | 5.0 | 1970-01-01T00:00:20 | host2 |\ + \n| 5.0 | 6.6 | 1970-01-01T00:00:25 | host2 |\ + \n+------------+------------+---------------------+-------+", + ); + do_range_select_test( + 10_000, + 5_000, + 5_000, + Fill::Const(ScalarValue::Float64(Some(6.6))), + true, + expected, + ) + .await; + } + + #[test] + fn fill_test() { + assert!(Fill::try_from_str("Linear", &DataType::UInt8).unwrap() == Fill::Linear); + assert_eq!( + Fill::try_from_str("Linear", &DataType::Boolean) + .unwrap_err() + .to_string(), + "Error during planning: Use FILL LINEAR on Non-numeric DataType Boolean" + ); + assert_eq!( + Fill::try_from_str("WHAT", &DataType::UInt8) + .unwrap_err() + .to_string(), + "Error during planning: WHAT is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string 'WHAT' to value of UInt8 type }" + ); + assert_eq!( + Fill::try_from_str("8.0", &DataType::UInt8) + .unwrap_err() + .to_string(), + "Error during planning: 8.0 is not a valid fill option, fail to convert to a const value. 
{ Arrow error: Cast error: Cannot cast string '8.0' to value of UInt8 type }" + ); + assert!( + Fill::try_from_str("8", &DataType::UInt8).unwrap() + == Fill::Const(ScalarValue::UInt8(Some(8))) + ); + let mut test1 = vec![ + ScalarValue::UInt8(Some(8)), + ScalarValue::UInt8(None), + ScalarValue::UInt8(Some(9)), + ]; + Fill::Null.apply_fill_strategy(&mut test1).unwrap(); + assert_eq!(test1[1], ScalarValue::UInt8(None)); + Fill::Prev.apply_fill_strategy(&mut test1).unwrap(); + assert_eq!(test1[1], ScalarValue::UInt8(Some(8))); + test1[1] = ScalarValue::UInt8(None); + Fill::Const(ScalarValue::UInt8(Some(10))) + .apply_fill_strategy(&mut test1) + .unwrap(); + assert_eq!(test1[1], ScalarValue::UInt8(Some(10))); + test1[1] = ScalarValue::UInt8(None); + assert_eq!( + Fill::Linear + .apply_fill_strategy(&mut test1) + .unwrap_err() + .to_string(), + "Execution error: RangePlan: Apply Fill LINEAR strategy on Non-floating type" + ); + let mut test2 = vec![ + ScalarValue::Float32(Some(8.0)), + ScalarValue::Float32(None), + ScalarValue::Float32(Some(9.0)), + ]; + Fill::Linear.apply_fill_strategy(&mut test2).unwrap(); + assert_eq!(test2[1], ScalarValue::Float32(Some(8.5))); } } diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs index e7ae66bac8b8..5ac78f2013f0 100644 --- a/src/query/src/range_select/plan_rewrite.rs +++ b/src/query/src/range_select/plan_rewrite.rs @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::str::FromStr; +use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; +use arrow_schema::DataType; use async_recursion::async_recursion; use catalog::table_source::DfTableSourceProvider; use datafusion::datasource::DefaultTableSource; @@ -23,47 +24,62 @@ use datafusion::prelude::Column; use datafusion::scalar::ScalarValue; use datafusion_common::tree_node::{TreeNode, TreeNodeRewriter, VisitRecursion}; use datafusion_common::{DFSchema, DataFusionError, Result as DFResult}; -use datafusion_expr::expr::{AggregateFunction, AggregateUDF, ScalarUDF}; +use datafusion_expr::expr::ScalarUDF; use datafusion_expr::{ - AggregateFunction as AggregateFn, Expr, Extension, LogicalPlan, LogicalPlanBuilder, Projection, + Aggregate, Expr, ExprSchemable, Extension, LogicalPlan, LogicalPlanBuilder, Projection, }; -use datafusion_sql::planner::ContextProvider; use datatypes::prelude::ConcreteDataType; use promql_parser::util::parse_duration; use snafu::{OptionExt, ResultExt}; use table::table::adapter::DfTableProviderAdapter; +use super::plan::Fill; use crate::error::{ - CatalogSnafu, DataFusionSnafu, Result, TimeIndexNotFoundSnafu, UnknownTableSnafu, + CatalogSnafu, DataFusionSnafu, RangeQuerySnafu, Result, TimeIndexNotFoundSnafu, + UnknownTableSnafu, }; use crate::range_select::plan::{RangeFn, RangeSelect}; -use crate::DfContextProviderAdapter; /// `RangeExprRewriter` will recursively search certain `Expr`, find all `range_fn` scalar udf contained in `Expr`, /// and collect the information required by the RangeSelect query, /// and finally modify the `range_fn` scalar udf to an ordinary column field. 
pub struct RangeExprRewriter<'a> { + input_plan: &'a Arc, align: Duration, by: Vec, - range_fn: Vec, - context_provider: &'a DfContextProviderAdapter, + /// Use `BTreeSet` so that in a case like `avg(a) RANGE '5m' + avg(a) RANGE '5m'`, the duplicate range expr `avg(a) RANGE '5m'` is not calculated twice + range_fn: BTreeSet, + sub_aggr: &'a Aggregate, +} + +#[inline] +fn dispose_parse_error(expr: Option<&Expr>) -> DataFusionError { + DataFusionError::Plan( + expr.map(|x| { + format!( + "Illegal argument `{}` in range select query", + x.display_name().unwrap_or_default() + ) + }) + .unwrap_or("Missing argument in range select query".into()), + ) } impl<'a> RangeExprRewriter<'a> { - pub fn gen_range_expr(&self, func_name: &str, args: Vec) -> DFResult { - match AggregateFn::from_str(func_name) { - Ok(agg_fn) => Ok(Expr::AggregateFunction(AggregateFunction::new( - agg_fn, args, false, None, None, - ))), - Err(_) => match self.context_provider.get_aggregate_meta(func_name) { - Some(agg_udf) => Ok(Expr::AggregateUDF(AggregateUDF::new( - agg_udf, args, None, None, - ))), - None => Err(DataFusionError::Plan(format!( - "{} is not a Aggregate function or a Aggregate UDF", - func_name - ))), - }, + pub fn get_range_expr(&self, args: &[Expr], i: usize) -> DFResult { + match args.get(i) { + Some(Expr::Column(column)) => { + let index = self.sub_aggr.schema.index_of_column(column)?; + let len = self.sub_aggr.group_expr.len(); + self.sub_aggr + .aggr_expr + .get(index - len) + .cloned() + .ok_or(DataFusionError::Plan( + "Range expr not found in underlying Aggregate Plan".into(), + )) + } + other => Err(dispose_parse_error(other)), } } } @@ -71,9 +87,7 @@ impl<'a> RangeExprRewriter<'a> { fn parse_str_expr(args: &[Expr], i: usize) -> DFResult<&str> { match args.get(i) { Some(Expr::Literal(ScalarValue::Utf8(Some(str)))) => Ok(str.as_str()), - _ => Err(DataFusionError::Plan( - "Illegal argument in range select query".into(), - )), + other => Err(dispose_parse_error(other)), } } @@ -88,10 +102,8 @@ fn parse_expr_list(args: &[Expr], start: usize, len: usize) -> DFResult args[i].clone(), - _ => { - return Err(DataFusionError::Plan( - "Illegal expr argument in range select query".into(), - )) + other => { + return Err(dispose_parse_error(*other)); } }); } @@ -104,23 +116,22 @@ impl<'a> TreeNodeRewriter for RangeExprRewriter<'a> { fn mutate(&mut self, node: Expr) -> DFResult { if let Expr::ScalarUDF(func) = &node { if func.fun.name == "range_fn" { - // `range_fn(func_name, argc, [argv], range, fill, byc, [byv], align)` - // `argsv` and `byv` are variadic arguments, argc/byc indicate the length of arguments - let func_name = parse_str_expr(&func.args, 0)?; - let argc = str::parse::(parse_str_expr(&func.args, 1)?) - .map_err(|e| DataFusionError::Plan(e.to_string()))?; - let byc = str::parse::(parse_str_expr(&func.args, argc + 4)?) + // `range_fn(func, range, fill, byc, [byv], align)` + // `[byv]` are variadic arguments, byc indicates the number of `[byv]` arguments + let range_expr = self.get_range_expr(&func.args, 0)?; + let range_str = parse_str_expr(&func.args, 1)?; + let byc = str::parse::(parse_str_expr(&func.args, 3)?) .map_err(|e| DataFusionError::Plan(e.to_string()))?; - let mut range_fn = RangeFn { - expr: Expr::Wildcard, - range: parse_duration(parse_str_expr(&func.args, argc + 2)?)
- .map_err(DataFusionError::Plan)?, - fill: parse_str_expr(&func.args, argc + 3)?.to_string(), - }; - let args = parse_expr_list(&func.args, 2, argc)?; - let by = parse_expr_list(&func.args, argc + 5, byc)?; - let align = parse_duration(parse_str_expr(&func.args, argc + byc + 5)?) + let by = parse_expr_list(&func.args, 4, byc)?; + let align = parse_duration(parse_str_expr(&func.args, byc + 4)?) .map_err(DataFusionError::Plan)?; + let mut data_type = range_expr.get_type(self.input_plan.schema())?; + let mut need_cast = false; + let fill = Fill::try_from_str(parse_str_expr(&func.args, 2)?, &data_type)?; + if matches!(fill, Fill::Linear) && data_type.is_integer() { + data_type = DataType::Float64; + need_cast = true; + } if !self.by.is_empty() && self.by != by { return Err(DataFusionError::Plan( "Inconsistent by given in Range Function Rewrite".into(), @@ -135,9 +146,21 @@ impl<'a> TreeNodeRewriter for RangeExprRewriter<'a> { } else { self.align = align; } - range_fn.expr = self.gen_range_expr(func_name, args)?; - let alias = Expr::Column(Column::from_name(range_fn.expr.display_name()?)); - self.range_fn.push(range_fn); + let range_fn = RangeFn { + name: format!( + "{} RANGE {} FILL {}", + range_expr.display_name()?, + range_str, + fill + ), + data_type, + expr: range_expr, + range: parse_duration(range_str).map_err(DataFusionError::Plan)?, + fill, + need_cast, + }; + let alias = Expr::Column(Column::from_name(range_fn.name.clone())); + self.range_fn.insert(range_fn); return Ok(alias); } } @@ -146,25 +169,18 @@ impl<'a> TreeNodeRewriter for RangeExprRewriter<'a> { } /// In order to implement RangeSelect query like `avg(field_0) RANGE '5m' FILL NULL`, -/// All RangeSelect query items are converted into udf scalar function in sql parse stage, with format like `range_fn('avg', .....)`. +/// all RangeSelect query items are converted into scalar UDFs in the SQL parse stage, with a format like `range_fn(avg(field_0), .....)`. /// `range_fn` contains all the parameters we need to execute RangeSelect. /// In order to correctly execute the query process of range select, we need to modify the query plan generated by datafusion. /// We need to recursively find the entire LogicalPlan, and find all `range_fn` scalar udf contained in the project plan, collecting info we need to generate RangeSelect Query LogicalPlan and rewrite the original LogicalPlan. pub struct RangePlanRewriter { table_provider: DfTableSourceProvider, - context_provider: DfContextProviderAdapter, } impl RangePlanRewriter { - pub fn new( - table_provider: DfTableSourceProvider, - context_provider: DfContextProviderAdapter, - ) -> Self { - Self { - table_provider, - context_provider, - } + pub fn new(table_provider: DfTableSourceProvider) -> Self { + Self { table_provider } } pub async fn rewrite(&mut self, plan: LogicalPlan) -> Result { @@ -185,17 +201,28 @@ impl RangePlanRewriter { LogicalPlan::Projection(Projection { expr, input, .. }) if have_range_in_exprs(expr) => { - let input = if let Some(new_input) = new_inputs[0].take() { - Arc::new(new_input) + let (aggr_plan, input) = if let LogicalPlan::Aggregate(aggr) = input.as_ref() { + // Expr like `rate(max(a) RANGE '6m') RANGE '6m'` has legal syntax but illegal semantics.
+ if have_range_in_exprs(&aggr.aggr_expr) { + return RangeQuerySnafu { + msg: "Nest Range Query is not allowed", + } + .fail(); + } + (aggr, aggr.input.clone()) } else { - input.clone() + return RangeQuerySnafu { + msg: "Window functions is not allowed in Range Query", + } + .fail(); }; - let (time_index, default_by) = self.get_index_by(input.schema().clone()).await?; + let (time_index, default_by) = self.get_index_by(input.schema()).await?; let mut range_rewriter = RangeExprRewriter { + input_plan: &input, align: Duration::default(), by: vec![], - range_fn: vec![], - context_provider: &self.context_provider, + range_fn: BTreeSet::new(), + sub_aggr: aggr_plan, }; let new_expr = expr .iter() @@ -207,7 +234,7 @@ impl RangePlanRewriter { } let range_select = RangeSelect::try_new( input.clone(), - range_rewriter.range_fn, + range_rewriter.range_fn.into_iter().collect(), range_rewriter.align, time_index, range_rewriter.by, @@ -252,7 +279,7 @@ impl RangePlanRewriter { /// return `(time_index, [row_columns])` to the rewriter. /// If the user does not explicitly use the `by` keyword to indicate time series, /// `[row_columns]` will be use as default time series - async fn get_index_by(&mut self, schema: Arc) -> Result<(Expr, Vec)> { + async fn get_index_by(&mut self, schema: &Arc) -> Result<(Expr, Vec)> { let mut time_index_expr = Expr::Wildcard; let mut default_by = vec![]; for field in schema.fields() { @@ -303,28 +330,27 @@ impl RangePlanRewriter { } } -fn have_range_in_exprs(exprs: &Vec) -> bool { - let mut have = false; - for expr in exprs { +fn have_range_in_exprs(exprs: &[Expr]) -> bool { + exprs.iter().any(|expr| { + let mut find_range = false; let _ = expr.apply(&mut |expr| { if let Expr::ScalarUDF(ScalarUDF { fun, .. }) = expr { if fun.name == "range_fn" { - have = true; + find_range = true; return Ok(VisitRecursion::Stop); } } Ok(VisitRecursion::Continue) }); - if have { - break; - } - } - have + find_range + }) } #[cfg(test)] mod test { + use std::error::Error; + use catalog::memory::MemoryCatalogManager; use catalog::RegisterTableRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; @@ -391,14 +417,14 @@ mod test { QueryEngineFactory::new(catalog_list, None, None, false).query_engine() } - async fn query_plan_compare(sql: &str, expected: String) { + async fn do_query(sql: &str) -> Result { let stmt = QueryLanguageParser::parse_sql(sql).unwrap(); let engine = create_test_engine().await; - let GreptimeLogicalPlan::DfPlan(plan) = engine - .planner() - .plan(stmt, QueryContext::arc()) - .await - .unwrap(); + engine.planner().plan(stmt, QueryContext::arc()).await + } + + async fn query_plan_compare(sql: &str, expected: String) { + let GreptimeLogicalPlan::DfPlan(plan) = do_query(sql).await.unwrap(); assert_eq!(plan.display_indent_schema().to_string(), expected); } @@ -406,7 +432,7 @@ mod test { async fn range_no_project() { let query = r#"SELECT timestamp, tag_0, tag_1, avg(field_0 + field_1) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#; let expected = String::from( - "RangeSelect: range_exprs=[RangeFn { expr:AVG(test.field_0 + test.field_1) range:300s fill: }], align=3600s time_index=timestamp [timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, AVG(test.field_0 + test.field_1):Float64;N]\ + "RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N]\ \n 
TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" ); query_plan_compare(query, expected).await; @@ -414,11 +440,10 @@ mod test { #[tokio::test] async fn range_expr_calculation() { - let query = - r#"SELECT avg(field_0 + field_1)/4 RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#; + let query = r#"SELECT (avg(field_0 + field_1)/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#; let expected = String::from( - "Projection: AVG(test.field_0 + test.field_1) / Int64(4) [AVG(test.field_0 + test.field_1) / Int64(4):Float64;N]\ - \n RangeSelect: range_exprs=[RangeFn { expr:AVG(test.field_0 + test.field_1) range:300s fill: }], align=3600s time_index=timestamp [AVG(test.field_0 + test.field_1):Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ + "Projection: AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL / Int64(4) [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\ + \n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" ); query_plan_compare(query, expected).await; @@ -427,10 +452,10 @@ mod test { #[tokio::test] async fn range_multi_args() { let query = - r#"SELECT covar(field_0 + field_1, field_1)/4 RANGE '5m' FROM test ALIGN '1h';"#; + r#"SELECT (covar(field_0 + field_1, field_1)/4) RANGE '5m' FROM test ALIGN '1h';"#; let expected = String::from( - "Projection: COVARIANCE(test.field_0 + test.field_1,test.field_1) / Int64(4) [COVARIANCE(test.field_0 + test.field_1,test.field_1) / Int64(4):Float64;N]\ - \n RangeSelect: range_exprs=[RangeFn { expr:COVARIANCE(test.field_0 + test.field_1,test.field_1) range:300s fill: }], align=3600s time_index=timestamp [COVARIANCE(test.field_0 + test.field_1,test.field_1):Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\ + "Projection: COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL / Int64(4) [COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\ + \n RangeSelect: range_exprs=[COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\ \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" ); query_plan_compare(query, expected).await; @@ -438,10 +463,10 @@ mod test { #[tokio::test] async fn range_calculation() { - let query = r#"SELECT (avg(field_0)+sum(field_1))/4 RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL;"#; + let query = r#"SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL;"#; let expected = String::from( - "Projection: (AVG(test.field_0) + SUM(test.field_1)) / Int64(4) 
[AVG(test.field_0) + SUM(test.field_1) / Int64(4):Float64;N]\ - \n RangeSelect: range_exprs=[RangeFn { expr:AVG(test.field_0) range:300s fill:NULL }, RangeFn { expr:SUM(test.field_1) range:300s fill:NULL }], align=3600s time_index=timestamp [AVG(test.field_0):Float64;N, SUM(test.field_1):Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ + "Projection: (AVG(test.field_0) RANGE 5m FILL NULL + SUM(test.field_1) RANGE 5m FILL NULL) / Int64(4) [AVG(test.field_0) RANGE 5m FILL NULL + SUM(test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\ + \n RangeSelect: range_exprs=[AVG(test.field_0) RANGE 5m FILL NULL, SUM(test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(test.field_0) RANGE 5m FILL NULL:Float64;N, SUM(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" ); query_plan_compare(query, expected).await; @@ -449,12 +474,12 @@ mod test { #[tokio::test] async fn range_as_sub_query() { - let query = r#"SELECT foo + 1 from (SELECT (avg(field_0)+sum(field_1))/4 RANGE '5m' as foo FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL) where foo > 1;"#; + let query = r#"SELECT foo + 1 from (SELECT ((avg(field_0)+sum(field_1))/4) RANGE '5m' as foo FROM test ALIGN '1h' by (tag_0,tag_1) FILL NULL) where foo > 1;"#; let expected = String::from( "Projection: foo + Int64(1) [foo + Int64(1):Float64;N]\ \n Filter: foo > Int64(1) [foo:Float64;N]\ - \n Projection: (AVG(test.field_0) + SUM(test.field_1)) / Int64(4) AS foo [foo:Float64;N]\ - \n RangeSelect: range_exprs=[RangeFn { expr:AVG(test.field_0) range:300s fill:NULL }, RangeFn { expr:SUM(test.field_1) range:300s fill:NULL }], align=3600s time_index=timestamp [AVG(test.field_0):Float64;N, SUM(test.field_1):Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ + \n Projection: (AVG(test.field_0) RANGE 5m FILL NULL + SUM(test.field_1) RANGE 5m FILL NULL) / Int64(4) AS foo [foo:Float64;N]\ + \n RangeSelect: range_exprs=[AVG(test.field_0) RANGE 5m FILL NULL, SUM(test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(test.field_0) RANGE 5m FILL NULL:Float64;N, SUM(test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" ); query_plan_compare(query, expected).await; @@ -462,14 +487,109 @@ mod test { #[tokio::test] async fn range_from_nest_query() { - let query = r#"SELECT (avg(a)+sum(b))/4 RANGE '5m' FROM (SELECT field_0 as a, field_1 as b, tag_0 as c, tag_1 as d, timestamp from test where field_0 > 1.0) ALIGN '1h' by (c, d) FILL NULL;"#; + let query = r#"SELECT ((avg(a)+sum(b))/4) RANGE '5m' FROM (SELECT field_0 as a, field_1 as b, tag_0 as c, tag_1 as d, timestamp from test where field_0 > 1.0) ALIGN '1h' by (c, d) FILL NULL;"#; let expected = String::from( - "Projection: (AVG(a) + SUM(b)) / Int64(4) [AVG(a) + SUM(b) / Int64(4):Float64;N]\ - \n RangeSelect: range_exprs=[RangeFn { expr:AVG(a) range:300s fill:NULL }, RangeFn { expr:SUM(b) range:300s fill:NULL }], align=3600s time_index=timestamp [AVG(a):Float64;N, SUM(b):Float64;N, 
timestamp:Timestamp(Millisecond, None), c:Utf8, d:Utf8]\ + "Projection: (AVG(a) RANGE 5m FILL NULL + SUM(b) RANGE 5m FILL NULL) / Int64(4) [AVG(a) RANGE 5m FILL NULL + SUM(b) RANGE 5m FILL NULL / Int64(4):Float64;N]\ + \n RangeSelect: range_exprs=[AVG(a) RANGE 5m FILL NULL, SUM(b) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(a) RANGE 5m FILL NULL:Float64;N, SUM(b) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), c:Utf8, d:Utf8]\ \n Projection: test.field_0 AS a, test.field_1 AS b, test.tag_0 AS c, test.tag_1 AS d, test.timestamp [a:Float64;N, b:Float64;N, c:Utf8, d:Utf8, timestamp:Timestamp(Millisecond, None)]\ \n Filter: test.field_0 > Float64(1) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]\ \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" + ); + query_plan_compare(query, expected).await; + } + + #[tokio::test] + async fn range_in_expr() { + let query = r#"SELECT sin(avg(field_0 + field_1) RANGE '5m' + 1) FROM test ALIGN '1h' by (tag_0,tag_1);"#; + let expected = String::from( + "Projection: sin(AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL + Int64(1)) [sin(AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL + Int64(1)):Float64;N]\ + \n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ + \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" + ); + query_plan_compare(query, expected).await; + } + + #[tokio::test] + async fn duplicate_range_expr() { + let query = r#"SELECT avg(field_0) RANGE '5m' FILL 6.0 + avg(field_0) RANGE '5m' FILL 6.0 FROM test ALIGN '1h' by (tag_0,tag_1);"#; + let expected = String::from( + "Projection: AVG(test.field_0) RANGE 5m FILL 6 + AVG(test.field_0) RANGE 5m FILL 6 [AVG(test.field_0) RANGE 5m FILL 6 + AVG(test.field_0) RANGE 5m FILL 6:Float64]\ + \n RangeSelect: range_exprs=[AVG(test.field_0) RANGE 5m FILL 6], align=3600s time_index=timestamp [AVG(test.field_0) RANGE 5m FILL 6:Float64, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ + \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" ); query_plan_compare(query, expected).await; } + + #[tokio::test] + async fn deep_nest_range_expr() { + let query = r#"SELECT round(sin(avg(field_0 + field_1) RANGE '5m' + 1)) FROM test ALIGN '1h' by (tag_0,tag_1);"#; + let expected = String::from( + "Projection: round(sin(AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL + Int64(1))) [round(sin(AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL + Int64(1))):Float64;N]\ + \n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600s time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ + \n TableScan: test [tag_0:Utf8, 
tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" + ); + query_plan_compare(query, expected).await; + } + + #[tokio::test] + async fn complex_range_expr() { + let query = r#"SELECT gcd(CAST(max(field_0 + 1) Range '5m' FILL NULL AS Int64), CAST(tag_0 AS Int64)) + round(max(field_2+1) Range '6m' FILL NULL + 1) + max(field_2+3) Range '10m' FILL NULL * CAST(tag_1 AS Float64) + 1 FROM test ALIGN '1h' by (tag_0, tag_1);"#; + let expected = String::from( + "Projection: gcd(CAST(MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL AS Int64), CAST(test.tag_0 AS Int64)) + round(MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL * CAST(test.tag_1 AS Float64) + Int64(1) [gcd(MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL,test.tag_0) + round(MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL + Int64(1)) + MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL * test.tag_1 + Int64(1):Float64;N]\ + \n RangeSelect: range_exprs=[MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL, MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL, MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL], align=3600s time_index=timestamp [MAX(test.field_0 + Int64(1)) RANGE 5m FILL NULL:Float64;N, MAX(test.field_2 + Int64(1)) RANGE 6m FILL NULL:Float64;N, MAX(test.field_2 + Int64(3)) RANGE 10m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\ + \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" + ); + query_plan_compare(query, expected).await; + } + + #[tokio::test] + async fn range_linear_on_integer() { + let query = r#"SELECT min(CAST(field_0 AS Int64) + CAST(field_1 AS Int64)) RANGE '5m' FILL LINEAR FROM test ALIGN '1h' by (tag_0,tag_1);"#; + let expected = String::from( + "RangeSelect: range_exprs=[MIN(test.field_0 + test.field_1) RANGE 5m FILL LINEAR], align=3600s time_index=timestamp [MIN(test.field_0 + test.field_1) RANGE 5m FILL LINEAR:Float64;N]\ + \n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]" + ); + query_plan_compare(query, expected).await; + } + + #[tokio::test] + async fn range_nest_range_err() { + let query = r#"SELECT sum(avg(field_0 + field_1) RANGE '5m' + 1) RANGE '5m' + 1 FROM test ALIGN '1h' by (tag_0,tag_1);"#; + assert_eq!( + do_query(query).await.unwrap_err().to_string(), + "Range Query: Nest Range Query is not allowed" + ) + } + + #[tokio::test] + /// Start directly from the rewritten SQL and check whether the error reported by the range expression rewriting is as expected. 
+ /// the right argument is `range_fn(avg(field_0), '5m', 'NULL', '0', '1h')` + async fn range_argument_err_1() { + let query = r#"SELECT range_fn('5m', avg(field_0), 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#; + let error = do_query(query) + .await + .unwrap_err() + .source() + .unwrap() + .to_string(); + assert_eq!( + error, + "Error during planning: Illegal argument `Utf8(\"5m\")` in range select query" + ) + } + + #[tokio::test] + async fn range_argument_err_2() { + let query = r#"SELECT range_fn(avg(field_0), 5, 'NULL', '1', tag_0, '1h') FROM test group by tag_0;"#; + let error = do_query(query) + .await + .unwrap_err() + .source() + .unwrap() + .to_string(); + assert_eq!( + error, + "Error during planning: Illegal argument `Int64(5)` in range select query" + ) + } } diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index b0eb5c3c4c6a..6e41f7285159 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -14,7 +14,7 @@ use std::ops::Deref; -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::ErrorExt; use common_query::Output; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use datatypes::prelude::{ConcreteDataType, Value}; @@ -28,7 +28,7 @@ use session::context::QueryContextRef; use snafu::prelude::*; use tokio::io::AsyncWrite; -use crate::error::{self, Error, OtherSnafu, Result}; +use crate::error::{self, Error, Result}; use crate::metrics::*; /// Try to write multiple output to the writer if possible. @@ -148,7 +148,11 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { .await? } Err(e) => { - return Err(e).map_err(BoxedError::new).context(OtherSnafu); + let err = e.to_string(); + row_writer + .finish_error(ErrorKind::ER_INTERNAL_ERROR, &err.as_bytes()) + .await?; + return Ok(()); } } } diff --git a/tests/cases/standalone/common/range/by.result b/tests/cases/standalone/common/range/by.result new file mode 100644 index 000000000000..251f3a67e414 --- /dev/null +++ b/tests/cases/standalone/common/range/by.result @@ -0,0 +1,54 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, +); + +Affected Rows: 0 + +INSERT INTO TABLE host VALUES + (0, 'host1', 0), + (5000, 'host1', null), + (10000, 'host1', 1), + (15000, 'host1', null), + (20000, 'host1', 2), + (0, 'host2', 3), + (5000, 'host2', null), + (10000, 'host2', 4), + (15000, 'host2', null), + (20000, 'host2', 5); + +Affected Rows: 10 + +-- Test by calculate +SELECT ts, length(host), max(val) RANGE '5s' FROM host ALIGN '20s' BY (length(host)) ORDER BY ts; + ++---------------------+-----------------------------+----------------------------------+ +| ts | character_length(host.host) | MAX(host.val) RANGE 5s FILL NULL | ++---------------------+-----------------------------+----------------------------------+ +| 1970-01-01T00:00:00 | 5 | 3 | +| 1970-01-01T00:00:20 | 5 | 5 | ++---------------------+-----------------------------+----------------------------------+ + +SELECT ts, max(val) RANGE '5s' FROM host ALIGN '20s' BY (2) ORDER BY ts; + ++---------------------+----------------------------------+ +| ts | MAX(host.val) RANGE 5s FILL NULL | ++---------------------+----------------------------------+ +| 1970-01-01T00:00:00 | 3 | +| 1970-01-01T00:00:20 | 5 | ++---------------------+----------------------------------+ + +SELECT ts, CAST(length(host) as INT64) + 2, max(val) RANGE '5s' FROM host ALIGN '20s' BY (CAST(length(host) as INT64) + 2) ORDER BY ts; + 
++---------------------+----------------------------------------+----------------------------------+ +| ts | character_length(host.host) + Int64(2) | MAX(host.val) RANGE 5s FILL NULL | ++---------------------+----------------------------------------+----------------------------------+ +| 1970-01-01T00:00:00 | 7 | 3 | +| 1970-01-01T00:00:20 | 7 | 5 | ++---------------------+----------------------------------------+----------------------------------+ + +DROP TABLE host; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/range/by.sql b/tests/cases/standalone/common/range/by.sql new file mode 100644 index 000000000000..0ffa2afc826a --- /dev/null +++ b/tests/cases/standalone/common/range/by.sql @@ -0,0 +1,27 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, +); + +INSERT INTO TABLE host VALUES + (0, 'host1', 0), + (5000, 'host1', null), + (10000, 'host1', 1), + (15000, 'host1', null), + (20000, 'host1', 2), + (0, 'host2', 3), + (5000, 'host2', null), + (10000, 'host2', 4), + (15000, 'host2', null), + (20000, 'host2', 5); + +-- Test by calculate + +SELECT ts, length(host), max(val) RANGE '5s' FROM host ALIGN '20s' BY (length(host)) ORDER BY ts; + +SELECT ts, max(val) RANGE '5s' FROM host ALIGN '20s' BY (2) ORDER BY ts; + +SELECT ts, CAST(length(host) as INT64) + 2, max(val) RANGE '5s' FROM host ALIGN '20s' BY (CAST(length(host) as INT64) + 2) ORDER BY ts; + +DROP TABLE host; diff --git a/tests/cases/standalone/common/range/calculate.result b/tests/cases/standalone/common/range/calculate.result new file mode 100644 index 000000000000..29e292b409b1 --- /dev/null +++ b/tests/cases/standalone/common/range/calculate.result @@ -0,0 +1,194 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, +); + +Affected Rows: 0 + +INSERT INTO TABLE host VALUES + (0, 'host1', 0), + (5000, 'host1', null), + (10000, 'host1', 1), + (15000, 'host1', null), + (20000, 'host1', 2), + (0, 'host2', 3), + (5000, 'host2', null), + (10000, 'host2', 4), + (15000, 'host2', null), + (20000, 'host2', 5); + +Affected Rows: 10 + +-- Test range expr calculate +SELECT ts, host, covar(val, val) RANGE '20s' FROM host ALIGN '10s' ORDER BY host, ts; + ++---------------------+-------+---------------------------------------------------+ +| ts | host | COVARIANCE(host.val,host.val) RANGE 20s FILL NULL | ++---------------------+-------+---------------------------------------------------+ +| 1970-01-01T00:00:00 | host1 | | +| 1970-01-01T00:00:10 | host1 | 0.5 | +| 1970-01-01T00:00:20 | host1 | 0.5 | +| 1970-01-01T00:00:30 | host1 | | +| 1970-01-01T00:00:00 | host2 | | +| 1970-01-01T00:00:10 | host2 | 0.5 | +| 1970-01-01T00:00:20 | host2 | 0.5 | +| 1970-01-01T00:00:30 | host2 | | ++---------------------+-------+---------------------------------------------------+ + +SELECT ts, host, 2 * min(val) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+---------------------------------------------+ +| ts | host | Int64(2) * MIN(host.val) RANGE 5s FILL NULL | ++---------------------+-------+---------------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 0 | +| 1970-01-01T00:00:05 | host1 | | +| 1970-01-01T00:00:10 | host1 | 2 | +| 1970-01-01T00:00:15 | host1 | | +| 1970-01-01T00:00:20 | host1 | 4 | +| 1970-01-01T00:00:00 | host2 | 6 | +| 1970-01-01T00:00:05 | host2 | | +| 1970-01-01T00:00:10 | host2 | 8 | +| 1970-01-01T00:00:15 | host2 | | +| 1970-01-01T00:00:20 | host2 | 10 | 
++---------------------+-------+---------------------------------------------+ + +SELECT ts, host, min(val * 2) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+---------------------------------------------+ +| ts | host | MIN(host.val * Int64(2)) RANGE 5s FILL NULL | ++---------------------+-------+---------------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 0 | +| 1970-01-01T00:00:05 | host1 | | +| 1970-01-01T00:00:10 | host1 | 2 | +| 1970-01-01T00:00:15 | host1 | | +| 1970-01-01T00:00:20 | host1 | 4 | +| 1970-01-01T00:00:00 | host2 | 6 | +| 1970-01-01T00:00:05 | host2 | | +| 1970-01-01T00:00:10 | host2 | 8 | +| 1970-01-01T00:00:15 | host2 | | +| 1970-01-01T00:00:20 | host2 | 10 | ++---------------------+-------+---------------------------------------------+ + +SELECT ts, host, min(CAST(val as Float64)) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+----------------------------------+ +| ts | host | MIN(host.val) RANGE 5s FILL NULL | ++---------------------+-------+----------------------------------+ +| 1970-01-01T00:00:00 | host1 | 0.0 | +| 1970-01-01T00:00:05 | host1 | | +| 1970-01-01T00:00:10 | host1 | 1.0 | +| 1970-01-01T00:00:15 | host1 | | +| 1970-01-01T00:00:20 | host1 | 2.0 | +| 1970-01-01T00:00:00 | host2 | 3.0 | +| 1970-01-01T00:00:05 | host2 | | +| 1970-01-01T00:00:10 | host2 | 4.0 | +| 1970-01-01T00:00:15 | host2 | | +| 1970-01-01T00:00:20 | host2 | 5.0 | ++---------------------+-------+----------------------------------+ + +SELECT ts, host, min(floor(CAST(val as Float64))) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+-----------------------------------------+ +| ts | host | MIN(floor(host.val)) RANGE 5s FILL NULL | ++---------------------+-------+-----------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 0.0 | +| 1970-01-01T00:00:05 | host1 | | +| 1970-01-01T00:00:10 | host1 | 1.0 | +| 1970-01-01T00:00:15 | host1 | | +| 1970-01-01T00:00:20 | host1 | 2.0 | +| 1970-01-01T00:00:00 | host2 | 3.0 | +| 1970-01-01T00:00:05 | host2 | | +| 1970-01-01T00:00:10 | host2 | 4.0 | +| 1970-01-01T00:00:15 | host2 | | +| 1970-01-01T00:00:20 | host2 | 5.0 | ++---------------------+-------+-----------------------------------------+ + +SELECT ts, host, floor(min(val) RANGE '5s') FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+-----------------------------------------+ +| ts | host | floor(MIN(host.val) RANGE 5s FILL NULL) | ++---------------------+-------+-----------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 0.0 | +| 1970-01-01T00:00:05 | host1 | | +| 1970-01-01T00:00:10 | host1 | 1.0 | +| 1970-01-01T00:00:15 | host1 | | +| 1970-01-01T00:00:20 | host1 | 2.0 | +| 1970-01-01T00:00:00 | host2 | 3.0 | +| 1970-01-01T00:00:05 | host2 | | +| 1970-01-01T00:00:10 | host2 | 4.0 | +| 1970-01-01T00:00:15 | host2 | | +| 1970-01-01T00:00:20 | host2 | 5.0 | ++---------------------+-------+-----------------------------------------+ + +-- Test complex range expr calculate +SELECT ts, host, (min(val) + max(val)) RANGE '20s' + 1.0 FROM host ALIGN '10s' ORDER BY host, ts; + ++---------------------+-------+------------------------------------------------------------------------------------+ +| ts | host | MIN(host.val) RANGE 20s FILL NULL + MAX(host.val) RANGE 20s FILL NULL + Float64(1) | ++---------------------+-------+------------------------------------------------------------------------------------+ +| 
1970-01-01T00:00:00 | host1 | 1.0 | +| 1970-01-01T00:00:10 | host1 | 2.0 | +| 1970-01-01T00:00:20 | host1 | 4.0 | +| 1970-01-01T00:00:30 | host1 | 5.0 | +| 1970-01-01T00:00:00 | host2 | 7.0 | +| 1970-01-01T00:00:10 | host2 | 8.0 | +| 1970-01-01T00:00:20 | host2 | 10.0 | +| 1970-01-01T00:00:30 | host2 | 11.0 | ++---------------------+-------+------------------------------------------------------------------------------------+ + +SELECT ts, host, covar(ceil(CAST(val as Float64)), floor(CAST(val as Float64))) RANGE '20s' FROM host ALIGN '10s' ORDER BY host, ts; + ++---------------------+-------+----------------------------------------------------------------+ +| ts | host | COVARIANCE(ceil(host.val),floor(host.val)) RANGE 20s FILL NULL | ++---------------------+-------+----------------------------------------------------------------+ +| 1970-01-01T00:00:00 | host1 | | +| 1970-01-01T00:00:10 | host1 | 0.5 | +| 1970-01-01T00:00:20 | host1 | 0.5 | +| 1970-01-01T00:00:30 | host1 | | +| 1970-01-01T00:00:00 | host2 | | +| 1970-01-01T00:00:10 | host2 | 0.5 | +| 1970-01-01T00:00:20 | host2 | 0.5 | +| 1970-01-01T00:00:30 | host2 | | ++---------------------+-------+----------------------------------------------------------------+ + +SELECT ts, host, floor(cos(ceil(sin(min(val) RANGE '5s')))) FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+---------------------------------------------------------+ +| ts | host | floor(cos(ceil(sin(MIN(host.val) RANGE 5s FILL NULL)))) | ++---------------------+-------+---------------------------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 1.0 | +| 1970-01-01T00:00:05 | host1 | | +| 1970-01-01T00:00:10 | host1 | 0.0 | +| 1970-01-01T00:00:15 | host1 | | +| 1970-01-01T00:00:20 | host1 | 0.0 | +| 1970-01-01T00:00:00 | host2 | 0.0 | +| 1970-01-01T00:00:05 | host2 | | +| 1970-01-01T00:00:10 | host2 | 1.0 | +| 1970-01-01T00:00:15 | host2 | | +| 1970-01-01T00:00:20 | host2 | 1.0 | ++---------------------+-------+---------------------------------------------------------+ + +SELECT ts, host, gcd(CAST(max(floor(CAST(val as Float64))) RANGE '10s' FILL PREV as INT64) * 4, max(val * 4) RANGE '10s' FILL PREV) * length(host) + 1 FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+------------------------------------------------------------------------------------------------------------------------------------------------+ +| ts | host | gcd(MAX(floor(host.val)) RANGE 10s FILL PREV * Int64(4),MAX(host.val * Int64(4)) RANGE 10s FILL PREV) * character_length(host.host) + Int64(1) | ++---------------------+-------+------------------------------------------------------------------------------------------------------------------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 1 | +| 1970-01-01T00:00:05 | host1 | 1 | +| 1970-01-01T00:00:10 | host1 | 21 | +| 1970-01-01T00:00:15 | host1 | 21 | +| 1970-01-01T00:00:20 | host1 | 41 | +| 1970-01-01T00:00:25 | host1 | 41 | +| 1970-01-01T00:00:00 | host2 | 61 | +| 1970-01-01T00:00:05 | host2 | 61 | +| 1970-01-01T00:00:10 | host2 | 81 | +| 1970-01-01T00:00:15 | host2 | 81 | +| 1970-01-01T00:00:20 | host2 | 101 | +| 1970-01-01T00:00:25 | host2 | 101 | ++---------------------+-------+------------------------------------------------------------------------------------------------------------------------------------------------+ + +DROP TABLE host; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/range/calculate.sql 
b/tests/cases/standalone/common/range/calculate.sql new file mode 100644 index 000000000000..59b19e923237 --- /dev/null +++ b/tests/cases/standalone/common/range/calculate.sql @@ -0,0 +1,43 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, +); + +INSERT INTO TABLE host VALUES + (0, 'host1', 0), + (5000, 'host1', null), + (10000, 'host1', 1), + (15000, 'host1', null), + (20000, 'host1', 2), + (0, 'host2', 3), + (5000, 'host2', null), + (10000, 'host2', 4), + (15000, 'host2', null), + (20000, 'host2', 5); + +-- Test range expr calculate + +SELECT ts, host, covar(val, val) RANGE '20s' FROM host ALIGN '10s' ORDER BY host, ts; + +SELECT ts, host, 2 * min(val) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + +SELECT ts, host, min(val * 2) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + +SELECT ts, host, min(CAST(val as Float64)) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + +SELECT ts, host, min(floor(CAST(val as Float64))) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + +SELECT ts, host, floor(min(val) RANGE '5s') FROM host ALIGN '5s' ORDER BY host, ts; + +-- Test complex range expr calculate + +SELECT ts, host, (min(val) + max(val)) RANGE '20s' + 1.0 FROM host ALIGN '10s' ORDER BY host, ts; + +SELECT ts, host, covar(ceil(CAST(val as Float64)), floor(CAST(val as Float64))) RANGE '20s' FROM host ALIGN '10s' ORDER BY host, ts; + +SELECT ts, host, floor(cos(ceil(sin(min(val) RANGE '5s')))) FROM host ALIGN '5s' ORDER BY host, ts; + +SELECT ts, host, gcd(CAST(max(floor(CAST(val as Float64))) RANGE '10s' FILL PREV as INT64) * 4, max(val * 4) RANGE '10s' FILL PREV) * length(host) + 1 FROM host ALIGN '5s' ORDER BY host, ts; + +DROP TABLE host; diff --git a/tests/cases/standalone/common/range/error.result b/tests/cases/standalone/common/range/error.result new file mode 100644 index 000000000000..508cad0adff3 --- /dev/null +++ b/tests/cases/standalone/common/range/error.result @@ -0,0 +1,82 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, +); + +Affected Rows: 0 + +INSERT INTO TABLE host VALUES + (0, 'host1', 0), + (5000, 'host1', null), + (10000, 'host1', 1), + (15000, 'host1', null), + (20000, 'host1', 2), + (0, 'host2', 3), + (5000, 'host2', null), + (10000, 'host2', 4), + (15000, 'host2', null), + (20000, 'host2', 5); + +Affected Rows: 10 + +-- Test Invalid cases +-- 1. error timestamp +SELECT min(val) RANGE 'not_time' FROM host ALIGN '5s'; + +Error: 2000(InvalidSyntax), sql parser error: not a valid duration string: not_time + +SELECT min(val) RANGE '5s' FROM host ALIGN 'not_time'; + +Error: 2000(InvalidSyntax), sql parser error: not a valid duration string: not_time + +-- 2.1 no range param +SELECT min(val) FROM host ALIGN '5s'; + +Error: 2000(InvalidSyntax), sql parser error: Illegal Range select, no RANGE keyword found in any SelectItem + +SELECT min(val) RANGE '10s', max(val) FROM host ALIGN '5s'; + +Error: 1003(Internal), No field named "MAX(host.val)". Valid fields are "MIN(host.val) RANGE 10s FILL NULL", host.ts, host.host. 
+ +SELECT min(val) * 2 RANGE '10s' FROM host ALIGN '5s'; + +Error: 2000(InvalidSyntax), sql parser error: Can't use the RANGE keyword in Expr 2 without function + +SELECT 1 RANGE '10s' FILL NULL FROM host ALIGN '1h' FILL NULL; + +Error: 2000(InvalidSyntax), sql parser error: Can't use the RANGE keyword in Expr 1 without function + +-- 2.2 no align param +SELECT min(val) RANGE '5s' FROM host; + +Error: 1003(Internal), Error during planning: Missing argument in range select query + +-- 2.3 type mismatch +SELECT covar(ceil(val), floor(val)) RANGE '20s' FROM host ALIGN '10s'; + +Error: 1003(Internal), Internal error: Unsupported data type Int64 for function ceil. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker + +-- 2.4 nest query +SELECT min(max(val) RANGE '20s') RANGE '20s' FROM host ALIGN '10s'; + +Error: 2000(InvalidSyntax), Range Query: Nest Range Query is not allowed + +-- 2.5 wrong Aggregate +SELECT rank() OVER (PARTITION BY host ORDER BY ts DESC) RANGE '10s' FROM host ALIGN '5s'; + +Error: 2000(InvalidSyntax), Range Query: Window functions is not allowed in Range Query + +-- 2.6 invalid fill +SELECT min(val) RANGE '5s', min(val) RANGE '5s' FILL NULL FROM host ALIGN '5s'; + +Error: 1003(Internal), Schema contains duplicate unqualified field name "MIN(host.val) RANGE 5s FILL NULL" + +SELECT min(val) RANGE '5s' FROM host ALIGN '5s' FILL 3.0; + +Error: 1003(Internal), Error during planning: 3.0 is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string '3.0' to value of Int64 type } + +DROP TABLE host; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/range/error.sql b/tests/cases/standalone/common/range/error.sql new file mode 100644 index 000000000000..19cf55d2285e --- /dev/null +++ b/tests/cases/standalone/common/range/error.sql @@ -0,0 +1,59 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, +); + +INSERT INTO TABLE host VALUES + (0, 'host1', 0), + (5000, 'host1', null), + (10000, 'host1', 1), + (15000, 'host1', null), + (20000, 'host1', 2), + (0, 'host2', 3), + (5000, 'host2', null), + (10000, 'host2', 4), + (15000, 'host2', null), + (20000, 'host2', 5); + +-- Test Invalid cases + +-- 1. 
error timestamp + +SELECT min(val) RANGE 'not_time' FROM host ALIGN '5s'; + +SELECT min(val) RANGE '5s' FROM host ALIGN 'not_time'; + +-- 2.1 no range param + +SELECT min(val) FROM host ALIGN '5s'; + +SELECT min(val) RANGE '10s', max(val) FROM host ALIGN '5s'; + +SELECT min(val) * 2 RANGE '10s' FROM host ALIGN '5s'; + +SELECT 1 RANGE '10s' FILL NULL FROM host ALIGN '1h' FILL NULL; + +-- 2.2 no align param + +SELECT min(val) RANGE '5s' FROM host; + +-- 2.3 type mismatch + +SELECT covar(ceil(val), floor(val)) RANGE '20s' FROM host ALIGN '10s'; + +-- 2.4 nest query + +SELECT min(max(val) RANGE '20s') RANGE '20s' FROM host ALIGN '10s'; + +-- 2.5 wrong Aggregate + +SELECT rank() OVER (PARTITION BY host ORDER BY ts DESC) RANGE '10s' FROM host ALIGN '5s'; + +-- 2.6 invalid fill + +SELECT min(val) RANGE '5s', min(val) RANGE '5s' FILL NULL FROM host ALIGN '5s'; + +SELECT min(val) RANGE '5s' FROM host ALIGN '5s' FILL 3.0; + +DROP TABLE host; diff --git a/tests/cases/standalone/common/range/fill.result b/tests/cases/standalone/common/range/fill.result new file mode 100644 index 000000000000..00a371b0fd34 --- /dev/null +++ b/tests/cases/standalone/common/range/fill.result @@ -0,0 +1,112 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, +); + +Affected Rows: 0 + +INSERT INTO TABLE host VALUES + (0, 'host1', 0), + (5000, 'host1', null), + (10000, 'host1', 1), + (15000, 'host1', null), + (20000, 'host1', 2), + (0, 'host2', 3), + (5000, 'host2', null), + (10000, 'host2', 4), + (15000, 'host2', null), + (20000, 'host2', 5); + +Affected Rows: 10 + +-- Test Fill +SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+----------------------------------+ +| ts | host | MIN(host.val) RANGE 5s FILL NULL | ++---------------------+-------+----------------------------------+ +| 1970-01-01T00:00:00 | host1 | 0 | +| 1970-01-01T00:00:05 | host1 | | +| 1970-01-01T00:00:10 | host1 | 1 | +| 1970-01-01T00:00:15 | host1 | | +| 1970-01-01T00:00:20 | host1 | 2 | +| 1970-01-01T00:00:00 | host2 | 3 | +| 1970-01-01T00:00:05 | host2 | | +| 1970-01-01T00:00:10 | host2 | 4 | +| 1970-01-01T00:00:15 | host2 | | +| 1970-01-01T00:00:20 | host2 | 5 | ++---------------------+-------+----------------------------------+ + +SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s' FILL NULL ORDER BY host, ts; + ++---------------------+-------+----------------------------------+ +| ts | host | MIN(host.val) RANGE 5s FILL NULL | ++---------------------+-------+----------------------------------+ +| 1970-01-01T00:00:00 | host1 | 0 | +| 1970-01-01T00:00:05 | host1 | | +| 1970-01-01T00:00:10 | host1 | 1 | +| 1970-01-01T00:00:15 | host1 | | +| 1970-01-01T00:00:20 | host1 | 2 | +| 1970-01-01T00:00:00 | host2 | 3 | +| 1970-01-01T00:00:05 | host2 | | +| 1970-01-01T00:00:10 | host2 | 4 | +| 1970-01-01T00:00:15 | host2 | | +| 1970-01-01T00:00:20 | host2 | 5 | ++---------------------+-------+----------------------------------+ + +SELECT ts, host, min(val) RANGE '5s', min(val) RANGE '5s' FILL 6 FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+----------------------------------+-------------------------------+ +| ts | host | MIN(host.val) RANGE 5s FILL NULL | MIN(host.val) RANGE 5s FILL 6 | ++---------------------+-------+----------------------------------+-------------------------------+ +| 1970-01-01T00:00:00 | host1 | 0 | 0 | +| 1970-01-01T00:00:05 | host1 | | 6 | +| 1970-01-01T00:00:10 | host1 | 1 | 1 | +| 1970-01-01T00:00:15 
| host1 | | 6 | +| 1970-01-01T00:00:20 | host1 | 2 | 2 | +| 1970-01-01T00:00:00 | host2 | 3 | 3 | +| 1970-01-01T00:00:05 | host2 | | 6 | +| 1970-01-01T00:00:10 | host2 | 4 | 4 | +| 1970-01-01T00:00:15 | host2 | | 6 | +| 1970-01-01T00:00:20 | host2 | 5 | 5 | ++---------------------+-------+----------------------------------+-------------------------------+ + +SELECT ts, host, min(val) RANGE '5s', min(val) RANGE '5s' FILL PREV FROM host ALIGN '5s'ORDER BY host, ts; + ++---------------------+-------+----------------------------------+----------------------------------+ +| ts | host | MIN(host.val) RANGE 5s FILL NULL | MIN(host.val) RANGE 5s FILL PREV | ++---------------------+-------+----------------------------------+----------------------------------+ +| 1970-01-01T00:00:00 | host1 | 0 | 0 | +| 1970-01-01T00:00:05 | host1 | | 0 | +| 1970-01-01T00:00:10 | host1 | 1 | 1 | +| 1970-01-01T00:00:15 | host1 | | 1 | +| 1970-01-01T00:00:20 | host1 | 2 | 2 | +| 1970-01-01T00:00:00 | host2 | 3 | 3 | +| 1970-01-01T00:00:05 | host2 | | 3 | +| 1970-01-01T00:00:10 | host2 | 4 | 4 | +| 1970-01-01T00:00:15 | host2 | | 4 | +| 1970-01-01T00:00:20 | host2 | 5 | 5 | ++---------------------+-------+----------------------------------+----------------------------------+ + +SELECT ts, host, min(val) RANGE '5s', min(val) RANGE '5s' FILL LINEAR FROM host ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+----------------------------------+------------------------------------+ +| ts | host | MIN(host.val) RANGE 5s FILL NULL | MIN(host.val) RANGE 5s FILL LINEAR | ++---------------------+-------+----------------------------------+------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 0 | 0.0 | +| 1970-01-01T00:00:05 | host1 | | 0.5 | +| 1970-01-01T00:00:10 | host1 | 1 | 1.0 | +| 1970-01-01T00:00:15 | host1 | | 1.5 | +| 1970-01-01T00:00:20 | host1 | 2 | 2.0 | +| 1970-01-01T00:00:00 | host2 | 3 | 3.0 | +| 1970-01-01T00:00:05 | host2 | | 3.5 | +| 1970-01-01T00:00:10 | host2 | 4 | 4.0 | +| 1970-01-01T00:00:15 | host2 | | 4.5 | +| 1970-01-01T00:00:20 | host2 | 5 | 5.0 | ++---------------------+-------+----------------------------------+------------------------------------+ + +DROP TABLE host; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/range/fill.sql b/tests/cases/standalone/common/range/fill.sql new file mode 100644 index 000000000000..66ae5f247c17 --- /dev/null +++ b/tests/cases/standalone/common/range/fill.sql @@ -0,0 +1,31 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, +); + +INSERT INTO TABLE host VALUES + (0, 'host1', 0), + (5000, 'host1', null), + (10000, 'host1', 1), + (15000, 'host1', null), + (20000, 'host1', 2), + (0, 'host2', 3), + (5000, 'host2', null), + (10000, 'host2', 4), + (15000, 'host2', null), + (20000, 'host2', 5); + +-- Test Fill + +SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; + +SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s' FILL NULL ORDER BY host, ts; + +SELECT ts, host, min(val) RANGE '5s', min(val) RANGE '5s' FILL 6 FROM host ALIGN '5s' ORDER BY host, ts; + +SELECT ts, host, min(val) RANGE '5s', min(val) RANGE '5s' FILL PREV FROM host ALIGN '5s'ORDER BY host, ts; + +SELECT ts, host, min(val) RANGE '5s', min(val) RANGE '5s' FILL LINEAR FROM host ALIGN '5s' ORDER BY host, ts; + +DROP TABLE host; diff --git a/tests/cases/standalone/common/range/nest.result b/tests/cases/standalone/common/range/nest.result new file mode 100644 index 000000000000..ae46183cc254 --- 
/dev/null +++ b/tests/cases/standalone/common/range/nest.result @@ -0,0 +1,51 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, +); + +Affected Rows: 0 + +INSERT INTO TABLE host VALUES + (0, 'host1', 0), + (5000, 'host1', null), + (10000, 'host1', 1), + (15000, 'host1', null), + (20000, 'host1', 2), + (0, 'host2', 3), + (5000, 'host2', null), + (10000, 'host2', 4), + (15000, 'host2', null), + (20000, 'host2', 5); + +Affected Rows: 10 + +-- Test range query in nest sql +SELECT ts, host, foo FROM (SELECT ts, host, min(val) RANGE '5s' AS foo FROM host ALIGN '5s') WHERE host = 'host1' ORDER BY host, ts; + ++---------------------+-------+-----+ +| ts | host | foo | ++---------------------+-------+-----+ +| 1970-01-01T00:00:00 | host1 | 0 | +| 1970-01-01T00:00:05 | host1 | | +| 1970-01-01T00:00:10 | host1 | 1 | +| 1970-01-01T00:00:15 | host1 | | +| 1970-01-01T00:00:20 | host1 | 2 | ++---------------------+-------+-----+ + +SELECT ts, b, min(c) RANGE '5s' FROM (SELECT ts, host AS b, val AS c FROM host WHERE host = 'host1') ALIGN '5s' BY (b) ORDER BY b, ts; + ++---------------------+-------+---------------------------+ +| ts | b | MIN(c) RANGE 5s FILL NULL | ++---------------------+-------+---------------------------+ +| 1970-01-01T00:00:00 | host1 | 0 | +| 1970-01-01T00:00:05 | host1 | | +| 1970-01-01T00:00:10 | host1 | 1 | +| 1970-01-01T00:00:15 | host1 | | +| 1970-01-01T00:00:20 | host1 | 2 | ++---------------------+-------+---------------------------+ + +DROP TABLE host; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/range/nest.sql b/tests/cases/standalone/common/range/nest.sql new file mode 100644 index 000000000000..d2478b57311f --- /dev/null +++ b/tests/cases/standalone/common/range/nest.sql @@ -0,0 +1,25 @@ +CREATE TABLE host ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, +); + +INSERT INTO TABLE host VALUES + (0, 'host1', 0), + (5000, 'host1', null), + (10000, 'host1', 1), + (15000, 'host1', null), + (20000, 'host1', 2), + (0, 'host2', 3), + (5000, 'host2', null), + (10000, 'host2', 4), + (15000, 'host2', null), + (20000, 'host2', 5); + +-- Test range query in nest sql + +SELECT ts, host, foo FROM (SELECT ts, host, min(val) RANGE '5s' AS foo FROM host ALIGN '5s') WHERE host = 'host1' ORDER BY host, ts; + +SELECT ts, b, min(c) RANGE '5s' FROM (SELECT ts, host AS b, val AS c FROM host WHERE host = 'host1') ALIGN '5s' BY (b) ORDER BY b, ts; + +DROP TABLE host; diff --git a/tests/cases/standalone/common/range/precisions.result b/tests/cases/standalone/common/range/precisions.result new file mode 100644 index 000000000000..e70f4a5c0ad3 --- /dev/null +++ b/tests/cases/standalone/common/range/precisions.result @@ -0,0 +1,44 @@ +CREATE TABLE host_sec ( + ts timestamp(0) time index, + host STRING PRIMARY KEY, + val DOUBLE, +); + +Affected Rows: 0 + +INSERT INTO TABLE host_sec VALUES + (0, 'host1', 0), + (5, 'host1', null), + (10, 'host1', 1), + (15, 'host1', null), + (20, 'host1', 2), + (0, 'host2', 3), + (5, 'host2', null), + (10, 'host2', 4), + (15, 'host2', null), + (20, 'host2', 5); + +Affected Rows: 10 + +-- Test on Timestamps of different precisions +SELECT ts, host, min(val) RANGE '5s' FROM host_sec ALIGN '5s' ORDER BY host, ts; + ++---------------------+-------+--------------------------------------+ +| ts | host | MIN(host_sec.val) RANGE 5s FILL NULL | ++---------------------+-------+--------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 0.0 | +| 1970-01-01T00:00:05 | host1 | | +| 
1970-01-01T00:00:10 | host1 | 1.0 | +| 1970-01-01T00:00:15 | host1 | | +| 1970-01-01T00:00:20 | host1 | 2.0 | +| 1970-01-01T00:00:00 | host2 | 3.0 | +| 1970-01-01T00:00:05 | host2 | | +| 1970-01-01T00:00:10 | host2 | 4.0 | +| 1970-01-01T00:00:15 | host2 | | +| 1970-01-01T00:00:20 | host2 | 5.0 | ++---------------------+-------+--------------------------------------+ + +DROP TABLE host_sec; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/range/precisions.sql b/tests/cases/standalone/common/range/precisions.sql new file mode 100644 index 000000000000..ced8597d2c81 --- /dev/null +++ b/tests/cases/standalone/common/range/precisions.sql @@ -0,0 +1,23 @@ +CREATE TABLE host_sec ( + ts timestamp(0) time index, + host STRING PRIMARY KEY, + val DOUBLE, +); + +INSERT INTO TABLE host_sec VALUES + (0, 'host1', 0), + (5, 'host1', null), + (10, 'host1', 1), + (15, 'host1', null), + (20, 'host1', 2), + (0, 'host2', 3), + (5, 'host2', null), + (10, 'host2', 4), + (15, 'host2', null), + (20, 'host2', 5); + +-- Test on Timestamps of different precisions + +SELECT ts, host, min(val) RANGE '5s' FROM host_sec ALIGN '5s' ORDER BY host, ts; + +DROP TABLE host_sec; diff --git a/tests/cases/standalone/common/select/range_select.result b/tests/cases/standalone/common/select/range_select.result deleted file mode 100644 index 17dfe05d3999..000000000000 --- a/tests/cases/standalone/common/select/range_select.result +++ /dev/null @@ -1,324 +0,0 @@ -CREATE TABLE host ( - ts timestamp(3) time index, - host STRING PRIMARY KEY, - val DOUBLE, -); - -Affected Rows: 0 - -INSERT INTO TABLE host VALUES - (0, 'host1', 0.0), - (5000, 'host1', 1.0), - (10000, 'host1', 2.0), - (15000, 'host1', 3.0), - (20000, 'host1', 4.0), - (25000, 'host1', 5.0), - (30000, 'host1', 6.0), - (35000, 'host1', 7.0), - (40000, 'host1', 8.0), - (0, 'host2', 9.0), - (5000, 'host2', 10.0), - (10000, 'host2', 11.0), - (15000, 'host2', 12.0), - (20000, 'host2', 13.0), - (25000, 'host2', 14.0), - (30000, 'host2', 15.0), - (35000, 'host2', 16.0), - (40000, 'host2', 17.0); - -Affected Rows: 18 - -SELECT ts, host, min(val) RANGE '10s', max(val) RANGE '10s' FROM host ALIGN '5s' ORDER BY host, ts; - -+---------------------+-------+---------------+---------------+ -| ts | host | MIN(host.val) | MAX(host.val) | -+---------------------+-------+---------------+---------------+ -| 1970-01-01T00:00:00 | host1 | 0.0 | 0.0 | -| 1970-01-01T00:00:05 | host1 | 0.0 | 1.0 | -| 1970-01-01T00:00:10 | host1 | 1.0 | 2.0 | -| 1970-01-01T00:00:15 | host1 | 2.0 | 3.0 | -| 1970-01-01T00:00:20 | host1 | 3.0 | 4.0 | -| 1970-01-01T00:00:25 | host1 | 4.0 | 5.0 | -| 1970-01-01T00:00:30 | host1 | 5.0 | 6.0 | -| 1970-01-01T00:00:35 | host1 | 6.0 | 7.0 | -| 1970-01-01T00:00:40 | host1 | 7.0 | 8.0 | -| 1970-01-01T00:00:45 | host1 | 8.0 | 8.0 | -| 1970-01-01T00:00:00 | host2 | 9.0 | 9.0 | -| 1970-01-01T00:00:05 | host2 | 9.0 | 10.0 | -| 1970-01-01T00:00:10 | host2 | 10.0 | 11.0 | -| 1970-01-01T00:00:15 | host2 | 11.0 | 12.0 | -| 1970-01-01T00:00:20 | host2 | 12.0 | 13.0 | -| 1970-01-01T00:00:25 | host2 | 13.0 | 14.0 | -| 1970-01-01T00:00:30 | host2 | 14.0 | 15.0 | -| 1970-01-01T00:00:35 | host2 | 15.0 | 16.0 | -| 1970-01-01T00:00:40 | host2 | 16.0 | 17.0 | -| 1970-01-01T00:00:45 | host2 | 17.0 | 17.0 | -+---------------------+-------+---------------+---------------+ - -SELECT ts, host, min(val / 2.0)/2 RANGE '10s', max(val / 2.0)/2 RANGE '10s' FROM host ALIGN '5s' ORDER BY host, ts; - 
-+---------------------+-------+---------------------------------------+---------------------------------------+ -| ts | host | MIN(host.val / Float64(2)) / Int64(2) | MAX(host.val / Float64(2)) / Int64(2) | -+---------------------+-------+---------------------------------------+---------------------------------------+ -| 1970-01-01T00:00:00 | host1 | 0.0 | 0.0 | -| 1970-01-01T00:00:05 | host1 | 0.0 | 0.25 | -| 1970-01-01T00:00:10 | host1 | 0.25 | 0.5 | -| 1970-01-01T00:00:15 | host1 | 0.5 | 0.75 | -| 1970-01-01T00:00:20 | host1 | 0.75 | 1.0 | -| 1970-01-01T00:00:25 | host1 | 1.0 | 1.25 | -| 1970-01-01T00:00:30 | host1 | 1.25 | 1.5 | -| 1970-01-01T00:00:35 | host1 | 1.5 | 1.75 | -| 1970-01-01T00:00:40 | host1 | 1.75 | 2.0 | -| 1970-01-01T00:00:45 | host1 | 2.0 | 2.0 | -| 1970-01-01T00:00:00 | host2 | 2.25 | 2.25 | -| 1970-01-01T00:00:05 | host2 | 2.25 | 2.5 | -| 1970-01-01T00:00:10 | host2 | 2.5 | 2.75 | -| 1970-01-01T00:00:15 | host2 | 2.75 | 3.0 | -| 1970-01-01T00:00:20 | host2 | 3.0 | 3.25 | -| 1970-01-01T00:00:25 | host2 | 3.25 | 3.5 | -| 1970-01-01T00:00:30 | host2 | 3.5 | 3.75 | -| 1970-01-01T00:00:35 | host2 | 3.75 | 4.0 | -| 1970-01-01T00:00:40 | host2 | 4.0 | 4.25 | -| 1970-01-01T00:00:45 | host2 | 4.25 | 4.25 | -+---------------------+-------+---------------------------------------+---------------------------------------+ - -SELECT ts, covar(val, val) RANGE '10s', host FROM host ALIGN '5s' ORDER BY host, ts; - -+---------------------+-------------------------------+-------+ -| ts | COVARIANCE(host.val,host.val) | host | -+---------------------+-------------------------------+-------+ -| 1970-01-01T00:00:00 | | host1 | -| 1970-01-01T00:00:05 | 0.5 | host1 | -| 1970-01-01T00:00:10 | 0.5 | host1 | -| 1970-01-01T00:00:15 | 0.5 | host1 | -| 1970-01-01T00:00:20 | 0.5 | host1 | -| 1970-01-01T00:00:25 | 0.5 | host1 | -| 1970-01-01T00:00:30 | 0.5 | host1 | -| 1970-01-01T00:00:35 | 0.5 | host1 | -| 1970-01-01T00:00:40 | 0.5 | host1 | -| 1970-01-01T00:00:45 | | host1 | -| 1970-01-01T00:00:00 | | host2 | -| 1970-01-01T00:00:05 | 0.5 | host2 | -| 1970-01-01T00:00:10 | 0.5 | host2 | -| 1970-01-01T00:00:15 | 0.5 | host2 | -| 1970-01-01T00:00:20 | 0.5 | host2 | -| 1970-01-01T00:00:25 | 0.5 | host2 | -| 1970-01-01T00:00:30 | 0.5 | host2 | -| 1970-01-01T00:00:35 | 0.5 | host2 | -| 1970-01-01T00:00:40 | 0.5 | host2 | -| 1970-01-01T00:00:45 | | host2 | -+---------------------+-------------------------------+-------+ - -SELECT covar(ceil(val), floor(val)) RANGE '10s', ts, host FROM host ALIGN '5s' ORDER BY host, ts; - -+--------------------------------------------+---------------------+-------+ -| COVARIANCE(ceil(host.val),floor(host.val)) | ts | host | -+--------------------------------------------+---------------------+-------+ -| | 1970-01-01T00:00:00 | host1 | -| 0.5 | 1970-01-01T00:00:05 | host1 | -| 0.5 | 1970-01-01T00:00:10 | host1 | -| 0.5 | 1970-01-01T00:00:15 | host1 | -| 0.5 | 1970-01-01T00:00:20 | host1 | -| 0.5 | 1970-01-01T00:00:25 | host1 | -| 0.5 | 1970-01-01T00:00:30 | host1 | -| 0.5 | 1970-01-01T00:00:35 | host1 | -| 0.5 | 1970-01-01T00:00:40 | host1 | -| | 1970-01-01T00:00:45 | host1 | -| | 1970-01-01T00:00:00 | host2 | -| 0.5 | 1970-01-01T00:00:05 | host2 | -| 0.5 | 1970-01-01T00:00:10 | host2 | -| 0.5 | 1970-01-01T00:00:15 | host2 | -| 0.5 | 1970-01-01T00:00:20 | host2 | -| 0.5 | 1970-01-01T00:00:25 | host2 | -| 0.5 | 1970-01-01T00:00:30 | host2 | -| 0.5 | 1970-01-01T00:00:35 | host2 | -| 0.5 | 1970-01-01T00:00:40 | host2 | -| | 1970-01-01T00:00:45 | host2 | 
-+--------------------------------------------+---------------------+-------+ - -SELECT ts, host, covar((sin(val) + cos(val))/2.0 + 1.0, 2.0) RANGE '10s' FROM host ALIGN '5s' ORDER BY host, ts; - -+---------------------+-------+--------------------------------------------------------------------------------+ -| ts | host | COVARIANCE(sin(host.val) + cos(host.val) / Float64(2) + Float64(1),Float64(2)) | -+---------------------+-------+--------------------------------------------------------------------------------+ -| 1970-01-01T00:00:00 | host1 | | -| 1970-01-01T00:00:05 | host1 | 0.0 | -| 1970-01-01T00:00:10 | host1 | 0.0 | -| 1970-01-01T00:00:15 | host1 | 0.0 | -| 1970-01-01T00:00:20 | host1 | 0.0 | -| 1970-01-01T00:00:25 | host1 | 0.0 | -| 1970-01-01T00:00:30 | host1 | 0.0 | -| 1970-01-01T00:00:35 | host1 | 0.0 | -| 1970-01-01T00:00:40 | host1 | 0.0 | -| 1970-01-01T00:00:45 | host1 | | -| 1970-01-01T00:00:00 | host2 | | -| 1970-01-01T00:00:05 | host2 | 0.0 | -| 1970-01-01T00:00:10 | host2 | 0.0 | -| 1970-01-01T00:00:15 | host2 | 0.0 | -| 1970-01-01T00:00:20 | host2 | 0.0 | -| 1970-01-01T00:00:25 | host2 | 0.0 | -| 1970-01-01T00:00:30 | host2 | 0.0 | -| 1970-01-01T00:00:35 | host2 | 0.0 | -| 1970-01-01T00:00:40 | host2 | 0.0 | -| 1970-01-01T00:00:45 | host2 | | -+---------------------+-------+--------------------------------------------------------------------------------+ - -SELECT ts, min(val) RANGE '10s', host, max(val) RANGE '10s' FROM host ALIGN '1000s' ORDER BY host, ts; - -+---------------------+---------------+-------+---------------+ -| ts | MIN(host.val) | host | MAX(host.val) | -+---------------------+---------------+-------+---------------+ -| 1970-01-01T00:00:00 | 0.0 | host1 | 0.0 | -| 1970-01-01T00:00:00 | 9.0 | host2 | 9.0 | -+---------------------+---------------+-------+---------------+ - -SELECT ts, host, min(val) RANGE '10s', max(val) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; - -+---------------------+-------+---------------+---------------+ -| ts | host | MIN(host.val) | MAX(host.val) | -+---------------------+-------+---------------+---------------+ -| 1970-01-01T00:00:00 | host1 | 0.0 | 0.0 | -| 1970-01-01T00:00:05 | host1 | 0.0 | 1.0 | -| 1970-01-01T00:00:10 | host1 | 1.0 | 2.0 | -| 1970-01-01T00:00:15 | host1 | 2.0 | 3.0 | -| 1970-01-01T00:00:20 | host1 | 3.0 | 4.0 | -| 1970-01-01T00:00:25 | host1 | 4.0 | 5.0 | -| 1970-01-01T00:00:30 | host1 | 5.0 | 6.0 | -| 1970-01-01T00:00:35 | host1 | 6.0 | 7.0 | -| 1970-01-01T00:00:40 | host1 | 7.0 | 8.0 | -| 1970-01-01T00:00:45 | host1 | 8.0 | | -| 1970-01-01T00:00:00 | host2 | 9.0 | 9.0 | -| 1970-01-01T00:00:05 | host2 | 9.0 | 10.0 | -| 1970-01-01T00:00:10 | host2 | 10.0 | 11.0 | -| 1970-01-01T00:00:15 | host2 | 11.0 | 12.0 | -| 1970-01-01T00:00:20 | host2 | 12.0 | 13.0 | -| 1970-01-01T00:00:25 | host2 | 13.0 | 14.0 | -| 1970-01-01T00:00:30 | host2 | 14.0 | 15.0 | -| 1970-01-01T00:00:35 | host2 | 15.0 | 16.0 | -| 1970-01-01T00:00:40 | host2 | 16.0 | 17.0 | -| 1970-01-01T00:00:45 | host2 | 17.0 | | -+---------------------+-------+---------------+---------------+ - -SELECT ts, host, (min(val)+max(val))/4 RANGE '10s' FROM host ALIGN '5s' ORDER BY host, ts; - -+---------------------+-------+------------------------------------------+ -| ts | host | MIN(host.val) + MAX(host.val) / Int64(4) | -+---------------------+-------+------------------------------------------+ -| 1970-01-01T00:00:00 | host1 | 0.0 | -| 1970-01-01T00:00:05 | host1 | 0.25 | -| 1970-01-01T00:00:10 | host1 | 0.75 | -| 1970-01-01T00:00:15 | host1 | 1.25 
| -| 1970-01-01T00:00:20 | host1 | 1.75 | -| 1970-01-01T00:00:25 | host1 | 2.25 | -| 1970-01-01T00:00:30 | host1 | 2.75 | -| 1970-01-01T00:00:35 | host1 | 3.25 | -| 1970-01-01T00:00:40 | host1 | 3.75 | -| 1970-01-01T00:00:45 | host1 | 4.0 | -| 1970-01-01T00:00:00 | host2 | 4.5 | -| 1970-01-01T00:00:05 | host2 | 4.75 | -| 1970-01-01T00:00:10 | host2 | 5.25 | -| 1970-01-01T00:00:15 | host2 | 5.75 | -| 1970-01-01T00:00:20 | host2 | 6.25 | -| 1970-01-01T00:00:25 | host2 | 6.75 | -| 1970-01-01T00:00:30 | host2 | 7.25 | -| 1970-01-01T00:00:35 | host2 | 7.75 | -| 1970-01-01T00:00:40 | host2 | 8.25 | -| 1970-01-01T00:00:45 | host2 | 8.5 | -+---------------------+-------+------------------------------------------+ - -SELECT ts, host, foo FROM (SELECT ts, host, (min(val)+max(val))/4 RANGE '10s' AS foo FROM host ALIGN '5s' ORDER BY host, ts) WHERE foo > 5 ORDER BY host, ts; - -+---------------------+-------+------+ -| ts | host | foo | -+---------------------+-------+------+ -| 1970-01-01T00:00:10 | host2 | 5.25 | -| 1970-01-01T00:00:15 | host2 | 5.75 | -| 1970-01-01T00:00:20 | host2 | 6.25 | -| 1970-01-01T00:00:25 | host2 | 6.75 | -| 1970-01-01T00:00:30 | host2 | 7.25 | -| 1970-01-01T00:00:35 | host2 | 7.75 | -| 1970-01-01T00:00:40 | host2 | 8.25 | -| 1970-01-01T00:00:45 | host2 | 8.5 | -+---------------------+-------+------+ - -SELECT ts, b, (min(c)+max(c))/4 RANGE '10s' FROM (SELECT ts, host AS b, val AS c FROM host WHERE val > 8.0) ALIGN '5s' BY (b) ORDER BY b, ts; - -+---------------------+-------+----------------------------+ -| ts | b | MIN(c) + MAX(c) / Int64(4) | -+---------------------+-------+----------------------------+ -| 1970-01-01T00:00:00 | host2 | 4.5 | -| 1970-01-01T00:00:05 | host2 | 4.75 | -| 1970-01-01T00:00:10 | host2 | 5.25 | -| 1970-01-01T00:00:15 | host2 | 5.75 | -| 1970-01-01T00:00:20 | host2 | 6.25 | -| 1970-01-01T00:00:25 | host2 | 6.75 | -| 1970-01-01T00:00:30 | host2 | 7.25 | -| 1970-01-01T00:00:35 | host2 | 7.75 | -| 1970-01-01T00:00:40 | host2 | 8.25 | -| 1970-01-01T00:00:45 | host2 | 8.5 | -+---------------------+-------+----------------------------+ - --- Test Invalid cases --- 1. 
error timestamp -SELECT min(val) RANGE 'not_time' FROM host ALIGN '5s'; - -Error: 2000(InvalidSyntax), sql parser error: not a valid duration string: not_time - -SELECT min(val) RANGE '5s' FROM host ALIGN 'not_time'; - -Error: 2000(InvalidSyntax), sql parser error: not a valid duration string: not_time - --- 2.1 no range param -SELECT min(val) FROM host ALIGN '5s'; - -Error: 2000(InvalidSyntax), sql parser error: RANGE argument not found in min(val) - -SELECT min(val) RANGE '10s', max(val) FROM host ALIGN '5s'; - -Error: 2000(InvalidSyntax), sql parser error: RANGE argument not found in max(val) - --- 2.2 no align param -SELECT min(val) RANGE '5s' FROM host; - -Error: 1003(Internal), Error during planning: Illegal argument in range select query - -DROP TABLE host; - -Affected Rows: 0 - -CREATE TABLE host_sec ( - ts timestamp(0) time index, - host STRING PRIMARY KEY, - val DOUBLE, -); - -Affected Rows: 0 - -INSERT INTO TABLE host_sec VALUES - (0, 'host1', 0.0), - (5, 'host1', 1.0), - (10, 'host1', 2.0), - (15, 'host1', 3.0), - (20, 'host1', 4.0), - (25, 'host1', 5.0), - (30, 'host1', 6.0), - (35, 'host1', 7.0), - (40, 'host1', 8.0), - (0, 'host2', 9.0), - (5, 'host2', 10.0), - (10, 'host2', 11.0), - (15, 'host2', 12.0), - (20, 'host2', 13.0), - (25, 'host2', 14.0), - (30, 'host2', 15.0), - (35, 'host2', 16.0), - (40, 'host2', 17.0); - -Affected Rows: 18 - --- TODO(ruihang): This query returns incorrect result. --- SELECT ts, host, min(val) RANGE '10s', max(val) RANGE '10s' FROM host_sec ALIGN '5s' ORDER BY host, ts; -DROP TABLE host_sec; - -Affected Rows: 0 - diff --git a/tests/cases/standalone/common/select/range_select.sql b/tests/cases/standalone/common/select/range_select.sql deleted file mode 100644 index 2ff4ee6d28d8..000000000000 --- a/tests/cases/standalone/common/select/range_select.sql +++ /dev/null @@ -1,96 +0,0 @@ -CREATE TABLE host ( - ts timestamp(3) time index, - host STRING PRIMARY KEY, - val DOUBLE, -); - -INSERT INTO TABLE host VALUES - (0, 'host1', 0.0), - (5000, 'host1', 1.0), - (10000, 'host1', 2.0), - (15000, 'host1', 3.0), - (20000, 'host1', 4.0), - (25000, 'host1', 5.0), - (30000, 'host1', 6.0), - (35000, 'host1', 7.0), - (40000, 'host1', 8.0), - (0, 'host2', 9.0), - (5000, 'host2', 10.0), - (10000, 'host2', 11.0), - (15000, 'host2', 12.0), - (20000, 'host2', 13.0), - (25000, 'host2', 14.0), - (30000, 'host2', 15.0), - (35000, 'host2', 16.0), - (40000, 'host2', 17.0); - -SELECT ts, host, min(val) RANGE '10s', max(val) RANGE '10s' FROM host ALIGN '5s' ORDER BY host, ts; - -SELECT ts, host, min(val / 2.0)/2 RANGE '10s', max(val / 2.0)/2 RANGE '10s' FROM host ALIGN '5s' ORDER BY host, ts; - -SELECT ts, covar(val, val) RANGE '10s', host FROM host ALIGN '5s' ORDER BY host, ts; - -SELECT covar(ceil(val), floor(val)) RANGE '10s', ts, host FROM host ALIGN '5s' ORDER BY host, ts; - -SELECT ts, host, covar((sin(val) + cos(val))/2.0 + 1.0, 2.0) RANGE '10s' FROM host ALIGN '5s' ORDER BY host, ts; - -SELECT ts, min(val) RANGE '10s', host, max(val) RANGE '10s' FROM host ALIGN '1000s' ORDER BY host, ts; - -SELECT ts, host, min(val) RANGE '10s', max(val) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts; - -SELECT ts, host, (min(val)+max(val))/4 RANGE '10s' FROM host ALIGN '5s' ORDER BY host, ts; - -SELECT ts, host, foo FROM (SELECT ts, host, (min(val)+max(val))/4 RANGE '10s' AS foo FROM host ALIGN '5s' ORDER BY host, ts) WHERE foo > 5 ORDER BY host, ts; - -SELECT ts, b, (min(c)+max(c))/4 RANGE '10s' FROM (SELECT ts, host AS b, val AS c FROM host WHERE val > 8.0) ALIGN '5s' BY (b) 
ORDER BY b, ts; - --- Test Invalid cases - --- 1. error timestamp - -SELECT min(val) RANGE 'not_time' FROM host ALIGN '5s'; - -SELECT min(val) RANGE '5s' FROM host ALIGN 'not_time'; - --- 2.1 no range param - -SELECT min(val) FROM host ALIGN '5s'; - -SELECT min(val) RANGE '10s', max(val) FROM host ALIGN '5s'; - --- 2.2 no align param - -SELECT min(val) RANGE '5s' FROM host; - -DROP TABLE host; - -CREATE TABLE host_sec ( - ts timestamp(0) time index, - host STRING PRIMARY KEY, - val DOUBLE, -); - -INSERT INTO TABLE host_sec VALUES - (0, 'host1', 0.0), - (5, 'host1', 1.0), - (10, 'host1', 2.0), - (15, 'host1', 3.0), - (20, 'host1', 4.0), - (25, 'host1', 5.0), - (30, 'host1', 6.0), - (35, 'host1', 7.0), - (40, 'host1', 8.0), - (0, 'host2', 9.0), - (5, 'host2', 10.0), - (10, 'host2', 11.0), - (15, 'host2', 12.0), - (20, 'host2', 13.0), - (25, 'host2', 14.0), - (30, 'host2', 15.0), - (35, 'host2', 16.0), - (40, 'host2', 17.0); - --- TODO(ruihang): This query returns incorrect result. --- SELECT ts, host, min(val) RANGE '10s', max(val) RANGE '10s' FROM host_sec ALIGN '5s' ORDER BY host, ts; - -DROP TABLE host_sec;