Skip to content

Commit

Permalink
feat(optimizer): improve scalar subqueries optimization time (#16966)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored May 29, 2024
1 parent 356593e commit bd6454e
Show file tree
Hide file tree
Showing 13 changed files with 1,179 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -508,3 +508,49 @@
expected_outputs:
- batch_plan
- stream_plan
- name: improve multi scalar subqueries optimization time. issue 16952. case 1.
sql: |
create table t1(a int, b int);
create table t2(c int primary key, d int);
select
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col1,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col2,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col3,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col4,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col5,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col6,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col7,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col8,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col9,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col10,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col11,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col12,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col13,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col14,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col15,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col16,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col17,
COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col18
from t1;
expected_outputs:
- batch_plan
- stream_plan
- name: improve multi scalar subqueries optimization time. issue 16952. case 2.
sql: |
create table t1(a int, b int);
create table t2(c int primary key, d int);
create table t3(e int, f int);
create table t4(g int, h int);
create table t5(i int, j int);
create table t6(k int, l int);
select
COALESCE((SELECT sum(d) FROM t2 left join t3 on e = a and f = c left join t4 on g = a and h = c left join t5 on i = a and j = c WHERE t1.a = t2.c and t1.a = t2.c and j in (select k from t6 where b = l) ), 0) col1,
COALESCE((SELECT sum(d) FROM t2 left join t3 on e = a and f = c left join t4 on g = a and h = c left join t5 on i = a and j = c WHERE t1.a = t2.c and t1.a = t2.c and j in (select k from t6 where b = l) ), 0) col2,
COALESCE((SELECT sum(d) FROM t2 left join t3 on e = a and f = c left join t4 on g = a and h = c left join t5 on i = a and j = c WHERE t1.a = t2.c and t1.a = t2.c and j in (select k from t6 where b = l) ), 0) col3,
COALESCE((SELECT sum(d) FROM t2 left join t3 on e = a and f = c left join t4 on g = a and h = c left join t5 on i = a and j = c WHERE t1.a = t2.c and t1.a = t2.c and j in (select k from t6 where b = l) ), 0) col4,
COALESCE((SELECT sum(d) FROM t2 left join t3 on e = a and f = c left join t4 on g = a and h = c left join t5 on i = a and j = c WHERE t1.a = t2.c and t1.a = t2.c and j in (select k from t6 where b = l) ), 0) col5,
COALESCE((SELECT sum(d) FROM t2 left join t3 on e = a and f = c left join t4 on g = a and h = c left join t5 on i = a and j = c WHERE t1.a = t2.c and t1.a = t2.c and j in (select k from t6 where b = l) ), 0) col6
from t1;
expected_outputs:
- batch_plan
- stream_plan

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -151,40 +151,25 @@
│ └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
└─BatchProjectSet { select_list: [$0, Unnest($0)] }
└─BatchHashAgg { group_key: [t.arr], aggs: [] }
└─BatchHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.arr] }
├─BatchExchange { order: [], dist: HashShard(t.arr) }
│ └─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
└─BatchProjectSet { select_list: [$0, Unnest($0)] }
└─BatchHashAgg { group_key: [t.arr], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(t.arr) }
└─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
└─BatchExchange { order: [], dist: HashShard(t.arr) }
└─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, arr, unnest, ordinality, arr_2, ordinality_2, t._row_id(hidden), projected_row_id(hidden), projected_row_id#1(hidden)], stream_key: [t._row_id, projected_row_id, arr, projected_row_id#1], pk_columns: [t._row_id, projected_row_id, arr, projected_row_id#1], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), $expr1, Unnest($0), (projected_row_id + 1:Int64) as $expr2, t._row_id, projected_row_id, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, Unnest($0), $expr1, projected_row_id, t.arr, Unnest($0), t._row_id, projected_row_id] }
├─StreamShare { id: 8 }
│ └─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, projected_row_id] }
│ └─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
│ ├─StreamExchange { dist: HashShard(t.arr) }
│ │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
│ └─StreamProjectSet { select_list: [$0, Unnest($0)] }
│ └─StreamProject { exprs: [t.arr] }
│ └─StreamHashAgg { group_key: [t.arr], aggs: [count] }
│ └─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, Unnest($0), (projected_row_id + 1:Int64) as $expr2, t._row_id, projected_row_id, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), projected_row_id, t.arr, Unnest($0), t._row_id] }
├─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
│ ├─StreamExchange { dist: HashShard(t.arr) }
│ │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
│ └─StreamProjectSet { select_list: [$0, Unnest($0)] }
│ └─StreamProject { exprs: [t.arr] }
│ └─StreamHashAgg { group_key: [t.arr], aggs: [count] }
│ └─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProjectSet { select_list: [$0, Unnest($0)] }
└─StreamProject { exprs: [t.arr] }
└─StreamHashAgg { group_key: [t.arr], aggs: [count] }
└─StreamShare { id: 8 }
└─StreamProject { exprs: [t.x, t.arr, Unnest($0), (projected_row_id + 1:Int64) as $expr1, t._row_id, projected_row_id] }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, projected_row_id, t.arr, Unnest($0), t._row_id] }
├─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamProjectSet { select_list: [$0, Unnest($0)] }
└─StreamProject { exprs: [t.arr] }
└─StreamHashAgg { group_key: [t.arr], aggs: [count] }
└─StreamExchange { dist: HashShard(t.arr) }
└─StreamTableScan { table: t, columns: [t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: HashShard(t.arr) }
└─StreamTableScan { table: t, columns: [t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
select * from abs(1) WITH ORDINALITY;
batch_plan: |-
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ static GENERAL_UNNESTING_TRANS_APPLY_WITH_SHARE: LazyLock<OptimizationStage> =
// can't handle a join with `output_indices`.
ProjectJoinSeparateRule::create(),
],
ApplyOrder::BottomUp,
ApplyOrder::TopDown,
)
});

Expand All @@ -186,7 +186,7 @@ static GENERAL_UNNESTING_TRANS_APPLY_WITHOUT_SHARE: LazyLock<OptimizationStage>
// can't handle a join with `output_indices`.
ProjectJoinSeparateRule::create(),
],
ApplyOrder::BottomUp,
ApplyOrder::TopDown,
)
});

Expand Down
18 changes: 16 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ pub struct LogicalApply {
/// Whether we require the subquery to produce at most one row. If `true`, we have to report an
/// error if the subquery produces more than one row.
max_one_row: bool,

/// Whether this apply has already been translated by `translate_apply()`, so that
/// `translate_apply_rule` does not translate it again.
/// This flag avoids an infinite loop in General Unnesting (Translate Apply), since we use a
/// top-down apply order instead of bottom-up to improve optimization time for multiple scalar subqueries.
translated: bool,
}

impl Distill for LogicalApply {
Expand Down Expand Up @@ -85,6 +89,7 @@ impl LogicalApply {
correlated_id: CorrelatedId,
correlated_indices: Vec<usize>,
max_one_row: bool,
translated: bool,
) -> Self {
let ctx = left.ctx();
let join_core = generic::Join::with_full_output(left, right, join_type, on);
Expand All @@ -105,6 +110,7 @@ impl LogicalApply {
correlated_id,
correlated_indices,
max_one_row,
translated,
}
}

Expand All @@ -125,6 +131,7 @@ impl LogicalApply {
correlated_id,
correlated_indices,
max_one_row,
false,
)
.into()
}
Expand Down Expand Up @@ -164,6 +171,10 @@ impl LogicalApply {
self.correlated_indices.to_owned()
}

pub fn translated(&self) -> bool {
self.translated
}

pub fn max_one_row(&self) -> bool {
self.max_one_row
}
Expand Down Expand Up @@ -202,15 +213,17 @@ impl LogicalApply {
let apply_left_len = apply_left.schema().len();
let correlated_indices_len = correlated_indices.len();

let new_apply = LogicalApply::create(
let new_apply = LogicalApply::new(
domain,
apply_right,
JoinType::Inner,
Condition::true_cond(),
correlated_id,
correlated_indices,
max_one_row,
);
true,
)
.into();

let on = Self::rewrite_on(on, correlated_indices_len, apply_left_len).and(Condition {
conjunctions: eq_predicates,
Expand Down Expand Up @@ -285,6 +298,7 @@ impl PlanTreeNodeBinary for LogicalApply {
self.correlated_id,
self.correlated_indices.clone(),
self.max_one_row,
self.translated,
)
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/rule/apply_agg_transpose_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ impl Rule for ApplyAggTransposeRule {
correlated_id,
correlated_indices.clone(),
false,
false,
)
.translate_apply(left, eq_predicates)
} else {
LogicalApply::new(
LogicalApply::create(
left,
input,
JoinType::Inner,
Expand All @@ -118,7 +119,6 @@ impl Rule for ApplyAggTransposeRule {
correlated_indices.clone(),
false,
)
.into()
};

let group_agg = {
Expand Down
5 changes: 2 additions & 3 deletions src/frontend/src/optimizer/rule/apply_dedup_transpose_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,15 @@ impl Rule for ApplyDedupTransposeRule {
return None;
}

let new_apply = LogicalApply::new(
let new_apply = LogicalApply::create(
left,
dedup_input,
JoinType::Inner,
Condition::true_cond(),
correlated_id,
correlated_indices,
false,
)
.into();
);

let new_dedup = {
let mut new_dedup_cols: Vec<usize> = (0..apply_left_len).collect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,15 @@ impl Rule for ApplyExpandTransposeRule {
return None;
}

let new_apply: PlanRef = LogicalApply::new(
let new_apply: PlanRef = LogicalApply::create(
left,
expand_input,
JoinType::Inner,
Condition::true_cond(),
correlated_id,
correlated_indices,
false,
)
.into();
);

let new_apply_schema_len = new_apply.schema().len();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,15 @@ impl Rule for ApplyHopWindowTransposeRule {
return None;
}

let new_apply = LogicalApply::new(
let new_apply = LogicalApply::create(
left,
hop_window_input,
JoinType::Inner,
Condition::true_cond(),
correlated_id,
correlated_indices,
false,
)
.into();
);

let new_hop_window = LogicalHopWindow::create(
new_apply,
Expand Down
5 changes: 2 additions & 3 deletions src/frontend/src/optimizer/rule/apply_limit_transpose_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,15 @@ impl Rule for ApplyLimitTransposeRule {
return None;
}

let new_apply = LogicalApply::new(
let new_apply = LogicalApply::create(
left,
limit_input,
JoinType::Inner,
Condition::true_cond(),
correlated_id,
correlated_indices,
false,
)
.into();
);

let new_topn = {
// use the first column as an order to provide determinism for streaming queries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,15 @@ impl Rule for ApplyOverWindowTransposeRule {
let apply_left_len = left.schema().len();
let apply_left_schema = left.schema().clone();

let new_apply = LogicalApply::new(
let new_apply = LogicalApply::create(
left,
window_input,
JoinType::Inner,
Condition::true_cond(),
correlated_id,
correlated_indices,
false,
)
.into();
);

let new_over_window = {
// Shift index of window functions' `InputRef` with `apply_left_len`.
Expand Down
5 changes: 2 additions & 3 deletions src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,15 @@ impl Rule for ApplyTopNTransposeRule {
return None;
}

let new_apply = LogicalApply::new(
let new_apply = LogicalApply::create(
left,
topn_input,
JoinType::Inner,
Condition::true_cond(),
correlated_id,
correlated_indices,
false,
)
.into();
);

let new_topn = {
// shift index of topn's `InputRef` with `apply_left_len`.
Expand Down
Loading

0 comments on commit bd6454e

Please sign in to comment.