Skip to content

Commit

Permalink
donot differentiate paid and free tier
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 committed Sep 4, 2024
1 parent ba37b36 commit 6dfbf5f
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 86 deletions.
4 changes: 0 additions & 4 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,15 +469,11 @@ pub struct MetaDeveloperConfig {

/// Max number of actor allowed per parallelism (default = 100).
/// CREATE MV/Table will be noticed when the number of actors exceeds this limit.
/// This limit is effective only for paid tier clusters.
/// Free tier cluster's limit will be hardcoded to `FREE_TIER_ACTOR_CNT_SOFT_LIMIT`
#[serde(default = "default::developer::actor_cnt_per_worker_parallelism_soft_limit")]
pub actor_cnt_per_worker_parallelism_soft_limit: usize,

/// Max number of actor allowed per parallelism (default = 400).
/// CREATE MV/Table will be rejected when the number of actors exceeds this limit.
/// This limit is effective only for paid tier clusters.
/// Free tier cluster's limit will be hardcoded to `FREE_TIER_ACTOR_CNT_HARD_LIMIT`
#[serde(default = "default::developer::actor_cnt_per_worker_parallelism_hard_limit")]
pub actor_cnt_per_worker_parallelism_hard_limit: usize,
}
Expand Down
17 changes: 1 addition & 16 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ use chrono_tz::Tz;
pub use over_window::OverWindowCachePolicy;
pub use query_mode::QueryMode;
use risingwave_common_proc_macro::{ConfigDoc, SessionConfig};
use risingwave_license::LicenseManager;
pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use thiserror::Error;
use thiserror_ext::AsReport;

use self::non_zero64::ConfigNonZeroU64;
use crate::session_config::sink_decouple::SinkDecouple;
Expand Down Expand Up @@ -298,8 +296,7 @@ pub struct SessionConfig {
/// Bypass checks on cluster limits
///
/// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit.
/// This session variable is only mutable in paid tier.
#[parameter(default = false, check_hook = check_bypass_cluster_limits)]
#[parameter(default = false)]
bypass_cluster_limits: bool,
}

Expand Down Expand Up @@ -327,18 +324,6 @@ fn check_bytea_output(val: &str) -> Result<(), String> {
}
}

fn check_bypass_cluster_limits(_val: &bool) -> Result<(), String> {
match LicenseManager::get()
.tier()
.map_err(|e| e.to_report_string())?
{
risingwave_license::Tier::Free => {
Err("Bypassing cluster limits is only allowed in paid tier".to_string())
}
risingwave_license::Tier::Paid => Ok(()),
}
}

impl SessionConfig {
pub fn set_force_two_phase_agg(
&mut self,
Expand Down
4 changes: 0 additions & 4 deletions src/common/src/util/cluster_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ use std::fmt::{self, Display, Formatter};
use risingwave_pb::meta::actor_count_per_parallelism::PbWorkerActorCount;
use risingwave_pb::meta::cluster_limit::PbLimit;
use risingwave_pb::meta::{PbActorCountPerParallelism, PbClusterLimit};

pub const FREE_TIER_ACTOR_CNT_SOFT_LIMIT: usize = 25;
pub const FREE_TIER_ACTOR_CNT_HARD_LIMIT: usize = 100;

pub enum ClusterLimit {
ActorCount(ActorCountPerParallelism),
}
Expand Down
63 changes: 27 additions & 36 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ use risingwave_common::catalog::{
use risingwave_common::config::{
load_config, BatchConfig, MetaConfig, MetricLevel, StreamingConfig,
};
use risingwave_common::license::LicenseManager;
use risingwave_common::memory::MemoryContext;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
Expand All @@ -60,6 +59,7 @@ use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::telemetry_env_enabled;
use risingwave_common::types::DataType;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_common::util::{cluster_limit, resource_util};
Expand All @@ -77,7 +77,6 @@ use risingwave_rpc_client::{ComputeClientPool, ComputeClientPoolRef, MetaClient}
use risingwave_sqlparser::ast::{ObjectName, Statement};
use risingwave_sqlparser::parser::Parser;
use thiserror::Error;
use thiserror_ext::AsReport;
use tokio::runtime::Builder;
use tokio::sync::oneshot::Sender;
use tokio::sync::watch;
Expand Down Expand Up @@ -1198,47 +1197,39 @@ impl SessionImpl {
}

pub async fn check_cluster_limits(&self) -> Result<()> {
let bypass_cluster_limits = self.config().bypass_cluster_limits();
let tier = match LicenseManager::get().tier() {
Ok(tier) => tier,
Err(e) => {
self.notice_to_user(e.to_report_string());
// Default to free tier if license is not available.
risingwave_common::license::Tier::Free
}
if self.config().bypass_cluster_limits() {
return Ok(());
}

let gen_message = |violated_limit: &ActorCountPerParallelism,
exceed_hard_limit: bool|
-> String {
let (limit_type, action) = if exceed_hard_limit {
("critical", "Please scale the cluster before proceeding!")
} else {
("recommended", "Scaling the cluster is recommended.")
};
format!(
"\n- {}\n- {}\n- {}\n- {}\n- {}\n{}",
format!("Actor count per parallelism exceeds the {} limit.", limit_type),
format!("Depending on your workload, this may overload the cluster and cause performance/stability issues. {}", action),
"Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.",
"You can bypass this check via SQL `SET bypass_cluster_limits TO true`.",
"You can check actor count distribution via SQL `SELECT * FROM rw_worker_actor_count`.",
violated_limit,
)
};

let limits = self.env().meta_client().get_cluster_limits().await?;
for limit in limits {
match limit {
cluster_limit::ClusterLimit::ActorCount(l) => {
if l.exceed_hard_limit() {
let mut msg = "\n- Actor count per parallelism exceeds the critical limit."
.to_string();
msg.push_str("\n- This may overload the cluster and cause performance/stability issues. Please scale the cluster before proceeding.");
if matches!(tier, risingwave_common::license::Tier::Free) {
msg.push_str("\n- Contact us via https://risingwave.com/contact-us/ and consider upgrading your free-tier license to enhance performance/stability and user experience for production usage.");
}
msg.push_str("\n- You can check actor count distribution via SQL `SELECT * FROM rw_worker_actor_count`.");
msg.push_str(format!("\n{}", l).as_str());
if bypass_cluster_limits {
// Only send a notice if `bypass_cluster_limits` is set.
self.notice_to_user(&msg);
} else {
// Return an error if `bypass_cluster_limits` is not set.
return Err(RwError::from(ErrorCode::ProtocolError(msg)));
}
return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
&l, true,
))));
} else if l.exceed_soft_limit() {
// Send a notice if soft limit is exceeded.
let mut msg =
"\n- Actor count per parallelism exceeds the recommended limit."
.to_string();
msg.push_str("\n- This may overload the cluster and cause performance/stability issues. Scaling the cluster is recommended.");
if matches!(tier, risingwave_common::license::Tier::Free) {
msg.push_str("\n- Contact us via https://risingwave.com/contact-us/ and consider upgrading your free-tier license to enhance performance/stability and user experience for production usage.");
}
msg.push_str("\n- You can check actor count distribution via SQL `SELECT * FROM rw_worker_actor_count`.");
msg.push_str(format!("\n{}", l).as_str());
self.notice_to_user(&msg);
self.notice_to_user(gen_message(&l, false));
}
}
}
Expand Down
4 changes: 0 additions & 4 deletions src/license/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,6 @@ impl LicenseManager {
}
}

pub fn tier(&self) -> Result<Tier, LicenseKeyError> {
self.license().map(|l| l.tier)
}

/// Get the current license if it is valid.
///
/// Since the license can expire, the returned license should not be cached by the caller.
Expand Down
27 changes: 5 additions & 22 deletions src/meta/service/src/cluster_limit_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,15 @@

use std::collections::HashMap;

use risingwave_common::license::{LicenseManager, Tier};
use risingwave_common::util::cluster_limit::{
ActorCountPerParallelism, ClusterLimit, WorkerActorCount, FREE_TIER_ACTOR_CNT_HARD_LIMIT,
FREE_TIER_ACTOR_CNT_SOFT_LIMIT,
ActorCountPerParallelism, ClusterLimit, WorkerActorCount,
};
use risingwave_meta::manager::{MetaSrvEnv, MetadataManager, WorkerId};
use risingwave_meta::MetaResult;
use risingwave_pb::common::worker_node::State;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitService;
use risingwave_pb::meta::{GetClusterLimitsRequest, GetClusterLimitsResponse};
use thiserror_ext::AsReport;
use tonic::{Request, Response, Status};

#[derive(Clone)]
Expand All @@ -43,24 +40,10 @@ impl ClusterLimitServiceImpl {
}

async fn get_active_actor_limit(&self) -> MetaResult<Option<ClusterLimit>> {
let (soft_limit, hard_limit) = match LicenseManager::get().tier() {
Ok(Tier::Paid) => (
self.env.opts.actor_cnt_per_worker_parallelism_soft_limit,
self.env.opts.actor_cnt_per_worker_parallelism_hard_limit,
),
Ok(Tier::Free) => (
FREE_TIER_ACTOR_CNT_SOFT_LIMIT,
FREE_TIER_ACTOR_CNT_HARD_LIMIT,
),
Err(err) => {
tracing::warn!(error=%err.as_report(), "Failed to get license tier.");
// Default to use free tier limit if there is any license error
(
FREE_TIER_ACTOR_CNT_SOFT_LIMIT,
FREE_TIER_ACTOR_CNT_HARD_LIMIT,
)
}
};
let (soft_limit, hard_limit) = (
self.env.opts.actor_cnt_per_worker_parallelism_soft_limit,
self.env.opts.actor_cnt_per_worker_parallelism_hard_limit,
);

let running_worker_parallelism: HashMap<WorkerId, usize> = self
.metadata_manager
Expand Down

0 comments on commit 6dfbf5f

Please sign in to comment.