fix(over window): fix error in using aggregate function result as window function argument #12551
Conversation
Rewrote the `partition by` and `group by` expressions (`input_ref`) in the window function to avoid out-of-index problems.
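The idea behind the fix can be sketched as an index-remapping step (a hypothetical simplification in Python, not the actual RisingWave code; `remap_input_ref` and the schema layout are illustrative assumptions): after `GROUP BY`, the window function's `input_ref` indices must point into the aggregation's output schema (group keys first, then aggregate results), not into the base table's schema, otherwise they can run out of range.

```python
# Hypothetical sketch of remapping a window-function input_ref after GROUP BY.
# Not the actual RisingWave implementation; names are illustrative only.

def remap_input_ref(col_idx: int, group_keys: list[int]) -> int:
    """Map a base-table column index to its position in the agg output,
    whose schema is [group key columns..., aggregate results...]."""
    if col_idx not in group_keys:
        # Referencing a non-grouped column directly would be out of range
        # (and is invalid SQL outside an aggregate call anyway).
        raise IndexError(f"column {col_idx} is not a group key")
    return group_keys.index(col_idx)

# Example: table t(id, a, b, c) with GROUP BY a.
# The agg output schema is [a], so PARTITION BY a (base index 1) maps to 0.
group_keys = [1]  # column `a` in t(id=0, a=1, b=2, c=3)
print(remap_input_ref(1, group_keys))  # -> 0
```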
Prepare table:
dev=> create table t (id int, a int, b int, c int);
CREATE_TABLE
dev=> insert into t values (1, 72, 23, 84), (2, 98, 13, 29), (3, 74, 56, 43), (4, 19, 56, 83), (5, 52, 68, 20);
INSERT 0 5
dev=> select * from t;
id | a | b | c
----+----+----+----
1 | 72 | 23 | 84
2 | 98 | 13 | 29
3 | 74 | 56 | 43
4 | 19 | 56 | 83
5 | 52 | 68 | 20
(5 rows)

Execute query:
dev=> select row_number() over (order by a) from t group by a;
ERROR: QueryError: Feature is not yet implemented: Window function with empty PARTITION BY is not supported yet
No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
dev=> select row_number() over (partition by a order by a) from t group by a;
row_number
------------
1
1
1
1
1
(5 rows)
dev=> select sum((sum(b))) over (partition by a order by a) from t group by a;
sum
-----
56
68
23
56
13
(5 rows)

Explain query:
dev=> explain select row_number() over (partition by a order by a) from t group by a;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [row_number] }
└─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.a ORDER BY t.a ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─BatchSort { order: [t.a ASC, t.a ASC] }
└─BatchHashAgg { group_key: [t.a], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(t.a) }
└─BatchScan { table: t, columns: [a] }
(7 rows)
dev=> explain select row_number() over (partition by a order by a desc) from t group by a;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [row_number] }
└─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.a ORDER BY t.a DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─BatchSort { order: [t.a ASC, t.a DESC] }
└─BatchHashAgg { group_key: [t.a], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(t.a) }
└─BatchScan { table: t, columns: [a] }
(7 rows)
dev=> explain select sum((sum(b))) over (partition by a order by a) from t group by a;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [sum] }
└─BatchOverWindow { window_functions: [sum(sum(t.b)) OVER(PARTITION BY t.a ORDER BY t.a ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─BatchSort { order: [t.a ASC, t.a ASC] }
└─BatchHashAgg { group_key: [t.a], aggs: [sum(t.b)] }
└─BatchExchange { order: [], dist: HashShard(t.a) }
└─BatchScan { table: t, columns: [a, b] }
(7 rows)
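For intuition, the results above can be reproduced with a small plain-Python simulation (no RisingWave involved; this only mimics the SQL semantics). Because every `a` value in the sample data is unique, each group, and hence each window partition, holds exactly one row:

```python
from collections import defaultdict

rows = [(1, 72, 23, 84), (2, 98, 13, 29), (3, 74, 56, 43),
        (4, 19, 56, 83), (5, 52, 68, 20)]  # (id, a, b, c)

# GROUP BY a, computing sum(b) per group.
groups = defaultdict(int)
for _id, a, b, _c in rows:
    groups[a] += b

# PARTITION BY a: each partition holds exactly one group row, so
# row_number() is always 1 and sum(sum(b)) equals the group's sum(b).
row_numbers = [1 for _ in groups]
window_sums = list(groups.values())

print(row_numbers)          # five 1s, matching the output above
print(sorted(window_sums))  # [13, 23, 56, 56, 68]
```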
Thanks! Could you also help to add some planner tests?
Hi, I just added some planner_test and e2e_test cases. I don't know yet how rw handles the sort key, but I'll start checking.

edit: It dawned on me that what I had tested against was MySQL; its documentation states: "If ORDER BY is not given, the rows are returned in whatever order the system finds fastest to produce." Maybe this can be seen as compatible behavior.
Rest LGTM! Thx for your contribution!
Could you please test some cases like the following, where `partition by` and `order by` are not the same as `group by`?
select
a, b,
sum( sum(c) ) over (partition by a order by b)
from t
group by a, b;
Or, if we want to go further:
select
a, b,
sum( sum(c) ) over (partition by a, avg(d) order by max(e), b)
from t
group by a, b;
Also, please add streaming versions of these tests. You may refer to how OverWindow is e2e-tested.
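For reference, the expected semantics of the first suggested query can be simulated in plain Python on hypothetical data (the rows below are made up, not from the PR; this assumes standard SQL window semantics, with `b` values distinct within each partition so the default RANGE frame behaves like a running sum):

```python
from collections import defaultdict
from itertools import groupby

# Hypothetical data for t(a, b, c); illustrative only.
rows = [(1, 10, 5), (1, 10, 7), (1, 20, 3), (2, 10, 4)]

# GROUP BY a, b computing sum(c).
agg = defaultdict(int)
for a, b, c in rows:
    agg[(a, b)] += c
group_rows = sorted(agg.items())  # [((a, b), sum_c), ...], ordered by (a, b)

# sum(sum(c)) OVER (PARTITION BY a ORDER BY b): a running sum of the
# per-group sums within each partition of a.
result = []
for _a, part in groupby(group_rows, key=lambda kv: kv[0][0]):
    running = 0
    for (pa, b), sum_c in part:
        running += sum_c
        result.append((pa, b, running))

print(result)  # [(1, 10, 12), (1, 20, 15), (2, 10, 4)]
```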
- sql: |
    CREATE TABLE t (a int, b int);
    SELECT a, row_number() OVER (PARTITION BY a ORDER BY a DESC) FROM t;
  expected_outputs:
  - batch_plan
  - stream_plan
This seems not to be a test for agg.
- sql: |
    CREATE TABLE t(a int, b int);
    SELECT a, sum((sum(b))) OVER (PARTITION BY a ORDER BY a) FROM t GROUP BY a;
  batch_plan: |-
    BatchExchange { order: [], dist: Single }
    └─BatchProject { exprs: [t.a, sum] }
    └─BatchOverWindow { window_functions: [sum(sum(t.b)) OVER(PARTITION BY t.a ORDER BY t.a ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
    └─BatchSort { order: [t.a ASC, t.a ASC] }
    └─BatchHashAgg { group_key: [t.a], aggs: [sum(t.b)] }
    └─BatchExchange { order: [], dist: HashShard(t.a) }
    └─BatchScan { table: t, columns: [t.a, t.b], distribution: SomeShard }
  stream_plan: |-
    StreamMaterialize { columns: [a, sum], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck }
    └─StreamProject { exprs: [t.a, sum] }
    └─StreamOverWindow { window_functions: [sum(sum(t.b)) OVER(PARTITION BY t.a ORDER BY t.a ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
    └─StreamProject { exprs: [t.a, sum(t.b)] }
    └─StreamHashAgg { group_key: [t.a], aggs: [sum(t.b), count] }
    └─StreamExchange { dist: HashShard(t.a) }
    └─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
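The `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` frame appearing in these plans is simply a per-partition running total; a minimal illustration (plain Python, not RisingWave code):

```python
def running_sums(values):
    """sum(x) OVER (ORDER BY position ROWS BETWEEN UNBOUNDED PRECEDING
    AND CURRENT ROW) within a single partition, as a running total."""
    out, acc = [], 0
    for v in values:
        acc += v
        out.append(acc)
    return out

print(running_sums([23, 13, 56]))  # -> [23, 36, 92]
```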
Looks good, but similarly, it would be better to have different `partition by`, `order by`, and `group by`.
Will it feel better now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
a,
sum((sum(b))) over (partition by a order by a)
from t
group by a;
Better to add `order by a` to ensure a consistent result.
If it is for a consistent result, should `order by` constraints be added to every query?
Yes, it should be added to every query in this file to ensure consistent results.
(#12551) (#12720) Co-authored-by: jinser <[email protected]>
Rewrote the `partition by` and `group by` expressions (`input_ref`) in the window function to avoid out-of-index problems.

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Fixes #10666.
Checklist
- `./risedev check` (or alias, `./risedev c`)

Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.