Skip to content

Commit

Permalink
fix: background mview without backfilling should not tracked in barri…
Browse files Browse the repository at this point in the history
…er runtime info (#19355)
  • Loading branch information
yezizp2012 committed Nov 13, 2024
1 parent aebf896 commit 643fc75
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 11 deletions.
10 changes: 8 additions & 2 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ impl GlobalBarrierManagerContext {
Ok(())
}

// FIXME: didn't consider Values here
async fn recover_background_mv_progress(&self) -> MetaResult<CreateMviewProgressTracker> {
let mgr = &self.metadata_manager;
let mviews = mgr
Expand All @@ -103,7 +102,14 @@ impl GlobalBarrierManagerContext {
.get_job_fragments_by_id(mview.table_id)
.await?;
let table_fragments = TableFragments::from_protobuf(table_fragments);
mview_map.insert(table_id, (mview.definition.clone(), table_fragments));
if table_fragments.tracking_progress_actor_ids().is_empty() {
// If there's no tracking actor in the mview, we can finish the job directly.
mgr.catalog_controller
.finish_streaming_job(mview.table_id, None)
.await?;
} else {
mview_map.insert(table_id, (mview.definition.clone(), table_fragments));
}
}

let version_stats = self.hummock_manager.get_version_stats().await;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ impl MetadataManager {
}

impl MetadataManager {
/// Wait for job finishing notification in `TrackingJob::pre_finish`.
/// Wait for job finishing notification in `TrackingJob::finish`.
/// The progress is updated per barrier.
pub(crate) async fn wait_streaming_job_finished(
&self,
Expand Down
8 changes: 0 additions & 8 deletions src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,14 +412,6 @@ impl TableFragments {
.cloned()
}

/// Returns actors that contains backfill executors.
pub fn backfill_actor_ids(&self) -> HashSet<ActorId> {
Self::filter_actor_ids(self, |fragment_type_mask| {
(fragment_type_mask & FragmentTypeFlag::StreamScan as u32) != 0
})
.collect()
}

pub fn snapshot_backfill_actor_ids(&self) -> HashSet<ActorId> {
Self::filter_actor_ids(self, |mask| {
(mask & FragmentTypeFlag::SnapshotBackfillStreamScan as u32) != 0
Expand Down

0 comments on commit 643fc75

Please sign in to comment.