Skip to content

Commit

Permalink
add comment
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Oct 12, 2024
1 parent 62106c4 commit 22a5e3a
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions src/stream/src/executor/backfill/snapshot_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,14 @@ impl<'a> UpstreamBuffer<'a, StateOfConsumingLogStore> {
async fn next_barrier(&mut self) -> StreamExecutorResult<Option<DispatcherBarrier>> {
Ok(
if let Some(barrier) = self.state.upstream_pending_barriers.pop_back() {
// only update the `max_pending_barrier_num` on checkpoint barrier to avoid deadlock caused by
// downstream waiting on a checkpoint barrier to bump up.
// Only update the `max_pending_barrier_num` on checkpoint barrier to avoid deadlock.
//
// After updating and decreasing `max_pending_barrier_num`, we won't poll upstream until
// the downstream fetches the next barrier, and this causes back pressure to upstream.
// However, downstream will be blocked at `try_wait_epoch` on a non-checkpoint epoch on upstream,
// and `try_wait_epoch` can only be unblocked when the next checkpoint epoch is processed. But since
// the upstream is blocked by back pressure, it cannot process the next checkpoint epoch, which
// causes deadlock.
if barrier.kind.is_checkpoint() {
self.state.max_pending_barrier_num = min(
self.state.upstream_pending_barriers.len(),
Expand Down

0 comments on commit 22a5e3a

Please sign in to comment.