Skip to content

Commit

Permalink
feat(license): add limit for total cpu cores in the cluster (#18022)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Aug 19, 2024
1 parent f1fd63e commit 7f80900
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 4 deletions.
99 changes: 99 additions & 0 deletions src/license/src/cpu.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroU64;

use thiserror::Error;

use crate::{LicenseKeyError, LicenseManager};

/// The error type for CPU core limit exceeded as per the license key.
#[derive(Debug, Clone, Error)]
#[error("invalid license key")]
pub enum CpuCoreLimitExceeded {
#[error("cannot check CPU core limit due to license key error")]
LicenseKeyError(#[from] LicenseKeyError),

#[error(
"CPU core limit exceeded as per the license key, \
requesting {actual} while the maximum allowed is {limit}"
)]
Exceeded { limit: NonZeroU64, actual: u64 },
}

impl LicenseManager {
/// Check if the given CPU core count exceeds the limit as per the license key.
pub fn check_cpu_core_limit(&self, cpu_core_count: u64) -> Result<(), CpuCoreLimitExceeded> {
let license = self.license()?;

match license.cpu_core_limit {
Some(limit) if cpu_core_count > limit.get() => Err(CpuCoreLimitExceeded::Exceeded {
limit,
actual: cpu_core_count,
}),
_ => Ok(()),
}
}
}

// Tests below only work in debug mode.
#[cfg(debug_assertions)]
#[cfg(test)]
mod tests {
use expect_test::expect;
use thiserror_ext::AsReport as _;

use super::*;
use crate::{LicenseKey, TEST_PAID_LICENSE_KEY_CONTENT};

fn do_test(key: &str, cpu_core_count: u64, expect: expect_test::Expect) {
let manager = LicenseManager::new();
manager.refresh(LicenseKey(key));

match manager.check_cpu_core_limit(cpu_core_count) {
Ok(_) => expect.assert_eq("ok"),
Err(error) => expect.assert_eq(&error.to_report_string()),
}
}

#[test]
fn test_no_limit() {
do_test(TEST_PAID_LICENSE_KEY_CONTENT, 114514, expect!["ok"]);
}

#[test]
fn test_no_license_key_no_limit() {
do_test("", 114514, expect!["ok"]);
}

#[test]
fn test_invalid_license_key() {
const KEY: &str = "invalid";

do_test(KEY, 0, expect!["cannot check CPU core limit due to license key error: invalid license key: InvalidToken"]);
do_test(KEY, 114514, expect!["cannot check CPU core limit due to license key error: invalid license key: InvalidToken"]);
}

#[test]
fn test_limit() {
const KEY: &str =
"eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\
eyJzdWIiOiJmcmVlLXRlc3QtMzIiLCJpc3MiOiJwcm9kLnJpc2luZ3dhdmUuY29tIiwidGllciI6ImZyZWUiLCJleHAiOjE4NTI1NTk5OTksImlhdCI6MTcyMzcwMTk5NCwiY3B1X2NvcmVfbGltaXQiOjMyfQ.\
rsATtzlduLUkGQeXkOROtyGUpafdDhi18iKdYAzAldWQuO9KevNcnD8a6geCShZSGte65bI7oYtv7GHx8i66ge3B1SVsgGgYr10ebphPUNUQenYoN0mpD4Wn0prPStOgANzYZOI2ntMGAaeWStji1x67_iho6r0W9r6RX3kMvzFSbiObSIfvTdrMULeg-xeHc3bT_ErRhaXq7MAa2Oiq3lcK2sNgEvc9KYSP9YbhSik9CBkc8lcyeVoc48SSWEaBU-c8-Ge0fzjgWHI9KIsUV5Ihe66KEfs0PqdRoSWbgskYGzA3o8wHIbtJbJiPzra373kkFH9MGY0HOsw9QeJLGQ";

do_test(KEY, 31, expect!["ok"]);
do_test(KEY, 32, expect!["ok"]);
do_test(KEY, 33, expect!["CPU core limit exceeded as per the license key, requesting 33 while the maximum allowed is 32"]);
}
}
1 change: 1 addition & 0 deletions src/license/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod cpu;
mod feature;
mod key;
mod manager;
Expand Down
10 changes: 9 additions & 1 deletion src/license/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroU64;
use std::sync::{LazyLock, RwLock};

use jsonwebtoken::{Algorithm, DecodingKey, Validation};
Expand Down Expand Up @@ -76,6 +77,9 @@ pub(super) struct License {
/// Tier of the license.
pub tier: Tier,

/// Maximum number of compute-node CPU cores allowed to use. Typically used for the paid tier.
pub cpu_core_limit: Option<NonZeroU64>,

/// Expiration time in seconds since UNIX epoch.
///
/// See <https://tools.ietf.org/html/rfc7519#section-4.1.4>.
Expand All @@ -91,6 +95,7 @@ impl Default for License {
sub: "default".to_owned(),
tier: Tier::Free,
iss: Issuer::Prod,
cpu_core_limit: None,
exp: u64::MAX,
}
}
Expand All @@ -117,7 +122,7 @@ static PUBLIC_KEY: LazyLock<DecodingKey> = LazyLock::new(|| {

impl LicenseManager {
/// Create a new license manager with the default license.
fn new() -> Self {
pub(crate) fn new() -> Self {
Self {
inner: RwLock::new(Inner {
license: Ok(License::default()),
Expand Down Expand Up @@ -208,6 +213,7 @@ mod tests {
sub: "rw-test",
iss: Test,
tier: Paid,
cpu_core_limit: None,
exp: 9999999999,
}
"#]],
Expand All @@ -228,6 +234,7 @@ mod tests {
sub: "rw-test",
iss: Test,
tier: Free,
cpu_core_limit: None,
exp: 9999999999,
}
"#]],
Expand All @@ -244,6 +251,7 @@ mod tests {
sub: "default",
iss: Prod,
tier: Free,
cpu_core_limit: None,
exp: 18446744073709551615,
}
"#]],
Expand Down
47 changes: 45 additions & 2 deletions src/meta/src/controller/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::RW_VERSION;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_license::LicenseManager;
use risingwave_meta_model_v2::prelude::{Worker, WorkerProperty};
use risingwave_meta_model_v2::worker::{WorkerStatus, WorkerType};
use risingwave_meta_model_v2::{worker, worker_property, TransactionId, WorkerId};
Expand All @@ -38,8 +39,8 @@ use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulabili
use sea_orm::prelude::Expr;
use sea_orm::ActiveValue::Set;
use sea_orm::{
ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect,
TransactionTrait,
ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
QueryFilter, QuerySelect, TransactionTrait,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
Expand Down Expand Up @@ -581,6 +582,43 @@ impl ClusterControllerInner {
}
}

/// Check if the total CPU cores in the cluster exceed the license limit, after counting the
/// newly joined compute node.
pub async fn check_cpu_core_limit_on_newly_joined_compute_node(
&self,
txn: &DatabaseTransaction,
host_address: &HostAddress,
resource: &PbResource,
) -> MetaResult<()> {
let this = resource.total_cpu_cores;

let other_worker_ids: Vec<WorkerId> = Worker::find()
.filter(
(worker::Column::Host
.eq(host_address.host.clone())
.and(worker::Column::Port.eq(host_address.port)))
.not()
.and(worker::Column::WorkerType.eq(WorkerType::ComputeNode as i32)),
)
.select_only()
.column(worker::Column::WorkerId)
.into_tuple()
.all(txn)
.await?;

let others = other_worker_ids
.into_iter()
.flat_map(|id| self.worker_extra_info.get(&id))
.flat_map(|info| info.resource.as_ref().map(|r| r.total_cpu_cores))
.sum::<u64>();

LicenseManager::get()
.check_cpu_core_limit(this + others)
.map_err(anyhow::Error::from)?;

Ok(())
}

pub async fn add_worker(
&mut self,
r#type: PbWorkerType,
Expand All @@ -591,6 +629,11 @@ impl ClusterControllerInner {
) -> MetaResult<WorkerId> {
let txn = self.db.begin().await?;

if let PbWorkerType::ComputeNode = r#type {
self.check_cpu_core_limit_on_newly_joined_compute_node(&txn, &host_address, &resource)
.await?;
}

let worker = Worker::find()
.filter(
worker::Column::Host
Expand Down
33 changes: 32 additions & 1 deletion src/meta/src/manager/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::RW_VERSION;
use risingwave_pb::common::worker_node::{Property, State};
use risingwave_license::LicenseManager;
use risingwave_pb::common::worker_node::{Property, Resource, State};
use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType};
use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
use risingwave_pb::meta::heartbeat_request;
Expand Down Expand Up @@ -114,6 +115,13 @@ impl ClusterManager {
let mut property = self.parse_property(r#type, property);
let mut core = self.core.write().await;

if let WorkerType::ComputeNode = r#type {
core.check_cpu_core_limit_on_newly_joined_compute_node(
host_address.clone(),
&resource,
)?;
}

if let Some(worker) = core.get_worker_by_host_mut(host_address.clone()) {
tracing::info!("worker {} re-joined the cluster", worker.worker_id());
worker.update_resource(Some(resource));
Expand Down Expand Up @@ -631,6 +639,29 @@ impl ClusterManagerCore {
.map(|(_, worker)| worker.clone())
}

/// Check if the total CPU cores in the cluster exceed the license limit, after counting the
/// newly joined compute node.
pub fn check_cpu_core_limit_on_newly_joined_compute_node(
&self,
host_address: HostAddress,
resource: &Resource,
) -> MetaResult<()> {
let this_key = WorkerKey(host_address);

let this = resource.total_cpu_cores;
let others = (self.workers.iter())
.filter(|(k, _v)| k != &&this_key)
.filter(|(_k, v)| v.worker_node.r#type == WorkerType::ComputeNode as i32)
.flat_map(|(_k, v)| v.resource.as_ref().map(|r| r.total_cpu_cores))
.sum::<u64>();

LicenseManager::get()
.check_cpu_core_limit(this + others)
.map_err(anyhow::Error::from)?;

Ok(())
}

fn add_worker_node(&mut self, worker: Worker) {
if let Some(transactional_id) = worker.worker_node.transactional_id {
self.available_transactional_ids
Expand Down

0 comments on commit 7f80900

Please sign in to comment.