diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 854de53f37708..5f5227e8090d1 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -1177,9 +1177,7 @@ impl GlobalBarrierManager { // Update the progress of all commands. for progress in resps.iter().flat_map(|r| &r.create_mview_progress) { // Those with actors complete can be finished immediately. - if let Some(command) = tracker.update(progress, &version_stats) - && !command.tracks_sink() - { + if let Some(command) = tracker.update(progress, &version_stats) { tracing::trace!(?progress, "finish progress"); commands.push(command); } else { diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index fede60d5eb939..ba1e11c9c6fa3 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -223,13 +223,6 @@ impl TrackingJob { TrackingJob::Recovered(recovered) => Some(recovered.fragments.table_id()), } } - - pub(crate) fn tracks_sink(&self) -> bool { - match self { - TrackingJob::New(command) => command.tracks_sink(), - TrackingJob::Recovered(_) => false, - } - } } pub struct RecoveredTrackingJob { @@ -247,15 +240,6 @@ pub(super) struct TrackingCommand { pub notifiers: Vec, } -impl TrackingCommand { - pub fn tracks_sink(&self) -> bool { - match &self.context.command { - Command::CreateStreamingJob { ddl_type, .. } => *ddl_type == DdlType::Sink, - _ => false, - } - } -} - /// Track the progress of all creating mviews. When creation is done, `notify_finished` will be /// called on registered notifiers. /// @@ -443,20 +427,6 @@ impl CreateMviewProgressTracker { definition, ); if *ddl_type == DdlType::Sink { - // First we duplicate a separate tracking job for sink. - // This does not need notifiers, it is solely used for - // tracking the backfill progress of sink. - // It will still be removed from progress map when - // backfill completes. - let tracking_job = TrackingJob::New(TrackingCommand { - context: command.context.clone(), - notifiers: vec![], - }); - let old = self - .progress_map - .insert(creating_mv_id, (progress, tracking_job)); - assert!(old.is_none()); - // We return the original tracking job immediately. // This is because sink can be decoupled with backfill progress. // We don't need to wait for sink to finish backfill.