Skip to content

Commit

Permalink
Refactor WorkerSlotMapping and update stream.rs usage
Browse files Browse the repository at this point in the history
  • Loading branch information
Shanicky Chen committed May 20, 2024
1 parent 4795bb1 commit f24ea98
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 31 deletions.
41 changes: 18 additions & 23 deletions src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,29 +361,7 @@ impl ActorMapping {
}
}

impl WorkerSlotMapping {
/// Create a uniform worker mapping from the given worker ids
pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId]) -> Self {
Self::new_uniform(worker_slot_ids.iter().cloned())
}

/// Create a worker mapping from the protobuf representation.
pub fn from_protobuf(proto: &PbWorkerSlotMapping) -> Self {
assert_eq!(proto.original_indices.len(), proto.data.len());
Self {
original_indices: proto.original_indices.clone(),
data: proto.data.iter().map(|&id| WorkerSlotId(id)).collect(),
}
}

/// Convert this worker mapping to the protobuf representation.
pub fn to_protobuf(&self) -> PbWorkerSlotMapping {
PbWorkerSlotMapping {
original_indices: self.original_indices.clone(),
data: self.data.iter().map(|id| id.0).collect(),
}
}
}
// NOTE(review): empty impl block left behind by the refactor — `from_protobuf`
// and `to_protobuf` were moved into the other `impl WorkerSlotMapping` block
// later in this file; this shell can likely be removed. TODO confirm nothing
// else is expected to land here.
impl WorkerSlotMapping {}

impl ParallelUnitMapping {
/// Create a uniform parallel unit mapping from the given parallel units, essentially
Expand Down Expand Up @@ -462,6 +440,23 @@ impl WorkerSlotMapping {
//
// self.transform(&map)
}

/// Create a worker mapping from the protobuf representation.
pub fn from_protobuf(proto: &PbWorkerSlotMapping) -> Self {
assert_eq!(proto.original_indices.len(), proto.data.len());
Self {
original_indices: proto.original_indices.clone(),
data: proto.data.iter().map(|&id| WorkerSlotId(id)).collect(),
}
}

/// Convert this worker mapping to the protobuf representation.
///
/// The inverse of `from_protobuf`: unwraps each `WorkerSlotId` back to its
/// raw `u64` payload while copying the original indices verbatim.
pub fn to_protobuf(&self) -> PbWorkerSlotMapping {
    let data = self.data.iter().map(|slot| slot.0).collect();
    PbWorkerSlotMapping {
        original_indices: self.original_indices.clone(),
        data,
    }
}
}

#[cfg(test)]
Expand Down
8 changes: 5 additions & 3 deletions src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,15 @@ impl TableFragments {
) -> Self {
let actor_status = actor_locations
.iter()
.map(|(&actor_id, WorkerSlotId(worker_id, slot_id))| {
.map(|(&actor_id, worker_slot_id)| {
let worker_id = worker_slot_id.worker_id();
let slot_id = worker_slot_id.slot_idx();
(
actor_id,
ActorStatus {
parallel_unit: Some(ParallelUnit {
id: *worker_id << 10 | *slot_id,
worker_node_id: *worker_id,
id: worker_id << 10 | slot_id,
worker_node_id: worker_id,
}),
state: ActorState::Inactive as i32,
},
Expand Down
12 changes: 7 additions & 5 deletions src/meta/src/stream/stream_graph/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl Scheduler {
let scheduled_worker_slots = scheduled
.into_iter()
.flat_map(|(worker_id, size)| {
(0..size).map(move |slot| WorkerSlotId(worker_id, slot as u32))
(0..size).map(move |slot| WorkerSlotId::new(worker_id, slot))
})
.collect_vec();

Expand All @@ -249,7 +249,7 @@ impl Scheduler {
let single_scheduled = schedule_units_for_slots_v2(&slots, 1, streaming_job_id)?;
let default_single_worker_id = single_scheduled.keys().exactly_one().cloned().unwrap();

let default_singleton_worker_slot = WorkerSlotId(default_single_worker_id, 0);
let default_singleton_worker_slot = WorkerSlotId::new(default_single_worker_id, 0);

Ok(Self {
default_hash_mapping,
Expand Down Expand Up @@ -357,17 +357,19 @@ impl Locations {
pub fn worker_actors(&self) -> HashMap<WorkerId, Vec<ActorId>> {
self.actor_locations
.iter()
.map(|(actor_id, WorkerSlotId(worker_id, _))| (*worker_id, *actor_id))
.map(|(actor_id, worker_slot_id)| (worker_slot_id.worker_id(), *actor_id))
.into_group_map()
}

/// Returns an iterator of `ActorInfo`.
pub fn actor_infos(&self) -> impl Iterator<Item = ActorInfo> + '_ {
self.actor_locations
.iter()
.map(|(actor_id, WorkerSlotId(worker_id, _))| ActorInfo {
.map(|(actor_id, worker_slot_id)| ActorInfo {
actor_id: *actor_id,
host: self.worker_locations[worker_id].host.clone(),
host: self.worker_locations[&worker_slot_id.worker_id()]
.host
.clone(),
})
}
}
Expand Down

0 comments on commit f24ea98

Please sign in to comment.