diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs
index 69a89e6fc4860..e51d1f12357df 100644
--- a/src/common/src/hash/consistent_hash/mapping.rs
+++ b/src/common/src/hash/consistent_hash/mapping.rs
@@ -361,29 +361,7 @@ impl ActorMapping {
     }
 }
 
-impl WorkerSlotMapping {
-    /// Create a uniform worker mapping from the given worker ids
-    pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId]) -> Self {
-        Self::new_uniform(worker_slot_ids.iter().cloned())
-    }
-
-    /// Create a worker mapping from the protobuf representation.
-    pub fn from_protobuf(proto: &PbWorkerSlotMapping) -> Self {
-        assert_eq!(proto.original_indices.len(), proto.data.len());
-        Self {
-            original_indices: proto.original_indices.clone(),
-            data: proto.data.iter().map(|&id| WorkerSlotId(id)).collect(),
-        }
-    }
-
-    /// Convert this worker mapping to the protobuf representation.
-    pub fn to_protobuf(&self) -> PbWorkerSlotMapping {
-        PbWorkerSlotMapping {
-            original_indices: self.original_indices.clone(),
-            data: self.data.iter().map(|id| id.0).collect(),
-        }
-    }
-}
+impl WorkerSlotMapping {}
 
 impl ParallelUnitMapping {
     /// Create a uniform parallel unit mapping from the given parallel units, essentially
@@ -462,6 +440,23 @@ impl WorkerSlotMapping {
         //
         // self.transform(&map)
     }
+
+    /// Create a worker mapping from the protobuf representation.
+    pub fn from_protobuf(proto: &PbWorkerSlotMapping) -> Self {
+        assert_eq!(proto.original_indices.len(), proto.data.len());
+        Self {
+            original_indices: proto.original_indices.clone(),
+            data: proto.data.iter().map(|&id| WorkerSlotId(id)).collect(),
+        }
+    }
+
+    /// Convert this worker mapping to the protobuf representation.
+    pub fn to_protobuf(&self) -> PbWorkerSlotMapping {
+        PbWorkerSlotMapping {
+            original_indices: self.original_indices.clone(),
+            data: self.data.iter().map(|id| id.0).collect(),
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs
index 39395e02f1a23..33e7220b5897e 100644
--- a/src/meta/src/model/stream.rs
+++ b/src/meta/src/model/stream.rs
@@ -213,13 +213,15 @@ impl TableFragments {
     ) -> Self {
         let actor_status = actor_locations
             .iter()
-            .map(|(&actor_id, WorkerSlotId(worker_id, slot_id))| {
+            .map(|(&actor_id, worker_slot_id)| {
+                let worker_id = worker_slot_id.worker_id();
+                let slot_id = worker_slot_id.slot_idx();
                 (
                     actor_id,
                     ActorStatus {
                         parallel_unit: Some(ParallelUnit {
-                            id: *worker_id << 10 | *slot_id,
-                            worker_node_id: *worker_id,
+                            id: worker_id << 10 | slot_id,
+                            worker_node_id: worker_id,
                         }),
                         state: ActorState::Inactive as i32,
                     },
diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs
index 9bec0d3978085..3b8e840d46aa7 100644
--- a/src/meta/src/stream/stream_graph/schedule.rs
+++ b/src/meta/src/stream/stream_graph/schedule.rs
@@ -237,7 +237,7 @@ impl Scheduler {
         let scheduled_worker_slots = scheduled
             .into_iter()
             .flat_map(|(worker_id, size)| {
-                (0..size).map(move |slot| WorkerSlotId(worker_id, slot as u32))
+                (0..size).map(move |slot| WorkerSlotId::new(worker_id, slot))
             })
             .collect_vec();
 
@@ -249,7 +249,7 @@ impl Scheduler {
         let single_scheduled = schedule_units_for_slots_v2(&slots, 1, streaming_job_id)?;
         let default_single_worker_id = single_scheduled.keys().exactly_one().cloned().unwrap();
 
-        let default_singleton_worker_slot = WorkerSlotId::new(default_single_worker_id, 0);
+        let default_singleton_worker_slot = WorkerSlotId::new(default_single_worker_id, 0);
 
         Ok(Self {
             default_hash_mapping,
@@ -357,7 +357,7 @@ impl Locations {
     pub fn worker_actors(&self) -> HashMap<WorkerId, Vec<ActorId>> {
         self.actor_locations
             .iter()
-            .map(|(actor_id, WorkerSlotId(worker_id, _))| (*worker_id, *actor_id))
+            .map(|(actor_id, worker_slot_id)| (worker_slot_id.worker_id(), *actor_id))
             .into_group_map()
     }
 
@@ -365,9 +365,11 @@ impl Locations {
     pub fn actor_infos(&self) -> impl Iterator<Item = ActorInfo> + '_ {
         self.actor_locations
             .iter()
-            .map(|(actor_id, WorkerSlotId(worker_id, _))| ActorInfo {
+            .map(|(actor_id, worker_slot_id)| ActorInfo {
                 actor_id: *actor_id,
-                host: self.worker_locations[worker_id].host.clone(),
+                host: self.worker_locations[&worker_slot_id.worker_id()]
+                    .host
+                    .clone(),
             })
     }
 }
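
Context for the call-site changes above: instead of destructuring `WorkerSlotId` as a two-field tuple struct, callers now go through `WorkerSlotId::new`, `worker_id()`, and `slot_idx()`, consistent with the single packed id (`WorkerSlotId(id)` / `id.0`) used in `mapping.rs`. Below is a minimal sketch of the accessor shape these call sites assume; the packing layout and parameter types are illustrative, not the actual RisingWave definition.

```rust
/// Illustrative stand-in for `WorkerSlotId`: one u64 with the worker id in
/// the high 32 bits and the slot index in the low 32 bits. (Assumed layout;
/// the real definition lives in the RisingWave common crate.)
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct WorkerSlotId(u64);

impl WorkerSlotId {
    /// Packs a worker id and a slot index into a single id.
    pub fn new(worker_id: u32, slot_idx: u32) -> Self {
        Self(((worker_id as u64) << 32) | slot_idx as u64)
    }

    /// The worker this slot belongs to.
    pub fn worker_id(&self) -> u32 {
        (self.0 >> 32) as u32
    }

    /// The slot index within that worker.
    pub fn slot_idx(&self) -> u32 {
        self.0 as u32
    }
}

fn main() {
    // Mirrors the scheduler call sites: construct via `new`, read back via accessors.
    let slot = WorkerSlotId::new(3, 7);
    assert_eq!(slot.worker_id(), 3);
    assert_eq!(slot.slot_idx(), 7);
}
```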