Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(optimizer): apply join transpose rule #19060

Merged
merged 4 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -554,3 +554,33 @@
expected_outputs:
- batch_plan
- stream_plan
- name: improve join transpose rule to handle join with output indices
sql: |
with rawdata as (
select 'first' as w, '{"x":{"value":123},"y":{"value":[1,2,3]},"z":{"value":[{"a":4,"b":5},{"a":6,"b":7}]}}'::jsonb as rawjson
union all
select 'second', '{"x":{"value":456},"y":{"value":[7,8,9]},"z":{"value":[{"a":0,"b":1},{"a":2,"b":3}]}}'::jsonb
)
select
array(
select
array(
select 1
from jsonb_array_elements(
case
when jsonb_typeof(o.value->'value') = 'array' then o.value->'value'
else '[]'::jsonb
end
) with ordinality as x(v, i), jsonb_each (
case
when jsonb_typeof(v) = 'object' then v
else '{}'::jsonb
end
) as j
)
from jsonb_each(rawjson) AS o
where jsonb_typeof(o.value) = 'object' and o.value ? 'value'
) as kv
from rawdata
expected_outputs:
- optimized_logical_plan_for_batch
Original file line number Diff line number Diff line change
Expand Up @@ -2122,3 +2122,74 @@
│ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t6.l) }
└─StreamTableScan { table: t6, columns: [t6.k, t6.l, t6._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t6._row_id], pk: [_row_id], dist: UpstreamHashShard(t6._row_id) }
- name: improve join transpose rule to handle join with output indices
sql: |
with rawdata as (
select 'first' as w, '{"x":{"value":123},"y":{"value":[1,2,3]},"z":{"value":[{"a":4,"b":5},{"a":6,"b":7}]}}'::jsonb as rawjson
union all
select 'second', '{"x":{"value":456},"y":{"value":[7,8,9]},"z":{"value":[{"a":0,"b":1},{"a":2,"b":3}]}}'::jsonb
)
select
array(
select
array(
select 1
from jsonb_array_elements(
case
when jsonb_typeof(o.value->'value') = 'array' then o.value->'value'
else '[]'::jsonb
end
) with ordinality as x(v, i), jsonb_each (
case
when jsonb_typeof(v) = 'object' then v
else '{}'::jsonb
end
) as j
)
from jsonb_each(rawjson) AS o
where jsonb_typeof(o.value) = 'object' and o.value ? 'value'
) as kv
from rawdata
optimized_logical_plan_for_batch: |-
LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: [$expr6] }
├─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } }
└─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Coalesce(array_agg($expr5) filter(IsNotNull(1:Int32)), ARRAY[]:List(List(Int32))) as $expr6] }
└─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [array_agg($expr5) filter(IsNotNull(1:Int32))] }
└─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr5, 1:Int32] }
├─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] }
│ └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } }
└─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr5, 1:Int32] }
└─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom($expr1, $expr2) AND IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr5] }
├─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Field(JsonbEach($0), 1:Int32) as $expr1] }
│ └─LogicalFilter { predicate: (JsonbTypeof(Field(JsonbEach($0), 1:Int32)) = 'object':Varchar) AND JsonbExists(Field(JsonbEach($0), 1:Int32), 'value':Varchar) }
│ └─LogicalProjectSet { select_list: [$0, JsonbEach($0)] }
│ └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] }
│ └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } }
└─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr2, Coalesce(array_agg(1:Int32) filter(IsNotNull(1:Int32)), ARRAY[]:List(Int32)) as $expr5] }
└─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr2], aggs: [array_agg(1:Int32) filter(IsNotNull(1:Int32))] }
└─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom($expr2, $expr3) AND IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr2, 1:Int32, 1:Int32] }
├─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr2], aggs: [] }
│ └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Field(JsonbEach($0), 1:Int32) as $expr2] }
│ └─LogicalFilter { predicate: (JsonbTypeof(Field(JsonbEach($0), 1:Int32)) = 'object':Varchar) AND JsonbExists(Field(JsonbEach($0), 1:Int32), 'value':Varchar) }
│ └─LogicalProjectSet { select_list: [$0, JsonbEach($0)] }
│ └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] }
│ └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } }
└─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr3, 1:Int32, 1:Int32] }
└─LogicalJoin { type: Inner, on: IsNotDistinctFrom(JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb)), JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))) AND IsNotDistinctFrom(JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb)), JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))) AND IsNotDistinctFrom($expr3, $expr4) AND IsNotDistinctFrom('{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, '{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb), output: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr3] }
├─LogicalProjectSet { select_list: [$0, $1, JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))] }
│ └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr3], aggs: [] }
│ └─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Field(JsonbEach($0), 1:Int32) as $expr3] }
│ └─LogicalFilter { predicate: (JsonbTypeof(Field(JsonbEach($0), 1:Int32)) = 'object':Varchar) AND JsonbExists(Field(JsonbEach($0), 1:Int32), 'value':Varchar) }
│ └─LogicalProjectSet { select_list: [$0, JsonbEach($0)] }
│ └─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] }
│ └─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } }
└─LogicalProjectSet { select_list: [$0, $1, $2, $3, JsonbEach(Case((JsonbTypeof($3) = 'object':Varchar), $3, '{}':Jsonb))] }
└─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr4, JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb)), JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))] }
└─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr4, JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))], aggs: [] }
└─LogicalProjectSet { select_list: [$0, $1, JsonbArrayElements(Case((JsonbTypeof(JsonbAccess($1, 'value':Varchar)) = 'array':Varchar), JsonbAccess($1, 'value':Varchar), '[]':Jsonb))] }
└─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, $expr4], aggs: [] }
└─LogicalProject { exprs: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb, Field(JsonbEach($0), 1:Int32) as $expr4] }
└─LogicalFilter { predicate: (JsonbTypeof(Field(JsonbEach($0), 1:Int32)) = 'object':Varchar) AND JsonbExists(Field(JsonbEach($0), 1:Int32), 'value':Varchar) }
└─LogicalProjectSet { select_list: [$0, JsonbEach($0)] }
└─LogicalAgg { group_key: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], aggs: [] }
└─LogicalValues { rows: [['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb], ['{"x": {"value": 456}, "y": {"value": [7, 8, 9]}, "z": {"value": [{"a": 0, "b": 1}, {"a": 2, "b": 3}]}}':Jsonb]], schema: Schema { fields: ['{"x": {"value": 123}, "y": {"value": [1, 2, 3]}, "z": {"value": [{"a": 4, "b": 5}, {"a": 6, "b": 7}]}}':Jsonb:Jsonb] } }
14 changes: 2 additions & 12 deletions src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,7 @@ static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
LazyLock::new(|| {
OptimizationStage::new(
"General Unnesting(Translate Apply)",
vec![
TranslateApplyRule::create(true),
// Separate the project from a join if necessary because `ApplyJoinTransposeRule`
// can't handle a join with `output_indices`.
ProjectJoinSeparateRule::create(),
],
vec![TranslateApplyRule::create(true)],
ApplyOrder::TopDown,
)
});
Expand All @@ -213,12 +208,7 @@ static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage>
LazyLock::new(|| {
OptimizationStage::new(
"General Unnesting(Translate Apply)",
vec![
TranslateApplyRule::create(false),
// Separate the project from a join if necessary because `ApplyJoinTransposeRule`
// can't handle a join with `output_indices`.
ProjectJoinSeparateRule::create(),
],
vec![TranslateApplyRule::create(false)],
ApplyOrder::TopDown,
)
});
Expand Down
16 changes: 11 additions & 5 deletions src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use crate::expr::{
InputRef,
};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalJoin, PlanTreeNodeBinary};
use crate::optimizer::plan_node::{
LogicalApply, LogicalFilter, LogicalJoin, PlanTreeNode, PlanTreeNodeBinary,
};
use crate::optimizer::plan_visitor::{ExprCorrelatedIdFinder, PlanCorrelatedIdFinder};
use crate::optimizer::rule::apply_offset_rewriter::ApplyCorrelatedIndicesConverter;
use crate::optimizer::PlanRef;
Expand Down Expand Up @@ -122,10 +124,14 @@ impl Rule for ApplyJoinTransposeRule {
return None;
}

assert!(
join.output_indices_are_trivial(),
"ApplyJoinTransposeRule requires the join containing no output indices, so make sure ProjectJoinSeparateRule is always applied before this rule"
);
// ApplyJoinTransposeRule requires the join containing no output indices, so make sure ProjectJoinSeparateRule is always applied before this rule.
// As this rule will be applied until we reach the fixed point, if the join has output indices, apply ProjectJoinSeparateRule first and return is safety.
if !join.output_indices_are_trivial() {
let new_apply_right = crate::optimizer::rule::ProjectJoinSeparateRule::create()
.apply(join.clone().into())
.unwrap();
return Some(apply.clone_with_inputs(&[apply_left, new_apply_right]));
}

let (push_left, push_right) = match join.join_type() {
// `LeftSemi`, `LeftAnti`, `LeftOuter` can only push to left side if it's right side has
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl ProjectJoinSeparateRule {
impl Rule for ProjectJoinSeparateRule {
fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
let join = plan.as_logical_join()?;
if join.is_full_out() {
if join.output_indices_are_trivial() {
None
} else {
let (left, right, on, join_type, output_indices) = join.clone().decompose();
Expand Down
Loading