From 8f220de0935e6cd792ec4f21b4ef3ad13bf41f7f Mon Sep 17 00:00:00 2001
From: "Zhanxiang (Patrick) Huang"
Date: Thu, 28 Sep 2023 14:24:22 +0800
Subject: [PATCH] feat(sink): await on kafka delivery future on QueueFull (#12546)

---
 src/connector/src/sink/kafka.rs | 52 +++++++++++++++++++++++++--------
 1 file changed, 40 insertions(+), 12 deletions(-)

diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs
index c2c76dd2b2e88..ed3cde4846491 100644
--- a/src/connector/src/sink/kafka.rs
+++ b/src/connector/src/sink/kafka.rs
@@ -350,10 +350,14 @@ impl Sink for KafkaSink {
     }
 }
 
-/// The delivery buffer queue size
 /// When the `DeliveryFuture` the current `future_delivery_buffer`
-/// is buffering is greater than this size, then enforcing commit once
-const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 65536;
+/// is buffering exceeds `queue_buffering_max_messages` * `KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO`,
+/// a commit is enforced once
+const KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO: f32 = 1.2;
+/// The default queue size used to enforce a commit in the kafka producer if `queue.buffering.max.messages` is not specified.
+/// This default value is determined based on the librdkafka default. See the following doc for more details:
+/// <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>
+const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100000;
 
 struct KafkaPayloadWriter {
     inner: FutureProducer,
@@ -421,12 +425,27 @@ impl KafkaPayloadWriter {
 
         let mut ret = Ok(());
 
-        for _ in 0..self.config.max_retry_num {
+        let max_delivery_buffer_size = (self
+            .config
+            .rdkafka_properties
+            .queue_buffering_max_messages
+            .as_ref()
+            .cloned()
+            .unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32
+            * KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize;
+
+        for i in 0..self.config.max_retry_num {
             match self.inner.send_result(record) {
                 Ok(delivery_future) => {
                     // First check if the current length is
                     // greater than the preset limit
-                    while self.future_delivery_buffer.len() >= KAFKA_WRITER_MAX_QUEUE_SIZE {
+                    while self.future_delivery_buffer.len() >= max_delivery_buffer_size {
+                        tracing::warn!(
+                            "Number of records being delivered ({}) >= expected kafka producer queue size ({}).
+                            This indicates the default value of queue.buffering.max.messages has changed.",
+                            self.future_delivery_buffer.len(),
+                            max_delivery_buffer_size
+                        );
                         Self::map_future_result(
                             self.future_delivery_buffer
                                 .pop_front()
@@ -442,17 +461,26 @@
                     // The enqueue buffer is full, `send_result` will immediately return
                     // We can retry for another round after sleeping for sometime
                     Err((e, rec)) => {
+                        tracing::warn!(
+                            "producing message (key {:?}) to topic {} failed, err {:?}.",
+                            rec.key.map(|k| k.to_bytes()),
+                            rec.topic,
+                            e
+                        );
                         record = rec;
                         match e {
-                            err @ KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull)
-                            | err @ KafkaError::MessageProduction(RDKafkaErrorCode::MessageTimedOut) => {
+                            KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
                                 tracing::warn!(
-                                    "producing message (key {:?}) to topic {} failed, err {:?}, retrying",
-                                    record.key.map(|k| k.to_bytes()),
-                                    record.topic,
-                                    err
+                                    "Producer queue full. Delivery future buffer size={}. Await and retry #{}",
+                                    self.future_delivery_buffer.len(),
+                                    i
                                 );
-                                tokio::time::sleep(self.config.retry_interval).await;
+                                Self::map_future_result(
+                                    self.future_delivery_buffer
+                                        .pop_front()
+                                        .expect("Expect the future not to be None")
+                                        .await,
+                                )?;
                                 continue;
                             }
                             _ => return Err(e),
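
Note: the core change above replaces `tokio::time::sleep`-then-retry on `RDKafkaErrorCode::QueueFull` with awaiting the oldest outstanding `DeliveryFuture`, so a retry only happens after a queue slot has verifiably been freed. Below is a minimal, self-contained sketch of that backpressure pattern. It is not the rdkafka API: `MockProducer`, `Delivery`, `resolve`, and the capacity/retry constants are hypothetical stand-ins for `FutureProducer::send_result`, the delivery future, and the patch's `max_delivery_buffer_size` / `max_retry_num` values.

```rust
use std::collections::VecDeque;

enum SendError {
    QueueFull,
}

/// Stand-in for rdkafka's `DeliveryFuture`: resolving it frees a queue slot.
struct Delivery {
    id: u64,
}

/// Hypothetical producer with a bounded internal queue, mimicking
/// `queue.buffering.max.messages` in librdkafka.
struct MockProducer {
    capacity: usize,
    in_flight: usize,
    next_id: u64,
}

impl MockProducer {
    fn send(&mut self, _payload: &[u8]) -> Result<Delivery, SendError> {
        if self.in_flight >= self.capacity {
            // What `send_result` reports as QueueFull.
            return Err(SendError::QueueFull);
        }
        self.in_flight += 1;
        self.next_id += 1;
        Ok(Delivery { id: self.next_id })
    }

    /// "Await" a delivery: the broker has acked it, freeing one queue slot.
    fn resolve(&mut self, d: Delivery) -> u64 {
        self.in_flight -= 1;
        d.id
    }
}

fn main() {
    let mut producer = MockProducer { capacity: 4, in_flight: 0, next_id: 0 };
    // Plays the role of `future_delivery_buffer`.
    let mut buffer: VecDeque<Delivery> = VecDeque::new();
    let max_retry_num = 3;

    for msg in 0..10u32 {
        let payload = msg.to_be_bytes();
        let mut sent = false;
        for attempt in 0..max_retry_num {
            match producer.send(&payload) {
                Ok(delivery) => {
                    buffer.push_back(delivery);
                    // (The patch additionally drains this buffer whenever its
                    // length reaches max_delivery_buffer_size; omitted here.)
                    sent = true;
                    break;
                }
                // Queue full: instead of sleeping, await the oldest
                // outstanding delivery so a slot is guaranteed to free up,
                // then retry the same record.
                Err(SendError::QueueFull) => {
                    let oldest = buffer
                        .pop_front()
                        .expect("queue full implies outstanding deliveries");
                    let id = producer.resolve(oldest);
                    println!("msg {msg}: queue full, awaited delivery {id}, retry #{attempt}");
                }
            }
        }
        assert!(sent, "exhausted retries");
    }
    println!("all messages enqueued; {} still in flight", buffer.len());
}
```

Popping from the front of the `VecDeque` awaits the oldest in-flight record first, which bounds the buffer FIFO-style and guarantees forward progress; a fixed sleep, by contrast, can wake up to a still-full queue and burn through `max_retry_num` without ever sending.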