Skip to content

Commit

Permalink
fix: revert to random source split assignment when there are too many…
Browse files Browse the repository at this point in the history
… actors (#14346)
  • Loading branch information
xxchan authored Jan 3, 2024
1 parent 58e1326 commit c8cdb9f
Showing 1 changed file with 65 additions and 128 deletions.
193 changes: 65 additions & 128 deletions src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ impl<T: SplitMetaData + Clone> Eq for ActorSplitsAssignment<T> {}

impl<T: SplitMetaData + Clone> PartialEq<Self> for ActorSplitsAssignment<T> {
fn eq(&self, other: &Self) -> bool {
self.splits.len() == other.splits.len() && self.actor_id == other.actor_id
self.splits.len() == other.splits.len()
}
}

Expand All @@ -409,12 +409,7 @@ impl<T: SplitMetaData + Clone> PartialOrd<Self> for ActorSplitsAssignment<T> {
impl<T: SplitMetaData + Clone> Ord for ActorSplitsAssignment<T> {
fn cmp(&self, other: &Self) -> Ordering {
// Note: this is reversed order, to make BinaryHeap a min heap.
other
.splits
.len()
.cmp(&self.splits.len())
// To make the BinaryHeap have a deterministic order
.then(other.actor_id.cmp(&self.actor_id))
other.splits.len().cmp(&self.splits.len())
}
}

Expand Down Expand Up @@ -510,6 +505,12 @@ where
for split_id in new_discovered_splits {
// ActorSplitsAssignment's Ord is reversed, so this is min heap, i.e.,
// we get the assignment with the least splits here.

// Note: If multiple actors have the same number of splits, it will be randomly picked.
// When the number of source actors is larger than the number of splits,
// It's possible that the assignment is uneven.
// e.g., https://github.com/risingwavelabs/risingwave/issues/14324#issuecomment-1875033158
// TODO: We should make the assignment rack-aware to make sure it's even.
let mut peek_ref = heap.peek_mut().unwrap();
peek_ref
.splits
Expand Down Expand Up @@ -1076,46 +1077,29 @@ mod tests {

#[test]
fn test_reassign_splits() {
fn check(
actor_splits: HashMap<ActorId, Vec<TestSplit>>,
discovered_splits: BTreeMap<SplitId, TestSplit>,
expected: expect_test::Expect,
) {
let diff = reassign_splits(
FragmentId::default(),
actor_splits,
&discovered_splits,
Default::default(),
)
.map(BTreeMap::from_iter); // ensure deterministic debug string
expected.assert_debug_eq(&diff);
}

let actor_splits = HashMap::new();
let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
check(
assert!(reassign_splits(
FragmentId::default(),
actor_splits,
discovered_splits,
expect_test::expect![[r#"
None
"#]],
);
&discovered_splits,
Default::default()
)
.is_none());

let actor_splits = (0..3).map(|i| (i, vec![])).collect();
let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
check(
let diff = reassign_splits(
FragmentId::default(),
actor_splits,
discovered_splits,
expect_test::expect![[r#"
Some(
{
0: [],
1: [],
2: [],
},
)
"#]],
);
&discovered_splits,
Default::default(),
)
.unwrap();
assert_eq!(diff.len(), 3);
for splits in diff.values() {
assert!(splits.is_empty())
}

let actor_splits = (0..3).map(|i| (i, vec![])).collect();
let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..3)
Expand All @@ -1124,31 +1108,20 @@ mod tests {
(split.id(), split)
})
.collect();
check(

let diff = reassign_splits(
FragmentId::default(),
actor_splits,
discovered_splits,
expect_test::expect![[r#"
Some(
{
0: [
TestSplit {
id: 0,
},
],
1: [
TestSplit {
id: 1,
},
],
2: [
TestSplit {
id: 2,
},
],
},
)
"#]],
);
&discovered_splits,
Default::default(),
)
.unwrap();
assert_eq!(diff.len(), 3);
for splits in diff.values() {
assert_eq!(splits.len(), 1);
}

check_all_splits(&discovered_splits, &diff);

let actor_splits = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
Expand All @@ -1157,82 +1130,46 @@ mod tests {
(split.id(), split)
})
.collect();
check(

let diff = reassign_splits(
FragmentId::default(),
actor_splits,
discovered_splits,
expect_test::expect![[r#"
Some(
{
0: [
TestSplit {
id: 0,
},
TestSplit {
id: 3,
},
],
1: [
TestSplit {
id: 1,
},
TestSplit {
id: 4,
},
],
2: [
TestSplit {
id: 2,
},
],
},
)
"#]],
);
&discovered_splits,
Default::default(),
)
.unwrap();
assert_eq!(diff.len(), 3);
for splits in diff.values() {
let len = splits.len();
assert!(len == 1 || len == 2);
}

check_all_splits(&discovered_splits, &diff);

let mut actor_splits: HashMap<ActorId, Vec<TestSplit>> =
(0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
actor_splits.insert(3, vec![]);
actor_splits.insert(4, vec![]);

let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
.map(|i| {
let split = TestSplit { id: i };
(split.id(), split)
})
.collect();
check(

let diff = reassign_splits(
FragmentId::default(),
actor_splits,
discovered_splits,
expect_test::expect![[r#"
Some(
{
0: [
TestSplit {
id: 0,
},
],
1: [
TestSplit {
id: 1,
},
],
2: [
TestSplit {
id: 2,
},
],
3: [
TestSplit {
id: 3,
},
],
4: [
TestSplit {
id: 4,
},
],
},
)
"#]],
);
&discovered_splits,
Default::default(),
)
.unwrap();
assert_eq!(diff.len(), 5);
for splits in diff.values() {
assert_eq!(splits.len(), 1);
}

check_all_splits(&discovered_splits, &diff);
}
}

0 comments on commit c8cdb9f

Please sign in to comment.