From a56553189157adb93cc8c3d1ea13198f4f4cc770 Mon Sep 17 00:00:00 2001
From: Patrick Huang
Date: Tue, 26 Sep 2023 19:46:24 +0800
Subject: [PATCH] feat(sink): await on kafka delivery future on QueueFull

---
 src/connector/src/sink/kafka.rs | 31 ++++++++++++++++++++++++-------
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs
index a5a524048bfd..77060b4d228b 100644
--- a/src/connector/src/sink/kafka.rs
+++ b/src/connector/src/sink/kafka.rs
@@ -345,10 +345,14 @@ impl Sink for KafkaSink {
     }
 }
 
-/// The delivery buffer queue size
 /// When the `DeliveryFuture` the current `future_delivery_buffer`
-/// is buffering is greater than this size, then enforcing commit once
-const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 65536;
+/// is buffering is greater than `queue_buffering_max_messages` * `KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO`,
+/// then enforcing commit once
+const KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO: f32 = 1.2;
+/// The default queue size used to enforce a commit in kafka producer if `queue.buffering.max.messages` is not specified.
+/// This default value is determined based on the librdkafka default. See the following doc for more details:
+/// https://github.com/confluentinc/librdkafka/blob/1cb80090dfc75f5a36eae3f4f8844b14885c045e/CONFIGURATION.md?plain=1#L139
+const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100000;
 
 struct KafkaPayloadWriter {
     inner: FutureProducer,
@@ -416,12 +420,21 @@ impl KafkaPayloadWriter {
 
         let mut ret = Ok(());
 
+        let max_delivery_buffer_size = (self
+            .config
+            .rdkafka_properties
+            .queue_buffering_max_messages
+            .as_ref()
+            .cloned()
+            .unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32
+            * KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize;
+
         for _ in 0..self.config.max_retry_num {
             match self.inner.send_result(record) {
                 Ok(delivery_future) => {
                     // First check if the current length is
                     // greater than the preset limit
-                    while self.future_delivery_buffer.len() >= KAFKA_WRITER_MAX_QUEUE_SIZE {
+                    while self.future_delivery_buffer.len() >= max_delivery_buffer_size {
                         Self::map_future_result(
                             self.future_delivery_buffer
                                 .pop_front()
@@ -439,15 +452,19 @@ impl KafkaPayloadWriter {
                 Err((e, rec)) => {
                     record = rec;
                     match e {
-                        err @ KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull)
-                        | err @ KafkaError::MessageProduction(RDKafkaErrorCode::MessageTimedOut) => {
+                        err @ KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
                             tracing::warn!(
                                 "producing message (key {:?}) to topic {} failed, err {:?}, retrying",
                                 record.key.map(|k| k.to_bytes()),
                                 record.topic,
                                 err
                             );
-                            tokio::time::sleep(self.config.retry_interval).await;
+                            Self::map_future_result(
+                                self.future_delivery_buffer
+                                    .pop_front()
+                                    .expect("Expect the future not to be None")
+                                    .await,
+                            )?;
                             continue;
                         }
                         _ => return Err(e),
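
For illustration only (not part of the commit): a minimal, self-contained sketch of the backpressure pattern this patch switches to, written with plain tokio rather than rdkafka. The idea is to keep a bounded VecDeque of in-flight delivery futures and, when the buffer is at capacity, await the oldest future instead of sleeping for a fixed retry interval, so the retry is driven by actual queue drain progress. All names below (`send_one`, `MAX_IN_FLIGHT`) are hypothetical stand-ins, not identifiers from this codebase.

use std::collections::VecDeque;

use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};

// Hypothetical cap, standing in for
// `queue.buffering.max.messages` * KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO.
const MAX_IN_FLIGHT: usize = 4;

// Pretend "delivery": resolves after a short delay, like a broker acknowledgement.
// Requires tokio = { version = "1", features = ["full"] }.
fn send_one(i: u32) -> JoinHandle<Result<u32, String>> {
    tokio::spawn(async move {
        sleep(Duration::from_millis(10)).await;
        Ok(i)
    })
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let mut in_flight: VecDeque<JoinHandle<Result<u32, String>>> = VecDeque::new();

    for i in 0..16u32 {
        // Backpressure: once the buffer reaches the cap, await the *oldest*
        // delivery future before enqueueing more, which is what the patch does
        // on RDKafkaErrorCode::QueueFull instead of a fixed sleep.
        while in_flight.len() >= MAX_IN_FLIGHT {
            let oldest = in_flight
                .pop_front()
                .expect("Expect the future not to be None");
            oldest.await.map_err(|e| e.to_string())??;
        }
        in_flight.push_back(send_one(i));
    }

    // Drain whatever is still in flight, analogous to the commit/flush path.
    while let Some(handle) = in_flight.pop_front() {
        handle.await.map_err(|e| e.to_string())??;
    }
    println!("all deliveries acknowledged");
    Ok(())
}

Awaiting the oldest delivery future ties the retry directly to the producer queue making progress, whereas a fixed sleep can wake up while the queue is still full or keep waiting after it has already drained.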