diff --git a/src/stream/src/executor/flow_control.rs b/src/stream/src/executor/flow_control.rs index 4da40a95623bf..fdf9a796fc223 100644 --- a/src/stream/src/executor/flow_control.rs +++ b/src/stream/src/executor/flow_control.rs @@ -54,9 +54,13 @@ impl FlowControlExecutor { let msg = msg?; match msg { Message::Chunk(chunk) => { + let Some(n) = NonZeroU32::new(chunk.cardinality() as u32) else { + // Handle case where chunk is empty + continue; + }; if let Some(rate_limiter) = &rate_limiter { let result = rate_limiter - .until_n_ready(NonZeroU32::new(chunk.cardinality() as u32).unwrap()) + .until_n_ready(n) .await; if let Err(InsufficientCapacity(n)) = result { tracing::error!(