From 11f2ea264b43614e9f23fc88ac8f1711fb1d53ce Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 28 Aug 2024 18:16:23 +0800 Subject: [PATCH 1/2] feat(sink): use official default message_timeout_ms --- src/connector/src/sink/kafka.rs | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 588c5d99ae95..434b1d45ca45 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -63,10 +63,6 @@ const fn _default_retry_backoff() -> Duration { Duration::from_millis(100) } -const fn _default_message_timeout_ms() -> usize { - 5000 -} - const fn _default_max_in_flight_requests_per_connection() -> usize { 5 } @@ -152,12 +148,9 @@ pub struct RdKafkaPropertiesProducer { /// Produce message timeout. /// This value is used to limits the time a produced message waits for /// successful delivery (including retries). - #[serde( - rename = "properties.message.timeout.ms", - default = "_default_message_timeout_ms" - )] - #[serde_as(as = "DisplayFromStr")] - message_timeout_ms: usize, + #[serde(rename = "properties.message.timeout.ms")] + #[serde_as(as = "Option")] + message_timeout_ms: Option, /// The maximum number of unacknowledged requests the client will send on a single connection before blocking. #[serde( @@ -207,7 +200,9 @@ impl RdKafkaPropertiesProducer { if let Some(v) = self.request_required_acks { c.set("request.required.acks", v.to_string()); } - c.set("message.timeout.ms", self.message_timeout_ms.to_string()); + if let Some(v) = self.message_timeout_ms { + c.set("message.timeout.ms", v.to_string()); + } c.set( "max.in.flight.requests.per.connection", self.max_in_flight_requests_per_connection.to_string(), @@ -635,7 +630,10 @@ mod test { c.rdkafka_properties_producer.compression_codec, Some(CompressionCodec::Zstd) ); - assert_eq!(c.rdkafka_properties_producer.message_timeout_ms, 114514); + assert_eq!( + c.rdkafka_properties_producer.message_timeout_ms, + Some(114514) + ); assert_eq!( c.rdkafka_properties_producer .max_in_flight_requests_per_connection, From 06584470528c758cec61ac4748f9bf1e817e38d3 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 29 Aug 2024 12:01:03 +0800 Subject: [PATCH 2/2] update yaml --- src/connector/with_options_sink.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index c6c36917e9c2..b60a83b3b2f6 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -507,7 +507,6 @@ KafkaConfig: This value is used to limits the time a produced message waits for successful delivery (including retries). required: false - default: '5000' - name: properties.max.in.flight.requests.per.connection field_type: usize comments: The maximum number of unacknowledged requests the client will send on a single connection before blocking.