diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill.rs index 6a740eafa0503..ffc010d139adc 100644 --- a/src/stream/src/executor/backfill/snapshot_backfill.rs +++ b/src/stream/src/executor/backfill/snapshot_backfill.rs @@ -463,8 +463,14 @@ impl<'a> UpstreamBuffer<'a, StateOfConsumingLogStore> { async fn next_barrier(&mut self) -> StreamExecutorResult> { Ok( if let Some(barrier) = self.state.upstream_pending_barriers.pop_back() { - // only update the `max_pending_barrier_num` on checkpoint barrier to avoid deadlock caused by - // downstream waiting on a checkpoint barrier to bump up. + // Only update the `max_pending_barrier_num` on checkpoint barrier to avoid deadlock. + // + // After updating and decreasing `max_pending_barrier_num`, we won't poll upstream until + // the downstream fetches the next barrier, and this causes back pressure to upstream. + // However, downstream will be blocked at `try_wait_epoch` on a non-checkpoint epoch on upstream, + // and `try_wait_epoch` can only be unblocked when the next checkpoint epoch is processed. But since + // the upstream is blocked by back pressure, it cannot process the next checkpoint epoch, which + // causes deadlock. if barrier.kind.is_checkpoint() { self.state.max_pending_barrier_num = min( self.state.upstream_pending_barriers.len(),