Skip to content

Commit

Permalink
feat: refine skipping for creating actors in recovery scale (#15573)
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky authored Mar 8, 2024
1 parent b4d8d16 commit 7181e58
Showing 1 changed file with 65 additions and 65 deletions.
130 changes: 65 additions & 65 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1117,88 +1117,88 @@ impl ScaleController {
}
}

// After modification, for newly created actors, both upstream and downstream actor ids
// have been modified
let mut actor_infos_to_broadcast = BTreeMap::new();
let mut node_actors_to_create: HashMap<WorkerId, Vec<_>> = HashMap::new();
let mut broadcast_worker_ids = HashSet::new();

for actors_to_create in fragment_actors_to_create.values() {
for (new_actor_id, new_parallel_unit_id) in actors_to_create {
let new_actor = new_created_actors.get(new_actor_id).unwrap();
for upstream_actor_id in &new_actor.upstream_actor_id {
if new_created_actors.contains_key(upstream_actor_id) {
continue;
}

let upstream_worker_id = ctx
.actor_id_to_parallel_unit(upstream_actor_id)?
.worker_node_id;
let upstream_worker =
ctx.worker_nodes.get(&upstream_worker_id).with_context(|| {
format!("upstream worker {} not found", upstream_worker_id)
})?;

// Force broadcast upstream actor info, because the actor information of the new
// node may not have been synchronized yet
actor_infos_to_broadcast.insert(
*upstream_actor_id,
ActorInfo {
actor_id: *upstream_actor_id,
host: upstream_worker.host.clone(),
},
);

broadcast_worker_ids.insert(upstream_worker_id);
}

for dispatcher in &new_actor.dispatcher {
for downstream_actor_id in &dispatcher.downstream_actor_id {
if new_created_actors.contains_key(downstream_actor_id) {
if !options.skip_create_new_actors {
// After modification, for newly created actors, both upstream and downstream actor ids
// have been modified
let mut actor_infos_to_broadcast = BTreeMap::new();
let mut node_actors_to_create: HashMap<WorkerId, Vec<_>> = HashMap::new();
let mut broadcast_worker_ids = HashSet::new();

for actors_to_create in fragment_actors_to_create.values() {
for (new_actor_id, new_parallel_unit_id) in actors_to_create {
let new_actor = new_created_actors.get(new_actor_id).unwrap();
for upstream_actor_id in &new_actor.upstream_actor_id {
if new_created_actors.contains_key(upstream_actor_id) {
continue;
}
let downstream_worker_id = ctx
.actor_id_to_parallel_unit(downstream_actor_id)?

let upstream_worker_id = ctx
.actor_id_to_parallel_unit(upstream_actor_id)?
.worker_node_id;
let downstream_worker = ctx
.worker_nodes
.get(&downstream_worker_id)
.with_context(|| {
format!("downstream worker {} not found", downstream_worker_id)
let upstream_worker =
ctx.worker_nodes.get(&upstream_worker_id).with_context(|| {
format!("upstream worker {} not found", upstream_worker_id)
})?;

// Force broadcast upstream actor info, because the actor information of the new
// node may not have been synchronized yet
actor_infos_to_broadcast.insert(
*downstream_actor_id,
*upstream_actor_id,
ActorInfo {
actor_id: *downstream_actor_id,
host: downstream_worker.host.clone(),
actor_id: *upstream_actor_id,
host: upstream_worker.host.clone(),
},
);

broadcast_worker_ids.insert(downstream_worker_id);
broadcast_worker_ids.insert(upstream_worker_id);
}
}

let worker = ctx.parallel_unit_id_to_worker(new_parallel_unit_id)?;
for dispatcher in &new_actor.dispatcher {
for downstream_actor_id in &dispatcher.downstream_actor_id {
if new_created_actors.contains_key(downstream_actor_id) {
continue;
}
let downstream_worker_id = ctx
.actor_id_to_parallel_unit(downstream_actor_id)?
.worker_node_id;
let downstream_worker = ctx
.worker_nodes
.get(&downstream_worker_id)
.with_context(|| {
format!("downstream worker {} not found", downstream_worker_id)
})?;

actor_infos_to_broadcast.insert(
*downstream_actor_id,
ActorInfo {
actor_id: *downstream_actor_id,
host: downstream_worker.host.clone(),
},
);

broadcast_worker_ids.insert(downstream_worker_id);
}
}

node_actors_to_create
.entry(worker.id)
.or_default()
.push(new_actor.clone());
let worker = ctx.parallel_unit_id_to_worker(new_parallel_unit_id)?;

broadcast_worker_ids.insert(worker.id);
node_actors_to_create
.entry(worker.id)
.or_default()
.push(new_actor.clone());

actor_infos_to_broadcast.insert(
*new_actor_id,
ActorInfo {
actor_id: *new_actor_id,
host: worker.host.clone(),
},
);
broadcast_worker_ids.insert(worker.id);

actor_infos_to_broadcast.insert(
*new_actor_id,
ActorInfo {
actor_id: *new_actor_id,
host: worker.host.clone(),
},
);
}
}
}

if !options.skip_create_new_actors {
self.create_actors_on_compute_node(
&ctx.worker_nodes,
actor_infos_to_broadcast,
Expand Down

0 comments on commit 7181e58

Please sign in to comment.