Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: refine skipping for creating actors in recovery scale (#15573) #15576

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 65 additions & 65 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1117,88 +1117,88 @@ impl ScaleController {
}
}

// After modification, for newly created actors, both upstream and downstream actor ids
// have been modified
let mut actor_infos_to_broadcast = BTreeMap::new();
let mut node_actors_to_create: HashMap<WorkerId, Vec<_>> = HashMap::new();
let mut broadcast_worker_ids = HashSet::new();

for actors_to_create in fragment_actors_to_create.values() {
for (new_actor_id, new_parallel_unit_id) in actors_to_create {
let new_actor = new_created_actors.get(new_actor_id).unwrap();
for upstream_actor_id in &new_actor.upstream_actor_id {
if new_created_actors.contains_key(upstream_actor_id) {
continue;
}

let upstream_worker_id = ctx
.actor_id_to_parallel_unit(upstream_actor_id)?
.worker_node_id;
let upstream_worker =
ctx.worker_nodes.get(&upstream_worker_id).with_context(|| {
format!("upstream worker {} not found", upstream_worker_id)
})?;

// Force broadcast upstream actor info, because the actor information of the new
// node may not have been synchronized yet
actor_infos_to_broadcast.insert(
*upstream_actor_id,
ActorInfo {
actor_id: *upstream_actor_id,
host: upstream_worker.host.clone(),
},
);

broadcast_worker_ids.insert(upstream_worker_id);
}

for dispatcher in &new_actor.dispatcher {
for downstream_actor_id in &dispatcher.downstream_actor_id {
if new_created_actors.contains_key(downstream_actor_id) {
if !options.skip_create_new_actors {
// After modification, for newly created actors, both upstream and downstream actor ids
// have been modified
let mut actor_infos_to_broadcast = BTreeMap::new();
let mut node_actors_to_create: HashMap<WorkerId, Vec<_>> = HashMap::new();
let mut broadcast_worker_ids = HashSet::new();

for actors_to_create in fragment_actors_to_create.values() {
for (new_actor_id, new_parallel_unit_id) in actors_to_create {
let new_actor = new_created_actors.get(new_actor_id).unwrap();
for upstream_actor_id in &new_actor.upstream_actor_id {
if new_created_actors.contains_key(upstream_actor_id) {
continue;
}
let downstream_worker_id = ctx
.actor_id_to_parallel_unit(downstream_actor_id)?

let upstream_worker_id = ctx
.actor_id_to_parallel_unit(upstream_actor_id)?
.worker_node_id;
let downstream_worker = ctx
.worker_nodes
.get(&downstream_worker_id)
.with_context(|| {
format!("downstream worker {} not found", downstream_worker_id)
let upstream_worker =
ctx.worker_nodes.get(&upstream_worker_id).with_context(|| {
format!("upstream worker {} not found", upstream_worker_id)
})?;

// Force broadcast upstream actor info, because the actor information of the new
// node may not have been synchronized yet
actor_infos_to_broadcast.insert(
*downstream_actor_id,
*upstream_actor_id,
ActorInfo {
actor_id: *downstream_actor_id,
host: downstream_worker.host.clone(),
actor_id: *upstream_actor_id,
host: upstream_worker.host.clone(),
},
);

broadcast_worker_ids.insert(downstream_worker_id);
broadcast_worker_ids.insert(upstream_worker_id);
}
}

let worker = ctx.parallel_unit_id_to_worker(new_parallel_unit_id)?;
for dispatcher in &new_actor.dispatcher {
for downstream_actor_id in &dispatcher.downstream_actor_id {
if new_created_actors.contains_key(downstream_actor_id) {
continue;
}
let downstream_worker_id = ctx
.actor_id_to_parallel_unit(downstream_actor_id)?
.worker_node_id;
let downstream_worker = ctx
.worker_nodes
.get(&downstream_worker_id)
.with_context(|| {
format!("downstream worker {} not found", downstream_worker_id)
})?;

actor_infos_to_broadcast.insert(
*downstream_actor_id,
ActorInfo {
actor_id: *downstream_actor_id,
host: downstream_worker.host.clone(),
},
);

broadcast_worker_ids.insert(downstream_worker_id);
}
}

node_actors_to_create
.entry(worker.id)
.or_default()
.push(new_actor.clone());
let worker = ctx.parallel_unit_id_to_worker(new_parallel_unit_id)?;

broadcast_worker_ids.insert(worker.id);
node_actors_to_create
.entry(worker.id)
.or_default()
.push(new_actor.clone());

actor_infos_to_broadcast.insert(
*new_actor_id,
ActorInfo {
actor_id: *new_actor_id,
host: worker.host.clone(),
},
);
broadcast_worker_ids.insert(worker.id);

actor_infos_to_broadcast.insert(
*new_actor_id,
ActorInfo {
actor_id: *new_actor_id,
host: worker.host.clone(),
},
);
}
}
}

if !options.skip_create_new_actors {
self.create_actors_on_compute_node(
&ctx.worker_nodes,
actor_infos_to_broadcast,
Expand Down
Loading