From 99fcd9a0cc30ac6650412546a9bec0671852b147 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Mon, 8 Jul 2024 15:56:10 +0800 Subject: [PATCH] chore: Refactor worker_node_manager for simpler worker-node mapping. (#17603) --- src/batch/src/worker_manager/worker_node_manager.rs | 10 ++-------- src/common/src/hash/consistent_hash/mapping.rs | 8 +++++++- src/meta/service/src/notification_service.rs | 8 ++++---- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/batch/src/worker_manager/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs index 0fc3d539101bb..b1e08517954db 100644 --- a/src/batch/src/worker_manager/worker_node_manager.rs +++ b/src/batch/src/worker_manager/worker_node_manager.rs @@ -156,18 +156,12 @@ impl WorkerNodeManager { let guard = self.inner.read().unwrap(); - let worker_slot_index: HashMap<_, _> = guard - .worker_nodes - .iter() - .flat_map(|worker| { - (0..worker.parallelism()).map(move |i| (WorkerSlotId::new(worker.id, i), worker)) - }) - .collect(); + let worker_index: HashMap<_, _> = guard.worker_nodes.iter().map(|w| (w.id, w)).collect(); let mut workers = Vec::with_capacity(worker_slot_ids.len()); for worker_slot_id in worker_slot_ids { - match worker_slot_index.get(worker_slot_id) { + match worker_index.get(&worker_slot_id.worker_id()) { Some(worker) => workers.push((*worker).clone()), None => bail!( "No worker node found for worker slot id: {}", diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 51e3aa02d262c..2ca4f6fc9686e 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -33,7 +33,7 @@ use crate::util::iter_util::ZipEqDebug; // TODO: find a better place for this. pub type ActorId = u32; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)] pub struct WorkerSlotId(u64); impl WorkerSlotId { @@ -68,6 +68,12 @@ impl Display for WorkerSlotId { } } +impl Debug for WorkerSlotId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("[{}:{}]", self.worker_id(), self.slot_idx())) + } +} + /// Trait for items that can be used as keys in [`VnodeMapping`]. pub trait VnodeMappingItem { /// The type of the item. diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs index e4a8d298e0788..5d68211a71d57 100644 --- a/src/meta/service/src/notification_service.rs +++ b/src/meta/service/src/notification_service.rs @@ -148,19 +148,19 @@ impl NotificationServiceImpl { match &self.metadata_manager { MetadataManager::V1(mgr) => { let fragment_guard = mgr.fragment_manager.get_fragment_read_guard().await; - let parallel_unit_mappings = + let worker_slot_mappings = fragment_guard.all_running_fragment_mappings().collect_vec(); let notification_version = self.env.notification_manager().current_version().await; - Ok((parallel_unit_mappings, notification_version)) + Ok((worker_slot_mappings, notification_version)) } MetadataManager::V2(mgr) => { let fragment_guard = mgr.catalog_controller.get_inner_read_guard().await; - let parallel_unit_mappings = fragment_guard + let worker_slot_mappings = fragment_guard .all_running_fragment_mappings() .await? .collect_vec(); let notification_version = self.env.notification_manager().current_version().await; - Ok((parallel_unit_mappings, notification_version)) + Ok((worker_slot_mappings, notification_version)) } } }