Skip to content

Commit

Permalink
move single node defaults configs into its module
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Feb 2, 2024
1 parent a57cf07 commit 8b38319
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 200 deletions.
2 changes: 0 additions & 2 deletions src/cmd_all/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::env::home_dir;
use std::ffi::OsString;
use std::sync::LazyLock;

pub fn osstrs<T: Into<OsString> + AsRef<std::ffi::OsStr>>(s: impl AsRef<[T]>) -> Vec<OsString> {
s.as_ref().iter().map(OsString::from).collect()
Expand Down
148 changes: 122 additions & 26 deletions src/cmd_all/src/single_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,41 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str::FromStr;
use std::sync::LazyLock;

use anyhow::Result;
use clap::Parser;
use risingwave_common::single_process_config::{
make_single_node_sql_endpoint, make_single_node_state_store_url,
};
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use home::home_dir;
use risingwave_common::config::{AsyncStackTraceOption, MetaBackend};
use risingwave_compactor::CompactorOpts;
use risingwave_compute::ComputeNodeOpts;
use risingwave_compute::{default_parallelism, default_total_memory_bytes, ComputeNodeOpts};
use risingwave_frontend::FrontendOpts;
use risingwave_meta_node::MetaNodeOpts;
use shell_words::split;
use tokio::signal;

use crate::common::osstrs;
use crate::ParsedStandaloneOpts;

pub static DEFAULT_STORE_DIRECTORY: LazyLock<String> = LazyLock::new(|| {
let mut home_path = home_dir().unwrap();
home_path.push(".risingwave");
let home_path = home_path.to_str().unwrap();
home_path.to_string()
});

pub static DEFAULT_SINGLE_NODE_SQLITE_PATH: LazyLock<String> =
LazyLock::new(|| format!("{}/meta_store/single_node.db", &*DEFAULT_STORE_DIRECTORY));

pub static DEFAULT_SINGLE_NODE_SQL_ENDPOINT: LazyLock<String> =
LazyLock::new(|| format!("sqlite://{}?mode=rwc", *DEFAULT_SINGLE_NODE_SQLITE_PATH));

pub static DEFAULT_SINGLE_NODE_STATE_STORE_PATH: LazyLock<String> =
LazyLock::new(|| format!("{}/state_store", DEFAULT_STORE_DIRECTORY.clone()));

pub static DEFAULT_SINGLE_NODE_STATE_STORE_URL: LazyLock<String> = LazyLock::new(|| {
format!(
"hummock+fs://{}",
DEFAULT_SINGLE_NODE_STATE_STORE_PATH.clone()
)
});

#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
#[command(
version,
Expand Down Expand Up @@ -68,11 +85,22 @@ pub struct SingleNodeOpts {
compactor_addr: Option<String>,
}

pub fn make_single_node_sql_endpoint(store_directory: &String) -> String {
format!(
"sqlite://{}/meta_store/single_node.db?mode=rwc",
store_directory
)
}

pub fn make_single_node_state_store_url(store_directory: &String) -> String {
format!("hummock+fs://{}/state_store", store_directory)
}

pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedStandaloneOpts {
let mut meta_opts = MetaNodeOpts::new_for_single_node();
let mut compute_opts = ComputeNodeOpts::new_for_single_node();
let mut frontend_opts = FrontendOpts::new_for_single_node();
let mut compactor_opts = CompactorOpts::new_for_single_node();
let mut meta_opts = SingleNodeOpts::default_meta_opts();
let mut compute_opts = SingleNodeOpts::default_compute_opts();
let mut frontend_opts = SingleNodeOpts::default_frontend_opts();
let mut compactor_opts = SingleNodeOpts::default_compactor_opts();
if let Some(prometheus_listener_addr) = &opts.prometheus_listener_addr {
meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone());
compute_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
Expand All @@ -85,7 +113,6 @@ pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedS
frontend_opts.config_path = config_path.clone();
compactor_opts.config_path = config_path.clone();
}
// TODO(kwannoel): Also update state store URL
if let Some(store_directory) = &opts.store_directory {
let state_store_url = make_single_node_state_store_url(store_directory);
let meta_store_endpoint = make_single_node_sql_endpoint(store_directory);
Expand Down Expand Up @@ -117,19 +144,88 @@ pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedS
}
}

#[cfg(test)]
mod test {
use std::fmt::Debug;

use expect_test::{expect, Expect};
impl SingleNodeOpts {
fn default_frontend_opts() -> FrontendOpts {
FrontendOpts {
listen_addr: "0.0.0.0:4566".to_string(),
advertise_addr: Some("0.0.0.0:4566".to_string()),
port: None,
meta_addr: "http://0.0.0.0:5690".parse().unwrap(),
prometheus_listener_addr: "0.0.0.0:1250".to_string(),
health_check_listener_addr: "0.0.0.0:6786".to_string(),
config_path: "".to_string(),
metrics_level: None,
enable_barrier_read: None,
}
}

use super::*;
fn default_meta_opts() -> MetaNodeOpts {
MetaNodeOpts {
vpc_id: None,
security_group_id: None,
listen_addr: "0.0.0.0:5690".to_string(),
advertise_addr: "0.0.0.0:5690".to_string(),
dashboard_host: Some("0.0.0.0:5691".to_string()),
prometheus_listener_addr: Some("0.0.0.0:1250".to_string()),
etcd_endpoints: Default::default(),
etcd_auth: false,
etcd_username: Default::default(),
etcd_password: Default::default(),
sql_endpoint: Some(DEFAULT_SINGLE_NODE_SQL_ENDPOINT.clone()),
dashboard_ui_path: None,
prometheus_endpoint: None,
prometheus_selector: None,
connector_rpc_endpoint: None,
privatelink_endpoint_default_tags: None,
config_path: "".to_string(),
backend: Some(MetaBackend::Sql),
barrier_interval_ms: None,
sstable_size_mb: None,
block_size_kb: None,
bloom_false_positive: None,
state_store: Some(DEFAULT_SINGLE_NODE_STATE_STORE_URL.clone()),
data_directory: Some("hummock_001".to_string()),
do_not_config_object_storage_lifecycle: None,
backup_storage_url: None,
backup_storage_directory: None,
heap_profiling_dir: None,
}
}

fn check(actual: impl Debug, expect: Expect) {
let actual = format!("{:#?}", actual);
expect.assert_eq(&actual);
pub fn default_compute_opts() -> ComputeNodeOpts {
ComputeNodeOpts {
listen_addr: "0.0.0.0:5688".to_string(),
advertise_addr: Some("0.0.0.0:5688".to_string()),
prometheus_listener_addr: "0.0.0.0:1250".to_string(),
meta_address: "http://0.0.0.0:5690".parse().unwrap(),
connector_rpc_endpoint: None,
connector_rpc_sink_payload_format: None,
config_path: "".to_string(),
total_memory_bytes: default_total_memory_bytes(),
parallelism: default_parallelism(),
role: Default::default(),
metrics_level: None,
data_file_cache_dir: None,
meta_file_cache_dir: None,
async_stack_trace: Some(AsyncStackTraceOption::ReleaseVerbose),
heap_profiling_dir: None,
}
}

#[test]
fn test_parse_opt_args() {}
fn default_compactor_opts() -> CompactorOpts {
CompactorOpts {
listen_addr: "0.0.0.0:6660".to_string(),
advertise_addr: Some("0.0.0.0:6660".to_string()),
port: None,
prometheus_listener_addr: "0.0.0.0:1250".to_string(),
meta_address: "http://0.0.0.0:5690".parse().unwrap(),
compaction_worker_threads_number: None,
config_path: "".to_string(),
metrics_level: None,
async_stack_trace: None,
heap_profiling_dir: None,
compactor_mode: None,
proxy_rpc_endpoint: "".to_string(),
}
}
}
2 changes: 0 additions & 2 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,12 @@ pub mod opts;
pub mod range;
pub mod row;
pub mod session_config;
pub mod single_process_config;
pub mod system_param;
pub mod telemetry;
pub mod test_utils;
pub mod transaction;
pub mod types;
pub mod vnode_mapping;

pub mod test_prelude {
pub use super::array::{DataChunkTestExt, StreamChunkTestExt};
pub use super::catalog::test_utils::ColumnDescTestExt;
Expand Down
53 changes: 0 additions & 53 deletions src/common/src/single_process_config.rs

This file was deleted.

29 changes: 3 additions & 26 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use serde::{Deserialize, Serialize};
use tonic::IntoStreamingRequest;

/// If `total_memory_bytes` is not specified, the default memory limit will be set to
/// the system memory limit multiplied by this proportion
Expand Down Expand Up @@ -134,28 +133,6 @@ pub struct ComputeNodeOpts {
pub heap_profiling_dir: Option<String>,
}

impl ComputeNodeOpts {
pub fn new_for_single_node() -> Self {
Self {
listen_addr: "0.0.0.0:5688".to_string(),
advertise_addr: Some("0.0.0.0:5688".to_string()),
prometheus_listener_addr: "0.0.0.0:1250".to_string(),
meta_address: "http://0.0.0.0:5690".parse().unwrap(),
connector_rpc_endpoint: None,
connector_rpc_sink_payload_format: None,
config_path: "".to_string(),
total_memory_bytes: default_total_memory_bytes(),
parallelism: default_parallelism(),
role: Default::default(),
metrics_level: None,
data_file_cache_dir: None,
meta_file_cache_dir: None,
async_stack_trace: Some(AsyncStackTraceOption::ReleaseVerbose),
heap_profiling_dir: None,
}
}
}

impl risingwave_common::opts::Opts for ComputeNodeOpts {
fn name() -> &'static str {
"compute"
Expand Down Expand Up @@ -248,14 +225,14 @@ pub fn start(opts: ComputeNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>>
})
}

fn default_total_memory_bytes() -> usize {
pub fn default_total_memory_bytes() -> usize {
(system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
}

fn default_parallelism() -> usize {
pub fn default_parallelism() -> usize {
total_cpu_available().ceil() as usize
}

fn default_role() -> Role {
pub fn default_role() -> Role {
Role::Both
}
16 changes: 0 additions & 16 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,22 +143,6 @@ pub struct FrontendOpts {
pub enable_barrier_read: Option<bool>,
}

impl FrontendOpts {
pub fn new_for_single_node() -> Self {
Self {
listen_addr: "0.0.0.0:4566".to_string(),
advertise_addr: Some("0.0.0.0:4566".to_string()),
port: None,
meta_addr: "http://0.0.0.0:5690".parse().unwrap(),
prometheus_listener_addr: "0.0.0.0:1250".to_string(),
health_check_listener_addr: "0.0.0.0:6786".to_string(),
config_path: "".to_string(),
metrics_level: None,
enable_barrier_read: None,
}
}
}

impl risingwave_common::opts::Opts for FrontendOpts {
fn name() -> &'static str {
"frontend"
Expand Down
Loading

0 comments on commit 8b38319

Please sign in to comment.