Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): support compression codec in kafka sink #12439

Merged

Conversation

racevedoo
Copy link
Contributor

@racevedoo racevedoo commented Sep 19, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

This aims to support setting compression.codec in kafka sink.

Closes #12435

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • All checks passed in ./risedev check (or alias, ./risedev c)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

Support setting kafka producer compression.codec in kafka sink

@racevedoo racevedoo requested a review from a team as a code owner September 19, 2023 17:27
@racevedoo racevedoo force-pushed the kafka-sink-compression-codec branch 2 times, most recently from daeb2b2 to 1590110 Compare September 19, 2023 20:01
@@ -71,6 +72,25 @@ const fn _default_force_append_only() -> bool {
false
}

#[derive(Debug, Clone, PartialEq, Display, Serialize, Deserialize, EnumString)]
enum CompressionCodec {
#[serde(rename = "none")]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if we need serde here, appreciate some guidance

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we don't. #[serde_as(as = "Option<DisplayFromStr>")] will delegate the implementation to FromStr and Display.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

@@ -763,6 +806,7 @@ mod test {
"properties.bootstrap.server".to_string() => "localhost:29092".to_string(),
"type".to_string() => "append-only".to_string(),
"topic".to_string() => "test_topic".to_string(),
"properties.compression.codec".to_string() => "zstd".to_string(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi: checked in the local kafka broker and the message is indeed produced using zstd compression.

command used:

./.risingwave/bin/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files <path-to-.risingwave>/data/kafka-29092/test_topic-0/00000000000000000000.log --print-data-log | grep compresscodec

@xxchan xxchan requested review from tabVersion and wenym1 September 20, 2023 03:24
@@ -71,6 +72,25 @@ const fn _default_force_append_only() -> bool {
false
}

#[derive(Debug, Clone, PartialEq, Display, Serialize, Deserialize, EnumString)]
enum CompressionCodec {
#[serde(rename = "none")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we don't. #[serde_as(as = "Option<DisplayFromStr>")] will delegate the implementation to FromStr and Display.

#[derive(Debug, Clone, PartialEq, Display, Serialize, Deserialize, EnumString)]
enum CompressionCodec {
#[serde(rename = "none")]
#[strum(serialize = "none")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add a serialize_all attribute to the whole enum like this one:

#[strum(serialize_all = "snake_case")]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the tip, done 😄

@racevedoo racevedoo force-pushed the kafka-sink-compression-codec branch from 1590110 to 7cb3903 Compare September 20, 2023 11:59
@racevedoo racevedoo requested a review from BugenZhao September 20, 2023 12:01
@racevedoo
Copy link
Contributor Author

@BugenZhao I'm not sure why the "Hakari Fix" check is failing. Maybe it fails on PRs from forked repos?

@racevedoo racevedoo force-pushed the kafka-sink-compression-codec branch 2 times, most recently from c82c4ed to 219003f Compare September 21, 2023 10:20
@BugenZhao
Copy link
Member

@BugenZhao I'm not sure why the "Hakari Fix" check is failing. Maybe it fails on PRs from forked repos?

I guess there might be something wrong with the checkout ref. 🤔 @TennyZhuang Would you please help to take a look?

@racevedoo racevedoo force-pushed the kafka-sink-compression-codec branch from 219003f to 1625a65 Compare September 21, 2023 10:29
@racevedoo
Copy link
Contributor Author

racevedoo commented Sep 21, 2023

I guess there might be something wrong with the checkout ref. 🤔 @TennyZhuang Would you please help to take a look?

I tried changing from ${{ github.head_ref }} to ${{ github.event.pull_request.head.ref }}, but it didn't work. I'll try some other options.

@racevedoo
Copy link
Contributor Author

Update: the following seems to work

          ref: ${{ github.event.pull_request.head.ref }}
          repository: ${{ github.event.pull_request.head.repo.full_name }}

Ref: actions/checkout#551

Copy link
Member

@BugenZhao BugenZhao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thanks for your contribution.

@BugenZhao BugenZhao added this pull request to the merge queue Sep 25, 2023
Merged via the queue into risingwavelabs:main with commit bc43c39 Sep 25, 2023
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support compression in kafka sink
2 participants