diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index 13c0dcca93583..ed8842e70825f 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -99,22 +99,29 @@ impl WaitCheckpointTask { } } WaitCheckpointTask::AckPubsubMessage(subscription, ack_id_arrs) => { + async fn ack(subscription: &Subscription, ack_ids: Vec) { + tracing::trace!("acking pubsub messages {:?}", ack_ids); + match subscription.ack(ack_ids).await { + Ok(()) => {} + Err(e) => { + tracing::error!( + error = %e.as_report(), + "failed to ack pubsub messages", + ) + } + } + } + const MAX_ACK_BATCH_SIZE: usize = 1000; let mut ack_ids: Vec = vec![]; for arr in ack_id_arrs { for ack_id in arr.as_utf8().iter().flatten() { - ack_ids.push(ack_id.to_string()) - } - } - tracing::trace!("acking pubsub messages {:?}", ack_ids); - match subscription.ack(ack_ids).await { - Ok(()) => {} - Err(e) => { - tracing::error!( - error = %e.as_report(), - "failed to ack pubsub messages", - ) + ack_ids.push(ack_id.to_string()); + if ack_ids.len() >= MAX_ACK_BATCH_SIZE { + ack(&subscription, std::mem::take(&mut ack_ids)).await; + } } } + ack(&subscription, ack_ids).await; } } }