Skip to content

Commit

Permalink
perf(stream): set noop_update_hint when jsonb access exists (#18065) (#18412)
Browse files Browse the repository at this point in the history

Co-authored-by: Richard Chien <[email protected]>
  • Loading branch information
zwang28 and stdrc authored Sep 5, 2024
1 parent c512972 commit 01b7398
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 4 deletions.
6 changes: 3 additions & 3 deletions src/frontend/planner_test/tests/testdata/output/cse_expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
└─BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [JsonbAccess($expr1, 'c':Varchar) as $expr2, JsonbAccess($expr1, 'b':Varchar) as $expr3, t._row_id] }
└─StreamProject { exprs: [t.v1, JsonbAccess(t.v1, 'a':Varchar) as $expr1, t._row_id] }
└─StreamProject { exprs: [JsonbAccess($expr1, 'c':Varchar) as $expr2, JsonbAccess($expr1, 'b':Varchar) as $expr3, t._row_id], noop_update_hint: true }
└─StreamProject { exprs: [t.v1, JsonbAccess(t.v1, 'a':Varchar) as $expr1, t._row_id], noop_update_hint: true }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- name: Common sub expression extract2
sql: |
Expand All @@ -25,7 +25,7 @@
stream_plan: |-
StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [$expr1, $expr1, t._row_id] }
└─StreamProject { exprs: [t.v1, JsonbAccess(JsonbAccess(t.v1, 'a':Varchar), 'c':Varchar) as $expr1, t._row_id] }
└─StreamProject { exprs: [t.v1, JsonbAccess(JsonbAccess(t.v1, 'a':Varchar), 'c':Varchar) as $expr1, t._row_id], noop_update_hint: true }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- name: Common sub expression shouldn't extract impure function
sql: |
Expand Down
35 changes: 35 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,41 @@ impl<PlanRef: GenericPlanRef> Project<PlanRef> {
})
.collect::<Option<Vec<_>>>()
}

/// Returns `true` when at least one expression of this `Project` contains a jsonb
/// access / extraction / path-query function call, at any nesting depth.
///
/// This is a heuristic. When there's a jsonb access in the `Project`, it's very likely that
/// the query is extracting some fields from a jsonb payload column. In this case, a change
/// from the input jsonb payload may not change the output of the `Project`.
pub(crate) fn likely_produces_noop_updates(&self) -> bool {
    /// Expression visitor that records whether a jsonb access call has been seen.
    struct HasJsonbAccess {
        has: bool,
    }

    impl ExprVisitor for HasJsonbAccess {
        fn visit_function_call(&mut self, func_call: &FunctionCall) {
            // Nothing left to learn once a match has been recorded.
            if self.has {
                return;
            }

            if matches!(
                func_call.func_type(),
                ExprType::JsonbAccess
                    | ExprType::JsonbAccessStr
                    | ExprType::JsonbExtractPath
                    | ExprType::JsonbExtractPathVariadic
                    | ExprType::JsonbExtractPathText
                    | ExprType::JsonbExtractPathTextVariadic
                    | ExprType::JsonbPathExists
                    | ExprType::JsonbPathMatch
                    | ExprType::JsonbPathQueryArray
                    | ExprType::JsonbPathQueryFirst
            ) {
                self.has = true;
                return;
            }

            // This override takes the place of the trait's own handling of function calls,
            // so the arguments are walked here explicitly. That way a jsonb access wrapped
            // in another call (for example a cast around the extracted field) is found too.
            for input in func_call.inputs() {
                self.visit_expr(input);
            }
        }
    }

    // A fresh visitor per expression; `any` stops at the first expression that matches.
    self.exprs.iter().any(|expr| {
        let mut visitor = HasJsonbAccess { has: false };
        visitor.visit_expr(expr);
        visitor.has
    })
}
}

/// Construct a `Project` and dedup expressions.
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ impl Distill for StreamProject {

impl StreamProject {
/// Builds a `StreamProject` from `core`, handing `new_inner` the `noop_update_hint` value
/// returned by `core.likely_produces_noop_updates()`.
pub fn new(core: generic::Project<PlanRef>) -> Self {
// NOTE(review): this view is a rendered diff whose +/- markers were lost. The hunk reports
// "2 additions & 1 deletion", so the next line looks like the removed pre-change call
// (hint hard-coded to `false`) — confirm against the committed file.
Self::new_inner(core, false)
// Post-change body: the hint is computed from the projection core instead of being fixed.
let noop_update_hint = core.likely_produces_noop_updates();
Self::new_inner(core, noop_update_hint)
}

/// Set the `noop_update_hint` flag to the given value.
Expand Down

0 comments on commit 01b7398

Please sign in to comment.