diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index f3cd926b6802b..d395f42975d8e 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -225,13 +225,9 @@ pub struct MetaConfig {
     #[serde(default)]
     pub disable_recovery: bool,

-    /// Whether to enable scale-in when recovery.
+    /// Whether to disable the adaptive-scaling feature.
     #[serde(default)]
-    pub enable_scale_in_when_recovery: bool,
-
-    /// Whether to enable auto-scaling feature.
-    #[serde(default)]
-    pub enable_automatic_parallelism_control: bool,
+    pub disable_automatic_parallelism_control: bool,

     #[serde(default = "default::meta::meta_leader_lease_secs")]
     pub meta_leader_lease_secs: u64,
diff --git a/src/config/example.toml b/src/config/example.toml
index 34937858db526..59c68aff3c7c0 100644
--- a/src/config/example.toml
+++ b/src/config/example.toml
@@ -25,8 +25,7 @@ enable_hummock_data_archive = false
 min_delta_log_num_for_hummock_version_checkpoint = 10
 max_heartbeat_interval_secs = 300
 disable_recovery = false
-enable_scale_in_when_recovery = false
-enable_automatic_parallelism_control = false
+disable_automatic_parallelism_control = false
 meta_leader_lease_secs = 30
 default_parallelism = "Full"
 enable_compaction_deterministic = false
diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs
index af8190e1ca8d6..618420a89da14 100644
--- a/src/meta/node/src/lib.rs
+++ b/src/meta/node/src/lib.rs
@@ -277,10 +277,9 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
             config.meta.meta_leader_lease_secs,
             MetaOpts {
                 enable_recovery: !config.meta.disable_recovery,
-                enable_scale_in_when_recovery: config.meta.enable_scale_in_when_recovery,
-                enable_automatic_parallelism_control: config
+                disable_automatic_parallelism_control: config
                     .meta
-                    .enable_automatic_parallelism_control,
+                    .disable_automatic_parallelism_control,
                 in_flight_barrier_nums,
                 max_idle_ms,
                 compaction_deterministic_test: config.meta.enable_compaction_deterministic,
diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs
index 83a47a98effdf..8f9b2f3d313ac 100644
--- a/src/meta/node/src/server.rs
+++ b/src/meta/node/src/server.rs
@@ -706,7 +706,7 @@ pub async fn start_service_as_election_leader(
         sub_tasks.push(task);
         sub_tasks.push(GlobalBarrierManager::start(barrier_manager));

-        if env.opts.enable_automatic_parallelism_control {
+        if !env.opts.disable_automatic_parallelism_control {
             sub_tasks.push(stream_manager.start_auto_parallelism_monitor());
         }
     }
diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index 88f0f22b0e07e..37c270da30c78 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -374,7 +374,7 @@ impl GlobalBarrierManagerContext {
         // Resolve actor info for recovery. If there's no actor to recover, most of the
         // following steps will be no-op, while the compute nodes will still be reset.
-        let mut info = if self.env.opts.enable_scale_in_when_recovery {
+        let mut info = if !self.env.opts.disable_automatic_parallelism_control {
             let info = self.resolve_actor_info().await;
             let scaled = self.scale_actors(&info).await.inspect_err(|err| {
                 warn!(error = %err.as_report(), "scale actors failed");
@@ -704,12 +704,21 @@ impl GlobalBarrierManagerContext {
         let mgr = self.metadata_manager.as_v1_ref();
         debug!("start resetting actors distribution");

+        if info.actor_location_map.is_empty() {
+            debug!("empty cluster, skipping");
+            return Ok(true);
+        }
+
         let current_parallelism = info
             .node_map
             .values()
             .flat_map(|worker_node| worker_node.parallel_units.iter())
             .count();

+        if current_parallelism == 0 {
+            return Err(anyhow!("no available parallel units for auto scaling").into());
+        }
+
         /// We infer the new parallelism strategy based on the prior level of parallelism of the table.
         /// If the parallelism strategy is Fixed or Auto, we won't make any modifications.
         /// For Custom, we'll assess the parallelism of the core fragment;
diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs
index dbffa250b4cf1..38e1ff3ca8f33 100644
--- a/src/meta/src/manager/cluster.rs
+++ b/src/meta/src/manager/cluster.rs
@@ -112,6 +112,7 @@ impl ClusterManager {
         let mut core = self.core.write().await;

         if let Some(worker) = core.get_worker_by_host_mut(host_address.clone()) {
+            tracing::info!("worker {} re-joined the cluster", worker.worker_id());
             worker.update_resource(Some(resource));
             worker.update_started_at(timestamp_now_sec());
             if let Some(property) = &mut property {
@@ -149,7 +150,7 @@ impl ClusterManager {
                     new_worker.worker_node.parallel_units.extend(parallel_units);
                 }
                 Ordering::Greater => {
-                    if self.env.opts.enable_scale_in_when_recovery {
+                    if !self.env.opts.disable_automatic_parallelism_control {
                         // Handing over to the subsequent recovery loop for a forced reschedule.
                         tracing::info!(
                             "worker {} parallelism reduced from {} to {}",
@@ -233,12 +234,25 @@ impl ClusterManager {
         worker.insert(self.env.meta_store_checked()).await?;
         // Update core.
         core.add_worker_node(worker);
+
+        tracing::info!(
+            "new worker {} from {}:{} joined cluster",
+            worker_id,
+            host_address.get_host(),
+            host_address.get_port()
+        );
+
         Ok(worker_node)
     }

     pub async fn activate_worker_node(&self, host_address: HostAddress) -> MetaResult<()> {
         let mut core = self.core.write().await;
         let mut worker = core.get_worker_by_host_checked(host_address.clone())?;
+
+        let worker_id = worker.worker_id();
+
+        tracing::info!("worker {} activating", worker_id);
+
         if worker.worker_node.state != State::Running as i32 {
             worker.worker_node.state = State::Running as i32;
             worker.insert(self.env.meta_store_checked()).await?;
@@ -258,6 +272,8 @@ impl ClusterManager {
             .notify_local_subscribers(LocalNotification::WorkerNodeActivated(worker.worker_node))
             .await;

+        tracing::info!("worker {} activated", worker_id);
+
         Ok(())
     }

@@ -790,8 +806,11 @@ mod tests {
     async fn test_cluster_manager() -> MetaResult<()> {
         let env = MetaSrvEnv::for_test().await;

-        let cluster_manager =
-            Arc::new(ClusterManager::new(env, Duration::new(0, 0)).await.unwrap());
+        let cluster_manager = Arc::new(
+            ClusterManager::new(env.clone(), Duration::new(0, 0))
+                .await
+                .unwrap(),
+        );
         let mut worker_nodes = Vec::new();
         let worker_count = 5usize;
@@ -878,8 +897,15 @@ mod tests {
         )
         .await
         .unwrap();
-        assert_eq!(worker_node.parallel_units.len(), fake_parallelism + 4);
-        assert_cluster_manager(&cluster_manager, parallel_count + 4).await;
+
+        if !env.opts.disable_automatic_parallelism_control {
+            assert_eq!(worker_node.parallel_units.len(), fake_parallelism - 2);
+            assert_cluster_manager(&cluster_manager, parallel_count - 2).await;
+        } else {
+            // compatibility mode
+            assert_eq!(worker_node.parallel_units.len(), fake_parallelism + 4);
+            assert_cluster_manager(&cluster_manager, parallel_count + 4).await;
+        }

         let worker_to_delete_count = 4usize;
         for i in 0..worker_to_delete_count {
diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs
index be287ac864860..f10138b56e81e 100644
--- a/src/meta/src/manager/env.rs
+++ b/src/meta/src/manager/env.rs
@@ -91,11 +91,8 @@ pub struct MetaOpts {
     /// Whether to enable the recovery of the cluster. If disabled, the meta service will exit on
     /// abnormal cases.
     pub enable_recovery: bool,
-    /// Whether to enable the scale-in feature when compute-node is removed.
-    pub enable_scale_in_when_recovery: bool,
-    /// Whether to enable the auto-scaling feature when compute-node is joined.
-    /// The semantics of this configuration will be expanded in the future to control the automatic scaling of the entire cluster.
-    pub enable_automatic_parallelism_control: bool,
+    /// Whether to disable the auto-scaling feature.
+    pub disable_automatic_parallelism_control: bool,
     /// The maximum number of barriers in-flight in the compute nodes.
     pub in_flight_barrier_nums: usize,
     /// After specified seconds of idle (no mview or flush), the process will be exited.
@@ -223,8 +220,7 @@ impl MetaOpts {
     pub fn test(enable_recovery: bool) -> Self {
         Self {
             enable_recovery,
-            enable_scale_in_when_recovery: false,
-            enable_automatic_parallelism_control: false,
+            disable_automatic_parallelism_control: false,
             in_flight_barrier_nums: 40,
             max_idle_ms: 0,
             compaction_deterministic_test: false,
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index d73d38841322b..3a21c812086f7 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -187,6 +187,11 @@ pub fn rebalance_actor_vnode(
     actors_to_remove: &BTreeSet<ActorId>,
     actors_to_create: &BTreeSet<ActorId>,
 ) -> HashMap<ActorId, Bitmap> {
+    let actor_ids: BTreeSet<_> = actors.iter().map(|actor| actor.actor_id).collect();
+
+    assert_eq!(actors_to_remove.difference(&actor_ids).count(), 0);
+    assert_eq!(actors_to_create.intersection(&actor_ids).count(), 0);
+
     assert!(actors.len() >= actors_to_remove.len());

     let target_actor_count = actors.len() - actors_to_remove.len() + actors_to_create.len();
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index d7a35fc1d90f4..4d14f0caa2828 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -1093,7 +1093,9 @@ mod tests {
             let actor_locations = fragments
                 .values()
                 .flat_map(|f| &f.actors)
-                .map(|a| (a.actor_id, parallel_units[&0].clone()))
+                .sorted_by(|a, b| a.actor_id.cmp(&b.actor_id))
+                .enumerate()
+                .map(|(idx, a)| (a.actor_id, parallel_units[&(idx as u32)].clone()))
                 .collect();

             Locations {
@@ -1182,6 +1184,14 @@ mod tests {
         let table_id = TableId::new(0);
         let actors = make_mview_stream_actors(&table_id, 4);

+        let StreamingClusterInfo { parallel_units, .. } = services
+            .global_stream_manager
+            .metadata_manager
+            .get_streaming_cluster_info()
+            .await?;
+
+        let parallel_unit_ids = parallel_units.keys().cloned().sorted().collect_vec();
+
         let mut fragments = BTreeMap::default();
         fragments.insert(
             0,
@@ -1191,7 +1201,9 @@ mod tests {
                 distribution_type: FragmentDistributionType::Hash as i32,
                 actors: actors.clone(),
                 state_table_ids: vec![0],
-                vnode_mapping: Some(ParallelUnitMapping::new_single(0).to_protobuf()),
+                vnode_mapping: Some(
+                    ParallelUnitMapping::new_uniform(parallel_unit_ids.into_iter()).to_protobuf(),
+                ),
                 ..Default::default()
             },
         );
diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs
index dd591954d945a..b4178e01f8786 100644
--- a/src/tests/simulation/src/cluster.rs
+++ b/src/tests/simulation/src/cluster.rs
@@ -177,9 +177,10 @@ impl Configuration {
     pub fn for_auto_parallelism(
         max_heartbeat_interval_secs: u64,
-        enable_auto_scale_in: bool,
         enable_auto_parallelism: bool,
     ) -> Self {
+        let disable_automatic_parallelism_control = !enable_auto_parallelism;
+
         let config_path = {
             let mut file =
                 tempfile::NamedTempFile::new().expect("failed to create temp config file");
@@ -187,8 +188,7 @@ impl Configuration {
             let config_data = format!(
                 r#"[meta]
 max_heartbeat_interval_secs = {max_heartbeat_interval_secs}
-enable_scale_in_when_recovery = {enable_auto_scale_in}
-enable_automatic_parallelism_control = {enable_auto_parallelism}
+disable_automatic_parallelism_control = {disable_automatic_parallelism_control}

 [system]
 barrier_interval_ms = 250
diff --git a/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs
index 9ec41df977c3b..4943516bb9ddc 100644
--- a/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs
+++ b/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs
@@ -32,7 +32,6 @@ async fn test_passive_online_and_offline() -> Result<()> {
     let config = Configuration::for_auto_parallelism(
         MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE,
         true,
-        true,
     );
     let mut cluster = Cluster::start(config.clone()).await?;
     let mut session = cluster.start_session();
@@ -215,7 +214,6 @@ async fn test_passive_online_and_offline() -> Result<()> {
 async fn test_active_online() -> Result<()> {
     let config = Configuration::for_auto_parallelism(
         MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE,
-        false,
         true,
     );
     let mut cluster = Cluster::start(config.clone()).await?;
@@ -302,7 +300,6 @@ async fn test_auto_parallelism_control_with_fixed_and_auto_helper(
 ) -> Result<()> {
     let config = Configuration::for_auto_parallelism(
         MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE,
-        true,
         enable_auto_parallelism_control,
     );
     let mut cluster = Cluster::start(config.clone()).await?;
@@ -490,7 +487,6 @@ async fn test_auto_parallelism_control_with_fixed_and_auto_helper(
 async fn test_compatibility_with_low_level() -> Result<()> {
     let config = Configuration::for_auto_parallelism(
         MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE,
-        false,
         true,
     );
     let mut cluster = Cluster::start(config.clone()).await?;
@@ -632,7 +628,6 @@ async fn test_compatibility_with_low_level() -> Result<()> {
 async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result<()> {
     let config = Configuration::for_auto_parallelism(
         MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE,
-        false,
         true,
     );
     let mut cluster = Cluster::start(config.clone()).await?;
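
Note on the renamed flag: the setting flips from opt-in (`enable_automatic_parallelism_control`) to opt-out (`disable_automatic_parallelism_control`), so every call site in the diff negates it, and with the `#[serde(default)]` of `false` the feature is now on unless explicitly disabled. A minimal standalone sketch of that pattern, using a hypothetical `Opts` struct as a stand-in for the real `MetaOpts` rather than the actual meta-node wiring:

```rust
// Sketch of the opt-out flag introduced by this diff. `Opts` is a hypothetical
// stand-in for `MetaOpts`; only the inversion pattern mirrors the real code.
#[derive(Default)]
struct Opts {
    // Defaults to `false`, so automatic parallelism control is active unless
    // the operator explicitly sets `disable_automatic_parallelism_control = true`.
    disable_automatic_parallelism_control: bool,
}

fn main() {
    let opts = Opts::default();

    // Old style: `if opts.enable_automatic_parallelism_control { ... }`
    // New style: negate the opt-out flag, as in server.rs and recovery.rs above.
    if !opts.disable_automatic_parallelism_control {
        println!("auto parallelism monitor would be started");
    }
}
```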