From 22a5e3a30aa0777ef19802d751acdf2892bf3dce Mon Sep 17 00:00:00 2001 From: William Wen Date: Sat, 12 Oct 2024 14:13:23 +0800 Subject: [PATCH] add comment --- src/stream/src/executor/backfill/snapshot_backfill.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill.rs index 6a740eafa050..ffc010d139ad 100644 --- a/src/stream/src/executor/backfill/snapshot_backfill.rs +++ b/src/stream/src/executor/backfill/snapshot_backfill.rs @@ -463,8 +463,14 @@ impl<'a> UpstreamBuffer<'a, StateOfConsumingLogStore> { async fn next_barrier(&mut self) -> StreamExecutorResult> { Ok( if let Some(barrier) = self.state.upstream_pending_barriers.pop_back() { - // only update the `max_pending_barrier_num` on checkpoint barrier to avoid deadlock caused by - // downstream waiting on a checkpoint barrier to bump up. + // Only update the `max_pending_barrier_num` on checkpoint barrier to avoid deadlock. + // + // After updating and decreasing `max_pending_barrier_num`, we won't poll upstream until + // the downstream fetches the next barrier, and this causes back pressure to upstream. + // However, downstream will be blocked at `try_wait_epoch` on a non-checkpoint epoch on upstream, + // and `try_wait_epoch` can only be unblocked when the next checkpoint epoch is processed. But since + // the upstream is blocked by back pressure, it cannot process the next checkpoint epoch, which + // causes deadlock. if barrier.kind.is_checkpoint() { self.state.max_pending_barrier_num = min( self.state.upstream_pending_barriers.len(),