feat(sink): await on kafka delivery future on QueueFull (#12546)
hzxa21 authored Sep 28, 2023
1 parent 1faa565 commit 8f220de
Showing 1 changed file with 40 additions and 12 deletions.
52 changes: 40 additions & 12 deletions src/connector/src/sink/kafka.rs
@@ -350,10 +350,14 @@ impl Sink for KafkaSink {
     }
 }
 
-/// The delivery buffer queue size
 /// When the `DeliveryFuture` the current `future_delivery_buffer`
-/// is buffering is greater than this size, then enforcing commit once
-const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 65536;
+/// is buffering is greater than `queue_buffering_max_messages` * `KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO`,
+/// then enforcing commit once
+const KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO: f32 = 1.2;
+/// The default queue size used to enforce a commit in kafka producer if `queue.buffering.max.messages` is not specified.
+/// This default value is determined based on the librdkafka default. See the following doc for more details:
+/// <https://github.com/confluentinc/librdkafka/blob/1cb80090dfc75f5a36eae3f4f8844b14885c045e/CONFIGURATION.md>
+const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100000;
 
 struct KafkaPayloadWriter {
     inner: FutureProducer<PrivateLinkProducerContext>,
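
The two constants above combine into an effective cap on the in-flight delivery buffer. A minimal standalone sketch of that arithmetic (the helper name `effective_buffer_cap` is hypothetical, not part of this commit):

    const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100000;
    const KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO: f32 = 1.2;

    fn effective_buffer_cap(queue_buffering_max_messages: Option<usize>) -> usize {
        // Fall back to librdkafka's default queue.buffering.max.messages,
        // then allow 20% headroom before forcing a flush of delivery futures.
        let base = queue_buffering_max_messages.unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE);
        (base as f32 * KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize
    }

    fn main() {
        assert_eq!(effective_buffer_cap(None), 120_000);
        assert_eq!(effective_buffer_cap(Some(50_000)), 60_000);
    }

With the librdkafka default of 100000 messages, the writer tolerates up to 120000 buffered delivery futures before it forces a flush.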
@@ -421,12 +425,27 @@ impl KafkaPayloadWriter {

         let mut ret = Ok(());
 
-        for _ in 0..self.config.max_retry_num {
+        let max_delivery_buffer_size = (self
+            .config
+            .rdkafka_properties
+            .queue_buffering_max_messages
+            .as_ref()
+            .cloned()
+            .unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32
+            * KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize;
+
+        for i in 0..self.config.max_retry_num {
             match self.inner.send_result(record) {
                 Ok(delivery_future) => {
                     // First check if the current length is
                     // greater than the preset limit
-                    while self.future_delivery_buffer.len() >= KAFKA_WRITER_MAX_QUEUE_SIZE {
+                    while self.future_delivery_buffer.len() >= max_delivery_buffer_size {
+                        tracing::warn!(
+                            "Number of records being delivered ({}) >= expected kafka producer queue size ({}).
+                            This indicates the default value of queue.buffering.max.messages has changed.",
+                            self.future_delivery_buffer.len(),
+                            max_delivery_buffer_size
+                        );
                         Self::map_future_result(
                             self.future_delivery_buffer
                                 .pop_front()
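
The `while` loop above is the new backpressure point: once `future_delivery_buffer` reaches the cap, the writer awaits the oldest `DeliveryFuture` (checking its result) before buffering more. A self-contained sketch of that step, with `DeliveryFuture` and `SinkError` as simplified stand-ins for the rdkafka and RisingWave types:

    use std::collections::VecDeque;
    use std::future::Future;
    use std::pin::Pin;

    // Simplified stand-ins for rdkafka's DeliveryFuture and the sink error type.
    #[derive(Debug)]
    struct SinkError(String);

    type DeliveryFuture = Pin<Box<dyn Future<Output = Result<(), SinkError>> + Send>>;

    // Await the oldest in-flight deliveries, propagating any delivery error,
    // until the buffer is back under the cap, before buffering a new future.
    async fn enforce_cap(
        buffer: &mut VecDeque<DeliveryFuture>,
        cap: usize,
    ) -> Result<(), SinkError> {
        while buffer.len() >= cap {
            buffer
                .pop_front()
                .expect("buffer must be non-empty while len >= cap")
                .await?;
        }
        Ok(())
    }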
@@ -442,17 +461,26 @@ impl KafkaPayloadWriter {
                 // The enqueue buffer is full, `send_result` will immediately return
                 // We can retry for another round after sleeping for sometime
                 Err((e, rec)) => {
+                    tracing::warn!(
+                        "producing message (key {:?}) to topic {} failed, err {:?}.",
+                        rec.key.map(|k| k.to_bytes()),
+                        rec.topic,
+                        e
+                    );
                     record = rec;
                     match e {
-                        err @ KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull)
-                        | err @ KafkaError::MessageProduction(RDKafkaErrorCode::MessageTimedOut) => {
+                        KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
                             tracing::warn!(
-                                "producing message (key {:?}) to topic {} failed, err {:?}, retrying",
-                                record.key.map(|k| k.to_bytes()),
-                                record.topic,
-                                err
+                                "Producer queue full. Delivery future buffer size={}. Await and retry #{}",
+                                self.future_delivery_buffer.len(),
+                                i
                             );
-                            tokio::time::sleep(self.config.retry_interval).await;
+                            Self::map_future_result(
+                                self.future_delivery_buffer
+                                    .pop_front()
+                                    .expect("Expect the future not to be None")
+                                    .await,
+                            )?;
                             continue;
                         }
                         _ => return Err(e),
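Taken together, the commit replaces the old behavior on `QueueFull` (sleep for `retry_interval`, then retry) with awaiting the oldest in-flight delivery, so the retry happens only after the producer queue has actually drained a slot; `MessageTimedOut` is no longer retried. A sketch of the resulting control flow under simplified, hypothetical types (`Producer::try_send` stands in for rdkafka's `FutureProducer::send_result`):

    use std::collections::VecDeque;
    use std::future::Future;
    use std::pin::Pin;

    #[derive(Debug)]
    enum SendError {
        QueueFull,
        Other(String),
    }

    struct Record;
    struct Producer;
    type DeliveryFuture = Pin<Box<dyn Future<Output = Result<(), SendError>> + Send>>;

    impl Producer {
        // Stand-in for FutureProducer::send_result: fails fast with QueueFull
        // instead of blocking when the producer queue is saturated.
        fn try_send(&self, _record: &Record) -> Result<DeliveryFuture, SendError> {
            let fut: DeliveryFuture = Box::pin(async { Ok(()) });
            Ok(fut)
        }
    }

    async fn send_with_retry(
        producer: &Producer,
        record: Record,
        buffer: &mut VecDeque<DeliveryFuture>,
        max_retry_num: usize,
    ) -> Result<(), SendError> {
        for i in 0..max_retry_num {
            match producer.try_send(&record) {
                Ok(fut) => {
                    buffer.push_back(fut);
                    return Ok(());
                }
                Err(SendError::QueueFull) => {
                    // Instead of sleeping, await the oldest delivery so the
                    // producer queue drains, then retry the same record.
                    eprintln!("producer queue full, awaiting oldest delivery, retry #{i}");
                    if let Some(oldest) = buffer.pop_front() {
                        oldest.await?;
                    }
                }
                Err(e) => return Err(e),
            }
        }
        Err(SendError::Other("exhausted max_retry_num retries".into()))
    }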
