From 777760b44a34964fa41bc037dcf1343f55e1102c Mon Sep 17 00:00:00 2001
From: Shanicky Chen
Date: Fri, 19 Jul 2024 15:13:56 +0800
Subject: [PATCH] Add migration timeout; update logging terminology; scale worker slots dynamically based on need

---
 src/meta/src/barrier/recovery.rs | 169 +++++++++++++++++++++++--------
 src/meta/src/manager/metadata.rs |  13 ++-
 2 files changed, 134 insertions(+), 48 deletions(-)

diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index 454098e9e14b3..2187bd00e3064 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -48,6 +48,8 @@ use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResize
 use crate::{model, MetaError, MetaResult};
 
 impl GlobalBarrierManager {
+    // Migration timeout.
+    const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration = Duration::from_secs(100);
     // Retry base interval in milliseconds.
     const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
     // Retry max interval.
@@ -470,16 +472,13 @@ impl GlobalBarrierManagerContext {
             .collect();
 
         if expired_worker_slots.is_empty() {
-            debug!("no expired parallel units, skipping.");
+            debug!("no expired worker slots, skipping.");
             return self.resolve_actor_info(active_nodes).await;
         }
 
         debug!("start migrate actors.");
         let mut to_migrate_worker_slots = expired_worker_slots.into_iter().rev().collect_vec();
-        debug!(
-            "got to migrate parallel units {:#?}",
-            to_migrate_worker_slots
-        );
+        debug!("got to migrate worker slots {:#?}", to_migrate_worker_slots);
 
         let mut inuse_worker_slots: HashSet<_> = all_inuse_worker_slots
             .intersection(&active_worker_slots)
@@ -489,15 +488,51 @@ impl GlobalBarrierManagerContext {
         let start = Instant::now();
         let mut plan = HashMap::new();
         'discovery: while !to_migrate_worker_slots.is_empty() {
-            let new_worker_slots = active_nodes
+            let mut new_worker_slots = active_nodes
                 .current()
                 .values()
-                .flat_map(|node| {
-                    (0..node.parallelism)
-                        .map(|idx| WorkerSlotId::new(node.id, idx as _))
-                        .filter(|worker_slot| !inuse_worker_slots.contains(worker_slot))
+                .flat_map(|worker| {
+                    (0..worker.parallelism).map(move |i| WorkerSlotId::new(worker.id, i as _))
                 })
                 .collect_vec();
+
+            let all_size = new_worker_slots.len();
+
+            new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
+            let to_migration_size = to_migrate_worker_slots.len();
+            let available_size = new_worker_slots.len();
+            let used_size = all_size - available_size;
+
+            if available_size < to_migration_size
+                && start.elapsed() > GlobalBarrierManager::RECOVERY_FORCE_MIGRATION_TIMEOUT
+            {
+                let factor =
+                    (((to_migration_size + used_size) as f32) / (all_size as f32)).ceil() as u32;
+                let mut extended_worker_slots = active_nodes
+                    .current()
+                    .values()
+                    .flat_map(|node| {
+                        (0..node.parallelism * factor)
+                            .map(|idx| WorkerSlotId::new(node.id, idx as _))
+                            .filter(|worker_slot| !inuse_worker_slots.contains(worker_slot))
+                    })
+                    .collect_vec();
+
+                extended_worker_slots.sort_by(|a, b| {
+                    a.slot_idx()
+                        .cmp(&b.slot_idx())
+                        .then(a.worker_id().cmp(&b.worker_id()))
+                });
+
+                tracing::info!(
+                    "migration timed out, extending worker slots to {:?} by factor {}",
+                    extended_worker_slots,
+                    factor,
+                );
+
+                new_worker_slots = extended_worker_slots;
+            }
+
             if !new_worker_slots.is_empty() {
                 debug!("new worker slots found: {:#?}", new_worker_slots);
                 for target_worker_slot in new_worker_slots {
@@ -520,20 +555,24 @@ impl GlobalBarrierManagerContext {
 
             // wait to get newly joined CN
             let changed = active_nodes
-                .wait_changed(Duration::from_millis(5000), |active_nodes| {
-                    let current_nodes = active_nodes
-                        .current()
-                        .values()
-                        .map(|node| (node.id, &node.host, node.parallelism))
-                        .collect_vec();
-                    warn!(
-                        current_nodes = ?current_nodes,
-                        "waiting for new workers to join, elapsed: {}s",
-                        start.elapsed().as_secs()
-                    );
-                })
+                .wait_changed(
+                    Duration::from_millis(5000),
+                    GlobalBarrierManager::RECOVERY_FORCE_MIGRATION_TIMEOUT,
+                    |active_nodes| {
+                        let current_nodes = active_nodes
+                            .current()
+                            .values()
+                            .map(|node| (node.id, &node.host, node.parallelism))
+                            .collect_vec();
+                        warn!(
+                            current_nodes = ?current_nodes,
+                            "waiting for new workers to join, elapsed: {}s",
+                            start.elapsed().as_secs()
+                        );
+                    },
+                )
                 .await;
-            warn!(?changed, "get worker changed. Retry migrate");
+            warn!(?changed, "get worker changed or timed out. Retry migrate");
         }
 
         mgr.catalog_controller.migrate_actors(plan).await?;
@@ -768,7 +807,7 @@ impl GlobalBarrierManagerContext {
             .sum();
 
         if available_parallelism == 0 {
-            return Err(anyhow!("no available parallel units for auto scaling").into());
+            return Err(anyhow!("no available worker slots for auto scaling").into());
        }
 
         let all_table_parallelisms: HashMap<_, _> = {
@@ -896,7 +935,7 @@ impl GlobalBarrierManagerContext {
         cached_plan.worker_slot_plan.retain(|from, to| {
             if to_migrate_worker_slots.contains(from) {
                 if !to_migrate_worker_slots.contains(to) {
-                    // clean up target parallel units in migration plan that are expired and not
+                    // clean up target worker slots in migration plan that are expired and not
                     // used by any actors.
                     return !expired_workers.contains(&to.worker_id());
                 }
@@ -908,18 +947,15 @@ impl GlobalBarrierManagerContext {
         inuse_worker_slots.extend(cached_plan.worker_slot_plan.values());
 
         if to_migrate_worker_slots.is_empty() {
-            // all expired parallel units are already in migration plan.
-            debug!("all expired parallel units are already in migration plan.");
+            // all expired worker slots are already in migration plan.
+            debug!("all expired worker slots are already in migration plan.");
             return Ok(cached_plan);
         }
         let mut to_migrate_worker_slots = to_migrate_worker_slots.into_iter().rev().collect_vec();
-        debug!(
-            "got to migrate parallel units {:#?}",
-            to_migrate_worker_slots
-        );
+        debug!("got to migrate worker slots {:#?}", to_migrate_worker_slots);
 
         let start = Instant::now();
-        // if in-used expire parallel units are not empty, should wait for newly joined worker.
+        // if in-used expire worker slots are not empty, should wait for newly joined worker.
         'discovery: while !to_migrate_worker_slots.is_empty() {
             let mut new_worker_slots = active_nodes
                 .current()
@@ -929,8 +965,47 @@ impl GlobalBarrierManagerContext {
                 })
                 .collect_vec();
 
+            let all_size = new_worker_slots.len();
+
             new_worker_slots.retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
 
+            let to_migration_size = to_migrate_worker_slots.len();
+            let available_size = new_worker_slots.len();
+            let used_size = all_size - available_size;
+
+            if available_size < to_migration_size
+                && start.elapsed() > GlobalBarrierManager::RECOVERY_FORCE_MIGRATION_TIMEOUT
+            {
+                let factor =
+                    (((to_migration_size + used_size) as f32) / (all_size as f32)).ceil() as u32;
+
+                let mut extended_worker_slots = active_nodes
+                    .current()
+                    .values()
+                    .flat_map(|worker| {
+                        (0..worker.parallelism * factor)
+                            .map(move |i| WorkerSlotId::new(worker.id, i as _))
+                    })
+                    .collect_vec();
+
+                extended_worker_slots
+                    .retain(|worker_slot| !inuse_worker_slots.contains(worker_slot));
+
+                extended_worker_slots.sort_by(|a, b| {
+                    a.slot_idx()
+                        .cmp(&b.slot_idx())
+                        .then(a.worker_id().cmp(&b.worker_id()))
+                });
+
+                tracing::info!(
+                    "migration timed out, extending worker slots to {:?} by factor {}",
+                    extended_worker_slots,
+                    factor,
+                );
+
+                new_worker_slots = extended_worker_slots;
+            }
+
             if !new_worker_slots.is_empty() {
                 debug!("new worker slots found: {:#?}", new_worker_slots);
                 for target_worker_slot in new_worker_slots {
@@ -955,20 +1030,24 @@ impl GlobalBarrierManagerContext {
 
             // wait to get newly joined CN
             let changed = active_nodes
-                .wait_changed(Duration::from_millis(5000), |active_nodes| {
-                    let current_nodes = active_nodes
-                        .current()
-                        .values()
-                        .map(|node| (node.id, &node.host, &node.parallelism))
-                        .collect_vec();
-                    warn!(
-                        current_nodes = ?current_nodes,
-                        "waiting for new workers to join, elapsed: {}s",
-                        start.elapsed().as_secs()
-                    );
-                })
+                .wait_changed(
+                    Duration::from_millis(5000),
+                    GlobalBarrierManager::RECOVERY_FORCE_MIGRATION_TIMEOUT,
+                    |active_nodes| {
+                        let current_nodes = active_nodes
+                            .current()
+                            .values()
+                            .map(|node| (node.id, &node.host, &node.parallelism))
+                            .collect_vec();
+                        warn!(
+                            current_nodes = ?current_nodes,
+                            "waiting for new workers to join, elapsed: {}s",
+                            start.elapsed().as_secs()
+                        );
+                    },
+                )
                 .await;
-            warn!(?changed, "get worker changed. Retry migrate");
+            warn!(?changed, "get worker changed or timed out. Retry migrate");
         }
 
         // update migration plan, if there is a chain in the plan, update it.
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index 84143ddca80a0..b82449a7eb51e 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -29,7 +29,7 @@ use risingwave_pb::stream_plan::{PbDispatchStrategy, StreamActor};
 use risingwave_pb::stream_service::BuildActorInfo;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
 use tokio::sync::oneshot;
-use tokio::time::sleep;
+use tokio::time::{sleep, Instant};
 use tracing::warn;
 
 use crate::barrier::Reschedule;
@@ -104,14 +104,21 @@ impl ActiveStreamingWorkerNodes {
     pub(crate) async fn wait_changed(
         &mut self,
         verbose_internal: Duration,
+        verbose_timeout: Duration,
         verbose_fn: impl Fn(&Self),
-    ) -> ActiveStreamingWorkerChange {
+    ) -> Option<ActiveStreamingWorkerChange> {
+        let start = Instant::now();
         loop {
             if let Either::Left((change, _)) =
                 select(pin!(self.changed()), pin!(sleep(verbose_internal))).await
             {
-                break change;
+                break Some(change);
             }
+
+            if start.elapsed() > verbose_timeout {
+                break None;
+            }
+
             verbose_fn(self)
         }
     }
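The recovery change above hinges on one piece of arithmetic: once `RECOVERY_FORCE_MIGRATION_TIMEOUT` has elapsed and the free worker slots still cannot cover the expired slots, the per-worker slot range is multiplied by `ceil((to_migrate + in_use) / total)` and the surviving free slots are ordered by slot index first, then worker id. The sketch below replays just that computation on plain tuples; `SlotId`, `extend_slots`, and the worker list are illustrative stand-ins rather than the real meta-service types, and the timeout condition itself is omitted.

```rust
use std::collections::HashSet;

/// Illustrative stand-in for a worker slot: (worker id, slot index).
type SlotId = (u32, u32);

/// Compute the extension factor and the extended, deterministically ordered
/// free-slot list, mirroring the arithmetic added in the recovery hunks above.
/// `workers` is a list of (worker id, parallelism); names here are hypothetical.
fn extend_slots(
    workers: &[(u32, u32)],
    in_use: &HashSet<SlotId>,
    to_migrate: usize,
) -> (u32, Vec<SlotId>) {
    // All slots at the current parallelism.
    let all: Vec<SlotId> = workers
        .iter()
        .flat_map(|&(id, p)| (0..p).map(move |i| (id, i)))
        .collect();
    let all_size = all.len();
    let available: Vec<SlotId> = all
        .iter()
        .copied()
        .filter(|s| !in_use.contains(s))
        .collect();
    let used_size = all_size - available.len();

    // Enough free slots: no extension needed, report factor 1.
    if available.len() >= to_migrate {
        return (1, available);
    }

    // Not enough free slots: grow every worker's slot range by the smallest
    // integer factor that covers the in-use slots plus the migration demand.
    let factor = (((to_migrate + used_size) as f32) / (all_size as f32)).ceil() as u32;

    let mut extended: Vec<SlotId> = workers
        .iter()
        .flat_map(|&(id, p)| (0..p * factor).map(move |i| (id, i)))
        .filter(|s| !in_use.contains(s))
        .collect();

    // Order by slot index first, then worker id, so low-index slots are
    // consumed across all workers before higher indices are touched.
    extended.sort_by(|a, b| a.1.cmp(&b.1).then(a.0.cmp(&b.0)));

    (factor, extended)
}

fn main() {
    // Two workers with 4 slots each, all 8 slots already in use,
    // and 5 expired slots that still need a new home.
    let workers = vec![(1, 4), (2, 4)];
    let in_use: HashSet<SlotId> = workers
        .iter()
        .flat_map(|&(id, p)| (0..p).map(move |i| (id, i)))
        .collect();

    let (factor, slots) = extend_slots(&workers, &in_use, 5);
    // (8 used + 5 needed) / 8 total = 1.625 -> factor 2, i.e. 8 extra free slots.
    println!("factor = {factor}, free extended slots = {slots:?}");
}
```

Sorting by slot index before worker id spreads the extended slots round-robin across workers instead of exhausting one worker's range before touching the next.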
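On the metadata side, `wait_changed` gains a `verbose_timeout` alongside the existing `verbose_internal` logging interval and now returns an `Option`: `Some(change)` when the worker set actually changes, `None` once the timeout elapses, which is what lets the recovery loop fall through to the forced extension instead of blocking forever. A minimal tokio sketch of the same shape, assuming a stand-in `changed()` future and using the spelling `verbose_interval` for readability:

```rust
use std::time::Duration;
use tokio::time::{sleep, Instant};

/// Stand-in for the "a worker joined or left" notification; it simply
/// resolves after a fixed delay so the example is self-contained.
async fn changed(after: Duration) -> &'static str {
    sleep(after).await;
    "worker set changed"
}

/// Wait for `changed`, waking every `verbose_interval` to run `verbose_fn`,
/// and give up with `None` once `verbose_timeout` has elapsed -- the same
/// shape as the patched `ActiveStreamingWorkerNodes::wait_changed`.
async fn wait_changed(
    event: Duration,
    verbose_interval: Duration,
    verbose_timeout: Duration,
    verbose_fn: impl Fn(Duration),
) -> Option<&'static str> {
    let start = Instant::now();
    let change = changed(event);
    tokio::pin!(change);
    loop {
        tokio::select! {
            c = &mut change => break Some(c),
            _ = sleep(verbose_interval) => {
                if start.elapsed() > verbose_timeout {
                    break None;
                }
                verbose_fn(start.elapsed());
            }
        }
    }
}

#[tokio::main]
async fn main() {
    // The change arrives before the timeout: Some(..).
    let got = wait_changed(
        Duration::from_millis(30),
        Duration::from_millis(10),
        Duration::from_millis(100),
        |elapsed| println!("still waiting after {elapsed:?}"),
    )
    .await;
    println!("first call: {got:?}");

    // The change would arrive too late: the call times out with None.
    let got = wait_changed(
        Duration::from_millis(500),
        Duration::from_millis(10),
        Duration::from_millis(50),
        |elapsed| println!("still waiting after {elapsed:?}"),
    )
    .await;
    println!("second call: {got:?}");
}
```

As in the patch, the deadline is only checked when the periodic sleep fires, so a `None` result can arrive up to one logging interval after the timeout has technically passed.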