diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 68ffbd16f90bd..2d19b33216d48 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -100,7 +100,7 @@ impl From> for HashMap { } } -pub(crate) type TableActorMap = TableMap>; +pub(crate) type TableActorMap = TableMap>; pub(crate) type TableUpstreamMvCountMap = TableMap>; pub(crate) type TableDefinitionMap = TableMap; pub(crate) type TableNotifierMap = TableMap; diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 5aad2b4106a9f..2fc012c031a9b 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -270,7 +270,7 @@ impl CreateMviewProgressTracker { ) -> Self { let mut actor_map = HashMap::new(); let mut progress_map = HashMap::new(); - let table_map: HashMap<_, Vec> = table_map.into(); + let table_map: HashMap<_, HashSet> = table_map.into(); for (creating_table_id, actors) in table_map { // 1. Recover `BackfillState` in the tracker. let mut states = HashMap::new(); diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index ad8bf85a67439..ab6d93d0d1dca 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -172,7 +172,7 @@ impl FragmentManager { pub async fn get_table_id_stream_scan_actor_mapping( &self, table_ids: &[TableId], - ) -> HashMap> { + ) -> HashMap> { let map = &self.core.read().await.table_fragments; let mut table_map = HashMap::new(); // TODO(kwannoel): Can this be unified with `PlanVisitor`? @@ -186,13 +186,13 @@ impl FragmentManager { } for table_id in table_ids { if let Some(table_fragment) = map.get(table_id) { - let mut actors = vec![]; + let mut actors = HashSet::new(); for fragment in table_fragment.fragments.values() { for actor in &fragment.actors { if let Some(node) = &actor.nodes && has_stream_scan(node) { - actors.push(actor.actor_id) + actors.insert(actor.actor_id); } else { tracing::trace!("ignoring actor: {:?}", actor); }