From fd975e025e2f9964970a35afb186a0e2e336493c Mon Sep 17 00:00:00 2001 From: Dylan Date: Wed, 23 Oct 2024 13:13:25 +0800 Subject: [PATCH 1/2] fix(optimizer): apply join transpose rule (#19060) --- .../input/subquery_expr_correlated.yaml | 30 ++++++++ .../output/subquery_expr_correlated.yaml | 71 +++++++++++++++++++ .../src/optimizer/logical_optimization.rs | 14 +--- .../rule/apply_join_transpose_rule.rs | 16 +++-- .../rule/project_join_separate_rule.rs | 2 +- 5 files changed, 115 insertions(+), 18 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/input/subquery_expr_correlated.yaml b/src/frontend/planner_test/tests/testdata/input/subquery_expr_correlated.yaml index f35703128ae7d..7c87713e85c20 100644 --- a/src/frontend/planner_test/tests/testdata/input/subquery_expr_correlated.yaml +++ b/src/frontend/planner_test/tests/testdata/input/subquery_expr_correlated.yaml @@ -554,3 +554,33 @@ expected_outputs: - batch_plan - stream_plan +- name: improve join transpose rule to handle join with output indices + sql: | + with rawdata as ( + select 'first' as w, '{"x":{"value":123},"y":{"value":[1,2,3]},"z":{"value":[{"a":4,"b":5},{"a":6,"b":7}]}}'::jsonb as rawjson + union all + select 'second', '{"x":{"value":456},"y":{"value":[7,8,9]},"z":{"value":[{"a":0,"b":1},{"a":2,"b":3}]}}'::jsonb + ) + select + array( + select + array( + select 1 + from jsonb_array_elements( + case + when jsonb_typeof(o.value->'value') = 'array' then o.value->'value' + else '[]'::jsonb + end + ) with ordinality as x(v, i), jsonb_each ( + case + when jsonb_typeof(v) = 'object' then v + else '{}'::jsonb + end + ) as j + ) + from jsonb_each(rawjson) AS o + where jsonb_typeof(o.value) = 'object' and o.value ? 
'value' + ) as kv + from rawdata + expected_outputs: + - optimized_logical_plan_for_batch diff --git a/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml b/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml index 09905835f2009..07399b433c0f6 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml @@ -2122,3 +2122,74 @@ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: HashShard(t6.l) } └─StreamTableScan { table: t6, columns: [t6.k, t6.l, t6._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t6._row_id], pk: [_row_id], dist: UpstreamHashShard(t6._row_id) } +- name: improve join transpose rule to handle join with output indices + sql: | + with rawdata as ( + select 'first' as w, '{"x":{"value":123},"y":{"value":[1,2,3]},"z":{"value":[{"a":4,"b":5},{"a":6,"b":7}]}}'::jsonb as rawjson + union all + select 'second', '{"x":{"value":456},"y":{"value":[7,8,9]},"z":{"value":[{"a":0,"b":1},{"a":2,"b":3}]}}'::jsonb + ) + select + array( + select + array( + select 1 + from jsonb_array_elements( + case + when jsonb_typeof(o.value->'value') = 'array' then o.value->'value' + else '[]'::jsonb + end + ) with ordinality as x(v, i), jsonb_each ( + case + when jsonb_typeof(v) = 'object' then v + else '{}'::jsonb + end + ) as j + ) + from jsonb_each(rawjson) AS o + where jsonb_typeof(o.value) = 'object' and o.value ? 
'value' + ) as kv + from rawdata + optimized_logical_plan_for_batch: |- + LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: [$expr6] } + ├─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } } + └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Coalesce(array_agg($expr5) filter(IsNotNull(1:Int32)), ARRAY[]:List(List(Int32))) as $expr6] } + └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [array_agg($expr5) filter(IsNotNull(1:Int32))] } + └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr5, 1:Int32] } + ├─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] } + │ └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 
0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } } + └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr5, 1:Int32] } + └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom($expr1, $expr2) AND IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr5] } + ├─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Field(JsonbEach($0), 1:Int32) as $expr1] } + │ └─LogicalFilter { predicate: (JsonbTypeof(Field(JsonbEach($0), 1:Int32)) = 'object':Varchar) AND JsonbExists(Field(JsonbEach($0), 1:Int32), 'value':Varchar) } + │ └─LogicalProjectSet { select_list: [$0, JsonbEach($0)] } + │ └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] } + │ └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } } + └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr2, Coalesce(array_agg(1:Int32) filter(IsNotNull(1:Int32)), ARRAY[]:List(Int32)) as 
$expr5] } + └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr2], aggs: [array_agg(1:Int32) filter(IsNotNull(1:Int32))] } + └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom($expr2, $expr3) AND IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr2, 1:Int32, 1:Int32] } + ├─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr2], aggs: [] } + │ └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Field(JsonbEach($0), 1:Int32) as $expr2] } + │ └─LogicalFilter { predicate: (JsonbTypeof(Field(JsonbEach($0), 1:Int32)) = 'object':Varchar) AND JsonbExists(Field(JsonbEach($0), 1:Int32), 'value':Varchar) } + │ └─LogicalProjectSet { select_list: [$0, JsonbEach($0)] } + │ └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] } + │ └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } } + └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr3, 1:Int32, 1:Int32] } + 
└─LogicalJoin { type: Inner, on: IsNotDistinctFrom(JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb)), JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))) AND IsNotDistinctFrom(JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb)), JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))) AND IsNotDistinctFrom($expr3, $expr4) AND IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr3] } + ├─LogicalProjectSet { select_list: [$0, $1, JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))] } + │ └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr3], aggs: [] } + │ └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Field(JsonbEach($0), 1:Int32) as $expr3] } + │ └─LogicalFilter { predicate: (JsonbTypeof(Field(JsonbEach($0), 1:Int32)) = 'object':Varchar) AND JsonbExists(Field(JsonbEach($0), 1:Int32), 'value':Varchar) } + │ └─LogicalProjectSet { select_list: [$0, JsonbEach($0)] } + │ └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] } + │ └─LogicalValues { rows: [['{"x": 
{"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } } + └─LogicalProjectSet { select_list: [$0, $1, $2, $3, JsonbEach(Case((JsonbTypeof($3) = 'object':Varchar), $3, '{}':Jsonb))] } + └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr4, JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb)), JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))] } + └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr4, JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))], aggs: [] } + └─LogicalProjectSet { select_list: [$0, $1, JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))] } + └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr4], aggs: [] } + └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Field(JsonbEach($0), 1:Int32) as $expr4] } + └─LogicalFilter { predicate: (JsonbTypeof(Field(JsonbEach($0), 1:Int32)) = 'object':Varchar) AND JsonbExists(Field(JsonbEach($0), 1:Int32), 'value':Varchar) } + └─LogicalProjectSet { select_list: [$0, JsonbEach($0)] } + └─LogicalAgg { 
group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] } + └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } } diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index 8a508c577a6fe..766cde4ecfc7e 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -199,12 +199,7 @@ static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock = LazyLock::new(|| { OptimizationStage::new( "General Unnesting(Translate Apply)", - vec![ - TranslateApplyRule::create(true), - // Separate the project from a join if necessary because `ApplyJoinTransposeRule` - // can't handle a join with `output_indices`. - ProjectJoinSeparateRule::create(), - ], + vec![TranslateApplyRule::create(true)], ApplyOrder::TopDown, ) }); @@ -213,12 +208,7 @@ static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock LazyLock::new(|| { OptimizationStage::new( "General Unnesting(Translate Apply)", - vec![ - TranslateApplyRule::create(false), - // Separate the project from a join if necessary because `ApplyJoinTransposeRule` - // can't handle a join with `output_indices`. 
- ProjectJoinSeparateRule::create(), - ], + vec![TranslateApplyRule::create(false)], ApplyOrder::TopDown, ) }); diff --git a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs index 4ae41ce3bd564..9e6f7582c0590 100644 --- a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs +++ b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs @@ -24,7 +24,9 @@ use crate::expr::{ InputRef, }; use crate::optimizer::plan_node::generic::GenericPlanRef; -use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalJoin, PlanTreeNodeBinary}; +use crate::optimizer::plan_node::{ + LogicalApply, LogicalFilter, LogicalJoin, PlanTreeNode, PlanTreeNodeBinary, +}; use crate::optimizer::plan_visitor::{ExprCorrelatedIdFinder, PlanCorrelatedIdFinder}; use crate::optimizer::rule::apply_offset_rewriter::ApplyCorrelatedIndicesConverter; use crate::optimizer::PlanRef; @@ -122,10 +124,14 @@ impl Rule for ApplyJoinTransposeRule { return None; } - assert!( - join.output_indices_are_trivial(), - "ApplyJoinTransposeRule requires the join containing no output indices, so make sure ProjectJoinSeparateRule is always applied before this rule" - ); + // ApplyJoinTransposeRule requires the join to have no output indices, i.e. ProjectJoinSeparateRule must have been applied to the join before the transpose itself can run. + // As this rule is applied until a fixed point is reached, if the join still has output indices it is safe to apply ProjectJoinSeparateRule here first and return. 
+ if !join.output_indices_are_trivial() { + let new_apply_right = crate::optimizer::rule::ProjectJoinSeparateRule::create() + .apply(join.clone().into()) + .unwrap(); + return Some(apply.clone_with_inputs(&[apply_left, new_apply_right])); + } let (push_left, push_right) = match join.join_type() { // `LeftSemi`, `LeftAnti`, `LeftOuter` can only push to left side if it's right side has diff --git a/src/frontend/src/optimizer/rule/project_join_separate_rule.rs b/src/frontend/src/optimizer/rule/project_join_separate_rule.rs index 2cbdd143a6669..ce144e341ed24 100644 --- a/src/frontend/src/optimizer/rule/project_join_separate_rule.rs +++ b/src/frontend/src/optimizer/rule/project_join_separate_rule.rs @@ -25,7 +25,7 @@ impl ProjectJoinSeparateRule { impl Rule for ProjectJoinSeparateRule { fn apply(&self, plan: PlanRef) -> Option { let join = plan.as_logical_join()?; - if join.is_full_out() { + if join.output_indices_are_trivial() { None } else { let (left, right, on, join_type, output_indices) = join.clone().decompose(); From e6f830b68f42022b8ee1d9e4f68836cddcb3086a Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Wed, 23 Oct 2024 15:06:48 +0800 Subject: [PATCH 2/2] fix(dashboard): show creating job edges in bp graph (#19066) --- src/meta/service/src/stream_service.rs | 2 +- src/meta/src/controller/catalog.rs | 111 +++++++++++++++---------- src/meta/src/dashboard/mod.rs | 2 +- 3 files changed, 70 insertions(+), 45 deletions(-) diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 19fafb5de74f4..4adde9de62a26 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -312,7 +312,7 @@ impl StreamManagerService for StreamServiceImpl { let dependencies = self .metadata_manager .catalog_controller - .list_object_dependencies() + .list_created_object_dependencies() .await?; Ok(Response::new(ListObjectDependenciesResponse { diff --git 
a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 5d215344ac0ec..7ce5747f7debe 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -672,25 +672,42 @@ impl CatalogController { Ok(tables) } - pub async fn list_object_dependencies(&self) -> MetaResult> { - let inner = self.inner.read().await; + pub async fn list_all_object_dependencies(&self) -> MetaResult> { + self.list_object_dependencies(true).await + } - let dependencies: Vec<(ObjectId, ObjectId)> = ObjectDependency::find() - .select_only() - .columns([ - object_dependency::Column::Oid, - object_dependency::Column::UsedBy, - ]) - .join( - JoinType::InnerJoin, - object_dependency::Relation::Object1.def(), - ) - .join(JoinType::InnerJoin, object::Relation::StreamingJob.def()) - .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created)) - .into_tuple() - .all(&inner.db) - .await?; + pub async fn list_created_object_dependencies(&self) -> MetaResult> { + self.list_object_dependencies(false).await + } + async fn list_object_dependencies( + &self, + include_creating: bool, + ) -> MetaResult> { + let inner = self.inner.read().await; + + let dependencies: Vec<(ObjectId, ObjectId)> = { + let filter = if include_creating { + Expr::value(true) + } else { + streaming_job::Column::JobStatus.eq(JobStatus::Created) + }; + ObjectDependency::find() + .select_only() + .columns([ + object_dependency::Column::Oid, + object_dependency::Column::UsedBy, + ]) + .join( + JoinType::InnerJoin, + object_dependency::Relation::Object1.def(), + ) + .join(JoinType::InnerJoin, object::Relation::StreamingJob.def()) + .filter(filter) + .into_tuple() + .all(&inner.db) + .await? 
+ }; let mut obj_dependencies = dependencies .into_iter() .map(|(oid, used_by)| PbObjectDependencies { @@ -721,20 +738,24 @@ impl CatalogController { } })); - let sink_dependencies: Vec<(SinkId, TableId)> = Sink::find() - .select_only() - .columns([sink::Column::SinkId, sink::Column::TargetTable]) - .join(JoinType::InnerJoin, sink::Relation::Object.def()) - .join(JoinType::InnerJoin, object::Relation::StreamingJob.def()) - .filter( + let sink_dependencies: Vec<(SinkId, TableId)> = { + let filter = if include_creating { + sink::Column::TargetTable.is_not_null() + } else { streaming_job::Column::JobStatus .eq(JobStatus::Created) - .and(sink::Column::TargetTable.is_not_null()), - ) - .into_tuple() - .all(&inner.db) - .await?; - + .and(sink::Column::TargetTable.is_not_null()) + }; + Sink::find() + .select_only() + .columns([sink::Column::SinkId, sink::Column::TargetTable]) + .join(JoinType::InnerJoin, sink::Relation::Object.def()) + .join(JoinType::InnerJoin, object::Relation::StreamingJob.def()) + .filter(filter) + .into_tuple() + .all(&inner.db) + .await? 
+ }; obj_dependencies.extend(sink_dependencies.into_iter().map(|(sink_id, table_id)| { PbObjectDependencies { object_id: table_id as _, @@ -742,22 +763,26 @@ impl CatalogController { } })); - let subscription_dependencies: Vec<(SubscriptionId, TableId)> = Subscription::find() - .select_only() - .columns([ - subscription::Column::SubscriptionId, - subscription::Column::DependentTableId, - ]) - .join(JoinType::InnerJoin, subscription::Relation::Object.def()) - .filter( + let subscription_dependencies: Vec<(SubscriptionId, TableId)> = { + let filter = if include_creating { + subscription::Column::DependentTableId.is_not_null() + } else { subscription::Column::SubscriptionState .eq(Into::::into(SubscriptionState::Created)) - .and(subscription::Column::DependentTableId.is_not_null()), - ) - .into_tuple() - .all(&inner.db) - .await?; - + .and(subscription::Column::DependentTableId.is_not_null()) + }; + Subscription::find() + .select_only() + .columns([ + subscription::Column::SubscriptionId, + subscription::Column::DependentTableId, + ]) + .join(JoinType::InnerJoin, subscription::Relation::Object.def()) + .filter(filter) + .into_tuple() + .all(&inner.db) + .await? + }; obj_dependencies.extend(subscription_dependencies.into_iter().map( |(subscription_id, table_id)| PbObjectDependencies { object_id: subscription_id as _, diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index bec4b82f59c96..2bef0505ce443 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -326,7 +326,7 @@ pub(super) mod handlers { let object_dependencies = srv .metadata_manager .catalog_controller - .list_object_dependencies() + .list_all_object_dependencies() .await .map_err(err)?;