Skip to content

Commit

Permalink
work with OR
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia committed Dec 28, 2023
1 parent 915de4e commit a0ca568
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 173 deletions.
4 changes: 2 additions & 2 deletions src/promql/src/extension_plan/empty_metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ impl EmptyMetric {
.map(|expr| {
physical_planner.create_physical_expr(
expr,
&self.result_schema,
&ArrowSchema::from(self.result_schema.as_ref()),
&self.time_index_schema,
&ArrowSchema::from(self.time_index_schema.as_ref()),
session_state,
)
})
Expand Down
88 changes: 72 additions & 16 deletions src/promql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ const DEFAULT_FIELD_COLUMN: &str = "value";
/// Special modifier to project field columns under multi-field mode
const FIELD_COLUMN_MATCHER: &str = "__field__";

/// Default value column name for Prometheus metrics.
/// This is the same as the constant defined in `servers`.
const GREPTIME_VALUE: &str = "greptime_value";

#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
// query parameters
Expand Down Expand Up @@ -292,13 +296,13 @@ impl PromPlanner {
let left_field_columns = self.ctx.field_columns.clone();
let left_table_ref: OwnedTableReference =
self.ctx.table_name.clone().unwrap_or_default().into();
let left_tag_cols = self.ctx.tag_columns.clone();
let left_context = self.ctx.clone();

let right_input = self.prom_expr_to_plan(*rhs.clone()).await?;
let right_field_columns = self.ctx.field_columns.clone();
let right_table_ref: OwnedTableReference =
self.ctx.table_name.clone().unwrap_or_default().into();
let right_tag_cols = self.ctx.tag_columns.clone();
let right_context = self.ctx.clone();

// TODO(ruihang): avoid join if left and right are the same table

Expand All @@ -307,8 +311,8 @@ impl PromPlanner {
return self.set_op_on_non_field_columns(
left_input,
right_input,
left_tag_cols,
right_tag_cols,
left_context,
right_context,
*op,
modifier,
);
Expand Down Expand Up @@ -1333,14 +1337,16 @@ impl PromPlanner {
// reuse `SPECIAL_TIME_FUNCTION` as the name of the time index column
self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
self.ctx.table_name = Some(String::new());
self.ctx.tag_columns = vec![];
self.ctx.field_columns = vec![GREPTIME_VALUE.to_string()];
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(
EmptyMetric::new(
self.ctx.start,
self.ctx.end,
self.ctx.interval,
SPECIAL_TIME_FUNCTION.to_string(),
DEFAULT_FIELD_COLUMN.to_string(),
GREPTIME_VALUE.to_string(),
Some(DfExpr::Literal(ScalarValue::Float64(Some(lit)))),
)
.context(DataFusionPlanningSnafu)?,
Expand Down Expand Up @@ -1536,19 +1542,35 @@ impl PromPlanner {

/// Build a set operator (AND/OR/UNLESS)
fn set_op_on_non_field_columns(
&self,
&mut self,
left: LogicalPlan,
right: LogicalPlan,
left_tag_cols: Vec<String>,
right_tag_cols: Vec<String>,
left_context: PromPlannerContext,
right_context: PromPlannerContext,
op: TokenType,
modifier: &Option<BinModifier>,
) -> Result<LogicalPlan> {
let mut left_tag_col_set = left_tag_cols.into_iter().collect::<HashSet<_>>();
let mut right_tag_col_set = right_tag_cols.into_iter().collect::<HashSet<_>>();
let mut left_tag_col_set = left_context
.tag_columns
.iter()
.cloned()
.collect::<HashSet<_>>();
let mut right_tag_col_set = right_context
.tag_columns
.iter()
.cloned()
.collect::<HashSet<_>>();

if matches!(op.id(), token::T_LOR) {
return self.or_operator(left, right, left_tag_col_set, right_tag_col_set, modifier);
return self.or_operator(
left,
right,
left_tag_col_set,
right_tag_col_set,
left_context,
right_context,
modifier,
);
}

// apply modifier
Expand Down Expand Up @@ -1630,19 +1652,22 @@ impl PromPlanner {
.build()
.context(DataFusionPlanningSnafu),
token::T_LOR => {
self.or_operator(left, right, left_tag_col_set, right_tag_col_set, modifier)
unreachable!()
}
_ => UnexpectedTokenSnafu { token: op }.fail(),
}
}

// TODO(ruihang): change function name
#[allow(clippy::too_many_arguments)]
fn or_operator(
&self,
&mut self,
left: LogicalPlan,
right: LogicalPlan,
left_tag_cols_set: HashSet<String>,
right_tag_cols_set: HashSet<String>,
left_context: PromPlannerContext,
right_context: PromPlannerContext,
modifier: &Option<BinModifier>,
) -> Result<LogicalPlan> {
// prepare hash sets
Expand All @@ -1668,18 +1693,37 @@ impl PromPlanner {
.as_ref()
.map(|r| r.to_string())
.unwrap_or_default();
let left_time_index_column =
left_context
.time_index_column
.clone()
.with_context(|| TimeIndexNotFoundSnafu {
table: left_qualifier_string.clone(),
})?;
let right_time_index_column =
right_context
.time_index_column
.clone()
.with_context(|| TimeIndexNotFoundSnafu {
table: right_qualifier_string.clone(),
})?;

// step 0: fill all columns in output schema
let all_columns_set = left
let mut all_columns_set = left
.schema()
.fields()
.iter()
.chain(right.schema().fields().iter())
.map(|field| field.name().clone())
.collect::<HashSet<_>>();
// remove time index column
all_columns_set.remove(&left_time_index_column);
all_columns_set.remove(&right_time_index_column);
let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
// sort to ensure the generated schema is not volatile
all_columns.sort_unstable();
// use left time index column name as the result time index column name
all_columns.insert(0, left_time_index_column.clone());

// step 1: align schema using project, fill non-exist columns with null
let left_proj_exprs = all_columns.iter().map(|col| {
Expand All @@ -1689,13 +1733,21 @@ impl PromPlanner {
DfExpr::Column(Column::new(left_qualifier.clone(), col))
}
});
let right_proj_exprs = all_columns.iter().map(|col| {
let right_time_index_expr = DfExpr::Column(Column::new(
right_qualifier.clone(),
right_time_index_column,
))
.alias(left_time_index_column.clone());
let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
if tags_not_in_right.contains(col) {
DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
} else {
DfExpr::Column(Column::new(right_qualifier.clone(), col))
}
});
let right_proj_exprs = [right_time_index_expr]
.into_iter()
.chain(right_proj_exprs_without_time_index);

let left_projected = LogicalPlanBuilder::from(left)
.project(left_proj_exprs)
Expand Down Expand Up @@ -1736,13 +1788,17 @@ impl PromPlanner {
left_projected,
right_projected,
match_columns,
self.ctx.time_index_column.clone().unwrap(),
left_time_index_column.clone(),
schema,
);
let result = LogicalPlan::Extension(Extension {
node: Arc::new(union_distinct_on),
});

// step 4: update context
self.ctx.time_index_column = Some(left_time_index_column);
self.ctx.tag_columns = all_tags.into_iter().collect();

Ok(result)
}

Expand Down
Loading

0 comments on commit a0ca568

Please sign in to comment.