Skip to content

Commit

Permalink
Add assertions, actor_ids in rebalance_actor_vnode
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Jan 31, 2024
1 parent 5d9b9d8 commit 706fb3d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
5 changes: 5 additions & 0 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ pub fn rebalance_actor_vnode(
actors_to_remove: &BTreeSet<ActorId>,
actors_to_create: &BTreeSet<ActorId>,
) -> HashMap<ActorId, Bitmap> {
let actor_ids: BTreeSet<_> = actors.iter().map(|actor| actor.actor_id).collect();

assert_eq!(actors_to_remove.difference(&actor_ids).count(), 0);
assert_eq!(actors_to_create.intersection(&actor_ids).count(), 0);

assert!(actors.len() >= actors_to_remove.len());

let target_actor_count = actors.len() - actors_to_remove.len() + actors_to_create.len();
Expand Down
16 changes: 14 additions & 2 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,9 @@ mod tests {
let actor_locations = fragments
.values()
.flat_map(|f| &f.actors)
.map(|a| (a.actor_id, parallel_units[&0].clone()))
.sorted_by(|a, b| a.actor_id.cmp(&b.actor_id))
.enumerate()
.map(|(idx, a)| (a.actor_id, parallel_units[&(idx as u32)].clone()))
.collect();

Locations {
Expand Down Expand Up @@ -1165,6 +1167,14 @@ mod tests {
let table_id = TableId::new(0);
let actors = make_mview_stream_actors(&table_id, 4);

let StreamingClusterInfo { parallel_units, .. } = services
.global_stream_manager
.metadata_manager
.get_streaming_cluster_info()
.await?;

let parallel_unit_ids = parallel_units.keys().cloned().sorted().collect_vec();

let mut fragments = BTreeMap::default();
fragments.insert(
0,
Expand All @@ -1174,7 +1184,9 @@ mod tests {
distribution_type: FragmentDistributionType::Hash as i32,
actors: actors.clone(),
state_table_ids: vec![0],
vnode_mapping: Some(ParallelUnitMapping::new_single(0).to_protobuf()),
vnode_mapping: Some(
ParallelUnitMapping::new_uniform(parallel_unit_ids.into_iter()).to_protobuf(),
),
..Default::default()
},
);
Expand Down

0 comments on commit 706fb3d

Please sign in to comment.