Skip to content

Commit

Permalink
feat(connector): add request.required.acks for kafka connector produc…
Browse files Browse the repository at this point in the history
…er (#16482)
  • Loading branch information
yuhao-su authored Apr 26, 2024
1 parent a4a9110 commit 4ba2e66
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 0 deletions.
12 changes: 12 additions & 0 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ pub struct RdKafkaPropertiesProducer {
)]
#[serde_as(as = "DisplayFromStr")]
max_in_flight_requests_per_connection: usize,

#[serde(rename = "properties.request.required.acks")]
#[serde_as(as = "Option<DisplayFromStr>")]
request_required_acks: Option<i32>,
}

impl RdKafkaPropertiesProducer {
Expand Down Expand Up @@ -196,6 +200,9 @@ impl RdKafkaPropertiesProducer {
if let Some(v) = &self.compression_codec {
c.set("compression.codec", v.to_string());
}
if let Some(v) = self.request_required_acks {
c.set("request.required.acks", v.to_string());
}
c.set("message.timeout.ms", self.message_timeout_ms.to_string());
c.set(
"max.in.flight.requests.per.connection",
Expand Down Expand Up @@ -603,6 +610,7 @@ mod test {
"properties.compression.codec".to_string() => "zstd".to_string(),
"properties.message.timeout.ms".to_string() => "114514".to_string(),
"properties.max.in.flight.requests.per.connection".to_string() => "114514".to_string(),
"properties.request.required.acks".to_string() => "-1".to_string(),
};
let c = KafkaConfig::from_hashmap(props).unwrap();
assert_eq!(
Expand All @@ -619,6 +627,10 @@ mod test {
.max_in_flight_requests_per_connection,
114514
);
assert_eq!(
c.rdkafka_properties_producer.request_required_acks,
Some(-1)
);

let props: HashMap<String, String> = hashmap! {
// basic
Expand Down
3 changes: 3 additions & 0 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@ KafkaConfig:
comments: The maximum number of unacknowledged requests the client will send on a single connection before blocking.
required: false
default: '5'
- name: properties.request.required.acks
field_type: i32
required: false
- name: broker.rewrite.endpoints
field_type: HashMap<String,String>
comments: This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users.
Expand Down

0 comments on commit 4ba2e66

Please sign in to comment.