diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index d0971d9b115c0..34f9eb12d692a 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -605,6 +605,11 @@ impl SourceBackfillExecutorInner { break 'backfill_loop; } } else { + self.progress.update_for_source_backfill( + barrier.epoch, + backfill_stage.total_backfilled_rows(), + ); + // yield barrier after reporting progress yield Message::Barrier(barrier); } } diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs index e5eb7a1f82d85..dba8f5050627a 100644 --- a/src/stream/src/task/barrier_manager/progress.rs +++ b/src/stream/src/task/barrier_manager/progress.rs @@ -28,8 +28,8 @@ type ConsumedRows = u64; #[derive(Debug, Clone, Copy)] pub(crate) enum BackfillState { - ConsumingUpstreamTable(ConsumedEpoch, ConsumedRows), - DoneConsumingUpstreamTable(ConsumedRows), + ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows), + DoneConsumingUpstreamTableOrSource(ConsumedRows), ConsumingLogStore { pending_barrier_num: usize }, DoneConsumingLogStore, } @@ -37,10 +37,12 @@ pub(crate) enum BackfillState { impl BackfillState { pub fn to_pb(self, actor_id: ActorId) -> PbCreateMviewProgress { let (done, consumed_epoch, consumed_rows, pending_barrier_num) = match self { - BackfillState::ConsumingUpstreamTable(consumed_epoch, consumed_rows) => { + BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, consumed_rows) => { (false, consumed_epoch, consumed_rows, 0) } - BackfillState::DoneConsumingUpstreamTable(consumed_rows) => (true, 0, consumed_rows, 0), /* unused field for done */ + BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows) => { + (true, 0, consumed_rows, 0) + } // unused field for done BackfillState::ConsumingLogStore { pending_barrier_num, } => (false, 0, 0, pending_barrier_num as _), @@ -59,14 +61,14 @@ impl BackfillState { impl Display for BackfillState { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - BackfillState::ConsumingUpstreamTable(epoch, rows) => { + BackfillState::ConsumingUpstreamTableOrSource(epoch, rows) => { write!( f, "ConsumingUpstreamTable(epoch: {}, rows: {})", epoch, rows ) } - BackfillState::DoneConsumingUpstreamTable(rows) => { + BackfillState::DoneConsumingUpstreamTableOrSource(rows) => { write!(f, "DoneConsumingUpstreamTable(rows: {})", rows) } BackfillState::ConsumingLogStore { @@ -190,7 +192,7 @@ impl CreateMviewProgressReporter { current_consumed_rows: ConsumedRows, ) { match self.state { - Some(BackfillState::ConsumingUpstreamTable(last, last_consumed_rows)) => { + Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows)) => { assert!( last < consumed_epoch, "last_epoch: {:#?} must be greater than consumed epoch: {:#?}", @@ -209,18 +211,23 @@ impl CreateMviewProgressReporter { }; self.update_inner( epoch, - BackfillState::ConsumingUpstreamTable(consumed_epoch, current_consumed_rows), + BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, current_consumed_rows), ); } + /// The difference from [`Self::update`] (MV backfill) is that we + /// don't care `ConsumedEpoch` here. pub fn update_for_source_backfill( &mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows, ) { match self.state { - Some(BackfillState::ConsumingUpstreamTable(last_epoch, _last_consumed_rows)) => { - debug_assert_eq!(last_epoch, 0); + Some(BackfillState::ConsumingUpstreamTableOrSource( + dummy_last_epoch, + _last_consumed_rows, + )) => { + debug_assert_eq!(dummy_last_epoch, 0); } Some(state) => { panic!( @@ -232,19 +239,20 @@ impl CreateMviewProgressReporter { }; self.update_inner( epoch, - BackfillState::ConsumingUpstreamTable(0, current_consumed_rows), + // fill a dummy ConsumedEpoch + BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows), ); } /// Finish the progress. If the progress is already finished, then perform no-op. /// `current_epoch` should be provided to locate the barrier under concurrent checkpoint. pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) { - if let Some(BackfillState::DoneConsumingUpstreamTable(_)) = self.state { + if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_)) = self.state { return; } self.update_inner( epoch, - BackfillState::DoneConsumingUpstreamTable(current_consumed_rows), + BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows), ); } @@ -255,7 +263,7 @@ impl CreateMviewProgressReporter { ) { assert_matches!( self.state, - Some(BackfillState::DoneConsumingUpstreamTable(_)) + Some(BackfillState::DoneConsumingUpstreamTableOrSource(_)) | Some(BackfillState::ConsumingLogStore { .. }), "cannot update log store progress at state {:?}", self.state @@ -271,7 +279,7 @@ impl CreateMviewProgressReporter { pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) { assert_matches!( self.state, - Some(BackfillState::DoneConsumingUpstreamTable(_)) + Some(BackfillState::DoneConsumingUpstreamTableOrSource(_)) | Some(BackfillState::ConsumingLogStore { .. }), "cannot finish log store progress at state {:?}", self.state