
feat: support precompute partition for iceberg #14710

Merged
merged 8 commits into main on Feb 1, 2024

Conversation

Contributor

@ZENOTME ZENOTME commented Jan 22, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

This is the final part of #13898:

  1. support getting the partition info at the frontend
  2. add a project node to compute the partition value and an exchange node to shuffle by it (see the illustrative sketch below)
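
For illustration, a minimal, self-contained sketch of the intended plan shape, using toy types rather than RisingWave's actual planner API (the node and field names here are made up): when the sink table has a partition expression, a project node computes the partition value and an exchange node shuffles rows by it before the sink.

// Toy plan representation; the real planner nodes live in RisingWave's frontend.
#[derive(Debug)]
enum PlanNode {
    TableScan { table: String },
    Project { input: Box<PlanNode>, exprs: Vec<String> },
    Exchange { input: Box<PlanNode>, dist_key: String },
    Sink { input: Box<PlanNode>, connector: String },
}

// If a partition expression is known, precompute it in a Project and shuffle by it
// in an Exchange; otherwise keep the input distribution unchanged.
fn derive_iceberg_sink_plan(scan: PlanNode, partition_expr: Option<String>) -> PlanNode {
    let input = match partition_expr {
        Some(expr) => PlanNode::Exchange {
            dist_key: "$expr1".to_string(),
            input: Box::new(PlanNode::Project {
                exprs: vec!["<original columns>".to_string(), format!("{expr} as $expr1")],
                input: Box::new(scan),
            }),
        },
        None => scan,
    };
    PlanNode::Sink {
        input: Box::new(input),
        connector: "iceberg".to_string(),
    }
}

fn main() {
    let plan = derive_iceberg_sink_plan(
        PlanNode::TableScan { table: "t1".to_string() },
        Some("bucket(16, id)".to_string()),
    );
    println!("{plan:#?}");
}

Running it prints a nested debug tree of Sink over Exchange over Project over TableScan, which mirrors the plan fragments discussed later in this thread.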

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features. See Sqlsmith: SQL feature generation #7934.)
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@ZENOTME ZENOTME requested a review from a team as a code owner January 22, 2024 04:48
@ZENOTME ZENOTME force-pushed the zj/partition_compute_final branch 2 times, most recently from 83392c9 to bc926bd on January 22, 2024 08:13
Contributor

@liurenjie1024 liurenjie1024 left a comment


Generally LGTM, thanks!

proto/stream_plan.proto (review thread resolved)
src/connector/src/sink/catalog/desc.rs (review thread resolved)
@@ -177,7 +178,7 @@ impl IcebergConfig {
         self.catalog_type.as_deref().unwrap_or("storage")
     }

-    fn build_iceberg_configs(&self) -> Result<HashMap<String, String>> {
+    pub fn build_iceberg_configs(&self) -> Result<HashMap<String, String>> {
Contributor


Why do we need to make this pub?

@@ -346,6 +347,55 @@ impl IcebergConfig {
}
}

pub async fn create_catalog(config: &IcebergConfig) -> Result<CatalogRef> {
Contributor


Suggested change
- pub async fn create_catalog(config: &IcebergConfig) -> Result<CatalogRef> {
+ async fn create_catalog(config: &IcebergConfig) -> Result<CatalogRef> {

src/connector/src/sink/iceberg/mod.rs (review thread resolved)
.load_table(&table_id)
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
async fn create_validated_table(&self) -> Result<Table> {
Contributor


Suggested change
- async fn create_validated_table(&self) -> Result<Table> {
+ async fn create_and_validate_table(&self) -> Result<Table> {

let table = create_table(iceberg_config).await?;
let partition_spec = table
.current_table_metadata()
.current_partition_spec()
Contributor


We should not throw an error here; a user may create a table without a partition spec.
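
For illustration, a small sketch of the suggested handling, with toy metadata types (not the actual icelake or RisingWave types): an empty partition spec is treated as an unpartitioned table and simply yields no partition info, instead of an error.

// Toy stand-ins for the Iceberg table metadata types.
struct PartitionSpec {
    fields: Vec<String>, // partition field names; empty means the table is unpartitioned
}

struct TableMetadata {
    partition_spec: PartitionSpec,
}

// An unpartitioned table is a valid case: return None instead of an error, so the
// sink simply skips the partition-compute/shuffle step.
fn partition_fields(meta: &TableMetadata) -> Option<&[String]> {
    if meta.partition_spec.fields.is_empty() {
        None
    } else {
        Some(meta.partition_spec.fields.as_slice())
    }
}

fn main() {
    let unpartitioned = TableMetadata { partition_spec: PartitionSpec { fields: vec![] } };
    assert!(partition_fields(&unpartitioned).is_none());
}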

@@ -133,9 +236,42 @@ impl StreamSink {
Ok(Self::new(input, sink))
}

fn derive_iceberg_sink_distribution(
Contributor


Could we add some planner tests for this? I think it would be better to ensure its consistency.

Contributor Author


I find it inconvenient to test this for Iceberg because I need to connect to the catalog and fetch the related info.
We test the sink using handle_explain, which means there isn't a place to insert mock partition info in the plan test.

  • One way to test it is to mock the partition info in the blackhole connector. But this would add test-only code to the frontend, and I'm not sure whether that's a good approach. E.g.:
pub async fn get_partition_compute_info(
    with_options: &WithOptions,
) -> Result<Option<PartitionComputeInfo>> {
    match connector.as_str() {
        ICEBERG_SINK => {
            ..
        }
        BLACK_HOLE => { .. /* generate mock info */ }
        _ => Ok(None),
    }
}
  • Another way is to change how we test sinks: maybe we can use create_sink instead of testing via explain, and add a parameter to pass the partition info so that we can mock it from outside.
    explain::handle_explain(handler_args, *statement, options, analyze).await?;

Contributor


I find it inconvenient to test this for Iceberg because I need to connect to the catalog and fetch the related info.

How about creating a mocked iceberg catalog?

@BugenZhao
Member

Is this PR specific to Iceberg? Would you please update the PR title to make it more detailed?

@ZENOTME ZENOTME changed the title from "feat: support precompute partition" to "feat: support precompute partition for iceberg" on Jan 23, 2024
@ZENOTME
Contributor Author

ZENOTME commented Jan 23, 2024

Is this PR specific to Iceberg? Would you please update the PR title to make it more detailed?

Sorry. Updated it.

@ZENOTME ZENOTME force-pushed the zj/partition_compute_final branch from b46fa78 to 6d18a72 on January 24, 2024 14:31
Contributor

@github-actions github-actions bot left a comment


license-eye has totally checked 4768 files.

Valid: 2077 | Invalid: 2 | Ignored: 2689 | Fixed: 0

Invalid file list:
  • src/common/src/array/arrow/arrow_iceberg.rs
  • src/connector/src/sink/iceberg/mock_catalog.rs

@@ -172,3 +172,45 @@
StreamSink { type: upsert, columns: [a, b, t2._row_id(hidden)], pk: [t2._row_id] }
└─StreamEowcSort { sort_column: t2.b }
  └─StreamTableScan { table: t2, columns: [a, b, _row_id] }
- id: create_mock_iceberg_sink_append_only_with_partition
Contributor


These test cases don't seem to be enough; the RFC describes 4 cases, please add a test for each of them.

Contributor

@tabVersion tabVersion left a comment


not familiar with the context, waiting for renjie's approval

/// ## Why do we need `PartitionComputeInfo`?
///
/// Some sinks write data into different files based on the partition value, e.g. the iceberg sink (<https://iceberg.apache.org/spec/#partitioning>).
/// For this kind of sink, the number of files can be reduced if we shuffle the data based on the partition value. More details can be found in <https://github.com/risingwavelabs/rfcs/pull/77>.
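
As a rough illustration of the file-count argument above (toy code, not the sink implementation; the partition function is a simple modulo stand-in, not Iceberg's real bucket transform): once rows are shuffled so that each partition value is owned by a single writer, the number of files is bounded by the number of partitions rather than writers times partitions.

use std::collections::HashMap;

// Toy partition function standing in for an Iceberg partition transform such as
// bucket(n, key); this is NOT Iceberg's real bucket hash.
fn partition_value(key: u64, num_partitions: u64) -> u64 {
    key % num_partitions
}

fn main() {
    let rows: Vec<u64> = (0..1000).collect();
    let num_partitions: u64 = 16;
    let num_writers: u64 = 4;

    // Without the shuffle, rows reach writers in arbitrary order, so in the worst
    // case every writer opens a file for every partition.
    let files_without_shuffle = num_writers * num_partitions;

    // With an exchange keyed on the partition value, each partition value is owned
    // by exactly one writer, so at most one file per partition is written overall.
    let mut owner: HashMap<u64, u64> = HashMap::new();
    for &row in &rows {
        let p = partition_value(row, num_partitions);
        owner.entry(p).or_insert(p % num_writers); // a fixed owner per partition
    }
    let files_with_shuffle = owner.len();

    println!("worst case without shuffle: {files_without_shuffle} files");
    println!("with shuffle: {files_with_shuffle} files");
}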
Contributor


does this also apply to other lakehouse downstreams?

Contributor Author


Yes, it seems Delta also has a partition concept. I guess this can apply there too. 🤔

Comment on lines +333 to +336
// Separate the partition spec into two parts: sparse partitions and range partitions.
// A sparse partition means the data distribution is sparse at a given time.
// A range partition means the data distribution is likely the same at a given time.
// Only compute the partition value and shuffle by it for sparse partitions.
Contributor


so we are changing the distribution key to adjust each CN's hash range?

Contributor Author


Yes. We can see that for a sparse partition, we add an exchange before the sink:

└─StreamExchange { dist: HashShard($expr1) }
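
To make the sparse/range distinction concrete, here is an illustrative classification sketch (toy code; the exact transform-to-kind mapping used by the PR may differ): bucket or truncate style transforms scatter rows that arrive at the same time across many partitions (sparse), while time-based transforms such as day(event_time) keep them mostly in one partition (range), so only the former benefits from the extra exchange.

// Illustrative classification of partition transforms into "sparse" and "range"
// per the definitions quoted above; not necessarily the mapping used in the PR.
#[derive(Debug, PartialEq)]
enum PartitionKind {
    Sparse, // rows at a given time spread over many partitions -> worth shuffling
    Range,  // rows at a given time mostly fall into one partition -> shuffle not needed
}

fn classify(transform: &str) -> PartitionKind {
    match transform {
        // Hash/truncate style transforms scatter concurrent rows.
        t if t.starts_with("bucket") || t.starts_with("truncate") => PartitionKind::Sparse,
        // Time-based transforms keep concurrent rows together.
        "year" | "month" | "day" | "hour" => PartitionKind::Range,
        // Anything else: treated as sparse here purely for the sake of the example.
        _ => PartitionKind::Sparse,
    }
}

fn main() {
    assert_eq!(classify("bucket(16, id)"), PartitionKind::Sparse);
    assert_eq!(classify("day"), PartitionKind::Range);
    println!("ok");
}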

@ZENOTME ZENOTME force-pushed the zj/partition_compute_final branch from 8b263cb to 397faa4 on January 29, 2024 07:23
Contributor

@liurenjie1024 liurenjie1024 left a comment


LGTM, thanks!

@liurenjie1024
Contributor

cc @xxchan @BugenZhao PTAL for cargo.lock

@liurenjie1024 liurenjie1024 requested a review from hzxa21 January 31, 2024 11:35
Collaborator

@hzxa21 hzxa21 left a comment


LGTM for Cargo.lock

auto-merge was automatically disabled January 31, 2024 23:49

Merge queue setting changed

@ZENOTME ZENOTME added this pull request to the merge queue Feb 1, 2024
Merged via the queue into main with commit 7d0d43f Feb 1, 2024
42 of 43 checks passed
@ZENOTME ZENOTME deleted the zj/partition_compute_final branch February 1, 2024 03:13