From 915de4ee7124113268b13e96283bdce797afb027 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 28 Dec 2023 16:00:09 +0800 Subject: [PATCH] produce vector plan Signed-off-by: Ruihang Xia --- src/promql/src/planner.rs | 145 +++++++++++++++++++++++++------------- 1 file changed, 95 insertions(+), 50 deletions(-) diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 7c8176d7b95e..e8d9d79bc6d5 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -36,8 +36,9 @@ use datatypes::arrow::datatypes::DataType as ArrowDataType; use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME}; use promql_parser::parser::{ token, AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt, - Expr as PromExpr, Function, LabelModifier, MatrixSelector, NumberLiteral, Offset, ParenExpr, - StringLiteral, SubqueryExpr, TokenType, UnaryExpr, VectorMatchCardinality, VectorSelector, + Expr as PromExpr, Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector, + NumberLiteral, Offset, ParenExpr, StringLiteral, SubqueryExpr, TokenType, UnaryExpr, + VectorMatchCardinality, VectorSelector, }; use snafu::{ensure, OptionExt, ResultExt}; use table::table::adapter::DfTableProviderAdapter; @@ -63,6 +64,8 @@ use crate::functions::{ const SPECIAL_TIME_FUNCTION: &str = "time"; /// `histogram_quantile` function in PromQL const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile"; +/// `vector` function in PromQL +const SPECIAL_VECTOR_FUNCTION: &str = "vector"; /// `le` column for conventional histogram. const LE_COLUMN_NAME: &str = "le"; @@ -463,57 +466,14 @@ impl PromPlanner { }) } PromExpr::Call(Call { func, args }) => { + // some special functions that are not expression but a plan if func.name == SPECIAL_HISTOGRAM_QUANTILE { - if args.args.len() != 2 { - return FunctionInvalidArgumentSnafu { - fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(), - } - .fail(); - } - let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| { - FunctionInvalidArgumentSnafu { - fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(), - } - })?; - let input = args.args[1].as_ref().clone(); - let input_plan = self.prom_expr_to_plan(input).await?; - - if !self.ctx.has_le_tag() { - return ColumnNotFoundSnafu { - col: LE_COLUMN_NAME.to_string(), - } - .fail(); - } - let time_index_column = - self.ctx.time_index_column.clone().with_context(|| { - TimeIndexNotFoundSnafu { - table: self.ctx.table_name.clone().unwrap_or_default(), - } - })?; - // FIXME(ruihang): support multi fields - let field_column = self - .ctx - .field_columns - .first() - .with_context(|| FunctionInvalidArgumentSnafu { - fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(), - })? - .clone(); - - return Ok(LogicalPlan::Extension(Extension { - node: Arc::new( - HistogramFold::new( - LE_COLUMN_NAME.to_string(), - field_column, - time_index_column, - phi, - input_plan, - ) - .context(DataFusionPlanningSnafu)?, - ), - })); + return self.create_histogram_plan(args).await; + } else if func.name == SPECIAL_VECTOR_FUNCTION { + return self.create_vector_plan(args).await; } + // transform function arguments let args = self.create_function_args(&args.args)?; let input = if let Some(prom_expr) = args.input { self.prom_expr_to_plan(prom_expr).await? @@ -1303,6 +1263,91 @@ impl PromPlanner { Ok(exprs) } + /// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan. + async fn create_histogram_plan(&mut self, args: &PromFunctionArgs) -> Result { + if args.args.len() != 2 { + return FunctionInvalidArgumentSnafu { + fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(), + } + .fail(); + } + let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| { + FunctionInvalidArgumentSnafu { + fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(), + } + })?; + let input = args.args[1].as_ref().clone(); + let input_plan = self.prom_expr_to_plan(input).await?; + + if !self.ctx.has_le_tag() { + return ColumnNotFoundSnafu { + col: LE_COLUMN_NAME.to_string(), + } + .fail(); + } + let time_index_column = + self.ctx + .time_index_column + .clone() + .with_context(|| TimeIndexNotFoundSnafu { + table: self.ctx.table_name.clone().unwrap_or_default(), + })?; + // FIXME(ruihang): support multi fields + let field_column = self + .ctx + .field_columns + .first() + .with_context(|| FunctionInvalidArgumentSnafu { + fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(), + })? + .clone(); + + Ok(LogicalPlan::Extension(Extension { + node: Arc::new( + HistogramFold::new( + LE_COLUMN_NAME.to_string(), + field_column, + time_index_column, + phi, + input_plan, + ) + .context(DataFusionPlanningSnafu)?, + ), + })) + } + + /// Create a [SPECIAL_VECTOR_FUNCTION] plan + async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result { + if args.args.len() != 1 { + return FunctionInvalidArgumentSnafu { + fn_name: SPECIAL_VECTOR_FUNCTION.to_string(), + } + .fail(); + } + let lit = Self::try_build_float_literal(&args.args[0]).with_context(|| { + FunctionInvalidArgumentSnafu { + fn_name: SPECIAL_VECTOR_FUNCTION.to_string(), + } + })?; + + // reuse `SPECIAL_TIME_FUNCTION` as name of time index column + self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string()); + self.ctx.table_name = Some(String::new()); + Ok(LogicalPlan::Extension(Extension { + node: Arc::new( + EmptyMetric::new( + self.ctx.start, + self.ctx.end, + self.ctx.interval, + SPECIAL_TIME_FUNCTION.to_string(), + DEFAULT_FIELD_COLUMN.to_string(), + Some(DfExpr::Literal(ScalarValue::Float64(Some(lit)))), + ) + .context(DataFusionPlanningSnafu)?, + ), + })) + } + /// Try to build a DataFusion Literal Expression from PromQL Expr, return /// `None` if the input is not a literal expression. fn try_build_literal_expr(expr: &PromExpr) -> Option {