From 97164845e557fd7e929796023692d2c3fdd88a74 Mon Sep 17 00:00:00 2001
From: zwang28 <70626450+zwang28@users.noreply.github.com>
Date: Thu, 14 Mar 2024 17:52:56 +0800
Subject: [PATCH] fix(batch): avoid setting max_parallelism for batch scan
 that is no singleton (#15683)

---
 .../src/vnode_mapping/vnode_placement.rs | 24 +++++++++----------
 .../src/scheduler/worker_node_manager.rs |  6 +++--
 src/meta/src/serving/mod.rs              |  8 +++++--
 3 files changed, 22 insertions(+), 16 deletions(-)

diff --git a/src/common/src/vnode_mapping/vnode_placement.rs b/src/common/src/vnode_mapping/vnode_placement.rs
index 2c16e08a04081..49f45d66512eb 100644
--- a/src/common/src/vnode_mapping/vnode_placement.rs
+++ b/src/common/src/vnode_mapping/vnode_placement.rs
@@ -27,7 +27,7 @@ use crate::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode};
 pub fn place_vnode(
     hint_pu_mapping: Option<&ParallelUnitMapping>,
     new_workers: &[WorkerNode],
-    max_parallelism: usize,
+    max_parallelism: Option<usize>,
 ) -> Option<ParallelUnitMapping> {
     // Get all serving parallel units from all available workers, grouped by worker id and ordered
     // by parallel unit id in each group.
@@ -42,7 +42,7 @@ pub fn place_vnode(
     // `max_parallelism` and total number of virtual nodes.
     let serving_parallelism = std::cmp::min(
         new_pus.iter().map(|pus| pus.len()).sum(),
-        std::cmp::min(max_parallelism, VirtualNode::COUNT),
+        std::cmp::min(max_parallelism.unwrap_or(usize::MAX), VirtualNode::COUNT),
     );
 
     // Select `serving_parallelism` parallel units in a round-robin fashion, to distribute workload
@@ -236,11 +236,11 @@ mod tests {
             ..Default::default()
         };
         assert!(
-            place_vnode(None, &[worker_1.clone()], 0).is_none(),
+            place_vnode(None, &[worker_1.clone()], Some(0)).is_none(),
             "max_parallelism should >= 0"
         );
 
-        let re_pu_mapping_2 = place_vnode(None, &[worker_1.clone()], 10000).unwrap();
+        let re_pu_mapping_2 = place_vnode(None, &[worker_1.clone()], None).unwrap();
         assert_eq!(re_pu_mapping_2.iter_unique().count(), 1);
         let worker_2 = WorkerNode {
             id: 2,
@@ -251,7 +251,7 @@ mod tests {
         let re_pu_mapping = place_vnode(
             Some(&re_pu_mapping_2),
             &[worker_1.clone(), worker_2.clone()],
-            10000,
+            None,
         )
         .unwrap();
 
@@ -269,7 +269,7 @@ mod tests {
         let re_pu_mapping_2 = place_vnode(
             Some(&re_pu_mapping),
             &[worker_1.clone(), worker_2.clone(), worker_3.clone()],
-            10000,
+            None,
         )
         .unwrap();
 
@@ -281,7 +281,7 @@ mod tests {
         let re_pu_mapping = place_vnode(
             Some(&re_pu_mapping_2),
             &[worker_1.clone(), worker_2.clone(), worker_3.clone()],
-            50,
+            Some(50),
         )
         .unwrap();
         // limited by max_parallelism
@@ -292,7 +292,7 @@ mod tests {
         let re_pu_mapping_2 = place_vnode(
             Some(&re_pu_mapping),
             &[worker_1.clone(), worker_2, worker_3.clone()],
-            10000,
+            None,
         )
         .unwrap();
         assert_eq!(re_pu_mapping_2.iter_unique().count(), 111);
@@ -300,15 +300,15 @@ mod tests {
         let score = count_same_vnode_mapping(&re_pu_mapping_2, &re_pu_mapping);
         assert!(score >= 50 * 2);
         let re_pu_mapping =
-            place_vnode(Some(&re_pu_mapping_2), &[worker_1, worker_3.clone()], 10000).unwrap();
+            place_vnode(Some(&re_pu_mapping_2), &[worker_1, worker_3.clone()], None).unwrap();
         // limited by total pu number
         assert_eq!(re_pu_mapping.iter_unique().count(), 61);
         // 111 * 2 + 34 -> 61 * 4 + 12
         let score = count_same_vnode_mapping(&re_pu_mapping, &re_pu_mapping_2);
         assert!(score >= 61 * 2);
-        assert!(place_vnode(Some(&re_pu_mapping), &[], 10000).is_none());
-        let re_pu_mapping = place_vnode(Some(&re_pu_mapping), &[worker_3], 10000).unwrap();
+        assert!(place_vnode(Some(&re_pu_mapping), &[], None).is_none());
+        let re_pu_mapping = place_vnode(Some(&re_pu_mapping), &[worker_3], None).unwrap();
         assert_eq!(re_pu_mapping.iter_unique().count(), 60);
-        assert!(place_vnode(Some(&re_pu_mapping), &[], 10000).is_none());
+        assert!(place_vnode(Some(&re_pu_mapping), &[], None).is_none());
     }
 }
diff --git a/src/frontend/src/scheduler/worker_node_manager.rs b/src/frontend/src/scheduler/worker_node_manager.rs
index e4ef5e05c157c..bba3cd97911cc 100644
--- a/src/frontend/src/scheduler/worker_node_manager.rs
+++ b/src/frontend/src/scheduler/worker_node_manager.rs
@@ -352,13 +352,15 @@ impl WorkerNodeSelector {
                         // 1. Stable mapping for most cases.
                         return Ok(o);
                     }
-                    let max_parallelism = o.iter_unique().count();
+                    // If it's a singleton, set max_parallelism=1 for place_vnode.
+                    let max_parallelism = o.to_single().map(|_| 1);
                     (Some(o), max_parallelism)
                 }
                 Err(e) => {
                     if !matches!(e, SchedulerError::ServingVnodeMappingNotFound(_)) {
                         return Err(e);
                     }
+                    // We cannot tell whether it's a singleton, so set max_parallelism=1 for place_vnode as if it were one.
                     let max_parallelism = 1;
                     tracing::warn!(
                         fragment_id,
@@ -367,7 +369,7 @@ impl WorkerNodeSelector {
                     );
                     // Workaround the case that new mapping is not available yet due to asynchronous
                     // notification.
-                    (None, max_parallelism)
+                    (None, Some(max_parallelism))
                 }
             };
             // 2. Temporary mapping that filters out unavailable workers.
diff --git a/src/meta/src/serving/mod.rs b/src/meta/src/serving/mod.rs
index 7448f6f9496a9..36e7b77ccf63a 100644
--- a/src/meta/src/serving/mod.rs
+++ b/src/meta/src/serving/mod.rs
@@ -52,8 +52,12 @@ impl ServingVnodeMapping {
         for (fragment_id, streaming_parallelism) in streaming_parallelisms {
             let new_mapping = {
                 let old_mapping = serving_vnode_mappings.get(&fragment_id);
-                // Set max serving parallelism to `streaming_parallelism`. It's not a must.
-                place_vnode(old_mapping, workers, streaming_parallelism)
+                let max_parallelism = if streaming_parallelism == 1 {
+                    Some(1)
+                } else {
+                    None
+                };
+                place_vnode(old_mapping, workers, max_parallelism)
             };
             match new_mapping {
                 None => {
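
Editor's note (not part of the patch above): the sketch below is a minimal, standalone illustration of the capping rule that place_vnode applies now that max_parallelism is an Option<usize>. VNODE_COUNT, serving_parallelism, and the per-worker parallel-unit counts are made-up stand-ins for VirtualNode::COUNT and the real inputs, not RisingWave APIs.

    // Sketch only: mirrors the serving_parallelism computation in vnode_placement.rs.
    const VNODE_COUNT: usize = 256; // stand-in for VirtualNode::COUNT

    fn serving_parallelism(pus_per_worker: &[usize], max_parallelism: Option<usize>) -> usize {
        let total_pus: usize = pus_per_worker.iter().sum();
        // `None` means "no explicit cap": only the number of parallel units and
        // the vnode count bound the serving parallelism.
        total_pus.min(max_parallelism.unwrap_or(usize::MAX).min(VNODE_COUNT))
    }

    fn main() {
        // No cap: bounded by the 9 available parallel units.
        assert_eq!(serving_parallelism(&[4, 5], None), 9);
        // Singleton fragment: callers pass Some(1), so a single unit serves all vnodes.
        assert_eq!(serving_parallelism(&[4, 5], Some(1)), 1);
        // An oversized cap is still clamped to the vnode count.
        assert_eq!(serving_parallelism(&[200, 200], Some(10_000)), VNODE_COUNT);
    }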
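
Also an editor's sketch, not part of the patch: how the callers pick the cap handed to place_vnode. Mapping, distinct_units, and max_parallelism_for are hypothetical names; only the decision shape (Some(1) for a known or assumed singleton, None otherwise) reflects the frontend and meta changes above.

    // Sketch only: Some(1) exactly when the fragment is (or must be assumed) a singleton.
    struct Mapping {
        distinct_units: usize,
    }

    impl Mapping {
        // Stand-in for ParallelUnitMapping::to_single(): Some(..) iff every vnode
        // maps to one parallel unit.
        fn to_single(&self) -> Option<u32> {
            (self.distinct_units == 1).then_some(0)
        }
    }

    fn max_parallelism_for(stable_mapping: Option<&Mapping>) -> Option<usize> {
        match stable_mapping {
            // Known singleton fragment: keep capping serving parallelism at 1.
            Some(m) => m.to_single().map(|_| 1),
            // Mapping not available yet (asynchronous notification): conservatively
            // treat it as a singleton, mirroring the frontend workaround.
            None => Some(1),
        }
    }

    fn main() {
        assert_eq!(max_parallelism_for(Some(&Mapping { distinct_units: 1 })), Some(1));
        assert_eq!(max_parallelism_for(Some(&Mapping { distinct_units: 8 })), None);
        assert_eq!(max_parallelism_for(None), Some(1));
    }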