diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index b5a1286804408..f7637a9d0c046 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -394,7 +394,7 @@ impl Eq for ActorSplitsAssignment {} impl PartialEq for ActorSplitsAssignment { fn eq(&self, other: &Self) -> bool { - self.splits.len() == other.splits.len() && self.actor_id == other.actor_id + self.splits.len() == other.splits.len() } } @@ -407,12 +407,7 @@ impl PartialOrd for ActorSplitsAssignment { impl Ord for ActorSplitsAssignment { fn cmp(&self, other: &Self) -> Ordering { // Note: this is reversed order, to make BinaryHeap a min heap. - other - .splits - .len() - .cmp(&self.splits.len()) - // To make the BinaryHeap have a deterministic order - .then(other.actor_id.cmp(&self.actor_id)) + other.splits.len().cmp(&self.splits.len()) } } @@ -508,6 +503,12 @@ where for split_id in new_discovered_splits { // ActorSplitsAssignment's Ord is reversed, so this is min heap, i.e., // we get the assignment with the least splits here. + + // Note: If multiple actors have the same number of splits, it will be randomly picked. + // When the number of source actors is larger than the number of splits, + // It's possible that the assignment is uneven. + // e.g., https://github.com/risingwavelabs/risingwave/issues/14324#issuecomment-1875033158 + // TODO: We should make the assignment rack-aware to make sure it's even. let mut peek_ref = heap.peek_mut().unwrap(); peek_ref .splits @@ -1073,46 +1074,29 @@ mod tests { #[test] fn test_reassign_splits() { - fn check( - actor_splits: HashMap>, - discovered_splits: BTreeMap, - expected: expect_test::Expect, - ) { - let diff = reassign_splits( - FragmentId::default(), - actor_splits, - &discovered_splits, - Default::default(), - ) - .map(BTreeMap::from_iter); // ensure deterministic debug string - expected.assert_debug_eq(&diff); - } - let actor_splits = HashMap::new(); let discovered_splits: BTreeMap = BTreeMap::new(); - check( + assert!(reassign_splits( + FragmentId::default(), actor_splits, - discovered_splits, - expect_test::expect![[r#" - None - "#]], - ); + &discovered_splits, + Default::default() + ) + .is_none()); let actor_splits = (0..3).map(|i| (i, vec![])).collect(); let discovered_splits: BTreeMap = BTreeMap::new(); - check( + let diff = reassign_splits( + FragmentId::default(), actor_splits, - discovered_splits, - expect_test::expect![[r#" - Some( - { - 0: [], - 1: [], - 2: [], - }, - ) - "#]], - ); + &discovered_splits, + Default::default(), + ) + .unwrap(); + assert_eq!(diff.len(), 3); + for splits in diff.values() { + assert!(splits.is_empty()) + } let actor_splits = (0..3).map(|i| (i, vec![])).collect(); let discovered_splits: BTreeMap = (0..3) @@ -1121,31 +1105,20 @@ mod tests { (split.id(), split) }) .collect(); - check( + + let diff = reassign_splits( + FragmentId::default(), actor_splits, - discovered_splits, - expect_test::expect![[r#" - Some( - { - 0: [ - TestSplit { - id: 0, - }, - ], - 1: [ - TestSplit { - id: 1, - }, - ], - 2: [ - TestSplit { - id: 2, - }, - ], - }, - ) - "#]], - ); + &discovered_splits, + Default::default(), + ) + .unwrap(); + assert_eq!(diff.len(), 3); + for splits in diff.values() { + assert_eq!(splits.len(), 1); + } + + check_all_splits(&discovered_splits, &diff); let actor_splits = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect(); let discovered_splits: BTreeMap = (0..5) @@ -1154,82 +1127,46 @@ mod tests { (split.id(), split) }) .collect(); - check( + + let diff = reassign_splits( + FragmentId::default(), actor_splits, - discovered_splits, - expect_test::expect![[r#" - Some( - { - 0: [ - TestSplit { - id: 0, - }, - TestSplit { - id: 3, - }, - ], - 1: [ - TestSplit { - id: 1, - }, - TestSplit { - id: 4, - }, - ], - 2: [ - TestSplit { - id: 2, - }, - ], - }, - ) - "#]], - ); + &discovered_splits, + Default::default(), + ) + .unwrap(); + assert_eq!(diff.len(), 3); + for splits in diff.values() { + let len = splits.len(); + assert!(len == 1 || len == 2); + } + + check_all_splits(&discovered_splits, &diff); let mut actor_splits: HashMap> = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect(); actor_splits.insert(3, vec![]); actor_splits.insert(4, vec![]); + let discovered_splits: BTreeMap = (0..5) .map(|i| { let split = TestSplit { id: i }; (split.id(), split) }) .collect(); - check( + + let diff = reassign_splits( + FragmentId::default(), actor_splits, - discovered_splits, - expect_test::expect![[r#" - Some( - { - 0: [ - TestSplit { - id: 0, - }, - ], - 1: [ - TestSplit { - id: 1, - }, - ], - 2: [ - TestSplit { - id: 2, - }, - ], - 3: [ - TestSplit { - id: 3, - }, - ], - 4: [ - TestSplit { - id: 4, - }, - ], - }, - ) - "#]], - ); + &discovered_splits, + Default::default(), + ) + .unwrap(); + assert_eq!(diff.len(), 5); + for splits in diff.values() { + assert_eq!(splits.len(), 1); + } + + check_all_splits(&discovered_splits, &diff); } }