Skip to content

Commit

Permalink
feat(sink): await on kafka delivery future on QueueFull
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 committed Sep 26, 2023
1 parent 09a1dcb commit a565531
Showing 1 changed file with 24 additions and 7 deletions.
31 changes: 24 additions & 7 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,14 @@ impl Sink for KafkaSink {
}
}

/// The delivery buffer queue size
/// When the `DeliveryFuture` the current `future_delivery_buffer`
/// is buffering is greater than this size, then enforcing commit once
const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 65536;
/// is buffering is greater than `queue_buffering_max_messages` * `KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO`,
/// then enforcing commit once
const KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO: f32 = 1.2;
/// The default queue size used to enforce a commit in kafka producer if `queue.buffering.max.messages` is not specified.
/// This default value is determined based on the librdkafka default. See the following doc for more details:
/// https://github.com/confluentinc/librdkafka/blob/1cb80090dfc75f5a36eae3f4f8844b14885c045e/CONFIGURATION.md?plain=1#L139
const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100000;

struct KafkaPayloadWriter {
inner: FutureProducer<PrivateLinkProducerContext>,
Expand Down Expand Up @@ -416,12 +420,21 @@ impl KafkaPayloadWriter {

let mut ret = Ok(());

let max_delivery_buffer_size = (self
.config
.rdkafka_properties
.queue_buffering_max_messages
.as_ref()
.cloned()
.unwrap_or(KAFKA_WRITER_MAX_QUEUE_SIZE) as f32
* KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO) as usize;

for _ in 0..self.config.max_retry_num {
match self.inner.send_result(record) {
Ok(delivery_future) => {
// First check if the current length is
// greater than the preset limit
while self.future_delivery_buffer.len() >= KAFKA_WRITER_MAX_QUEUE_SIZE {
while self.future_delivery_buffer.len() >= max_delivery_buffer_size {
Self::map_future_result(
self.future_delivery_buffer
.pop_front()
Expand All @@ -439,15 +452,19 @@ impl KafkaPayloadWriter {
Err((e, rec)) => {
record = rec;
match e {
err @ KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull)
| err @ KafkaError::MessageProduction(RDKafkaErrorCode::MessageTimedOut) => {
err @ KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
tracing::warn!(
"producing message (key {:?}) to topic {} failed, err {:?}, retrying",
record.key.map(|k| k.to_bytes()),
record.topic,
err
);
tokio::time::sleep(self.config.retry_interval).await;
Self::map_future_result(
self.future_delivery_buffer
.pop_front()
.expect("Expect the future not to be None")
.await,
)?;
continue;
}
_ => return Err(e),
Expand Down

0 comments on commit a565531

Please sign in to comment.