From c82c4ed1bbf709f260b3beba957dd1bd978b44d8 Mon Sep 17 00:00:00 2001 From: Rafael Acevedo Date: Tue, 19 Sep 2023 14:20:42 -0300 Subject: [PATCH] feat(sink): support compression codec in kafka sink Closes https://github.com/risingwavelabs/risingwave/issues/12435 --- .github/workflows/hakari_fix.yml | 2 +- Cargo.lock | 2 ++ src/connector/Cargo.toml | 2 ++ src/connector/src/sink/kafka.rs | 35 ++++++++++++++++++++++++++++++++ 4 files changed, 40 insertions(+), 1 deletion(-) diff --git a/.github/workflows/hakari_fix.yml b/.github/workflows/hakari_fix.yml index 670ca38cccc27..391331d0dc510 100644 --- a/.github/workflows/hakari_fix.yml +++ b/.github/workflows/hakari_fix.yml @@ -15,7 +15,7 @@ jobs: steps: - uses: actions/checkout@v3 with: - ref: ${{ github.head_ref }} + ref: ${{ github.event.pull_request.head.ref }} - name: Install cargo-hakari uses: taiki-e/install-action@v2 diff --git a/Cargo.lock b/Cargo.lock index cf9a9dcd9540b..e9be35816ecde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6944,6 +6944,8 @@ dependencies = [ "serde_json", "serde_with 3.3.0", "simd-json", + "strum 0.25.0", + "strum_macros 0.25.2", "tempfile", "thiserror", "time", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index c8cb5a0204fa9..71ca46d808b9e 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -100,6 +100,8 @@ serde_derive = "1" serde_json = "1" serde_with = { version = "3", features = ["json"] } simd-json = "0.10.6" +strum = "0.25" +strum_macros = "0.25" tempfile = "3" thiserror = "1" time = "0.3.28" diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index ec5192666916c..bd898dc24fecf 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -30,6 +30,7 @@ use risingwave_common::catalog::Schema; use risingwave_rpc_client::ConnectorClient; use serde_derive::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; +use strum_macros::{Display, EnumString}; use super::encoder::{JsonEncoder, TimestampHandlingMode}; use super::formatter::{ @@ -71,6 +72,16 @@ const fn _default_force_append_only() -> bool { false } +#[derive(Debug, Clone, PartialEq, Display, Serialize, Deserialize, EnumString)] +#[strum(serialize_all = "snake_case")] +enum CompressionCodec { + None, + Gzip, + Snappy, + Lz4, + Zstd, +} + #[serde_as] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RdKafkaPropertiesProducer { @@ -126,6 +137,11 @@ pub struct RdKafkaPropertiesProducer { #[serde(rename = "properties.batch.size")] #[serde_as(as = "Option")] batch_size: Option, + + /// Compression codec to use for compressing message sets. + #[serde(rename = "properties.compression.codec")] + #[serde_as(as = "Option")] + compression_codec: Option, } impl RdKafkaPropertiesProducer { @@ -154,6 +170,9 @@ impl RdKafkaPropertiesProducer { if let Some(v) = self.batch_size { c.set("batch.size", v.to_string()); } + if let Some(v) = &self.compression_codec { + c.set("compression.codec", v.to_string()); + } } } @@ -651,12 +670,17 @@ mod test { "properties.retry.backoff.ms".to_string() => "114514".to_string(), "properties.batch.num.messages".to_string() => "114514".to_string(), "properties.batch.size".to_string() => "114514".to_string(), + "properties.compression.codec".to_string() => "zstd".to_string(), }; let c = KafkaConfig::from_hashmap(props).unwrap(); assert_eq!( c.rdkafka_properties.queue_buffering_max_ms, Some(114.514f64) ); + assert_eq!( + c.rdkafka_properties.compression_codec, + Some(CompressionCodec::Zstd) + ); let props: HashMap = hashmap! { // basic @@ -678,6 +702,16 @@ mod test { "properties.queue.buffering.max.kbytes".to_string() => "-114514".to_string(), // usize cannot be negative }; assert!(KafkaConfig::from_hashmap(props).is_err()); + + let props: HashMap = hashmap! { + // basic + "connector".to_string() => "kafka".to_string(), + "properties.bootstrap.server".to_string() => "localhost:9092".to_string(), + "topic".to_string() => "test".to_string(), + "type".to_string() => "append-only".to_string(), + "properties.compression.codec".to_string() => "notvalid".to_string(), // has to be a valid CompressionCodec + }; + assert!(KafkaConfig::from_hashmap(props).is_err()); } #[test] @@ -763,6 +797,7 @@ mod test { "properties.bootstrap.server".to_string() => "localhost:29092".to_string(), "type".to_string() => "append-only".to_string(), "topic".to_string() => "test_topic".to_string(), + "properties.compression.codec".to_string() => "zstd".to_string(), }; // Create a table with two columns (| id : INT32 | v2 : VARCHAR |) here