Skip to content

Commit

Permalink
fix(optimizer): fix apply topn transpose rule (#17386)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored and lmatz committed Jun 24, 2024
1 parent 4121c6d commit d5a4d72
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 1 deletion.
29 changes: 29 additions & 0 deletions e2e_test/batch/subquery/lateral_subquery.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,32 @@ drop table all_sales;
statement ok
drop table salesperson;

# Regression test for https://github.com/risingwavelabs/risingwave/issues/17382:
# an inner lateral join whose right side is a correlated DISTINCT ON subquery.
statement ok
CREATE TABLE r(ts TIMESTAMPTZ, src_id int, dev_id int);

# Rows for two src_id values; (src_id = 1, dev_id = 2) appears twice so that
# DISTINCT ON has to choose between 19:00:23 and 19:00:24.
statement ok
INSERT INTO r VALUES
('2024-06-20T19:00:22Z'::TIMESTAMPTZ, 2, 2),
('2024-06-20T19:00:22Z'::TIMESTAMPTZ, 1, 3),
('2024-06-20T19:00:23Z'::TIMESTAMPTZ, 1, 2),
('2024-06-20T19:00:24Z'::TIMESTAMPTZ, 2, 1),
('2024-06-20T19:00:24Z'::TIMESTAMPTZ, 1, 2),
('2024-06-20T19:00:25Z'::TIMESTAMPTZ, 2, 1);

# The result has four columns (e_ts, ts, src_id, dev_id), hence the type
# string TTII. For src_id = 1 the subquery must keep exactly one row per
# dev_id, namely the one with the latest ts (ORDER BY ... ts DESC):
# dev_id = 3 -> 19:00:22 and dev_id = 2 -> 19:00:24.
query TTII rowsort
SELECT e.ts AS e_ts, d.*
FROM (
SELECT '2024-06-20T19:01:00Z'::TIMESTAMPTZ ts, 1::INT AS src_id) e
JOIN LATERAL
(
SELECT DISTINCT ON(src_id, dev_id) *
FROM r
WHERE r.src_id = e.src_id AND r.ts <= e.ts
ORDER BY src_id, dev_id, ts DESC
)d on true;
----
2024-06-20 19:01:00+00:00 2024-06-20 19:00:22+00:00 1 3
2024-06-20 19:01:00+00:00 2024-06-20 19:00:24+00:00 1 2

# Clean up the table created for this case.
statement ok
DROP TABLE r CASCADE;
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,18 @@
) AS b ON TRUE;
expected_outputs:
- stream_plan
# Planner test case named after issue 17382: an inner lateral join whose right
# side is a correlated DISTINCT ON subquery over table r. Only the batch plan
# is requested as expected output.
- name: https://github.com/risingwavelabs/risingwave/issues/17382
sql: |
CREATE TABLE r(ts TIMESTAMPTZ, src_id int, dev_id int);
SELECT e.ts AS e_ts, d.*
FROM (
SELECT '2024-06-20T19:01:00Z'::TIMESTAMPTZ ts, 1::INT AS src_id) e
JOIN LATERAL
(
SELECT DISTINCT ON(src_id, dev_id) *
FROM r
WHERE r.src_id = e.src_id AND r.ts <= e.ts
ORDER BY src_id, dev_id, ts DESC
)d on true;
expected_outputs:
- batch_plan
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,31 @@
└─StreamExchange { dist: HashShard(t1.c1, $expr3) }
└─StreamProject { exprs: [t1.c1, t1.c2, t1.c3, AtTimeZone(t1.c4, 'UTC':Varchar) as $expr2, t1.c4::Date as $expr3, t1._row_id] }
└─StreamTableScan { table: t1, columns: [t1.c1, t1.c2, t1.c3, t1.c4, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
# Expected batch plan for the issue 17382 case (lateral join over a correlated
# DISTINCT ON subquery). In this plan the BatchGroupTopN group_key lists the
# two outer constants followed by r.src_id and r.dev_id, with limit 1 ordered
# by r.ts DESC.
- name: https://github.com/risingwavelabs/risingwave/issues/17382
sql: |
CREATE TABLE r(ts TIMESTAMPTZ, src_id int, dev_id int);
SELECT e.ts AS e_ts, d.*
FROM (
SELECT '2024-06-20T19:01:00Z'::TIMESTAMPTZ ts, 1::INT AS src_id) e
JOIN LATERAL
(
SELECT DISTINCT ON(src_id, dev_id) *
FROM r
WHERE r.src_id = e.src_id AND r.ts <= e.ts
ORDER BY src_id, dev_id, ts DESC
)d on true;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: Inner, predicate: 1:Int32 IS NOT DISTINCT FROM 1:Int32 AND '2024-06-20 19:01:00+00:00':Timestamptz IS NOT DISTINCT FROM '2024-06-20 19:01:00+00:00':Timestamptz, output: ['2024-06-20 19:01:00+00:00':Timestamptz, r.ts, r.src_id, r.dev_id] }
├─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz) }
│ └─BatchValues { rows: [['2024-06-20 19:01:00+00:00':Timestamptz, 1:Int32]] }
└─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz) }
└─BatchGroupTopN { order: [r.src_id ASC, r.dev_id ASC, r.ts DESC], limit: 1, offset: 0, group_key: [1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz, r.src_id, r.dev_id] }
└─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz, r.src_id, r.dev_id) }
└─BatchHashJoin { type: Inner, predicate: 1:Int32 = r.src_id AND (r.ts <= '2024-06-20 19:01:00+00:00':Timestamptz), output: all }
├─BatchExchange { order: [], dist: HashShard(1:Int32) }
│ └─BatchHashAgg { group_key: [1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz], aggs: [] }
│ └─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz) }
│ └─BatchValues { rows: [[1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz]] }
└─BatchExchange { order: [], dist: HashShard(r.src_id) }
└─BatchScan { table: r, columns: [r.ts, r.src_id, r.dev_id], distribution: SomeShard }
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ impl Rule for ApplyTopNTransposeRule {
apply.clone().decompose();
assert_eq!(join_type, JoinType::Inner);
let topn: &LogicalTopN = right.as_logical_top_n()?;
let (topn_input, limit, offset, with_ties, mut order, group_key) = topn.clone().decompose();
let (topn_input, limit, offset, with_ties, mut order, mut group_key) =
topn.clone().decompose();

let apply_left_len = left.schema().len();

Expand All @@ -74,6 +75,7 @@ impl Rule for ApplyTopNTransposeRule {
.column_orders
.iter_mut()
.for_each(|ord| ord.column_index += apply_left_len);
group_key.iter_mut().for_each(|idx| *idx += apply_left_len);
let new_group_key = (0..apply_left_len).chain(group_key).collect_vec();
LogicalTopN::new(new_apply, limit, offset, with_ties, order, new_group_key)
};
Expand Down

0 comments on commit d5a4d72

Please sign in to comment.