From de9f1cc4768486c364cafc8696eaeb1fe634e503 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Wed, 28 Aug 2024 18:30:54 +0800 Subject: [PATCH 01/15] feat: introduce cluster limit --- proto/meta.proto | 29 ++++ src/common/src/config.rs | 26 ++++ src/common/src/session_config/mod.rs | 17 +++ src/common/src/util/cluster_limit.rs | 137 ++++++++++++++++++ src/common/src/util/mod.rs | 1 + src/config/example.toml | 2 + .../catalog/system_catalog/rw_catalog/mod.rs | 1 + .../rw_catalog/rw_worker_actor_count.rs | 31 ++++ src/frontend/src/handler/create_mv.rs | 3 + src/frontend/src/handler/util.rs | 2 + src/frontend/src/meta_client.rs | 7 + src/frontend/src/session.rs | 46 +++++- src/frontend/src/test_utils.rs | 5 + src/license/src/manager.rs | 4 + src/meta/node/src/lib.rs | 8 + src/meta/node/src/server.rs | 6 +- src/meta/service/src/cluster_limit_service.rs | 123 ++++++++++++++++ src/meta/service/src/lib.rs | 1 + src/meta/src/error.rs | 4 + src/meta/src/manager/env.rs | 28 +++- src/rpc_client/src/meta_client.rs | 13 +- 21 files changed, 483 insertions(+), 11 deletions(-) create mode 100644 src/common/src/util/cluster_limit.rs create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_actor_count.rs create mode 100644 src/meta/service/src/cluster_limit_service.rs diff --git a/proto/meta.proto b/proto/meta.proto index d75494625edd..42661c7c2c77 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -789,3 +789,32 @@ message RelationIdInfos { // relation_id -> FragmentIdToActorIdMap map map = 1; } + +message WorkerActorCount { + uint64 actor_count = 1; + uint64 parallelism = 2; +} + +message ActorCountPerParallelism { + map worker_id_to_actor_count = 1; + uint64 hard_limit = 2; + uint64 soft_limit = 3; +} + +message ClusterLimit { + oneof limit { + ActorCountPerParallelism actor_count = 1; + // TODO: limit DDL using compaction pending bytes + } +} + +message GetClusterLimitsRequest {} + +message GetClusterLimitsResponse { + repeated ClusterLimit active_limits = 1; +} + + +service ClusterLimitService { + rpc GetClusterLimits(GetClusterLimitsRequest) returns (GetClusterLimitsResponse); +} \ No newline at end of file diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 88ea110869b7..9067b7ecd180 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -466,6 +466,24 @@ pub struct MetaDeveloperConfig { #[serde(default = "default::developer::max_get_task_probe_times")] pub max_get_task_probe_times: usize, + + /// Max number of actor allowed per parallelism (default = 100). + /// CREATE MV/Table will be noticed when the number of actors exceeds this limit. + /// This limit is effective for clusters that meets the following conditions: + /// - running with version >= 2.0 + /// - running with paied tier license + /// Free tier cluster's limit will be hardcoded to FREE_TIER_ACTOR_CNT_SOFT_LIMIT + #[serde(default = "default::developer::actor_cnt_per_worker_parallelism_soft_limit")] + pub actor_cnt_per_worker_parallelism_soft_limit: usize, + + /// Max number of actor allowed per parallelism (default = 400). + /// CREATE MV/Table will be rejected when the number of actors exceeds this limit. 
+ /// This limit is effective for clusters that meets the following conditions: + /// - created with version >= 2.0 + /// - running with paied tier license + /// Free tier cluster's limit will be hardcoded to FREE_TIER_ACTOR_CNT_HARD_LIMIT + #[serde(default = "default::developer::actor_cnt_per_worker_parallelism_hard_limit")] + pub actor_cnt_per_worker_parallelism_hard_limit: usize, } /// The section `[server]` in `risingwave.toml`. @@ -1859,6 +1877,14 @@ pub mod default { 5 } + pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize { + 100 + } + + pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize { + 400 + } + pub fn memory_controller_threshold_aggressive() -> f64 { 0.9 } diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index ffdbe6753acb..7781198869fc 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -24,6 +24,7 @@ use chrono_tz::Tz; pub use over_window::OverWindowCachePolicy; pub use query_mode::QueryMode; use risingwave_common_proc_macro::{ConfigDoc, SessionConfig}; +use risingwave_license::LicenseManager; pub use search_path::{SearchPath, USER_NAME_WILD_CARD}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; @@ -292,6 +293,13 @@ pub struct SessionConfig { #[parameter(default = "hex", check_hook = check_bytea_output)] bytea_output: String, + + /// Bypass checks on cluster limits + /// + /// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit. + /// This session variable is only mutable in paid tier. + #[parameter(default = false, check_hook = check_bypass_cluster_limits)] + bypass_cluster_limits: bool, } fn check_timezone(val: &str) -> Result<(), String> { @@ -318,6 +326,15 @@ fn check_bytea_output(val: &str) -> Result<(), String> { } } +fn check_bypass_cluster_limits(_val: &bool) -> Result<(), String> { + match LicenseManager::get().tier().map_err(|e| e.to_string())? { + risingwave_license::Tier::Free => { + Err("Bypassing cluster limits is only allowed in paid tier".to_string()) + } + risingwave_license::Tier::Paid => Ok(()), + } +} + impl SessionConfig { pub fn set_force_two_phase_agg( &mut self, diff --git a/src/common/src/util/cluster_limit.rs b/src/common/src/util/cluster_limit.rs new file mode 100644 index 000000000000..77c5b0d7777f --- /dev/null +++ b/src/common/src/util/cluster_limit.rs @@ -0,0 +1,137 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; +use std::fmt::{self, Display, Formatter}; + +use risingwave_pb::meta::cluster_limit::PbLimit; +use risingwave_pb::meta::{PbActorCountPerParallelism, PbClusterLimit, PbWorkerActorCount}; + +pub const FREE_TIER_ACTOR_CNT_SOFT_LIMIT: usize = 25; +pub const FREE_TIER_ACTOR_CNT_HARD_LIMIT: usize = 100; + +pub enum ClusterLimit { + ActorCount(ActorCountPerParallelism), +} + +impl From for PbClusterLimit { + fn from(limit: ClusterLimit) -> Self { + match limit { + ClusterLimit::ActorCount(actor_count_per_parallelism) => PbClusterLimit { + limit: Some(PbLimit::ActorCount(actor_count_per_parallelism.into())), + }, + } + } +} + +impl From for ClusterLimit { + fn from(pb_limit: PbClusterLimit) -> Self { + match pb_limit.limit.unwrap() { + PbLimit::ActorCount(actor_count_per_parallelism) => { + ClusterLimit::ActorCount(actor_count_per_parallelism.into()) + } + } + } +} + +#[derive(Debug)] +pub struct WorkerActorCount { + pub actor_count: usize, + pub parallelism: usize, +} + +impl From for PbWorkerActorCount { + fn from(worker_actor_count: WorkerActorCount) -> Self { + PbWorkerActorCount { + actor_count: worker_actor_count.actor_count as u64, + parallelism: worker_actor_count.parallelism as u64, + } + } +} + +impl From for WorkerActorCount { + fn from(pb_worker_actor_count: PbWorkerActorCount) -> Self { + WorkerActorCount { + actor_count: pb_worker_actor_count.actor_count as usize, + parallelism: pb_worker_actor_count.parallelism as usize, + } + } +} + +pub struct ActorCountPerParallelism { + pub worker_id_to_actor_count: HashMap, + pub hard_limit: usize, + pub soft_limit: usize, +} + +impl From for PbActorCountPerParallelism { + fn from(actor_count_per_parallelism: ActorCountPerParallelism) -> Self { + PbActorCountPerParallelism { + worker_id_to_actor_count: actor_count_per_parallelism + .worker_id_to_actor_count + .into_iter() + .map(|(k, v)| (k, v.into())) + .collect(), + hard_limit: actor_count_per_parallelism.hard_limit as u64, + soft_limit: actor_count_per_parallelism.soft_limit as u64, + } + } +} + +impl From for ActorCountPerParallelism { + fn from(pb_actor_count_per_parallelism: PbActorCountPerParallelism) -> Self { + ActorCountPerParallelism { + worker_id_to_actor_count: pb_actor_count_per_parallelism + .worker_id_to_actor_count + .into_iter() + .map(|(k, v)| (k, v.into())) + .collect(), + hard_limit: pb_actor_count_per_parallelism.hard_limit as usize, + soft_limit: pb_actor_count_per_parallelism.soft_limit as usize, + } + } +} + +impl ActorCountPerParallelism { + pub fn exceed_hard_limit(&self) -> bool { + self.worker_id_to_actor_count + .values() + .any(|v| v.actor_count > self.hard_limit.saturating_mul(v.parallelism) ) + } + + pub fn exceed_soft_limit(&self) -> bool { + self.worker_id_to_actor_count + .values() + .any(|v| v.actor_count > self.soft_limit.saturating_mul(v.parallelism) ) + } + + pub fn exceed_limit(&self) -> bool { + self.exceed_soft_limit() || self.exceed_hard_limit() + } +} + +impl Display for ActorCountPerParallelism { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let worker_id_to_actor_count_str: Vec<_> = self + .worker_id_to_actor_count + .iter() + .map(|(k, v)| format!("{} -> {:?}", k, v)) + .collect(); + write!( + f, + "ActorCountPerParallelism {{ hard_limit: {:?}, soft_limit: {:?}. 
worker_id_to_actor_count: {:?} }}", + self.hard_limit, self.soft_limit, worker_id_to_actor_count_str + ) + } +} diff --git a/src/common/src/util/mod.rs b/src/common/src/util/mod.rs index 20dac5906c91..bfa15c832703 100644 --- a/src/common/src/util/mod.rs +++ b/src/common/src/util/mod.rs @@ -42,3 +42,4 @@ pub mod tracing; pub mod value_encoding; pub mod worker_util; pub use tokio_util; +pub mod cluster_limit; diff --git a/src/config/example.toml b/src/config/example.toml index c81b35163eaf..9e9d7894a079 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -81,6 +81,8 @@ meta_enable_trivial_move = true meta_enable_check_task_level_overlap = false meta_max_trivial_move_task_count_per_loop = 256 meta_max_get_task_probe_times = 5 +actor_cnt_per_worker_parallelism_soft_limit = 100 +actor_cnt_per_worker_parallelism_hard_limit = 400 [batch] enable_barrier_read = false diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index 879e375e2b76..5e3261c06d18 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -59,3 +59,4 @@ mod rw_worker_nodes; mod rw_actor_id_to_ddl; mod rw_fragment_id_to_ddl; +mod rw_worker_actor_count; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_actor_count.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_actor_count.rs new file mode 100644 index 000000000000..a336f69b2029 --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_actor_count.rs @@ -0,0 +1,31 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use risingwave_common::types::Fields; +use risingwave_frontend_macro::system_catalog; + +#[system_catalog( + view, + "rw_catalog.rw_worker_actor_count", + "SELECT t2.id as worker_id, parallelism, count(*) as actor_count + FROM rw_actors t1, rw_worker_nodes t2 + where t1.worker_id = t2.id + GROUP BY t2.id, t2.parallelism;" +)] +#[derive(Fields)] +struct RwWorkerActorCount { + worker_id: i32, + parallelism: i32, + actor_count: i64, +} diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index 4399d80811c1..9d48f2a429cc 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -205,6 +205,9 @@ pub async fn handle_create_mv_bound( ) -> Result { let session = handler_args.session.clone(); + // Check cluster limits + session.check_cluster_limits().await?; + if let Either::Right(resp) = session.check_relation_name_duplicated( name.clone(), StatementType::CREATE_MATERIALIZED_VIEW, diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index 0531ce5a6528..d6a14d9dd696 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -238,6 +238,8 @@ pub fn convert_logstore_u64_to_unix_millis(logstore_u64: u64) -> u64 { Epoch::from(logstore_u64).as_unix_millis() } + + #[cfg(test)] mod tests { use postgres_types::{ToSql, Type}; diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index 60fa992bdbe2..020e3380b29b 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use anyhow::Context; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsReader; +use risingwave_common::util::cluster_limit::ClusterLimit; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::backup_service::MetaSnapshotMetadata; @@ -136,6 +137,8 @@ pub trait FrontendMetaClient: Send + Sync { ) -> Result>; async fn get_cluster_recovery_status(&self) -> Result; + + async fn get_cluster_limits(&self) -> Result>; } pub struct FrontendMetaClientImpl(pub MetaClient); @@ -345,4 +348,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl { async fn get_cluster_recovery_status(&self) -> Result { self.0.get_cluster_recovery_status().await } + + async fn get_cluster_limits(&self) -> Result> { + self.0.get_cluster_limits().await + } } diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 16f0c7226be2..0dd4a1a33ac4 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -49,6 +49,7 @@ use risingwave_common::catalog::{ use risingwave_common::config::{ load_config, BatchConfig, MetaConfig, MetricLevel, StreamingConfig, }; +use risingwave_common::license::LicenseManager; use risingwave_common::memory::MemoryContext; use risingwave_common::secret::LocalSecretManager; use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode}; @@ -60,8 +61,8 @@ use risingwave_common::telemetry::telemetry_env_enabled; use risingwave_common::types::DataType; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_common::util::resource_util; use risingwave_common::util::runtime::BackgroundShutdownRuntime; +use risingwave_common::util::{cluster_limit, resource_util}; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_common_heap_profiling::HeapProfiler; use 
risingwave_common_service::{MetricsManager, ObserverManager}; @@ -1194,6 +1195,49 @@ impl SessionImpl { pub fn temporary_source_manager(&self) -> TemporarySourceManager { self.temporary_source_manager.lock().clone() } + + pub async fn check_cluster_limits(&self) -> Result<()> { + let bypass_cluster_limits = self.config().bypass_cluster_limits(); + let tier = LicenseManager::get() + .tier() + .map_err(|e| RwError::from(ErrorCode::ProtocolError(e.to_string())))?; + let limits = self.env().meta_client().get_cluster_limits().await?; + for limit in limits { + match limit { + cluster_limit::ClusterLimit::ActorCount(l) => { + if l.exceed_hard_limit() { + let mut msg = + "\n- Actor count per parallelism exceeds the hard limit".to_string(); + msg.push_str("\n- This may overload the cluster and cause performance/stability issues. Please scale the cluster before proceeding."); + if matches!(tier, risingwave_common::license::Tier::Free) { + msg.push_str("\n- Feel free to contact us via https://risingwave.com/contact-us/ for a upgrade."); + } + msg.push_str("\n- You can check actor count distribution via SQL `SELECT * FROM rw_worker_actor_count`."); + msg.push_str(format!("\n{}", l).as_str()); + if bypass_cluster_limits { + // Only send a notice if `bypass_cluster_limits` is set. + self.notice_to_user(&msg); + } else { + // Return an error if `bypass_cluster_limits` is not set. + return Err(RwError::from(ErrorCode::ProtocolError(msg))); + } + } else if l.exceed_soft_limit() { + // Send a notice if soft limit is exceeded. + let mut msg = + "\n- Actor count per parallelism exceeds the soft limit".to_string(); + msg.push_str("\n- This may overload the cluster and cause performance/stability issues. Scaling the cluster is recommended."); + if matches!(tier, risingwave_common::license::Tier::Free) { + msg.push_str("\n- Feel free to contact us via https://risingwave.com/contact-us/ for a upgrade."); + } + msg.push_str("\n- You can check actor count distribution via SQL `SELECT * FROM rw_worker_actor_count`."); + msg.push_str(format!("\n{}", l).as_str()); + self.notice_to_user(&msg); + } + } + } + } + Ok(()) + } } pub static SESSION_MANAGER: std::sync::OnceLock> = diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index ee6ff589e0cd..f2382c62324f 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -30,6 +30,7 @@ use risingwave_common::catalog::{ }; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsReader; +use risingwave_common::util::cluster_limit::ClusterLimit; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_pb::backup_service::MetaSnapshotMetadata; @@ -1097,6 +1098,10 @@ impl FrontendMetaClient for MockFrontendMetaClient { ) -> RpcResult> { unimplemented!() } + + async fn get_cluster_limits(&self) -> RpcResult> { + unimplemented!() + } } #[cfg(test)] diff --git a/src/license/src/manager.rs b/src/license/src/manager.rs index 5c1bc298388d..ee9e8e51da0d 100644 --- a/src/license/src/manager.rs +++ b/src/license/src/manager.rs @@ -168,6 +168,10 @@ impl LicenseManager { } } + pub fn tier(&self) -> Result { + self.license().map(|l| l.tier) + } + /// Get the current license if it is valid. /// /// Since the license can expire, the returned license should not be cached by the caller. 
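The per-parallelism check that this series introduces in `src/common/src/util/cluster_limit.rs` and enforces from `SessionImpl::check_cluster_limits` boils down to a single comparison: a worker violates a limit when its actor count exceeds `limit * parallelism`, with a soft limit (default 100) that only produces a notice and a hard limit (default 400) that rejects the DDL. The standalone Rust sketch below restates that predicate outside the RisingWave crates; the worker data, the helper name `exceeds`, and the `main` driver are illustrative assumptions, while the saturating multiplication and the default limits mirror the patch.

use std::collections::HashMap;

// Simplified mirror of the `WorkerActorCount` introduced by this series.
struct WorkerActorCount {
    actor_count: usize,
    parallelism: usize,
}

// Same predicate as `ActorCountPerParallelism::exceed_{soft,hard}_limit`:
// a worker violates a limit when actor_count > limit * parallelism.
fn exceeds(workers: &HashMap<u32, WorkerActorCount>, limit_per_parallelism: usize) -> bool {
    workers
        .values()
        .any(|w| w.actor_count > limit_per_parallelism.saturating_mul(w.parallelism))
}

fn main() {
    // Hypothetical cluster state: one compute node with parallelism 8 running 900 actors.
    let mut workers = HashMap::new();
    workers.insert(
        1u32,
        WorkerActorCount {
            actor_count: 900,
            parallelism: 8,
        },
    );

    let soft_limit = 100; // default `actor_cnt_per_worker_parallelism_soft_limit`
    let hard_limit = 400; // default `actor_cnt_per_worker_parallelism_hard_limit`

    // 900 > 100 * 8: the soft limit is exceeded, so the session only emits a notice.
    assert!(exceeds(&workers, soft_limit));
    // 900 <= 400 * 8: the hard limit is not exceeded, so CREATE MV/TABLE/SINK still succeeds.
    assert!(!exceeds(&workers, hard_limit));
}

In the final shape of the series, the frontend turns a hard-limit violation into an error, a soft-limit violation into a notice, and `SET bypass_cluster_limits TO true` skips the check entirely; the meta service computes the per-worker counts and returns only the limits that are currently exceeded via the `GetClusterLimits` RPC.
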
diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 049519372c81..88a76d1a1c70 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -457,6 +457,14 @@ pub fn start( table_info_statistic_history_times: config .storage .table_info_statistic_history_times, + actor_cnt_per_worker_parallelism_hard_limit: config + .meta + .developer + .actor_cnt_per_worker_parallelism_hard_limit, + actor_cnt_per_worker_parallelism_soft_limit: config + .meta + .developer + .actor_cnt_per_worker_parallelism_soft_limit, }, config.system.into_init_system_params(), Default::default(), diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 1f0f7f6a3fe8..87c429ed9ccd 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -40,6 +40,7 @@ use risingwave_meta::stream::ScaleController; use risingwave_meta::MetaStoreBackend; use risingwave_meta_service::backup_service::BackupServiceImpl; use risingwave_meta_service::cloud_service::CloudServiceImpl; +use risingwave_meta_service::cluster_limit_service::ClusterLimitServiceImpl; use risingwave_meta_service::cluster_service::ClusterServiceImpl; use risingwave_meta_service::ddl_service::DdlServiceImpl; use risingwave_meta_service::event_log_service::EventLogServiceImpl; @@ -63,6 +64,7 @@ use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoor use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer; use risingwave_pb::health::health_server::HealthServer; use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer; +use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitServiceServer; use risingwave_pb::meta::cluster_service_server::ClusterServiceServer; use risingwave_pb::meta::event_log_service_server::EventLogServiceServer; use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer; @@ -657,6 +659,7 @@ pub async fn start_service_as_election_leader( ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone()); let cloud_srv = CloudServiceImpl::new(metadata_manager.clone(), aws_cli); let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref()); + let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone()); if let Some(prometheus_addr) = address_info.prometheus_addr { MetricsManager::boot_metrics_service(prometheus_addr.to_string()) @@ -795,7 +798,8 @@ pub async fn start_service_as_election_leader( .add_service(ServingServiceServer::new(serving_srv)) .add_service(CloudServiceServer::new(cloud_srv)) .add_service(SinkCoordinationServiceServer::new(sink_coordination_srv)) - .add_service(EventLogServiceServer::new(event_log_srv)); + .add_service(EventLogServiceServer::new(event_log_srv)) + .add_service(ClusterLimitServiceServer::new(cluster_limit_srv)); #[cfg(not(madsim))] // `otlp-embedded` does not use madsim-patched tonic let server_builder = server_builder.add_service(TraceServiceServer::new(trace_srv)); diff --git a/src/meta/service/src/cluster_limit_service.rs b/src/meta/service/src/cluster_limit_service.rs new file mode 100644 index 000000000000..5c6db9dd2643 --- /dev/null +++ b/src/meta/service/src/cluster_limit_service.rs @@ -0,0 +1,123 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use risingwave_common::license::{LicenseManager, Tier}; +use risingwave_common::util::cluster_limit::{ + ActorCountPerParallelism, ClusterLimit, WorkerActorCount, FREE_TIER_ACTOR_CNT_HARD_LIMIT, + FREE_TIER_ACTOR_CNT_SOFT_LIMIT, +}; +use risingwave_meta::manager::{MetaSrvEnv, MetadataManager, WorkerId}; +use risingwave_meta::MetaResult; +use risingwave_pb::common::worker_node::State; +use risingwave_pb::common::WorkerType; +use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitService; +use risingwave_pb::meta::{GetClusterLimitsRequest, GetClusterLimitsResponse}; +use tonic::{Request, Response, Status}; + +#[derive(Clone)] +pub struct ClusterLimitServiceImpl { + env: MetaSrvEnv, + metadata_manager: MetadataManager, +} + +impl ClusterLimitServiceImpl { + pub fn new(env: MetaSrvEnv, metadata_manager: MetadataManager) -> Self { + ClusterLimitServiceImpl { + env, + metadata_manager, + } + } + + async fn get_active_actor_limit(&self) -> MetaResult> { + let (soft_limit, hard_limit) = match LicenseManager::get().tier() { + Ok(Tier::Paid) => ( + self.env.opts.actor_cnt_per_worker_parallelism_soft_limit, + self.env.opts.actor_cnt_per_worker_parallelism_hard_limit, + ), + Ok(Tier::Free) => ( + FREE_TIER_ACTOR_CNT_SOFT_LIMIT, + FREE_TIER_ACTOR_CNT_HARD_LIMIT, + ), + Err(e) => { + tracing::warn!("Failed to get license tier: {}", e); + // Default to use free tier limit if there is any license error + ( + FREE_TIER_ACTOR_CNT_SOFT_LIMIT, + FREE_TIER_ACTOR_CNT_HARD_LIMIT, + ) + } + }; + + let running_worker_parallelism: HashMap = self + .metadata_manager + .list_worker_node(Some(WorkerType::ComputeNode), Some(State::Running)) + .await? + .into_iter() + .map(|e| (e.id, e.parallelism())) + .collect(); + let worker_actor_count: HashMap = self + .metadata_manager + .worker_actor_count() + .await? 
+ .into_iter() + .filter_map(|(worker_id, actor_count)| { + running_worker_parallelism + .get(&worker_id) + .map(|parallelism| { + ( + worker_id, + WorkerActorCount { + actor_count, + parallelism: *parallelism, + }, + ) + }) + }) + .collect(); + + let limit = ActorCountPerParallelism { + worker_id_to_actor_count: worker_actor_count, + hard_limit, + soft_limit, + }; + + if limit.exceed_limit() { + Ok(Some(ClusterLimit::ActorCount(limit))) + } else { + Ok(None) + } + } +} + +#[async_trait::async_trait] +impl ClusterLimitService for ClusterLimitServiceImpl { + #[cfg_attr(coverage, coverage(off))] + async fn get_cluster_limits( + &self, + _request: Request, + ) -> Result, Status> { + // TODO: support more limits + match self.get_active_actor_limit().await { + Ok(Some(limit)) => Ok(Response::new(GetClusterLimitsResponse { + active_limits: vec![limit.into()], + })), + Ok(None) => Ok(Response::new(GetClusterLimitsResponse { + active_limits: vec![], + })), + Err(e) => Err(e.into()), + } + } +} diff --git a/src/meta/service/src/lib.rs b/src/meta/service/src/lib.rs index 9ab248802772..e2f57d4a26bb 100644 --- a/src/meta/service/src/lib.rs +++ b/src/meta/service/src/lib.rs @@ -21,6 +21,7 @@ use risingwave_meta::*; pub mod backup_service; pub mod cloud_service; +pub mod cluster_limit_service; pub mod cluster_service; pub mod ddl_service; pub mod event_log_service; diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 8aeaed2f9c5a..c25bd1340cb3 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -125,6 +125,9 @@ pub enum MetaErrorInner { // Indicates that recovery was triggered manually. #[error("adhoc recovery triggered")] AdhocRecovery, + + #[error("ResourceExhausted error: {0}")] + ResourceExhausted(String), } impl MetaError { @@ -175,6 +178,7 @@ impl From for tonic::Status { MetaErrorInner::Unavailable(_) => Code::Unavailable, MetaErrorInner::Cancelled(_) => Code::Cancelled, MetaErrorInner::InvalidParameter(_) => Code::InvalidArgument, + MetaErrorInner::ResourceExhausted(_) => Code::ResourceExhausted, _ => Code::Internal, }; diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 22f88bd9c0a7..ed18be6b0f48 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -294,6 +294,10 @@ pub struct MetaOpts { pub temp_secret_file_dir: String, pub table_info_statistic_history_times: usize, + + // Cluster limits + pub actor_cnt_per_worker_parallelism_hard_limit: usize, + pub actor_cnt_per_worker_parallelism_soft_limit: usize, } impl MetaOpts { @@ -358,6 +362,8 @@ impl MetaOpts { secret_store_private_key: Some("0123456789abcdef".as_bytes().to_vec()), temp_secret_file_dir: "./secrets".to_string(), table_info_statistic_history_times: 240, + actor_cnt_per_worker_parallelism_hard_limit: usize::MAX, + actor_cnt_per_worker_parallelism_soft_limit: usize::MAX, } } } @@ -408,9 +414,11 @@ impl MetaSrvEnv { (ClusterId::new(), true) }; - // For new clusters, the name of the object store needs to be prefixed according to the object id. - // For old clusters, the prefix is ​​not divided for the sake of compatibility. - + // For new clusters: + // - the name of the object store needs to be prefixed according to the object id. + // + // For old clusters + // - the prefix is ​​not divided for the sake of compatibility. 
init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch); let system_params_manager = Arc::new( SystemParamsManager::new( @@ -455,7 +463,7 @@ impl MetaSrvEnv { } } MetaStoreImpl::Sql(sql_meta_store) => { - let is_sql_backend_cluster_first_launch = + let cluster_first_launch = is_first_launch_for_sql_backend_cluster(sql_meta_store).await?; // Try to upgrade if any new model changes are added. Migrator::up(&sql_meta_store.conn, None) @@ -469,10 +477,14 @@ impl MetaSrvEnv { .await? .map(|c| c.cluster_id.to_string().into()) .unwrap(); - init_system_params.use_new_object_prefix_strategy = - Some(is_sql_backend_cluster_first_launch); - // For new clusters, the name of the object store needs to be prefixed according to the object id. - // For old clusters, the prefix is ​​not divided for the sake of compatibility. + + // For new clusters: + // - the name of the object store needs to be prefixed according to the object id. + // + // For old clusters + // - the prefix is ​​not divided for the sake of compatibility. + init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch); + let system_param_controller = Arc::new( SystemParamsController::new( sql_meta_store.clone(), diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index b4e06d8690b7..86f401df24a2 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -22,6 +22,7 @@ use std::time::{Duration, SystemTime}; use anyhow::{anyhow, Context}; use async_trait::async_trait; +use cluster_limit_service_client::ClusterLimitServiceClient; use either::Either; use futures::stream::BoxStream; use lru::LruCache; @@ -1436,6 +1437,12 @@ impl MetaClient { let resp = self.inner.get_version_by_epoch(req).await?; Ok(resp.version.unwrap()) } + + pub async fn get_cluster_limits(&self) -> Result> { + let req = GetClusterLimitsRequest {}; + let resp = self.inner.get_cluster_limits(req).await?; + Ok(resp.active_limits.into_iter().map(|l| l.into()).collect()) + } } #[async_trait] @@ -1636,6 +1643,7 @@ struct GrpcMetaClientCore { cloud_client: CloudServiceClient, sink_coordinate_client: SinkCoordinationRpcClient, event_log_client: EventLogServiceClient, + cluster_limit_client: ClusterLimitServiceClient, } impl GrpcMetaClientCore { @@ -1662,7 +1670,8 @@ impl GrpcMetaClientCore { let serving_client = ServingServiceClient::new(channel.clone()); let cloud_client = CloudServiceClient::new(channel.clone()); let sink_coordinate_client = SinkCoordinationServiceClient::new(channel.clone()); - let event_log_client = EventLogServiceClient::new(channel); + let event_log_client = EventLogServiceClient::new(channel.clone()); + let cluster_limit_client = ClusterLimitServiceClient::new(channel); GrpcMetaClientCore { cluster_client, @@ -1682,6 +1691,7 @@ impl GrpcMetaClientCore { cloud_client, sink_coordinate_client, event_log_client, + cluster_limit_client, } } } @@ -2126,6 +2136,7 @@ macro_rules! 
for_all_meta_rpc { ,{ cloud_client, rw_cloud_validate_source, RwCloudValidateSourceRequest, RwCloudValidateSourceResponse } ,{ event_log_client, list_event_log, ListEventLogRequest, ListEventLogResponse } ,{ event_log_client, add_event_log, AddEventLogRequest, AddEventLogResponse } + ,{ cluster_limit_client, get_cluster_limits, GetClusterLimitsRequest, GetClusterLimitsResponse } } }; } From a060b0db6eed6d6cbbc63a82b60ef39c73c24455 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Tue, 3 Sep 2024 20:11:59 +0800 Subject: [PATCH 02/15] cleanup --- src/common/src/config.rs | 12 ++++-------- src/common/src/util/cluster_limit.rs | 4 ++-- src/frontend/src/handler/create_sink.rs | 2 ++ src/frontend/src/handler/create_table.rs | 2 ++ src/frontend/src/handler/util.rs | 2 -- src/rpc_client/src/meta_client.rs | 4 +++- 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 9067b7ecd180..6778b52cc2bf 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -469,19 +469,15 @@ pub struct MetaDeveloperConfig { /// Max number of actor allowed per parallelism (default = 100). /// CREATE MV/Table will be noticed when the number of actors exceeds this limit. - /// This limit is effective for clusters that meets the following conditions: - /// - running with version >= 2.0 - /// - running with paied tier license - /// Free tier cluster's limit will be hardcoded to FREE_TIER_ACTOR_CNT_SOFT_LIMIT + /// This limit is effective only for paid tier clusters. + /// Free tier cluster's limit will be hardcoded to `FREE_TIER_ACTOR_CNT_SOFT_LIMIT` #[serde(default = "default::developer::actor_cnt_per_worker_parallelism_soft_limit")] pub actor_cnt_per_worker_parallelism_soft_limit: usize, /// Max number of actor allowed per parallelism (default = 400). /// CREATE MV/Table will be rejected when the number of actors exceeds this limit. - /// This limit is effective for clusters that meets the following conditions: - /// - created with version >= 2.0 - /// - running with paied tier license - /// Free tier cluster's limit will be hardcoded to FREE_TIER_ACTOR_CNT_HARD_LIMIT + /// This limit is effective only for paid tier clusters. 
+ /// Free tier cluster's limit will be hardcoded to `FREE_TIER_ACTOR_CNT_HARD_LIMIT`` #[serde(default = "default::developer::actor_cnt_per_worker_parallelism_hard_limit")] pub actor_cnt_per_worker_parallelism_hard_limit: usize, } diff --git a/src/common/src/util/cluster_limit.rs b/src/common/src/util/cluster_limit.rs index 77c5b0d7777f..5e0d8b9d8196 100644 --- a/src/common/src/util/cluster_limit.rs +++ b/src/common/src/util/cluster_limit.rs @@ -107,13 +107,13 @@ impl ActorCountPerParallelism { pub fn exceed_hard_limit(&self) -> bool { self.worker_id_to_actor_count .values() - .any(|v| v.actor_count > self.hard_limit.saturating_mul(v.parallelism) ) + .any(|v| v.actor_count > self.hard_limit.saturating_mul(v.parallelism)) } pub fn exceed_soft_limit(&self) -> bool { self.worker_id_to_actor_count .values() - .any(|v| v.actor_count > self.soft_limit.saturating_mul(v.parallelism) ) + .any(|v| v.actor_count > self.soft_limit.saturating_mul(v.parallelism)) } pub fn exceed_limit(&self) -> bool { diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 7ef118891865..95cf92ea6cf8 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -415,6 +415,8 @@ pub async fn handle_create_sink( ) -> Result { let session = handle_args.session.clone(); + session.check_cluster_limits().await?; + if let Either::Right(resp) = session.check_relation_name_duplicated( stmt.sink_name.clone(), StatementType::CREATE_SINK, diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 0359280d28ad..cd253745d5d2 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -1197,6 +1197,8 @@ pub async fn handle_create_table( session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature."); } + session.check_cluster_limits().await?; + if let Either::Right(resp) = session.check_relation_name_duplicated( table_name.clone(), StatementType::CREATE_TABLE, diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index d6a14d9dd696..0531ce5a6528 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -238,8 +238,6 @@ pub fn convert_logstore_u64_to_unix_millis(logstore_u64: u64) -> u64 { Epoch::from(logstore_u64).as_unix_millis() } - - #[cfg(test)] mod tests { use postgres_types::{ToSql, Type}; diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 86f401df24a2..db66e60c91ee 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1438,7 +1438,9 @@ impl MetaClient { Ok(resp.version.unwrap()) } - pub async fn get_cluster_limits(&self) -> Result> { + pub async fn get_cluster_limits( + &self, + ) -> Result> { let req = GetClusterLimitsRequest {}; let resp = self.inner.get_cluster_limits(req).await?; Ok(resp.active_limits.into_iter().map(|l| l.into()).collect()) From 1e65fc88129338c213ead208451f22549e3c301b Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Wed, 4 Sep 2024 09:07:39 +0800 Subject: [PATCH 03/15] address CI issues --- e2e_test/batch/catalog/pg_settings.slt.part | 1 + proto/meta.proto | 3 +-- src/common/src/config.rs | 2 +- src/common/src/session_config/mod.rs | 6 +++++- src/config/example.toml | 4 ++-- 5 files changed, 10 insertions(+), 6 deletions(-) diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index 3482ce485024..e05d466c3a4d 100644 --- 
a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ -22,6 +22,7 @@ user backfill_rate_limit user background_ddl user batch_enable_distributed_dml user batch_parallelism +user bypass_cluster_limits user bytea_output user cdc_source_wait_streaming_start_timeout user client_encoding diff --git a/proto/meta.proto b/proto/meta.proto index 42661c7c2c77..7a8393104930 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -814,7 +814,6 @@ message GetClusterLimitsResponse { repeated ClusterLimit active_limits = 1; } - service ClusterLimitService { rpc GetClusterLimits(GetClusterLimitsRequest) returns (GetClusterLimitsResponse); -} \ No newline at end of file +} diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 6778b52cc2bf..21c0e7cf8243 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -477,7 +477,7 @@ pub struct MetaDeveloperConfig { /// Max number of actor allowed per parallelism (default = 400). /// CREATE MV/Table will be rejected when the number of actors exceeds this limit. /// This limit is effective only for paid tier clusters. - /// Free tier cluster's limit will be hardcoded to `FREE_TIER_ACTOR_CNT_HARD_LIMIT`` + /// Free tier cluster's limit will be hardcoded to `FREE_TIER_ACTOR_CNT_HARD_LIMIT` #[serde(default = "default::developer::actor_cnt_per_worker_parallelism_hard_limit")] pub actor_cnt_per_worker_parallelism_hard_limit: usize, } diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 7781198869fc..095bfcc7b9f9 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -29,6 +29,7 @@ pub use search_path::{SearchPath, USER_NAME_WILD_CARD}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use thiserror::Error; +use thiserror_ext::AsReport; use self::non_zero64::ConfigNonZeroU64; use crate::session_config::sink_decouple::SinkDecouple; @@ -327,7 +328,10 @@ fn check_bytea_output(val: &str) -> Result<(), String> { } fn check_bypass_cluster_limits(_val: &bool) -> Result<(), String> { - match LicenseManager::get().tier().map_err(|e| e.to_string())? { + match LicenseManager::get() + .tier() + .map_err(|e| e.to_report_string())? 
+ { risingwave_license::Tier::Free => { Err("Bypassing cluster limits is only allowed in paid tier".to_string()) } diff --git a/src/config/example.toml b/src/config/example.toml index 9e9d7894a079..f3c127cdc782 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -81,8 +81,8 @@ meta_enable_trivial_move = true meta_enable_check_task_level_overlap = false meta_max_trivial_move_task_count_per_loop = 256 meta_max_get_task_probe_times = 5 -actor_cnt_per_worker_parallelism_soft_limit = 100 -actor_cnt_per_worker_parallelism_hard_limit = 400 +meta_actor_cnt_per_worker_parallelism_soft_limit = 100 +meta_actor_cnt_per_worker_parallelism_hard_limit = 400 [batch] enable_barrier_read = false From 2c3b93d5fdf0684c411f43127259728723a9820c Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Wed, 4 Sep 2024 10:50:39 +0800 Subject: [PATCH 04/15] polish notice messages --- src/frontend/src/session.rs | 23 +++++++++++++++-------- src/frontend/src/test_utils.rs | 2 +- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 0dd4a1a33ac4..e8ecbd86e676 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -77,6 +77,7 @@ use risingwave_rpc_client::{ComputeClientPool, ComputeClientPoolRef, MetaClient} use risingwave_sqlparser::ast::{ObjectName, Statement}; use risingwave_sqlparser::parser::Parser; use thiserror::Error; +use thiserror_ext::AsReport; use tokio::runtime::Builder; use tokio::sync::oneshot::Sender; use tokio::sync::watch; @@ -1198,19 +1199,24 @@ impl SessionImpl { pub async fn check_cluster_limits(&self) -> Result<()> { let bypass_cluster_limits = self.config().bypass_cluster_limits(); - let tier = LicenseManager::get() - .tier() - .map_err(|e| RwError::from(ErrorCode::ProtocolError(e.to_string())))?; + let tier = match LicenseManager::get().tier() { + Ok(tier) => tier, + Err(e) => { + self.notice_to_user(e.to_report_string()); + // Default to free tier if license is not available. + risingwave_common::license::Tier::Free + } + }; let limits = self.env().meta_client().get_cluster_limits().await?; for limit in limits { match limit { cluster_limit::ClusterLimit::ActorCount(l) => { if l.exceed_hard_limit() { - let mut msg = - "\n- Actor count per parallelism exceeds the hard limit".to_string(); + let mut msg = "\n- Actor count per parallelism exceeds the critical limit." + .to_string(); msg.push_str("\n- This may overload the cluster and cause performance/stability issues. Please scale the cluster before proceeding."); if matches!(tier, risingwave_common::license::Tier::Free) { - msg.push_str("\n- Feel free to contact us via https://risingwave.com/contact-us/ for a upgrade."); + msg.push_str("\n- Contact us via https://risingwave.com/contact-us/ and consider upgrading your free-tier license to enhance performance/stability and user experience for production usage."); } msg.push_str("\n- You can check actor count distribution via SQL `SELECT * FROM rw_worker_actor_count`."); msg.push_str(format!("\n{}", l).as_str()); @@ -1224,10 +1230,11 @@ impl SessionImpl { } else if l.exceed_soft_limit() { // Send a notice if soft limit is exceeded. let mut msg = - "\n- Actor count per parallelism exceeds the soft limit".to_string(); + "\n- Actor count per parallelism exceeds the recommended limit." + .to_string(); msg.push_str("\n- This may overload the cluster and cause performance/stability issues. 
Scaling the cluster is recommended."); if matches!(tier, risingwave_common::license::Tier::Free) { - msg.push_str("\n- Feel free to contact us via https://risingwave.com/contact-us/ for a upgrade."); + msg.push_str("\n- Contact us via https://risingwave.com/contact-us/ and consider upgrading your free-tier license to enhance performance/stability and user experience for production usage."); } msg.push_str("\n- You can check actor count distribution via SQL `SELECT * FROM rw_worker_actor_count`."); msg.push_str(format!("\n{}", l).as_str()); diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index f2382c62324f..24d7b6a23ee5 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -1100,7 +1100,7 @@ impl FrontendMetaClient for MockFrontendMetaClient { } async fn get_cluster_limits(&self) -> RpcResult> { - unimplemented!() + Ok(vec![]) } } From 08e84d0cbcf62640c1f514b285c892ee21e123e5 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Wed, 4 Sep 2024 10:51:29 +0800 Subject: [PATCH 05/15] polish limit message --- src/common/src/util/cluster_limit.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/src/util/cluster_limit.rs b/src/common/src/util/cluster_limit.rs index 5e0d8b9d8196..8097cf51ee64 100644 --- a/src/common/src/util/cluster_limit.rs +++ b/src/common/src/util/cluster_limit.rs @@ -130,7 +130,7 @@ impl Display for ActorCountPerParallelism { .collect(); write!( f, - "ActorCountPerParallelism {{ hard_limit: {:?}, soft_limit: {:?}. worker_id_to_actor_count: {:?} }}", + "ActorCountPerParallelism {{ critical limit: {:?}, recommended limit: {:?}. worker_id_to_actor_count: {:?} }}", self.hard_limit, self.soft_limit, worker_id_to_actor_count_str ) } From 0612955b6c6ed94cbb0fa2b5ee6e1115bfe28074 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Wed, 4 Sep 2024 11:05:16 +0800 Subject: [PATCH 06/15] fix error log --- src/meta/service/src/cluster_limit_service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta/service/src/cluster_limit_service.rs b/src/meta/service/src/cluster_limit_service.rs index 5c6db9dd2643..63ed55beac88 100644 --- a/src/meta/service/src/cluster_limit_service.rs +++ b/src/meta/service/src/cluster_limit_service.rs @@ -52,7 +52,7 @@ impl ClusterLimitServiceImpl { FREE_TIER_ACTOR_CNT_HARD_LIMIT, ), Err(e) => { - tracing::warn!("Failed to get license tier: {}", e); + tracing::warn!(error=error=%err.as_report(), "Failed to get license tier: {}"); // Default to use free tier limit if there is any license error ( FREE_TIER_ACTOR_CNT_SOFT_LIMIT, From e32944bdf954cb1e864d9475802c2c2e3739f106 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Wed, 4 Sep 2024 11:06:59 +0800 Subject: [PATCH 07/15] remove unused codes --- src/meta/src/error.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index c25bd1340cb3..8aeaed2f9c5a 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -125,9 +125,6 @@ pub enum MetaErrorInner { // Indicates that recovery was triggered manually. 
#[error("adhoc recovery triggered")] AdhocRecovery, - - #[error("ResourceExhausted error: {0}")] - ResourceExhausted(String), } impl MetaError { @@ -178,7 +175,6 @@ impl From for tonic::Status { MetaErrorInner::Unavailable(_) => Code::Unavailable, MetaErrorInner::Cancelled(_) => Code::Cancelled, MetaErrorInner::InvalidParameter(_) => Code::InvalidArgument, - MetaErrorInner::ResourceExhausted(_) => Code::ResourceExhausted, _ => Code::Internal, }; From b456b828c937162668915c6497be518a6ae71de5 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Wed, 4 Sep 2024 11:19:28 +0800 Subject: [PATCH 08/15] fix error log --- src/meta/service/src/cluster_limit_service.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/meta/service/src/cluster_limit_service.rs b/src/meta/service/src/cluster_limit_service.rs index 63ed55beac88..220d8c579e5f 100644 --- a/src/meta/service/src/cluster_limit_service.rs +++ b/src/meta/service/src/cluster_limit_service.rs @@ -51,8 +51,8 @@ impl ClusterLimitServiceImpl { FREE_TIER_ACTOR_CNT_SOFT_LIMIT, FREE_TIER_ACTOR_CNT_HARD_LIMIT, ), - Err(e) => { - tracing::warn!(error=error=%err.as_report(), "Failed to get license tier: {}"); + Err(err) => { + tracing::warn!(error=%err.as_report(), "Failed to get license tier: {}"); // Default to use free tier limit if there is any license error ( FREE_TIER_ACTOR_CNT_SOFT_LIMIT, From e2fec45adfe38402bc1b03e9e805d8f926ee3b47 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Wed, 4 Sep 2024 11:47:31 +0800 Subject: [PATCH 09/15] fix error log --- src/meta/service/src/cluster_limit_service.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/meta/service/src/cluster_limit_service.rs b/src/meta/service/src/cluster_limit_service.rs index 220d8c579e5f..40dcdd8c4f76 100644 --- a/src/meta/service/src/cluster_limit_service.rs +++ b/src/meta/service/src/cluster_limit_service.rs @@ -25,6 +25,7 @@ use risingwave_pb::common::worker_node::State; use risingwave_pb::common::WorkerType; use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitService; use risingwave_pb::meta::{GetClusterLimitsRequest, GetClusterLimitsResponse}; +use thiserror_ext::AsReport; use tonic::{Request, Response, Status}; #[derive(Clone)] From 54f80c6d573a8e8c945f8f510753b5b720ce9d09 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Wed, 4 Sep 2024 13:48:37 +0800 Subject: [PATCH 10/15] fix error log --- src/meta/service/src/cluster_limit_service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta/service/src/cluster_limit_service.rs b/src/meta/service/src/cluster_limit_service.rs index 40dcdd8c4f76..5d18e33bdf40 100644 --- a/src/meta/service/src/cluster_limit_service.rs +++ b/src/meta/service/src/cluster_limit_service.rs @@ -53,7 +53,7 @@ impl ClusterLimitServiceImpl { FREE_TIER_ACTOR_CNT_HARD_LIMIT, ), Err(err) => { - tracing::warn!(error=%err.as_report(), "Failed to get license tier: {}"); + tracing::warn!(error=%err.as_report(), "Failed to get license tier."); // Default to use free tier limit if there is any license error ( FREE_TIER_ACTOR_CNT_SOFT_LIMIT, From 52df3b866b30c5cd543df3bbbf3f917e2eb6ffe1 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Wed, 4 Sep 2024 14:12:37 +0800 Subject: [PATCH 11/15] fix test_builtin_view_definition --- src/frontend/src/test_utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 24d7b6a23ee5..10dad2105ed9 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ 
-1066,7 +1066,7 @@ impl FrontendMetaClient for MockFrontendMetaClient { } async fn list_all_nodes(&self) -> RpcResult> { - unimplemented!() + Ok(vec![]) } async fn list_compact_task_progress(&self) -> RpcResult> { From ba37b365c36b38a6d5dc3f07ce9e9f16508cd7a9 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Wed, 4 Sep 2024 18:37:39 +0800 Subject: [PATCH 12/15] address comment on proto --- proto/meta.proto | 9 +++++---- src/common/src/util/cluster_limit.rs | 3 ++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/proto/meta.proto b/proto/meta.proto index 7a8393104930..77547fd7225c 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -790,12 +790,13 @@ message RelationIdInfos { map map = 1; } -message WorkerActorCount { - uint64 actor_count = 1; - uint64 parallelism = 2; -} + message ActorCountPerParallelism { + message WorkerActorCount { + uint64 actor_count = 1; + uint64 parallelism = 2; + } map worker_id_to_actor_count = 1; uint64 hard_limit = 2; uint64 soft_limit = 3; diff --git a/src/common/src/util/cluster_limit.rs b/src/common/src/util/cluster_limit.rs index 8097cf51ee64..a43edfb020bf 100644 --- a/src/common/src/util/cluster_limit.rs +++ b/src/common/src/util/cluster_limit.rs @@ -15,8 +15,9 @@ use std::collections::HashMap; use std::fmt::{self, Display, Formatter}; +use risingwave_pb::meta::actor_count_per_parallelism::PbWorkerActorCount; use risingwave_pb::meta::cluster_limit::PbLimit; -use risingwave_pb::meta::{PbActorCountPerParallelism, PbClusterLimit, PbWorkerActorCount}; +use risingwave_pb::meta::{PbActorCountPerParallelism, PbClusterLimit}; pub const FREE_TIER_ACTOR_CNT_SOFT_LIMIT: usize = 25; pub const FREE_TIER_ACTOR_CNT_HARD_LIMIT: usize = 100; From 6dfbf5f600e45e938b32066f2018df0b2ffa0758 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Wed, 4 Sep 2024 19:58:03 +0800 Subject: [PATCH 13/15] donot differentiate paid and free tier --- src/common/src/config.rs | 4 -- src/common/src/session_config/mod.rs | 17 +---- src/common/src/util/cluster_limit.rs | 4 -- src/frontend/src/session.rs | 63 ++++++++----------- src/license/src/manager.rs | 4 -- src/meta/service/src/cluster_limit_service.rs | 27 ++------ 6 files changed, 33 insertions(+), 86 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 21c0e7cf8243..ed7ac8619252 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -469,15 +469,11 @@ pub struct MetaDeveloperConfig { /// Max number of actor allowed per parallelism (default = 100). /// CREATE MV/Table will be noticed when the number of actors exceeds this limit. - /// This limit is effective only for paid tier clusters. - /// Free tier cluster's limit will be hardcoded to `FREE_TIER_ACTOR_CNT_SOFT_LIMIT` #[serde(default = "default::developer::actor_cnt_per_worker_parallelism_soft_limit")] pub actor_cnt_per_worker_parallelism_soft_limit: usize, /// Max number of actor allowed per parallelism (default = 400). /// CREATE MV/Table will be rejected when the number of actors exceeds this limit. - /// This limit is effective only for paid tier clusters. 
- /// Free tier cluster's limit will be hardcoded to `FREE_TIER_ACTOR_CNT_HARD_LIMIT` #[serde(default = "default::developer::actor_cnt_per_worker_parallelism_hard_limit")] pub actor_cnt_per_worker_parallelism_hard_limit: usize, } diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 095bfcc7b9f9..163aa1879939 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -24,12 +24,10 @@ use chrono_tz::Tz; pub use over_window::OverWindowCachePolicy; pub use query_mode::QueryMode; use risingwave_common_proc_macro::{ConfigDoc, SessionConfig}; -use risingwave_license::LicenseManager; pub use search_path::{SearchPath, USER_NAME_WILD_CARD}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use thiserror::Error; -use thiserror_ext::AsReport; use self::non_zero64::ConfigNonZeroU64; use crate::session_config::sink_decouple::SinkDecouple; @@ -298,8 +296,7 @@ pub struct SessionConfig { /// Bypass checks on cluster limits /// /// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit. - /// This session variable is only mutable in paid tier. - #[parameter(default = false, check_hook = check_bypass_cluster_limits)] + #[parameter(default = false)] bypass_cluster_limits: bool, } @@ -327,18 +324,6 @@ fn check_bytea_output(val: &str) -> Result<(), String> { } } -fn check_bypass_cluster_limits(_val: &bool) -> Result<(), String> { - match LicenseManager::get() - .tier() - .map_err(|e| e.to_report_string())? - { - risingwave_license::Tier::Free => { - Err("Bypassing cluster limits is only allowed in paid tier".to_string()) - } - risingwave_license::Tier::Paid => Ok(()), - } -} - impl SessionConfig { pub fn set_force_two_phase_agg( &mut self, diff --git a/src/common/src/util/cluster_limit.rs b/src/common/src/util/cluster_limit.rs index a43edfb020bf..048ea4fdab30 100644 --- a/src/common/src/util/cluster_limit.rs +++ b/src/common/src/util/cluster_limit.rs @@ -18,10 +18,6 @@ use std::fmt::{self, Display, Formatter}; use risingwave_pb::meta::actor_count_per_parallelism::PbWorkerActorCount; use risingwave_pb::meta::cluster_limit::PbLimit; use risingwave_pb::meta::{PbActorCountPerParallelism, PbClusterLimit}; - -pub const FREE_TIER_ACTOR_CNT_SOFT_LIMIT: usize = 25; -pub const FREE_TIER_ACTOR_CNT_HARD_LIMIT: usize = 100; - pub enum ClusterLimit { ActorCount(ActorCountPerParallelism), } diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index e8ecbd86e676..d377a6fc758d 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -49,7 +49,6 @@ use risingwave_common::catalog::{ use risingwave_common::config::{ load_config, BatchConfig, MetaConfig, MetricLevel, StreamingConfig, }; -use risingwave_common::license::LicenseManager; use risingwave_common::memory::MemoryContext; use risingwave_common::secret::LocalSecretManager; use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode}; @@ -60,6 +59,7 @@ use risingwave_common::telemetry::manager::TelemetryManager; use risingwave_common::telemetry::telemetry_env_enabled; use risingwave_common::types::DataType; use risingwave_common::util::addr::HostAddr; +use risingwave_common::util::cluster_limit::ActorCountPerParallelism; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_common::util::{cluster_limit, resource_util}; @@ -77,7 +77,6 @@ use risingwave_rpc_client::{ComputeClientPool, 
ComputeClientPoolRef, MetaClient} use risingwave_sqlparser::ast::{ObjectName, Statement}; use risingwave_sqlparser::parser::Parser; use thiserror::Error; -use thiserror_ext::AsReport; use tokio::runtime::Builder; use tokio::sync::oneshot::Sender; use tokio::sync::watch; @@ -1198,47 +1197,39 @@ impl SessionImpl { } pub async fn check_cluster_limits(&self) -> Result<()> { - let bypass_cluster_limits = self.config().bypass_cluster_limits(); - let tier = match LicenseManager::get().tier() { - Ok(tier) => tier, - Err(e) => { - self.notice_to_user(e.to_report_string()); - // Default to free tier if license is not available. - risingwave_common::license::Tier::Free - } + if self.config().bypass_cluster_limits() { + return Ok(()); + } + + let gen_message = |violated_limit: &ActorCountPerParallelism, + exceed_hard_limit: bool| + -> String { + let (limit_type, action) = if exceed_hard_limit { + ("critical", "Please scale the cluster before proceeding!") + } else { + ("recommended", "Scaling the cluster is recommended.") + }; + format!( + "\n- {}\n- {}\n- {}\n- {}\n- {}\n{}", + format!("Actor count per parallelism exceeds the {} limit.", limit_type), + format!("Depending on your workload, this may overload the cluster and cause performance/stability issues. {}", action), + "Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.", + "You can bypass this check via SQL `SET bypass_cluster_limits TO true`.", + "You can check actor count distribution via SQL `SELECT * FROM rw_worker_actor_count`.", + violated_limit, + ) }; + let limits = self.env().meta_client().get_cluster_limits().await?; for limit in limits { match limit { cluster_limit::ClusterLimit::ActorCount(l) => { if l.exceed_hard_limit() { - let mut msg = "\n- Actor count per parallelism exceeds the critical limit." - .to_string(); - msg.push_str("\n- This may overload the cluster and cause performance/stability issues. Please scale the cluster before proceeding."); - if matches!(tier, risingwave_common::license::Tier::Free) { - msg.push_str("\n- Contact us via https://risingwave.com/contact-us/ and consider upgrading your free-tier license to enhance performance/stability and user experience for production usage."); - } - msg.push_str("\n- You can check actor count distribution via SQL `SELECT * FROM rw_worker_actor_count`."); - msg.push_str(format!("\n{}", l).as_str()); - if bypass_cluster_limits { - // Only send a notice if `bypass_cluster_limits` is set. - self.notice_to_user(&msg); - } else { - // Return an error if `bypass_cluster_limits` is not set. - return Err(RwError::from(ErrorCode::ProtocolError(msg))); - } + return Err(RwError::from(ErrorCode::ProtocolError(gen_message( + &l, true, + )))); } else if l.exceed_soft_limit() { - // Send a notice if soft limit is exceeded. - let mut msg = - "\n- Actor count per parallelism exceeds the recommended limit." - .to_string(); - msg.push_str("\n- This may overload the cluster and cause performance/stability issues. 
Scaling the cluster is recommended."); - if matches!(tier, risingwave_common::license::Tier::Free) { - msg.push_str("\n- Contact us via https://risingwave.com/contact-us/ and consider upgrading your free-tier license to enhance performance/stability and user experience for production usage."); - } - msg.push_str("\n- You can check actor count distribution via SQL `SELECT * FROM rw_worker_actor_count`."); - msg.push_str(format!("\n{}", l).as_str()); - self.notice_to_user(&msg); + self.notice_to_user(gen_message(&l, false)); } } } diff --git a/src/license/src/manager.rs b/src/license/src/manager.rs index ee9e8e51da0d..5c1bc298388d 100644 --- a/src/license/src/manager.rs +++ b/src/license/src/manager.rs @@ -168,10 +168,6 @@ impl LicenseManager { } } - pub fn tier(&self) -> Result { - self.license().map(|l| l.tier) - } - /// Get the current license if it is valid. /// /// Since the license can expire, the returned license should not be cached by the caller. diff --git a/src/meta/service/src/cluster_limit_service.rs b/src/meta/service/src/cluster_limit_service.rs index 5d18e33bdf40..df19b24b234e 100644 --- a/src/meta/service/src/cluster_limit_service.rs +++ b/src/meta/service/src/cluster_limit_service.rs @@ -14,10 +14,8 @@ use std::collections::HashMap; -use risingwave_common::license::{LicenseManager, Tier}; use risingwave_common::util::cluster_limit::{ - ActorCountPerParallelism, ClusterLimit, WorkerActorCount, FREE_TIER_ACTOR_CNT_HARD_LIMIT, - FREE_TIER_ACTOR_CNT_SOFT_LIMIT, + ActorCountPerParallelism, ClusterLimit, WorkerActorCount, }; use risingwave_meta::manager::{MetaSrvEnv, MetadataManager, WorkerId}; use risingwave_meta::MetaResult; @@ -25,7 +23,6 @@ use risingwave_pb::common::worker_node::State; use risingwave_pb::common::WorkerType; use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitService; use risingwave_pb::meta::{GetClusterLimitsRequest, GetClusterLimitsResponse}; -use thiserror_ext::AsReport; use tonic::{Request, Response, Status}; #[derive(Clone)] @@ -43,24 +40,10 @@ impl ClusterLimitServiceImpl { } async fn get_active_actor_limit(&self) -> MetaResult> { - let (soft_limit, hard_limit) = match LicenseManager::get().tier() { - Ok(Tier::Paid) => ( - self.env.opts.actor_cnt_per_worker_parallelism_soft_limit, - self.env.opts.actor_cnt_per_worker_parallelism_hard_limit, - ), - Ok(Tier::Free) => ( - FREE_TIER_ACTOR_CNT_SOFT_LIMIT, - FREE_TIER_ACTOR_CNT_HARD_LIMIT, - ), - Err(err) => { - tracing::warn!(error=%err.as_report(), "Failed to get license tier."); - // Default to use free tier limit if there is any license error - ( - FREE_TIER_ACTOR_CNT_SOFT_LIMIT, - FREE_TIER_ACTOR_CNT_HARD_LIMIT, - ) - } - }; + let (soft_limit, hard_limit) = ( + self.env.opts.actor_cnt_per_worker_parallelism_soft_limit, + self.env.opts.actor_cnt_per_worker_parallelism_hard_limit, + ); let running_worker_parallelism: HashMap = self .metadata_manager From 88ff9eee0f6c123a346c92301e7afe5e018342f0 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Thu, 5 Sep 2024 17:57:38 +0800 Subject: [PATCH 14/15] fix check --- src/frontend/src/session.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index d377a6fc758d..a1150798951c 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -1211,8 +1211,8 @@ impl SessionImpl { }; format!( "\n- {}\n- {}\n- {}\n- {}\n- {}\n{}", - format!("Actor count per parallelism exceeds the {} limit.", limit_type), - format!("Depending on your workload, this may overload 
the cluster and cause performance/stability issues. {}", action), + format_args!("Actor count per parallelism exceeds the {} limit.", limit_type), + format_args!("Depending on your workload, this may overload the cluster and cause performance/stability issues. {}", action), "Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.", "You can bypass this check via SQL `SET bypass_cluster_limits TO true`.", "You can check actor count distribution via SQL `SELECT * FROM rw_worker_actor_count`.", From 3e7d5c2ea2bc14158f6f8c8dfff45fcc530fb2d0 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Fri, 6 Sep 2024 12:46:18 +0800 Subject: [PATCH 15/15] fix newline --- proto/meta.proto | 2 -- 1 file changed, 2 deletions(-) diff --git a/proto/meta.proto b/proto/meta.proto index 77547fd7225c..1b523903108a 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -790,8 +790,6 @@ message RelationIdInfos { map map = 1; } - - message ActorCountPerParallelism { message WorkerActorCount { uint64 actor_count = 1;