feat: draft stream partition compute #14135
Conversation
license-eye has checked 4,637 files in total.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
2037 | 5 | 2595 | 0 |
Invalid files:
- src/connector/src/sink/iceberg/precomputed_partition_writer.rs
- src/frontend/src/optimizer/plan_node/stream_partition_compute.rs
- src/stream/src/executor/partition_compute/iceberg.rs
- src/stream/src/executor/partition_compute/mod.rs
- src/stream/src/from_proto/partition_compute.rs
cc @ZENOTME Is this PR ready for review?
Yes. This PR is a basic framework. I left several TODOs that can either be fixed in this PR, or I can split this PR into smaller PRs (frontend, executor) and address them there. But I think the basic framework is ready for review now.
Codecov Report
@@ Coverage Diff @@
## main #14135 +/- ##
==========================================
- Coverage 68.04% 67.91% -0.13%
==========================================
Files 1541 1546 +5
Lines 265735 266229 +494
==========================================
- Hits 180811 180808 -3
- Misses 84924 85421 +497
Flags with carried forward coverage won't be shown.
The framework generally LGTM. I think we need some small adjustments.
.fields
.push(Field::with_name(partition_type.clone(), "_rw_partition"));
let mut watermark_columns = input.watermark_columns().clone();
watermark_columns.grow(input.watermark_columns().len() + 1);
Why this?
input: PlanRef,
connector_name: String,
partition_fields: Vec<PartitionField>,
partition_type: DataType,
How about making it an explicit `StructType`?
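For illustration, a minimal self-contained sketch of that suggestion could look like the following; the types and field names are placeholders, not the actual plan-node or `risingwave_common` definitions.

```rust
// Placeholder types standing in for the real DataType/StructType definitions.
struct StructField {
    name: String,
    type_name: String, // stand-in for a real data type
}

struct StructType {
    fields: Vec<StructField>,
}

// Stand-in for the plan-node core: carrying an explicit StructType means a
// non-struct partition type can no longer be constructed by accident.
struct StreamPartitionComputeCore {
    connector_name: String,
    partition_type: StructType,
}

fn main() {
    let core = StreamPartitionComputeCore {
        connector_name: "iceberg".to_string(),
        partition_type: StructType {
            fields: vec![StructField {
                name: "ts_day".to_string(), // illustrative partition field
                type_name: "date".to_string(),
            }],
        },
    };
    assert_eq!(core.partition_type.fields.len(), 1);
    let _ = core.connector_name;
}
```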
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;

pub struct PartitionComputeInfo {
Please add a comment here noting that only Iceberg is supported for now. If we plan to support more sinks such as Delta in the future, we may need to change this to an enum, but that can be left for later.
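As a hedged illustration of that possible future shape, a per-connector enum might look roughly like this; `IcebergPartitionInfo` and the commented-out `Delta` variant are illustrative names, not code from this PR.

```rust
// Simplified stand-in for the Iceberg-specific partition metadata.
struct IcebergPartitionInfo {
    partition_fields: Vec<String>, // source column names, simplified
}

// One variant per sink connector that needs partition pre-computation.
enum PartitionComputeInfo {
    Iceberg(IcebergPartitionInfo),
    // Delta(DeltaPartitionInfo), // possible future addition
}

fn main() {
    let info = PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
        partition_fields: vec!["ts_day".to_string()],
    });
    match info {
        PartitionComputeInfo::Iceberg(i) => assert_eq!(i.partition_fields.len(), 1),
    }
}
```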
)
.into();
Ok((
    RequiredDist::shard_by_key(
According to our design, we don't always need to do this sharding?
Which case do you mean? I think if we have the partition compute, then we have to do the sharding. 🤔
You can refer to the diagram below:
flowchart TD
Start((start))
End((end))
Start --> append_only{Sink append only?}
append_only -->|yes| iceberg_range_only_part{Range only partition?}
iceberg_range_only_part -->|yes| no_shuffle(No shuffle)
iceberg_range_only_part -->|no| append_only_shuffle(Shuffle by partition columns)
no_shuffle --> End
append_only_shuffle --> End
append_only -->|no| upsert_iceberg_range_only_part{Range only partition?}
upsert_iceberg_range_only_part -->|yes| upsert_shuffle_by_stream_part_key(Shuffle by stream key)
upsert_shuffle_by_stream_part_key --> End
upsert_iceberg_range_only_part -->|No| upsert_shuffle_by_part_key(Shuffle by partition columns)
upsert_shuffle_by_part_key --> End
Add this logic in `async fn get_partition_compute_info_for_iceberg`.
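For readers without the background, here is a minimal self-contained sketch of the decision encoded in the diagram above, as it might sit inside that function; the names and return type are illustrative, and the real code would build a `RequiredDist` instead.

```rust
#[derive(Debug, PartialEq)]
enum Shuffle {
    None,                  // no shuffle needed
    ByColumns(Vec<usize>), // hash-shard by these column indices
}

fn required_shuffle(
    append_only: bool,
    range_only_partition: bool,
    partition_columns: &[usize],
    stream_key: &[usize],
) -> Shuffle {
    match (append_only, range_only_partition) {
        // Append-only sink with a range-only partition: no shuffle.
        (true, true) => Shuffle::None,
        // Append-only sink, non-range partition: shuffle by partition columns.
        (true, false) => Shuffle::ByColumns(partition_columns.to_vec()),
        // Upsert sink with a range-only partition: shuffle by stream key.
        (false, true) => Shuffle::ByColumns(stream_key.to_vec()),
        // Upsert sink, non-range partition: shuffle by partition columns.
        (false, false) => Shuffle::ByColumns(partition_columns.to_vec()),
    }
}

fn main() {
    assert_eq!(required_shuffle(true, true, &[2], &[0]), Shuffle::None);
    assert_eq!(required_shuffle(false, true, &[2], &[0]), Shuffle::ByColumns(vec![0]));
    assert_eq!(required_shuffle(false, false, &[2], &[0]), Shuffle::ByColumns(vec![2]));
}
```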
// use icelake::types::{PartitionSpec, PartitionField};
// use itertools::Itertools;

// pub fn create_partition() -> PartitionSpec {
These tests are copied from `PartitionedWriter`? I would suggest removing them and adding some unit tests.
Yes, I will add the unit tests later.
/// Partition append only writer
pub struct PrecomputedPartitionedWriter<B: IcebergWriterBuilder> {
    inner_writers: HashMap<OwnedRow, (B::R, icelake::types::StructValue)>,
We are missing metrics here.
Yes, I will complete this later.
self.partition_type.clone().try_into().map_err(|e| {
    icelake::Error::new(icelake::ErrorKind::ArrowError, format!("{}", e))
})?;
let partition_col_idx = schema
I think rather than looking this up, it would be safer to pass the partition index to the writer builder?
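As a hedged sketch of that suggestion, the builder could receive the pre-computed partition column index from the planner instead of re-discovering it from the schema at runtime; `PrecomputedPartitionedWriterBuilder` here is a placeholder, not the PR's actual type, and `()` stands in for the wrapped inner builder.

```rust
pub struct PrecomputedPartitionedWriterBuilder<InnerBuilder> {
    inner: InnerBuilder,
    /// Index of the pre-computed partition column in the input chunk,
    /// provided by the caller that built the plan.
    partition_col_idx: usize,
}

impl<InnerBuilder> PrecomputedPartitionedWriterBuilder<InnerBuilder> {
    pub fn new(inner: InnerBuilder, partition_col_idx: usize) -> Self {
        Self { inner, partition_col_idx }
    }

    pub fn partition_col_idx(&self) -> usize {
        self.partition_col_idx
    }
}

fn main() {
    // `()` stands in for a real inner Iceberg writer builder.
    let builder = PrecomputedPartitionedWriterBuilder::new((), 3);
    assert_eq!(builder.partition_col_idx(), 3);
    let _ = &builder.inner;
}
```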
    transform: Box<dyn Transform>,
}

pub struct PartitionComputeExecutor {
I'm wondering if we really need an extra `PartitionComputeExecutor`; this is in fact a `ProjectionExecutor`, and what's really required is a `TransformExpression`.
Yes, I think we can use a `Project` instead and append the partition fields, like: 🤔
|field 1|field 2| -> |field 1|field 2|partition field 1|partition field 2|
I think it's better to put all partition fields under one struct type? That way we can avoid name conflicts as much as possible.
Yes. Then we should first implement an expression that takes multiple fields and combines them into a struct; it seems we don't have such an expression yet, right?
I think we can do this just in the expression? `StructArray` is also an array.
Oh, I got your point.
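To make the agreed direction concrete, here is a self-contained sketch of a "construct struct" expression feeding a `Project` that appends the partition struct after the original fields; the `Value` type is a stand-in for the real scalar and array types in the expression framework.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int(i64),
    Str(String),
    Struct(Vec<Value>),
}

/// Combine one value per partition source column into a single struct value,
/// avoiding name conflicts with the original output columns.
fn construct_struct(fields: &[Value]) -> Value {
    Value::Struct(fields.to_vec())
}

fn main() {
    // |field 1|field 2| -> |field 1|field 2|partition struct|
    let row = vec![Value::Int(42), Value::Str("2024-01-01".into())];
    let partition = construct_struct(&row);
    let projected: Vec<Value> = row.iter().cloned().chain([partition]).collect();
    assert_eq!(projected.len(), 3);
    println!("{projected:?}");
}
```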
    pub transform: String,
}

/// [`StreamPartitionCompute`] used to compute the partition of the stream in advance.
Could you add more comments here? It's not clear to people without the background what "partition", "stream", and "in advance" mean here. It would be better to describe why it is needed, how it works, and when it is used.
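As a hedged example of the kind of doc comment being asked for, with wording derived from this conversation rather than the text that actually landed:

```rust
/// Appends a pre-computed partition column (e.g. `_rw_partition`) to the
/// stream feeding an Iceberg sink, so the sink writer can group rows by
/// partition value without re-evaluating the partition transforms itself.
/// It is only planned when the target sink declares a partition spec.
pub struct StreamPartitionCompute;

fn main() {
    let _node = StreamPartitionCompute;
}
```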
I have added a doc under the "Why we need `PartitionComputeInfo`?" section.
looks good
license-eye has checked 4,688 files in total.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
2049 | 4 | 2635 | 0 |
Invalid files:
- src/connector/src/sink/iceberg/precomputed_partition_writer.rs
- src/expr/impl/src/external/iceberg.rs
- src/expr/impl/src/external/mod.rs
- src/expr/impl/src/scalar/construct_struct.rs
license-eye has checked 4,698 files in total.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
2048 | 4 | 2646 | 0 |
Invalid files:
- src/connector/src/sink/iceberg/precomputed_partition_writer.rs
- src/expr/core/src/expr/external/iceberg.rs
- src/expr/core/src/expr/external/mod.rs
- src/expr/impl/src/scalar/construct_struct.rs
I have rewritten this PR using the project node; this PR covers the whole process. For easier review, maybe we can split it into smaller PRs if this draft looks good to move forward.
Generally LGTM, +1 on the idea of splitting it into smaller PRs for easier review.
Closed by #14710.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
draft #13898
Checklist
- ./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.