From 4ba2e667b0f70896ea32e19d86ebe6b3dd763aec Mon Sep 17 00:00:00 2001 From: Yuhao Su <31772373+yuhao-su@users.noreply.github.com> Date: Fri, 26 Apr 2024 11:19:33 -0500 Subject: [PATCH] feat(connector): add request.required.acks for kafka connector producer (#16482) --- src/connector/src/sink/kafka.rs | 12 ++++++++++++ src/connector/with_options_sink.yaml | 3 +++ 2 files changed, 15 insertions(+) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 8d56441e53488..f15173a4aabf6 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -162,6 +162,10 @@ pub struct RdKafkaPropertiesProducer { )] #[serde_as(as = "DisplayFromStr")] max_in_flight_requests_per_connection: usize, + + #[serde(rename = "properties.request.required.acks")] + #[serde_as(as = "Option")] + request_required_acks: Option, } impl RdKafkaPropertiesProducer { @@ -196,6 +200,9 @@ impl RdKafkaPropertiesProducer { if let Some(v) = &self.compression_codec { c.set("compression.codec", v.to_string()); } + if let Some(v) = self.request_required_acks { + c.set("request.required.acks", v.to_string()); + } c.set("message.timeout.ms", self.message_timeout_ms.to_string()); c.set( "max.in.flight.requests.per.connection", @@ -603,6 +610,7 @@ mod test { "properties.compression.codec".to_string() => "zstd".to_string(), "properties.message.timeout.ms".to_string() => "114514".to_string(), "properties.max.in.flight.requests.per.connection".to_string() => "114514".to_string(), + "properties.request.required.acks".to_string() => "-1".to_string(), }; let c = KafkaConfig::from_hashmap(props).unwrap(); assert_eq!( @@ -619,6 +627,10 @@ mod test { .max_in_flight_requests_per_connection, 114514 ); + assert_eq!( + c.rdkafka_properties_producer.request_required_acks, + Some(-1) + ); let props: HashMap = hashmap! { // basic diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 27daa718b64f9..e99be370aec86 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -319,6 +319,9 @@ KafkaConfig: comments: The maximum number of unacknowledged requests the client will send on a single connection before blocking. required: false default: '5' + - name: properties.request.required.acks + field_type: i32 + required: false - name: broker.rewrite.endpoints field_type: HashMap comments: This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users.