From f631e83b2ae8cff6889e04ada291248ebdf0dfc4 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Mon, 11 Mar 2024 18:27:31 +0800 Subject: [PATCH] apply node-specific options --- src/cmd_all/src/bin/risingwave.rs | 2 +- src/cmd_all/src/single_node.rs | 30 +++++++++++++++++++++++++----- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index 2c167fc1bdc20..f63002e5fcecb 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -239,7 +239,7 @@ fn standalone(opts: StandaloneOpts) { /// high level options to standalone mode node-level options. /// We will start a standalone instance, with all nodes in the same process. fn single_node(opts: SingleNodeOpts) { - let opts = risingwave_cmd_all::map_single_node_opts_to_standalone_opts(&opts); + let opts = risingwave_cmd_all::map_single_node_opts_to_standalone_opts(opts); let settings = risingwave_rt::LoggerSettings::from_opts(&opts) .with_target("risingwave_storage", Level::WARN) .with_thread_name(true); diff --git a/src/cmd_all/src/single_node.rs b/src/cmd_all/src/single_node.rs index 73a63fe871c98..67037af22434f 100644 --- a/src/cmd_all/src/single_node.rs +++ b/src/cmd_all/src/single_node.rs @@ -98,7 +98,7 @@ pub struct NodeSpecificOpts { pub compaction_worker_threads_number: Option, } -pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedStandaloneOpts { +pub fn map_single_node_opts_to_standalone_opts(opts: SingleNodeOpts) -> ParsedStandaloneOpts { // Parse from empty strings to get the default values. // Note that environment variables will be used if they are set. let empty_args = vec![] as Vec; @@ -120,20 +120,20 @@ pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedS compactor_opts.config_path = config_path.clone(); } - let store_directory = if let Some(store_directory) = &opts.store_directory { - store_directory.clone() - } else { + let store_directory = opts.store_directory.unwrap_or_else(|| { let mut home_path = home_dir().unwrap(); home_path.push(".risingwave"); home_path.to_str().unwrap().to_string() - }; + }); + // Set state store for meta (if not set) if meta_opts.state_store.is_none() { let state_store_url = format!("hummock+fs://{}/state_store", &store_directory); std::fs::create_dir_all(&state_store_url).unwrap(); meta_opts.state_store = Some(state_store_url); } + // Set meta store for meta (if not set) let meta_backend_is_set = match meta_opts.backend { Some(MetaBackend::Etcd) => !meta_opts.etcd_endpoints.is_empty(), Some(MetaBackend::Sql) => meta_opts.sql_endpoint.is_some(), @@ -165,6 +165,26 @@ pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedS frontend_opts.meta_addr = meta_addr.parse().unwrap(); compactor_opts.meta_address = meta_addr.parse().unwrap(); + // Apply node-specific options + if let Some(total_memory_bytes) = opts.node_opts.total_memory_bytes { + compute_opts.total_memory_bytes = total_memory_bytes; + } + if let Some(parallelism) = opts.node_opts.parallelism { + compute_opts.parallelism = parallelism; + } + if let Some(listen_addr) = opts.node_opts.listen_addr { + frontend_opts.listen_addr = listen_addr; + } + if let Some(prometheus_endpoint) = opts.node_opts.prometheus_endpoint { + meta_opts.prometheus_endpoint = Some(prometheus_endpoint); + } + if let Some(prometheus_selector) = opts.node_opts.prometheus_selector { + meta_opts.prometheus_selector = Some(prometheus_selector); + } + if let Some(n) = opts.node_opts.compaction_worker_threads_number { + compactor_opts.compaction_worker_threads_number = Some(n); + } + ParsedStandaloneOpts { meta_opts: Some(meta_opts), compute_opts: Some(compute_opts),