Skip to content

Commit

Permalink
feat(sink): use official default message_timeout_ms
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Aug 28, 2024
1 parent 9587945 commit 11f2ea2
Showing 1 changed file with 10 additions and 12 deletions.
22 changes: 10 additions & 12 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ const fn _default_retry_backoff() -> Duration {
Duration::from_millis(100)
}

const fn _default_message_timeout_ms() -> usize {
5000
}

const fn _default_max_in_flight_requests_per_connection() -> usize {
5
}
Expand Down Expand Up @@ -152,12 +148,9 @@ pub struct RdKafkaPropertiesProducer {
/// Produce message timeout.
/// This value is used to limits the time a produced message waits for
/// successful delivery (including retries).
#[serde(
rename = "properties.message.timeout.ms",
default = "_default_message_timeout_ms"
)]
#[serde_as(as = "DisplayFromStr")]
message_timeout_ms: usize,
#[serde(rename = "properties.message.timeout.ms")]
#[serde_as(as = "Option<DisplayFromStr>")]
message_timeout_ms: Option<usize>,

/// The maximum number of unacknowledged requests the client will send on a single connection before blocking.
#[serde(
Expand Down Expand Up @@ -207,7 +200,9 @@ impl RdKafkaPropertiesProducer {
if let Some(v) = self.request_required_acks {
c.set("request.required.acks", v.to_string());
}
c.set("message.timeout.ms", self.message_timeout_ms.to_string());
if let Some(v) = self.message_timeout_ms {
c.set("message.timeout.ms", v.to_string());
}
c.set(
"max.in.flight.requests.per.connection",
self.max_in_flight_requests_per_connection.to_string(),
Expand Down Expand Up @@ -635,7 +630,10 @@ mod test {
c.rdkafka_properties_producer.compression_codec,
Some(CompressionCodec::Zstd)
);
assert_eq!(c.rdkafka_properties_producer.message_timeout_ms, 114514);
assert_eq!(
c.rdkafka_properties_producer.message_timeout_ms,
Some(114514)
);
assert_eq!(
c.rdkafka_properties_producer
.max_in_flight_requests_per_connection,
Expand Down

0 comments on commit 11f2ea2

Please sign in to comment.