Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(optimizer): improve scalar subqueries optimization time #16966

Merged
merged 4 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -508,3 +508,49 @@
expected_outputs:
- batch_plan
- stream_plan
- name: improve multi scalar subqueries optimization time. issue 16952. case 1.
sql: |
create table t1(a int, b int);
create table t2(c int primary key, d int);
select
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col1,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col2,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col3,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col4,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col5,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col6,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col7,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col8,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col9,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col10,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col11,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col12,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col13,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col14,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col15,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col16,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col17,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col18
from t1;
expected_outputs:
- batch_plan
- stream_plan
- name: improve multi scalar subqueries optimization time. issue 16952. case 2.
sql: |
create table t1(a int, b int);
create table t2(c int primary key, d int);
create table t3(e int, f int);
create table t4(g int, h int);
create table t5(i int, j int);
create table t6(k int, l int);
select
COALESCE((SELECT sum(d) FROM t2 left join t3 on e = a and f = c left join t4 on g = a and h = c left join t5 on i = a and j = c WHERE t1.a = t2.c and t1.a = t2.c and j in (select k from t6 where b = l) ), 0) col1,
COALESCE((SELECT sum(d) FROM t2 left join t3 on e = a and f = c left join t4 on g = a and h = c left join t5 on i = a and j = c WHERE t1.a = t2.c and t1.a = t2.c and j in (select k from t6 where b = l) ), 0) col2,
COALESCE((SELECT sum(d) FROM t2 left join t3 on e = a and f = c left join t4 on g = a and h = c left join t5 on i = a and j = c WHERE t1.a = t2.c and t1.a = t2.c and j in (select k from t6 where b = l) ), 0) col3,
COALESCE((SELECT sum(d) FROM t2 left join t3 on e = a and f = c left join t4 on g = a and h = c left join t5 on i = a and j = c WHERE t1.a = t2.c and t1.a = t2.c and j in (select k from t6 where b = l) ), 0) col4,
COALESCE((SELECT sum(d) FROM t2 left join t3 on e = a and f = c left join t4 on g = a and h = c left join t5 on i = a and j = c WHERE t1.a = t2.c and t1.a = t2.c and j in (select k from t6 where b = l) ), 0) col5,
COALESCE((SELECT sum(d) FROM t2 left join t3 on e = a and f = c left join t4 on g = a and h = c left join t5 on i = a and j = c WHERE t1.a = t2.c and t1.a = t2.c and j in (select k from t6 where b = l) ), 0) col6
from t1;
expected_outputs:
- batch_plan
- stream_plan

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -151,40 +151,25 @@
│ └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
└─BatchProjectSet { select_list: [$0, Unnest($0)] }
└─BatchHashAgg { group_key: [t.arr], aggs: [] }
└─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.arr] }
├─BatchExchange { order: [], dist: HashShard(t.arr) }
│ └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
└─BatchProjectSet { select_list: [$0, Unnest($0)] }
└─BatchHashAgg { group_key: [t.arr], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(t.arr) }
└─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
└─BatchExchange { order: [], dist: HashShard(t.arr) }
└─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why this plan gets improved?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the initial logical plan of this case is:

 LogicalProject { exprs: [t.x, t.arr, unnest, ordinality, unnest, ordinality] }
 └─LogicalApply { type: Inner, on: true, correlated_id: 1 }
   ├─LogicalApply { type: Inner, on: true, correlated_id: 2 }
   │ ├─LogicalScan { table: t, columns: [x, arr, _row_id] }
   │ └─LogicalTableFunction { table_function: Unnest(CorrelatedInputRef { index: 1, correlated_id: 2 }) }
   └─LogicalTableFunction { table_function: Unnest(CorrelatedInputRef { index: 1, correlated_id: 1 }) }

It has the same shape as the plan produced by multiple scalar subqueries.

stream_plan: |-
StreamMaterialize { columns: [x, arr, unnest, ordinality, arr_2, ordinality_2, t._row_id(hidden), projected_row_id(hidden), projected_row_id#1(hidden)], stream_key: [t._row_id, projected_row_id, arr, projected_row_id#1], pk_columns: [t._row_id, projected_row_id, arr, projected_row_id#1], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), $expr1, Unnest($0), (projected_row_id + 1:Int64) as $expr2, t._row_id, projected_row_id, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, Unnest($0), $expr1, projected_row_id, t.arr, Unnest($0), t._row_id, projected_row_id] }
├─StreamShare { id: 8 }
│ └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, projected_row_id] }
│ └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
│ ├─StreamExchange { dist: HashShard(t.arr) }
│ │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
│ └─StreamProjectSet { select_list: [$0, Unnest($0)] }
│ └─StreamProject { exprs: [t.arr] }
│ └─StreamHashAgg { group_key: [t.arr], aggs: [count] }
│ └─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, Unnest($0), (projected_row_id + 1:Int64) as $expr2, t._row_id, projected_row_id, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), projected_row_id, t.arr, Unnest($0), t._row_id] }
├─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
│ ├─StreamExchange { dist: HashShard(t.arr) }
│ │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
│ └─StreamProjectSet { select_list: [$0, Unnest($0)] }
│ └─StreamProject { exprs: [t.arr] }
│ └─StreamHashAgg { group_key: [t.arr], aggs: [count] }
│ └─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProjectSet { select_list: [$0, Unnest($0)] }
└─StreamProject { exprs: [t.arr] }
└─StreamHashAgg { group_key: [t.arr], aggs: [count] }
└─StreamShare { id: 8 }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
├─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProjectSet { select_list: [$0, Unnest($0)] }
└─StreamProject { exprs: [t.arr] }
└─StreamHashAgg { group_key: [t.arr], aggs: [count] }
└─StreamExchange { dist: HashShard(t.arr) }
└─StreamTableScan { table: t, columns: [t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: HashShard(t.arr) }
└─StreamTableScan { table: t, columns: [t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
select * from abs(1) WITH ORDINALITY;
batch_plan: |-
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
// can't handle a join with `output_indices`.
ProjectJoinSeparateRule::create(),
],
ApplyOrder::BottomUp,
ApplyOrder::TopDown,
)
});

Expand All @@ -186,7 +186,7 @@ static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage>
// can't handle a join with `output_indices`.
ProjectJoinSeparateRule::create(),
],
ApplyOrder::BottomUp,
ApplyOrder::TopDown,
)
});

Expand Down
18 changes: 16 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ pub struct LogicalApply {
/// Whether we require the subquery to produce at most one row. If `true`, we have to report an
/// error if the subquery produces more than one row.
max_one_row: bool,

/// Whether this apply has already been translated by `translate_apply()`; if so, the
/// translate-apply rule must not translate it again. This flag prevents an infinite loop in
/// General Unnesting (Translate Apply), since we use a top-down apply order instead of
/// bottom-up to improve optimization time for multiple scalar subqueries.
translated: bool,
}

impl Distill for LogicalApply {
Expand Down Expand Up @@ -85,6 +89,7 @@ impl LogicalApply {
correlated_id: CorrelatedId,
correlated_indices: Vec<usize>,
max_one_row: bool,
translated: bool,
) -> Self {
let ctx = left.ctx();
let join_core = generic::Join::with_full_output(left, right, join_type, on);
Expand All @@ -105,6 +110,7 @@ impl LogicalApply {
correlated_id,
correlated_indices,
max_one_row,
translated,
}
}

Expand All @@ -125,6 +131,7 @@ impl LogicalApply {
correlated_id,
correlated_indices,
max_one_row,
false,
)
.into()
}
Expand Down Expand Up @@ -164,6 +171,10 @@ impl LogicalApply {
self.correlated_indices.to_owned()
}

/// Returns whether this `LogicalApply` has already been translated by `translate_apply()`.
/// Used by the translate-apply rule to skip nodes it has already processed, avoiding
/// re-translation under the top-down apply order.
pub fn translated(&self) -> bool {
    self.translated
}

/// Returns whether the subquery is required to produce at most one row.
/// When `true`, an error must be reported if the subquery produces more than one row
/// (see the `max_one_row` field documentation on this struct).
pub fn max_one_row(&self) -> bool {
    self.max_one_row
}
Expand Down Expand Up @@ -202,15 +213,17 @@ impl LogicalApply {
let apply_left_len = apply_left.schema().len();
let correlated_indices_len = correlated_indices.len();

let new_apply = LogicalApply::create(
let new_apply = LogicalApply::new(
domain,
apply_right,
JoinType::Inner,
Condition::true_cond(),
correlated_id,
correlated_indices,
max_one_row,
);
true,
)
.into();

let on = Self::rewrite_on(on, correlated_indices_len, apply_left_len).and(Condition {
conjunctions: eq_predicates,
Expand Down Expand Up @@ -285,6 +298,7 @@ impl PlanTreeNodeBinary for LogicalApply {
self.correlated_id,
self.correlated_indices.clone(),
self.max_one_row,
self.translated,
)
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/rule/apply_agg_transpose_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ impl Rule for ApplyAggTransposeRule {
correlated_id,
correlated_indices.clone(),
false,
false,
)
.translate_apply(left, eq_predicates)
} else {
LogicalApply::new(
LogicalApply::create(
left,
input,
JoinType::Inner,
Expand All @@ -118,7 +119,6 @@ impl Rule for ApplyAggTransposeRule {
correlated_indices.clone(),
false,
)
.into()
};

let group_agg = {
Expand Down
5 changes: 2 additions & 3 deletions src/frontend/src/optimizer/rule/apply_dedup_transpose_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,15 @@ impl Rule for ApplyDedupTransposeRule {
return None;
}

let new_apply = LogicalApply::new(
let new_apply = LogicalApply::create(
left,
dedup_input,
JoinType::Inner,
Condition::true_cond(),
correlated_id,
correlated_indices,
false,
)
.into();
);

let new_dedup = {
let mut new_dedup_cols: Vec<usize> = (0..apply_left_len).collect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,15 @@ impl Rule for ApplyExpandTransposeRule {
return None;
}

let new_apply: PlanRef = LogicalApply::new(
let new_apply: PlanRef = LogicalApply::create(
left,
expand_input,
JoinType::Inner,
Condition::true_cond(),
correlated_id,
correlated_indices,
false,
)
.into();
);

let new_apply_schema_len = new_apply.schema().len();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,15 @@ impl Rule for ApplyHopWindowTransposeRule {
return None;
}

let new_apply = LogicalApply::new(
let new_apply = LogicalApply::create(
left,
hop_window_input,
JoinType::Inner,
Condition::true_cond(),
correlated_id,
correlated_indices,
false,
)
.into();
);

let new_hop_window = LogicalHopWindow::create(
new_apply,
Expand Down
5 changes: 2 additions & 3 deletions src/frontend/src/optimizer/rule/apply_limit_transpose_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,15 @@ impl Rule for ApplyLimitTransposeRule {
return None;
}

let new_apply = LogicalApply::new(
let new_apply = LogicalApply::create(
left,
limit_input,
JoinType::Inner,
Condition::true_cond(),
correlated_id,
correlated_indices,
false,
)
.into();
);

let new_topn = {
// use the first column as an order to provide determinism for streaming queries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,15 @@ impl Rule for ApplyOverWindowTransposeRule {
let apply_left_len = left.schema().len();
let apply_left_schema = left.schema().clone();

let new_apply = LogicalApply::new(
let new_apply = LogicalApply::create(
left,
window_input,
JoinType::Inner,
Condition::true_cond(),
correlated_id,
correlated_indices,
false,
)
.into();
);

let new_over_window = {
// Shift index of window functions' `InputRef` with `apply_left_len`.
Expand Down
5 changes: 2 additions & 3 deletions src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,15 @@ impl Rule for ApplyTopNTransposeRule {
return None;
}

let new_apply = LogicalApply::new(
let new_apply = LogicalApply::create(
left,
topn_input,
JoinType::Inner,
Condition::true_cond(),
correlated_id,
correlated_indices,
false,
)
.into();
);

let new_topn = {
// shift index of topn's `InputRef` with `apply_left_len`.
Expand Down
Loading
Loading