Skip to content

Commit

Permalink
feat(sink): support compression codec in kafka sink
Browse files Browse the repository at this point in the history
Closes #12435
  • Loading branch information
racevedoo committed Sep 19, 2023
1 parent ed96064 commit daeb2b2
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ serde_derive = "1"
serde_json = "1"
serde_with = { version = "3", features = ["json"] }
simd-json = "0.10.6"
strum = "0.25"
strum_macros = "0.25"
tempfile = "3"
thiserror = "1"
time = "0.3.28"
Expand Down
45 changes: 45 additions & 0 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use risingwave_common::catalog::Schema;
use risingwave_rpc_client::ConnectorClient;
use serde_derive::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use strum_macros::{Display, EnumString};

use super::encoder::{JsonEncoder, TimestampHandlingMode};
use super::formatter::{
Expand Down Expand Up @@ -71,6 +72,26 @@ const fn _default_force_append_only() -> bool {
false
}

// none, gzip, snappy, lz4, zstd
#[derive(Debug, Clone, PartialEq, Display, Serialize, Deserialize, EnumString)]
enum CompressionCodec {
#[serde(rename = "none")]
#[strum(serialize = "none")]
None,
#[serde(rename = "gzip")]
#[strum(serialize = "gzip")]
Gzip,
#[serde(rename = "snappy")]
#[strum(serialize = "snappy")]
Snappy,
#[serde(rename = "lz4")]
#[strum(serialize = "lz4")]
Lz4,
#[serde(rename = "zstd")]
#[strum(serialize = "zstd")]
Zstd,
}

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RdKafkaPropertiesProducer {
Expand Down Expand Up @@ -126,6 +147,11 @@ pub struct RdKafkaPropertiesProducer {
#[serde(rename = "properties.batch.size")]
#[serde_as(as = "Option<DisplayFromStr>")]
batch_size: Option<usize>,

/// Compression codec to use for compressing message sets.
#[serde(rename = "properties.compression.codec")]
#[serde_as(as = "Option<DisplayFromStr>")]
compression_codec: Option<CompressionCodec>,
}

impl RdKafkaPropertiesProducer {
Expand Down Expand Up @@ -154,6 +180,9 @@ impl RdKafkaPropertiesProducer {
if let Some(v) = self.batch_size {
c.set("batch.size", v.to_string());
}
if let Some(v) = &self.compression_codec {
c.set("compression.codec", v.to_string());
}
}
}

Expand Down Expand Up @@ -651,12 +680,17 @@ mod test {
"properties.retry.backoff.ms".to_string() => "114514".to_string(),
"properties.batch.num.messages".to_string() => "114514".to_string(),
"properties.batch.size".to_string() => "114514".to_string(),
"properties.compression.codec".to_string() => "zstd".to_string(),
};
let c = KafkaConfig::from_hashmap(props).unwrap();
assert_eq!(
c.rdkafka_properties.queue_buffering_max_ms,
Some(114.514f64)
);
assert_eq!(
c.rdkafka_properties.compression_codec,
Some(CompressionCodec::Zstd)
);

let props: HashMap<String, String> = hashmap! {
// basic
Expand All @@ -678,6 +712,16 @@ mod test {
"properties.queue.buffering.max.kbytes".to_string() => "-114514".to_string(), // usize cannot be negative
};
assert!(KafkaConfig::from_hashmap(props).is_err());

let props: HashMap<String, String> = hashmap! {
// basic
"connector".to_string() => "kafka".to_string(),
"properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
"topic".to_string() => "test".to_string(),
"type".to_string() => "append-only".to_string(),
"properties.compression.codec".to_string() => "notvalid".to_string(), // has to be a valid CompressionCodec
};
assert!(KafkaConfig::from_hashmap(props).is_err());
}

#[test]
Expand Down Expand Up @@ -763,6 +807,7 @@ mod test {
"properties.bootstrap.server".to_string() => "localhost:29092".to_string(),
"type".to_string() => "append-only".to_string(),
"topic".to_string() => "test_topic".to_string(),
"properties.compression.codec".to_string() => "zstd".to_string(),
};

// Create a table with two columns (| id : INT32 | v2 : VARCHAR |) here
Expand Down

0 comments on commit daeb2b2

Please sign in to comment.