Skip to content

Commit

Permalink
feat(optimizer): improve inline now proc time (#13609)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Nov 24, 2023
1 parent bf3975f commit 937e099
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,5 @@
sql: |
with t(v, arr) as (select 1, array[2, 3]) select v < all(arr), v < some(arr) from t;
batch_plan: |-
BatchProject { exprs: [All((1:Int32 < $expr10009)) as $expr1, Some((1:Int32 < $expr10009)) as $expr2] }
BatchProject { exprs: [All((1:Int32 < $expr10006)) as $expr1, Some((1:Int32 < $expr10006)) as $expr2] }
└─BatchValues { rows: [[1:Int32, ARRAY[2, 3]:List(Int32)]] }
14 changes: 1 addition & 13 deletions src/frontend/planner_test/tests/testdata/output/explain.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@
LogicalProject { exprs: [1:Int32] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
Inline Now and ProcTime:
LogicalProject { exprs: [1:Int32] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
Predicate Push Down:
LogicalProject { exprs: [1:Int32] }
Expand Down Expand Up @@ -55,7 +50,7 @@
"stages": {
"0": {
"root": {
"plan_node_id": 10016,
"plan_node_id": 10013,
"plan_node_type": "BatchValues",
"schema": [
{
Expand Down Expand Up @@ -97,13 +92,6 @@
explain_output: |+
Begin:
LogicalProject { exprs: [t1.v1, t2.v2] }
└─LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) }
├─LogicalScan { table: t1, columns: [v1, _row_id] }
└─LogicalScan { table: t2, columns: [v2, _row_id] }
Inline Now and ProcTime:
LogicalProject { exprs: [t1.v1, t2.v2] }
└─LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) }
├─LogicalScan { table: t1, columns: [v1, _row_id] }
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/output/expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@
└─LogicalProject { exprs: [Array(1:Int32) as $expr1] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
batch_plan: |-
BatchProject { exprs: [Some((1:Int32 < ArrayCat($expr10011, ARRAY[2]:List(Int32)))) as $expr1] }
BatchProject { exprs: [Some((1:Int32 < ArrayCat($expr10009, ARRAY[2]:List(Int32)))) as $expr1] }
└─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
├─BatchValues { rows: [[]] }
└─BatchValues { rows: [[ARRAY[1]:List(Int32)]] }
Expand All @@ -473,7 +473,7 @@
└─LogicalProject { exprs: [Array(1:Int32) as $expr1] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
batch_plan: |-
BatchProject { exprs: [All((1:Int32 < ArrayCat($expr10011, ARRAY[2]:List(Int32)))) as $expr1] }
BatchProject { exprs: [All((1:Int32 < ArrayCat($expr10009, ARRAY[2]:List(Int32)))) as $expr1] }
└─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
├─BatchValues { rows: [[]] }
└─BatchValues { rows: [[ARRAY[1]:List(Int32)]] }
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub use function_call::{is_row_function, FunctionCall, FunctionCallDisplay};
pub use function_call_with_lambda::FunctionCallWithLambda;
pub use input_ref::{input_ref_to_column_indices, InputRef, InputRefDisplay};
pub use literal::Literal;
pub use now::{InlineNowProcTime, Now};
pub use now::{InlineNowProcTime, Now, NowProcTimeFinder};
pub use parameter::Parameter;
pub use pure::*;
pub use risingwave_pb::expr::expr_node::Type as ExprType;
Expand Down
30 changes: 30 additions & 0 deletions src/frontend/src/expr/now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_pb::expr::expr_node::{self, NowRexNode};
use risingwave_pb::expr::ExprNode;

use super::{Expr, ExprImpl, ExprRewriter, FunctionCall, Literal};
use crate::expr::ExprVisitor;

/// The `NOW()` function.
/// - in streaming queries, it represents a retractable monotonic timestamp stream,
Expand Down Expand Up @@ -81,3 +82,32 @@ impl ExprRewriter for InlineNowProcTime {
FunctionCall::new_unchecked(func_type, inputs, ret).into()
}
}

#[derive(Default)]
pub struct NowProcTimeFinder {
has: bool,
}

impl NowProcTimeFinder {
pub fn has(&self) -> bool {
self.has
}
}

impl ExprVisitor for NowProcTimeFinder {
fn visit_now(&mut self, _: &Now) {
self.has = true;
}

fn visit_function_call(&mut self, func_call: &FunctionCall) {
if let expr_node::Type::Proctime = func_call.func_type {
self.has = true;
return;
}

func_call
.inputs()
.iter()
.for_each(|expr| self.visit_expr(expr));
}
}
17 changes: 13 additions & 4 deletions src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ use itertools::Itertools;
use risingwave_common::error::{ErrorCode, Result};

use super::plan_node::RewriteExprsRecursive;
use crate::expr::InlineNowProcTime;
use crate::expr::{InlineNowProcTime, NowProcTimeFinder};
use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
use crate::optimizer::plan_node::{ColumnPruningContext, PredicatePushdownContext};
use crate::optimizer::plan_node::{
ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
};
use crate::optimizer::plan_rewriter::ShareSourceRewriter;
#[cfg(debug_assertions)]
use crate::optimizer::plan_visitor::InputRefValidator;
Expand Down Expand Up @@ -487,10 +489,17 @@ impl LogicalOptimizer {
}

pub fn inline_now_proc_time(plan: PlanRef, ctx: &OptimizerContextRef) -> PlanRef {
// TODO: if there's no `NOW()` or `PROCTIME()`, we don't need to acquire snapshot.
// If now() and proctime() are no found, bail out.
let mut v = NowProcTimeFinder::default();
plan.visit_exprs_recursive(&mut v);
if !v.has() {
return plan;
}

let epoch = ctx.session_ctx().pinned_snapshot().epoch();
let mut v = InlineNowProcTime::new(epoch);

let plan = plan.rewrite_exprs_recursive(&mut InlineNowProcTime::new(epoch));
let plan = plan.rewrite_exprs_recursive(&mut v);

if ctx.is_explain_trace() {
ctx.trace("Inline Now and ProcTime:");
Expand Down

0 comments on commit 937e099

Please sign in to comment.