Skip to content

Commit

Permalink
generate system config with macro
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jan 15, 2024
1 parent 1e6704d commit a5e510c
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 90 deletions.
100 changes: 28 additions & 72 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use serde::{Deserialize, Serialize, Serializer};
use serde_default::DefaultFromSerde;
use serde_json::Value;

use crate::for_all_params;
use crate::hash::VirtualNode;

/// Use the maximum value for HTTP/2 connection window size to avoid deadlock among multiplexed
Expand Down Expand Up @@ -849,65 +850,27 @@ pub struct BatchDeveloperConfig {
#[serde(default = "default::developer::batch_chunk_size")]
pub chunk_size: usize,
}
/// The section `[system]` in `risingwave.toml`. All these fields are used to initialize the system
/// parameters persisted in Meta store. Most fields are for testing purpose only and should not be
/// documented.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct SystemConfig {
/// The interval of periodic barrier.
#[serde(default = "default::system::barrier_interval_ms")]
pub barrier_interval_ms: Option<u32>,

/// There will be a checkpoint for every n barriers
#[serde(default = "default::system::checkpoint_frequency")]
pub checkpoint_frequency: Option<u64>,

/// Target size of the Sstable.
#[serde(default = "default::system::sstable_size_mb")]
pub sstable_size_mb: Option<u32>,

#[serde(default = "default::system::parallel_compact_size_mb")]
pub parallel_compact_size_mb: Option<u32>,

/// Size of each block in bytes in SST.
#[serde(default = "default::system::block_size_kb")]
pub block_size_kb: Option<u32>,

/// False positive probability of bloom filter.
#[serde(default = "default::system::bloom_false_positive")]
pub bloom_false_positive: Option<f64>,

#[serde(default = "default::system::state_store")]
pub state_store: Option<String>,

/// Remote directory for storing data and metadata objects.
#[serde(default = "default::system::data_directory")]
pub data_directory: Option<String>,

/// Remote storage url for storing snapshots.
#[serde(default = "default::system::backup_storage_url")]
pub backup_storage_url: Option<String>,

/// Remote directory for storing snapshots.
#[serde(default = "default::system::backup_storage_directory")]
pub backup_storage_directory: Option<String>,

/// Max number of concurrent creating streaming jobs.
#[serde(default = "default::system::max_concurrent_creating_streaming_jobs")]
pub max_concurrent_creating_streaming_jobs: Option<u32>,

/// Whether to pause all data sources on next bootstrap.
#[serde(default = "default::system::pause_on_next_bootstrap")]
pub pause_on_next_bootstrap: Option<bool>,

#[serde(default = "default::system::wasm_storage_url")]
pub wasm_storage_url: Option<String>,

/// Whether to enable distributed tracing.
#[serde(default = "default::system::enable_tracing")]
pub enable_tracing: Option<bool>,
macro_rules! define_system_config {
($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $doc:literal, $($rest:tt)* },)*) => {
paste::paste!(
/// The section `[system]` in `risingwave.toml`. All these fields are used to initialize the system
/// parameters persisted in Meta store. Most fields are for testing purpose only and should not be
/// documented.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct SystemConfig {
$(
#[doc = $doc]
#[serde(default = "default::system::" $field)]
pub $field: Option<$type>,
)*
}
);
};
}

for_all_params!(define_system_config);

/// The subsections `[storage.object_store]`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct ObjectStoreConfig {
Expand Down Expand Up @@ -946,23 +909,16 @@ pub struct S3ObjectStoreConfig {
impl SystemConfig {
#![allow(deprecated)]
pub fn into_init_system_params(self) -> SystemParams {
SystemParams {
barrier_interval_ms: self.barrier_interval_ms,
checkpoint_frequency: self.checkpoint_frequency,
sstable_size_mb: self.sstable_size_mb,
parallel_compact_size_mb: self.parallel_compact_size_mb,
block_size_kb: self.block_size_kb,
bloom_false_positive: self.bloom_false_positive,
state_store: self.state_store,
data_directory: self.data_directory,
backup_storage_url: self.backup_storage_url,
backup_storage_directory: self.backup_storage_directory,
max_concurrent_creating_streaming_jobs: self.max_concurrent_creating_streaming_jobs,
pause_on_next_bootstrap: self.pause_on_next_bootstrap,
wasm_storage_url: self.wasm_storage_url,
enable_tracing: self.enable_tracing,
telemetry_enabled: None, // deprecated
macro_rules! fields {
($({ $field:ident, $($rest:tt)* },)*) => {
SystemParams {
$($field: self.$field,)*
..Default::default() // deprecated fields
}
};
}

for_all_params!(fields)
}
}

Expand Down
34 changes: 17 additions & 17 deletions src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,21 @@ type Result<T> = core::result::Result<T, SystemParamsError>;
macro_rules! for_all_params {
($macro:ident) => {
$macro! {
// name type default value mutable
{ barrier_interval_ms, u32, Some(1000_u32), true, },
{ checkpoint_frequency, u64, Some(1_u64), true, },
{ sstable_size_mb, u32, Some(256_u32), false, },
{ parallel_compact_size_mb, u32, Some(512_u32), false, },
{ block_size_kb, u32, Some(64_u32), false, },
{ bloom_false_positive, f64, Some(0.001_f64), false, },
{ state_store, String, None, false, },
{ data_directory, String, None, false, },
{ backup_storage_url, String, Some("memory".to_string()), true, },
{ backup_storage_directory, String, Some("backup".to_string()), true, },
{ max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, },
{ pause_on_next_bootstrap, bool, Some(false), true, },
{ wasm_storage_url, String, Some("fs://.risingwave/data".to_string()), false, },
{ enable_tracing, bool, Some(false), true, },
// name type default value mut? doc
{ barrier_interval_ms, u32, Some(1000_u32), true, "The interval of periodic barrier.", },
{ checkpoint_frequency, u64, Some(1_u64), true, "There will be a checkpoint for every n barriers.", },
{ sstable_size_mb, u32, Some(256_u32), false, "Target size of the Sstable.", },
{ parallel_compact_size_mb, u32, Some(512_u32), false, "", },
{ block_size_kb, u32, Some(64_u32), false, "Size of each block in bytes in SST.", },
{ bloom_false_positive, f64, Some(0.001_f64), false, "False positive probability of bloom filter.", },
{ state_store, String, None, false, "", },
{ data_directory, String, None, false, "Remote directory for storing data and metadata objects.", },
{ backup_storage_url, String, Some("memory".to_string()), true, "Remote storage url for storing snapshots.", },
{ backup_storage_directory, String, Some("backup".to_string()), true, "Remote directory for storing snapshots.", },
{ max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, "Max number of concurrent creating streaming jobs.", },
{ pause_on_next_bootstrap, bool, Some(false), true, "Whether to pause all data sources on next bootstrap.", },
{ wasm_storage_url, String, Some("fs://.risingwave/data".to_string()), false, "", },
{ enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", },
}
};
}
Expand Down Expand Up @@ -188,7 +188,7 @@ macro_rules! impl_system_params_from_kv {
/// If you want custom rules, please override the default implementation in
/// `OverrideValidateOnSet` below.
macro_rules! impl_default_validation_on_set {
($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, },)*) => {
($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
#[allow(clippy::ptr_arg)]
trait ValidateOnSet {
$(
Expand Down Expand Up @@ -277,7 +277,7 @@ macro_rules! impl_set_system_param {
}

macro_rules! impl_is_mutable {
($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, },)*) => {
($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
pub fn is_mutable(field: &str) -> Result<bool> {
match field {
$(
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/controller/system_param.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ macro_rules! impl_system_params_from_db {

/// Derive serialization to db models.
macro_rules! impl_system_params_to_models {
($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, },)*) => {
($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr, $($rest:tt)* },)*) => {
#[allow(clippy::vec_init_then_push)]
pub fn system_params_to_model(params: &PbSystemParams) -> MetaResult<Vec<system_parameter::ActiveModel>> {
check_missing_params(params).map_err(|e| anyhow!(e))?;
Expand Down

0 comments on commit a5e510c

Please sign in to comment.