Skip to content

Commit

Permalink
fix(batch): avoid setting max_parallelism for batch scan that is not a singleton (#15683)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Mar 14, 2024
1 parent ef2b4fe commit 9716484
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
24 changes: 12 additions & 12 deletions src/common/src/vnode_mapping/vnode_placement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode};
pub fn place_vnode(
hint_pu_mapping: Option<&ParallelUnitMapping>,
new_workers: &[WorkerNode],
max_parallelism: usize,
max_parallelism: Option<usize>,
) -> Option<ParallelUnitMapping> {
// Get all serving parallel units from all available workers, grouped by worker id and ordered
// by parallel unit id in each group.
Expand All @@ -42,7 +42,7 @@ pub fn place_vnode(
// `max_parallelism` and total number of virtual nodes.
let serving_parallelism = std::cmp::min(
new_pus.iter().map(|pus| pus.len()).sum(),
std::cmp::min(max_parallelism, VirtualNode::COUNT),
std::cmp::min(max_parallelism.unwrap_or(usize::MAX), VirtualNode::COUNT),
);

// Select `serving_parallelism` parallel units in a round-robin fashion, to distribute workload
Expand Down Expand Up @@ -236,11 +236,11 @@ mod tests {
..Default::default()
};
assert!(
place_vnode(None, &[worker_1.clone()], 0).is_none(),
place_vnode(None, &[worker_1.clone()], Some(0)).is_none(),
"max_parallelism should >= 0"
);

let re_pu_mapping_2 = place_vnode(None, &[worker_1.clone()], 10000).unwrap();
let re_pu_mapping_2 = place_vnode(None, &[worker_1.clone()], None).unwrap();
assert_eq!(re_pu_mapping_2.iter_unique().count(), 1);
let worker_2 = WorkerNode {
id: 2,
Expand All @@ -251,7 +251,7 @@ mod tests {
let re_pu_mapping = place_vnode(
Some(&re_pu_mapping_2),
&[worker_1.clone(), worker_2.clone()],
10000,
None,
)
.unwrap();

Expand All @@ -269,7 +269,7 @@ mod tests {
let re_pu_mapping_2 = place_vnode(
Some(&re_pu_mapping),
&[worker_1.clone(), worker_2.clone(), worker_3.clone()],
10000,
None,
)
.unwrap();

Expand All @@ -281,7 +281,7 @@ mod tests {
let re_pu_mapping = place_vnode(
Some(&re_pu_mapping_2),
&[worker_1.clone(), worker_2.clone(), worker_3.clone()],
50,
Some(50),
)
.unwrap();
// limited by max_parallelism
Expand All @@ -292,23 +292,23 @@ mod tests {
let re_pu_mapping_2 = place_vnode(
Some(&re_pu_mapping),
&[worker_1.clone(), worker_2, worker_3.clone()],
10000,
None,
)
.unwrap();
assert_eq!(re_pu_mapping_2.iter_unique().count(), 111);
// 50 * 5 + 6 -> 111 * 2 + 34
let score = count_same_vnode_mapping(&re_pu_mapping_2, &re_pu_mapping);
assert!(score >= 50 * 2);
let re_pu_mapping =
place_vnode(Some(&re_pu_mapping_2), &[worker_1, worker_3.clone()], 10000).unwrap();
place_vnode(Some(&re_pu_mapping_2), &[worker_1, worker_3.clone()], None).unwrap();
// limited by total pu number
assert_eq!(re_pu_mapping.iter_unique().count(), 61);
// 111 * 2 + 34 -> 61 * 4 + 12
let score = count_same_vnode_mapping(&re_pu_mapping, &re_pu_mapping_2);
assert!(score >= 61 * 2);
assert!(place_vnode(Some(&re_pu_mapping), &[], 10000).is_none());
let re_pu_mapping = place_vnode(Some(&re_pu_mapping), &[worker_3], 10000).unwrap();
assert!(place_vnode(Some(&re_pu_mapping), &[], None).is_none());
let re_pu_mapping = place_vnode(Some(&re_pu_mapping), &[worker_3], None).unwrap();
assert_eq!(re_pu_mapping.iter_unique().count(), 60);
assert!(place_vnode(Some(&re_pu_mapping), &[], 10000).is_none());
assert!(place_vnode(Some(&re_pu_mapping), &[], None).is_none());
}
}
6 changes: 4 additions & 2 deletions src/frontend/src/scheduler/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,13 +352,15 @@ impl WorkerNodeSelector {
// 1. Stable mapping for most cases.
return Ok(o);
}
let max_parallelism = o.iter_unique().count();
// If it's a singleton, set max_parallelism=1 for place_vnode.
let max_parallelism = o.to_single().map(|_| 1);
(Some(o), max_parallelism)
}
Err(e) => {
if !matches!(e, SchedulerError::ServingVnodeMappingNotFound(_)) {
return Err(e);
}
// We cannot tell whether it's a singleton, set max_parallelism=1 for place_vnode as if it's a singleton.
let max_parallelism = 1;
tracing::warn!(
fragment_id,
Expand All @@ -367,7 +369,7 @@ impl WorkerNodeSelector {
);
// Workaround the case that new mapping is not available yet due to asynchronous
// notification.
(None, max_parallelism)
(None, Some(max_parallelism))
}
};
// 2. Temporary mapping that filters out unavailable workers.
Expand Down
8 changes: 6 additions & 2 deletions src/meta/src/serving/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@ impl ServingVnodeMapping {
for (fragment_id, streaming_parallelism) in streaming_parallelisms {
let new_mapping = {
let old_mapping = serving_vnode_mappings.get(&fragment_id);
// Set max serving parallelism to `streaming_parallelism`. It's not a must.
place_vnode(old_mapping, workers, streaming_parallelism)
let max_parallelism = if streaming_parallelism == 1 {
Some(1)
} else {
None
};
place_vnode(old_mapping, workers, max_parallelism)
};
match new_mapping {
None => {
Expand Down

0 comments on commit 9716484

Please sign in to comment.