From 3b60e0c7787196a90fcc34fed91aa707bc724113 Mon Sep 17 00:00:00 2001
From: Shanicky Chen
Date: Thu, 7 Mar 2024 14:46:38 +0800
Subject: [PATCH] step write etcd in recovery

Signed-off-by: Shanicky Chen
---
 src/meta/src/barrier/recovery.rs | 87 ++++++++++++++++----------------
 1 file changed, 44 insertions(+), 43 deletions(-)

diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index 9d1676da0489f..b951fadf00be4 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -786,7 +786,7 @@ impl GlobalBarrierManagerContext {
             }
         }
 
-        let table_parallelisms: HashMap<_, _> = {
+        let all_table_parallelisms: HashMap<_, _> = {
             let guard = mgr.fragment_manager.get_fragment_read_guard().await;
 
             guard
@@ -794,15 +794,15 @@
                 .iter()
                 .filter(|&(_, table)| matches!(table.state(), State::Created))
                 .map(|(table_id, table)| {
-                    let target_parallelism =
-                        derive_target_parallelism_for_custom(current_parallelism, table);
-
-                    (table_id.table_id, target_parallelism)
+                    (
+                        *table_id,
+                        derive_target_parallelism_for_custom(current_parallelism, table),
+                    )
                 })
                 .collect()
         };
 
-        let schedulable_worker_ids = workers
+        let schedulable_worker_ids: BTreeSet<_> = workers
             .iter()
             .filter(|worker| {
                 !worker
@@ -814,29 +814,25 @@
             .map(|worker| worker.id)
             .collect();
 
-        let plan = self
-            .scale_controller
-            .generate_table_resize_plan(TableResizePolicy {
-                worker_ids: schedulable_worker_ids,
-                table_parallelisms: table_parallelisms.clone(),
-            })
-            .await?;
+        for (table_id, table_parallelism) in all_table_parallelisms {
+            let plan = self
+                .scale_controller
+                .generate_table_resize_plan(TableResizePolicy {
+                    worker_ids: schedulable_worker_ids.clone(),
+                    table_parallelisms: HashMap::from([(table_id.table_id, table_parallelism)]),
+                })
+                .await?;
 
-        let table_parallelisms: HashMap<_, _> = table_parallelisms
-            .into_iter()
-            .map(|(table_id, parallelism)| {
-                debug_assert_ne!(parallelism, TableParallelism::Custom);
-                (TableId::new(table_id), parallelism)
-            })
-            .collect();
+            if plan.is_empty() {
+                continue;
+            }
 
-        let mut compared_table_parallelisms = table_parallelisms.clone();
+            let table_parallelisms = HashMap::from([(table_id, table_parallelism)]);
 
-        // skip reschedule if no reschedule is generated.
-        let (reschedule_fragment, applied_reschedules) = if plan.is_empty() {
-            (HashMap::new(), HashMap::new())
-        } else {
-            self.scale_controller
+            let mut compared_table_parallelisms = table_parallelisms.clone();
+
+            let (reschedule_fragment, applied_reschedules) = self
+                .scale_controller
                 .prepare_reschedule_command(
                     plan,
                     RescheduleOptions {
@@ -845,27 +841,32 @@
                     },
                     Some(&mut compared_table_parallelisms),
                 )
-                .await?
-        };
+                .await?;
 
-        // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
-        debug_assert_eq!(compared_table_parallelisms, table_parallelisms);
+            // Because custom parallelism doesn't exist, this function won't result in a no-shuffle rewrite for table parallelisms.
+            debug_assert_eq!(compared_table_parallelisms, table_parallelisms);
 
-        if let Err(e) = self
-            .scale_controller
-            .post_apply_reschedule(&reschedule_fragment, &table_parallelisms)
-            .await
-        {
-            tracing::error!(
-                error = %e.as_report(),
-                "failed to apply reschedule for offline scaling in recovery",
-            );
+            if let Err(e) = self
+                .scale_controller
+                .post_apply_reschedule(&reschedule_fragment, &table_parallelisms)
+                .await
+            {
+                tracing::error!(
+                    error = %e.as_report(),
+                    "failed to apply reschedule for offline scaling in recovery",
+                );
 
-            mgr.fragment_manager
-                .cancel_apply_reschedules(applied_reschedules)
-                .await;
+                mgr.fragment_manager
+                    .cancel_apply_reschedules(applied_reschedules)
+                    .await;
 
-            return Err(e);
+                return Err(e);
+            }
+
+            tracing::info!(
+                "offline rescheduling for job {} in recovery is done",
+                table_id.table_id
+            );
         }
 
         debug!("scaling actors succeed.");
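
The shape of the change, in brief: recovery previously built a single resize plan covering every created table and applied it in one step; with this patch it iterates over the tables, generating, applying, and persisting one plan per table, and skips any table whose plan comes back empty. Each table's reschedule is therefore committed to the metadata store before the next table is touched, which appears to be the "step write etcd" of the commit title. Below is a minimal, self-contained sketch of that control flow only; TableId, TableParallelism, and generate_table_resize_plan here are simplified stand-ins for the meta-service types, not the real APIs.

    use std::collections::{BTreeSet, HashMap};

    // Hypothetical stand-ins for the meta-service types; the real TableId,
    // TableParallelism, and the scale controller live in src/meta.
    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    struct TableId(u32);

    #[derive(Clone, Copy, PartialEq, Eq)]
    enum TableParallelism {
        Adaptive,
        Fixed(usize),
    }

    // Hypothetical planner: an empty map means the table already matches its
    // target parallelism and nothing has to move.
    fn generate_table_resize_plan(
        _worker_ids: &BTreeSet<u32>,
        targets: &HashMap<TableId, TableParallelism>,
    ) -> HashMap<TableId, TableParallelism> {
        targets.clone() // pretend every table needs a reschedule
    }

    fn main() {
        let schedulable_worker_ids = BTreeSet::from([1u32, 2, 3]);
        let all_table_parallelisms = HashMap::from([
            (TableId(1), TableParallelism::Adaptive),
            (TableId(2), TableParallelism::Fixed(4)),
        ]);

        // One table per iteration: each plan is generated, applied, and
        // persisted before the next table is touched, instead of building a
        // single all-tables plan up front.
        for (table_id, parallelism) in all_table_parallelisms {
            let plan = generate_table_resize_plan(
                &schedulable_worker_ids,
                &HashMap::from([(table_id, parallelism)]),
            );
            if plan.is_empty() {
                continue; // nothing to move for this table
            }
            // The real patch runs prepare_reschedule_command and
            // post_apply_reschedule here, rolling back on error.
            println!("offline rescheduling for job {} is done", table_id.0);
        }
    }

The trade-off is one planner invocation per table instead of one for the whole set, in exchange for bounded loss on failure: if recovery aborts mid-loop (post_apply_reschedule fails and the applied reschedules are cancelled), the tables handled in earlier iterations keep their already-persisted reschedules.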