diff --git a/src/batch/src/worker_manager/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs index fd4d0e37bbbc4..772bc8a4b6da7 100644 --- a/src/batch/src/worker_manager/worker_node_manager.rs +++ b/src/batch/src/worker_manager/worker_node_manager.rs @@ -19,7 +19,7 @@ use std::time::Duration; use rand::seq::SliceRandom; use risingwave_common::bail; use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER; -use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::hash::{WorkerSlotId, WorkerSlotMapping}; use risingwave_common::vnode_mapping::vnode_placement::place_vnode; use risingwave_pb::common::{WorkerNode, WorkerType}; @@ -346,38 +346,26 @@ impl WorkerNodeSelector { if self.enable_barrier_read { self.manager.get_streaming_fragment_mapping(&fragment_id) } else { - let (hint, parallelism) = match self.manager.serving_fragment_mapping(fragment_id) { - Ok(o) => { - if self.manager.worker_node_mask().is_empty() { - // 1. Stable mapping for most cases. - return Ok(o); - } - // If it's a singleton, set max_parallelism=1 for place_vnode. - let max_parallelism = o.to_single().map(|_| 1); - (Some(o), max_parallelism) - } - Err(e) => { - if !matches!(e, BatchError::ServingVnodeMappingNotFound(_)) { - return Err(e); - } - // We cannot tell whether it's a singleton, set max_parallelism=1 for place_vnode as if it's a singleton. - let max_parallelism = 1; - tracing::warn!( - fragment_id, - max_parallelism, - "Serving fragment mapping not found, fall back to temporary one." - ); - // Workaround the case that new mapping is not available yet due to asynchronous - // notification. - (None, Some(max_parallelism)) - } - }; - // 2. Temporary mapping that filters out unavailable workers. - let new_workers = self.apply_worker_node_mask(self.manager.list_serving_worker_nodes()); - // TODO(var-vnode): use vnode count from config - let masked_mapping = - place_vnode(hint.as_ref(), &new_workers, parallelism, VirtualNode::COUNT); - masked_mapping.ok_or_else(|| BatchError::EmptyWorkerNodes) + let mapping = (self.manager.serving_fragment_mapping(fragment_id)).or_else(|_| { + tracing::warn!( + fragment_id, + "Serving fragment mapping not found, fall back to streaming one." + ); + self.manager.get_streaming_fragment_mapping(&fragment_id) + })?; + + // Filter out unavailable workers. + if self.manager.worker_node_mask().is_empty() { + Ok(mapping) + } else { + let workers = self.apply_worker_node_mask(self.manager.list_serving_worker_nodes()); + // If it's a singleton, set max_parallelism=1 for place_vnode. + let max_parallelism = mapping.to_single().map(|_| 1); + let masked_mapping = + place_vnode(Some(&mapping), &workers, max_parallelism, mapping.len()) + .ok_or_else(|| BatchError::EmptyWorkerNodes)?; + Ok(masked_mapping) + } } }