-
Notifications
You must be signed in to change notification settings - Fork 598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(frontend): prune order key using functional dependencies #16204
Conversation
e49f6a8
to
c7112c9
Compare
src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml
Outdated
Show resolved
Hide resolved
&columns, | ||
// For index, we can't prune the ORDER KEY, | ||
// since it's also the distribution key. | ||
table_type != TableType::Index, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Index actually is also a materialized view. This is a bit special if we can't unify them in this code path. Could you provide an example to illustrate the problem you met? Let's see whether we could resolve it together.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the failing query.
create table t1 (v1 int, v2 int);
create materialized view v as select count(*) cnt from t1;
explain (verbose) create index mv_idx on v(cnt);
This is its plan without pruning:
StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [cnt], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(v.cnt) }
└─StreamTableScan { table: v, columns: [v.cnt], stream_scan_type: ArrangementBackfill, stream_key: [], pk: [], dist: Single }
This is its plan with pruning:
StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [cnt], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(v.cnt) }
└─StreamTableScan { table: v, columns: [v.cnt], stream_scan_type: ArrangementBackfill, stream_key: [], pk: [], dist: Single }
It has the following functional dependency: [] -> all_columns
, since it is a singleton.
As mentioned in the PR description, the following section of code:
risingwave/src/frontend/src/handler/create_index.rs
Lines 385 to 400 in a14ff37
PlanRoot::new( | |
logical_project, | |
RequiredDist::PhysicalDist(Distribution::HashShard( | |
(0..distributed_by_columns_len).collect(), | |
)), | |
Order::new( | |
index_columns | |
.iter() | |
.enumerate() | |
.map(|(i, (_, order))| ColumnOrder::new(i, *order)) | |
.collect(), | |
), | |
project_required_cols, | |
out_names, | |
) | |
.gen_index_plan(index_name, definition, retention_seconds) |
Will result in
- order key: [cnt]
- dist key: [0]
Now dist key refers to order key, you can see how it just takes the prefix:
RequiredDist::PhysicalDist(Distribution::HashShard(
(0..distributed_by_columns_len).collect(),
)),
This seems a little strange, because order key is not appended to pk at this point.
Subsequently, when we derive the pk, that is just stream key combined with order key.
Stream key is []
. And after pruning, Order key is []
as well. So pk is just []
.
Then distribution key is not a subset of pk
at all.
There's 2 solutions I can think of here:
- Just avoid index (stupid and simple but not elegant).
- Separately append distribution key for index elsewhere. Instead of relying on order key to be the distribution key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe when we derive a pk, we need to consider the distribution key as well, because theoretically users can specify a distribution key for a materialized view as well, though we don't support it right now, but I remember this feature has been requested several times.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, even for the normal materialized view, we need to ensure distribution key is a subset of pk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed it by appending the distribution key if it is not part of the pk.
└─BatchSimpleAgg { aggs: [sum0(count)] } | ||
└─BatchExchange { order: [], dist: Single } | ||
└─BatchSimpleAgg { aggs: [count] } | ||
└─BatchScan { table: t, columns: [], distribution: SomeShard } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why prune TopN columns can also rewrite the BatchTopN 🤔 . Regardless, here it could be rewritten to BatchLimit
instead I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
// We need to ensure distribution key is part of pk. | ||
// If it is not part of either stream_key or order_key, | ||
// It must mean that it is only necessary for storage distribution. | ||
// Such a case is rare, but it is possible, | ||
// for example in the case of index created on singleton mv. | ||
// In this case, we can simply append these columns to pk. | ||
for &idx in input.distribution().dist_column_indices() { | ||
if in_order.contains(idx) { | ||
continue; | ||
} | ||
pk.push(ColumnOrder::new(idx, OrderType::ascending())); | ||
in_order.insert(idx); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My recollection is a bit hazy, but it seems to be for the performance consideration of batch scanning, the distribution key was restricted to the prefix of the primary key (pk), so that queries with a prefix can scan fewer partitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, now I prefer to not append the distribution key to the end when distribution key is eliminated by functional dependency and we can use the original one instead. @kwannoel
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, now I prefer to not append the distribution key to the end when distribution key is eliminated by functional dependency and we can use the original one instead. @kwannoel
Hmm so just don't prune order key for indexes, since they are used as distribution key?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the tricky thing is that distribution key is always a prefix of PK, and so is order key in certain cases.
There isn't a clear way to determine how we should interleave this distribution key's missing columns into the current PK, preserving the above properties, unless we know the plans which generated the distribution key and order key.
In the case of derive pk, we don't have this info. So seems hard to make a generalised decision. Think we can just disable the order key pruning for index, seems to be the only one so far which has the overlap.
We can revisit if more edge cases show up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am afraid of this PR eliminating the distribution key and then to ensure the distribution key is part of pk, we generate a new pk with the distribution key in the suffix. For this case, we can just give up the optimization and use the original derived pk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed it by making distribution a constraint of minimizing order key. If pruning order key column also results in dist key column being pruned, we will not prune it.
e113408
to
1ab4f59
Compare
src/frontend/src/planner/query.rs
Outdated
// Optimize order key before using it for TopN. | ||
let func_dep = plan.functional_dependency(); | ||
let order = func_dep.minimize_order_key(order); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed. Not clear what's the dist_key_indices
to pass in here... Only impacts batch top n, we can leave it out for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could pass in []
as dist_key_indices since top n is singleton dist.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in 4ee529c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Closes #16148 .
The main idea in this PR is to prune order key in areas where it is consumed:
derive_pk
in the final part forcreate materialized view, create sink, create subscription
.Note that we did not do so in
BatchSort
, since functional dependency may not be supported there yet (?) I tried to add it, but seems it did not prune the order key, as it did in stream.We specifically do it in these areas, rather than add a new optimizer rule + optimizer pass, since the call to
FunctionalDependencySet::minimize_key
can be quite expensive, and we only need it in a few areas.We also don't call minimize_key for
Index
, since it reuses the order keys as distribution key, rather than as a key. Pruning it can lead to incorrect distribution, for instance when:But we want index to be distributed on col 0. So it is incorrect.
risingwave/src/frontend/src/handler/create_index.rs
Lines 385 to 400 in a14ff37
Finally, we use a new minimizing function,
minimize_order_key
, rather thanminimize_key
, since the properties of order key is different from that ofminimize_key
.For instance, given the following order key:
If we simply apply
minimize_key
to it, it will treat2
is obsolete, since the remainder can still form a key due to the functional dependency of(0, 1) -> 2
.But this breaks the ordering properties, since we no longer order by
2
first, before(0, 1)
.So we introduce a new minimizing function which will minimize by using the functional dependencies of the prefixes of the order key to prune the suffixes.
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.