diff --git a/Cargo.lock b/Cargo.lock index cf9a9dcd9540b..e9be35816ecde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6944,6 +6944,8 @@ dependencies = [ "serde_json", "serde_with 3.3.0", "simd-json", + "strum 0.25.0", + "strum_macros 0.25.2", "tempfile", "thiserror", "time", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index c8cb5a0204fa9..71ca46d808b9e 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -100,6 +100,8 @@ serde_derive = "1" serde_json = "1" serde_with = { version = "3", features = ["json"] } simd-json = "0.10.6" +strum = "0.25" +strum_macros = "0.25" tempfile = "3" thiserror = "1" time = "0.3.28" diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index ec5192666916c..bd898dc24fecf 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -30,6 +30,7 @@ use risingwave_common::catalog::Schema; use risingwave_rpc_client::ConnectorClient; use serde_derive::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; +use strum_macros::{Display, EnumString}; use super::encoder::{JsonEncoder, TimestampHandlingMode}; use super::formatter::{ @@ -71,6 +72,16 @@ const fn _default_force_append_only() -> bool { false } +#[derive(Debug, Clone, PartialEq, Display, Serialize, Deserialize, EnumString)] +#[strum(serialize_all = "snake_case")] +enum CompressionCodec { + None, + Gzip, + Snappy, + Lz4, + Zstd, +} + #[serde_as] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RdKafkaPropertiesProducer { @@ -126,6 +137,11 @@ pub struct RdKafkaPropertiesProducer { #[serde(rename = "properties.batch.size")] #[serde_as(as = "Option")] batch_size: Option, + + /// Compression codec to use for compressing message sets. + #[serde(rename = "properties.compression.codec")] + #[serde_as(as = "Option")] + compression_codec: Option, } impl RdKafkaPropertiesProducer { @@ -154,6 +170,9 @@ impl RdKafkaPropertiesProducer { if let Some(v) = self.batch_size { c.set("batch.size", v.to_string()); } + if let Some(v) = &self.compression_codec { + c.set("compression.codec", v.to_string()); + } } } @@ -651,12 +670,17 @@ mod test { "properties.retry.backoff.ms".to_string() => "114514".to_string(), "properties.batch.num.messages".to_string() => "114514".to_string(), "properties.batch.size".to_string() => "114514".to_string(), + "properties.compression.codec".to_string() => "zstd".to_string(), }; let c = KafkaConfig::from_hashmap(props).unwrap(); assert_eq!( c.rdkafka_properties.queue_buffering_max_ms, Some(114.514f64) ); + assert_eq!( + c.rdkafka_properties.compression_codec, + Some(CompressionCodec::Zstd) + ); let props: HashMap = hashmap! { // basic @@ -678,6 +702,16 @@ mod test { "properties.queue.buffering.max.kbytes".to_string() => "-114514".to_string(), // usize cannot be negative }; assert!(KafkaConfig::from_hashmap(props).is_err()); + + let props: HashMap = hashmap! { + // basic + "connector".to_string() => "kafka".to_string(), + "properties.bootstrap.server".to_string() => "localhost:9092".to_string(), + "topic".to_string() => "test".to_string(), + "type".to_string() => "append-only".to_string(), + "properties.compression.codec".to_string() => "notvalid".to_string(), // has to be a valid CompressionCodec + }; + assert!(KafkaConfig::from_hashmap(props).is_err()); } #[test] @@ -763,6 +797,7 @@ mod test { "properties.bootstrap.server".to_string() => "localhost:29092".to_string(), "type".to_string() => "append-only".to_string(), "topic".to_string() => "test_topic".to_string(), + "properties.compression.codec".to_string() => "zstd".to_string(), }; // Create a table with two columns (| id : INT32 | v2 : VARCHAR |) here