From 7f80900147bc3da2db31cf386bb25a1b86cbefc0 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 19 Aug 2024 17:01:06 +0800 Subject: [PATCH] feat(license): add limit for total cpu cores in the cluster (#18022) Signed-off-by: Bugen Zhao --- src/license/src/cpu.rs | 99 ++++++++++++++++++++++++++++++ src/license/src/lib.rs | 1 + src/license/src/manager.rs | 10 ++- src/meta/src/controller/cluster.rs | 47 +++++++++++++- src/meta/src/manager/cluster.rs | 33 +++++++++- 5 files changed, 186 insertions(+), 4 deletions(-) create mode 100644 src/license/src/cpu.rs diff --git a/src/license/src/cpu.rs b/src/license/src/cpu.rs new file mode 100644 index 0000000000000..b2c23733ce755 --- /dev/null +++ b/src/license/src/cpu.rs @@ -0,0 +1,99 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::num::NonZeroU64; + +use thiserror::Error; + +use crate::{LicenseKeyError, LicenseManager}; + +/// The error type for CPU core limit exceeded as per the license key. +#[derive(Debug, Clone, Error)] +#[error("invalid license key")] +pub enum CpuCoreLimitExceeded { + #[error("cannot check CPU core limit due to license key error")] + LicenseKeyError(#[from] LicenseKeyError), + + #[error( + "CPU core limit exceeded as per the license key, \ + requesting {actual} while the maximum allowed is {limit}" + )] + Exceeded { limit: NonZeroU64, actual: u64 }, +} + +impl LicenseManager { + /// Check if the given CPU core count exceeds the limit as per the license key. + pub fn check_cpu_core_limit(&self, cpu_core_count: u64) -> Result<(), CpuCoreLimitExceeded> { + let license = self.license()?; + + match license.cpu_core_limit { + Some(limit) if cpu_core_count > limit.get() => Err(CpuCoreLimitExceeded::Exceeded { + limit, + actual: cpu_core_count, + }), + _ => Ok(()), + } + } +} + +// Tests below only work in debug mode. +#[cfg(debug_assertions)] +#[cfg(test)] +mod tests { + use expect_test::expect; + use thiserror_ext::AsReport as _; + + use super::*; + use crate::{LicenseKey, TEST_PAID_LICENSE_KEY_CONTENT}; + + fn do_test(key: &str, cpu_core_count: u64, expect: expect_test::Expect) { + let manager = LicenseManager::new(); + manager.refresh(LicenseKey(key)); + + match manager.check_cpu_core_limit(cpu_core_count) { + Ok(_) => expect.assert_eq("ok"), + Err(error) => expect.assert_eq(&error.to_report_string()), + } + } + + #[test] + fn test_no_limit() { + do_test(TEST_PAID_LICENSE_KEY_CONTENT, 114514, expect!["ok"]); + } + + #[test] + fn test_no_license_key_no_limit() { + do_test("", 114514, expect!["ok"]); + } + + #[test] + fn test_invalid_license_key() { + const KEY: &str = "invalid"; + + do_test(KEY, 0, expect!["cannot check CPU core limit due to license key error: invalid license key: InvalidToken"]); + do_test(KEY, 114514, expect!["cannot check CPU core limit due to license key error: invalid license key: InvalidToken"]); + } + + #[test] + fn test_limit() { + const KEY: &str = + "eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\ + eyJzdWIiOiJmcmVlLXRlc3QtMzIiLCJpc3MiOiJwcm9kLnJpc2luZ3dhdmUuY29tIiwidGllciI6ImZyZWUiLCJleHAiOjE4NTI1NTk5OTksImlhdCI6MTcyMzcwMTk5NCwiY3B1X2NvcmVfbGltaXQiOjMyfQ.\ + rsATtzlduLUkGQeXkOROtyGUpafdDhi18iKdYAzAldWQuO9KevNcnD8a6geCShZSGte65bI7oYtv7GHx8i66ge3B1SVsgGgYr10ebphPUNUQenYoN0mpD4Wn0prPStOgANzYZOI2ntMGAaeWStji1x67_iho6r0W9r6RX3kMvzFSbiObSIfvTdrMULeg-xeHc3bT_ErRhaXq7MAa2Oiq3lcK2sNgEvc9KYSP9YbhSik9CBkc8lcyeVoc48SSWEaBU-c8-Ge0fzjgWHI9KIsUV5Ihe66KEfs0PqdRoSWbgskYGzA3o8wHIbtJbJiPzra373kkFH9MGY0HOsw9QeJLGQ"; + + do_test(KEY, 31, expect!["ok"]); + do_test(KEY, 32, expect!["ok"]); + do_test(KEY, 33, expect!["CPU core limit exceeded as per the license key, requesting 33 while the maximum allowed is 32"]); + } +} diff --git a/src/license/src/lib.rs b/src/license/src/lib.rs index bdcac90441043..e2a3275780098 100644 --- a/src/license/src/lib.rs +++ b/src/license/src/lib.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod cpu; mod feature; mod key; mod manager; diff --git a/src/license/src/manager.rs b/src/license/src/manager.rs index cac51105358ae..5c1bc298388da 100644 --- a/src/license/src/manager.rs +++ b/src/license/src/manager.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::num::NonZeroU64; use std::sync::{LazyLock, RwLock}; use jsonwebtoken::{Algorithm, DecodingKey, Validation}; @@ -76,6 +77,9 @@ pub(super) struct License { /// Tier of the license. pub tier: Tier, + /// Maximum number of compute-node CPU cores allowed to use. Typically used for the paid tier. + pub cpu_core_limit: Option, + /// Expiration time in seconds since UNIX epoch. /// /// See . @@ -91,6 +95,7 @@ impl Default for License { sub: "default".to_owned(), tier: Tier::Free, iss: Issuer::Prod, + cpu_core_limit: None, exp: u64::MAX, } } @@ -117,7 +122,7 @@ static PUBLIC_KEY: LazyLock = LazyLock::new(|| { impl LicenseManager { /// Create a new license manager with the default license. - fn new() -> Self { + pub(crate) fn new() -> Self { Self { inner: RwLock::new(Inner { license: Ok(License::default()), @@ -208,6 +213,7 @@ mod tests { sub: "rw-test", iss: Test, tier: Paid, + cpu_core_limit: None, exp: 9999999999, } "#]], @@ -228,6 +234,7 @@ mod tests { sub: "rw-test", iss: Test, tier: Free, + cpu_core_limit: None, exp: 9999999999, } "#]], @@ -244,6 +251,7 @@ mod tests { sub: "default", iss: Prod, tier: Free, + cpu_core_limit: None, exp: 18446744073709551615, } "#]], diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index 6a7ca826f160a..10d4a947e2a9a 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -26,6 +26,7 @@ use risingwave_common::util::resource_util::cpu::total_cpu_available; use risingwave_common::util::resource_util::memory::system_memory_available_bytes; use risingwave_common::RW_VERSION; use risingwave_hummock_sdk::HummockSstableObjectId; +use risingwave_license::LicenseManager; use risingwave_meta_model_v2::prelude::{Worker, WorkerProperty}; use risingwave_meta_model_v2::worker::{WorkerStatus, WorkerType}; use risingwave_meta_model_v2::{worker, worker_property, TransactionId, WorkerId}; @@ -38,8 +39,8 @@ use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulabili use sea_orm::prelude::Expr; use sea_orm::ActiveValue::Set; use sea_orm::{ - ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, - TransactionTrait, + ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait, + QueryFilter, QuerySelect, TransactionTrait, }; use thiserror_ext::AsReport; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; @@ -581,6 +582,43 @@ impl ClusterControllerInner { } } + /// Check if the total CPU cores in the cluster exceed the license limit, after counting the + /// newly joined compute node. + pub async fn check_cpu_core_limit_on_newly_joined_compute_node( + &self, + txn: &DatabaseTransaction, + host_address: &HostAddress, + resource: &PbResource, + ) -> MetaResult<()> { + let this = resource.total_cpu_cores; + + let other_worker_ids: Vec = Worker::find() + .filter( + (worker::Column::Host + .eq(host_address.host.clone()) + .and(worker::Column::Port.eq(host_address.port))) + .not() + .and(worker::Column::WorkerType.eq(WorkerType::ComputeNode as i32)), + ) + .select_only() + .column(worker::Column::WorkerId) + .into_tuple() + .all(txn) + .await?; + + let others = other_worker_ids + .into_iter() + .flat_map(|id| self.worker_extra_info.get(&id)) + .flat_map(|info| info.resource.as_ref().map(|r| r.total_cpu_cores)) + .sum::(); + + LicenseManager::get() + .check_cpu_core_limit(this + others) + .map_err(anyhow::Error::from)?; + + Ok(()) + } + pub async fn add_worker( &mut self, r#type: PbWorkerType, @@ -591,6 +629,11 @@ impl ClusterControllerInner { ) -> MetaResult { let txn = self.db.begin().await?; + if let PbWorkerType::ComputeNode = r#type { + self.check_cpu_core_limit_on_newly_joined_compute_node(&txn, &host_address, &resource) + .await?; + } + let worker = Worker::find() .filter( worker::Column::Host diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index 93e50dec3706b..d5c12c70a0b9c 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -24,7 +24,8 @@ use risingwave_common::util::addr::HostAddr; use risingwave_common::util::resource_util::cpu::total_cpu_available; use risingwave_common::util::resource_util::memory::system_memory_available_bytes; use risingwave_common::RW_VERSION; -use risingwave_pb::common::worker_node::{Property, State}; +use risingwave_license::LicenseManager; +use risingwave_pb::common::worker_node::{Property, Resource, State}; use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType}; use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::heartbeat_request; @@ -114,6 +115,13 @@ impl ClusterManager { let mut property = self.parse_property(r#type, property); let mut core = self.core.write().await; + if let WorkerType::ComputeNode = r#type { + core.check_cpu_core_limit_on_newly_joined_compute_node( + host_address.clone(), + &resource, + )?; + } + if let Some(worker) = core.get_worker_by_host_mut(host_address.clone()) { tracing::info!("worker {} re-joined the cluster", worker.worker_id()); worker.update_resource(Some(resource)); @@ -631,6 +639,29 @@ impl ClusterManagerCore { .map(|(_, worker)| worker.clone()) } + /// Check if the total CPU cores in the cluster exceed the license limit, after counting the + /// newly joined compute node. + pub fn check_cpu_core_limit_on_newly_joined_compute_node( + &self, + host_address: HostAddress, + resource: &Resource, + ) -> MetaResult<()> { + let this_key = WorkerKey(host_address); + + let this = resource.total_cpu_cores; + let others = (self.workers.iter()) + .filter(|(k, _v)| k != &&this_key) + .filter(|(_k, v)| v.worker_node.r#type == WorkerType::ComputeNode as i32) + .flat_map(|(_k, v)| v.resource.as_ref().map(|r| r.total_cpu_cores)) + .sum::(); + + LicenseManager::get() + .check_cpu_core_limit(this + others) + .map_err(anyhow::Error::from)?; + + Ok(()) + } + fn add_worker_node(&mut self, worker: Worker) { if let Some(transactional_id) = worker.worker_node.transactional_id { self.available_transactional_ids