From 01b73988d32bf7b42727d0880c1ffe1d37608165 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Thu, 5 Sep 2024 11:25:09 +0800 Subject: [PATCH] perf(stream): set `noop_update_hint` when jsonb access exists (#18065) (#18412) Co-authored-by: Richard Chien --- .../tests/testdata/output/cse_expr.yaml | 6 ++-- .../optimizer/plan_node/generic/project.rs | 35 +++++++++++++++++++ .../src/optimizer/plan_node/stream_project.rs | 3 +- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml index 0e5d72b3499a..abbc0aae184e 100644 --- a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml @@ -10,8 +10,8 @@ └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck } - └─StreamProject { exprs: [JsonbAccess($expr1, 'c':Varchar) as $expr2, JsonbAccess($expr1, 'b':Varchar) as $expr3, t._row_id] } - └─StreamProject { exprs: [t.v1, JsonbAccess(t.v1, 'a':Varchar) as $expr1, t._row_id] } + └─StreamProject { exprs: [JsonbAccess($expr1, 'c':Varchar) as $expr2, JsonbAccess($expr1, 'b':Varchar) as $expr3, t._row_id], noop_update_hint: true } + └─StreamProject { exprs: [t.v1, JsonbAccess(t.v1, 'a':Varchar) as $expr1, t._row_id], noop_update_hint: true } └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: Common sub expression extract2 sql: | @@ -25,7 +25,7 @@ stream_plan: |- StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck } └─StreamProject { exprs: [$expr1, $expr1, t._row_id] } - └─StreamProject { exprs: [t.v1, JsonbAccess(JsonbAccess(t.v1, 'a':Varchar), 'c':Varchar) as $expr1, t._row_id] } + └─StreamProject { exprs: [t.v1, JsonbAccess(JsonbAccess(t.v1, 'a':Varchar), 'c':Varchar) as $expr1, t._row_id], noop_update_hint: true } └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: Common sub expression shouldn't extract impure function sql: | diff --git a/src/frontend/src/optimizer/plan_node/generic/project.rs b/src/frontend/src/optimizer/plan_node/generic/project.rs index 68c652f0e006..5d658d404db3 100644 --- a/src/frontend/src/optimizer/plan_node/generic/project.rs +++ b/src/frontend/src/optimizer/plan_node/generic/project.rs @@ -296,6 +296,41 @@ impl Project { }) .collect::>>() } + + pub(crate) fn likely_produces_noop_updates(&self) -> bool { + struct HasJsonbAccess { + has: bool, + } + + impl ExprVisitor for HasJsonbAccess { + fn visit_function_call(&mut self, func_call: &FunctionCall) { + if matches!( + func_call.func_type(), + ExprType::JsonbAccess + | ExprType::JsonbAccessStr + | ExprType::JsonbExtractPath + | ExprType::JsonbExtractPathVariadic + | ExprType::JsonbExtractPathText + | ExprType::JsonbExtractPathTextVariadic + | ExprType::JsonbPathExists + | ExprType::JsonbPathMatch + | ExprType::JsonbPathQueryArray + | ExprType::JsonbPathQueryFirst + ) { + self.has = true; + } + } + } + + self.exprs.iter().any(|expr| { + // When there's a jsonb access in the `Project`, it's very likely that the query is + // extracting some fields from a jsonb payload column. In this case, a change from the + // input jsonb payload may not change the output of the `Project`. + let mut visitor = HasJsonbAccess { has: false }; + visitor.visit_expr(expr); + visitor.has + }) + } } /// Construct a `Project` and dedup expressions. diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs index ef879627c66b..d6ac8af7b146 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project.rs @@ -62,7 +62,8 @@ impl Distill for StreamProject { impl StreamProject { pub fn new(core: generic::Project) -> Self { - Self::new_inner(core, false) + let noop_update_hint = core.likely_produces_noop_updates(); + Self::new_inner(core, noop_update_hint) } /// Set the `noop_update_hint` flag to the given value.