diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index d73d38841322b..3a21c812086f7 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -187,6 +187,11 @@ pub fn rebalance_actor_vnode( actors_to_remove: &BTreeSet, actors_to_create: &BTreeSet, ) -> HashMap { + let actor_ids: BTreeSet<_> = actors.iter().map(|actor| actor.actor_id).collect(); + + assert_eq!(actors_to_remove.difference(&actor_ids).count(), 0); + assert_eq!(actors_to_create.intersection(&actor_ids).count(), 0); + assert!(actors.len() >= actors_to_remove.len()); let target_actor_count = actors.len() - actors_to_remove.len() + actors_to_create.len(); diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index d7a35fc1d90f4..4d14f0caa2828 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -1093,7 +1093,9 @@ mod tests { let actor_locations = fragments .values() .flat_map(|f| &f.actors) - .map(|a| (a.actor_id, parallel_units[&0].clone())) + .sorted_by(|a, b| a.actor_id.cmp(&b.actor_id)) + .enumerate() + .map(|(idx, a)| (a.actor_id, parallel_units[&(idx as u32)].clone())) .collect(); Locations { @@ -1182,6 +1184,14 @@ mod tests { let table_id = TableId::new(0); let actors = make_mview_stream_actors(&table_id, 4); + let StreamingClusterInfo { parallel_units, .. } = services + .global_stream_manager + .metadata_manager + .get_streaming_cluster_info() + .await?; + + let parallel_unit_ids = parallel_units.keys().cloned().sorted().collect_vec(); + let mut fragments = BTreeMap::default(); fragments.insert( 0, @@ -1191,7 +1201,9 @@ mod tests { distribution_type: FragmentDistributionType::Hash as i32, actors: actors.clone(), state_table_ids: vec![0], - vnode_mapping: Some(ParallelUnitMapping::new_single(0).to_protobuf()), + vnode_mapping: Some( + ParallelUnitMapping::new_uniform(parallel_unit_ids.into_iter()).to_protobuf(), + ), ..Default::default() }, );