From 4db0bed0bd926c0d0aaf28eb10ddfb7b51793c11 Mon Sep 17 00:00:00 2001 From: Dylan Date: Wed, 23 Oct 2024 13:13:25 +0800 Subject: [PATCH] fix(optimizer): apply join transpose rule (#19060) --- .../input/subquery_expr_correlated.yaml | 30 ++++++++ .../output/subquery_expr_correlated.yaml | 71 +++++++++++++++++++ .../src/optimizer/logical_optimization.rs | 14 +--- .../rule/apply_join_transpose_rule.rs | 16 +++-- .../rule/project_join_separate_rule.rs | 2 +- 5 files changed, 115 insertions(+), 18 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/input/subquery_expr_correlated.yaml b/src/frontend/planner_test/tests/testdata/input/subquery_expr_correlated.yaml index f35703128ae7d..7c87713e85c20 100644 --- a/src/frontend/planner_test/tests/testdata/input/subquery_expr_correlated.yaml +++ b/src/frontend/planner_test/tests/testdata/input/subquery_expr_correlated.yaml @@ -554,3 +554,33 @@ expected_outputs: - batch_plan - stream_plan +- name: improve join transpose rule to handle join with output indices + sql: | + with rawdata as ( + select 'first' as w, '{"x":{"value":123},"y":{"value":[1,2,3]},"z":{"value":[{"a":4,"b":5},{"a":6,"b":7}]}}'::jsonb as rawjson + union all + select 'second', '{"x":{"value":456},"y":{"value":[7,8,9]},"z":{"value":[{"a":0,"b":1},{"a":2,"b":3}]}}'::jsonb + ) + select + array( + select + array( + select 1 + from jsonb_array_elements( + case + when jsonb_typeof(o.value->'value') = 'array' then o.value->'value' + else '[]'::jsonb + end + ) with ordinality as x(v, i), jsonb_each ( + case + when jsonb_typeof(v) = 'object' then v + else '{}'::jsonb + end + ) as j + ) + from jsonb_each(rawjson) AS o + where jsonb_typeof(o.value) = 'object' and o.value ? 'value' + ) as kv + from rawdata + expected_outputs: + - optimized_logical_plan_for_batch diff --git a/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml b/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml index 09905835f2009..07399b433c0f6 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml @@ -2122,3 +2122,74 @@ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: HashShard(t6.l) } └─StreamTableScan { table: t6, columns: [t6.k, t6.l, t6._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t6._row_id], pk: [_row_id], dist: UpstreamHashShard(t6._row_id) } +- name: improve join transpose rule to handle join with output indices + sql: | + with rawdata as ( + select 'first' as w, '{"x":{"value":123},"y":{"value":[1,2,3]},"z":{"value":[{"a":4,"b":5},{"a":6,"b":7}]}}'::jsonb as rawjson + union all + select 'second', '{"x":{"value":456},"y":{"value":[7,8,9]},"z":{"value":[{"a":0,"b":1},{"a":2,"b":3}]}}'::jsonb + ) + select + array( + select + array( + select 1 + from jsonb_array_elements( + case + when jsonb_typeof(o.value->'value') = 'array' then o.value->'value' + else '[]'::jsonb + end + ) with ordinality as x(v, i), jsonb_each ( + case + when jsonb_typeof(v) = 'object' then v + else '{}'::jsonb + end + ) as j + ) + from jsonb_each(rawjson) AS o + where jsonb_typeof(o.value) = 'object' and o.value ? 'value' + ) as kv + from rawdata + optimized_logical_plan_for_batch: |- + LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: [$expr6] } + ├─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } } + └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Coalesce(array_agg($expr5) filter(IsNotNull(1:Int32)), ARRAY[]:List(List(Int32))) as $expr6] } + └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [array_agg($expr5) filter(IsNotNull(1:Int32))] } + └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr5, 1:Int32] } + ├─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] } + │ └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } } + └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr5, 1:Int32] } + └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom($expr1, $expr2) AND IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr5] } + ├─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Field(JsonbEach($0), 1:Int32) as $expr1] } + │ └─LogicalFilter { predicate: (JsonbTypeof(Field(JsonbEach($0), 1:Int32)) = 'object':Varchar) AND JsonbExists(Field(JsonbEach($0), 1:Int32), 'value':Varchar) } + │ └─LogicalProjectSet { select_list: [$0, JsonbEach($0)] } + │ └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] } + │ └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } } + └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr2, Coalesce(array_agg(1:Int32) filter(IsNotNull(1:Int32)), ARRAY[]:List(Int32)) as $expr5] } + └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr2], aggs: [array_agg(1:Int32) filter(IsNotNull(1:Int32))] } + └─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom($expr2, $expr3) AND IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr2, 1:Int32, 1:Int32] } + ├─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr2], aggs: [] } + │ └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Field(JsonbEach($0), 1:Int32) as $expr2] } + │ └─LogicalFilter { predicate: (JsonbTypeof(Field(JsonbEach($0), 1:Int32)) = 'object':Varchar) AND JsonbExists(Field(JsonbEach($0), 1:Int32), 'value':Varchar) } + │ └─LogicalProjectSet { select_list: [$0, JsonbEach($0)] } + │ └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] } + │ └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } } + └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr3, 1:Int32, 1:Int32] } + └─LogicalJoin { type: Inner, on: IsNotDistinctFrom(JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb)), JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))) AND IsNotDistinctFrom(JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb)), JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))) AND IsNotDistinctFrom($expr3, $expr4) AND IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr3] } + ├─LogicalProjectSet { select_list: [$0, $1, JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))] } + │ └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr3], aggs: [] } + │ └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Field(JsonbEach($0), 1:Int32) as $expr3] } + │ └─LogicalFilter { predicate: (JsonbTypeof(Field(JsonbEach($0), 1:Int32)) = 'object':Varchar) AND JsonbExists(Field(JsonbEach($0), 1:Int32), 'value':Varchar) } + │ └─LogicalProjectSet { select_list: [$0, JsonbEach($0)] } + │ └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] } + │ └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } } + └─LogicalProjectSet { select_list: [$0, $1, $2, $3, JsonbEach(Case((JsonbTypeof($3) = 'object':Varchar), $3, '{}':Jsonb))] } + └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr4, JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb)), JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))] } + └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr4, JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))], aggs: [] } + └─LogicalProjectSet { select_list: [$0, $1, JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))] } + └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr4], aggs: [] } + └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Field(JsonbEach($0), 1:Int32) as $expr4] } + └─LogicalFilter { predicate: (JsonbTypeof(Field(JsonbEach($0), 1:Int32)) = 'object':Varchar) AND JsonbExists(Field(JsonbEach($0), 1:Int32), 'value':Varchar) } + └─LogicalProjectSet { select_list: [$0, JsonbEach($0)] } + └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] } + └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } } diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index d452626bb9418..84a49d705ba7e 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -166,12 +166,7 @@ static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock = LazyLock::new(|| { OptimizationStage::new( "General Unnesting(Translate Apply)", - vec![ - TranslateApplyRule::create(true), - // Separate the project from a join if necessary because `ApplyJoinTransposeRule` - // can't handle a join with `output_indices`. - ProjectJoinSeparateRule::create(), - ], + vec![TranslateApplyRule::create(true)], ApplyOrder::TopDown, ) }); @@ -180,12 +175,7 @@ static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock LazyLock::new(|| { OptimizationStage::new( "General Unnesting(Translate Apply)", - vec![ - TranslateApplyRule::create(false), - // Separate the project from a join if necessary because `ApplyJoinTransposeRule` - // can't handle a join with `output_indices`. - ProjectJoinSeparateRule::create(), - ], + vec![TranslateApplyRule::create(false)], ApplyOrder::TopDown, ) }); diff --git a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs index 3da0348936238..6e94ec926c4b7 100644 --- a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs +++ b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs @@ -24,7 +24,9 @@ use crate::expr::{ InputRef, }; use crate::optimizer::plan_node::generic::GenericPlanRef; -use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalJoin, PlanTreeNodeBinary}; +use crate::optimizer::plan_node::{ + LogicalApply, LogicalFilter, LogicalJoin, PlanTreeNode, PlanTreeNodeBinary, +}; use crate::optimizer::plan_visitor::{ExprCorrelatedIdFinder, PlanCorrelatedIdFinder}; use crate::optimizer::rule::apply_offset_rewriter::ApplyCorrelatedIndicesConverter; use crate::optimizer::PlanRef; @@ -122,10 +124,14 @@ impl Rule for ApplyJoinTransposeRule { return None; } - assert!( - join.output_indices_are_trivial(), - "ApplyJoinTransposeRule requires the join containing no output indices, so make sure ProjectJoinSeparateRule is always applied before this rule" - ); + // ApplyJoinTransposeRule requires the join containing no output indices, so make sure ProjectJoinSeparateRule is always applied before this rule. + // As this rule will be applied until we reach the fixed point, if the join has output indices, apply ProjectJoinSeparateRule first and return is safety. + if !join.output_indices_are_trivial() { + let new_apply_right = crate::optimizer::rule::ProjectJoinSeparateRule::create() + .apply(join.clone().into()) + .unwrap(); + return Some(apply.clone_with_inputs(&[apply_left, new_apply_right])); + } let (push_left, push_right) = match join.join_type() { // `LeftSemi`, `LeftAnti`, `LeftOuter` can only push to left side if it's right side has diff --git a/src/frontend/src/optimizer/rule/project_join_separate_rule.rs b/src/frontend/src/optimizer/rule/project_join_separate_rule.rs index 2cbdd143a6669..ce144e341ed24 100644 --- a/src/frontend/src/optimizer/rule/project_join_separate_rule.rs +++ b/src/frontend/src/optimizer/rule/project_join_separate_rule.rs @@ -25,7 +25,7 @@ impl ProjectJoinSeparateRule { impl Rule for ProjectJoinSeparateRule { fn apply(&self, plan: PlanRef) -> Option { let join = plan.as_logical_join()?; - if join.is_full_out() { + if join.output_indices_are_trivial() { None } else { let (left, right, on, join_type, output_indices) = join.clone().decompose();