From 89da1ff889319c5e22fb09edf3a29da5893e5ad6 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Thu, 7 Mar 2024 00:44:42 +0800 Subject: [PATCH] fix(scale): Add skip_create_new_actors to RescheduleOptions (#15493) --- src/meta/service/src/scale_service.rs | 1 + src/meta/src/barrier/recovery.rs | 2 ++ src/meta/src/stream/scale.rs | 20 +++++++++++++------- src/meta/src/stream/stream_manager.rs | 1 + 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs index 33899856a57bc..379420a5da651 100644 --- a/src/meta/service/src/scale_service.rs +++ b/src/meta/service/src/scale_service.rs @@ -210,6 +210,7 @@ impl ScaleService for ScaleServiceImpl { .collect(), RescheduleOptions { resolve_no_shuffle_upstream, + skip_create_new_actors: false, }, Some(table_parallelisms), ) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 824cc8b0c090e..48b91c5e16931 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -671,6 +671,7 @@ impl GlobalBarrierManagerContext { plan, RescheduleOptions { resolve_no_shuffle_upstream: true, + skip_create_new_actors: true, }, Some(&mut compared_table_parallelisms), ) @@ -807,6 +808,7 @@ impl GlobalBarrierManagerContext { plan, RescheduleOptions { resolve_no_shuffle_upstream: true, + skip_create_new_actors: true, }, Some(&mut compared_table_parallelisms), ) diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 3b65c73d059cc..b113c2b6cb92e 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -439,6 +439,9 @@ pub fn rebalance_actor_vnode( pub struct RescheduleOptions { /// Whether to resolve the upstream of NoShuffle when scaling. It will check whether all the reschedules in the no shuffle dependency tree are corresponding, and rewrite them to the root of the no shuffle dependency tree. pub resolve_no_shuffle_upstream: bool, + + /// Whether to skip creating new actors. If it is true, the scaling-out actors will not be created. + pub skip_create_new_actors: bool, } pub type ScaleControllerRef = Arc; @@ -1195,13 +1198,15 @@ impl ScaleController { } } - self.create_actors_on_compute_node( - &ctx.worker_nodes, - actor_infos_to_broadcast, - node_actors_to_create, - broadcast_worker_ids, - ) - .await?; + if !options.skip_create_new_actors { + self.create_actors_on_compute_node( + &ctx.worker_nodes, + actor_infos_to_broadcast, + node_actors_to_create, + broadcast_worker_ids, + ) + .await?; + } // For stream source fragments, we need to reallocate the splits. // Because we are in the Pause state, so it's no problem to reallocate @@ -2813,6 +2818,7 @@ impl GlobalStreamManager { reschedules, RescheduleOptions { resolve_no_shuffle_upstream: false, + skip_create_new_actors: false, }, None, ) diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index a41426c74487a..4a617fde012a7 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -727,6 +727,7 @@ impl GlobalStreamManager { reschedules, RescheduleOptions { resolve_no_shuffle_upstream: false, + skip_create_new_actors: false, }, Some(table_parallelism_assignment), )