Skip to content

Commit

Permalink
chore: more logs for offline scaling (#19407) (#19541)
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky authored Nov 22, 2024
1 parent 6aae038 commit 0b4c1e2
Showing 1 changed file with 22 additions and 2 deletions.
24 changes: 22 additions & 2 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,20 @@ impl GlobalBarrierManager {
.list_background_creating_jobs()
.await?;

info!(
"background streaming jobs: {:?} total {}",
background_streaming_jobs,
background_streaming_jobs.len()
);

// Resolve actor info for recovery. If there's no actor to recover, most of the
// following steps will be no-op, while the compute nodes will still be reset.
// FIXME: Transactions should be used.
// TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
let mut info = if !self.env.opts.disable_automatic_parallelism_control
&& background_streaming_jobs.is_empty()
{
info!("trigger offline scaling");
self.context
.scale_actors(&active_streaming_nodes)
.await
Expand All @@ -210,6 +217,7 @@ impl GlobalBarrierManager {
warn!(error = %err.as_report(), "resolve actor info failed");
})?
} else {
info!("trigger actor migration");
// Migrate actors in expired CN to newly joined one.
self.context
.migrate_actors(&mut active_streaming_nodes)
Expand Down Expand Up @@ -532,7 +540,7 @@ impl GlobalBarrierManagerContext {

mgr.catalog_controller.migrate_actors(plan).await?;

debug!("migrate actors succeed.");
info!("migrate actors succeed.");

self.resolve_graph_info().await
}
Expand Down Expand Up @@ -603,6 +611,11 @@ impl GlobalBarrierManagerContext {
result
};

info!(
"target table parallelisms for offline scaling: {:?}",
table_parallelisms
);

let schedulable_worker_ids = active_nodes
.current()
.values()
Expand All @@ -616,6 +629,11 @@ impl GlobalBarrierManagerContext {
.map(|worker| worker.id as WorkerId)
.collect();

info!(
"target worker ids for offline scaling: {:?}",
schedulable_worker_ids
);

let plan = self
.scale_controller
.generate_table_resize_plan(TableResizePolicy {
Expand Down Expand Up @@ -653,6 +671,8 @@ impl GlobalBarrierManagerContext {
// Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
debug_assert_eq!(compared_table_parallelisms, table_parallelisms);

info!("post applying reschedule for offline scaling");

if let Err(e) = self
.scale_controller
.post_apply_reschedule(&reschedule_fragment, &table_parallelisms)
Expand All @@ -666,7 +686,7 @@ impl GlobalBarrierManagerContext {
return Err(e);
}

debug!("scaling actors succeed.");
info!("scaling actors succeed.");
Ok(())
}

Expand Down

0 comments on commit 0b4c1e2

Please sign in to comment.