fix: allow configure message.timeout.ms and max.in.flight for kafka sink (#12574)

hzxa21 authored Sep 28, 2023
1 parent a8f21ed commit 78b83df
Showing 2 changed files with 41 additions and 32 deletions.
1 change: 0 additions & 1 deletion integration_tests/kafka-cdc-sink/risingwave.sql
@@ -22,6 +22,5 @@ connector = 'kafka',
 properties.bootstrap.server='message_queue:29092',
 topic = 'counts',
 type = 'debezium',
-use_transaction = 'false',
 primary_key = 'id'
 );
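
The removed use_transaction option had no effect during sinking anyway (the writer hard-coded it to false, as the Rust changes below show). With the two producer properties this commit exposes, a sink definition can now tune delivery behavior directly in the WITH clause. A hypothetical example in the shape of the file above (sink and source names and the property values are illustrative, not part of this commit):

CREATE SINK counts_sink FROM counts_mv
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'message_queue:29092',
    topic = 'counts',
    type = 'debezium',
    primary_key = 'id',
    -- Newly configurable: both are forwarded to the librdkafka producer.
    properties.message.timeout.ms = '10000',
    properties.max.in.flight.requests.per.connection = '1'
);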
72 changes: 41 additions & 31 deletions src/connector/src/sink/kafka.rs
@@ -50,10 +50,6 @@ use crate::{
 
 pub const KAFKA_SINK: &str = "kafka";
 
-const fn _default_timeout() -> Duration {
-    Duration::from_secs(5)
-}
-
 const fn _default_max_retries() -> u32 {
     3
 }
@@ -62,12 +58,16 @@ const fn _default_retry_backoff() -> Duration {
     Duration::from_millis(100)
 }
 
-const fn _default_use_transaction() -> bool {
+const fn _default_force_append_only() -> bool {
     false
 }
 
-const fn _default_force_append_only() -> bool {
-    false
+const fn _default_message_timeout_ms() -> usize {
+    5000
+}
+
+const fn _default_max_in_flight_requests_per_connection() -> usize {
+    5
 }
 
 #[derive(Debug, Clone, PartialEq, Display, Serialize, Deserialize, EnumString)]
@@ -80,6 +80,8 @@ enum CompressionCodec {
     Zstd,
 }
 
+/// See <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>
+/// for the detailed meaning of these librdkafka producer properties
 #[serde_as]
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct RdKafkaPropertiesProducer {
@@ -140,6 +142,24 @@ pub struct RdKafkaPropertiesProducer {
     #[serde(rename = "properties.compression.codec")]
     #[serde_as(as = "Option<DisplayFromStr>")]
     compression_codec: Option<CompressionCodec>,
+
+    /// Produce message timeout.
+    /// This value is used to limit the time a produced message waits for
+    /// successful delivery (including retries).
+    #[serde(
+        rename = "properties.message.timeout.ms",
+        default = "_default_message_timeout_ms"
+    )]
+    #[serde_as(as = "DisplayFromStr")]
+    message_timeout_ms: usize,
+
+    /// The maximum number of unacknowledged requests the client will send on a single connection before blocking.
+    #[serde(
+        rename = "properties.max.in.flight.requests.per.connection",
+        default = "_default_max_in_flight_requests_per_connection"
+    )]
+    #[serde_as(as = "DisplayFromStr")]
+    max_in_flight_requests_per_connection: usize,
 }
 
 impl RdKafkaPropertiesProducer {
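
Both new fields follow the pattern of the existing rdkafka properties: the WITH option arrives as a string, serde_with's DisplayFromStr parses it into a number, and the _default_* functions apply when the option is absent. A self-contained sketch of that mechanism, assuming serde, serde_with, and serde_json as dependencies (ProducerOpts and default_timeout_ms are illustrative names, not the sink's actual types):

use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

const fn default_timeout_ms() -> usize {
    5000
}

#[serde_as]
#[derive(Debug, Deserialize)]
struct ProducerOpts {
    // The option arrives as a string ("7000") and is parsed into a usize via
    // FromStr; a missing key falls back to default_timeout_ms().
    #[serde(
        rename = "properties.message.timeout.ms",
        default = "default_timeout_ms"
    )]
    #[serde_as(as = "DisplayFromStr")]
    message_timeout_ms: usize,
}

fn main() {
    let set: ProducerOpts =
        serde_json::from_str(r#"{"properties.message.timeout.ms": "7000"}"#).unwrap();
    assert_eq!(set.message_timeout_ms, 7000);

    let unset: ProducerOpts = serde_json::from_str("{}").unwrap();
    assert_eq!(unset.message_timeout_ms, 5000);
}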
@@ -171,6 +191,11 @@ impl RdKafkaPropertiesProducer {
         if let Some(v) = &self.compression_codec {
             c.set("compression.codec", v.to_string());
         }
+        c.set("message.timeout.ms", self.message_timeout_ms.to_string());
+        c.set(
+            "max.in.flight.requests.per.connection",
+            self.max_in_flight_requests_per_connection.to_string(),
+        );
     }
 }
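
set_client forwards each field to rdkafka's ClientConfig, which hands the strings to librdkafka when the producer is created. A minimal sketch of the equivalent direct configuration, assuming the rdkafka crate (broker address and values are placeholders):

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

fn build_producer() -> FutureProducer {
    let mut c = ClientConfig::new();
    c.set("bootstrap.servers", "localhost:9092")
        // Fail a delivery (retries included) not acknowledged within 5s.
        .set("message.timeout.ms", "5000")
        // Allow at most 5 unacknowledged requests per broker connection.
        .set("max.in.flight.requests.per.connection", "5");
    c.create().expect("producer creation failed")
}

Lowering max.in.flight.requests.per.connection trades throughput for stricter ordering guarantees when retries can occur.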

@@ -193,13 +218,6 @@ pub struct KafkaConfig {
     )]
     pub force_append_only: bool,
 
-    #[serde(
-        rename = "properties.timeout",
-        default = "_default_timeout",
-        deserialize_with = "deserialize_duration_from_string"
-    )]
-    pub timeout: Duration,
-
     #[serde(
         rename = "properties.retry.max",
         default = "_default_max_retries",
@@ -214,12 +232,6 @@
     )]
     pub retry_interval: Duration,
 
-    #[serde(
-        default = "_default_use_transaction",
-        deserialize_with = "deserialize_bool_from_string"
-    )]
-    pub use_transaction: bool,
-
     /// We have parsed the primary key for an upsert kafka sink into a `usize` vector representing
     /// the indices of the pk columns in the frontend, so we simply store the primary key here
     /// as a string.
@@ -372,7 +384,7 @@ pub struct KafkaSinkWriter {
 }
 
 impl KafkaSinkWriter {
-    pub async fn new(mut config: KafkaConfig, formatter: SinkFormatterImpl) -> Result<Self> {
+    pub async fn new(config: KafkaConfig, formatter: SinkFormatterImpl) -> Result<Self> {
         let inner: FutureProducer<PrivateLinkProducerContext> = {
             let mut c = ClientConfig::new();

@@ -381,10 +393,7 @@ impl KafkaSinkWriter {
             config.set_client(&mut c);
 
             // ClientConfig configuration
-            c.set("bootstrap.servers", &config.common.brokers)
-                .set("message.timeout.ms", "5000");
-            // Note that we will not use transaction during sinking, thus set it to false
-            config.use_transaction = false;
+            c.set("bootstrap.servers", &config.common.brokers);
 
             // Create the producer context, will be used to create the producer
             let producer_ctx = PrivateLinkProducerContext::new(
@@ -631,6 +640,8 @@ mod test {
             "properties.batch.num.messages".to_string() => "114514".to_string(),
             "properties.batch.size".to_string() => "114514".to_string(),
             "properties.compression.codec".to_string() => "zstd".to_string(),
+            "properties.message.timeout.ms".to_string() => "114514".to_string(),
+            "properties.max.in.flight.requests.per.connection".to_string() => "114514".to_string(),
         };
         let c = KafkaConfig::from_hashmap(props).unwrap();
         assert_eq!(
@@ -641,6 +652,11 @@
             c.rdkafka_properties.compression_codec,
             Some(CompressionCodec::Zstd)
         );
+        assert_eq!(c.rdkafka_properties.message_timeout_ms, 114514);
+        assert_eq!(
+            c.rdkafka_properties.max_in_flight_requests_per_connection,
+            114514
+        );
 
         let props: HashMap<String, String> = hashmap! {
             // basic
@@ -682,12 +698,10 @@
             "topic".to_string() => "test".to_string(),
             "type".to_string() => "append-only".to_string(),
             "force_append_only".to_string() => "true".to_string(),
-            "use_transaction".to_string() => "False".to_string(),
             "properties.security.protocol".to_string() => "SASL".to_string(),
             "properties.sasl.mechanism".to_string() => "SASL".to_string(),
             "properties.sasl.username".to_string() => "test".to_string(),
             "properties.sasl.password".to_string() => "test".to_string(),
-            "properties.timeout".to_string() => "10s".to_string(),
             "properties.retry.max".to_string() => "20".to_string(),
             "properties.retry.interval".to_string() => "500ms".to_string(),
         };
@@ -696,8 +710,6 @@
         assert_eq!(config.common.topic, "test");
         assert_eq!(config.r#type, "append-only");
         assert!(config.force_append_only);
-        assert!(!config.use_transaction);
-        assert_eq!(config.timeout, Duration::from_secs(10));
         assert_eq!(config.max_retry_num, 20);
         assert_eq!(config.retry_interval, Duration::from_millis(500));

@@ -710,8 +722,6 @@
         };
         let config = KafkaConfig::from_hashmap(properties).unwrap();
         assert!(!config.force_append_only);
-        assert!(!config.use_transaction);
-        assert_eq!(config.timeout, Duration::from_secs(5));
         assert_eq!(config.max_retry_num, 3);
         assert_eq!(config.retry_interval, Duration::from_millis(100));
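
The practical effect of a configurable message.timeout.ms: a message that cannot be delivered within the bound (retries included) fails its delivery future instead of waiting indefinitely. A sketch of how such a failure surfaces through rdkafka's FutureProducer (topic, key, and payload are placeholders):

use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;

async fn send_one(producer: &FutureProducer) {
    let record = FutureRecord::to("counts").key("id-1").payload("{\"count\": 1}");
    // The future resolves to Err once librdkafka gives up, i.e. after
    // message.timeout.ms elapses without an acknowledged delivery.
    match producer.send(record, Timeout::Never).await {
        Ok((partition, offset)) => println!("delivered to {partition}@{offset}"),
        Err((err, _unsent)) => eprintln!("delivery failed: {err}"),
    }
}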
