From f6a16db4fcfbdcdd87f16a604f364d14c22cc150 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 24 Jun 2024 18:32:07 +0800 Subject: [PATCH] fix(optimizer): fix apply topn transpose rule (#17386) (#17423) Co-authored-by: Dylan --- .../batch/subquery/lateral_subquery.slt.part | 29 +++++++++++++++++++ .../testdata/input/lateral_subquery.yaml | 15 ++++++++++ .../testdata/output/lateral_subquery.yaml | 28 ++++++++++++++++++ .../rule/apply_topn_transpose_rule.rs | 4 ++- 4 files changed, 75 insertions(+), 1 deletion(-) diff --git a/e2e_test/batch/subquery/lateral_subquery.slt.part b/e2e_test/batch/subquery/lateral_subquery.slt.part index 04e93a3d397ce..98077487828e4 100644 --- a/e2e_test/batch/subquery/lateral_subquery.slt.part +++ b/e2e_test/batch/subquery/lateral_subquery.slt.part @@ -82,3 +82,32 @@ drop table all_sales; statement ok drop table salesperson; +statement ok +CREATE TABLE r(ts TIMESTAMPTZ, src_id int, dev_id int); + +statement ok +INSERT INTO r VALUES +('2024-06-20T19:00:22Z'::TIMESTAMPTZ, 2, 2), +('2024-06-20T19:00:22Z'::TIMESTAMPTZ, 1, 3), +('2024-06-20T19:00:23Z'::TIMESTAMPTZ, 1, 2), +('2024-06-20T19:00:24Z'::TIMESTAMPTZ, 2, 1), +('2024-06-20T19:00:24Z'::TIMESTAMPTZ, 1, 2), +('2024-06-20T19:00:25Z'::TIMESTAMPTZ, 2, 1); + +query TII rowsort +SELECT e.ts AS e_ts, d.* +FROM ( + SELECT '2024-06-20T19:01:00Z'::TIMESTAMPTZ ts, 1::INT AS src_id) e +JOIN LATERAL +( + SELECT DISTINCT ON(src_id, dev_id) * + FROM r + WHERE r.src_id = e.src_id AND r.ts <= e.ts + ORDER BY src_id, dev_id, ts DESC +)d on true; +---- +2024-06-20 19:01:00+00:00 2024-06-20 19:00:22+00:00 1 3 +2024-06-20 19:01:00+00:00 2024-06-20 19:00:24+00:00 1 2 + +statement ok +DROP TABLE r CASCADE; diff --git a/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml b/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml index 869bbdf6d7136..8b9126f18d641 100644 --- a/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/input/lateral_subquery.yaml @@ -119,3 +119,18 @@ ) AS b ON TRUE; expected_outputs: - stream_plan +- name: https://github.com/risingwavelabs/risingwave/issues/17382 + sql: | + CREATE TABLE r(ts TIMESTAMPTZ, src_id int, dev_id int); + SELECT e.ts AS e_ts, d.* + FROM ( + SELECT '2024-06-20T19:01:00Z'::TIMESTAMPTZ ts, 1::INT AS src_id) e + JOIN LATERAL + ( + SELECT DISTINCT ON(src_id, dev_id) * + FROM r + WHERE r.src_id = e.src_id AND r.ts <= e.ts + ORDER BY src_id, dev_id, ts DESC + )d on true; + expected_outputs: + - batch_plan diff --git a/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml b/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml index acac3cc236994..5412a9be005cb 100644 --- a/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml @@ -242,3 +242,31 @@ └─StreamExchange { dist: HashShard(t1.c1, $expr3) } └─StreamProject { exprs: [t1.c1, t1.c2, t1.c3, AtTimeZone(t1.c4, 'UTC':Varchar) as $expr2, t1.c4::Date as $expr3, t1._row_id] } └─StreamTableScan { table: t1, columns: [t1.c1, t1.c2, t1.c3, t1.c4, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } +- name: https://github.com/risingwavelabs/risingwave/issues/17382 + sql: | + CREATE TABLE r(ts TIMESTAMPTZ, src_id int, dev_id int); + SELECT e.ts AS e_ts, d.* + FROM ( + SELECT '2024-06-20T19:01:00Z'::TIMESTAMPTZ ts, 1::INT AS src_id) e + JOIN LATERAL + ( + SELECT DISTINCT ON(src_id, dev_id) * + FROM r + WHERE r.src_id = e.src_id AND r.ts <= e.ts + ORDER BY src_id, dev_id, ts DESC + )d on true; + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchHashJoin { type: Inner, predicate: 1:Int32 IS NOT DISTINCT FROM 1:Int32 AND '2024-06-20 19:01:00+00:00':Timestamptz IS NOT DISTINCT FROM '2024-06-20 19:01:00+00:00':Timestamptz, output: ['2024-06-20 19:01:00+00:00':Timestamptz, r.ts, r.src_id, r.dev_id] } + ├─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz) } + │ └─BatchValues { rows: [['2024-06-20 19:01:00+00:00':Timestamptz, 1:Int32]] } + └─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz) } + └─BatchGroupTopN { order: [r.src_id ASC, r.dev_id ASC, r.ts DESC], limit: 1, offset: 0, group_key: [1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz, r.src_id, r.dev_id] } + └─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz, r.src_id, r.dev_id) } + └─BatchHashJoin { type: Inner, predicate: 1:Int32 = r.src_id AND (r.ts <= '2024-06-20 19:01:00+00:00':Timestamptz), output: all } + ├─BatchExchange { order: [], dist: HashShard(1:Int32) } + │ └─BatchHashAgg { group_key: [1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz], aggs: [] } + │ └─BatchExchange { order: [], dist: HashShard(1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz) } + │ └─BatchValues { rows: [[1:Int32, '2024-06-20 19:01:00+00:00':Timestamptz]] } + └─BatchExchange { order: [], dist: HashShard(r.src_id) } + └─BatchScan { table: r, columns: [r.ts, r.src_id, r.dev_id], distribution: SomeShard } diff --git a/src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs index 61b887af7ea54..1b337f1dcd9c3 100644 --- a/src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs +++ b/src/frontend/src/optimizer/rule/apply_topn_transpose_rule.rs @@ -49,7 +49,8 @@ impl Rule for ApplyTopNTransposeRule { apply.clone().decompose(); assert_eq!(join_type, JoinType::Inner); let topn: &LogicalTopN = right.as_logical_top_n()?; - let (topn_input, limit, offset, with_ties, mut order, group_key) = topn.clone().decompose(); + let (topn_input, limit, offset, with_ties, mut order, mut group_key) = + topn.clone().decompose(); let apply_left_len = left.schema().len(); @@ -74,6 +75,7 @@ impl Rule for ApplyTopNTransposeRule { .column_orders .iter_mut() .for_each(|ord| ord.column_index += apply_left_len); + group_key.iter_mut().for_each(|idx| *idx += apply_left_len); let new_group_key = (0..apply_left_len).chain(group_key).collect_vec(); LogicalTopN::new(new_apply, limit, offset, with_ties, order, new_group_key) };