Skip to content

Commit

Permalink
feat(optimizer): improve inline session timezone in exprs (#13640)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Nov 24, 2023
1 parent ac2a842 commit 7b21e04
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
"stages": {
"0": {
"root": {
"plan_node_id": 10013,
"plan_node_id": 10010,
"plan_node_type": "BatchValues",
"schema": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,7 @@
JOIN side_input FOR SYSTEM_TIME AS OF PROCTIME() S
ON mod(B.auction, 10000) = S.key
sink_plan: |-
StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10022(hidden), side_input.key(hidden)] }
StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10018(hidden), side_input.key(hidden)] }
└─StreamTemporalJoin { type: Inner, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
├─StreamExchange { dist: HashShard($expr1) }
│ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] }
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub use now::{InlineNowProcTime, Now, NowProcTimeFinder};
pub use parameter::Parameter;
pub use pure::*;
pub use risingwave_pb::expr::expr_node::Type as ExprType;
pub use session_timezone::SessionTimezone;
pub use session_timezone::{SessionTimezone, TimestamptzExprFinder};
pub use subquery::{Subquery, SubqueryKind};
pub use table_function::{TableFunction, TableFunctionType};
pub use type_inference::{
Expand Down
34 changes: 33 additions & 1 deletion src/frontend/src/expr/session_timezone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub use risingwave_pb::expr::expr_node::Type as ExprType;

pub use crate::expr::expr_rewriter::ExprRewriter;
pub use crate::expr::function_call::FunctionCall;
use crate::expr::{Expr, ExprImpl};
use crate::expr::{Expr, ExprImpl, ExprVisitor};
use crate::session::current;

/// `SessionTimezone` will be used to resolve session
Expand Down Expand Up @@ -264,3 +264,35 @@ impl SessionTimezone {
.into()
}
}

#[derive(Default)]
pub struct TimestamptzExprFinder {
has: bool,
}

impl TimestamptzExprFinder {
pub fn has(&self) -> bool {
self.has
}
}

impl ExprVisitor for TimestamptzExprFinder {
fn visit_function_call(&mut self, func_call: &FunctionCall) {
if func_call.return_type() == DataType::Timestamptz {
self.has = true;
return;
}

for input in &func_call.inputs {
if input.return_type() == DataType::Timestamptz {
self.has = true;
return;
}
}

func_call
.inputs()
.iter()
.for_each(|expr| self.visit_expr(expr));
}
}
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ impl LogicalOptimizer {
}

pub fn inline_now_proc_time(plan: PlanRef, ctx: &OptimizerContextRef) -> PlanRef {
// If now() and proctime() are no found, bail out.
// If now() and proctime() are not found, bail out.
let mut v = NowProcTimeFinder::default();
plan.visit_exprs_recursive(&mut v);
if !v.has() {
Expand Down
12 changes: 9 additions & 3 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ use self::plan_visitor::{has_batch_exchange, CardinalityVisitor};
use self::property::{Cardinality, RequiredDist};
use self::rule::*;
use crate::catalog::table_catalog::{TableType, TableVersion};
use crate::expr::TimestamptzExprFinder;
use crate::optimizer::plan_node::generic::Union;
use crate::optimizer::plan_node::{
BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion,
ToStream,
ToStream, VisitExprsRecursive,
};
use crate::optimizer::plan_visitor::TemporalJoinValidator;
use crate::optimizer::property::Distribution;
Expand Down Expand Up @@ -739,8 +740,13 @@ fn const_eval_exprs(plan: PlanRef) -> Result<PlanRef> {
}

fn inline_session_timezone_in_exprs(ctx: OptimizerContextRef, plan: PlanRef) -> Result<PlanRef> {
let plan = plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut());
Ok(plan)
let mut v = TimestamptzExprFinder::default();
plan.visit_exprs_recursive(&mut v);
if v.has() {
Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
} else {
Ok(plan)
}
}

fn exist_and_no_exchange_before(plan: &PlanRef, is_candidate: fn(&PlanRef) -> bool) -> bool {
Expand Down

0 comments on commit 7b21e04

Please sign in to comment.