feat(frontend): prune order key using functional dependencies #16204

Merged
merged 16 commits into main from kwannoel/pk-prefix on Apr 11, 2024

Contributor

@kwannoel kwannoel commented Apr 8, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Closes #16148.

The main idea of this PR is to prune the order key at the points where it is consumed:

  1. In derive_pk, at the final step of create materialized view, create sink, and create subscription.

Note that we do not do this in BatchSort, since functional dependencies may not be supported there yet (?). I tried adding it, but it did not prune the order key the way it does in stream.

We do it specifically in these places, rather than adding a new optimizer rule and optimizer pass, because the call to FunctionalDependencySet::minimize_key can be quite expensive and we only need it in a few places.

We also don't call minimize_key for Index, since it reuses the order key as the distribution key, rather than only as a key. Pruning it can lead to an incorrect distribution, for instance when:

pk = []

Then,
order key = [0]

After pruning,
order key = [].

But we want the index to be distributed on column 0, so this is incorrect.

    PlanRoot::new(
        logical_project,
        RequiredDist::PhysicalDist(Distribution::HashShard(
            (0..distributed_by_columns_len).collect(),
        )),
        Order::new(
            index_columns
                .iter()
                .enumerate()
                .map(|(i, (_, order))| ColumnOrder::new(i, *order))
                .collect(),
        ),
        project_required_cols,
        out_names,
    )
    .gen_index_plan(index_name, definition, retention_seconds)

Finally, we use a new minimizing function, minimize_order_key, rather than minimize_key, since an order key has different properties from a plain key: the position of each column matters.

For instance, given the following order key:

fun dep: (0, 1) -> 2
order key: [2, 0, 1]

If we simply apply minimize_key to it, it will treat 2 as redundant, since the remaining columns still form a key thanks to the functional dependency (0, 1) -> 2.

But this breaks the ordering properties, since we no longer order by 2 first, before (0, 1).

So we introduce a new minimizing function that uses the functional dependencies of each prefix of the order key to prune columns in the suffix.
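To make the prefix rule concrete, here is a minimal sketch in plain Rust. It is not the code from this PR: `FuncDep`, `closure`, and this standalone `minimize_order_key` are hypothetical stand-ins that work on bare column indices and ignore sort direction. A column is dropped only when the columns kept before it already determine it, because in that case it can never break a tie.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for a functional dependency `from -> to`
/// over column indices (not RisingWave's actual type).
struct FuncDep {
    from: Vec<usize>,
    to: Vec<usize>,
}

/// Extends `cols` with every column it determines under `deps`,
/// repeating until no new column is added (a fixed point).
fn closure(mut cols: BTreeSet<usize>, deps: &[FuncDep]) -> BTreeSet<usize> {
    loop {
        let before = cols.len();
        for dep in deps {
            if dep.from.iter().all(|c| cols.contains(c)) {
                cols.extend(dep.to.iter().copied());
            }
        }
        if cols.len() == before {
            return cols;
        }
    }
}

/// Walks the order key left to right and drops a column only if the
/// columns kept so far (the prefix) already determine it.
fn minimize_order_key(order_key: &[usize], deps: &[FuncDep]) -> Vec<usize> {
    // Start from the closure of the empty set, so that dependencies
    // with an empty left side (`[] -> cols`) are honoured.
    let mut determined = closure(BTreeSet::new(), deps);
    let mut kept = Vec::new();
    for &col in order_key {
        if determined.contains(&col) {
            continue; // determined by the prefix: cannot affect the order
        }
        kept.push(col);
        determined.insert(col);
        determined = closure(determined, deps);
    }
    kept
}
```

With the dependency (0, 1) -> 2, this sketch leaves [2, 0, 1] untouched, because 2 is not determined by the empty prefix, while [0, 1, 2] shrinks to [0, 1].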

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@kwannoel kwannoel force-pushed the kwannoel/pk-prefix branch from e49f6a8 to c7112c9 Compare April 9, 2024 02:21
@kwannoel kwannoel requested review from chenzl25, hzxa21 and xxhZs April 9, 2024 03:38
@kwannoel kwannoel requested a review from chenzl25 April 9, 2024 08:02

    &columns,
    // For index, we can't prune the ORDER KEY,
    // since it's also the distribution key.
    table_type != TableType::Index,

Contributor

Index actually is also a materialized view. This is a bit special if we can't unify them in this code path. Could you provide an example to illustrate the problem you met? Let's see whether we could resolve it together.

Contributor Author

@kwannoel kwannoel Apr 9, 2024

This is the failing query.

    create table t1 (v1 int, v2 int);
    create materialized view v as select count(*) cnt from t1;
    explain (verbose) create index mv_idx on v(cnt);

This is its plan without pruning:

    StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [cnt], pk_conflict: NoCheck }
    └─StreamExchange { dist: HashShard(v.cnt) }
      └─StreamTableScan { table: v, columns: [v.cnt], stream_scan_type: ArrangementBackfill, stream_key: [], pk: [], dist: Single }

This is its plan with pruning:

    StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [cnt], pk_conflict: NoCheck }
    └─StreamExchange { dist: HashShard(v.cnt) }
      └─StreamTableScan { table: v, columns: [v.cnt], stream_scan_type: ArrangementBackfill, stream_key: [], pk: [], dist: Single }

It has the following functional dependency: [] -> all_columns, since it is a singleton.

As mentioned in the PR description, the following section of code:

    PlanRoot::new(
        logical_project,
        RequiredDist::PhysicalDist(Distribution::HashShard(
            (0..distributed_by_columns_len).collect(),
        )),
        Order::new(
            index_columns
                .iter()
                .enumerate()
                .map(|(i, (_, order))| ColumnOrder::new(i, *order))
                .collect(),
        ),
        project_required_cols,
        out_names,
    )
    .gen_index_plan(index_name, definition, retention_seconds)

Will result in

  1. order key: [cnt]
  2. dist key: [0]

Now the dist key refers to the order key. You can see how it just takes a prefix of it:

        RequiredDist::PhysicalDist(Distribution::HashShard(
            (0..distributed_by_columns_len).collect(),
        )),

This seems a little strange, because the order key has not been appended to the pk at this point.
Subsequently, when we derive the pk, it is just the stream key combined with the order key.
The stream key is [], and after pruning the order key is [] as well, so the pk is just [].

Then the distribution key is not a subset of the pk at all.
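The failure can be reproduced with a small standalone sketch. The helpers `derive_pk` and `dist_key_covered` below are hypothetical simplifications over bare column indices, not the planner's real functions, and placing the order key columns before the stream key columns is an assumption made only for illustration.

```rust
/// Hypothetical simplification of pk derivation: the order key columns,
/// followed by any stream key columns not already present.
fn derive_pk(order_key: &[usize], stream_key: &[usize]) -> Vec<usize> {
    let mut pk: Vec<usize> = Vec::new();
    for &col in order_key.iter().chain(stream_key) {
        if !pk.contains(&col) {
            pk.push(col);
        }
    }
    pk
}

/// Returns true if every distribution key column also appears in the pk.
fn dist_key_covered(dist_key: &[usize], pk: &[usize]) -> bool {
    dist_key.iter().all(|c| pk.contains(c))
}
```

For the singleton case above, `derive_pk(&[0], &[])` gives `[0]` and covers the dist key `[0]`, while `derive_pk(&[], &[])` (the pruned order key) gives an empty pk that no longer covers it.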

There are two solutions I can think of here:

  1. Just skip pruning for index (stupid and simple, but not elegant).
  2. Append the distribution key for index separately elsewhere, instead of relying on the order key to be the distribution key.

Contributor

Maybe when we derive a pk, we need to consider the distribution key as well. Theoretically, users could specify a distribution key for a materialized view too. We don't support that right now, but I remember this feature has been requested several times.

Contributor

BTW, even for a normal materialized view, we need to ensure the distribution key is a subset of the pk.

Contributor Author

Fixed it by appending the distribution key if it is not part of the pk.

    └─BatchSimpleAgg { aggs: [sum0(count)] }
      └─BatchExchange { order: [], dist: Single }
        └─BatchSimpleAgg { aggs: [count] }
          └─BatchScan { table: t, columns: [], distribution: SomeShard }

Contributor Author

Not sure why pruning TopN columns can also rewrite the BatchTopN 🤔. Regardless, I think it could be rewritten to BatchLimit here instead.

Contributor Author

Fixed

Contributor

@chenzl25 chenzl25 left a comment

LGTM!

Comment on lines 127 to 139
    // We need to ensure distribution key is part of pk.
    // If it is not part of either stream_key or order_key,
    // It must mean that it is only necessary for storage distribution.
    // Such a case is rare, but it is possible,
    // for example in the case of index created on singleton mv.
    // In this case, we can simply append these columns to pk.
    for &idx in input.distribution().dist_column_indices() {
        if in_order.contains(idx) {
            continue;
        }
        pk.push(ColumnOrder::new(idx, OrderType::ascending()));
        in_order.insert(idx);
    }

Contributor

@chenzl25 chenzl25 Apr 10, 2024

BTW, this behavior could break an assumption we made (not sure whether it still holds ATM), i.e. the distribution key should be a prefix of the pk. I remember we had some discussions on this topic @st1page @fuyufjh

Contributor

My recollection is a bit hazy, but I think it was for batch scan performance: the distribution key was restricted to a prefix of the primary key (pk), so that queries with a prefix can scan fewer partitions.

Contributor

@chenzl25 chenzl25 Apr 10, 2024

ok, now I prefer to not append the distribution key to the end when distribution key is eliminated by functional dependency and we can use the original one instead. @kwannoel

Contributor Author

> ok, now I prefer to not append the distribution key to the end when distribution key is eliminated by functional dependency and we can use the original one instead. @kwannoel

Hmm so just don't prune order key for indexes, since they are used as distribution key?

Contributor Author

@kwannoel kwannoel Apr 10, 2024

I think the tricky thing is that distribution key is always a prefix of PK, and so is order key in certain cases.

There isn't a clear way to determine how we should interleave this distribution key's missing columns into the current PK, preserving the above properties, unless we know the plans which generated the distribution key and order key.

In the case of derive pk, we don't have this info. So seems hard to make a generalised decision. Think we can just disable the order key pruning for index, seems to be the only one so far which has the overlap.

We can revisit if more edge cases show up.

Contributor

I am afraid that this PR eliminates the distribution key and then, to ensure the distribution key is part of the pk, generates a new pk with the distribution key as a suffix. In this case, we can just give up the optimization and use the original derived pk.

Contributor Author

Fixed it by making the distribution key a constraint when minimizing the order key. If pruning an order key column would also prune a dist key column, we do not prune it.
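As a rough illustration of that constraint, the sketch below takes the conservative reading suggested in the review: if prefix-based pruning would drop any distribution key column, the original order key is returned unchanged. This is an assumption about the behavior, not the merged code. `Dep`, `saturate`, `prune`, and `minimize_order_key_with_dist` are hypothetical names, and columns are bare indices with no sort direction.

```rust
use std::collections::BTreeSet;

/// Hypothetical `(from, to)` pair: the `from` columns determine the `to` columns.
type Dep = (Vec<usize>, Vec<usize>);

/// Adds every column determined by `cols` under `deps` until nothing changes.
fn saturate(cols: &mut BTreeSet<usize>, deps: &[Dep]) {
    loop {
        let before = cols.len();
        for (from, to) in deps {
            if from.iter().all(|c| cols.contains(c)) {
                cols.extend(to.iter().copied());
            }
        }
        if cols.len() == before {
            break;
        }
    }
}

/// Prefix-based pruning: drop a column if the kept prefix determines it.
fn prune(order_key: &[usize], deps: &[Dep]) -> Vec<usize> {
    let mut determined = BTreeSet::new();
    saturate(&mut determined, deps);
    let mut kept = Vec::new();
    for &col in order_key {
        if determined.contains(&col) {
            continue;
        }
        kept.push(col);
        determined.insert(col);
        saturate(&mut determined, deps);
    }
    kept
}

/// Assumed constraint: if pruning would remove a distribution key column
/// that was part of the order key, give up and keep the original order key.
fn minimize_order_key_with_dist(
    order_key: &[usize],
    dist_key: &[usize],
    deps: &[Dep],
) -> Vec<usize> {
    let pruned = prune(order_key, deps);
    let lost_dist_col = dist_key
        .iter()
        .any(|c| order_key.contains(c) && !pruned.contains(c));
    if lost_dist_col {
        order_key.to_vec()
    } else {
        pruned
    }
}
```

For the index on the singleton mv (dependency [] -> 0, order key [0], dist key [0]), this keeps [0]. With an empty dist key, as in the TopN case discussed in the next thread, the same input prunes to [].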

@kwannoel kwannoel force-pushed the kwannoel/pk-prefix branch from e113408 to 1ab4f59 Compare April 10, 2024 14:52
@kwannoel kwannoel requested review from chenzl25 and st1page April 10, 2024 14:58

    // Optimize order key before using it for TopN.
    let func_dep = plan.functional_dependency();
    let order = func_dep.minimize_order_key(order);

Contributor Author

Removed. Not clear what dist_key_indices to pass in here... It only impacts batch top n, so we can leave it out for now.

Contributor Author

We could pass in [] as dist_key_indices since top n is singleton dist.

Contributor Author

Done in 4ee529c

Contributor

@chenzl25 chenzl25 left a comment

LGTM!

@kwannoel kwannoel added this pull request to the merge queue Apr 11, 2024
Merged via the queue into main with commit 132b4c9 Apr 11, 2024
27 of 28 checks passed
@kwannoel kwannoel deleted the kwannoel/pk-prefix branch April 11, 2024 04:19
Development

Successfully merging this pull request may close these issues.

Use functional dependencies to optimize unnecessary order key column