Skip to content

Commit

Permalink
feat: introduce cluster limit (#18383)
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 authored Sep 6, 2024
1 parent b00d750 commit 4accc4a
Show file tree
Hide file tree
Showing 21 changed files with 439 additions and 12 deletions.
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ user backfill_rate_limit
user background_ddl
user batch_enable_distributed_dml
user batch_parallelism
user bypass_cluster_limits
user bytea_output
user cdc_source_wait_streaming_start_timeout
user client_encoding
Expand Down
27 changes: 27 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -791,3 +791,30 @@ message RelationIdInfos {
// relation_id -> FragmentIdToActorIdMap
map<uint32, FragmentIdToActorIdMap> map = 1;
}

message ActorCountPerParallelism {
message WorkerActorCount {
uint64 actor_count = 1;
uint64 parallelism = 2;
}
map<uint32, WorkerActorCount> worker_id_to_actor_count = 1;
uint64 hard_limit = 2;
uint64 soft_limit = 3;
}

message ClusterLimit {
oneof limit {
ActorCountPerParallelism actor_count = 1;
// TODO: limit DDL using compaction pending bytes
}
}

message GetClusterLimitsRequest {}

message GetClusterLimitsResponse {
repeated ClusterLimit active_limits = 1;
}

service ClusterLimitService {
rpc GetClusterLimits(GetClusterLimitsRequest) returns (GetClusterLimitsResponse);
}
18 changes: 18 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,16 @@ pub struct MetaDeveloperConfig {

#[serde(default = "default::developer::max_get_task_probe_times")]
pub max_get_task_probe_times: usize,

/// Max number of actor allowed per parallelism (default = 100).
/// CREATE MV/Table will be noticed when the number of actors exceeds this limit.
#[serde(default = "default::developer::actor_cnt_per_worker_parallelism_soft_limit")]
pub actor_cnt_per_worker_parallelism_soft_limit: usize,

/// Max number of actor allowed per parallelism (default = 400).
/// CREATE MV/Table will be rejected when the number of actors exceeds this limit.
#[serde(default = "default::developer::actor_cnt_per_worker_parallelism_hard_limit")]
pub actor_cnt_per_worker_parallelism_hard_limit: usize,
}

/// The section `[server]` in `risingwave.toml`.
Expand Down Expand Up @@ -1859,6 +1869,14 @@ pub mod default {
5
}

pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
100
}

pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
400
}

pub fn memory_controller_threshold_aggressive() -> f64 {
0.9
}
Expand Down
6 changes: 6 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ pub struct SessionConfig {

#[parameter(default = "hex", check_hook = check_bytea_output)]
bytea_output: String,

/// Bypass checks on cluster limits
///
/// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit.
#[parameter(default = false)]
bypass_cluster_limits: bool,
}

fn check_timezone(val: &str) -> Result<(), String> {
Expand Down
134 changes: 134 additions & 0 deletions src/common/src/util/cluster_limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::{self, Display, Formatter};

use risingwave_pb::meta::actor_count_per_parallelism::PbWorkerActorCount;
use risingwave_pb::meta::cluster_limit::PbLimit;
use risingwave_pb::meta::{PbActorCountPerParallelism, PbClusterLimit};
pub enum ClusterLimit {
ActorCount(ActorCountPerParallelism),
}

impl From<ClusterLimit> for PbClusterLimit {
fn from(limit: ClusterLimit) -> Self {
match limit {
ClusterLimit::ActorCount(actor_count_per_parallelism) => PbClusterLimit {
limit: Some(PbLimit::ActorCount(actor_count_per_parallelism.into())),
},
}
}
}

impl From<PbClusterLimit> for ClusterLimit {
fn from(pb_limit: PbClusterLimit) -> Self {
match pb_limit.limit.unwrap() {
PbLimit::ActorCount(actor_count_per_parallelism) => {
ClusterLimit::ActorCount(actor_count_per_parallelism.into())
}
}
}
}

#[derive(Debug)]
pub struct WorkerActorCount {
pub actor_count: usize,
pub parallelism: usize,
}

impl From<WorkerActorCount> for PbWorkerActorCount {
fn from(worker_actor_count: WorkerActorCount) -> Self {
PbWorkerActorCount {
actor_count: worker_actor_count.actor_count as u64,
parallelism: worker_actor_count.parallelism as u64,
}
}
}

impl From<PbWorkerActorCount> for WorkerActorCount {
fn from(pb_worker_actor_count: PbWorkerActorCount) -> Self {
WorkerActorCount {
actor_count: pb_worker_actor_count.actor_count as usize,
parallelism: pb_worker_actor_count.parallelism as usize,
}
}
}

pub struct ActorCountPerParallelism {
pub worker_id_to_actor_count: HashMap<u32, WorkerActorCount>,
pub hard_limit: usize,
pub soft_limit: usize,
}

impl From<ActorCountPerParallelism> for PbActorCountPerParallelism {
fn from(actor_count_per_parallelism: ActorCountPerParallelism) -> Self {
PbActorCountPerParallelism {
worker_id_to_actor_count: actor_count_per_parallelism
.worker_id_to_actor_count
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect(),
hard_limit: actor_count_per_parallelism.hard_limit as u64,
soft_limit: actor_count_per_parallelism.soft_limit as u64,
}
}
}

impl From<PbActorCountPerParallelism> for ActorCountPerParallelism {
fn from(pb_actor_count_per_parallelism: PbActorCountPerParallelism) -> Self {
ActorCountPerParallelism {
worker_id_to_actor_count: pb_actor_count_per_parallelism
.worker_id_to_actor_count
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect(),
hard_limit: pb_actor_count_per_parallelism.hard_limit as usize,
soft_limit: pb_actor_count_per_parallelism.soft_limit as usize,
}
}
}

impl ActorCountPerParallelism {
pub fn exceed_hard_limit(&self) -> bool {
self.worker_id_to_actor_count
.values()
.any(|v| v.actor_count > self.hard_limit.saturating_mul(v.parallelism))
}

pub fn exceed_soft_limit(&self) -> bool {
self.worker_id_to_actor_count
.values()
.any(|v| v.actor_count > self.soft_limit.saturating_mul(v.parallelism))
}

pub fn exceed_limit(&self) -> bool {
self.exceed_soft_limit() || self.exceed_hard_limit()
}
}

impl Display for ActorCountPerParallelism {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let worker_id_to_actor_count_str: Vec<_> = self
.worker_id_to_actor_count
.iter()
.map(|(k, v)| format!("{} -> {:?}", k, v))
.collect();
write!(
f,
"ActorCountPerParallelism {{ critical limit: {:?}, recommended limit: {:?}. worker_id_to_actor_count: {:?} }}",
self.hard_limit, self.soft_limit, worker_id_to_actor_count_str
)
}
}
1 change: 1 addition & 0 deletions src/common/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ pub mod tracing;
pub mod value_encoding;
pub mod worker_util;
pub use tokio_util;
pub mod cluster_limit;
2 changes: 2 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ meta_enable_trivial_move = true
meta_enable_check_task_level_overlap = false
meta_max_trivial_move_task_count_per_loop = 256
meta_max_get_task_probe_times = 5
meta_actor_cnt_per_worker_parallelism_soft_limit = 100
meta_actor_cnt_per_worker_parallelism_hard_limit = 400

[batch]
enable_barrier_read = false
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ mod rw_worker_nodes;

mod rw_actor_id_to_ddl;
mod rw_fragment_id_to_ddl;
mod rw_worker_actor_count;
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;

#[system_catalog(
view,
"rw_catalog.rw_worker_actor_count",
"SELECT t2.id as worker_id, parallelism, count(*) as actor_count
FROM rw_actors t1, rw_worker_nodes t2
where t1.worker_id = t2.id
GROUP BY t2.id, t2.parallelism;"
)]
#[derive(Fields)]
struct RwWorkerActorCount {
worker_id: i32,
parallelism: i32,
actor_count: i64,
}
3 changes: 3 additions & 0 deletions src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ pub async fn handle_create_mv_bound(
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();

// Check cluster limits
session.check_cluster_limits().await?;

if let Either::Right(resp) = session.check_relation_name_duplicated(
name.clone(),
StatementType::CREATE_MATERIALIZED_VIEW,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ pub async fn handle_create_sink(
) -> Result<RwPgResponse> {
let session = handle_args.session.clone();

session.check_cluster_limits().await?;

if let Either::Right(resp) = session.check_relation_name_duplicated(
stmt.sink_name.clone(),
StatementType::CREATE_SINK,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,8 @@ pub async fn handle_create_table(
session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
}

session.check_cluster_limits().await?;

if let Either::Right(resp) = session.check_relation_name_duplicated(
table_name.clone(),
StatementType::CREATE_TABLE,
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashMap;
use anyhow::Context;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::backup_service::MetaSnapshotMetadata;
Expand Down Expand Up @@ -136,6 +137,8 @@ pub trait FrontendMetaClient: Send + Sync {
) -> Result<Vec<u64>>;

async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;

async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
}

pub struct FrontendMetaClientImpl(pub MetaClient);
Expand Down Expand Up @@ -345,4 +348,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
self.0.get_cluster_recovery_status().await
}

async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
self.0.get_cluster_limits().await
}
}
44 changes: 43 additions & 1 deletion src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::telemetry_env_enabled;
use risingwave_common::types::DataType;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::resource_util;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_common::util::{cluster_limit, resource_util};
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_common_service::{MetricsManager, ObserverManager};
Expand Down Expand Up @@ -1194,6 +1195,47 @@ impl SessionImpl {
pub fn temporary_source_manager(&self) -> TemporarySourceManager {
self.temporary_source_manager.lock().clone()
}

pub async fn check_cluster_limits(&self) -> Result<()> {
if self.config().bypass_cluster_limits() {
return Ok(());
}

let gen_message = |violated_limit: &ActorCountPerParallelism,
exceed_hard_limit: bool|
-> String {
let (limit_type, action) = if exceed_hard_limit {
("critical", "Please scale the cluster before proceeding!")
} else {
("recommended", "Scaling the cluster is recommended.")
};
format!(
"\n- {}\n- {}\n- {}\n- {}\n- {}\n{}",
format_args!("Actor count per parallelism exceeds the {} limit.", limit_type),
format_args!("Depending on your workload, this may overload the cluster and cause performance/stability issues. {}", action),
"Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.",
"You can bypass this check via SQL `SET bypass_cluster_limits TO true`.",
"You can check actor count distribution via SQL `SELECT * FROM rw_worker_actor_count`.",
violated_limit,
)
};

let limits = self.env().meta_client().get_cluster_limits().await?;
for limit in limits {
match limit {
cluster_limit::ClusterLimit::ActorCount(l) => {
if l.exceed_hard_limit() {
return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
&l, true,
))));
} else if l.exceed_soft_limit() {
self.notice_to_user(gen_message(&l, false));
}
}
}
}
Ok(())
}
}

pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
Expand Down
Loading

0 comments on commit 4accc4a

Please sign in to comment.