Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): only persist and handle changes when system params actually get updated #14602

Merged
merged 10 commits into from
Jan 22, 2024
27 changes: 9 additions & 18 deletions src/common/src/system_param/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,28 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Mutex;

use super::reader::{SystemParamsRead, SystemParamsReader};
use super::diff::SystemParamsDiff;
use super::reader::SystemParamsReader;
use crate::util::tracing::layer::toggle_otel_layer;

/// Node-independent handler for system parameter changes.
///
/// Currently, it is only used to enable or disable the distributed tracing layer.
pub struct CommonHandler {
last_params: Mutex<Option<SystemParamsReader>>,
}
#[derive(Debug)]
pub struct CommonHandler;

impl CommonHandler {
/// Create a new handler with the initial parameters.
pub fn new(initial: SystemParamsReader) -> Self {
let this = Self {
last_params: None.into(),
};
this.handle_change(initial);
let this = Self;
this.handle_change(&SystemParamsDiff::from_initial(initial));
this
}

/// Handle the change of system parameters.
// TODO: directly call this method with the difference of old and new params.
pub fn handle_change(&self, new_params: SystemParamsReader) {
let mut last_params = self.last_params.lock().unwrap();

if last_params.as_ref().map(|p| p.enable_tracing()) != Some(new_params.enable_tracing()) {
toggle_otel_layer(new_params.enable_tracing());
pub fn handle_change(&self, diff: &SystemParamsDiff) {
if let Some(enabled) = diff.enable_tracing {
toggle_otel_layer(enabled)
}

last_params.replace(new_params);
}
}
67 changes: 67 additions & 0 deletions src/common/src/system_param/diff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::reader::SystemParamsRead;
BugenZhao marked this conversation as resolved.
Show resolved Hide resolved
use crate::for_all_params;

macro_rules! define_diff {
($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $doc:literal, $($rest:tt)* },)*) => {
/// The diff of the system params.
///
/// Fields that are changed are set to `Some`, otherwise `None`.
#[derive(Default, Debug, Clone)]
pub struct SystemParamsDiff {
$(
#[doc = $doc]
pub $field: Option<$type>,
)*
}
}
}
for_all_params!(define_diff);

impl SystemParamsDiff {
/// Create a diff between the given two system params.
pub fn diff(prev: impl SystemParamsRead, curr: impl SystemParamsRead) -> Self {
let mut diff = Self::default();

macro_rules! set_diff_field {
($({ $field:ident, $($rest:tt)* },)*) => {
$(
if curr.$field() != prev.$field() {
diff.$field = Some(curr.$field().to_owned());
}
)*
};
}
for_all_params!(set_diff_field);

diff
}

/// Create a diff from the given initial system params.
/// All fields will be set to `Some`.
pub fn from_initial(initial: impl SystemParamsRead) -> Self {
macro_rules! initial_field {
($({ $field:ident, $($rest:tt)* },)*) => {
Self {
$(
$field: Some(initial.$field().to_owned()),
)*
}
};
}
for_all_params!(initial_field)
}
}
29 changes: 20 additions & 9 deletions src/common/src/system_param/local_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_pb::meta::SystemParams;
use tokio::sync::watch::{channel, Receiver, Sender};

use super::common::CommonHandler;
use super::diff::SystemParamsDiff;
use super::reader::SystemParamsReader;
use super::system_params_for_test;

Expand All @@ -41,28 +42,39 @@ pub struct LocalSystemParamsManager {
}

impl LocalSystemParamsManager {
/// Create a new instance of `LocalSystemParamsManager` and spawn a task to run
/// the common handler.
pub fn new(initial_params: SystemParamsReader) -> Self {
let params = Arc::new(ArcSwap::from_pointee(initial_params.clone()));
let (tx, _) = channel(params.clone());
let this = Self::new_inner(initial_params.clone());

// Spawn a task to run the common handler.
tokio::spawn({
let mut rx = tx.subscribe();
let mut rx = this.tx.subscribe();
async move {
let mut params = initial_params.clone();
let handler = CommonHandler::new(initial_params);

while rx.changed().await.is_ok() {
// TODO: directly watch the changes instead of diffing ourselves.
let new_params = (**rx.borrow_and_update().load()).clone();
handler.handle_change(new_params);
let diff = SystemParamsDiff::diff(params.as_ref(), new_params.as_ref());
handler.handle_change(&diff);
params = new_params;
}
}
});

Self { params, tx }
this
}

pub fn for_test() -> Self {
Self::new(system_params_for_test().into())
Self::new_inner(system_params_for_test().into())
}

fn new_inner(initial_params: SystemParamsReader) -> Self {
let params = Arc::new(ArcSwap::from_pointee(initial_params));
let (tx, _) = channel(params.clone());
Self { params, tx }
}

pub fn get_params(&self) -> SystemParamsReaderRef {
Expand All @@ -89,12 +101,11 @@ mod tests {

#[tokio::test]
async fn test_manager() {
let p = SystemParams::default().into();
let manager = LocalSystemParamsManager::new(p);
let manager = LocalSystemParamsManager::for_test();
let shared_params = manager.get_params();

let new_params = SystemParams {
sstable_size_mb: Some(1),
sstable_size_mb: Some(114514),
..Default::default()
};

Expand Down
43 changes: 33 additions & 10 deletions src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! - Add a new method to [`reader::SystemParamsReader`].

pub mod common;
pub mod diff;
pub mod local_manager;
pub mod reader;

Expand All @@ -31,6 +32,8 @@ use std::str::FromStr;
use paste::paste;
use risingwave_pb::meta::PbSystemParams;

use self::diff::SystemParamsDiff;

pub type SystemParamsError = String;

type Result<T> = core::result::Result<T, SystemParamsError>;
Expand Down Expand Up @@ -300,28 +303,48 @@ macro_rules! impl_default_from_other_params {

macro_rules! impl_set_system_param {
($({ $field:ident, $type:ty, $default:expr, $($rest:tt)* },)*) => {
/// Set a system parameter with the given value or default one, returns the new value.
pub fn set_system_param(params: &mut PbSystemParams, key: &str, value: Option<String>) -> Result<String> {
match key {
/// Set a system parameter with the given value or default one.
///
/// Returns the new value if changed, or an error if the parameter is unrecognized
/// or the value is invalid.
pub fn set_system_param(
params: &mut PbSystemParams,
key: &str,
value: Option<impl AsRef<str>>,
) -> Result<Option<(String, SystemParamsDiff)>> {
use crate::system_param::reader::{SystemParamsReader, SystemParamsRead};

match key {
$(
key_of!($field) => {
let v = if let Some(v) = value {
v.parse().map_err(|_| format!("cannot parse parameter value"))?
v.as_ref().parse().map_err(|_| format!("cannot parse parameter value"))?
} else {
$default.ok_or_else(|| format!("{} does not have a default value", key))?
};
OverrideValidateOnSet::$field(&v)?;
params.$field = Some(v.clone());
return Ok(v.to_string())

let changed = SystemParamsReader::new(&*params).$field() != v;
if changed {
let new_value = v.to_string();
let diff = SystemParamsDiff {
$field: Some(v.to_owned()),
..Default::default()
};
params.$field = Some(v);
Ok(Some((new_value, diff)))
} else {
Ok(None)
}
},
)*
_ => {
return Err(format!(
Err(format!(
"unrecognized system param {:?}",
key
));
))
}
};
}
}
};
}
Expand Down Expand Up @@ -433,7 +456,7 @@ mod tests {

#[test]
fn test_set() {
let mut p = PbSystemParams::default();
let mut p = system_params_for_test();
// Unrecognized param.
assert!(set_system_param(&mut p, "?", Some("?".to_string())).is_err());
// Value out of range.
Expand Down
Loading
Loading