Skip to content

Commit

Permalink
fix(scale): Add skip_create_new_actors to RescheduleOptions (#15493)
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky authored Mar 6, 2024
1 parent f9cfbd0 commit 32c6e8c
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/meta/service/src/scale_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ impl ScaleService for ScaleServiceImpl {
.collect(),
RescheduleOptions {
resolve_no_shuffle_upstream,
skip_create_new_actors: false,
},
Some(table_parallelisms),
)
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,7 @@ impl GlobalBarrierManagerContext {
plan,
RescheduleOptions {
resolve_no_shuffle_upstream: true,
skip_create_new_actors: true,
},
Some(&mut compared_table_parallelisms),
)
Expand Down Expand Up @@ -828,6 +829,7 @@ impl GlobalBarrierManagerContext {
plan,
RescheduleOptions {
resolve_no_shuffle_upstream: true,
skip_create_new_actors: true,
},
Some(&mut compared_table_parallelisms),
)
Expand Down
20 changes: 13 additions & 7 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,9 @@ pub fn rebalance_actor_vnode(
pub struct RescheduleOptions {
/// Whether to resolve the upstream of NoShuffle when scaling. It will check whether all the reschedules in the no shuffle dependency tree are corresponding, and rewrite them to the root of the no shuffle dependency tree.
pub resolve_no_shuffle_upstream: bool,

/// Whether to skip creating new actors. If it is true, the scaling-out actors will not be created.
pub skip_create_new_actors: bool,
}

pub type ScaleControllerRef = Arc<ScaleController>;
Expand Down Expand Up @@ -1195,13 +1198,15 @@ impl ScaleController {
}
}

self.create_actors_on_compute_node(
&ctx.worker_nodes,
actor_infos_to_broadcast,
node_actors_to_create,
broadcast_worker_ids,
)
.await?;
if !options.skip_create_new_actors {
self.create_actors_on_compute_node(
&ctx.worker_nodes,
actor_infos_to_broadcast,
node_actors_to_create,
broadcast_worker_ids,
)
.await?;
}

// For stream source fragments, we need to reallocate the splits.
// Because we are in the Pause state, so it's no problem to reallocate
Expand Down Expand Up @@ -2813,6 +2818,7 @@ impl GlobalStreamManager {
reschedules,
RescheduleOptions {
resolve_no_shuffle_upstream: false,
skip_create_new_actors: false,
},
None,
)
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ impl GlobalStreamManager {
reschedules,
RescheduleOptions {
resolve_no_shuffle_upstream: false,
skip_create_new_actors: false,
},
Some(table_parallelism_assignment),
)
Expand Down

0 comments on commit 32c6e8c

Please sign in to comment.