From d2a016e1e9e37f23a77c7d1daffd557bbfbaa351 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Tue, 31 Oct 2023 22:20:03 +0800 Subject: [PATCH] fix flow_control: it should defensively handle empty chunks --- src/stream/src/executor/flow_control.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/stream/src/executor/flow_control.rs b/src/stream/src/executor/flow_control.rs index 4da40a95623bf..fdf9a796fc223 100644 --- a/src/stream/src/executor/flow_control.rs +++ b/src/stream/src/executor/flow_control.rs @@ -54,9 +54,13 @@ impl FlowControlExecutor { let msg = msg?; match msg { Message::Chunk(chunk) => { + let Some(n) = NonZeroU32::new(chunk.cardinality() as u32) else { + // Handle case where chunk is empty + continue; + }; if let Some(rate_limiter) = &rate_limiter { let result = rate_limiter - .until_n_ready(NonZeroU32::new(chunk.cardinality() as u32).unwrap()) + .until_n_ready(n) .await; if let Err(InsufficientCapacity(n)) = result { tracing::error!(