From 931a70c251083f11e9f8226a4b80bf3ea7bc521a Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 16 Jan 2024 17:17:59 +0800 Subject: [PATCH 1/9] skip updating and notifying when value not changed Signed-off-by: Bugen Zhao --- src/common/src/system_param/mod.rs | 28 +++++++---- src/common/src/system_param/reader.rs | 62 +++++++++++++++--------- src/meta/src/controller/system_param.rs | 12 +++-- src/meta/src/manager/system_param/mod.rs | 9 ++-- 4 files changed, 72 insertions(+), 39 deletions(-) diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 86dea616519d8..85495f9aa5185 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -300,28 +300,38 @@ macro_rules! impl_default_from_other_params { macro_rules! impl_set_system_param { ($({ $field:ident, $type:ty, $default:expr, $($rest:tt)* },)*) => { - /// Set a system parameter with the given value or default one, returns the new value. - pub fn set_system_param(params: &mut PbSystemParams, key: &str, value: Option) -> Result { - match key { + /// Set a system parameter with the given value or default one. + /// Returns the new value if changed. + pub fn set_system_param(params: &mut PbSystemParams, key: &str, value: Option>) -> Result> { + use crate::system_param::reader::{SystemParamsReader, SystemParamsRead}; + + match key { $( key_of!($field) => { let v = if let Some(v) = value { - v.parse().map_err(|_| format!("cannot parse parameter value"))? + v.as_ref().parse().map_err(|_| format!("cannot parse parameter value"))? } else { $default.ok_or_else(|| format!("{} does not have a default value", key))? }; OverrideValidateOnSet::$field(&v)?; - params.$field = Some(v.clone()); - return Ok(v.to_string()) + + let changed = SystemParamsReader::new(&*params).$field() != v; + if changed { + let new_value = v.to_string(); + params.$field = Some(v); + Ok(Some(new_value)) + } else { + Ok(None) + } }, )* _ => { - return Err(format!( + Err(format!( "unrecognized system param {:?}", key - )); + )) } - }; + } } }; } diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index 06509c1aa1916..d311ebab37f1a 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Borrow; + use risingwave_pb::meta::PbSystemParams; use super::{default, system_params_to_kv, ParamValue}; @@ -40,13 +42,30 @@ for_all_params!(define_system_params_read_trait); /// /// See [`SystemParamsRead`] for more details. #[derive(Clone, Debug, PartialEq)] -pub struct SystemParamsReader { - prost: PbSystemParams, +pub struct SystemParamsReader { + inner: I, } impl From for SystemParamsReader { fn from(prost: PbSystemParams) -> Self { - Self { prost } + Self { inner: prost } + } +} + +impl SystemParamsReader +where + I: Borrow, +{ + pub fn new(inner: I) -> Self { + Self { inner } + } + + fn inner(&self) -> &PbSystemParams { + self.inner.borrow() + } + + pub fn to_kv(&self) -> Vec<(String, String)> { + system_params_to_kv(&self.inner()).unwrap() } } @@ -54,59 +73,62 @@ impl From for SystemParamsReader { /// For example, if a parameter is introduced before the initial public release. /// /// - Otherwise, specify the fallback logic when the field is missing. -impl SystemParamsRead for SystemParamsReader { +impl SystemParamsRead for SystemParamsReader +where + I: Borrow, +{ fn barrier_interval_ms(&self) -> u32 { - self.prost.barrier_interval_ms.unwrap() + self.inner().barrier_interval_ms.unwrap() } fn checkpoint_frequency(&self) -> u64 { - self.prost.checkpoint_frequency.unwrap() + self.inner().checkpoint_frequency.unwrap() } fn parallel_compact_size_mb(&self) -> u32 { - self.prost.parallel_compact_size_mb.unwrap() + self.inner().parallel_compact_size_mb.unwrap() } fn sstable_size_mb(&self) -> u32 { - self.prost.sstable_size_mb.unwrap() + self.inner().sstable_size_mb.unwrap() } fn block_size_kb(&self) -> u32 { - self.prost.block_size_kb.unwrap() + self.inner().block_size_kb.unwrap() } fn bloom_false_positive(&self) -> f64 { - self.prost.bloom_false_positive.unwrap() + self.inner().bloom_false_positive.unwrap() } fn state_store(&self) -> &str { - self.prost.state_store.as_ref().unwrap() + self.inner().state_store.as_ref().unwrap() } fn data_directory(&self) -> &str { - self.prost.data_directory.as_ref().unwrap() + self.inner().data_directory.as_ref().unwrap() } fn backup_storage_url(&self) -> &str { - self.prost.backup_storage_url.as_ref().unwrap() + self.inner().backup_storage_url.as_ref().unwrap() } fn backup_storage_directory(&self) -> &str { - self.prost.backup_storage_directory.as_ref().unwrap() + self.inner().backup_storage_directory.as_ref().unwrap() } fn max_concurrent_creating_streaming_jobs(&self) -> u32 { - self.prost.max_concurrent_creating_streaming_jobs.unwrap() + self.inner().max_concurrent_creating_streaming_jobs.unwrap() } fn pause_on_next_bootstrap(&self) -> bool { - self.prost + self.inner() .pause_on_next_bootstrap .unwrap_or_else(default::pause_on_next_bootstrap) } fn enable_tracing(&self) -> bool { - self.prost + self.inner() .enable_tracing .unwrap_or_else(default::enable_tracing) } @@ -118,9 +140,3 @@ impl SystemParamsRead for SystemParamsReader { .unwrap_or(&default::WASM_STORAGE_URL) } } - -impl SystemParamsReader { - pub fn to_kv(&self) -> Vec<(String, String)> { - system_params_to_kv(&self.prost).unwrap() - } -} diff --git a/src/meta/src/controller/system_param.rs b/src/meta/src/controller/system_param.rs index e483645ead937..bc9a944e670ef 100644 --- a/src/meta/src/controller/system_param.rs +++ b/src/meta/src/controller/system_param.rs @@ -195,13 +195,17 @@ impl SystemParamsController { }; let mut params = params_guard.clone(); let mut param: system_parameter::ActiveModel = param.into(); - param.value = - Set(set_system_param(&mut params, name, value).map_err(MetaError::system_params)?); + let Some(value) = set_system_param(&mut params, name, value.as_ref()) + .map_err(MetaError::system_params)? + else { + // No changes on the parameter. + return Ok(params); + }; + + param.value = Set(value); param.update(&self.db).await?; *params_guard = params.clone(); - // TODO: check if the parameter is actually changed. - // Run common handler. self.common_handler.handle_change(params.clone().into()); diff --git a/src/meta/src/manager/system_param/mod.rs b/src/meta/src/manager/system_param/mod.rs index 7ff3a43551585..52d5157042bb8 100644 --- a/src/meta/src/manager/system_param/mod.rs +++ b/src/meta/src/manager/system_param/mod.rs @@ -90,7 +90,12 @@ impl SystemParamsManager { let params = params_guard.deref_mut(); let mut mem_txn = VarTransaction::new(params); - set_system_param(mem_txn.deref_mut(), name, value).map_err(MetaError::system_params)?; + let Some(_) = + set_system_param(mem_txn.deref_mut(), name, value).map_err(MetaError::system_params)? + else { + // No changes on the parameter. + return Ok(params.clone()); + }; let mut store_txn = Transaction::default(); mem_txn.apply_to_txn(&mut store_txn).await?; @@ -98,8 +103,6 @@ impl SystemParamsManager { mem_txn.commit(); - // TODO: check if the parameter is actually changed. - // Run common handler. self.common_handler.handle_change(params.clone().into()); From ce38c30c38d32eab2063a135e8fa384caa8fa296 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 16 Jan 2024 17:56:37 +0800 Subject: [PATCH 2/9] handle diff change Signed-off-by: Bugen Zhao --- src/common/src/system_param/common.rs | 21 +++++---- src/common/src/system_param/diff.rs | 46 ++++++++++++++++++++ src/common/src/system_param/local_manager.rs | 7 ++- src/common/src/system_param/mod.rs | 15 ++++++- src/common/src/system_param/reader.rs | 23 ++++++++-- src/meta/src/controller/system_param.rs | 10 +++-- src/meta/src/manager/system_param/mod.rs | 6 ++- 7 files changed, 105 insertions(+), 23 deletions(-) create mode 100644 src/common/src/system_param/diff.rs diff --git a/src/common/src/system_param/common.rs b/src/common/src/system_param/common.rs index eb660d2ac3bb5..9413404d179c2 100644 --- a/src/common/src/system_param/common.rs +++ b/src/common/src/system_param/common.rs @@ -14,35 +14,34 @@ use std::sync::Mutex; -use super::reader::{SystemParamsRead, SystemParamsReader}; +use super::diff::SystemParamsDiff; +use super::reader::SystemParamsReader; use crate::util::tracing::layer::toggle_otel_layer; /// Node-independent handler for system parameter changes. /// /// Currently, it is only used to enable or disable the distributed tracing layer. pub struct CommonHandler { - last_params: Mutex>, + // TODO: this is bad + sync: Mutex<()>, } impl CommonHandler { /// Create a new handler with the initial parameters. pub fn new(initial: SystemParamsReader) -> Self { let this = Self { - last_params: None.into(), + sync: Default::default(), }; - this.handle_change(initial); + this.handle_change(&SystemParamsDiff::from_initial(initial)); this } /// Handle the change of system parameters. - // TODO: directly call this method with the difference of old and new params. - pub fn handle_change(&self, new_params: SystemParamsReader) { - let mut last_params = self.last_params.lock().unwrap(); + pub fn handle_change(&self, diff: &SystemParamsDiff) { + let mut _sync_guard = self.sync.lock().unwrap(); - if last_params.as_ref().map(|p| p.enable_tracing()) != Some(new_params.enable_tracing()) { - toggle_otel_layer(new_params.enable_tracing()); + if let Some(enabled) = diff.enable_tracing { + toggle_otel_layer(enabled) } - - last_params.replace(new_params); } } diff --git a/src/common/src/system_param/diff.rs b/src/common/src/system_param/diff.rs new file mode 100644 index 0000000000000..dd4a9c8b1b94f --- /dev/null +++ b/src/common/src/system_param/diff.rs @@ -0,0 +1,46 @@ +use std::ops::Deref; + +use risingwave_pb::meta::PbSystemParams; + +use super::reader::{SystemParamsRead, SystemParamsReader}; +use crate::for_all_params; + +pub struct SystemParamsDiff { + diff: PbSystemParams, +} + +impl Deref for SystemParamsDiff { + type Target = PbSystemParams; + + fn deref(&self) -> &Self::Target { + &self.diff + } +} + +impl SystemParamsDiff { + pub fn diff(prev: impl SystemParamsRead, curr: impl SystemParamsRead) -> Self { + let mut diff = PbSystemParams::default(); + + macro_rules! set_diff_field { + ($({ $field:ident, $($rest:tt)* },)*) => { + $( + if curr.$field() != prev.$field() { + diff.$field = Some(curr.$field().to_owned()); + } + )* + }; + } + + for_all_params!(set_diff_field); + + Self { diff } + } + + pub fn new(diff: PbSystemParams) -> Self { + Self { diff } + } + + pub fn from_initial(initial: impl SystemParamsRead) -> Self { + Self::diff(SystemParamsReader::default(), initial) + } +} diff --git a/src/common/src/system_param/local_manager.rs b/src/common/src/system_param/local_manager.rs index 312c5577a0f81..c0ff185257e2c 100644 --- a/src/common/src/system_param/local_manager.rs +++ b/src/common/src/system_param/local_manager.rs @@ -20,6 +20,7 @@ use risingwave_pb::meta::SystemParams; use tokio::sync::watch::{channel, Receiver, Sender}; use super::common::CommonHandler; +use super::diff::SystemParamsDiff; use super::reader::SystemParamsReader; use super::system_params_for_test; @@ -49,11 +50,15 @@ impl LocalSystemParamsManager { tokio::spawn({ let mut rx = tx.subscribe(); async move { + let mut old_params = initial_params.clone(); let handler = CommonHandler::new(initial_params); + // TODO: receive the diff instead of the snapshot. while rx.changed().await.is_ok() { let new_params = (**rx.borrow_and_update().load()).clone(); - handler.handle_change(new_params); + let diff = SystemParamsDiff::diff(old_params.as_ref(), new_params.as_ref()); + handler.handle_change(&diff); + old_params = new_params; } } }); diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 85495f9aa5185..a1e3a9339458a 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -21,6 +21,7 @@ //! - Add a new method to [`reader::SystemParamsReader`]. pub mod common; +pub mod diff; pub mod local_manager; pub mod reader; @@ -31,6 +32,8 @@ use std::str::FromStr; use paste::paste; use risingwave_pb::meta::PbSystemParams; +use self::diff::SystemParamsDiff; + pub type SystemParamsError = String; type Result = core::result::Result; @@ -302,7 +305,11 @@ macro_rules! impl_set_system_param { ($({ $field:ident, $type:ty, $default:expr, $($rest:tt)* },)*) => { /// Set a system parameter with the given value or default one. /// Returns the new value if changed. - pub fn set_system_param(params: &mut PbSystemParams, key: &str, value: Option>) -> Result> { + pub fn set_system_param( + params: &mut PbSystemParams, + key: &str, + value: Option>, + ) -> Result> { use crate::system_param::reader::{SystemParamsReader, SystemParamsRead}; match key { @@ -318,8 +325,12 @@ macro_rules! impl_set_system_param { let changed = SystemParamsReader::new(&*params).$field() != v; if changed { let new_value = v.to_string(); + let diff = SystemParamsDiff::new(PbSystemParams { + $field: Some(v.clone()), + ..Default::default() + }); params.$field = Some(v); - Ok(Some(new_value)) + Ok(Some((new_value, diff))) } else { Ok(None) } diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index d311ebab37f1a..fa590dc2ae70f 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -46,9 +46,20 @@ pub struct SystemParamsReader { inner: I, } -impl From for SystemParamsReader { - fn from(prost: PbSystemParams) -> Self { - Self { inner: prost } +impl Default for SystemParamsReader { + fn default() -> Self { + Self { + inner: PbSystemParams::default(), + } + } +} + +impl From for SystemParamsReader +where + I: Borrow, +{ + fn from(inner: I) -> Self { + Self { inner } } } @@ -60,6 +71,12 @@ where Self { inner } } + pub fn as_ref(&self) -> SystemParamsReader<&PbSystemParams> { + SystemParamsReader { + inner: self.inner(), + } + } + fn inner(&self) -> &PbSystemParams { self.inner.borrow() } diff --git a/src/meta/src/controller/system_param.rs b/src/meta/src/controller/system_param.rs index bc9a944e670ef..bb8f51479856b 100644 --- a/src/meta/src/controller/system_param.rs +++ b/src/meta/src/controller/system_param.rs @@ -195,19 +195,21 @@ impl SystemParamsController { }; let mut params = params_guard.clone(); let mut param: system_parameter::ActiveModel = param.into(); - let Some(value) = set_system_param(&mut params, name, value.as_ref()) - .map_err(MetaError::system_params)? + let Some((new_value, diff)) = + set_system_param(&mut params, name, value).map_err(MetaError::system_params)? else { // No changes on the parameter. return Ok(params); }; - param.value = Set(value); + param.value = Set(new_value); param.update(&self.db).await?; *params_guard = params.clone(); // Run common handler. - self.common_handler.handle_change(params.clone().into()); + self.common_handler.handle_change(&diff); + + // TODO: notify the diff instead of the snapshot. // Sync params to other managers on the meta node only once, since it's infallible. self.notification_manager diff --git a/src/meta/src/manager/system_param/mod.rs b/src/meta/src/manager/system_param/mod.rs index 52d5157042bb8..a7336a10e5de6 100644 --- a/src/meta/src/manager/system_param/mod.rs +++ b/src/meta/src/manager/system_param/mod.rs @@ -90,7 +90,7 @@ impl SystemParamsManager { let params = params_guard.deref_mut(); let mut mem_txn = VarTransaction::new(params); - let Some(_) = + let Some((_new_value, diff)) = set_system_param(mem_txn.deref_mut(), name, value).map_err(MetaError::system_params)? else { // No changes on the parameter. @@ -104,7 +104,9 @@ impl SystemParamsManager { mem_txn.commit(); // Run common handler. - self.common_handler.handle_change(params.clone().into()); + self.common_handler.handle_change(&diff); + + // TODO: notify the diff instead of the snapshot. // Sync params to other managers on the meta node only once, since it's infallible. self.notification_manager From cafc6471a353c084f659ae4c85abf1d87cb24e1e Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 17 Jan 2024 15:22:42 +0800 Subject: [PATCH 3/9] do not wrap pb for diff Signed-off-by: Bugen Zhao --- src/common/src/system_param/diff.rs | 51 +++++++++++++++------------ src/common/src/system_param/mod.rs | 6 ++-- src/common/src/system_param/reader.rs | 17 +++------ 3 files changed, 37 insertions(+), 37 deletions(-) diff --git a/src/common/src/system_param/diff.rs b/src/common/src/system_param/diff.rs index dd4a9c8b1b94f..f86feb667c702 100644 --- a/src/common/src/system_param/diff.rs +++ b/src/common/src/system_param/diff.rs @@ -1,25 +1,26 @@ -use std::ops::Deref; - -use risingwave_pb::meta::PbSystemParams; - -use super::reader::{SystemParamsRead, SystemParamsReader}; +use super::reader::SystemParamsRead; use crate::for_all_params; -pub struct SystemParamsDiff { - diff: PbSystemParams, -} - -impl Deref for SystemParamsDiff { - type Target = PbSystemParams; - - fn deref(&self) -> &Self::Target { - &self.diff +macro_rules! define_diff { + ($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $doc:literal, $($rest:tt)* },)*) => { + /// The diff of the system params. + /// + /// Fields that are changed are set to `Some`, otherwise `None`. + #[derive(Default, Debug, Clone)] + pub struct SystemParamsDiff { + $( + #[doc = $doc] + pub $field: Option<$type>, + )* + } } } +for_all_params!(define_diff); impl SystemParamsDiff { + /// Create a diff between the given two system params. pub fn diff(prev: impl SystemParamsRead, curr: impl SystemParamsRead) -> Self { - let mut diff = PbSystemParams::default(); + let mut diff = Self::default(); macro_rules! set_diff_field { ($({ $field:ident, $($rest:tt)* },)*) => { @@ -30,17 +31,23 @@ impl SystemParamsDiff { )* }; } - for_all_params!(set_diff_field); - Self { diff } - } - - pub fn new(diff: PbSystemParams) -> Self { - Self { diff } + diff } + /// Create a diff from the given initial system params. + /// All fields will be set to `Some`. pub fn from_initial(initial: impl SystemParamsRead) -> Self { - Self::diff(SystemParamsReader::default(), initial) + macro_rules! initial_field { + ($({ $field:ident, $($rest:tt)* },)*) => { + Self { + $( + $field: Some(initial.$field().to_owned()), + )* + } + }; + } + for_all_params!(initial_field) } } diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index a1e3a9339458a..17c0a308d3e52 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -325,10 +325,10 @@ macro_rules! impl_set_system_param { let changed = SystemParamsReader::new(&*params).$field() != v; if changed { let new_value = v.to_string(); - let diff = SystemParamsDiff::new(PbSystemParams { - $field: Some(v.clone()), + let diff = SystemParamsDiff { + $field: Some(v.to_owned()), ..Default::default() - }); + }; params.$field = Some(v); Ok(Some((new_value, diff))) } else { diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index fa590dc2ae70f..ec280f8bb05b1 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -46,14 +46,6 @@ pub struct SystemParamsReader { inner: I, } -impl Default for SystemParamsReader { - fn default() -> Self { - Self { - inner: PbSystemParams::default(), - } - } -} - impl From for SystemParamsReader where I: Borrow, @@ -71,19 +63,20 @@ where Self { inner } } + /// Return a new reader with the reference to the inner system params. pub fn as_ref(&self) -> SystemParamsReader<&PbSystemParams> { SystemParamsReader { inner: self.inner(), } } - fn inner(&self) -> &PbSystemParams { - self.inner.borrow() - } - pub fn to_kv(&self) -> Vec<(String, String)> { system_params_to_kv(&self.inner()).unwrap() } + + fn inner(&self) -> &PbSystemParams { + self.inner.borrow() + } } /// - Unwrap the field if it always exists. From d6f3248eebcc4798502ba36682a96661bb52e2b1 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 17 Jan 2024 15:38:26 +0800 Subject: [PATCH 4/9] refine docs Signed-off-by: Bugen Zhao --- src/common/src/system_param/common.rs | 14 +++----------- src/common/src/system_param/diff.rs | 14 ++++++++++++++ src/common/src/system_param/local_manager.rs | 8 ++++---- src/common/src/system_param/mod.rs | 4 +++- 4 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/common/src/system_param/common.rs b/src/common/src/system_param/common.rs index 9413404d179c2..d8ff741399533 100644 --- a/src/common/src/system_param/common.rs +++ b/src/common/src/system_param/common.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Mutex; - use super::diff::SystemParamsDiff; use super::reader::SystemParamsReader; use crate::util::tracing::layer::toggle_otel_layer; @@ -21,25 +19,19 @@ use crate::util::tracing::layer::toggle_otel_layer; /// Node-independent handler for system parameter changes. /// /// Currently, it is only used to enable or disable the distributed tracing layer. -pub struct CommonHandler { - // TODO: this is bad - sync: Mutex<()>, -} +#[derive(Debug)] +pub struct CommonHandler; impl CommonHandler { /// Create a new handler with the initial parameters. pub fn new(initial: SystemParamsReader) -> Self { - let this = Self { - sync: Default::default(), - }; + let this = Self; this.handle_change(&SystemParamsDiff::from_initial(initial)); this } /// Handle the change of system parameters. pub fn handle_change(&self, diff: &SystemParamsDiff) { - let mut _sync_guard = self.sync.lock().unwrap(); - if let Some(enabled) = diff.enable_tracing { toggle_otel_layer(enabled) } diff --git a/src/common/src/system_param/diff.rs b/src/common/src/system_param/diff.rs index f86feb667c702..243b3e247bec8 100644 --- a/src/common/src/system_param/diff.rs +++ b/src/common/src/system_param/diff.rs @@ -1,3 +1,17 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use super::reader::SystemParamsRead; use crate::for_all_params; diff --git a/src/common/src/system_param/local_manager.rs b/src/common/src/system_param/local_manager.rs index c0ff185257e2c..7ce6938a84f8e 100644 --- a/src/common/src/system_param/local_manager.rs +++ b/src/common/src/system_param/local_manager.rs @@ -50,15 +50,15 @@ impl LocalSystemParamsManager { tokio::spawn({ let mut rx = tx.subscribe(); async move { - let mut old_params = initial_params.clone(); + let mut params = initial_params.clone(); let handler = CommonHandler::new(initial_params); - // TODO: receive the diff instead of the snapshot. while rx.changed().await.is_ok() { + // TODO: directly watch the changes instead of diffing ourselves. let new_params = (**rx.borrow_and_update().load()).clone(); - let diff = SystemParamsDiff::diff(old_params.as_ref(), new_params.as_ref()); + let diff = SystemParamsDiff::diff(params.as_ref(), new_params.as_ref()); handler.handle_change(&diff); - old_params = new_params; + params = new_params; } } }); diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 17c0a308d3e52..ff1346b9853b4 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -304,7 +304,9 @@ macro_rules! impl_default_from_other_params { macro_rules! impl_set_system_param { ($({ $field:ident, $type:ty, $default:expr, $($rest:tt)* },)*) => { /// Set a system parameter with the given value or default one. - /// Returns the new value if changed. + /// + /// Returns the new value if changed, or an error if the parameter is unrecognized + /// or the value is invalid. pub fn set_system_param( params: &mut PbSystemParams, key: &str, From 01057b6176d65aa08cf54b6a83d061238d3348a8 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 17 Jan 2024 16:24:18 +0800 Subject: [PATCH 5/9] fix build and clippy Signed-off-by: Bugen Zhao --- src/common/src/system_param/reader.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/common/src/system_param/reader.rs b/src/common/src/system_param/reader.rs index ec280f8bb05b1..c6b8d8c5af6aa 100644 --- a/src/common/src/system_param/reader.rs +++ b/src/common/src/system_param/reader.rs @@ -71,7 +71,7 @@ where } pub fn to_kv(&self) -> Vec<(String, String)> { - system_params_to_kv(&self.inner()).unwrap() + system_params_to_kv(self.inner()).unwrap() } fn inner(&self) -> &PbSystemParams { @@ -144,7 +144,7 @@ where } fn wasm_storage_url(&self) -> &str { - self.prost + self.inner() .wasm_storage_url .as_ref() .unwrap_or(&default::WASM_STORAGE_URL) From 3a54d0b1191f310963dcdf09a5de1d98242e0496 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 17 Jan 2024 16:24:24 +0800 Subject: [PATCH 6/9] use info level for aws Signed-off-by: Bugen Zhao --- src/utils/runtime/src/logger.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/utils/runtime/src/logger.rs b/src/utils/runtime/src/logger.rs index cb27840d7530f..6eab94af6c48a 100644 --- a/src/utils/runtime/src/logger.rs +++ b/src/utils/runtime/src/logger.rs @@ -202,8 +202,7 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { // Configure levels for external crates. filter = filter .with_target("foyer", Level::WARN) - .with_target("aws_sdk_ec2", Level::INFO) - .with_target("aws_sdk_s3", Level::INFO) + .with_target("aws", Level::INFO) .with_target("aws_config", Level::WARN) .with_target("aws_endpoint", Level::WARN) .with_target("aws_credential_types::cache::lazy_caching", Level::WARN) From a1a35c7b04355bdf7d9e8be00a5a61492c9921f4 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 22 Jan 2024 13:30:00 +0800 Subject: [PATCH 7/9] fix unit test Signed-off-by: Bugen Zhao --- src/common/src/system_param/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index ff1346b9853b4..df91214898171 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -417,6 +417,7 @@ impl FromParams for OverrideFromParams {} #[cfg(test)] mod tests { use super::*; + use crate::config::SystemConfig; #[test] fn test_to_from_kv() { @@ -456,7 +457,7 @@ mod tests { #[test] fn test_set() { - let mut p = PbSystemParams::default(); + let mut p = SystemConfig::default().into_init_system_params(); // Unrecognized param. assert!(set_system_param(&mut p, "?", Some("?".to_string())).is_err()); // Value out of range. From 62d7901725c1bbf2331a3018c8a25d908cf0cfba Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 22 Jan 2024 14:25:18 +0800 Subject: [PATCH 8/9] do not spawn common handler task Signed-off-by: Bugen Zhao --- src/common/src/system_param/local_manager.rs | 22 +++++++++++++------- src/common/src/system_param/mod.rs | 2 +- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/common/src/system_param/local_manager.rs b/src/common/src/system_param/local_manager.rs index 7ce6938a84f8e..5040d30f811d0 100644 --- a/src/common/src/system_param/local_manager.rs +++ b/src/common/src/system_param/local_manager.rs @@ -42,13 +42,14 @@ pub struct LocalSystemParamsManager { } impl LocalSystemParamsManager { + /// Create a new instance of `LocalSystemParamsManager` and spawn a task to run + /// the common handler. pub fn new(initial_params: SystemParamsReader) -> Self { - let params = Arc::new(ArcSwap::from_pointee(initial_params.clone())); - let (tx, _) = channel(params.clone()); + let this = Self::new_inner(initial_params.clone()); // Spawn a task to run the common handler. tokio::spawn({ - let mut rx = tx.subscribe(); + let mut rx = this.tx.subscribe(); async move { let mut params = initial_params.clone(); let handler = CommonHandler::new(initial_params); @@ -63,11 +64,17 @@ impl LocalSystemParamsManager { } }); - Self { params, tx } + this } pub fn for_test() -> Self { - Self::new(system_params_for_test().into()) + Self::new_inner(system_params_for_test().into()) + } + + fn new_inner(initial_params: SystemParamsReader) -> Self { + let params = Arc::new(ArcSwap::from_pointee(initial_params)); + let (tx, _) = channel(params.clone()); + Self { params, tx } } pub fn get_params(&self) -> SystemParamsReaderRef { @@ -94,12 +101,11 @@ mod tests { #[tokio::test] async fn test_manager() { - let p = SystemParams::default().into(); - let manager = LocalSystemParamsManager::new(p); + let manager = LocalSystemParamsManager::for_test(); let shared_params = manager.get_params(); let new_params = SystemParams { - sstable_size_mb: Some(1), + sstable_size_mb: Some(114514), ..Default::default() }; diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index df91214898171..84793f3d38f51 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -457,7 +457,7 @@ mod tests { #[test] fn test_set() { - let mut p = SystemConfig::default().into_init_system_params(); + let mut p = system_params_for_test(); // Unrecognized param. assert!(set_system_param(&mut p, "?", Some("?".to_string())).is_err()); // Value out of range. From bac11a8ee51515a95469ff84a3faf49ca2e51acb Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 22 Jan 2024 16:32:29 +0800 Subject: [PATCH 9/9] fix check Signed-off-by: Bugen Zhao --- src/common/src/system_param/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 84793f3d38f51..a55b236f4b310 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -417,7 +417,6 @@ impl FromParams for OverrideFromParams {} #[cfg(test)] mod tests { use super::*; - use crate::config::SystemConfig; #[test] fn test_to_from_kv() {