Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): support create iceberg source #14971

Merged
merged 18 commits into from
Feb 23, 2024

Conversation

chenzl25
Copy link
Contributor

@chenzl25 chenzl25 commented Feb 4, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

  • Related issue Feat: Batch ingest iceberg/file source #14742
  • Support create iceberg source.
  • iceberg source could not be used in streaming queries.
  • iceberg source enumerator only checks source schema against iceberg schema.
  • Because iceberg source has its own row format and doesn't need to be specified by users explicitly, we could omit it.

Iceberg source Example

create source s
( user_id bigint, user_name string)
with (
    connector = 'iceberg',
    catalog.type = 'storage',
    warehouse.path = 's3://hummock001/',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin',
    s3.region = 'us-east-1',
    database.name='demo_db',
    table.name='demo_table'
);

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

Copy link

gitguardian bot commented Feb 4, 2024

⚠️ GitGuardian has uncovered 1 secret following the scan of your pull request.

Please consider investigating the findings and remediating the incidents. Failure to do so may lead to compromising the associated services or software components.

🔎 Detected hardcoded secret in your pull request
GitGuardian id GitGuardian status Secret Commit Filename
9425213 Triggered Generic Password 11fdfcd ci/scripts/regress-test.sh View secret
🛠 Guidelines to remediate hardcoded secrets
  1. Understand the implications of revoking this secret by investigating where it is used in your code.
  2. Replace and store your secret safely. Learn here the best practices.
  3. Revoke and rotate this secret.
  4. If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.

To avoid such incidents in the future consider


🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.

Our GitHub checks need improvements? Share your feedbacks!

Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally LGTM

src/connector/src/source/iceberg/mod.rs Outdated Show resolved Hide resolved
src/connector/src/source/iceberg/mod.rs Outdated Show resolved Hide resolved
pub struct SourceEnumeratorInfo {
pub source_id: u32,
pub source: Option<Source>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need to add this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Iceberg, we need this Source to validate that the schema users provided is validated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do that in the optimizer stage as other connectors do? During the create source process, we fetch all related info and do checks there

@@ -249,6 +249,18 @@ impl Parser {
} else {
ConnectorSchema::native().into()
})
} else if connector.contains("iceberg") {
let expected = ConnectorSchema::native();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what the format and encode should be here, but it should not be the native. native means the format and encode is the same with Rsingwave's internal representation.
the FORMAT means how to get the operation(Insert/Delete) from the file, the ENCODE is means how to get the rows containment from the bytes, such as parquet/csv/json.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think actually the issue is that "batch-only source" does not need to resolve the operation from the file... so the streaming connector's “FORMAT” is meaningless for it... And I am not sure about the encode, does it stored in iceberg's catalog?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't think we need encode or format for iceberg.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for iceberg. We don't need any format or encoding, because they are self-explanatory and users don't need to specify any format and encoding. We could introduce another Format works like Native to meet this requirement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too many places expect source format and encoding, so I don't want to make them accept an Option<SourceSchema>. Introduce a trivial format and encoding for iceberg LGTM.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think actually the issue is that "batch-only source" does not need to resolve the operation from the file... so the streaming connector's “FORMAT” is meaningless for it... And I am not sure about the encode, does it stored in iceberg's catalog?

here we just want to load from the iceberg, all chunks are written as insert I think. format plain should be ok, which means append only. encode can be a random placeholder, eg. iceberg, to mark it as a spec encode and not for common cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if it can be considered as a kind of format plain... especially considering there could be deletion files in Iceberge. I am thinking if we need ConnectorSchema for the iceberg connector.

Copy link
Contributor

@st1page st1page left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

src/connector/src/sink/catalog/mod.rs Show resolved Hide resolved
pub struct SourceEnumeratorInfo {
pub source_id: u32,
pub source: Option<Source>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do that in the optimizer stage as other connectors do? During the create source process, we fetch all related info and do checks there

Comment on lines +74 to +88
fn id(&self) -> SplitId {
unimplemented!()
}

fn restore_from_json(_value: JsonbVal) -> anyhow::Result<Self> {
unimplemented!()
}

fn encode_to_json(&self) -> JsonbVal {
unimplemented!()
}

fn update_with_offset(&mut self, _start_offset: String) -> anyhow::Result<()> {
unimplemented!()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these going to be impl? or left them here on purpose

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of them would be implemented for batch-read in a later PR. But most of them related to streaming are left unimplemented on purpose.

src/frontend/src/handler/create_source.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@fuyufjh fuyufjh self-requested a review February 21, 2024 02:47
@xiangjinwu

This comment was marked as off-topic.

@chenzl25
Copy link
Contributor Author

We have so many different names for the same aws concepts, and this list may not be exhaustive ...

Not a blocking issue as we can just support many different aliases. But would like to bring it to people's attention.

We have iceberg sink and I just think iceberg source should have similar field names to the sink.

Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Batch select from iceberg source is not implemented in this PR, right? If so, how about change the PR title to feat(frontend): support create iceberg source? 😄

The PR itself LGTM.

src/sqlparser/src/ast/statement.rs Show resolved Hide resolved
@chenzl25 chenzl25 changed the title feat(batch,stream): support create iceberg source feat(frontend): support create iceberg source Feb 21, 2024
Copy link
Contributor

@tabVersion tabVersion left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for the work.

@chenzl25 chenzl25 enabled auto-merge February 21, 2024 15:47
auto-merge was automatically disabled February 22, 2024 06:09

Merge queue setting changed

@chenzl25 chenzl25 enabled auto-merge February 22, 2024 09:22
auto-merge was automatically disabled February 23, 2024 01:45

Merge queue setting changed

@chenzl25 chenzl25 added this pull request to the merge queue Feb 23, 2024
Merged via the queue into main with commit 0c329e9 Feb 23, 2024
36 of 38 checks passed
@chenzl25 chenzl25 deleted the dylan/support_create_iceberg_source branch February 23, 2024 05:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants