From 0b4c1e2e6dd2f7dd39516965ee505cddc75b9163 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Fri, 22 Nov 2024 17:49:55 +0800 Subject: [PATCH] chore: more logs for offline scaling (#19407) (#19541) Signed-off-by: Shanicky Chen --- src/meta/src/barrier/recovery.rs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 1ff673d7da081..48496cf6b7271 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -192,6 +192,12 @@ impl GlobalBarrierManager { .list_background_creating_jobs() .await?; + info!( + "background streaming jobs: {:?} total {}", + background_streaming_jobs, + background_streaming_jobs.len() + ); + // Resolve actor info for recovery. If there's no actor to recover, most of the // following steps will be no-op, while the compute nodes will still be reset. // FIXME: Transactions should be used. @@ -199,6 +205,7 @@ impl GlobalBarrierManager { let mut info = if !self.env.opts.disable_automatic_parallelism_control && background_streaming_jobs.is_empty() { + info!("trigger offline scaling"); self.context .scale_actors(&active_streaming_nodes) .await @@ -210,6 +217,7 @@ impl GlobalBarrierManager { warn!(error = %err.as_report(), "resolve actor info failed"); })? } else { + info!("trigger actor migration"); // Migrate actors in expired CN to newly joined one. self.context .migrate_actors(&mut active_streaming_nodes) @@ -532,7 +540,7 @@ impl GlobalBarrierManagerContext { mgr.catalog_controller.migrate_actors(plan).await?; - debug!("migrate actors succeed."); + info!("migrate actors succeed."); self.resolve_graph_info().await } @@ -603,6 +611,11 @@ impl GlobalBarrierManagerContext { result }; + info!( + "target table parallelisms for offline scaling: {:?}", + table_parallelisms + ); + let schedulable_worker_ids = active_nodes .current() .values() @@ -616,6 +629,11 @@ impl GlobalBarrierManagerContext { .map(|worker| worker.id as WorkerId) .collect(); + info!( + "target worker ids for offline scaling: {:?}", + schedulable_worker_ids + ); + let plan = self .scale_controller .generate_table_resize_plan(TableResizePolicy { @@ -653,6 +671,8 @@ impl GlobalBarrierManagerContext { // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms. debug_assert_eq!(compared_table_parallelisms, table_parallelisms); + info!("post applying reschedule for offline scaling"); + if let Err(e) = self .scale_controller .post_apply_reschedule(&reschedule_fragment, &table_parallelisms) @@ -666,7 +686,7 @@ impl GlobalBarrierManagerContext { return Err(e); } - debug!("scaling actors succeed."); + info!("scaling actors succeed."); Ok(()) }