diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index b113c2b6cb92e..9afca2d65a036 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -1117,88 +1117,88 @@ impl ScaleController { } } - // After modification, for newly created actors, both upstream and downstream actor ids - // have been modified - let mut actor_infos_to_broadcast = BTreeMap::new(); - let mut node_actors_to_create: HashMap> = HashMap::new(); - let mut broadcast_worker_ids = HashSet::new(); - - for actors_to_create in fragment_actors_to_create.values() { - for (new_actor_id, new_parallel_unit_id) in actors_to_create { - let new_actor = new_created_actors.get(new_actor_id).unwrap(); - for upstream_actor_id in &new_actor.upstream_actor_id { - if new_created_actors.contains_key(upstream_actor_id) { - continue; - } - - let upstream_worker_id = ctx - .actor_id_to_parallel_unit(upstream_actor_id)? - .worker_node_id; - let upstream_worker = - ctx.worker_nodes.get(&upstream_worker_id).with_context(|| { - format!("upstream worker {} not found", upstream_worker_id) - })?; - - // Force broadcast upstream actor info, because the actor information of the new - // node may not have been synchronized yet - actor_infos_to_broadcast.insert( - *upstream_actor_id, - ActorInfo { - actor_id: *upstream_actor_id, - host: upstream_worker.host.clone(), - }, - ); - - broadcast_worker_ids.insert(upstream_worker_id); - } - - for dispatcher in &new_actor.dispatcher { - for downstream_actor_id in &dispatcher.downstream_actor_id { - if new_created_actors.contains_key(downstream_actor_id) { + if !options.skip_create_new_actors { + // After modification, for newly created actors, both upstream and downstream actor ids + // have been modified + let mut actor_infos_to_broadcast = BTreeMap::new(); + let mut node_actors_to_create: HashMap> = HashMap::new(); + let mut broadcast_worker_ids = HashSet::new(); + + for actors_to_create in fragment_actors_to_create.values() { + for (new_actor_id, new_parallel_unit_id) in actors_to_create { + let new_actor = new_created_actors.get(new_actor_id).unwrap(); + for upstream_actor_id in &new_actor.upstream_actor_id { + if new_created_actors.contains_key(upstream_actor_id) { continue; } - let downstream_worker_id = ctx - .actor_id_to_parallel_unit(downstream_actor_id)? + + let upstream_worker_id = ctx + .actor_id_to_parallel_unit(upstream_actor_id)? .worker_node_id; - let downstream_worker = ctx - .worker_nodes - .get(&downstream_worker_id) - .with_context(|| { - format!("downstream worker {} not found", downstream_worker_id) + let upstream_worker = + ctx.worker_nodes.get(&upstream_worker_id).with_context(|| { + format!("upstream worker {} not found", upstream_worker_id) })?; + // Force broadcast upstream actor info, because the actor information of the new + // node may not have been synchronized yet actor_infos_to_broadcast.insert( - *downstream_actor_id, + *upstream_actor_id, ActorInfo { - actor_id: *downstream_actor_id, - host: downstream_worker.host.clone(), + actor_id: *upstream_actor_id, + host: upstream_worker.host.clone(), }, ); - broadcast_worker_ids.insert(downstream_worker_id); + broadcast_worker_ids.insert(upstream_worker_id); } - } - let worker = ctx.parallel_unit_id_to_worker(new_parallel_unit_id)?; + for dispatcher in &new_actor.dispatcher { + for downstream_actor_id in &dispatcher.downstream_actor_id { + if new_created_actors.contains_key(downstream_actor_id) { + continue; + } + let downstream_worker_id = ctx + .actor_id_to_parallel_unit(downstream_actor_id)? + .worker_node_id; + let downstream_worker = ctx + .worker_nodes + .get(&downstream_worker_id) + .with_context(|| { + format!("downstream worker {} not found", downstream_worker_id) + })?; + + actor_infos_to_broadcast.insert( + *downstream_actor_id, + ActorInfo { + actor_id: *downstream_actor_id, + host: downstream_worker.host.clone(), + }, + ); + + broadcast_worker_ids.insert(downstream_worker_id); + } + } - node_actors_to_create - .entry(worker.id) - .or_default() - .push(new_actor.clone()); + let worker = ctx.parallel_unit_id_to_worker(new_parallel_unit_id)?; - broadcast_worker_ids.insert(worker.id); + node_actors_to_create + .entry(worker.id) + .or_default() + .push(new_actor.clone()); - actor_infos_to_broadcast.insert( - *new_actor_id, - ActorInfo { - actor_id: *new_actor_id, - host: worker.host.clone(), - }, - ); + broadcast_worker_ids.insert(worker.id); + + actor_infos_to_broadcast.insert( + *new_actor_id, + ActorInfo { + actor_id: *new_actor_id, + host: worker.host.clone(), + }, + ); + } } - } - if !options.skip_create_new_actors { self.create_actors_on_compute_node( &ctx.worker_nodes, actor_infos_to_broadcast,