Skip to content

Commit

Permalink
chore: Refactor worker_node_manager for simpler worker-node mapping. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky authored Jul 8, 2024
1 parent febef35 commit 99fcd9a
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 13 deletions.
10 changes: 2 additions & 8 deletions src/batch/src/worker_manager/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,12 @@ impl WorkerNodeManager {

let guard = self.inner.read().unwrap();

let worker_slot_index: HashMap<_, _> = guard
.worker_nodes
.iter()
.flat_map(|worker| {
(0..worker.parallelism()).map(move |i| (WorkerSlotId::new(worker.id, i), worker))
})
.collect();
let worker_index: HashMap<_, _> = guard.worker_nodes.iter().map(|w| (w.id, w)).collect();

let mut workers = Vec::with_capacity(worker_slot_ids.len());

for worker_slot_id in worker_slot_ids {
match worker_slot_index.get(worker_slot_id) {
match worker_index.get(&worker_slot_id.worker_id()) {
Some(worker) => workers.push((*worker).clone()),
None => bail!(
"No worker node found for worker slot id: {}",
Expand Down
8 changes: 7 additions & 1 deletion src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::util::iter_util::ZipEqDebug;
// TODO: find a better place for this.
pub type ActorId = u32;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct WorkerSlotId(u64);

impl WorkerSlotId {
Expand Down Expand Up @@ -68,6 +68,12 @@ impl Display for WorkerSlotId {
}
}

impl Debug for WorkerSlotId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("[{}:{}]", self.worker_id(), self.slot_idx()))
}
}

/// Trait for items that can be used as keys in [`VnodeMapping`].
pub trait VnodeMappingItem {
/// The type of the item.
Expand Down
8 changes: 4 additions & 4 deletions src/meta/service/src/notification_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,19 @@ impl NotificationServiceImpl {
match &self.metadata_manager {
MetadataManager::V1(mgr) => {
let fragment_guard = mgr.fragment_manager.get_fragment_read_guard().await;
let parallel_unit_mappings =
let worker_slot_mappings =
fragment_guard.all_running_fragment_mappings().collect_vec();
let notification_version = self.env.notification_manager().current_version().await;
Ok((parallel_unit_mappings, notification_version))
Ok((worker_slot_mappings, notification_version))
}
MetadataManager::V2(mgr) => {
let fragment_guard = mgr.catalog_controller.get_inner_read_guard().await;
let parallel_unit_mappings = fragment_guard
let worker_slot_mappings = fragment_guard
.all_running_fragment_mappings()
.await?
.collect_vec();
let notification_version = self.env.notification_manager().current_version().await;
Ok((parallel_unit_mappings, notification_version))
Ok((worker_slot_mappings, notification_version))
}
}
}
Expand Down

0 comments on commit 99fcd9a

Please sign in to comment.