Skip to content

Commit

Permalink
fix flow_control: it should defensively handle empty chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Oct 31, 2023
1 parent 5173a45 commit d2a016e
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion src/stream/src/executor/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,13 @@ impl FlowControlExecutor {
let msg = msg?;
match msg {
Message::Chunk(chunk) => {
let Some(n) = NonZeroU32::new(chunk.cardinality() as u32) else {
// Handle case where chunk is empty
continue;
};
if let Some(rate_limiter) = &rate_limiter {
let result = rate_limiter
.until_n_ready(NonZeroU32::new(chunk.cardinality() as u32).unwrap())
.until_n_ready(n)
.await;
if let Err(InsufficientCapacity(n)) = result {
tracing::error!(
Expand Down

0 comments on commit d2a016e

Please sign in to comment.