diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 588c5d99ae955..434b1d45ca450 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -63,10 +63,6 @@ const fn _default_retry_backoff() -> Duration { Duration::from_millis(100) } -const fn _default_message_timeout_ms() -> usize { - 5000 -} - const fn _default_max_in_flight_requests_per_connection() -> usize { 5 } @@ -152,12 +148,9 @@ pub struct RdKafkaPropertiesProducer { /// Produce message timeout. /// This value is used to limits the time a produced message waits for /// successful delivery (including retries). - #[serde( - rename = "properties.message.timeout.ms", - default = "_default_message_timeout_ms" - )] - #[serde_as(as = "DisplayFromStr")] - message_timeout_ms: usize, + #[serde(rename = "properties.message.timeout.ms")] + #[serde_as(as = "Option")] + message_timeout_ms: Option, /// The maximum number of unacknowledged requests the client will send on a single connection before blocking. #[serde( @@ -207,7 +200,9 @@ impl RdKafkaPropertiesProducer { if let Some(v) = self.request_required_acks { c.set("request.required.acks", v.to_string()); } - c.set("message.timeout.ms", self.message_timeout_ms.to_string()); + if let Some(v) = self.message_timeout_ms { + c.set("message.timeout.ms", v.to_string()); + } c.set( "max.in.flight.requests.per.connection", self.max_in_flight_requests_per_connection.to_string(), @@ -635,7 +630,10 @@ mod test { c.rdkafka_properties_producer.compression_codec, Some(CompressionCodec::Zstd) ); - assert_eq!(c.rdkafka_properties_producer.message_timeout_ms, 114514); + assert_eq!( + c.rdkafka_properties_producer.message_timeout_ms, + Some(114514) + ); assert_eq!( c.rdkafka_properties_producer .max_in_flight_requests_per_connection, diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index c6c36917e9c21..b60a83b3b2f6e 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -507,7 +507,6 @@ KafkaConfig: This value is used to limits the time a produced message waits for successful delivery (including retries). required: false - default: '5000' - name: properties.max.in.flight.requests.per.connection field_type: usize comments: The maximum number of unacknowledged requests the client will send on a single connection before blocking.