Skip to content

Commit

Permalink
feat(sink): use official default message_timeout_ms (#18304)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Aug 29, 2024
1 parent 2529c56 commit 242dbff
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 13 deletions.
22 changes: 10 additions & 12 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ const fn _default_retry_backoff() -> Duration {
Duration::from_millis(100)
}

const fn _default_message_timeout_ms() -> usize {
5000
}

const fn _default_max_in_flight_requests_per_connection() -> usize {
5
}
Expand Down Expand Up @@ -150,12 +146,9 @@ pub struct RdKafkaPropertiesProducer {
/// Produce message timeout.
/// This value is used to limits the time a produced message waits for
/// successful delivery (including retries).
#[serde(
rename = "properties.message.timeout.ms",
default = "_default_message_timeout_ms"
)]
#[serde_as(as = "DisplayFromStr")]
message_timeout_ms: usize,
#[serde(rename = "properties.message.timeout.ms")]
#[serde_as(as = "Option<DisplayFromStr>")]
message_timeout_ms: Option<usize>,

/// The maximum number of unacknowledged requests the client will send on a single connection before blocking.
#[serde(
Expand Down Expand Up @@ -205,7 +198,9 @@ impl RdKafkaPropertiesProducer {
if let Some(v) = self.request_required_acks {
c.set("request.required.acks", v.to_string());
}
c.set("message.timeout.ms", self.message_timeout_ms.to_string());
if let Some(v) = self.message_timeout_ms {
c.set("message.timeout.ms", v.to_string());
}
c.set(
"max.in.flight.requests.per.connection",
self.max_in_flight_requests_per_connection.to_string(),
Expand Down Expand Up @@ -626,7 +621,10 @@ mod test {
c.rdkafka_properties_producer.compression_codec,
Some(CompressionCodec::Zstd)
);
assert_eq!(c.rdkafka_properties_producer.message_timeout_ms, 114514);
assert_eq!(
c.rdkafka_properties_producer.message_timeout_ms,
Some(114514)
);
assert_eq!(
c.rdkafka_properties_producer
.max_in_flight_requests_per_connection,
Expand Down
1 change: 0 additions & 1 deletion src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,6 @@ KafkaConfig:
This value is used to limits the time a produced message waits for
successful delivery (including retries).
required: false
default: '5000'
- name: properties.max.in.flight.requests.per.connection
field_type: usize
comments: The maximum number of unacknowledged requests the client will send on a single connection before blocking.
Expand Down

0 comments on commit 242dbff

Please sign in to comment.