feat(sink): support compression codec in kafka sink
Closes #12435
racevedoo committed Sep 20, 2023
1 parent 6500c1e commit 7cb3903
Showing 3 changed files with 39 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
@@ -100,6 +100,8 @@ serde_derive = "1"
 serde_json = "1"
 serde_with = { version = "3", features = ["json"] }
 simd-json = "0.10.6"
+strum = "0.25"
+strum_macros = "0.25"
 tempfile = "3"
 thiserror = "1"
 time = "0.3.28"
35 changes: 35 additions & 0 deletions src/connector/src/sink/kafka.rs
@@ -30,6 +30,7 @@ use risingwave_common::catalog::Schema;
 use risingwave_rpc_client::ConnectorClient;
 use serde_derive::{Deserialize, Serialize};
 use serde_with::{serde_as, DisplayFromStr};
+use strum_macros::{Display, EnumString};
 
 use super::encoder::{JsonEncoder, TimestampHandlingMode};
 use super::formatter::{
@@ -71,6 +72,16 @@ const fn _default_force_append_only() -> bool {
     false
 }
 
+#[derive(Debug, Clone, PartialEq, Display, Serialize, Deserialize, EnumString)]
+#[strum(serialize_all = "snake_case")]
+enum CompressionCodec {
+    None,
+    Gzip,
+    Snappy,
+    Lz4,
+    Zstd,
+}
+
 #[serde_as]
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct RdKafkaPropertiesProducer {
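Note, for context: the new strum / strum_macros dependencies exist to back this enum. Display with serialize_all = "snake_case" renders each variant as its lowercase name, which is what librdkafka expects, while EnumString parses the same string back; the DisplayFromStr attribute below relies on this pair. A minimal standalone sketch, not part of the commit:

use std::str::FromStr;

use strum_macros::{Display, EnumString};

#[derive(Debug, PartialEq, Display, EnumString)]
#[strum(serialize_all = "snake_case")]
enum CompressionCodec {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}

fn main() {
    // Display renders variants in snake_case.
    assert_eq!(CompressionCodec::Zstd.to_string(), "zstd");
    // EnumString parses the same strings back via FromStr.
    assert_eq!(
        CompressionCodec::from_str("zstd").unwrap(),
        CompressionCodec::Zstd
    );
    // Anything else is rejected, which is what makes validation possible.
    assert!(CompressionCodec::from_str("notvalid").is_err());
}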
@@ -126,6 +137,11 @@ pub struct RdKafkaPropertiesProducer {
     #[serde(rename = "properties.batch.size")]
     #[serde_as(as = "Option<DisplayFromStr>")]
     batch_size: Option<usize>,
+
+    /// Compression codec to use for compressing message sets.
+    #[serde(rename = "properties.compression.codec")]
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    compression_codec: Option<CompressionCodec>,
 }
 
 impl RdKafkaPropertiesProducer {
@@ -154,6 +170,9 @@ impl RdKafkaPropertiesProducer {
         if let Some(v) = self.batch_size {
             c.set("batch.size", v.to_string());
         }
+        if let Some(v) = &self.compression_codec {
+            c.set("compression.codec", v.to_string());
+        }
     }
 }
 
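The string written into the client config here is whatever Display produced, i.e. one of none, gzip, snappy, lz4, zstd, exactly the values librdkafka accepts for compression.codec. For reference, a minimal sketch of the same property applied to a bare rdkafka producer (not RisingWave code; the broker address is assumed):

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

fn build_producer() -> FutureProducer {
    ClientConfig::new()
        // Assumed broker address, for illustration only.
        .set("bootstrap.servers", "localhost:9092")
        // The same key/value pair the sink forwards from RdKafkaPropertiesProducer.
        .set("compression.codec", "zstd")
        .create()
        .expect("producer creation failed")
}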
@@ -651,12 +670,17 @@ mod test {
             "properties.retry.backoff.ms".to_string() => "114514".to_string(),
             "properties.batch.num.messages".to_string() => "114514".to_string(),
             "properties.batch.size".to_string() => "114514".to_string(),
+            "properties.compression.codec".to_string() => "zstd".to_string(),
         };
         let c = KafkaConfig::from_hashmap(props).unwrap();
         assert_eq!(
             c.rdkafka_properties.queue_buffering_max_ms,
             Some(114.514f64)
         );
+        assert_eq!(
+            c.rdkafka_properties.compression_codec,
+            Some(CompressionCodec::Zstd)
+        );
 
         let props: HashMap<String, String> = hashmap! {
             // basic
@@ -678,6 +702,16 @@
             "properties.queue.buffering.max.kbytes".to_string() => "-114514".to_string(), // usize cannot be negative
         };
         assert!(KafkaConfig::from_hashmap(props).is_err());
+
+        let props: HashMap<String, String> = hashmap! {
+            // basic
+            "connector".to_string() => "kafka".to_string(),
+            "properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
+            "topic".to_string() => "test".to_string(),
+            "type".to_string() => "append-only".to_string(),
+            "properties.compression.codec".to_string() => "notvalid".to_string(), // has to be a valid CompressionCodec
+        };
+        assert!(KafkaConfig::from_hashmap(props).is_err());
     }
 
     #[test]
@@ -763,6 +797,7 @@ mod test {
             "properties.bootstrap.server".to_string() => "localhost:29092".to_string(),
             "type".to_string() => "append-only".to_string(),
             "topic".to_string() => "test_topic".to_string(),
+            "properties.compression.codec".to_string() => "zstd".to_string(),
         };
 
         // Create a table with two columns (| id : INT32 | v2 : VARCHAR |) here
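Taken together, the config path means an invalid codec string is rejected when the sink is created rather than surfacing later from librdkafka. A self-contained sketch of that deserialization path, with the enum repeated from above and JSON standing in for the property map purely for illustration:

use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use strum_macros::{Display, EnumString};

#[derive(Debug, PartialEq, Display, EnumString)]
#[strum(serialize_all = "snake_case")]
enum CompressionCodec {
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}

#[serde_as]
#[derive(Debug, Deserialize)]
struct Props {
    #[serde(rename = "properties.compression.codec")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    compression_codec: Option<CompressionCodec>,
}

fn main() {
    // A recognized codec string deserializes through the strum FromStr impl.
    let ok: Props =
        serde_json::from_str(r#"{"properties.compression.codec": "zstd"}"#).unwrap();
    assert_eq!(ok.compression_codec, Some(CompressionCodec::Zstd));

    // An unrecognized codec fails deserialization, mirroring the is_err() test.
    let bad = serde_json::from_str::<Props>(r#"{"properties.compression.codec": "notvalid"}"#);
    assert!(bad.is_err());
}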
