diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs
index 5c1a3cbbb3ba..1fb6c6b1ee3e 100644
--- a/src/promql/src/extension_plan.rs
+++ b/src/promql/src/extension_plan.rs
@@ -20,7 +20,7 @@ mod range_manipulate;
 mod series_divide;
 
 use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
-pub use empty_metric::{EmptyMetric, EmptyMetricExec, EmptyMetricStream};
+pub use empty_metric::{build_special_time_expr, EmptyMetric, EmptyMetricExec, EmptyMetricStream};
 pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantManipulateStream};
 pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream};
 pub use planner::PromExtensionPlanner;
diff --git a/src/promql/src/extension_plan/empty_metric.rs b/src/promql/src/extension_plan/empty_metric.rs
index c760312b590d..27e3d34063cb 100644
--- a/src/promql/src/extension_plan/empty_metric.rs
+++ b/src/promql/src/extension_plan/empty_metric.rs
@@ -14,22 +14,23 @@
 use std::any::Any;
 use std::collections::HashMap;
+use std::ops::Div;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use datafusion::arrow::array::Float64Array;
-use datafusion::arrow::datatypes::{DataType, TimeUnit};
+use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema, TimeUnit};
 use datafusion::common::{DFField, DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
 use datafusion::error::DataFusionError;
-use datafusion::execution::context::TaskContext;
-use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore};
-use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::execution::context::{SessionState, TaskContext};
+use datafusion::logical_expr::{ExprSchemable, LogicalPlan, UserDefinedLogicalNodeCore};
+use datafusion::physical_expr::{PhysicalExprRef, PhysicalSortExpr};
 use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+    DisplayFormatType, ExecutionPlan, Partitioning, PhysicalPlanner, RecordBatchStream,
+    SendableRecordBatchStream,
 };
-use datafusion::prelude::Expr;
+use datafusion::prelude::{col, lit, Expr};
 use datatypes::arrow::array::TimestampMillisecondArray;
 use datatypes::arrow::datatypes::SchemaRef;
 use datatypes::arrow::record_batch::RecordBatch;
@@ -37,12 +38,21 @@ use futures::Stream;
 
 use crate::extension_plan::Millisecond;
 
+/// Empty source plan that generates record batches with two columns:
+/// - time index column, computed from start, end and interval
+/// - value column, generated by the input expr. The expr should not
+///   reference any column except the time index column.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct EmptyMetric {
     start: Millisecond,
     end: Millisecond,
     interval: Millisecond,
-    schema: DFSchemaRef,
+    expr: Expr,
+    /// Schema that only contains the time index column.
+    /// This is for intermediate results only.
+    time_index_schema: DFSchemaRef,
+    /// Schema of the output record batch
+    result_schema: DFSchemaRef,
 }
 
 impl EmptyMetric {
@@ -52,16 +62,14 @@ impl EmptyMetric {
         interval: Millisecond,
         time_index_column_name: String,
         field_column_name: String,
+        field_expr: Expr,
     ) -> DataFusionResult<Self> {
+        let ts_only_schema = build_ts_only_schema(&time_index_column_name);
+        let field_data_type = field_expr.get_type(&ts_only_schema)?;
         let schema = Arc::new(DFSchema::new_with_metadata(
             vec![
-                DFField::new(
-                    Some(""),
-                    &time_index_column_name,
-                    DataType::Timestamp(TimeUnit::Millisecond, None),
-                    false,
-                ),
-                DFField::new(Some(""), &field_column_name, DataType::Float64, true),
+                ts_only_schema.field(0).clone(),
+                DFField::new(Some(""), &field_column_name, field_data_type, true),
             ],
             HashMap::new(),
         )?);
@@ -70,18 +78,33 @@ impl EmptyMetric {
             start,
             end,
             interval,
-            schema,
+            time_index_schema: Arc::new(ts_only_schema),
+            result_schema: schema,
+            expr: field_expr,
         })
     }
 
-    pub fn to_execution_plan(&self) -> Arc<dyn ExecutionPlan> {
-        Arc::new(EmptyMetricExec {
+    pub fn to_execution_plan(
+        &self,
+        session_state: &SessionState,
+        physical_planner: &dyn PhysicalPlanner,
+    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+        let physical_expr = physical_planner.create_physical_expr(
+            &self.expr,
+            &self.result_schema,
+            &ArrowSchema::from(self.result_schema.as_ref()),
+            session_state,
+        )?;
+
+        Ok(Arc::new(EmptyMetricExec {
             start: self.start,
             end: self.end,
             interval: self.interval,
-            schema: Arc::new(self.schema.as_ref().into()),
+            time_index_schema: Arc::new(self.time_index_schema.as_ref().into()),
+            result_schema: Arc::new(self.result_schema.as_ref().into()),
+            expr: physical_expr,
             metric: ExecutionPlanMetricsSet::new(),
-        })
+        }))
     }
 }
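Note how the value column's type is no longer hard-coded to `Float64`: it is inferred from the field expression via `ExprSchemable::get_type` against the time-index-only schema. A minimal sketch of that inference, assuming the DataFusion APIs of the version in tree at the time (~18); the schema construction mirrors `build_ts_only_schema` defined later in this file:

```rust
use std::collections::HashMap;

use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::common::{DFField, DFSchema};
use datafusion::error::Result;
use datafusion::logical_expr::ExprSchemable;
use datafusion::prelude::lit;

fn main() -> Result<()> {
    // A schema holding only the millisecond time index column.
    let ts_schema = DFSchema::new_with_metadata(
        vec![DFField::new(
            Some(""),
            "time",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        )],
        HashMap::new(),
    )?;

    // A numeric literal expression infers to Float64 ...
    assert_eq!(lit(1.0).get_type(&ts_schema)?, DataType::Float64);
    // ... while a string literal infers to Utf8, which is why the
    // value column's type must be derived rather than assumed.
    assert_eq!(lit("1+1").get_type(&ts_schema)?, DataType::Utf8);
    Ok(())
}
```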
@@ -95,7 +118,7 @@
     }
 
     fn schema(&self) -> &DFSchemaRef {
-        &self.schema
+        &self.result_schema
     }
 
     fn expressions(&self) -> Vec<Expr> {
@@ -120,7 +143,12 @@ pub struct EmptyMetricExec {
     start: Millisecond,
     end: Millisecond,
     interval: Millisecond,
-    schema: SchemaRef,
+    /// Schema that only contains the time index column.
+    /// This is for intermediate results only.
+    time_index_schema: SchemaRef,
+    /// Schema of the output record batch
+    result_schema: SchemaRef,
+    expr: PhysicalExprRef,
     metric: ExecutionPlanMetricsSet,
 }
 
@@ -131,7 +159,7 @@ impl ExecutionPlan for EmptyMetricExec {
     }
 
     fn schema(&self) -> SchemaRef {
-        self.schema.clone()
+        self.result_schema.clone()
     }
 
     fn output_partitioning(&self) -> Partitioning {
@@ -167,8 +195,10 @@ impl ExecutionPlan for EmptyMetricExec {
             start: self.start,
             end: self.end,
             interval: self.interval,
+            expr: self.expr.clone(),
             is_first_poll: true,
-            schema: self.schema.clone(),
+            time_index_schema: self.time_index_schema.clone(),
+            result_schema: self.result_schema.clone(),
             metric: baseline_metric,
         }))
     }
@@ -204,15 +234,20 @@ pub struct EmptyMetricStream {
     start: Millisecond,
     end: Millisecond,
     interval: Millisecond,
-    // only generate one record batch at the first poll
+    expr: PhysicalExprRef,
+    /// This stream only generates one record batch, on the first poll
    is_first_poll: bool,
-    schema: SchemaRef,
+    /// Schema that only contains the time index column.
+    /// This is for intermediate results only.
+    time_index_schema: SchemaRef,
+    /// Schema of the output record batch
+    result_schema: SchemaRef,
     metric: BaselineMetrics,
 }
 
 impl RecordBatchStream for EmptyMetricStream {
     fn schema(&self) -> SchemaRef {
-        self.schema.clone()
+        self.result_schema.clone()
     }
 }
 
@@ -223,17 +258,28 @@ impl Stream for EmptyMetricStream {
         let result = if self.is_first_poll {
             self.is_first_poll = false;
             let _timer = self.metric.elapsed_compute().timer();
-            let result_array = (self.start..=self.end)
+
+            // build the time index array, and a record batch that
+            // contains only that array, as the input of the field expr
+            let time_array = (self.start..=self.end)
                 .step_by(self.interval as _)
                 .collect::<Vec<_>>();
-            let float_array =
-                Float64Array::from_iter(result_array.iter().map(|v| *v as f64 / 1000.0));
-            let millisecond_array = TimestampMillisecondArray::from(result_array);
-            let batch = RecordBatch::try_new(
-                self.schema.clone(),
-                vec![Arc::new(millisecond_array), Arc::new(float_array)],
-            )
-            .map_err(DataFusionError::ArrowError);
+            let time_array = Arc::new(TimestampMillisecondArray::from(time_array));
+            let input_record_batch =
+                RecordBatch::try_new(self.time_index_schema.clone(), vec![time_array.clone()])
+                    .map_err(DataFusionError::ArrowError)?;
+
+            // evaluate the field expr and get the result
+            let field_array = self
+                .expr
+                .evaluate(&input_record_batch)?
+                .into_array(time_array.len());
+
+            // assemble the output record batch
+            let batch =
+                RecordBatch::try_new(self.result_schema.clone(), vec![time_array, field_array])
+                    .map_err(DataFusionError::ArrowError);
+
             Poll::Ready(Some(batch))
         } else {
             Poll::Ready(None)
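The poll logic above is the heart of the change: the stream materializes the time index once, wraps it in a single-column batch, and evaluates the planned field expression against it. A condensed, self-contained sketch of that flow, using the same DataFusion APIs the diff relies on (~18, where `ColumnarValue::into_array` returns an `ArrayRef` directly); the function name and column names are illustrative:

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, TimestampMillisecondArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_expr::PhysicalExprRef;

/// Evaluate `expr` over a generated time index covering
/// `[start, end]` (inclusive) with `interval` ms between points.
fn eval_on_time_range(
    expr: &PhysicalExprRef,
    start: i64,
    end: i64,
    interval: i64,
) -> Result<RecordBatch> {
    // One-column schema matching `time_index_schema`.
    let ts_schema = Arc::new(Schema::new(vec![Field::new(
        "time",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        false,
    )]));

    // The generated time index: start, start + interval, ...
    let ts: Vec<i64> = (start..=end).step_by(interval as usize).collect();
    let ts: ArrayRef = Arc::new(TimestampMillisecondArray::from(ts));

    // The expression sees only the time index column.
    let input = RecordBatch::try_new(ts_schema.clone(), vec![ts.clone()])
        .map_err(DataFusionError::ArrowError)?;
    let values = expr.evaluate(&input)?.into_array(ts.len());

    // Output batch: time index plus the computed value column.
    let out_schema = Arc::new(Schema::new(vec![
        ts_schema.field(0).clone(),
        Field::new("value", values.data_type().clone(), true),
    ]));
    RecordBatch::try_new(out_schema, vec![ts, values]).map_err(DataFusionError::ArrowError)
}
```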
@@ -242,8 +288,34 @@ impl Stream for EmptyMetricStream {
     }
 }
 
+/// Build a schema that only contains a **millisecond** timestamp column
+fn build_ts_only_schema(column_name: &str) -> DFSchema {
+    let ts_field = DFField::new(
+        Some(""),
+        column_name,
+        DataType::Timestamp(TimeUnit::Millisecond, None),
+        false,
+    );
+    // safety: should not fail (UT covers this)
+    DFSchema::new_with_metadata(vec![ts_field], HashMap::new()).unwrap()
+}
+
+/// Convert the timestamp column to UNIX epoch seconds:
+/// https://prometheus.io/docs/prometheus/latest/querying/functions/#time
+pub fn build_special_time_expr(time_index_column_name: &str) -> Expr {
+    let input_schema = build_ts_only_schema(time_index_column_name);
+    // safety: should not fail (UT covers this)
+    col(time_index_column_name)
+        .cast_to(&DataType::Int64, &input_schema)
+        .unwrap()
+        .cast_to(&DataType::Float64, &input_schema)
+        .unwrap()
+        .div(lit(1000.0)) // casting to second would lose precision, so cast to float64 first and divide by 1000 manually
+}
+
 #[cfg(test)]
 mod test {
+    use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
     use datafusion::prelude::SessionContext;
 
     use super::*;
@@ -256,11 +328,22 @@ mod test {
         field_column_name: String,
         expected: String,
     ) {
-        let empty_metric =
-            EmptyMetric::new(start, end, interval, time_column_name, field_column_name).unwrap();
-        let empty_metric_exec = empty_metric.to_execution_plan();
-        let session_context = SessionContext::default();
+        let session_context = SessionContext::default();
+        let df_default_physical_planner = DefaultPhysicalPlanner::default();
+        let time_expr = build_special_time_expr(&time_column_name);
+        let empty_metric = EmptyMetric::new(
+            start,
+            end,
+            interval,
+            time_column_name,
+            field_column_name,
+            time_expr,
+        )
+        .unwrap();
+        let empty_metric_exec = empty_metric
+            .to_execution_plan(&session_context.state(), &df_default_physical_planner)
+            .unwrap();
+
         let result = datafusion::physical_plan::collect(empty_metric_exec, session_context.task_ctx())
             .await
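Why the double cast and manual division in `build_special_time_expr`: casting a millisecond timestamp straight to second resolution truncates the sub-second part, while going through `Float64` keeps it. A tiny sketch of the arithmetic being avoided (plain Rust, not DataFusion):

```rust
fn main() {
    let ms: i64 = 1_500; // 1.5 seconds as a millisecond timestamp

    // What a cast to second-resolution would effectively do:
    let truncated = ms / 1000; // 1 -- sub-second precision lost
    // What Int64 -> Float64 -> div(1000.0) computes instead:
    let precise = ms as f64 / 1000.0; // 1.5

    assert_eq!(truncated, 1);
    assert_eq!(precise, 1.5);
}
```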
diff --git a/src/promql/src/extension_plan/planner.rs b/src/promql/src/extension_plan/planner.rs
index d0c5929fd944..7d7541d640dc 100644
--- a/src/promql/src/extension_plan/planner.rs
+++ b/src/promql/src/extension_plan/planner.rs
@@ -31,11 +31,11 @@ pub struct PromExtensionPlanner;
 impl ExtensionPlanner for PromExtensionPlanner {
     async fn plan_extension(
         &self,
-        _planner: &dyn PhysicalPlanner,
+        planner: &dyn PhysicalPlanner,
         node: &dyn UserDefinedLogicalNode,
         _logical_inputs: &[&LogicalPlan],
         physical_inputs: &[Arc<dyn ExecutionPlan>],
-        _session_state: &SessionState,
+        session_state: &SessionState,
     ) -> DfResult<Option<Arc<dyn ExecutionPlan>>> {
         if let Some(node) = node.as_any().downcast_ref::<SeriesNormalize>() {
             Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
@@ -46,7 +46,7 @@ impl ExtensionPlanner for PromExtensionPlanner {
         } else if let Some(node) = node.as_any().downcast_ref::<SeriesDivide>() {
             Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
         } else if let Some(node) = node.as_any().downcast_ref::<EmptyMetric>() {
-            Ok(Some(node.to_execution_plan()))
+            Ok(Some(node.to_execution_plan(session_state, planner)?))
         } else {
             Ok(None)
         }
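For context on why `plan_extension` now receives real (non-underscored) `planner` and `session_state` arguments: `EmptyMetric::to_execution_plan` needs both to plan its field expression. A sketch of how this extension planner is typically wired into DataFusion; the setup below is assumed, not part of this diff, `logical_plan` stands in for a plan containing an `EmptyMetric` node, and `promql::extension_plan` is the crate path re-exported above:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
use promql::extension_plan::PromExtensionPlanner;

async fn plan(state: &SessionState, logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
    // Registering the extension planner lets DefaultPhysicalPlanner call
    // PromExtensionPlanner::plan_extension for user-defined logical nodes,
    // passing itself as `planner` and the session as `session_state`.
    let planner =
        DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(PromExtensionPlanner)]);
    planner.create_physical_plan(logical_plan, state).await
}
```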
diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs
index 13743cd2eaf3..a8c09b927068 100644
--- a/src/promql/src/planner.rs
+++ b/src/promql/src/planner.rs
@@ -28,6 +28,7 @@ use datafusion::logical_expr::{
     LogicalPlan, LogicalPlanBuilder, Operator, ScalarUDF,
 };
 use datafusion::optimizer::utils;
+use datafusion::prelude as df_prelude;
 use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
 use datafusion::scalar::ScalarValue;
 use datafusion::sql::TableReference;
@@ -48,7 +49,8 @@ use crate::error::{
     UnsupportedExprSnafu, ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
 };
 use crate::extension_plan::{
-    EmptyMetric, InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize,
+    build_special_time_expr, EmptyMetric, InstantManipulate, Millisecond, RangeManipulate,
+    SeriesDivide, SeriesNormalize,
 };
 use crate::functions::{
     AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
@@ -61,6 +63,8 @@ const LEFT_PLAN_JOIN_ALIAS: &str = "lhs";
 /// `time()` function in PromQL.
 const SPECIAL_TIME_FUNCTION: &str = "time";
 
+const DEFAULT_TIME_INDEX_COLUMN: &str = "time";
+
 /// default value column name for empty metric
 const DEFAULT_FIELD_COLUMN: &str = "value";
 
@@ -181,11 +185,30 @@ impl PromPlanner {
                 Self::try_build_literal_expr(lhs),
                 Self::try_build_literal_expr(rhs),
             ) {
-                // TODO(ruihang): handle literal-only expressions
-                (Some(_lhs), Some(_rhs)) => UnsupportedExprSnafu {
-                    name: "Literal-only expression",
+                (Some(lhs), Some(rhs)) => {
+                    self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
+                    self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
+                    self.ctx.table_name = Some(String::new());
+                    let field_expr = DfExpr::BinaryExpr(BinaryExpr {
+                        left: Box::new(lhs),
+                        op: Self::prom_token_to_binary_op(*op)?,
+                        right: Box::new(rhs),
+                    });
+
+                    LogicalPlan::Extension(Extension {
+                        node: Arc::new(
+                            EmptyMetric::new(
+                                self.ctx.start,
+                                self.ctx.end,
+                                self.ctx.interval,
+                                SPECIAL_TIME_FUNCTION.to_string(),
+                                DEFAULT_FIELD_COLUMN.to_string(),
+                                field_expr,
+                            )
+                            .context(DataFusionPlanningSnafu)?,
+                        ),
+                    })
                 }
-                .fail()?,
                 // lhs is a literal, rhs is a column
                 (Some(expr), None) => {
                     let input = self.prom_expr_to_plan(*rhs.clone()).await?;
@@ -283,14 +306,46 @@ impl PromPlanner {
                 name: "Prom Subquery",
             }
             .fail()?,
-            PromExpr::NumberLiteral(NumberLiteral { .. }) => UnsupportedExprSnafu {
-                name: "Prom Number Literal",
+            PromExpr::NumberLiteral(NumberLiteral { val }) => {
+                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
+                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
+                self.ctx.table_name = Some(String::new());
+                let literal_expr = df_prelude::lit(*val);
+
+                LogicalPlan::Extension(Extension {
+                    node: Arc::new(
+                        EmptyMetric::new(
+                            self.ctx.start,
+                            self.ctx.end,
+                            self.ctx.interval,
+                            SPECIAL_TIME_FUNCTION.to_string(),
+                            DEFAULT_FIELD_COLUMN.to_string(),
+                            literal_expr,
+                        )
+                        .context(DataFusionPlanningSnafu)?,
+                    ),
+                })
             }
-            .fail()?,
-            PromExpr::StringLiteral(StringLiteral { .. }) => UnsupportedExprSnafu {
-                name: "Prom String Literal",
+            PromExpr::StringLiteral(StringLiteral { val }) => {
+                self.ctx.time_index_column = Some(DEFAULT_TIME_INDEX_COLUMN.to_string());
+                self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
+                self.ctx.table_name = Some(String::new());
+                let literal_expr = df_prelude::lit(val.to_string());
+
+                LogicalPlan::Extension(Extension {
+                    node: Arc::new(
+                        EmptyMetric::new(
+                            self.ctx.start,
+                            self.ctx.end,
+                            self.ctx.interval,
+                            SPECIAL_TIME_FUNCTION.to_string(),
+                            DEFAULT_FIELD_COLUMN.to_string(),
+                            literal_expr,
+                        )
+                        .context(DataFusionPlanningSnafu)?,
+                    ),
+                })
             }
-            .fail()?,
             PromExpr::VectorSelector(VectorSelector {
                 name: _,
                 offset,
@@ -360,6 +415,7 @@ impl PromPlanner {
             self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
             self.ctx.field_columns = vec![DEFAULT_FIELD_COLUMN.to_string()];
             self.ctx.table_name = Some(String::new());
+            let time_expr = build_special_time_expr(SPECIAL_TIME_FUNCTION);
 
             return Ok(LogicalPlan::Extension(Extension {
                 node: Arc::new(
@@ -369,6 +425,7 @@ impl PromPlanner {
                     self.ctx.interval,
                     SPECIAL_TIME_FUNCTION.to_string(),
                     DEFAULT_FIELD_COLUMN.to_string(),
+                    time_expr,
                 )
                 .context(DataFusionPlanningSnafu)?,
             ),
diff --git a/src/servers/src/prom.rs b/src/servers/src/prom.rs
index 1a1c835bec42..f54076d25c42 100644
--- a/src/servers/src/prom.rs
+++ b/src/servers/src/prom.rs
@@ -388,11 +388,13 @@ impl PromJsonResponse {
             .map(|(tags, mut values)| {
                 let metric = tags.into_iter().collect();
                 match result_type {
-                    Some(ValueType::Vector) => Ok(PromSeries {
-                        metric,
-                        value: values.pop(),
-                        ..Default::default()
-                    }),
+                    Some(ValueType::Vector) | Some(ValueType::Scalar) | Some(ValueType::String) => {
+                        Ok(PromSeries {
+                            metric,
+                            value: values.pop(),
+                            ..Default::default()
+                        })
+                    }
                     Some(ValueType::Matrix) => Ok(PromSeries {
                         metric,
                         values,
@@ -690,8 +692,8 @@ fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
         }
         PromqlExpr::Paren(ParenExpr { expr }) => promql_expr_to_metric_name(expr),
         PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => promql_expr_to_metric_name(expr),
-        PromqlExpr::NumberLiteral(_) => None,
-        PromqlExpr::StringLiteral(_) => None,
+        PromqlExpr::NumberLiteral(_) => Some(String::new()),
+        PromqlExpr::StringLiteral(_) => Some(String::new()),
         PromqlExpr::Extension(_) => None,
         PromqlExpr::VectorSelector(VectorSelector { matchers, .. }) => {
             matchers.find_matchers(METRIC_NAME).pop().cloned()
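To tie the planner changes to the test cases below: `TQL EVAL (0, 10, '5s') 1+1` yields `start = 0`, `end = 10_000` and `interval = 5_000` in milliseconds, and the literal-only arm lowers `1+1` into the field expression of an `EmptyMetric`. A hedged sketch of the equivalent direct construction, with column names per the `DEFAULT_*` constants above and `promql::extension_plan` as an assumed crate path:

```rust
use datafusion::error::Result;
use datafusion::logical_expr::{BinaryExpr, Operator};
use datafusion::prelude::{lit, Expr};
use promql::extension_plan::EmptyMetric;

fn literal_only_plan() -> Result<EmptyMetric> {
    // `1 + 1`, evaluated once per generated timestamp.
    let field_expr = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(lit(1.0)),
        op: Operator::Plus,
        right: Box::new(lit(1.0)),
    });
    // TQL EVAL (0, 10, '5s') => 0 ms ..= 10_000 ms, stepping by 5_000 ms,
    // i.e. the three rows 00:00:00, 00:00:05 and 00:00:10 seen below.
    EmptyMetric::new(
        0,
        10_000,
        5_000,
        "time".to_string(),
        "value".to_string(),
        field_expr,
    )
}
```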
diff --git a/tests/cases/standalone/common/tql/literal_only.result b/tests/cases/standalone/common/tql/literal_only.result
new file mode 100644
index 000000000000..f30b53fda424
--- /dev/null
+++ b/tests/cases/standalone/common/tql/literal_only.result
@@ -0,0 +1,30 @@
+TQL EVAL (0, 10, '5s') 1;
+
++---------------------+-------+
+| time                | value |
++---------------------+-------+
+| 1970-01-01T00:00:00 | 1.0   |
+| 1970-01-01T00:00:05 | 1.0   |
+| 1970-01-01T00:00:10 | 1.0   |
++---------------------+-------+
+
+TQL EVAL (0, 10, '5s') 1+1;
+
++---------------------+-------+
+| time                | value |
++---------------------+-------+
+| 1970-01-01T00:00:00 | 2.0   |
+| 1970-01-01T00:00:05 | 2.0   |
+| 1970-01-01T00:00:10 | 2.0   |
++---------------------+-------+
+
+TQL EVAL (0, 10, '5s') "1+1";
+
++---------------------+-------+
+| time                | value |
++---------------------+-------+
+| 1970-01-01T00:00:00 | 1+1   |
+| 1970-01-01T00:00:05 | 1+1   |
+| 1970-01-01T00:00:10 | 1+1   |
++---------------------+-------+
+
diff --git a/tests/cases/standalone/common/tql/literal_only.sql b/tests/cases/standalone/common/tql/literal_only.sql
new file mode 100644
index 000000000000..dc353146f61e
--- /dev/null
+++ b/tests/cases/standalone/common/tql/literal_only.sql
@@ -0,0 +1,5 @@
+TQL EVAL (0, 10, '5s') 1;
+
+TQL EVAL (0, 10, '5s') 1+1;
+
+TQL EVAL (0, 10, '5s') "1+1";