Skip to content

Commit

Permalink
refactor(meta): only persist and handle changes when system params ac…
Browse files Browse the repository at this point in the history
…tually get updated (#14602)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored and TennyZhuang committed Feb 2, 2024
1 parent 186703d commit 5d21534
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 74 deletions.
27 changes: 9 additions & 18 deletions src/common/src/system_param/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,28 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Mutex;

use super::reader::{SystemParamsRead, SystemParamsReader};
use super::diff::SystemParamsDiff;
use super::reader::SystemParamsReader;
use crate::util::tracing::layer::toggle_otel_layer;

/// Node-independent handler for system parameter changes.
///
/// Currently, it is only used to enable or disable the distributed tracing layer.
pub struct CommonHandler {
last_params: Mutex<Option<SystemParamsReader>>,
}
#[derive(Debug)]
pub struct CommonHandler;

impl CommonHandler {
/// Create a new handler with the initial parameters.
pub fn new(initial: SystemParamsReader) -> Self {
let this = Self {
last_params: None.into(),
};
this.handle_change(initial);
let this = Self;
this.handle_change(&SystemParamsDiff::from_initial(initial));
this
}

/// Handle the change of system parameters.
// TODO: directly call this method with the difference of old and new params.
pub fn handle_change(&self, new_params: SystemParamsReader) {
let mut last_params = self.last_params.lock().unwrap();

if last_params.as_ref().map(|p| p.enable_tracing()) != Some(new_params.enable_tracing()) {
toggle_otel_layer(new_params.enable_tracing());
pub fn handle_change(&self, diff: &SystemParamsDiff) {
if let Some(enabled) = diff.enable_tracing {
toggle_otel_layer(enabled)
}

last_params.replace(new_params);
}
}
67 changes: 67 additions & 0 deletions src/common/src/system_param/diff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::reader::SystemParamsRead;
use crate::for_all_params;

macro_rules! define_diff {
($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $doc:literal, $($rest:tt)* },)*) => {
/// The diff of the system params.
///
/// Fields that are changed are set to `Some`, otherwise `None`.
#[derive(Default, Debug, Clone)]
pub struct SystemParamsDiff {
$(
#[doc = $doc]
pub $field: Option<$type>,
)*
}
}
}
for_all_params!(define_diff);

impl SystemParamsDiff {
/// Create a diff between the given two system params.
pub fn diff(prev: impl SystemParamsRead, curr: impl SystemParamsRead) -> Self {
let mut diff = Self::default();

macro_rules! set_diff_field {
($({ $field:ident, $($rest:tt)* },)*) => {
$(
if curr.$field() != prev.$field() {
diff.$field = Some(curr.$field().to_owned());
}
)*
};
}
for_all_params!(set_diff_field);

diff
}

/// Create a diff from the given initial system params.
/// All fields will be set to `Some`.
pub fn from_initial(initial: impl SystemParamsRead) -> Self {
macro_rules! initial_field {
($({ $field:ident, $($rest:tt)* },)*) => {
Self {
$(
$field: Some(initial.$field().to_owned()),
)*
}
};
}
for_all_params!(initial_field)
}
}
29 changes: 20 additions & 9 deletions src/common/src/system_param/local_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_pb::meta::SystemParams;
use tokio::sync::watch::{channel, Receiver, Sender};

use super::common::CommonHandler;
use super::diff::SystemParamsDiff;
use super::reader::SystemParamsReader;
use super::system_params_for_test;

Expand All @@ -41,28 +42,39 @@ pub struct LocalSystemParamsManager {
}

impl LocalSystemParamsManager {
/// Create a new instance of `LocalSystemParamsManager` and spawn a task to run
/// the common handler.
pub fn new(initial_params: SystemParamsReader) -> Self {
let params = Arc::new(ArcSwap::from_pointee(initial_params.clone()));
let (tx, _) = channel(params.clone());
let this = Self::new_inner(initial_params.clone());

// Spawn a task to run the common handler.
tokio::spawn({
let mut rx = tx.subscribe();
let mut rx = this.tx.subscribe();
async move {
let mut params = initial_params.clone();
let handler = CommonHandler::new(initial_params);

while rx.changed().await.is_ok() {
// TODO: directly watch the changes instead of diffing ourselves.
let new_params = (**rx.borrow_and_update().load()).clone();
handler.handle_change(new_params);
let diff = SystemParamsDiff::diff(params.as_ref(), new_params.as_ref());
handler.handle_change(&diff);
params = new_params;
}
}
});

Self { params, tx }
this
}

pub fn for_test() -> Self {
Self::new(system_params_for_test().into())
Self::new_inner(system_params_for_test().into())
}

fn new_inner(initial_params: SystemParamsReader) -> Self {
let params = Arc::new(ArcSwap::from_pointee(initial_params));
let (tx, _) = channel(params.clone());
Self { params, tx }
}

pub fn get_params(&self) -> SystemParamsReaderRef {
Expand All @@ -89,12 +101,11 @@ mod tests {

#[tokio::test]
async fn test_manager() {
let p = SystemParams::default().into();
let manager = LocalSystemParamsManager::new(p);
let manager = LocalSystemParamsManager::for_test();
let shared_params = manager.get_params();

let new_params = SystemParams {
sstable_size_mb: Some(1),
sstable_size_mb: Some(114514),
..Default::default()
};

Expand Down
43 changes: 33 additions & 10 deletions src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! - Add a new method to [`reader::SystemParamsReader`].
pub mod common;
pub mod diff;
pub mod local_manager;
pub mod reader;

Expand All @@ -31,6 +32,8 @@ use std::str::FromStr;
use paste::paste;
use risingwave_pb::meta::PbSystemParams;

use self::diff::SystemParamsDiff;

pub type SystemParamsError = String;

type Result<T> = core::result::Result<T, SystemParamsError>;
Expand Down Expand Up @@ -300,28 +303,48 @@ macro_rules! impl_default_from_other_params {

macro_rules! impl_set_system_param {
($({ $field:ident, $type:ty, $default:expr, $($rest:tt)* },)*) => {
/// Set a system parameter with the given value or default one, returns the new value.
pub fn set_system_param(params: &mut PbSystemParams, key: &str, value: Option<String>) -> Result<String> {
match key {
/// Set a system parameter with the given value or default one.
///
/// Returns the new value if changed, or an error if the parameter is unrecognized
/// or the value is invalid.
pub fn set_system_param(
params: &mut PbSystemParams,
key: &str,
value: Option<impl AsRef<str>>,
) -> Result<Option<(String, SystemParamsDiff)>> {
use crate::system_param::reader::{SystemParamsReader, SystemParamsRead};

match key {
$(
key_of!($field) => {
let v = if let Some(v) = value {
v.parse().map_err(|_| format!("cannot parse parameter value"))?
v.as_ref().parse().map_err(|_| format!("cannot parse parameter value"))?
} else {
$default.ok_or_else(|| format!("{} does not have a default value", key))?
};
OverrideValidateOnSet::$field(&v)?;
params.$field = Some(v.clone());
return Ok(v.to_string())

let changed = SystemParamsReader::new(&*params).$field() != v;
if changed {
let new_value = v.to_string();
let diff = SystemParamsDiff {
$field: Some(v.to_owned()),
..Default::default()
};
params.$field = Some(v);
Ok(Some((new_value, diff)))
} else {
Ok(None)
}
},
)*
_ => {
return Err(format!(
Err(format!(
"unrecognized system param {:?}",
key
));
))
}
};
}
}
};
}
Expand Down Expand Up @@ -433,7 +456,7 @@ mod tests {

#[test]
fn test_set() {
let mut p = PbSystemParams::default();
let mut p = system_params_for_test();
// Unrecognized param.
assert!(set_system_param(&mut p, "?", Some("?".to_string())).is_err());
// Value out of range.
Expand Down
Loading

0 comments on commit 5d21534

Please sign in to comment.