Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): only persist and handle changes when system params actually get updated #14602

Merged
merged 10 commits into from
Jan 22, 2024
21 changes: 10 additions & 11 deletions src/common/src/system_param/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,34 @@

use std::sync::Mutex;

use super::reader::{SystemParamsRead, SystemParamsReader};
use super::diff::SystemParamsDiff;
use super::reader::SystemParamsReader;
use crate::util::tracing::layer::toggle_otel_layer;

/// Node-independent handler for system parameter changes.
///
/// Currently, it is only used to enable or disable the distributed tracing layer.
pub struct CommonHandler {
last_params: Mutex<Option<SystemParamsReader>>,
// TODO: this is bad
sync: Mutex<()>,
}

impl CommonHandler {
/// Create a new handler with the initial parameters.
pub fn new(initial: SystemParamsReader) -> Self {
let this = Self {
last_params: None.into(),
sync: Default::default(),
};
this.handle_change(initial);
this.handle_change(&SystemParamsDiff::from_initial(initial));
this
}

/// Handle the change of system parameters.
// TODO: directly call this method with the difference of old and new params.
pub fn handle_change(&self, new_params: SystemParamsReader) {
let mut last_params = self.last_params.lock().unwrap();
pub fn handle_change(&self, diff: &SystemParamsDiff) {
let mut _sync_guard = self.sync.lock().unwrap();

if last_params.as_ref().map(|p| p.enable_tracing()) != Some(new_params.enable_tracing()) {
toggle_otel_layer(new_params.enable_tracing());
if let Some(enabled) = diff.enable_tracing {
toggle_otel_layer(enabled)
}

last_params.replace(new_params);
}
}
53 changes: 53 additions & 0 deletions src/common/src/system_param/diff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use super::reader::SystemParamsRead;
BugenZhao marked this conversation as resolved.
Show resolved Hide resolved
use crate::for_all_params;

macro_rules! define_diff {
($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $doc:literal, $($rest:tt)* },)*) => {
/// The diff of the system params.
///
/// Fields that are changed are set to `Some`, otherwise `None`.
#[derive(Default, Debug, Clone)]
pub struct SystemParamsDiff {
$(
#[doc = $doc]
pub $field: Option<$type>,
)*
}
}
}
for_all_params!(define_diff);

impl SystemParamsDiff {
/// Create a diff between the given two system params.
pub fn diff(prev: impl SystemParamsRead, curr: impl SystemParamsRead) -> Self {
let mut diff = Self::default();

macro_rules! set_diff_field {
($({ $field:ident, $($rest:tt)* },)*) => {
$(
if curr.$field() != prev.$field() {
diff.$field = Some(curr.$field().to_owned());
}
)*
};
}
for_all_params!(set_diff_field);

diff
}

/// Create a diff from the given initial system params.
/// All fields will be set to `Some`.
pub fn from_initial(initial: impl SystemParamsRead) -> Self {
macro_rules! initial_field {
($({ $field:ident, $($rest:tt)* },)*) => {
Self {
$(
$field: Some(initial.$field().to_owned()),
)*
}
};
}
for_all_params!(initial_field)
}
}
7 changes: 6 additions & 1 deletion src/common/src/system_param/local_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_pb::meta::SystemParams;
use tokio::sync::watch::{channel, Receiver, Sender};

use super::common::CommonHandler;
use super::diff::SystemParamsDiff;
use super::reader::SystemParamsReader;
use super::system_params_for_test;

Expand Down Expand Up @@ -49,11 +50,15 @@ impl LocalSystemParamsManager {
tokio::spawn({
let mut rx = tx.subscribe();
async move {
let mut old_params = initial_params.clone();
let handler = CommonHandler::new(initial_params);

// TODO: receive the diff instead of the snapshot.
while rx.changed().await.is_ok() {
let new_params = (**rx.borrow_and_update().load()).clone();
handler.handle_change(new_params);
let diff = SystemParamsDiff::diff(old_params.as_ref(), new_params.as_ref());
handler.handle_change(&diff);
old_params = new_params;
}
}
});
Expand Down
39 changes: 30 additions & 9 deletions src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! - Add a new method to [`reader::SystemParamsReader`].

pub mod common;
pub mod diff;
pub mod local_manager;
pub mod reader;

Expand All @@ -31,6 +32,8 @@ use std::str::FromStr;
use paste::paste;
use risingwave_pb::meta::PbSystemParams;

use self::diff::SystemParamsDiff;

pub type SystemParamsError = String;

type Result<T> = core::result::Result<T, SystemParamsError>;
Expand Down Expand Up @@ -300,28 +303,46 @@ macro_rules! impl_default_from_other_params {

macro_rules! impl_set_system_param {
($({ $field:ident, $type:ty, $default:expr, $($rest:tt)* },)*) => {
/// Set a system parameter with the given value or default one, returns the new value.
pub fn set_system_param(params: &mut PbSystemParams, key: &str, value: Option<String>) -> Result<String> {
match key {
/// Set a system parameter with the given value or default one.
/// Returns the new value if changed.
pub fn set_system_param(
params: &mut PbSystemParams,
key: &str,
value: Option<impl AsRef<str>>,
) -> Result<Option<(String, SystemParamsDiff)>> {
use crate::system_param::reader::{SystemParamsReader, SystemParamsRead};

match key {
$(
key_of!($field) => {
let v = if let Some(v) = value {
v.parse().map_err(|_| format!("cannot parse parameter value"))?
v.as_ref().parse().map_err(|_| format!("cannot parse parameter value"))?
} else {
$default.ok_or_else(|| format!("{} does not have a default value", key))?
};
OverrideValidateOnSet::$field(&v)?;
params.$field = Some(v.clone());
return Ok(v.to_string())

let changed = SystemParamsReader::new(&*params).$field() != v;
if changed {
let new_value = v.to_string();
let diff = SystemParamsDiff {
$field: Some(v.to_owned()),
..Default::default()
};
params.$field = Some(v);
Ok(Some((new_value, diff)))
} else {
Ok(None)
}
},
)*
_ => {
return Err(format!(
Err(format!(
"unrecognized system param {:?}",
key
));
))
}
};
}
}
};
}
Expand Down
76 changes: 51 additions & 25 deletions src/common/src/system_param/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Borrow;

use risingwave_pb::meta::PbSystemParams;

use super::{default, system_params_to_kv, ParamValue};
Expand Down Expand Up @@ -40,73 +42,103 @@ for_all_params!(define_system_params_read_trait);
///
/// See [`SystemParamsRead`] for more details.
#[derive(Clone, Debug, PartialEq)]
pub struct SystemParamsReader {
prost: PbSystemParams,
pub struct SystemParamsReader<I = PbSystemParams> {
inner: I,
Comment on lines +45 to +46
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why introducing I here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we want to create a reader with the reference to a PbSystemParams:

let changed = SystemParamsReader::new(&*params).$field() != v;

}

impl From<PbSystemParams> for SystemParamsReader {
fn from(prost: PbSystemParams) -> Self {
Self { prost }
impl<I> From<I> for SystemParamsReader<I>
where
I: Borrow<PbSystemParams>,
{
fn from(inner: I) -> Self {
Self { inner }
}
}

impl<I> SystemParamsReader<I>
where
I: Borrow<PbSystemParams>,
{
pub fn new(inner: I) -> Self {
Self { inner }
}

/// Return a new reader with the reference to the inner system params.
pub fn as_ref(&self) -> SystemParamsReader<&PbSystemParams> {
SystemParamsReader {
inner: self.inner(),
}
}

pub fn to_kv(&self) -> Vec<(String, String)> {
system_params_to_kv(&self.inner()).unwrap()
}

fn inner(&self) -> &PbSystemParams {
self.inner.borrow()
}
}

/// - Unwrap the field if it always exists.
/// For example, if a parameter is introduced before the initial public release.
///
/// - Otherwise, specify the fallback logic when the field is missing.
impl SystemParamsRead for SystemParamsReader {
impl<I> SystemParamsRead for SystemParamsReader<I>
where
I: Borrow<PbSystemParams>,
{
fn barrier_interval_ms(&self) -> u32 {
self.prost.barrier_interval_ms.unwrap()
self.inner().barrier_interval_ms.unwrap()
}

fn checkpoint_frequency(&self) -> u64 {
self.prost.checkpoint_frequency.unwrap()
self.inner().checkpoint_frequency.unwrap()
}

fn parallel_compact_size_mb(&self) -> u32 {
self.prost.parallel_compact_size_mb.unwrap()
self.inner().parallel_compact_size_mb.unwrap()
}

fn sstable_size_mb(&self) -> u32 {
self.prost.sstable_size_mb.unwrap()
self.inner().sstable_size_mb.unwrap()
}

fn block_size_kb(&self) -> u32 {
self.prost.block_size_kb.unwrap()
self.inner().block_size_kb.unwrap()
}

fn bloom_false_positive(&self) -> f64 {
self.prost.bloom_false_positive.unwrap()
self.inner().bloom_false_positive.unwrap()
}

fn state_store(&self) -> &str {
self.prost.state_store.as_ref().unwrap()
self.inner().state_store.as_ref().unwrap()
}

fn data_directory(&self) -> &str {
self.prost.data_directory.as_ref().unwrap()
self.inner().data_directory.as_ref().unwrap()
}

fn backup_storage_url(&self) -> &str {
self.prost.backup_storage_url.as_ref().unwrap()
self.inner().backup_storage_url.as_ref().unwrap()
}

fn backup_storage_directory(&self) -> &str {
self.prost.backup_storage_directory.as_ref().unwrap()
self.inner().backup_storage_directory.as_ref().unwrap()
}

fn max_concurrent_creating_streaming_jobs(&self) -> u32 {
self.prost.max_concurrent_creating_streaming_jobs.unwrap()
self.inner().max_concurrent_creating_streaming_jobs.unwrap()
}

fn pause_on_next_bootstrap(&self) -> bool {
self.prost
self.inner()
.pause_on_next_bootstrap
.unwrap_or_else(default::pause_on_next_bootstrap)
}

fn enable_tracing(&self) -> bool {
self.prost
self.inner()
.enable_tracing
.unwrap_or_else(default::enable_tracing)
}
Expand All @@ -118,9 +150,3 @@ impl SystemParamsRead for SystemParamsReader {
.unwrap_or(&default::WASM_STORAGE_URL)
}
}

impl SystemParamsReader {
pub fn to_kv(&self) -> Vec<(String, String)> {
system_params_to_kv(&self.prost).unwrap()
}
}
16 changes: 11 additions & 5 deletions src/meta/src/controller/system_param.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,21 @@ impl SystemParamsController {
};
let mut params = params_guard.clone();
let mut param: system_parameter::ActiveModel = param.into();
param.value =
Set(set_system_param(&mut params, name, value).map_err(MetaError::system_params)?);
let Some((new_value, diff)) =
set_system_param(&mut params, name, value).map_err(MetaError::system_params)?
else {
// No changes on the parameter.
return Ok(params);
};

param.value = Set(new_value);
param.update(&self.db).await?;
*params_guard = params.clone();

// TODO: check if the parameter is actually changed.

// Run common handler.
self.common_handler.handle_change(params.clone().into());
self.common_handler.handle_change(&diff);

// TODO: notify the diff instead of the snapshot.

// Sync params to other managers on the meta node only once, since it's infallible.
self.notification_manager
Expand Down
13 changes: 9 additions & 4 deletions src/meta/src/manager/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,23 @@ impl SystemParamsManager {
let params = params_guard.deref_mut();
let mut mem_txn = VarTransaction::new(params);

set_system_param(mem_txn.deref_mut(), name, value).map_err(MetaError::system_params)?;
let Some((_new_value, diff)) =
set_system_param(mem_txn.deref_mut(), name, value).map_err(MetaError::system_params)?
else {
// No changes on the parameter.
return Ok(params.clone());
};

let mut store_txn = Transaction::default();
mem_txn.apply_to_txn(&mut store_txn).await?;
self.meta_store.txn(store_txn).await?;

mem_txn.commit();

// TODO: check if the parameter is actually changed.

// Run common handler.
self.common_handler.handle_change(params.clone().into());
self.common_handler.handle_change(&diff);

// TODO: notify the diff instead of the snapshot.

// Sync params to other managers on the meta node only once, since it's infallible.
self.notification_manager
Expand Down
Loading