diff --git a/src/common/src/config.rs b/src/common/src/config.rs index ea4354f9b23ad..2bae32def8d30 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -229,6 +229,18 @@ pub struct MetaConfig { #[serde(default)] pub disable_automatic_parallelism_control: bool, + /// The number of streaming jobs per scaling operation. + #[serde(default = "default::meta::parallelism_control_batch_size")] + pub parallelism_control_batch_size: usize, + + /// The period of parallelism control trigger. + #[serde(default = "default::meta::parallelism_control_trigger_period_sec")] + pub parallelism_control_trigger_period_sec: u64, + + /// The first delay of parallelism control. + #[serde(default = "default::meta::parallelism_control_trigger_first_delay_sec")] + pub parallelism_control_trigger_first_delay_sec: u64, + #[serde(default = "default::meta::meta_leader_lease_secs")] pub meta_leader_lease_secs: u64, @@ -1103,6 +1115,18 @@ pub mod default { pub fn event_log_channel_max_size() -> u32 { 10 } + + pub fn parallelism_control_batch_size() -> usize { + 10 + } + + pub fn parallelism_control_trigger_period_sec() -> u64 { + 10 + } + + pub fn parallelism_control_trigger_first_delay_sec() -> u64 { + 30 + } } pub mod server { diff --git a/src/config/example.toml b/src/config/example.toml index 6a6314d7832d2..954a9c0f12e38 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -26,6 +26,9 @@ min_delta_log_num_for_hummock_version_checkpoint = 10 max_heartbeat_interval_secs = 300 disable_recovery = false disable_automatic_parallelism_control = false +parallelism_control_batch_size = 10 +parallelism_control_trigger_period_sec = 10 +parallelism_control_trigger_first_delay_sec = 30 meta_leader_lease_secs = 30 default_parallelism = "Full" enable_compaction_deterministic = false diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 867e2409c178f..b7f290ecff469 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -281,6 +281,13 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { disable_automatic_parallelism_control: config .meta .disable_automatic_parallelism_control, + parallelism_control_batch_size: config.meta.parallelism_control_batch_size, + parallelism_control_trigger_period_sec: config + .meta + .parallelism_control_trigger_period_sec, + parallelism_control_trigger_first_delay_sec: config + .meta + .parallelism_control_trigger_first_delay_sec, in_flight_barrier_nums, max_idle_ms, compaction_deterministic_test: config.meta.enable_compaction_deterministic, diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 6b1b73d6ca697..86cc3fa078784 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -615,7 +615,7 @@ impl CommandContext { actor_splits, actor_new_dispatchers, }); - tracing::debug!("update mutation: {mutation:#?}"); + tracing::debug!("update mutation: {mutation:?}"); Some(mutation) } }; diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 1208d2c49b58d..824cc8b0c090e 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -695,7 +695,7 @@ impl GlobalBarrierManagerContext { return Err(e); } - debug!("scaling-in actors succeed."); + debug!("scaling actors succeed."); Ok(()) } @@ -833,7 +833,7 @@ impl GlobalBarrierManagerContext { return Err(e); } - debug!("scaling-in actors succeed."); + debug!("scaling actors succeed."); Ok(()) } diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index f10138b56e81e..ea26fea83ebcc 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -93,6 +93,12 @@ pub struct MetaOpts { pub enable_recovery: bool, /// Whether to disable the auto-scaling feature. pub disable_automatic_parallelism_control: bool, + /// The number of streaming jobs per scaling operation. + pub parallelism_control_batch_size: usize, + /// The period of parallelism control trigger. + pub parallelism_control_trigger_period_sec: u64, + /// The first delay of parallelism control. + pub parallelism_control_trigger_first_delay_sec: u64, /// The maximum number of barriers in-flight in the compute nodes. pub in_flight_barrier_nums: usize, /// After specified seconds of idle (no mview or flush), the process will be exited. @@ -221,6 +227,9 @@ impl MetaOpts { Self { enable_recovery, disable_automatic_parallelism_control: false, + parallelism_control_batch_size: 1, + parallelism_control_trigger_period_sec: 10, + parallelism_control_trigger_first_delay_sec: 30, in_flight_barrier_nums: 40, max_idle_ms: 0, compaction_deterministic_test: false, diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 0e571a0afebf7..3b65c73d059cc 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -31,7 +31,9 @@ use risingwave_common::catalog::TableId; use risingwave_common::hash::{ActorMapping, ParallelUnitId, VirtualNode}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_meta_model_v2::StreamingParallelism; -use risingwave_pb::common::{ActorInfo, Buffer, ParallelUnit, ParallelUnitMapping, WorkerNode}; +use risingwave_pb::common::{ + ActorInfo, Buffer, ParallelUnit, ParallelUnitMapping, WorkerNode, WorkerType, +}; use risingwave_pb::meta::get_reschedule_plan_request::{Policy, StableResizePolicy}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; @@ -48,7 +50,7 @@ use thiserror_ext::AsReport; use tokio::sync::oneshot::Receiver; use tokio::sync::{oneshot, RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::task::JoinHandle; -use tokio::time::MissedTickBehavior; +use tokio::time::{Instant, MissedTickBehavior}; use crate::barrier::{Command, Reschedule, StreamRpcManager}; use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId}; @@ -2676,12 +2678,14 @@ impl GlobalStreamManager { Ok(()) } - async fn trigger_parallelism_control(&self) -> MetaResult<()> { + async fn trigger_parallelism_control(&self) -> MetaResult { + tracing::info!("trigger parallelism control"); + let _reschedule_job_lock = self.reschedule_lock_write_guard().await; - match &self.metadata_manager { + let (schedulable_worker_ids, table_parallelisms) = match &self.metadata_manager { MetadataManager::V1(mgr) => { - let table_parallelisms = { + let table_parallelisms: HashMap = { let guard = mgr.fragment_manager.get_fragment_read_guard().await; guard @@ -2697,7 +2701,7 @@ impl GlobalStreamManager { .list_active_streaming_compute_nodes() .await; - let schedulable_worker_ids = workers + let schedulable_worker_ids: BTreeSet<_> = workers .iter() .filter(|worker| { !worker @@ -2709,26 +2713,7 @@ impl GlobalStreamManager { .map(|worker| worker.id) .collect(); - let reschedules = self - .scale_controller - .generate_table_resize_plan(TableResizePolicy { - worker_ids: schedulable_worker_ids, - table_parallelisms, - }) - .await?; - - if reschedules.is_empty() { - return Ok(()); - } - - self.reschedule_actors( - reschedules, - RescheduleOptions { - resolve_no_shuffle_upstream: true, - }, - None, - ) - .await?; + (schedulable_worker_ids, table_parallelisms) } MetadataManager::V2(mgr) => { let table_parallelisms: HashMap<_, _> = { @@ -2768,33 +2753,90 @@ impl GlobalStreamManager { .map(|worker| worker.id) .collect(); - let reschedules = self - .scale_controller - .generate_table_resize_plan(TableResizePolicy { - worker_ids: schedulable_worker_ids, - table_parallelisms: table_parallelisms.clone(), - }) - .await?; + (schedulable_worker_ids, table_parallelisms) + } + }; - if reschedules.is_empty() { - return Ok(()); - } + if table_parallelisms.is_empty() { + tracing::info!("no streaming jobs for scaling, maybe an empty cluster"); + return Ok(false); + } - self.reschedule_actors( - reschedules, - RescheduleOptions { - resolve_no_shuffle_upstream: true, - }, - None, - ) + let batch_size = match self.env.opts.parallelism_control_batch_size { + 0 => table_parallelisms.len(), + n => n, + }; + + tracing::info!( + "total {} streaming jobs, batch size {}, schedulable worker ids: {:?}", + table_parallelisms.len(), + batch_size, + schedulable_worker_ids + ); + + let batches: Vec<_> = table_parallelisms + .into_iter() + .chunks(batch_size) + .into_iter() + .map(|chunk| chunk.collect_vec()) + .collect(); + + let mut reschedules = None; + + for batch in batches { + let parallelisms: HashMap<_, _> = batch.into_iter().collect(); + + let plan = self + .scale_controller + .generate_table_resize_plan(TableResizePolicy { + worker_ids: schedulable_worker_ids.clone(), + table_parallelisms: parallelisms.clone(), + }) .await?; + + if !plan.is_empty() { + tracing::info!( + "reschedule plan generated for streaming jobs {:?}", + parallelisms + ); + reschedules = Some(plan); + break; } } - Ok(()) + let Some(reschedules) = reschedules else { + tracing::info!("no reschedule plan generated"); + return Ok(false); + }; + + self.reschedule_actors( + reschedules, + RescheduleOptions { + resolve_no_shuffle_upstream: false, + }, + None, + ) + .await?; + + Ok(true) } async fn run(&self, mut shutdown_rx: Receiver<()>) { + tracing::info!("starting automatic parallelism control monitor"); + + let check_period = + Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec); + + let mut ticker = tokio::time::interval_at( + Instant::now() + + Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec), + check_period, + ); + ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + + // waiting for first tick + ticker.tick().await; + let (local_notification_tx, mut local_notification_rx) = tokio::sync::mpsc::unbounded_channel(); @@ -2803,11 +2845,6 @@ impl GlobalStreamManager { .insert_local_sender(local_notification_tx) .await; - let check_period = Duration::from_secs(10); - let mut ticker = tokio::time::interval(check_period); - ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); - ticker.reset(); - let worker_nodes = self .metadata_manager .list_active_streaming_compute_nodes() @@ -2819,7 +2856,7 @@ impl GlobalStreamManager { .map(|worker| (worker.id, worker)) .collect(); - let mut changed = true; + let mut should_trigger = false; loop { tokio::select! { @@ -2830,18 +2867,18 @@ impl GlobalStreamManager { break; } - _ = ticker.tick(), if changed => { + _ = ticker.tick(), if should_trigger => { let include_workers = worker_cache.keys().copied().collect_vec(); if include_workers.is_empty() { tracing::debug!("no available worker nodes"); - changed = false; + should_trigger = false; continue; } match self.trigger_parallelism_control().await { - Ok(_) => { - changed = false; + Ok(cont) => { + should_trigger = cont; } Err(e) => { tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs()); @@ -2855,13 +2892,26 @@ impl GlobalStreamManager { match notification { LocalNotification::WorkerNodeActivated(worker) => { + match (worker.get_type(), worker.property.as_ref()) { + (Ok(WorkerType::ComputeNode), Some(prop)) if prop.is_streaming => { + tracing::info!("worker {} activated notification received", worker.id); + } + _ => continue + } + let prev_worker = worker_cache.insert(worker.id, worker.clone()); - if let Some(prev_worker) = prev_worker && prev_worker.parallel_units != worker.parallel_units { - tracing::info!(worker = worker.id, "worker parallelism changed"); + match prev_worker { + Some(prev_worker) if prev_worker.parallel_units != worker.parallel_units => { + tracing::info!(worker = worker.id, "worker parallelism changed"); + should_trigger = true; + } + None => { + tracing::info!(worker = worker.id, "new worker joined"); + should_trigger = true; + } + _ => {} } - - changed = true; } // Since our logic for handling passive scale-in is within the barrier manager, diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index b4178e01f8786..f3d2f0559e998 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -189,6 +189,9 @@ impl Configuration { r#"[meta] max_heartbeat_interval_secs = {max_heartbeat_interval_secs} disable_automatic_parallelism_control = {disable_automatic_parallelism_control} +parallelism_control_trigger_first_delay_sec = 0 +parallelism_control_batch_size = 0 +parallelism_control_trigger_period_sec = 10 [system] barrier_interval_ms = 250