Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: draft stream partition compute #14135

Closed
wants to merge 2 commits into from
Closed

Conversation

ZENOTME
Copy link
Contributor

@ZENOTME ZENOTME commented Dec 21, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

draft #13898

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

Copy link
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has totally checked 4637 files.

Valid Invalid Ignored Fixed
2037 5 2595 0
Click to see the invalid file list
  • src/connector/src/sink/iceberg/precomputed_partition_writer.rs
  • src/frontend/src/optimizer/plan_node/stream_partition_compute.rs
  • src/stream/src/executor/partition_compute/iceberg.rs
  • src/stream/src/executor/partition_compute/mod.rs
  • src/stream/src/from_proto/partition_compute.rs

src/stream/src/executor/partition_compute/iceberg.rs Outdated Show resolved Hide resolved
src/stream/src/executor/partition_compute/mod.rs Outdated Show resolved Hide resolved
src/stream/src/from_proto/partition_compute.rs Outdated Show resolved Hide resolved
@ZENOTME ZENOTME marked this pull request as ready for review December 21, 2023 13:49
@ZENOTME ZENOTME requested a review from a team as a code owner December 21, 2023 13:49
@ZENOTME ZENOTME force-pushed the zj/partition_compute branch 3 times, most recently from d2d8045 to d8a9274 Compare December 22, 2023 06:15
@liurenjie1024
Copy link
Contributor

cc @ZENOTME Is this pr ready for review?

@ZENOTME
Copy link
Contributor Author

ZENOTME commented Dec 22, 2023

cc @ZENOTME Is this pr ready for review?

Yes. And this PR is a basic framework. I left several todo which can be fix in this PR or I can separate this PR into smaller PR(frontend, executor part) and fix them in those. But I'm think the basic framework is ready to be review now.

Copy link

codecov bot commented Dec 22, 2023

Codecov Report

Attention: 307 lines in your changes are missing coverage. Please review.

Comparison is base (bb66778) 68.04% compared to head (d8a9274) 67.91%.
Report is 61 commits behind head on main.

Files Patch % Lines
...r/src/sink/iceberg/precomputed_partition_writer.rs 0.00% 98 Missing ⚠️
...rc/optimizer/plan_node/stream_partition_compute.rs 0.00% 68 Missing ⚠️
...rc/frontend/src/optimizer/plan_node/stream_sink.rs 29.16% 51 Missing ⚠️
src/connector/src/sink/iceberg/mod.rs 0.00% 29 Missing ⚠️
src/frontend/src/handler/create_sink.rs 34.21% 25 Missing ⚠️
src/stream/src/from_proto/partition_compute.rs 0.00% 19 Missing ⚠️
src/stream/src/executor/partition_compute/mod.rs 92.00% 12 Missing ⚠️
...c/stream/src/executor/partition_compute/iceberg.rs 86.66% 4 Missing ⚠️
src/stream/src/executor/sink.rs 66.66% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #14135      +/-   ##
==========================================
- Coverage   68.04%   67.91%   -0.13%     
==========================================
  Files        1541     1546       +5     
  Lines      265735   266229     +494     
==========================================
- Hits       180811   180808       -3     
- Misses      84924    85421     +497     
Flag Coverage Δ
rust 67.91% <41.52%> (-0.13%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The framework generally LGTM. I think we need some small adjustment.

.fields
.push(Field::with_name(partition_type.clone(), "_rw_partition"));
let mut watermark_columns = input.watermark_columns().clone();
watermark_columns.grow(input.watermark_columns().len() + 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this?

input: PlanRef,
connector_name: String,
partition_fields: Vec<PartitionField>,
partition_type: DataType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about making it an explicit StructType?

use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;

pub struct PartitionComputeInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comment here that now we only support iceberg. I think in future if we plan to support more sinks such as delta, we may need to change it to enum, but it leaves to future.

)
.into();
Ok((
RequiredDist::shard_by_key(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to our design, we don't always need to do this sharding?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which case you mean? I think if we have the partition compute, then we have to do the shard. 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can refer to the diagram below:

flowchart TD
    Start((start))
    End((end))
    Start --> append_only{Sink append only?}
    append_only -->|yes| iceberg_range_only_part{Range only partition?}
    iceberg_range_only_part -->|yes| no_shuffle(No shuffle)
    iceberg_range_only_part -->|no| append_only_shuffle(Shuffle by partition columns)
    no_shuffle --> End
    append_only_shuffle --> End
    append_only -->|no| upsert_iceberg_range_only_part{Range only partition?}
    upsert_iceberg_range_only_part -->|yes| upsert_shuffle_by_stream_part_key(Shuffle by stream key)
    upsert_shuffle_by_stream_part_key --> End
    upsert_iceberg_range_only_part -->|No| upsert_shuffle_by_part_key(Shuffle by partition columns)
    upsert_shuffle_by_part_key --> End
Loading

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this logic in

async fn get_partition_compute_info_for_iceberg(
.

// use icelake::types::{PartitionSpec, PartitionField};
// use itertools::Itertools;

// pub fn create_partition() -> PartitionSpec {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests are copied from PartitionedWriter? I would suggest to remove them and add some ut.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will add the ut later.


/// Partition append only writer
pub struct PrecomputedPartitionedWriter<B: IcebergWriterBuilder> {
inner_writers: HashMap<OwnedRow, (B::R, icelake::types::StructValue)>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are missing metrics here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will complete this later.

self.partition_type.clone().try_into().map_err(|e| {
icelake::Error::new(icelake::ErrorKind::ArrowError, format!("{}", e))
})?;
let partition_col_idx = schema
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think rather than looking this, it would be safer to pass the partition index to writer builder?

transform: Box<dyn Transform>,
}

pub struct PartitionComputeExecutor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we really need an extra PartitionComputeExecutor, this is in fact a ProjectionExecutor, and what's really required is a TransformExpression

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think we use a Project instead and append the partition field, like:🤔
|field 1|field 2| -> |field 1|field 2|partition field 1|partition field 2|

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to put all partition fields under one struct type? This we can avoid name conflict as much as possible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. And we should implement an expression which take multiple field and combine them as a struct first, seems we don't have a such a expression right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can do this just in the exprssion? StructArray is also an array.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I got your point.

pub transform: String,
}

/// [`StreamPartitionCompute`] used to compute the partition of the stream in advance.
Copy link
Member

@xxchan xxchan Dec 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add more comments here? It's not clear to people without background what does "partition" and "stream" and "in advance" mean here.

It would be better to describe why it is needed, how it works and when it is used here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added doc in

/// ## Why we need `PartitionComputeInfo`?
. Please let me know if this not clear enough.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good

@ZENOTME ZENOTME force-pushed the zj/partition_compute branch 2 times, most recently from a7c11fe to 33117bd Compare January 3, 2024 14:53
Copy link
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has totally checked 4688 files.

Valid Invalid Ignored Fixed
2049 4 2635 0
Click to see the invalid file list
  • src/connector/src/sink/iceberg/precomputed_partition_writer.rs
  • src/expr/impl/src/external/iceberg.rs
  • src/expr/impl/src/external/mod.rs
  • src/expr/impl/src/scalar/construct_struct.rs

src/expr/impl/src/external/iceberg.rs Outdated Show resolved Hide resolved
src/expr/impl/src/external/mod.rs Outdated Show resolved Hide resolved
src/expr/impl/src/scalar/construct_struct.rs Outdated Show resolved Hide resolved
@ZENOTME ZENOTME force-pushed the zj/partition_compute branch 2 times, most recently from 76b6931 to 3f70156 Compare January 7, 2024 14:18
Copy link
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has totally checked 4698 files.

Valid Invalid Ignored Fixed
2048 4 2646 0
Click to see the invalid file list
  • src/connector/src/sink/iceberg/precomputed_partition_writer.rs
  • src/expr/core/src/expr/external/iceberg.rs
  • src/expr/core/src/expr/external/mod.rs
  • src/expr/impl/src/scalar/construct_struct.rs

src/expr/core/src/expr/external/iceberg.rs Outdated Show resolved Hide resolved
src/expr/core/src/expr/external/mod.rs Outdated Show resolved Hide resolved
@ZENOTME ZENOTME force-pushed the zj/partition_compute branch from 3f70156 to 585fb32 Compare January 7, 2024 14:22
@ZENOTME ZENOTME force-pushed the zj/partition_compute branch from 585fb32 to b4600d1 Compare January 7, 2024 14:42
@tabVersion tabVersion self-requested a review January 7, 2024 15:41
@ZENOTME ZENOTME force-pushed the zj/partition_compute branch from e6d7230 to 0e9f74a Compare January 8, 2024 02:49
@ZENOTME
Copy link
Contributor Author

ZENOTME commented Jan 8, 2024

I have rewritten this PR using the project node, this PR is about the whole process. For easier to review, maybe we can split this PR into smaller PRs if this draft looks good to move:

  1. Introduce new expression:
    a. external expression for partition compute
    b. construct struct expression
  2. Introduce precompute partition writer for iceberg sink
  3. Introduce the partition compute concept. It including:
    a. get PartitionComputeInfo in frontend.
    b. Convert PartitionComputeInfo as a Project before sink.

@liurenjie1024
Copy link
Contributor

I have rewritten this PR using the project node, this PR is about the whole process. For easier to review, maybe we can split this PR into smaller PRs if this draft looks good to move:

  1. Introduce new expression:
    a. external expression for partition compute
    b. construct struct expression
  2. Introduce precompute partition writer for iceberg sink
  3. Introduce the partition compute concept. It including:
    a. get PartitionComputeInfo in frontend.
    b. Convert PartitionComputeInfo as a Project before sink.

Generally LGTM, +1 with the idea of splitting it into smaller prs for easier review.

@ZENOTME
Copy link
Contributor Author

ZENOTME commented Apr 1, 2024

Close by #14710

@ZENOTME ZENOTME closed this Apr 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants