diff --git a/src/common/src/system_param/common.rs b/src/common/src/system_param/common.rs index eb660d2ac3bb5..d8ff741399533 100644 --- a/src/common/src/system_param/common.rs +++ b/src/common/src/system_param/common.rs @@ -12,37 +12,28 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Mutex; - -use super::reader::{SystemParamsRead, SystemParamsReader}; +use super::diff::SystemParamsDiff; +use super::reader::SystemParamsReader; use crate::util::tracing::layer::toggle_otel_layer; /// Node-independent handler for system parameter changes. /// /// Currently, it is only used to enable or disable the distributed tracing layer. -pub struct CommonHandler { - last_params: Mutex>, -} +#[derive(Debug)] +pub struct CommonHandler; impl CommonHandler { /// Create a new handler with the initial parameters. pub fn new(initial: SystemParamsReader) -> Self { - let this = Self { - last_params: None.into(), - }; - this.handle_change(initial); + let this = Self; + this.handle_change(&SystemParamsDiff::from_initial(initial)); this } /// Handle the change of system parameters. - // TODO: directly call this method with the difference of old and new params. - pub fn handle_change(&self, new_params: SystemParamsReader) { - let mut last_params = self.last_params.lock().unwrap(); - - if last_params.as_ref().map(|p| p.enable_tracing()) != Some(new_params.enable_tracing()) { - toggle_otel_layer(new_params.enable_tracing()); + pub fn handle_change(&self, diff: &SystemParamsDiff) { + if let Some(enabled) = diff.enable_tracing { + toggle_otel_layer(enabled) } - - last_params.replace(new_params); } } diff --git a/src/common/src/system_param/diff.rs b/src/common/src/system_param/diff.rs new file mode 100644 index 0000000000000..243b3e247bec8 --- /dev/null +++ b/src/common/src/system_param/diff.rs @@ -0,0 +1,67 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::reader::SystemParamsRead; +use crate::for_all_params; + +macro_rules! define_diff { + ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $doc:literal, $($rest:tt)* },)*) => { + /// The diff of the system params. + /// + /// Fields that are changed are set to `Some`, otherwise `None`. + #[derive(Default, Debug, Clone)] + pub struct SystemParamsDiff { + $( + #[doc = $doc] + pub $field: Option<$type>, + )* + } + } +} +for_all_params!(define_diff); + +impl SystemParamsDiff { + /// Create a diff between the given two system params. + pub fn diff(prev: impl SystemParamsRead, curr: impl SystemParamsRead) -> Self { + let mut diff = Self::default(); + + macro_rules! set_diff_field { + ($({ $field:ident, $($rest:tt)* },)*) => { + $( + if curr.$field() != prev.$field() { + diff.$field = Some(curr.$field().to_owned()); + } + )* + }; + } + for_all_params!(set_diff_field); + + diff + } + + /// Create a diff from the given initial system params. + /// All fields will be set to `Some`. + pub fn from_initial(initial: impl SystemParamsRead) -> Self { + macro_rules! initial_field { + ($({ $field:ident, $($rest:tt)* },)*) => { + Self { + $( + $field: Some(initial.$field().to_owned()), + )* + } + }; + } + for_all_params!(initial_field) + } +} diff --git a/src/common/src/system_param/local_manager.rs b/src/common/src/system_param/local_manager.rs index 312c5577a0f81..5040d30f811d0 100644 --- a/src/common/src/system_param/local_manager.rs +++ b/src/common/src/system_param/local_manager.rs @@ -20,6 +20,7 @@ use risingwave_pb::meta::SystemParams; use tokio::sync::watch::{channel, Receiver, Sender}; use super::common::CommonHandler; +use super::diff::SystemParamsDiff; use super::reader::SystemParamsReader; use super::system_params_for_test; @@ -41,28 +42,39 @@ pub struct LocalSystemParamsManager { } impl LocalSystemParamsManager { + /// Create a new instance of `LocalSystemParamsManager` and spawn a task to run + /// the common handler. pub fn new(initial_params: SystemParamsReader) -> Self { - let params = Arc::new(ArcSwap::from_pointee(initial_params.clone())); - let (tx, _) = channel(params.clone()); + let this = Self::new_inner(initial_params.clone()); // Spawn a task to run the common handler. tokio::spawn({ - let mut rx = tx.subscribe(); + let mut rx = this.tx.subscribe(); async move { + let mut params = initial_params.clone(); let handler = CommonHandler::new(initial_params); while rx.changed().await.is_ok() { + // TODO: directly watch the changes instead of diffing ourselves. let new_params = (**rx.borrow_and_update().load()).clone(); - handler.handle_change(new_params); + let diff = SystemParamsDiff::diff(params.as_ref(), new_params.as_ref()); + handler.handle_change(&diff); + params = new_params; } } }); - Self { params, tx } + this } pub fn for_test() -> Self { - Self::new(system_params_for_test().into()) + Self::new_inner(system_params_for_test().into()) + } + + fn new_inner(initial_params: SystemParamsReader) -> Self { + let params = Arc::new(ArcSwap::from_pointee(initial_params)); + let (tx, _) = channel(params.clone()); + Self { params, tx } } pub fn get_params(&self) -> SystemParamsReaderRef { @@ -89,12 +101,11 @@ mod tests { #[tokio::test] async fn test_manager() { - let p = SystemParams::default().into(); - let manager = LocalSystemParamsManager::new(p); + let manager = LocalSystemParamsManager::for_test(); let shared_params = manager.get_params(); let new_params = SystemParams { - sstable_size_mb: Some(1), + sstable_size_mb: Some(114514), ..Default::default() }; diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 86dea616519d8..a55b236f4b310 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -21,6 +21,7 @@ //! - Add a new method to [`reader::SystemParamsReader`]. pub mod common; +pub mod diff; pub mod local_manager; pub mod reader; @@ -31,6 +32,8 @@ use std::str::FromStr; use paste::paste; use risingwave_pb::meta::PbSystemParams; +use self::diff::SystemParamsDiff; + pub type SystemParamsError = String; type Result = core::result::Result; @@ -300,28 +303,48 @@ macro_rules! impl_default_from_other_params { macro_rules! impl_set_system_param { ($({ $field:ident, $type:ty, $default:expr, $($rest:tt)* },)*) => { - /// Set a system parameter with the given value or default one, returns the new value. - pub fn set_system_param(params: &mut PbSystemParams, key: &str, value: Option) -> Result { - match key { + /// Set a system parameter with the given value or default one. + /// + /// Returns the new value if changed, or an error if the parameter is unrecognized + /// or the value is invalid. + pub fn set_system_param( + params: &mut PbSystemParams, + key: &str, + value: Option>, + ) -> Result> { + use crate::system_param::reader::{SystemParamsReader, SystemParamsRead}; + + match key { $( key_of!($field) => { let v = if let Some(v) = value { - v.parse().map_err(|_| format!("cannot parse parameter value"))? + v.as_ref().parse().map_err(|_| format!("cannot parse parameter value"))? } else { $default.ok_or_else(|| format!("{} does not have a default value", key))? }; OverrideValidateOnSet::$field(&v)?; - params.$field = Some(v.clone()); - return Ok(v.to_string()) + + let changed = SystemParamsReader::new(&*params).$field() != v; + if changed { + let new_value = v.to_string(); + let diff = SystemParamsDiff { + $field: Some(v.to_owned()), + ..Default::default() + }; + params.$field = Some(v); + Ok(Some((new_value, diff))) + } else { + Ok(None) + } }, )* _ => { - return Err(format!( + Err(format!( "unrecognized system param {:?}", key - )); + )) } - }; + } } }; } @@ -433,7 +456,7 @@ mod tests { #[test] fn test_set() { - let mut p = PbSystemParams::default(); + let mut p = system_params_for_test(); // Unrecognized param. assert!(set_system_param(&mut p, "?", Some("?".to_string())).is_err()); // Value out of range. diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index 06509c1aa1916..c6b8d8c5af6aa 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Borrow; + use risingwave_pb::meta::PbSystemParams; use super::{default, system_params_to_kv, ParamValue}; @@ -40,13 +42,40 @@ for_all_params!(define_system_params_read_trait); /// /// See [`SystemParamsRead`] for more details. #[derive(Clone, Debug, PartialEq)] -pub struct SystemParamsReader { - prost: PbSystemParams, +pub struct SystemParamsReader { + inner: I, +} + +impl From for SystemParamsReader +where + I: Borrow, +{ + fn from(inner: I) -> Self { + Self { inner } + } } -impl From for SystemParamsReader { - fn from(prost: PbSystemParams) -> Self { - Self { prost } +impl SystemParamsReader +where + I: Borrow, +{ + pub fn new(inner: I) -> Self { + Self { inner } + } + + /// Return a new reader with the reference to the inner system params. + pub fn as_ref(&self) -> SystemParamsReader<&PbSystemParams> { + SystemParamsReader { + inner: self.inner(), + } + } + + pub fn to_kv(&self) -> Vec<(String, String)> { + system_params_to_kv(self.inner()).unwrap() + } + + fn inner(&self) -> &PbSystemParams { + self.inner.borrow() } } @@ -54,73 +83,70 @@ impl From for SystemParamsReader { /// For example, if a parameter is introduced before the initial public release. /// /// - Otherwise, specify the fallback logic when the field is missing. -impl SystemParamsRead for SystemParamsReader { +impl SystemParamsRead for SystemParamsReader +where + I: Borrow, +{ fn barrier_interval_ms(&self) -> u32 { - self.prost.barrier_interval_ms.unwrap() + self.inner().barrier_interval_ms.unwrap() } fn checkpoint_frequency(&self) -> u64 { - self.prost.checkpoint_frequency.unwrap() + self.inner().checkpoint_frequency.unwrap() } fn parallel_compact_size_mb(&self) -> u32 { - self.prost.parallel_compact_size_mb.unwrap() + self.inner().parallel_compact_size_mb.unwrap() } fn sstable_size_mb(&self) -> u32 { - self.prost.sstable_size_mb.unwrap() + self.inner().sstable_size_mb.unwrap() } fn block_size_kb(&self) -> u32 { - self.prost.block_size_kb.unwrap() + self.inner().block_size_kb.unwrap() } fn bloom_false_positive(&self) -> f64 { - self.prost.bloom_false_positive.unwrap() + self.inner().bloom_false_positive.unwrap() } fn state_store(&self) -> &str { - self.prost.state_store.as_ref().unwrap() + self.inner().state_store.as_ref().unwrap() } fn data_directory(&self) -> &str { - self.prost.data_directory.as_ref().unwrap() + self.inner().data_directory.as_ref().unwrap() } fn backup_storage_url(&self) -> &str { - self.prost.backup_storage_url.as_ref().unwrap() + self.inner().backup_storage_url.as_ref().unwrap() } fn backup_storage_directory(&self) -> &str { - self.prost.backup_storage_directory.as_ref().unwrap() + self.inner().backup_storage_directory.as_ref().unwrap() } fn max_concurrent_creating_streaming_jobs(&self) -> u32 { - self.prost.max_concurrent_creating_streaming_jobs.unwrap() + self.inner().max_concurrent_creating_streaming_jobs.unwrap() } fn pause_on_next_bootstrap(&self) -> bool { - self.prost + self.inner() .pause_on_next_bootstrap .unwrap_or_else(default::pause_on_next_bootstrap) } fn enable_tracing(&self) -> bool { - self.prost + self.inner() .enable_tracing .unwrap_or_else(default::enable_tracing) } fn wasm_storage_url(&self) -> &str { - self.prost + self.inner() .wasm_storage_url .as_ref() .unwrap_or(&default::WASM_STORAGE_URL) } } - -impl SystemParamsReader { - pub fn to_kv(&self) -> Vec<(String, String)> { - system_params_to_kv(&self.prost).unwrap() - } -} diff --git a/src/meta/src/controller/system_param.rs b/src/meta/src/controller/system_param.rs index e483645ead937..bb8f51479856b 100644 --- a/src/meta/src/controller/system_param.rs +++ b/src/meta/src/controller/system_param.rs @@ -195,15 +195,21 @@ impl SystemParamsController { }; let mut params = params_guard.clone(); let mut param: system_parameter::ActiveModel = param.into(); - param.value = - Set(set_system_param(&mut params, name, value).map_err(MetaError::system_params)?); + let Some((new_value, diff)) = + set_system_param(&mut params, name, value).map_err(MetaError::system_params)? + else { + // No changes on the parameter. + return Ok(params); + }; + + param.value = Set(new_value); param.update(&self.db).await?; *params_guard = params.clone(); - // TODO: check if the parameter is actually changed. - // Run common handler. - self.common_handler.handle_change(params.clone().into()); + self.common_handler.handle_change(&diff); + + // TODO: notify the diff instead of the snapshot. // Sync params to other managers on the meta node only once, since it's infallible. self.notification_manager diff --git a/src/meta/src/manager/system_param/mod.rs b/src/meta/src/manager/system_param/mod.rs index 7ff3a43551585..a7336a10e5de6 100644 --- a/src/meta/src/manager/system_param/mod.rs +++ b/src/meta/src/manager/system_param/mod.rs @@ -90,7 +90,12 @@ impl SystemParamsManager { let params = params_guard.deref_mut(); let mut mem_txn = VarTransaction::new(params); - set_system_param(mem_txn.deref_mut(), name, value).map_err(MetaError::system_params)?; + let Some((_new_value, diff)) = + set_system_param(mem_txn.deref_mut(), name, value).map_err(MetaError::system_params)? + else { + // No changes on the parameter. + return Ok(params.clone()); + }; let mut store_txn = Transaction::default(); mem_txn.apply_to_txn(&mut store_txn).await?; @@ -98,10 +103,10 @@ impl SystemParamsManager { mem_txn.commit(); - // TODO: check if the parameter is actually changed. - // Run common handler. - self.common_handler.handle_change(params.clone().into()); + self.common_handler.handle_change(&diff); + + // TODO: notify the diff instead of the snapshot. // Sync params to other managers on the meta node only once, since it's infallible. self.notification_manager diff --git a/src/utils/runtime/src/logger.rs b/src/utils/runtime/src/logger.rs index cb27840d7530f..6eab94af6c48a 100644 --- a/src/utils/runtime/src/logger.rs +++ b/src/utils/runtime/src/logger.rs @@ -202,8 +202,7 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { // Configure levels for external crates. filter = filter .with_target("foyer", Level::WARN) - .with_target("aws_sdk_ec2", Level::INFO) - .with_target("aws_sdk_s3", Level::INFO) + .with_target("aws", Level::INFO) .with_target("aws_config", Level::WARN) .with_target("aws_endpoint", Level::WARN) .with_target("aws_credential_types::cache::lazy_caching", Level::WARN)