feat: batch-written to etcd in scaling from recovery (#15510)
Signed-off-by: Shanicky Chen <[email protected]>
shanicky authored Mar 7, 2024
1 parent 176eacf commit dbace2d
Showing 1 changed file with 44 additions and 43 deletions.
87 changes: 44 additions & 43 deletions src/meta/src/barrier/recovery.rs
@@ -786,23 +786,23 @@ impl GlobalBarrierManagerContext
             }
         }
 
-        let table_parallelisms: HashMap<_, _> = {
+        let all_table_parallelisms: HashMap<_, _> = {
             let guard = mgr.fragment_manager.get_fragment_read_guard().await;
 
             guard
                 .table_fragments()
                 .iter()
                 .filter(|&(_, table)| matches!(table.state(), State::Created))
                 .map(|(table_id, table)| {
-                    let target_parallelism =
-                        derive_target_parallelism_for_custom(current_parallelism, table);
-
-                    (table_id.table_id, target_parallelism)
+                    (
+                        *table_id,
+                        derive_target_parallelism_for_custom(current_parallelism, table),
+                    )
                 })
                 .collect()
         };
 
-        let schedulable_worker_ids = workers
+        let schedulable_worker_ids: BTreeSet<_> = workers
             .iter()
             .filter(|worker| {
                 !worker
@@ -814,29 +814,25 @@ impl GlobalBarrierManagerContext
             .map(|worker| worker.id)
             .collect();
 
-        let plan = self
-            .scale_controller
-            .generate_table_resize_plan(TableResizePolicy {
-                worker_ids: schedulable_worker_ids,
-                table_parallelisms: table_parallelisms.clone(),
-            })
-            .await?;
+        for (table_id, table_parallelism) in all_table_parallelisms {
+            let plan = self
+                .scale_controller
+                .generate_table_resize_plan(TableResizePolicy {
+                    worker_ids: schedulable_worker_ids.clone(),
+                    table_parallelisms: HashMap::from([(table_id.table_id, table_parallelism)]),
+                })
+                .await?;
 
-        let table_parallelisms: HashMap<_, _> = table_parallelisms
-            .into_iter()
-            .map(|(table_id, parallelism)| {
-                debug_assert_ne!(parallelism, TableParallelism::Custom);
-                (TableId::new(table_id), parallelism)
-            })
-            .collect();
+            if plan.is_empty() {
+                continue;
+            }
 
-        let mut compared_table_parallelisms = table_parallelisms.clone();
+            let table_parallelisms = HashMap::from([(table_id, table_parallelism)]);
 
-        // skip reschedule if no reschedule is generated.
-        let (reschedule_fragment, applied_reschedules) = if plan.is_empty() {
-            (HashMap::new(), HashMap::new())
-        } else {
-            self.scale_controller
+            let mut compared_table_parallelisms = table_parallelisms.clone();
+
+            let (reschedule_fragment, applied_reschedules) = self
+                .scale_controller
                 .prepare_reschedule_command(
                     plan,
                     RescheduleOptions {
@@ -845,27 +841,32 @@
                     },
                     Some(&mut compared_table_parallelisms),
                 )
-                .await?
-        };
+                .await?;
 
-        // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
-        debug_assert_eq!(compared_table_parallelisms, table_parallelisms);
+            // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
+            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);
 
-        if let Err(e) = self
-            .scale_controller
-            .post_apply_reschedule(&reschedule_fragment, &table_parallelisms)
-            .await
-        {
-            tracing::error!(
-                error = %e.as_report(),
-                "failed to apply reschedule for offline scaling in recovery",
-            );
+            if let Err(e) = self
+                .scale_controller
+                .post_apply_reschedule(&reschedule_fragment, &table_parallelisms)
+                .await
+            {
+                tracing::error!(
+                    error = %e.as_report(),
+                    "failed to apply reschedule for offline scaling in recovery",
+                );
 
-            mgr.fragment_manager
-                .cancel_apply_reschedules(applied_reschedules)
-                .await;
+                mgr.fragment_manager
+                    .cancel_apply_reschedules(applied_reschedules)
+                    .await;
 
-            return Err(e);
+                return Err(e);
+            }
+
+            tracing::info!(
+                "offline rescheduling for job {} in recovery is done",
+                table_id.table_id
+            );
         }
 
         debug!("scaling actors succeed.");
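A standalone sketch of the shape of this change (the Controller, Plan, and helper names below are hypothetical stand-ins, not RisingWave's real meta-service APIs): instead of generating one resize plan covering every job and committing it in a single large write, the recovery path now loops over jobs, committing one small plan per job, skipping jobs whose plan is empty, and rolling back only the failing job's applied reschedules. Presumably this keeps each etcd transaction small, since etcd rejects requests above a configured size (--max-request-bytes, 1.5 MiB by default).

use std::collections::HashMap;

// Hypothetical stand-ins for the meta-service types in the diff above.
type TableId = u32;
type Parallelism = usize;
type Plan = HashMap<TableId, Parallelism>;

struct Controller;

impl Controller {
    // Generate a resize plan for the given jobs; empty if there is nothing to do.
    fn generate_plan(&self, tables: &HashMap<TableId, Parallelism>) -> Plan {
        tables
            .iter()
            .filter(|&(_, &p)| p > 1) // pretend only these need rescheduling
            .map(|(&id, &p)| (id, p))
            .collect()
    }

    // Apply one plan; in the real code this is a (now bounded) write to etcd.
    fn apply(&self, plan: &Plan) -> Result<(), String> {
        println!("committing {} plan entries", plan.len());
        Ok(())
    }

    // Undo a previously applied plan, mirroring cancel_apply_reschedules.
    fn rollback(&self, plan: &Plan) {
        println!("rolling back {} plan entries", plan.len());
    }
}

fn main() -> Result<(), String> {
    let ctl = Controller;
    let all_jobs: HashMap<TableId, Parallelism> = HashMap::from([(1, 4), (2, 1), (3, 8)]);

    // One plan per job: each apply() is a small, bounded write instead of a
    // single transaction spanning every job in the cluster.
    for (table_id, parallelism) in all_jobs {
        let plan = ctl.generate_plan(&HashMap::from([(table_id, parallelism)]));
        if plan.is_empty() {
            continue; // nothing to reschedule for this job
        }
        if let Err(e) = ctl.apply(&plan) {
            ctl.rollback(&plan); // undo only this job's changes
            return Err(e);
        }
        println!("rescheduling for job {table_id} done");
    }
    Ok(())
}

The trade-off, visible in the diff as well: a failure mid-loop leaves earlier jobs already rescheduled, since only the failing job's reschedules are cancelled before the error is returned, and recovery is expected to retry.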
