diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 6af1fe305dec3..1ff673d7da081 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -87,7 +87,6 @@ impl GlobalBarrierManagerContext { Ok(()) } - // FIXME: didn't consider Values here async fn recover_background_mv_progress(&self) -> MetaResult { let mgr = &self.metadata_manager; let mviews = mgr @@ -103,7 +102,14 @@ impl GlobalBarrierManagerContext { .get_job_fragments_by_id(mview.table_id) .await?; let table_fragments = TableFragments::from_protobuf(table_fragments); - mview_map.insert(table_id, (mview.definition.clone(), table_fragments)); + if table_fragments.tracking_progress_actor_ids().is_empty() { + // If there's no tracking actor in the mview, we can finish the job directly. + mgr.catalog_controller + .finish_streaming_job(mview.table_id, None) + .await?; + } else { + mview_map.insert(table_id, (mview.definition.clone(), table_fragments)); + } } let version_stats = self.hummock_manager.get_version_stats().await; diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 48dfbfaf55c0a..e684c4071dad8 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -602,7 +602,7 @@ impl MetadataManager { } impl MetadataManager { - /// Wait for job finishing notification in `TrackingJob::pre_finish`. + /// Wait for job finishing notification in `TrackingJob::finish`. /// The progress is updated per barrier. pub(crate) async fn wait_streaming_job_finished( &self, diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 5137c74b587cd..cca00b565b56f 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -412,14 +412,6 @@ impl TableFragments { .cloned() } - /// Returns actors that contains backfill executors. - pub fn backfill_actor_ids(&self) -> HashSet { - Self::filter_actor_ids(self, |fragment_type_mask| { - (fragment_type_mask & FragmentTypeFlag::StreamScan as u32) != 0 - }) - .collect() - } - pub fn snapshot_backfill_actor_ids(&self) -> HashSet { Self::filter_actor_ids(self, |mask| { (mask & FragmentTypeFlag::SnapshotBackfillStreamScan as u32) != 0