diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs index e8ab8b9f254d7..e5eb7a1f82d85 100644 --- a/src/stream/src/task/barrier_manager/progress.rs +++ b/src/stream/src/task/barrier_manager/progress.rs @@ -219,15 +219,20 @@ impl CreateMviewProgressReporter { current_consumed_rows: ConsumedRows, ) { match self.state { - Some(BackfillState::ConsumingUpstream(last_epoch, _last_consumed_rows)) => { + Some(BackfillState::ConsumingUpstreamTable(last_epoch, _last_consumed_rows)) => { debug_assert_eq!(last_epoch, 0); } - Some(BackfillState::Done(_)) => unreachable!(), + Some(state) => { + panic!( + "should not update consuming progress at invalid state: {:?}", + state + ) + } None => {} }; self.update_inner( epoch, - BackfillState::ConsumingUpstream(0, current_consumed_rows), + BackfillState::ConsumingUpstreamTable(0, current_consumed_rows), ); }