From 937e099b0e4455bb319a3b8474a0c57e00f2531a Mon Sep 17 00:00:00 2001 From: Dylan Date: Fri, 24 Nov 2023 13:31:18 +0800 Subject: [PATCH] feat(optimizer): improve inline now proc time (#13609) --- .../tests/testdata/output/cse_expr.yaml | 2 +- .../tests/testdata/output/explain.yaml | 14 +-------- .../tests/testdata/output/expr.yaml | 4 +-- src/frontend/src/expr/mod.rs | 2 +- src/frontend/src/expr/now.rs | 30 +++++++++++++++++++ .../src/optimizer/logical_optimization.rs | 17 ++++++++--- 6 files changed, 48 insertions(+), 21 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml index 4682f091ad1bc..f76625e1716fb 100644 --- a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml @@ -84,5 +84,5 @@ sql: | with t(v, arr) as (select 1, array[2, 3]) select v < all(arr), v < some(arr) from t; batch_plan: |- - BatchProject { exprs: [All((1:Int32 < $expr10009)) as $expr1, Some((1:Int32 < $expr10009)) as $expr2] } + BatchProject { exprs: [All((1:Int32 < $expr10006)) as $expr1, Some((1:Int32 < $expr10006)) as $expr2] } └─BatchValues { rows: [[1:Int32, ARRAY[2, 3]:List(Int32)]] } diff --git a/src/frontend/planner_test/tests/testdata/output/explain.yaml b/src/frontend/planner_test/tests/testdata/output/explain.yaml index e44431ffcd3d7..585d1bca15f77 100644 --- a/src/frontend/planner_test/tests/testdata/output/explain.yaml +++ b/src/frontend/planner_test/tests/testdata/output/explain.yaml @@ -6,11 +6,6 @@ LogicalProject { exprs: [1:Int32] } └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } - Inline Now and ProcTime: - - LogicalProject { exprs: [1:Int32] } - └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } - Predicate Push Down: LogicalProject { exprs: [1:Int32] } @@ -55,7 +50,7 @@ "stages": { "0": { "root": { - "plan_node_id": 10016, + "plan_node_id": 10013, "plan_node_type": 
"BatchValues", "schema": [ { @@ -97,13 +92,6 @@ explain_output: |+ Begin: - LogicalProject { exprs: [t1.v1, t2.v2] } - └─LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) } - ├─LogicalScan { table: t1, columns: [v1, _row_id] } - └─LogicalScan { table: t2, columns: [v2, _row_id] } - - Inline Now and ProcTime: - LogicalProject { exprs: [t1.v1, t2.v2] } └─LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) } ├─LogicalScan { table: t1, columns: [v1, _row_id] } diff --git a/src/frontend/planner_test/tests/testdata/output/expr.yaml b/src/frontend/planner_test/tests/testdata/output/expr.yaml index d0ab974ff81bc..36ee6acbe9163 100644 --- a/src/frontend/planner_test/tests/testdata/output/expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/expr.yaml @@ -450,7 +450,7 @@ └─LogicalProject { exprs: [Array(1:Int32) as $expr1] } └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } batch_plan: |- - BatchProject { exprs: [Some((1:Int32 < ArrayCat($expr10011, ARRAY[2]:List(Int32)))) as $expr1] } + BatchProject { exprs: [Some((1:Int32 < ArrayCat($expr10009, ARRAY[2]:List(Int32)))) as $expr1] } └─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all } ├─BatchValues { rows: [[]] } └─BatchValues { rows: [[ARRAY[1]:List(Int32)]] } @@ -473,7 +473,7 @@ └─LogicalProject { exprs: [Array(1:Int32) as $expr1] } └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } batch_plan: |- - BatchProject { exprs: [All((1:Int32 < ArrayCat($expr10011, ARRAY[2]:List(Int32)))) as $expr1] } + BatchProject { exprs: [All((1:Int32 < ArrayCat($expr10009, ARRAY[2]:List(Int32)))) as $expr1] } └─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all } ├─BatchValues { rows: [[]] } └─BatchValues { rows: [[ARRAY[1]:List(Int32)]] } diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index dcac41f85070e..ae4e0f84bc07c 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -58,7 +58,7 @@ pub use function_call::{is_row_function, 
FunctionCall, FunctionCallDisplay}; pub use function_call_with_lambda::FunctionCallWithLambda; pub use input_ref::{input_ref_to_column_indices, InputRef, InputRefDisplay}; pub use literal::Literal; -pub use now::{InlineNowProcTime, Now}; +pub use now::{InlineNowProcTime, Now, NowProcTimeFinder}; pub use parameter::Parameter; pub use pure::*; pub use risingwave_pb::expr::expr_node::Type as ExprType; diff --git a/src/frontend/src/expr/now.rs b/src/frontend/src/expr/now.rs index ee174883e9d7f..e43b16847cfde 100644 --- a/src/frontend/src/expr/now.rs +++ b/src/frontend/src/expr/now.rs @@ -18,6 +18,7 @@ use risingwave_pb::expr::expr_node::{self, NowRexNode}; use risingwave_pb::expr::ExprNode; use super::{Expr, ExprImpl, ExprRewriter, FunctionCall, Literal}; +use crate::expr::ExprVisitor; /// The `NOW()` function. /// - in streaming queries, it represents a retractable monotonic timestamp stream, @@ -81,3 +82,32 @@ impl ExprRewriter for InlineNowProcTime { FunctionCall::new_unchecked(func_type, inputs, ret).into() } } + +#[derive(Default)] +pub struct NowProcTimeFinder { + has: bool, +} + +impl NowProcTimeFinder { + pub fn has(&self) -> bool { + self.has + } +} + +impl ExprVisitor for NowProcTimeFinder { + fn visit_now(&mut self, _: &Now) { + self.has = true; + } + + fn visit_function_call(&mut self, func_call: &FunctionCall) { + if let expr_node::Type::Proctime = func_call.func_type { + self.has = true; + return; + } + + func_call + .inputs() + .iter() + .for_each(|expr| self.visit_expr(expr)); + } +} diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index df5ee593af8c8..abb007355815f 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -16,9 +16,11 @@ use itertools::Itertools; use risingwave_common::error::{ErrorCode, Result}; use super::plan_node::RewriteExprsRecursive; -use crate::expr::InlineNowProcTime; +use 
crate::expr::{InlineNowProcTime, NowProcTimeFinder}; use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer}; -use crate::optimizer::plan_node::{ColumnPruningContext, PredicatePushdownContext}; +use crate::optimizer::plan_node::{ + ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive, +}; use crate::optimizer::plan_rewriter::ShareSourceRewriter; #[cfg(debug_assertions)] use crate::optimizer::plan_visitor::InputRefValidator; @@ -487,10 +489,17 @@ impl LogicalOptimizer { } pub fn inline_now_proc_time(plan: PlanRef, ctx: &OptimizerContextRef) -> PlanRef { - // TODO: if there's no `NOW()` or `PROCTIME()`, we don't need to acquire snapshot. + // If now() and proctime() are not found, bail out. + let mut v = NowProcTimeFinder::default(); + plan.visit_exprs_recursive(&mut v); + if !v.has() { + return plan; + } + let epoch = ctx.session_ctx().pinned_snapshot().epoch(); + let mut v = InlineNowProcTime::new(epoch); - let plan = plan.rewrite_exprs_recursive(&mut InlineNowProcTime::new(epoch)); + let plan = plan.rewrite_exprs_recursive(&mut v); if ctx.is_explain_trace() { ctx.trace("Inline Now and ProcTime:");