Skip to content

Commit

Permalink
produce vector plan
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia committed Dec 28, 2023
1 parent b58296d commit 915de4e
Showing 1 changed file with 95 additions and 50 deletions.
145 changes: 95 additions & 50 deletions src/promql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ use datatypes::arrow::datatypes::DataType as ArrowDataType;
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::{
token, AggregateExpr, BinModifier, BinaryExpr as PromBinaryExpr, Call, EvalStmt,
Expr as PromExpr, Function, LabelModifier, MatrixSelector, NumberLiteral, Offset, ParenExpr,
StringLiteral, SubqueryExpr, TokenType, UnaryExpr, VectorMatchCardinality, VectorSelector,
Expr as PromExpr, Function, FunctionArgs as PromFunctionArgs, LabelModifier, MatrixSelector,
NumberLiteral, Offset, ParenExpr, StringLiteral, SubqueryExpr, TokenType, UnaryExpr,
VectorMatchCardinality, VectorSelector,
};
use snafu::{ensure, OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;
Expand All @@ -63,6 +64,8 @@ use crate::functions::{
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `histogram_quantile` function in PromQL
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
/// `vector` function in PromQL
const SPECIAL_VECTOR_FUNCTION: &str = "vector";
/// `le` column for conventional histogram.
const LE_COLUMN_NAME: &str = "le";

Expand Down Expand Up @@ -463,57 +466,14 @@ impl PromPlanner {
})
}
PromExpr::Call(Call { func, args }) => {
// some special functions that are not expression but a plan
if func.name == SPECIAL_HISTOGRAM_QUANTILE {
if args.args.len() != 2 {
return FunctionInvalidArgumentSnafu {
fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
}
.fail();
}
let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
FunctionInvalidArgumentSnafu {
fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
}
})?;
let input = args.args[1].as_ref().clone();
let input_plan = self.prom_expr_to_plan(input).await?;

if !self.ctx.has_le_tag() {
return ColumnNotFoundSnafu {
col: LE_COLUMN_NAME.to_string(),
}
.fail();
}
let time_index_column =
self.ctx.time_index_column.clone().with_context(|| {
TimeIndexNotFoundSnafu {
table: self.ctx.table_name.clone().unwrap_or_default(),
}
})?;
// FIXME(ruihang): support multi fields
let field_column = self
.ctx
.field_columns
.first()
.with_context(|| FunctionInvalidArgumentSnafu {
fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
})?
.clone();

return Ok(LogicalPlan::Extension(Extension {
node: Arc::new(
HistogramFold::new(
LE_COLUMN_NAME.to_string(),
field_column,
time_index_column,
phi,
input_plan,
)
.context(DataFusionPlanningSnafu)?,
),
}));
return self.create_histogram_plan(args).await;
} else if func.name == SPECIAL_VECTOR_FUNCTION {
return self.create_vector_plan(args).await;
}

// transform function arguments
let args = self.create_function_args(&args.args)?;
let input = if let Some(prom_expr) = args.input {
self.prom_expr_to_plan(prom_expr).await?
Expand Down Expand Up @@ -1303,6 +1263,91 @@ impl PromPlanner {
Ok(exprs)
}

/// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
async fn create_histogram_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
if args.args.len() != 2 {
return FunctionInvalidArgumentSnafu {
fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
}
.fail();
}
let phi = Self::try_build_float_literal(&args.args[0]).with_context(|| {
FunctionInvalidArgumentSnafu {
fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
}
})?;
let input = args.args[1].as_ref().clone();
let input_plan = self.prom_expr_to_plan(input).await?;

if !self.ctx.has_le_tag() {
return ColumnNotFoundSnafu {
col: LE_COLUMN_NAME.to_string(),
}
.fail();
}
let time_index_column =
self.ctx
.time_index_column
.clone()
.with_context(|| TimeIndexNotFoundSnafu {
table: self.ctx.table_name.clone().unwrap_or_default(),
})?;
// FIXME(ruihang): support multi fields
let field_column = self
.ctx
.field_columns
.first()
.with_context(|| FunctionInvalidArgumentSnafu {
fn_name: SPECIAL_HISTOGRAM_QUANTILE.to_string(),
})?
.clone();

Ok(LogicalPlan::Extension(Extension {
node: Arc::new(
HistogramFold::new(
LE_COLUMN_NAME.to_string(),
field_column,
time_index_column,
phi,
input_plan,
)
.context(DataFusionPlanningSnafu)?,
),
}))
}

/// Create a [SPECIAL_VECTOR_FUNCTION] plan
async fn create_vector_plan(&mut self, args: &PromFunctionArgs) -> Result<LogicalPlan> {
if args.args.len() != 1 {
return FunctionInvalidArgumentSnafu {
fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
}
.fail();
}
let lit = Self::try_build_float_literal(&args.args[0]).with_context(|| {
FunctionInvalidArgumentSnafu {
fn_name: SPECIAL_VECTOR_FUNCTION.to_string(),
}
})?;

// reuse `SPECIAL_TIME_FUNCTION` as name of time index column
self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
self.ctx.table_name = Some(String::new());
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(
EmptyMetric::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
SPECIAL_TIME_FUNCTION.to_string(),
DEFAULT_FIELD_COLUMN.to_string(),
Some(DfExpr::Literal(ScalarValue::Float64(Some(lit)))),
)
.context(DataFusionPlanningSnafu)?,
),
}))
}

/// Try to build a DataFusion Literal Expression from PromQL Expr, return
/// `None` if the input is not a literal expression.
fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
Expand Down

0 comments on commit 915de4e

Please sign in to comment.