-
Notifications
You must be signed in to change notification settings - Fork 592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sink): support compression codec in kafka sink #12439
feat(sink): support compression codec in kafka sink #12439
Conversation
daeb2b2
to
1590110
Compare
src/connector/src/sink/kafka.rs
Outdated
@@ -71,6 +72,25 @@ const fn _default_force_append_only() -> bool { | |||
false | |||
} | |||
|
|||
#[derive(Debug, Clone, PartialEq, Display, Serialize, Deserialize, EnumString)] | |||
enum CompressionCodec { | |||
#[serde(rename = "none")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if we need serde here, appreciate some guidance
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we don't. #[serde_as(as = "Option<DisplayFromStr>")]
will delegate the implementation to FromStr
and Display
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
@@ -763,6 +806,7 @@ mod test { | |||
"properties.bootstrap.server".to_string() => "localhost:29092".to_string(), | |||
"type".to_string() => "append-only".to_string(), | |||
"topic".to_string() => "test_topic".to_string(), | |||
"properties.compression.codec".to_string() => "zstd".to_string(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fyi: checked in the local kafka broker and the message is indeed produced using zstd compression.
command used:
./.risingwave/bin/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files <path-to-.risingwave>/data/kafka-29092/test_topic-0/00000000000000000000.log --print-data-log | grep compresscodec
src/connector/src/sink/kafka.rs
Outdated
@@ -71,6 +72,25 @@ const fn _default_force_append_only() -> bool { | |||
false | |||
} | |||
|
|||
#[derive(Debug, Clone, PartialEq, Display, Serialize, Deserialize, EnumString)] | |||
enum CompressionCodec { | |||
#[serde(rename = "none")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we don't. #[serde_as(as = "Option<DisplayFromStr>")]
will delegate the implementation to FromStr
and Display
.
src/connector/src/sink/kafka.rs
Outdated
#[derive(Debug, Clone, PartialEq, Display, Serialize, Deserialize, EnumString)] | ||
enum CompressionCodec { | ||
#[serde(rename = "none")] | ||
#[strum(serialize = "none")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can add a serialize_all
attribute to the whole enum
like this one:
#[strum(serialize_all = "snake_case")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the tip, done 😄
1590110
to
7cb3903
Compare
@BugenZhao I'm not sure why the "Hakari Fix" check is failing. Maybe it fails on PRs from forked repos? |
c82c4ed
to
219003f
Compare
I guess there might be something wrong with the checkout |
219003f
to
1625a65
Compare
I tried changing from |
Update: the following seems to work
Ref: actions/checkout#551 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks for your contribution.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
This aims to support setting
compression.codec
in kafka sink.Closes #12435
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
Support setting kafka producer
compression.codec
in kafka sink