From 4accc4a6c641871d73095443875d0f0b1c573d9e Mon Sep 17 00:00:00 2001 From: "Zhanxiang (Patrick) Huang" Date: Fri, 6 Sep 2024 14:46:54 +0800 Subject: [PATCH] feat: introduce cluster limit (#18383) --- e2e_test/batch/catalog/pg_settings.slt.part | 1 + proto/meta.proto | 27 ++++ src/common/src/config.rs | 18 +++ src/common/src/session_config/mod.rs | 6 + src/common/src/util/cluster_limit.rs | 134 ++++++++++++++++++ src/common/src/util/mod.rs | 1 + src/config/example.toml | 2 + .../catalog/system_catalog/rw_catalog/mod.rs | 1 + .../rw_catalog/rw_worker_actor_count.rs | 31 ++++ src/frontend/src/handler/create_mv.rs | 3 + src/frontend/src/handler/create_sink.rs | 2 + src/frontend/src/handler/create_table.rs | 2 + src/frontend/src/meta_client.rs | 7 + src/frontend/src/session.rs | 44 +++++- src/frontend/src/test_utils.rs | 7 +- src/meta/node/src/lib.rs | 8 ++ src/meta/node/src/server.rs | 6 +- src/meta/service/src/cluster_limit_service.rs | 107 ++++++++++++++ src/meta/service/src/lib.rs | 1 + src/meta/src/manager/env.rs | 28 ++-- src/rpc_client/src/meta_client.rs | 15 +- 21 files changed, 439 insertions(+), 12 deletions(-) create mode 100644 src/common/src/util/cluster_limit.rs create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_actor_count.rs create mode 100644 src/meta/service/src/cluster_limit_service.rs diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index 3482ce4850246..e05d466c3a4d6 100644 --- a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ -22,6 +22,7 @@ user backfill_rate_limit user background_ddl user batch_enable_distributed_dml user batch_parallelism +user bypass_cluster_limits user bytea_output user cdc_source_wait_streaming_start_timeout user client_encoding diff --git a/proto/meta.proto b/proto/meta.proto index 8932dcbc9e033..98a7f267c0124 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -791,3 +791,30 @@ message RelationIdInfos { // relation_id -> FragmentIdToActorIdMap map map = 1; } + +message ActorCountPerParallelism { + message WorkerActorCount { + uint64 actor_count = 1; + uint64 parallelism = 2; + } + map worker_id_to_actor_count = 1; + uint64 hard_limit = 2; + uint64 soft_limit = 3; +} + +message ClusterLimit { + oneof limit { + ActorCountPerParallelism actor_count = 1; + // TODO: limit DDL using compaction pending bytes + } +} + +message GetClusterLimitsRequest {} + +message GetClusterLimitsResponse { + repeated ClusterLimit active_limits = 1; +} + +service ClusterLimitService { + rpc GetClusterLimits(GetClusterLimitsRequest) returns (GetClusterLimitsResponse); +} diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 88ea110869b79..ed7ac8619252c 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -466,6 +466,16 @@ pub struct MetaDeveloperConfig { #[serde(default = "default::developer::max_get_task_probe_times")] pub max_get_task_probe_times: usize, + + /// Max number of actor allowed per parallelism (default = 100). + /// CREATE MV/Table will be noticed when the number of actors exceeds this limit. + #[serde(default = "default::developer::actor_cnt_per_worker_parallelism_soft_limit")] + pub actor_cnt_per_worker_parallelism_soft_limit: usize, + + /// Max number of actor allowed per parallelism (default = 400). + /// CREATE MV/Table will be rejected when the number of actors exceeds this limit. + #[serde(default = "default::developer::actor_cnt_per_worker_parallelism_hard_limit")] + pub actor_cnt_per_worker_parallelism_hard_limit: usize, } /// The section `[server]` in `risingwave.toml`. @@ -1859,6 +1869,14 @@ pub mod default { 5 } + pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize { + 100 + } + + pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize { + 400 + } + pub fn memory_controller_threshold_aggressive() -> f64 { 0.9 } diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index ffdbe6753acb5..163aa18799390 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -292,6 +292,12 @@ pub struct SessionConfig { #[parameter(default = "hex", check_hook = check_bytea_output)] bytea_output: String, + + /// Bypass checks on cluster limits + /// + /// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit. + #[parameter(default = false)] + bypass_cluster_limits: bool, } fn check_timezone(val: &str) -> Result<(), String> { diff --git a/src/common/src/util/cluster_limit.rs b/src/common/src/util/cluster_limit.rs new file mode 100644 index 0000000000000..048ea4fdab305 --- /dev/null +++ b/src/common/src/util/cluster_limit.rs @@ -0,0 +1,134 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::fmt::{self, Display, Formatter}; + +use risingwave_pb::meta::actor_count_per_parallelism::PbWorkerActorCount; +use risingwave_pb::meta::cluster_limit::PbLimit; +use risingwave_pb::meta::{PbActorCountPerParallelism, PbClusterLimit}; +pub enum ClusterLimit { + ActorCount(ActorCountPerParallelism), +} + +impl From for PbClusterLimit { + fn from(limit: ClusterLimit) -> Self { + match limit { + ClusterLimit::ActorCount(actor_count_per_parallelism) => PbClusterLimit { + limit: Some(PbLimit::ActorCount(actor_count_per_parallelism.into())), + }, + } + } +} + +impl From for ClusterLimit { + fn from(pb_limit: PbClusterLimit) -> Self { + match pb_limit.limit.unwrap() { + PbLimit::ActorCount(actor_count_per_parallelism) => { + ClusterLimit::ActorCount(actor_count_per_parallelism.into()) + } + } + } +} + +#[derive(Debug)] +pub struct WorkerActorCount { + pub actor_count: usize, + pub parallelism: usize, +} + +impl From for PbWorkerActorCount { + fn from(worker_actor_count: WorkerActorCount) -> Self { + PbWorkerActorCount { + actor_count: worker_actor_count.actor_count as u64, + parallelism: worker_actor_count.parallelism as u64, + } + } +} + +impl From for WorkerActorCount { + fn from(pb_worker_actor_count: PbWorkerActorCount) -> Self { + WorkerActorCount { + actor_count: pb_worker_actor_count.actor_count as usize, + parallelism: pb_worker_actor_count.parallelism as usize, + } + } +} + +pub struct ActorCountPerParallelism { + pub worker_id_to_actor_count: HashMap, + pub hard_limit: usize, + pub soft_limit: usize, +} + +impl From for PbActorCountPerParallelism { + fn from(actor_count_per_parallelism: ActorCountPerParallelism) -> Self { + PbActorCountPerParallelism { + worker_id_to_actor_count: actor_count_per_parallelism + .worker_id_to_actor_count + .into_iter() + .map(|(k, v)| (k, v.into())) + .collect(), + hard_limit: actor_count_per_parallelism.hard_limit as u64, + soft_limit: actor_count_per_parallelism.soft_limit as u64, + } + } +} + +impl From for ActorCountPerParallelism { + fn from(pb_actor_count_per_parallelism: PbActorCountPerParallelism) -> Self { + ActorCountPerParallelism { + worker_id_to_actor_count: pb_actor_count_per_parallelism + .worker_id_to_actor_count + .into_iter() + .map(|(k, v)| (k, v.into())) + .collect(), + hard_limit: pb_actor_count_per_parallelism.hard_limit as usize, + soft_limit: pb_actor_count_per_parallelism.soft_limit as usize, + } + } +} + +impl ActorCountPerParallelism { + pub fn exceed_hard_limit(&self) -> bool { + self.worker_id_to_actor_count + .values() + .any(|v| v.actor_count > self.hard_limit.saturating_mul(v.parallelism)) + } + + pub fn exceed_soft_limit(&self) -> bool { + self.worker_id_to_actor_count + .values() + .any(|v| v.actor_count > self.soft_limit.saturating_mul(v.parallelism)) + } + + pub fn exceed_limit(&self) -> bool { + self.exceed_soft_limit() || self.exceed_hard_limit() + } +} + +impl Display for ActorCountPerParallelism { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let worker_id_to_actor_count_str: Vec<_> = self + .worker_id_to_actor_count + .iter() + .map(|(k, v)| format!("{} -> {:?}", k, v)) + .collect(); + write!( + f, + "ActorCountPerParallelism {{ critical limit: {:?}, recommended limit: {:?}. worker_id_to_actor_count: {:?} }}", + self.hard_limit, self.soft_limit, worker_id_to_actor_count_str + ) + } +} diff --git a/src/common/src/util/mod.rs b/src/common/src/util/mod.rs index 20dac5906c91d..bfa15c8327037 100644 --- a/src/common/src/util/mod.rs +++ b/src/common/src/util/mod.rs @@ -42,3 +42,4 @@ pub mod tracing; pub mod value_encoding; pub mod worker_util; pub use tokio_util; +pub mod cluster_limit; diff --git a/src/config/example.toml b/src/config/example.toml index c81b35163eafa..f3c127cdc7825 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -81,6 +81,8 @@ meta_enable_trivial_move = true meta_enable_check_task_level_overlap = false meta_max_trivial_move_task_count_per_loop = 256 meta_max_get_task_probe_times = 5 +meta_actor_cnt_per_worker_parallelism_soft_limit = 100 +meta_actor_cnt_per_worker_parallelism_hard_limit = 400 [batch] enable_barrier_read = false diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index 879e375e2b762..5e3261c06d186 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -59,3 +59,4 @@ mod rw_worker_nodes; mod rw_actor_id_to_ddl; mod rw_fragment_id_to_ddl; +mod rw_worker_actor_count; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_actor_count.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_actor_count.rs new file mode 100644 index 0000000000000..a336f69b2029f --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_actor_count.rs @@ -0,0 +1,31 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::Fields; +use risingwave_frontend_macro::system_catalog; + +#[system_catalog( + view, + "rw_catalog.rw_worker_actor_count", + "SELECT t2.id as worker_id, parallelism, count(*) as actor_count + FROM rw_actors t1, rw_worker_nodes t2 + where t1.worker_id = t2.id + GROUP BY t2.id, t2.parallelism;" +)] +#[derive(Fields)] +struct RwWorkerActorCount { + worker_id: i32, + parallelism: i32, + actor_count: i64, +} diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index 4399d80811c19..9d48f2a429cca 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -205,6 +205,9 @@ pub async fn handle_create_mv_bound( ) -> Result { let session = handler_args.session.clone(); + // Check cluster limits + session.check_cluster_limits().await?; + if let Either::Right(resp) = session.check_relation_name_duplicated( name.clone(), StatementType::CREATE_MATERIALIZED_VIEW, diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index d0bd1d0cc8f2f..bb8d528ab1205 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -419,6 +419,8 @@ pub async fn handle_create_sink( ) -> Result { let session = handle_args.session.clone(); + session.check_cluster_limits().await?; + if let Either::Right(resp) = session.check_relation_name_duplicated( stmt.sink_name.clone(), StatementType::CREATE_SINK, diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index a10453a43ea4e..386d50e791886 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -1235,6 +1235,8 @@ pub async fn handle_create_table( session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature."); } + session.check_cluster_limits().await?; + if let Either::Right(resp) = session.check_relation_name_duplicated( table_name.clone(), StatementType::CREATE_TABLE, diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index 60fa992bdbe2d..020e3380b29b7 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use anyhow::Context; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsReader; +use risingwave_common::util::cluster_limit::ClusterLimit; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::backup_service::MetaSnapshotMetadata; @@ -136,6 +137,8 @@ pub trait FrontendMetaClient: Send + Sync { ) -> Result>; async fn get_cluster_recovery_status(&self) -> Result; + + async fn get_cluster_limits(&self) -> Result>; } pub struct FrontendMetaClientImpl(pub MetaClient); @@ -345,4 +348,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl { async fn get_cluster_recovery_status(&self) -> Result { self.0.get_cluster_recovery_status().await } + + async fn get_cluster_limits(&self) -> Result> { + self.0.get_cluster_limits().await + } } diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 16f0c7226be21..a1150798951cb 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -59,9 +59,10 @@ use risingwave_common::telemetry::manager::TelemetryManager; use risingwave_common::telemetry::telemetry_env_enabled; use risingwave_common::types::DataType; use risingwave_common::util::addr::HostAddr; +use risingwave_common::util::cluster_limit::ActorCountPerParallelism; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_common::util::resource_util; use risingwave_common::util::runtime::BackgroundShutdownRuntime; +use risingwave_common::util::{cluster_limit, resource_util}; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_common_heap_profiling::HeapProfiler; use risingwave_common_service::{MetricsManager, ObserverManager}; @@ -1194,6 +1195,47 @@ impl SessionImpl { pub fn temporary_source_manager(&self) -> TemporarySourceManager { self.temporary_source_manager.lock().clone() } + + pub async fn check_cluster_limits(&self) -> Result<()> { + if self.config().bypass_cluster_limits() { + return Ok(()); + } + + let gen_message = |violated_limit: &ActorCountPerParallelism, + exceed_hard_limit: bool| + -> String { + let (limit_type, action) = if exceed_hard_limit { + ("critical", "Please scale the cluster before proceeding!") + } else { + ("recommended", "Scaling the cluster is recommended.") + }; + format!( + "\n- {}\n- {}\n- {}\n- {}\n- {}\n{}", + format_args!("Actor count per parallelism exceeds the {} limit.", limit_type), + format_args!("Depending on your workload, this may overload the cluster and cause performance/stability issues. {}", action), + "Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.", + "You can bypass this check via SQL `SET bypass_cluster_limits TO true`.", + "You can check actor count distribution via SQL `SELECT * FROM rw_worker_actor_count`.", + violated_limit, + ) + }; + + let limits = self.env().meta_client().get_cluster_limits().await?; + for limit in limits { + match limit { + cluster_limit::ClusterLimit::ActorCount(l) => { + if l.exceed_hard_limit() { + return Err(RwError::from(ErrorCode::ProtocolError(gen_message( + &l, true, + )))); + } else if l.exceed_soft_limit() { + self.notice_to_user(gen_message(&l, false)); + } + } + } + } + Ok(()) + } } pub static SESSION_MANAGER: std::sync::OnceLock> = diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index ee6ff589e0cdb..10dad2105ed94 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -30,6 +30,7 @@ use risingwave_common::catalog::{ }; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsReader; +use risingwave_common::util::cluster_limit::ClusterLimit; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_pb::backup_service::MetaSnapshotMetadata; @@ -1065,7 +1066,7 @@ impl FrontendMetaClient for MockFrontendMetaClient { } async fn list_all_nodes(&self) -> RpcResult> { - unimplemented!() + Ok(vec![]) } async fn list_compact_task_progress(&self) -> RpcResult> { @@ -1097,6 +1098,10 @@ impl FrontendMetaClient for MockFrontendMetaClient { ) -> RpcResult> { unimplemented!() } + + async fn get_cluster_limits(&self) -> RpcResult> { + Ok(vec![]) + } } #[cfg(test)] diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 049519372c81e..88a76d1a1c706 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -457,6 +457,14 @@ pub fn start( table_info_statistic_history_times: config .storage .table_info_statistic_history_times, + actor_cnt_per_worker_parallelism_hard_limit: config + .meta + .developer + .actor_cnt_per_worker_parallelism_hard_limit, + actor_cnt_per_worker_parallelism_soft_limit: config + .meta + .developer + .actor_cnt_per_worker_parallelism_soft_limit, }, config.system.into_init_system_params(), Default::default(), diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 1f0f7f6a3fe8e..87c429ed9ccd3 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -40,6 +40,7 @@ use risingwave_meta::stream::ScaleController; use risingwave_meta::MetaStoreBackend; use risingwave_meta_service::backup_service::BackupServiceImpl; use risingwave_meta_service::cloud_service::CloudServiceImpl; +use risingwave_meta_service::cluster_limit_service::ClusterLimitServiceImpl; use risingwave_meta_service::cluster_service::ClusterServiceImpl; use risingwave_meta_service::ddl_service::DdlServiceImpl; use risingwave_meta_service::event_log_service::EventLogServiceImpl; @@ -63,6 +64,7 @@ use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoor use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer; use risingwave_pb::health::health_server::HealthServer; use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer; +use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitServiceServer; use risingwave_pb::meta::cluster_service_server::ClusterServiceServer; use risingwave_pb::meta::event_log_service_server::EventLogServiceServer; use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer; @@ -657,6 +659,7 @@ pub async fn start_service_as_election_leader( ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone()); let cloud_srv = CloudServiceImpl::new(metadata_manager.clone(), aws_cli); let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref()); + let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone()); if let Some(prometheus_addr) = address_info.prometheus_addr { MetricsManager::boot_metrics_service(prometheus_addr.to_string()) @@ -795,7 +798,8 @@ pub async fn start_service_as_election_leader( .add_service(ServingServiceServer::new(serving_srv)) .add_service(CloudServiceServer::new(cloud_srv)) .add_service(SinkCoordinationServiceServer::new(sink_coordination_srv)) - .add_service(EventLogServiceServer::new(event_log_srv)); + .add_service(EventLogServiceServer::new(event_log_srv)) + .add_service(ClusterLimitServiceServer::new(cluster_limit_srv)); #[cfg(not(madsim))] // `otlp-embedded` does not use madsim-patched tonic let server_builder = server_builder.add_service(TraceServiceServer::new(trace_srv)); diff --git a/src/meta/service/src/cluster_limit_service.rs b/src/meta/service/src/cluster_limit_service.rs new file mode 100644 index 0000000000000..df19b24b234e6 --- /dev/null +++ b/src/meta/service/src/cluster_limit_service.rs @@ -0,0 +1,107 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use risingwave_common::util::cluster_limit::{ + ActorCountPerParallelism, ClusterLimit, WorkerActorCount, +}; +use risingwave_meta::manager::{MetaSrvEnv, MetadataManager, WorkerId}; +use risingwave_meta::MetaResult; +use risingwave_pb::common::worker_node::State; +use risingwave_pb::common::WorkerType; +use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitService; +use risingwave_pb::meta::{GetClusterLimitsRequest, GetClusterLimitsResponse}; +use tonic::{Request, Response, Status}; + +#[derive(Clone)] +pub struct ClusterLimitServiceImpl { + env: MetaSrvEnv, + metadata_manager: MetadataManager, +} + +impl ClusterLimitServiceImpl { + pub fn new(env: MetaSrvEnv, metadata_manager: MetadataManager) -> Self { + ClusterLimitServiceImpl { + env, + metadata_manager, + } + } + + async fn get_active_actor_limit(&self) -> MetaResult> { + let (soft_limit, hard_limit) = ( + self.env.opts.actor_cnt_per_worker_parallelism_soft_limit, + self.env.opts.actor_cnt_per_worker_parallelism_hard_limit, + ); + + let running_worker_parallelism: HashMap = self + .metadata_manager + .list_worker_node(Some(WorkerType::ComputeNode), Some(State::Running)) + .await? + .into_iter() + .map(|e| (e.id, e.parallelism())) + .collect(); + let worker_actor_count: HashMap = self + .metadata_manager + .worker_actor_count() + .await? + .into_iter() + .filter_map(|(worker_id, actor_count)| { + running_worker_parallelism + .get(&worker_id) + .map(|parallelism| { + ( + worker_id, + WorkerActorCount { + actor_count, + parallelism: *parallelism, + }, + ) + }) + }) + .collect(); + + let limit = ActorCountPerParallelism { + worker_id_to_actor_count: worker_actor_count, + hard_limit, + soft_limit, + }; + + if limit.exceed_limit() { + Ok(Some(ClusterLimit::ActorCount(limit))) + } else { + Ok(None) + } + } +} + +#[async_trait::async_trait] +impl ClusterLimitService for ClusterLimitServiceImpl { + #[cfg_attr(coverage, coverage(off))] + async fn get_cluster_limits( + &self, + _request: Request, + ) -> Result, Status> { + // TODO: support more limits + match self.get_active_actor_limit().await { + Ok(Some(limit)) => Ok(Response::new(GetClusterLimitsResponse { + active_limits: vec![limit.into()], + })), + Ok(None) => Ok(Response::new(GetClusterLimitsResponse { + active_limits: vec![], + })), + Err(e) => Err(e.into()), + } + } +} diff --git a/src/meta/service/src/lib.rs b/src/meta/service/src/lib.rs index 9ab248802772e..e2f57d4a26bbb 100644 --- a/src/meta/service/src/lib.rs +++ b/src/meta/service/src/lib.rs @@ -21,6 +21,7 @@ use risingwave_meta::*; pub mod backup_service; pub mod cloud_service; +pub mod cluster_limit_service; pub mod cluster_service; pub mod ddl_service; pub mod event_log_service; diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 22f88bd9c0a75..ed18be6b0f483 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -294,6 +294,10 @@ pub struct MetaOpts { pub temp_secret_file_dir: String, pub table_info_statistic_history_times: usize, + + // Cluster limits + pub actor_cnt_per_worker_parallelism_hard_limit: usize, + pub actor_cnt_per_worker_parallelism_soft_limit: usize, } impl MetaOpts { @@ -358,6 +362,8 @@ impl MetaOpts { secret_store_private_key: Some("0123456789abcdef".as_bytes().to_vec()), temp_secret_file_dir: "./secrets".to_string(), table_info_statistic_history_times: 240, + actor_cnt_per_worker_parallelism_hard_limit: usize::MAX, + actor_cnt_per_worker_parallelism_soft_limit: usize::MAX, } } } @@ -408,9 +414,11 @@ impl MetaSrvEnv { (ClusterId::new(), true) }; - // For new clusters, the name of the object store needs to be prefixed according to the object id. - // For old clusters, the prefix is ​​not divided for the sake of compatibility. - + // For new clusters: + // - the name of the object store needs to be prefixed according to the object id. + // + // For old clusters + // - the prefix is ​​not divided for the sake of compatibility. init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch); let system_params_manager = Arc::new( SystemParamsManager::new( @@ -455,7 +463,7 @@ impl MetaSrvEnv { } } MetaStoreImpl::Sql(sql_meta_store) => { - let is_sql_backend_cluster_first_launch = + let cluster_first_launch = is_first_launch_for_sql_backend_cluster(sql_meta_store).await?; // Try to upgrade if any new model changes are added. Migrator::up(&sql_meta_store.conn, None) @@ -469,10 +477,14 @@ impl MetaSrvEnv { .await? .map(|c| c.cluster_id.to_string().into()) .unwrap(); - init_system_params.use_new_object_prefix_strategy = - Some(is_sql_backend_cluster_first_launch); - // For new clusters, the name of the object store needs to be prefixed according to the object id. - // For old clusters, the prefix is ​​not divided for the sake of compatibility. + + // For new clusters: + // - the name of the object store needs to be prefixed according to the object id. + // + // For old clusters + // - the prefix is ​​not divided for the sake of compatibility. + init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch); + let system_param_controller = Arc::new( SystemParamsController::new( sql_meta_store.clone(), diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index b4e06d8690b72..db66e60c91eeb 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -22,6 +22,7 @@ use std::time::{Duration, SystemTime}; use anyhow::{anyhow, Context}; use async_trait::async_trait; +use cluster_limit_service_client::ClusterLimitServiceClient; use either::Either; use futures::stream::BoxStream; use lru::LruCache; @@ -1436,6 +1437,14 @@ impl MetaClient { let resp = self.inner.get_version_by_epoch(req).await?; Ok(resp.version.unwrap()) } + + pub async fn get_cluster_limits( + &self, + ) -> Result> { + let req = GetClusterLimitsRequest {}; + let resp = self.inner.get_cluster_limits(req).await?; + Ok(resp.active_limits.into_iter().map(|l| l.into()).collect()) + } } #[async_trait] @@ -1636,6 +1645,7 @@ struct GrpcMetaClientCore { cloud_client: CloudServiceClient, sink_coordinate_client: SinkCoordinationRpcClient, event_log_client: EventLogServiceClient, + cluster_limit_client: ClusterLimitServiceClient, } impl GrpcMetaClientCore { @@ -1662,7 +1672,8 @@ impl GrpcMetaClientCore { let serving_client = ServingServiceClient::new(channel.clone()); let cloud_client = CloudServiceClient::new(channel.clone()); let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone()); - let event_log_client = EventLogServiceClient::new(channel); + let event_log_client = EventLogServiceClient::new(channel.clone()); + let cluster_limit_client = ClusterLimitServiceClient::new(channel); GrpcMetaClientCore { cluster_client, @@ -1682,6 +1693,7 @@ impl GrpcMetaClientCore { cloud_client, sink_coordinate_client, event_log_client, + cluster_limit_client, } } } @@ -2126,6 +2138,7 @@ macro_rules! for_all_meta_rpc { ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse } ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse } ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse } + ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse } } }; }