Skip to content

Commit

Permalink
fix: Single fragment scaling is too conservative (#19259) (#19313)
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <[email protected]>
Co-authored-by: Shanicky Chen <[email protected]>
  • Loading branch information
github-actions[bot] and shanicky authored Nov 12, 2024
1 parent b78021a commit aebf896
Showing 1 changed file with 31 additions and 43 deletions.
74 changes: 31 additions & 43 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -968,9 +968,9 @@ impl ScaleController {
return;
}

let fragment = ctx.fragment_map.get(fragment_id).unwrap();
let fragment = &ctx.fragment_map[fragment_id];

let upstream_fragment = ctx.fragment_map.get(upstream_fragment_id).unwrap();
let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];

// build actor group map
for upstream_actor in &upstream_fragment.actors {
Expand All @@ -993,8 +993,7 @@ impl ScaleController {
(upstream_fragment.fragment_id, upstream_actor.actor_id),
);
} else {
let root_actor_id =
*actor_group_map.get(&upstream_actor.actor_id).unwrap();
let root_actor_id = actor_group_map[&upstream_actor.actor_id];

actor_group_map.insert(downstream_actor_id, root_actor_id);
}
Expand Down Expand Up @@ -1182,7 +1181,7 @@ impl ScaleController {
.cloned()
.unwrap_or_default();

let fragment = ctx.fragment_map.get(fragment_id).unwrap();
let fragment = &ctx.fragment_map[fragment_id];

assert!(!fragment.actors.is_empty());

Expand Down Expand Up @@ -1223,11 +1222,10 @@ impl ScaleController {
// Because we are in the Pause state, so it's no problem to reallocate
let mut fragment_actor_splits = HashMap::new();
for fragment_id in reschedules.keys() {
let actors_after_reschedule =
fragment_actors_after_reschedule.get(fragment_id).unwrap();
let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];

if ctx.stream_source_fragment_ids.contains(fragment_id) {
let fragment = ctx.fragment_map.get(fragment_id).unwrap();
let fragment = &ctx.fragment_map[fragment_id];

let prev_actor_ids = fragment
.actors
Expand Down Expand Up @@ -1257,14 +1255,13 @@ impl ScaleController {
// We use 2 iterations to make sure source actors are migrated first, and then align backfill actors
if !ctx.stream_source_backfill_fragment_ids.is_empty() {
for fragment_id in reschedules.keys() {
let actors_after_reschedule =
fragment_actors_after_reschedule.get(fragment_id).unwrap();
let actors_after_reschedule = &fragment_actors_after_reschedule[fragment_id];

if ctx
.stream_source_backfill_fragment_ids
.contains(fragment_id)
{
let fragment = ctx.fragment_map.get(fragment_id).unwrap();
let fragment = &ctx.fragment_map[fragment_id];

let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();

Expand Down Expand Up @@ -1314,12 +1311,11 @@ impl ScaleController {
.into_keys()
.collect();

let actors_after_reschedule =
fragment_actors_after_reschedule.get(&fragment_id).unwrap();
let actors_after_reschedule = &fragment_actors_after_reschedule[&fragment_id];

assert!(!actors_after_reschedule.is_empty());

let fragment = ctx.fragment_map.get(&fragment_id).unwrap();
let fragment = &ctx.fragment_map[&fragment_id];

let in_degree_types: HashSet<_> = fragment
.upstream_fragment_ids
Expand Down Expand Up @@ -1575,7 +1571,7 @@ impl ScaleController {
no_shuffle_downstream_actors_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
new_actor: &mut PbStreamActor,
) -> MetaResult<()> {
let fragment = &ctx.fragment_map.get(&new_actor.fragment_id).unwrap();
let fragment = &ctx.fragment_map[&new_actor.fragment_id];
let mut applied_upstream_fragment_actor_ids = HashMap::new();

for upstream_fragment_id in &fragment.upstream_fragment_ids {
Expand All @@ -1588,7 +1584,7 @@ impl ScaleController {
match upstream_dispatch_type {
DispatcherType::Unspecified => unreachable!(),
DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
let upstream_fragment = &ctx.fragment_map.get(upstream_fragment_id).unwrap();
let upstream_fragment = &ctx.fragment_map[upstream_fragment_id];
let mut upstream_actor_ids = upstream_fragment
.actors
.iter()
Expand Down Expand Up @@ -1946,10 +1942,9 @@ impl ScaleController {

let mut fragment_slots: BTreeMap<WorkerId, usize> = BTreeMap::new();

for actor_id in fragment_actor_id_map.get(&fragment_id).unwrap() {
let worker_id = actor_location.get(actor_id).unwrap();

*fragment_slots.entry(*worker_id).or_default() += 1;
for actor_id in &fragment_actor_id_map[&fragment_id] {
let worker_id = actor_location[actor_id];
*fragment_slots.entry(worker_id).or_default() += 1;
}

let all_available_slots: usize = schedulable_worker_slots.values().cloned().sum();
Expand All @@ -1961,22 +1956,19 @@ impl ScaleController {
);
}

let &(dist, vnode_count) = fragment_distribution_map.get(&fragment_id).unwrap();
let (dist, vnode_count) = fragment_distribution_map[&fragment_id];
let max_parallelism = vnode_count;

match dist {
FragmentDistributionType::Unspecified => unreachable!(),
FragmentDistributionType::Single => {
let (single_worker_id, should_be_one) =
fragment_slots.iter().exactly_one().unwrap();
let (single_worker_id, should_be_one) = fragment_slots
.iter()
.exactly_one()
.expect("single fragment should have only one worker slot");

assert_eq!(*should_be_one, 1);

if schedulable_worker_slots.contains_key(single_worker_id) {
// NOTE: shall we continue?
continue;
}

let units =
schedule_units_for_slots(&schedulable_worker_slots, 1, table_id)?;

Expand All @@ -1988,7 +1980,11 @@ impl ScaleController {
})?;

assert_eq!(*should_be_one, 1);
assert_ne!(*chosen_target_worker_id, *single_worker_id);

if *chosen_target_worker_id == *single_worker_id {
tracing::debug!("single fragment {fragment_id} already on target worker {chosen_target_worker_id}");
continue;
}

target_plan.insert(
fragment_id,
Expand Down Expand Up @@ -2188,17 +2184,13 @@ impl ScaleController {
}

// for upstream
for upstream_fragment_id in &fragment_map
.get(&fragment_id)
.unwrap()
.upstream_fragment_ids
{
for upstream_fragment_id in &fragment_map[&fragment_id].upstream_fragment_ids {
if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
continue;
}

let table_id = fragment_to_table.get(&fragment_id).unwrap();
let upstream_table_id = fragment_to_table.get(upstream_fragment_id).unwrap();
let table_id = &fragment_to_table[&fragment_id];
let upstream_table_id = &fragment_to_table[upstream_fragment_id];

// Only custom parallelism will be propagated to the no shuffle upstream.
if let Some(TableParallelism::Custom) = table_parallelisms.get(table_id) {
Expand Down Expand Up @@ -2255,16 +2247,12 @@ impl ScaleController {
}

// for upstream
for upstream_fragment_id in &fragment_map
.get(&fragment_id)
.unwrap()
.upstream_fragment_ids
{
for upstream_fragment_id in &fragment_map[&fragment_id].upstream_fragment_ids {
if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) {
continue;
}

let reschedule_plan = reschedule.get(&fragment_id).unwrap();
let reschedule_plan = &reschedule[&fragment_id];

if let Some(upstream_reschedule_plan) = reschedule.get(upstream_fragment_id) {
if upstream_reschedule_plan != reschedule_plan {
Expand Down Expand Up @@ -2728,7 +2716,7 @@ impl ConsistentHashRing {
let ring_range = self.ring.range(task_hash..).chain(self.ring.iter());

for (_, &worker_id) in ring_range {
let task_limit = *soft_limits.get(&worker_id).unwrap();
let task_limit = soft_limits[&worker_id];

let worker_task_count = task_distribution.entry(worker_id).or_insert(0);

Expand Down

0 comments on commit aebf896

Please sign in to comment.