Skip to content

Commit

Permalink
apply node-specific options
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh committed Mar 11, 2024
1 parent 2bec439 commit f631e83
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/cmd_all/src/bin/risingwave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ fn standalone(opts: StandaloneOpts) {
/// high level options to standalone mode node-level options.
/// We will start a standalone instance, with all nodes in the same process.
fn single_node(opts: SingleNodeOpts) {
let opts = risingwave_cmd_all::map_single_node_opts_to_standalone_opts(&opts);
let opts = risingwave_cmd_all::map_single_node_opts_to_standalone_opts(opts);
let settings = risingwave_rt::LoggerSettings::from_opts(&opts)
.with_target("risingwave_storage", Level::WARN)
.with_thread_name(true);
Expand Down
30 changes: 25 additions & 5 deletions src/cmd_all/src/single_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub struct NodeSpecificOpts {
pub compaction_worker_threads_number: Option<usize>,
}

pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedStandaloneOpts {
pub fn map_single_node_opts_to_standalone_opts(opts: SingleNodeOpts) -> ParsedStandaloneOpts {
// Parse from empty strings to get the default values.
// Note that environment variables will be used if they are set.
let empty_args = vec![] as Vec<String>;
Expand All @@ -120,20 +120,20 @@ pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedS
compactor_opts.config_path = config_path.clone();
}

let store_directory = if let Some(store_directory) = &opts.store_directory {
store_directory.clone()
} else {
let store_directory = opts.store_directory.unwrap_or_else(|| {
let mut home_path = home_dir().unwrap();
home_path.push(".risingwave");
home_path.to_str().unwrap().to_string()
};
});

// Set state store for meta (if not set)
if meta_opts.state_store.is_none() {
let state_store_url = format!("hummock+fs://{}/state_store", &store_directory);
std::fs::create_dir_all(&state_store_url).unwrap();
meta_opts.state_store = Some(state_store_url);
}

// Set meta store for meta (if not set)
let meta_backend_is_set = match meta_opts.backend {
Some(MetaBackend::Etcd) => !meta_opts.etcd_endpoints.is_empty(),
Some(MetaBackend::Sql) => meta_opts.sql_endpoint.is_some(),
Expand Down Expand Up @@ -165,6 +165,26 @@ pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedS
frontend_opts.meta_addr = meta_addr.parse().unwrap();
compactor_opts.meta_address = meta_addr.parse().unwrap();

// Apply node-specific options
if let Some(total_memory_bytes) = opts.node_opts.total_memory_bytes {
compute_opts.total_memory_bytes = total_memory_bytes;
}
if let Some(parallelism) = opts.node_opts.parallelism {
compute_opts.parallelism = parallelism;
}
if let Some(listen_addr) = opts.node_opts.listen_addr {
frontend_opts.listen_addr = listen_addr;
}
if let Some(prometheus_endpoint) = opts.node_opts.prometheus_endpoint {
meta_opts.prometheus_endpoint = Some(prometheus_endpoint);
}
if let Some(prometheus_selector) = opts.node_opts.prometheus_selector {
meta_opts.prometheus_selector = Some(prometheus_selector);
}
if let Some(n) = opts.node_opts.compaction_worker_threads_number {
compactor_opts.compaction_worker_threads_number = Some(n);
}

ParsedStandaloneOpts {
meta_opts: Some(meta_opts),
compute_opts: Some(compute_opts),
Expand Down

0 comments on commit f631e83

Please sign in to comment.