diff --git a/src/common/src/config.rs b/src/common/src/config.rs index ae92218803503..bba8f50d5af68 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -208,6 +208,10 @@ pub struct MetaConfig { #[serde(default)] pub disable_recovery: bool, + /// Whether to enable scale-in when recovery. + #[serde(default)] + pub enable_scale_in_when_recovery: bool, + #[serde(default = "default::meta::meta_leader_lease_secs")] pub meta_leader_lease_secs: u64, diff --git a/src/config/example.toml b/src/config/example.toml index 93d6a0820102b..61d9e23544a05 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -22,6 +22,7 @@ hummock_version_checkpoint_interval_sec = 30 min_delta_log_num_for_hummock_version_checkpoint = 10 max_heartbeat_interval_secs = 300 disable_recovery = false +enable_scale_in_when_recovery = false meta_leader_lease_secs = 30 default_parallelism = "Full" enable_compaction_deterministic = false diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 2737c35e68eb2..dfd3cdf77d4b1 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -249,6 +249,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { config.meta.meta_leader_lease_secs, MetaOpts { enable_recovery: !config.meta.disable_recovery, + enable_scale_in_when_recovery: config.meta.enable_scale_in_when_recovery, in_flight_barrier_nums, max_idle_ms, compaction_deterministic_test: config.meta.enable_compaction_deterministic, diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs index 676180adc7581..c054b9b18bd6d 100644 --- a/src/meta/service/src/scale_service.rs +++ b/src/meta/service/src/scale_service.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + +use risingwave_meta::stream::{ScaleController, ScaleControllerRef}; use risingwave_pb::common::WorkerType; use risingwave_pb::meta::scale_service_server::ScaleService; use risingwave_pb::meta::{ @@ -35,6 +38,7 @@ pub struct ScaleServiceImpl { catalog_manager: CatalogManagerRef, stream_manager: GlobalStreamManagerRef, barrier_manager: BarrierManagerRef, + scale_controller: ScaleControllerRef, } impl ScaleServiceImpl { @@ -46,6 +50,12 @@ impl ScaleServiceImpl { stream_manager: GlobalStreamManagerRef, barrier_manager: BarrierManagerRef, ) -> Self { + let scale_controller = Arc::new(ScaleController::new( + fragment_manager.clone(), + cluster_manager.clone(), + source_manager.clone(), + stream_manager.env.clone(), + )); Self { fragment_manager, cluster_manager, @@ -53,6 +63,7 @@ impl ScaleServiceImpl { catalog_manager, stream_manager, barrier_manager, + scale_controller, } } } @@ -203,7 +214,7 @@ impl ScaleService for ScaleServiceImpl { .policy .ok_or_else(|| Status::invalid_argument("policy is required"))?; - let plan = self.stream_manager.get_reschedule_plan(policy).await?; + let plan = self.scale_controller.get_reschedule_plan(policy).await?; let next_revision = self.fragment_manager.get_revision().await; diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 5ea20daccb85b..bc2692f4bad2a 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -39,7 +39,9 @@ use crate::barrier::CommandChanges; use crate::hummock::HummockManagerRef; use crate::manager::{CatalogManagerRef, FragmentManagerRef, WorkerId}; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments}; -use crate::stream::{build_actor_connector_splits, SourceManagerRef, SplitAssignment}; +use crate::stream::{ + build_actor_connector_splits, ScaleControllerRef, SourceManagerRef, SplitAssignment, +}; use crate::MetaResult; /// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors @@ -238,6 +240,8 @@ pub struct CommandContext { source_manager: SourceManagerRef, + scale_controller: ScaleControllerRef, + /// The tracing span of this command. /// /// Differs from [`TracedEpoch`], this span focuses on the lifetime of the corresponding @@ -260,6 +264,7 @@ impl CommandContext { command: Command, kind: BarrierKind, source_manager: SourceManagerRef, + scale_controller: ScaleControllerRef, span: tracing::Span, ) -> Self { Self { @@ -274,6 +279,7 @@ impl CommandContext { command, kind, source_manager, + scale_controller, span, } } @@ -760,60 +766,11 @@ impl CommandContext { } Command::RescheduleFragment { reschedules } => { - let mut node_dropped_actors = HashMap::new(); - for table_fragments in self - .fragment_manager - .get_fragment_read_guard() - .await - .table_fragments() - .values() - { - for fragment_id in table_fragments.fragments.keys() { - if let Some(reschedule) = reschedules.get(fragment_id) { - for actor_id in &reschedule.removed_actors { - let node_id = table_fragments - .actor_status - .get(actor_id) - .unwrap() - .parallel_unit - .as_ref() - .unwrap() - .worker_node_id; - node_dropped_actors - .entry(node_id as WorkerId) - .or_insert(vec![]) - .push(*actor_id as ActorId); - } - } - } - } - self.clean_up(node_dropped_actors).await?; - - // Update fragment info after rescheduling in meta store. - self.fragment_manager - .post_apply_reschedules(reschedules.clone()) + let node_dropped_actors = self + .scale_controller + .post_apply_reschedule(reschedules) .await?; - - let mut stream_source_actor_splits = HashMap::new(); - let mut stream_source_dropped_actors = HashSet::new(); - - for (fragment_id, reschedule) in reschedules { - if !reschedule.actor_splits.is_empty() { - stream_source_actor_splits - .insert(*fragment_id as FragmentId, reschedule.actor_splits.clone()); - stream_source_dropped_actors.extend(reschedule.removed_actors.clone()); - } - } - - if !stream_source_actor_splits.is_empty() { - self.source_manager - .apply_source_change( - None, - Some(stream_source_actor_splits), - Some(stream_source_dropped_actors), - ) - .await; - } + self.clean_up(node_dropped_actors).await?; } Command::ReplaceTable { diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 654c6359ca1bf..acd90dce8f522 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -60,7 +60,7 @@ use crate::manager::{ }; use crate::model::{ActorId, BarrierManagerState, TableFragments}; use crate::rpc::metrics::MetaMetrics; -use crate::stream::SourceManagerRef; +use crate::stream::{ScaleController, ScaleControllerRef, SourceManagerRef}; use crate::{MetaError, MetaResult}; mod command; @@ -176,6 +176,8 @@ pub struct GlobalBarrierManager { source_manager: SourceManagerRef, + scale_controller: ScaleControllerRef, + sink_manager: SinkCoordinatorManager, metrics: Arc, @@ -529,6 +531,12 @@ impl GlobalBarrierManager { let in_flight_barrier_nums = env.opts.in_flight_barrier_nums; let tracker = CreateMviewProgressTracker::new(); + let scale_controller = Arc::new(ScaleController::new( + fragment_manager.clone(), + cluster_manager.clone(), + source_manager.clone(), + env.clone(), + )); Self { enable_recovery, status: Mutex::new(BarrierManagerStatus::Starting), @@ -539,6 +547,7 @@ impl GlobalBarrierManager { fragment_manager, hummock_manager, source_manager, + scale_controller, sink_manager, metrics, env, @@ -733,6 +742,7 @@ impl GlobalBarrierManager { command, kind, self.source_manager.clone(), + self.scale_controller.clone(), span.clone(), )); diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 3e319f0e69a52..fc18ba4fbb612 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -21,6 +21,7 @@ use futures::future::try_join_all; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_pb::common::ActorInfo; +use risingwave_pb::meta::get_reschedule_plan_request::{PbWorkerChanges, StableResizePolicy}; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::{BarrierKind, Mutation}; use risingwave_pb::stream_plan::AddMutation; @@ -40,7 +41,7 @@ use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::{CheckpointControl, Command, GlobalBarrierManager}; use crate::manager::WorkerId; use crate::model::{BarrierManagerState, MigrationPlan}; -use crate::stream::build_actor_connector_splits; +use crate::stream::{build_actor_connector_splits, RescheduleOptions}; use crate::MetaResult; impl GlobalBarrierManager { @@ -254,12 +255,21 @@ impl GlobalBarrierManager { // following steps will be no-op, while the compute nodes will still be reset. let mut info = self.resolve_actor_info_for_recovery().await; - // Migrate actors in expired CN to newly joined one. - let migrated = self.migrate_actors(&info).await.inspect_err(|err| { - warn!(err = ?err, "migrate actors failed"); - })?; - if migrated { - info = self.resolve_actor_info_for_recovery().await; + if self.env.opts.enable_scale_in_when_recovery { + let scaled = self.scale_actors(&info).await.inspect_err(|err| { + warn!(err = ?err, "scale actors failed"); + })?; + if scaled { + info = self.resolve_actor_info_for_recovery().await; + } + } else { + // Migrate actors in expired CN to newly joined one. + let migrated = self.migrate_actors(&info).await.inspect_err(|err| { + warn!(err = ?err, "migrate actors failed"); + })?; + if migrated { + info = self.resolve_actor_info_for_recovery().await; + } } // Reset all compute nodes, stop and drop existing actors. @@ -301,6 +311,7 @@ impl GlobalBarrierManager { command, BarrierKind::Initial, self.source_manager.clone(), + self.scale_controller.clone(), tracing::Span::current(), // recovery span )); @@ -386,6 +397,86 @@ impl GlobalBarrierManager { Ok(true) } + async fn scale_actors(&self, info: &BarrierActorInfo) -> MetaResult { + debug!("start scaling-in offline actors."); + + let expired_workers: HashSet = info + .actor_map + .iter() + .filter(|(&worker, actors)| !actors.is_empty() && !info.node_map.contains_key(&worker)) + .map(|(&worker, _)| worker) + .collect(); + + if expired_workers.is_empty() { + debug!("no expired workers, skipping."); + return Ok(false); + } + + let all_worker_parallel_units = self.fragment_manager.all_worker_parallel_units().await; + + let expired_worker_parallel_units: HashMap<_, _> = all_worker_parallel_units + .into_iter() + .filter(|(worker, _)| expired_workers.contains(worker)) + .collect(); + + let fragment_worker_changes = { + let guard = self.fragment_manager.get_fragment_read_guard().await; + let mut policy = HashMap::new(); + for table_fragments in guard.table_fragments().values() { + for fragment_id in table_fragments.fragment_ids() { + policy.insert( + fragment_id, + PbWorkerChanges { + exclude_worker_ids: expired_workers.iter().cloned().collect(), + ..Default::default() + }, + ); + } + } + policy + }; + + let plan = self + .scale_controller + .generate_stable_resize_plan( + StableResizePolicy { + fragment_worker_changes, + }, + Some(expired_worker_parallel_units), + ) + .await?; + + let (reschedule_fragment, applied_reschedules) = self + .scale_controller + .prepare_reschedule_command( + plan, + RescheduleOptions { + resolve_no_shuffle_upstream: true, + }, + ) + .await?; + + if let Err(e) = self + .scale_controller + .post_apply_reschedule(&reschedule_fragment) + .await + { + tracing::error!( + "failed to apply reschedule for offline scaling in recovery: {}", + e.to_string() + ); + + self.fragment_manager + .cancel_apply_reschedules(applied_reschedules) + .await; + + return Err(e); + } + + debug!("scaling-in actors succeed."); + Ok(true) + } + /// This function will generate a migration plan, which includes the mapping for all expired and /// in-used parallel unit to a new one. async fn generate_migration_plan( diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 28d8200c73ea5..c9c22ff438cf8 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -81,6 +81,8 @@ pub struct MetaOpts { /// Whether to enable the recovery of the cluster. If disabled, the meta service will exit on /// abnormal cases. pub enable_recovery: bool, + /// Whether to enable the scale-in feature when compute-node is removed. + pub enable_scale_in_when_recovery: bool, /// The maximum number of barriers in-flight in the compute nodes. pub in_flight_barrier_nums: usize, /// After specified seconds of idle (no mview or flush), the process will be exited. @@ -174,6 +176,7 @@ impl MetaOpts { pub fn test(enable_recovery: bool) -> Self { Self { enable_recovery, + enable_scale_in_when_recovery: false, in_flight_barrier_nums: 40, max_idle_ms: 0, compaction_deterministic_test: false, diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 582c6585f5ba8..99a4d6fc76e92 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -15,9 +15,9 @@ use std::cmp::{min, Ordering}; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; use std::iter::repeat; +use std::sync::Arc; use anyhow::{anyhow, Context}; -use futures::future::BoxFuture; use itertools::Itertools; use num_integer::Integer; use num_traits::abs; @@ -37,11 +37,11 @@ use risingwave_pb::stream_service::{ }; use uuid::Uuid; -use crate::barrier::{Command, Reschedule}; -use crate::manager::{IdCategory, WorkerId}; +use crate::barrier::Reschedule; +use crate::manager::{ClusterManagerRef, FragmentManagerRef, IdCategory, MetaSrvEnv, WorkerId}; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments}; use crate::storage::{MetaStore, MetaStoreError, MetaStoreRef, Transaction, DEFAULT_COLUMN_FAMILY}; -use crate::stream::GlobalStreamManager; +use crate::stream::SourceManagerRef; use crate::{MetaError, MetaResult}; #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] @@ -92,11 +92,6 @@ pub struct ParallelUnitReschedule { pub removed_parallel_units: BTreeSet, } -#[derive(Debug, Clone, Copy)] -pub struct RescheduleOptions { - pub resolve_no_shuffle_upstream: bool, -} - pub struct RescheduleContext { /// Index used to map `ParallelUnitId` to `WorkerId` parallel_unit_id_to_worker_id: BTreeMap, @@ -346,7 +341,39 @@ pub fn rebalance_actor_vnode( result } -impl GlobalStreamManager { +#[derive(Debug, Clone, Copy)] +pub struct RescheduleOptions { + /// Whether to resolve the upstream of NoShuffle when scaling. It will check whether all the reschedules in the no shuffle dependency tree are corresponding, and rewrite them to the root of the no shuffle dependency tree. + pub resolve_no_shuffle_upstream: bool, +} + +pub type ScaleControllerRef = Arc; + +pub struct ScaleController { + pub(super) fragment_manager: FragmentManagerRef, + + pub cluster_manager: ClusterManagerRef, + + pub source_manager: SourceManagerRef, + + pub env: MetaSrvEnv, +} + +impl ScaleController { + pub fn new( + fragment_manager: FragmentManagerRef, + cluster_manager: ClusterManagerRef, + source_manager: SourceManagerRef, + env: MetaSrvEnv, + ) -> Self { + Self { + fragment_manager, + cluster_manager, + source_manager, + env, + } + } + /// Build the context for rescheduling and do some validation for the request. async fn build_reschedule_context( &self, @@ -622,123 +649,68 @@ impl GlobalStreamManager { }) } - fn resolve_no_shuffle_upstream( - reschedule: &mut HashMap, - fragment_map: &HashMap, - no_shuffle_source_fragment_ids: &HashSet, - no_shuffle_target_fragment_ids: &HashSet, - ) -> MetaResult<()> - where - T: Clone + Eq, - { - let mut queue: VecDeque = reschedule.keys().cloned().collect(); - - // We trace the upstreams of each downstream under the hierarchy until we reach the top - // for every no_shuffle relation. - while let Some(fragment_id) = queue.pop_front() { - if !no_shuffle_target_fragment_ids.contains(&fragment_id) - && !no_shuffle_source_fragment_ids.contains(&fragment_id) - { - continue; - } - - // for upstream - for upstream_fragment_id in &fragment_map - .get(&fragment_id) - .unwrap() - .upstream_fragment_ids - { - if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) { - continue; - } - - let reschedule_plan = reschedule.get(&fragment_id).unwrap(); - - if let Some(upstream_reschedule_plan) = reschedule.get(upstream_fragment_id) { - if upstream_reschedule_plan != reschedule_plan { - bail!("Inconsistent NO_SHUFFLE plan, check target worker ids of fragment {} and {}", fragment_id, upstream_fragment_id); - } + async fn create_actors_on_compute_node( + &self, + worker_nodes: &HashMap, + actor_infos_to_broadcast: BTreeMap, + node_actors_to_create: HashMap>, + broadcast_worker_ids: HashSet, + ) -> MetaResult<()> { + for worker_id in &broadcast_worker_ids { + let node = worker_nodes.get(worker_id).unwrap(); + let client = self.env.stream_client_pool().get(node).await?; - continue; - } + let actor_infos_to_broadcast = actor_infos_to_broadcast.values().cloned().collect(); - reschedule.insert(*upstream_fragment_id, reschedule_plan.clone()); - queue.push_back(*upstream_fragment_id); - } + client + .to_owned() + .broadcast_actor_info_table(BroadcastActorInfoTableRequest { + info: actor_infos_to_broadcast, + }) + .await?; } - reschedule.retain(|fragment_id, _| !no_shuffle_target_fragment_ids.contains(fragment_id)); - - Ok(()) - } + for (node_id, stream_actors) in &node_actors_to_create { + let node = worker_nodes.get(node_id).unwrap(); + let client = self.env.stream_client_pool().get(node).await?; + let request_id = Uuid::new_v4().to_string(); + let request = UpdateActorsRequest { + request_id, + actors: stream_actors.clone(), + }; - fn build_fragment_dispatcher_index( - actor_map: &HashMap, - fragment_dispatcher_map: &mut HashMap>, - ) { - for actor in actor_map.values() { - for dispatcher in &actor.dispatcher { - for downstream_actor_id in &dispatcher.downstream_actor_id { - if let Some(downstream_actor) = actor_map.get(downstream_actor_id) { - fragment_dispatcher_map - .entry(actor.fragment_id as FragmentId) - .or_default() - .insert( - downstream_actor.fragment_id as FragmentId, - dispatcher.r#type(), - ); - } - } - } + client.to_owned().update_actors(request).await?; } - } - fn build_no_shuffle_relation_index( - actor_map: &HashMap, - no_shuffle_source_fragment_ids: &mut HashSet, - no_shuffle_target_fragment_ids: &mut HashSet, - ) { - for actor in actor_map.values() { - for dispatcher in &actor.dispatcher { - for downstream_actor_id in &dispatcher.downstream_actor_id { - if let Some(downstream_actor) = actor_map.get(downstream_actor_id) { - // Checking for no shuffle dispatchers - if dispatcher.r#type() == DispatcherType::NoShuffle { - no_shuffle_source_fragment_ids.insert(actor.fragment_id as FragmentId); - no_shuffle_target_fragment_ids - .insert(downstream_actor.fragment_id as FragmentId); - } - } - } - } - } - } + for (node_id, stream_actors) in node_actors_to_create { + let node = worker_nodes.get(&node_id).unwrap(); + let client = self.env.stream_client_pool().get(node).await?; + let request_id = Uuid::new_v4().to_string(); - pub async fn reschedule_actors( - &self, - reschedules: HashMap, - options: RescheduleOptions, - ) -> MetaResult<()> { - let mut revert_funcs = vec![]; - if let Err(e) = self - .reschedule_actors_impl(&mut revert_funcs, reschedules, options) - .await - { - for revert_func in revert_funcs.into_iter().rev() { - revert_func.await; - } - return Err(e); + client + .to_owned() + .build_actors(BuildActorsRequest { + request_id, + actor_id: stream_actors + .iter() + .map(|stream_actor| stream_actor.actor_id) + .collect(), + }) + .await?; } Ok(()) } - async fn reschedule_actors_impl( + // Results are the generated reschedule plan and the changes that need to be updated to the meta store. + pub(crate) async fn prepare_reschedule_command( &self, - revert_funcs: &mut Vec>, mut reschedules: HashMap, options: RescheduleOptions, - ) -> MetaResult<()> { + ) -> MetaResult<( + HashMap, + HashMap>, + )> { let ctx = self .build_reschedule_context(&mut reschedules, options) .await?; @@ -1297,78 +1269,7 @@ impl GlobalStreamManager { .pre_apply_reschedules(fragment_created_actors) .await; - let fragment_manager_ref = self.fragment_manager.clone(); - - revert_funcs.push(Box::pin(async move { - fragment_manager_ref - .cancel_apply_reschedules(applied_reschedules) - .await; - })); - - let _source_pause_guard = self.source_manager.paused.lock().await; - - tracing::debug!("reschedule plan: {:#?}", reschedule_fragment); - - self.barrier_scheduler - .run_config_change_command_with_pause(Command::RescheduleFragment { - reschedules: reschedule_fragment, - }) - .await?; - - Ok(()) - } - - async fn create_actors_on_compute_node( - &self, - worker_nodes: &HashMap, - actor_infos_to_broadcast: BTreeMap, - node_actors_to_create: HashMap>, - broadcast_worker_ids: HashSet, - ) -> MetaResult<()> { - for worker_id in &broadcast_worker_ids { - let node = worker_nodes.get(worker_id).unwrap(); - let client = self.env.stream_client_pool().get(node).await?; - - let actor_infos_to_broadcast = actor_infos_to_broadcast.values().cloned().collect(); - - client - .to_owned() - .broadcast_actor_info_table(BroadcastActorInfoTableRequest { - info: actor_infos_to_broadcast, - }) - .await?; - } - - for (node_id, stream_actors) in &node_actors_to_create { - let node = worker_nodes.get(node_id).unwrap(); - let client = self.env.stream_client_pool().get(node).await?; - let request_id = Uuid::new_v4().to_string(); - let request = UpdateActorsRequest { - request_id, - actors: stream_actors.clone(), - }; - - client.to_owned().update_actors(request).await?; - } - - for (node_id, stream_actors) in node_actors_to_create { - let node = worker_nodes.get(&node_id).unwrap(); - let client = self.env.stream_client_pool().get(node).await?; - let request_id = Uuid::new_v4().to_string(); - - client - .to_owned() - .build_actors(BuildActorsRequest { - request_id, - actor_id: stream_actors - .iter() - .map(|stream_actor| stream_actor.actor_id) - .collect(), - }) - .await?; - } - - Ok(()) + Ok((reschedule_fragment, applied_reschedules)) } async fn arrange_reschedules( @@ -1593,12 +1494,72 @@ impl GlobalStreamManager { Ok(()) } -} -impl GlobalStreamManager { - async fn generate_stable_resize_plan( + pub async fn post_apply_reschedule( + &self, + reschedules: &HashMap, + ) -> MetaResult>> { + let mut node_dropped_actors = HashMap::new(); + for table_fragments in self + .fragment_manager + .get_fragment_read_guard() + .await + .table_fragments() + .values() + { + for fragment_id in table_fragments.fragments.keys() { + if let Some(reschedule) = reschedules.get(fragment_id) { + for actor_id in &reschedule.removed_actors { + let node_id = table_fragments + .actor_status + .get(actor_id) + .unwrap() + .parallel_unit + .as_ref() + .unwrap() + .worker_node_id; + node_dropped_actors + .entry(node_id as WorkerId) + .or_insert(vec![]) + .push(*actor_id as ActorId); + } + } + } + } + + // Update fragment info after rescheduling in meta store. + self.fragment_manager + .post_apply_reschedules(reschedules.clone()) + .await?; + + let mut stream_source_actor_splits = HashMap::new(); + let mut stream_source_dropped_actors = HashSet::new(); + + for (fragment_id, reschedule) in reschedules { + if !reschedule.actor_splits.is_empty() { + stream_source_actor_splits + .insert(*fragment_id as FragmentId, reschedule.actor_splits.clone()); + stream_source_dropped_actors.extend(reschedule.removed_actors.clone()); + } + } + + if !stream_source_actor_splits.is_empty() { + self.source_manager + .apply_source_change( + None, + Some(stream_source_actor_splits), + Some(stream_source_dropped_actors), + ) + .await; + } + + Ok(node_dropped_actors) + } + + pub async fn generate_stable_resize_plan( &self, policy: StableResizePolicy, + parallel_unit_hints: Option>>, ) -> MetaResult> { let StableResizePolicy { fragment_worker_changes, @@ -1742,7 +1703,12 @@ impl GlobalStreamManager { } for worker_id in include_worker_ids.iter().chain(exclude_worker_ids.iter()) { - if !worker_parallel_units.contains_key(worker_id) { + if !worker_parallel_units.contains_key(worker_id) + && !parallel_unit_hints + .as_ref() + .map(|hints| hints.contains_key(worker_id)) + .unwrap_or(false) + { bail!("Worker id {} not found", worker_id); } } @@ -1759,17 +1725,25 @@ impl GlobalStreamManager { }) .collect(); - let include_worker_parallel_unit_ids = include_worker_ids - .iter() - .flat_map(|worker_id| worker_parallel_units.get(worker_id).unwrap()) - .cloned() - .collect_vec(); + let worker_to_parallel_unit_ids = |worker_ids: &BTreeSet| { + worker_ids + .iter() + .flat_map(|worker_id| { + worker_parallel_units + .get(worker_id) + .or_else(|| { + parallel_unit_hints + .as_ref() + .and_then(|hints| hints.get(worker_id)) + }) + .expect("worker id should be valid") + }) + .cloned() + .collect_vec() + }; - let exclude_worker_parallel_unit_ids = exclude_worker_ids - .iter() - .flat_map(|worker_id| worker_parallel_units.get(worker_id).unwrap()) - .cloned() - .collect_vec(); + let include_worker_parallel_unit_ids = worker_to_parallel_unit_ids(&include_worker_ids); + let exclude_worker_parallel_unit_ids = worker_to_parallel_unit_ids(&exclude_worker_ids); fn refilter_parallel_unit_id_by_target_parallelism( worker_parallel_units: &HashMap>, @@ -1920,7 +1894,101 @@ impl GlobalStreamManager { policy: Policy, ) -> MetaResult> { match policy { - Policy::StableResizePolicy(resize) => self.generate_stable_resize_plan(resize).await, + Policy::StableResizePolicy(resize) => { + self.generate_stable_resize_plan(resize, None).await + } } } + + pub fn build_no_shuffle_relation_index( + actor_map: &HashMap, + no_shuffle_source_fragment_ids: &mut HashSet, + no_shuffle_target_fragment_ids: &mut HashSet, + ) { + for actor in actor_map.values() { + for dispatcher in &actor.dispatcher { + for downstream_actor_id in &dispatcher.downstream_actor_id { + if let Some(downstream_actor) = actor_map.get(downstream_actor_id) { + // Checking for no shuffle dispatchers + if dispatcher.r#type() == DispatcherType::NoShuffle { + no_shuffle_source_fragment_ids.insert(actor.fragment_id as FragmentId); + no_shuffle_target_fragment_ids + .insert(downstream_actor.fragment_id as FragmentId); + } + } + } + } + } + } + + pub fn build_fragment_dispatcher_index( + actor_map: &HashMap, + fragment_dispatcher_map: &mut HashMap>, + ) { + for actor in actor_map.values() { + for dispatcher in &actor.dispatcher { + for downstream_actor_id in &dispatcher.downstream_actor_id { + if let Some(downstream_actor) = actor_map.get(downstream_actor_id) { + fragment_dispatcher_map + .entry(actor.fragment_id as FragmentId) + .or_default() + .insert( + downstream_actor.fragment_id as FragmentId, + dispatcher.r#type(), + ); + } + } + } + } + } + + pub fn resolve_no_shuffle_upstream( + reschedule: &mut HashMap, + fragment_map: &HashMap, + no_shuffle_source_fragment_ids: &HashSet, + no_shuffle_target_fragment_ids: &HashSet, + ) -> MetaResult<()> + where + T: Clone + Eq, + { + let mut queue: VecDeque = reschedule.keys().cloned().collect(); + + // We trace the upstreams of each downstream under the hierarchy until we reach the top + // for every no_shuffle relation. + while let Some(fragment_id) = queue.pop_front() { + if !no_shuffle_target_fragment_ids.contains(&fragment_id) + && !no_shuffle_source_fragment_ids.contains(&fragment_id) + { + continue; + } + + // for upstream + for upstream_fragment_id in &fragment_map + .get(&fragment_id) + .unwrap() + .upstream_fragment_ids + { + if !no_shuffle_source_fragment_ids.contains(upstream_fragment_id) { + continue; + } + + let reschedule_plan = reschedule.get(&fragment_id).unwrap(); + + if let Some(upstream_reschedule_plan) = reschedule.get(upstream_fragment_id) { + if upstream_reschedule_plan != reschedule_plan { + bail!("Inconsistent NO_SHUFFLE plan, check target worker ids of fragment {} and {}", fragment_id, upstream_fragment_id); + } + + continue; + } + + reschedule.insert(*upstream_fragment_id, reschedule_plan.clone()); + queue.push_back(*upstream_fragment_id); + } + } + + reschedule.retain(|fragment_id, _| !no_shuffle_target_fragment_ids.contains(fragment_id)); + + Ok(()) + } } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 184ca096734e1..acd874f4bdc00 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -29,11 +29,13 @@ use tokio::sync::{oneshot, Mutex, RwLock}; use tracing::Instrument; use uuid::Uuid; -use super::Locations; +use super::{ + Locations, ParallelUnitReschedule, RescheduleOptions, ScaleController, ScaleControllerRef, +}; use crate::barrier::{BarrierScheduler, Command}; use crate::hummock::HummockManagerRef; use crate::manager::{ClusterManagerRef, FragmentManagerRef, MetaSrvEnv}; -use crate::model::{ActorId, TableFragments}; +use crate::model::{ActorId, FragmentId, TableFragments}; use crate::stream::SourceManagerRef; use crate::{MetaError, MetaResult}; @@ -195,6 +197,8 @@ pub struct GlobalStreamManager { hummock_manager: HummockManagerRef, pub reschedule_lock: RwLock<()>, + + pub(crate) scale_controller: ScaleControllerRef, } impl GlobalStreamManager { @@ -206,6 +210,12 @@ impl GlobalStreamManager { source_manager: SourceManagerRef, hummock_manager: HummockManagerRef, ) -> MetaResult { + let scale_controller = Arc::new(ScaleController::new( + fragment_manager.clone(), + cluster_manager.clone(), + source_manager.clone(), + env.clone(), + )); Ok(Self { env, fragment_manager, @@ -215,6 +225,7 @@ impl GlobalStreamManager { hummock_manager, creating_job_info: Arc::new(CreatingStreamingJobInfo::default()), reschedule_lock: RwLock::new(()), + scale_controller, }) } @@ -641,6 +652,61 @@ impl GlobalStreamManager { } } +impl GlobalStreamManager { + pub async fn reschedule_actors( + &self, + reschedules: HashMap, + options: RescheduleOptions, + ) -> MetaResult<()> { + let mut revert_funcs = vec![]; + if let Err(e) = self + .reschedule_actors_impl(&mut revert_funcs, reschedules, options) + .await + { + for revert_func in revert_funcs.into_iter().rev() { + revert_func.await; + } + return Err(e); + } + + Ok(()) + } + + async fn reschedule_actors_impl( + &self, + revert_funcs: &mut Vec>, + reschedules: HashMap, + options: RescheduleOptions, + ) -> MetaResult<()> { + let (reschedule_fragment, applied_reschedules) = self + .scale_controller + .prepare_reschedule_command(reschedules, options) + .await?; + + tracing::debug!("reschedule plan: {:#?}", reschedule_fragment); + + let command = Command::RescheduleFragment { + reschedules: reschedule_fragment, + }; + + let fragment_manager_ref = self.fragment_manager.clone(); + + revert_funcs.push(Box::pin(async move { + fragment_manager_ref + .cancel_apply_reschedules(applied_reschedules) + .await; + })); + + let _source_pause_guard = self.source_manager.paused.lock().await; + + self.barrier_scheduler + .run_config_change_command_with_pause(command) + .await?; + + Ok(()) + } +} + #[cfg(test)] mod tests { use std::collections::{BTreeMap, HashMap, HashSet}; diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 202e2f63ed2da..d4947e225008a 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -114,6 +114,27 @@ impl Configuration { } } + pub fn for_auto_scale() -> Self { + let config_path = { + let mut file = + tempfile::NamedTempFile::new().expect("failed to create temp config file"); + file.write_all(include_bytes!("risingwave-auto-scale.toml")) + .expect("failed to write config file"); + file.into_temp_path() + }; + + Configuration { + config_path: ConfigPath::Temp(config_path.into()), + frontend_nodes: 1, + compute_nodes: 3, + meta_nodes: 1, + compactor_nodes: 1, + compute_node_cores: 2, + etcd_timeout_rate: 0.0, + etcd_data_path: None, + } + } + /// Returns the config for backfill test. pub fn for_backfill() -> Self { // Embed the config file and create a temporary file at runtime. The file will be deleted @@ -585,6 +606,23 @@ impl Cluster { .await; } + #[cfg_or_panic(madsim)] + pub async fn kill_nodes_and_restart( + &self, + nodes: impl IntoIterator>, + restart_delay_secs: u32, + ) { + join_all(nodes.into_iter().map(|name| async move { + let name = name.as_ref(); + tracing::info!("kill {name}"); + Handle::current().kill(name); + tokio::time::sleep(Duration::from_secs(restart_delay_secs as u64)).await; + tracing::info!("restart {name}"); + Handle::current().restart(name); + })) + .await; + } + /// Create a node for kafka producer and prepare data. #[cfg_or_panic(madsim)] pub async fn create_kafka_producer(&self, datadir: &str) { diff --git a/src/tests/simulation/src/risingwave-auto-scale.toml b/src/tests/simulation/src/risingwave-auto-scale.toml new file mode 100644 index 0000000000000..f2ffb036df4f2 --- /dev/null +++ b/src/tests/simulation/src/risingwave-auto-scale.toml @@ -0,0 +1,17 @@ +# The configuration for scaling simulation test. +# +# Note: this file is embedded in the binary and cannot be changed without recompiling. + +[meta] +# a relatively small number to make it easier to timeout +max_heartbeat_interval_secs = 15 +meta_leader_lease_secs = 10 +enable_scale_in_when_recovery = true + +[system] +barrier_interval_ms = 250 +checkpoint_frequency = 4 + +[server] +telemetry_enabled = false +metrics_level = "Disabled" diff --git a/src/tests/simulation/tests/integration_tests/recovery/mod.rs b/src/tests/simulation/tests/integration_tests/recovery/mod.rs index 2430daad760a1..d3c5572c8dea0 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/mod.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/mod.rs @@ -16,3 +16,4 @@ mod backfill; mod background_ddl; mod nexmark_recovery; mod pause_on_bootstrap; +mod scale_in_when_recovery; diff --git a/src/tests/simulation/tests/integration_tests/recovery/scale_in_when_recovery.rs b/src/tests/simulation/tests/integration_tests/recovery/scale_in_when_recovery.rs new file mode 100644 index 0000000000000..cfbce605d31d0 --- /dev/null +++ b/src/tests/simulation/tests/integration_tests/recovery/scale_in_when_recovery.rs @@ -0,0 +1,166 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use anyhow::Result; +use itertools::Itertools; +use risingwave_pb::common::{WorkerNode, WorkerType}; +use risingwave_simulation::cluster::{Cluster, Configuration}; +use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains}; +use risingwave_simulation::utils::AssertResult; +use tokio::time::sleep; + +#[tokio::test] +async fn test_scale_in_when_recovery() -> Result<()> { + let config = Configuration::for_auto_scale(); + let mut cluster = Cluster::start(config.clone()).await?; + let mut session = cluster.start_session(); + + session.run("create table t (v1 int);").await?; + session + .run("create materialized view m as select count(*) from t;") + .await?; + + session + .run("insert into t select * from generate_series(1, 100)") + .await?; + session.run("flush").await?; + + sleep(Duration::from_secs(5)).await; + + let table_mat_fragment = cluster + .locate_one_fragment(vec![ + identity_contains("materialize"), + no_identity_contains("simpleAgg"), + ]) + .await?; + + let single_agg_fragment = cluster + .locate_one_fragment(vec![ + identity_contains("simpleAgg"), + identity_contains("materialize"), + ]) + .await?; + + let (_, single_used_parallel_unit_ids) = single_agg_fragment.parallel_unit_usage(); + + let used_parallel_unit_id = single_used_parallel_unit_ids.iter().next().unwrap(); + + let mut workers: Vec = cluster + .get_cluster_info() + .await? + .worker_nodes + .into_iter() + .filter(|worker| worker.r#type() == WorkerType::ComputeNode) + .collect(); + + let prev_workers = workers + .extract_if(|worker| { + worker + .parallel_units + .iter() + .map(|parallel_unit| parallel_unit.id) + .contains(used_parallel_unit_id) + }) + .collect_vec(); + + let prev_worker = prev_workers.into_iter().exactly_one().unwrap(); + let host = prev_worker.host.unwrap().host; + let host_name = format!("compute-{}", host.split('.').last().unwrap()); + + let (all_parallel_units, used_parallel_units) = table_mat_fragment.parallel_unit_usage(); + + assert_eq!(all_parallel_units.len(), used_parallel_units.len()); + + let initialized_parallelism = used_parallel_units.len(); + + assert_eq!( + initialized_parallelism, + config.compute_nodes * config.compute_node_cores + ); + + // ensure the restart delay is longer than config in `risingwave-auto-scale.toml` + let restart_delay = 30; + + cluster + .kill_nodes_and_restart(vec![host_name], restart_delay) + .await; + + let table_mat_fragment = cluster + .locate_one_fragment(vec![ + identity_contains("materialize"), + no_identity_contains("simpleAgg"), + ]) + .await?; + + let (_, used_parallel_units) = table_mat_fragment.parallel_unit_usage(); + + assert_eq!( + initialized_parallelism - config.compute_node_cores, + used_parallel_units.len() + ); + + let stream_scan_fragment = cluster + .locate_one_fragment(vec![identity_contains("streamTableScan")]) + .await?; + + let (_, used_parallel_units) = stream_scan_fragment.parallel_unit_usage(); + + assert_eq!( + initialized_parallelism - config.compute_node_cores, + used_parallel_units.len() + ); + + let single_agg_fragment = cluster + .locate_one_fragment(vec![ + identity_contains("simpleAgg"), + identity_contains("materialize"), + ]) + .await?; + + let (_, used_parallel_units_ids) = single_agg_fragment.parallel_unit_usage(); + + assert_eq!(used_parallel_units_ids.len(), 1); + + assert_ne!(single_used_parallel_unit_ids, used_parallel_units_ids); + + session + .run("select count(*) from t") + .await? + .assert_result_eq("100"); + + session + .run("select * from m") + .await? + .assert_result_eq("100"); + + session + .run("INSERT INTO t select * from generate_series(101, 150)") + .await?; + + session.run("flush").await?; + + session + .run("select count(*) from t") + .await? + .assert_result_eq("150"); + + session + .run("select * from m") + .await? + .assert_result_eq("150"); + + Ok(()) +}