diff --git a/src/cmd_all/src/common.rs b/src/cmd_all/src/common.rs index 4347dde1563f1..68e9090578ab0 100644 --- a/src/cmd_all/src/common.rs +++ b/src/cmd_all/src/common.rs @@ -12,9 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::env::home_dir; use std::ffi::OsString; -use std::sync::LazyLock; pub fn osstrs + AsRef>(s: impl AsRef<[T]>) -> Vec { s.as_ref().iter().map(OsString::from).collect() diff --git a/src/cmd_all/src/single_node.rs b/src/cmd_all/src/single_node.rs index 0dffaeec877ed..19f79128ded22 100644 --- a/src/cmd_all/src/single_node.rs +++ b/src/cmd_all/src/single_node.rs @@ -12,24 +12,41 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::str::FromStr; +use std::sync::LazyLock; -use anyhow::Result; use clap::Parser; -use risingwave_common::single_process_config::{ - make_single_node_sql_endpoint, make_single_node_state_store_url, -}; -use risingwave_common::util::meta_addr::MetaAddressStrategy; +use home::home_dir; +use risingwave_common::config::{AsyncStackTraceOption, MetaBackend}; use risingwave_compactor::CompactorOpts; -use risingwave_compute::ComputeNodeOpts; +use risingwave_compute::{default_parallelism, default_total_memory_bytes, ComputeNodeOpts}; use risingwave_frontend::FrontendOpts; use risingwave_meta_node::MetaNodeOpts; -use shell_words::split; -use tokio::signal; -use crate::common::osstrs; use crate::ParsedStandaloneOpts; +pub static DEFAULT_STORE_DIRECTORY: LazyLock = LazyLock::new(|| { + let mut home_path = home_dir().unwrap(); + home_path.push(".risingwave"); + let home_path = home_path.to_str().unwrap(); + home_path.to_string() +}); + +pub static DEFAULT_SINGLE_NODE_SQLITE_PATH: LazyLock = + LazyLock::new(|| format!("{}/meta_store/single_node.db", &*DEFAULT_STORE_DIRECTORY)); + +pub static DEFAULT_SINGLE_NODE_SQL_ENDPOINT: LazyLock = + LazyLock::new(|| format!("sqlite://{}?mode=rwc", *DEFAULT_SINGLE_NODE_SQLITE_PATH)); + +pub static DEFAULT_SINGLE_NODE_STATE_STORE_PATH: LazyLock = + LazyLock::new(|| format!("{}/state_store", DEFAULT_STORE_DIRECTORY.clone())); + +pub static DEFAULT_SINGLE_NODE_STATE_STORE_URL: LazyLock = LazyLock::new(|| { + format!( + "hummock+fs://{}", + DEFAULT_SINGLE_NODE_STATE_STORE_PATH.clone() + ) +}); + #[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)] #[command( version, @@ -68,11 +85,22 @@ pub struct SingleNodeOpts { compactor_addr: Option, } +pub fn make_single_node_sql_endpoint(store_directory: &String) -> String { + format!( + "sqlite://{}/meta_store/single_node.db?mode=rwc", + store_directory + ) +} + +pub fn make_single_node_state_store_url(store_directory: &String) -> String { + format!("hummock+fs://{}/state_store", store_directory) +} + pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedStandaloneOpts { - let mut meta_opts = MetaNodeOpts::new_for_single_node(); - let mut compute_opts = ComputeNodeOpts::new_for_single_node(); - let mut frontend_opts = FrontendOpts::new_for_single_node(); - let mut compactor_opts = CompactorOpts::new_for_single_node(); + let mut meta_opts = SingleNodeOpts::default_meta_opts(); + let mut compute_opts = SingleNodeOpts::default_compute_opts(); + let mut frontend_opts = SingleNodeOpts::default_frontend_opts(); + let mut compactor_opts = SingleNodeOpts::default_compactor_opts(); if let Some(prometheus_listener_addr) = &opts.prometheus_listener_addr { meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone()); compute_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); @@ -85,7 +113,6 @@ pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedS frontend_opts.config_path = config_path.clone(); compactor_opts.config_path = config_path.clone(); } - // TODO(kwannoel): Also update state store URL if let Some(store_directory) = &opts.store_directory { let state_store_url = make_single_node_state_store_url(store_directory); let meta_store_endpoint = make_single_node_sql_endpoint(store_directory); @@ -117,19 +144,88 @@ pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedS } } -#[cfg(test)] -mod test { - use std::fmt::Debug; - - use expect_test::{expect, Expect}; +impl SingleNodeOpts { + fn default_frontend_opts() -> FrontendOpts { + FrontendOpts { + listen_addr: "0.0.0.0:4566".to_string(), + advertise_addr: Some("0.0.0.0:4566".to_string()), + port: None, + meta_addr: "http://0.0.0.0:5690".parse().unwrap(), + prometheus_listener_addr: "0.0.0.0:1250".to_string(), + health_check_listener_addr: "0.0.0.0:6786".to_string(), + config_path: "".to_string(), + metrics_level: None, + enable_barrier_read: None, + } + } - use super::*; + fn default_meta_opts() -> MetaNodeOpts { + MetaNodeOpts { + vpc_id: None, + security_group_id: None, + listen_addr: "0.0.0.0:5690".to_string(), + advertise_addr: "0.0.0.0:5690".to_string(), + dashboard_host: Some("0.0.0.0:5691".to_string()), + prometheus_listener_addr: Some("0.0.0.0:1250".to_string()), + etcd_endpoints: Default::default(), + etcd_auth: false, + etcd_username: Default::default(), + etcd_password: Default::default(), + sql_endpoint: Some(DEFAULT_SINGLE_NODE_SQL_ENDPOINT.clone()), + dashboard_ui_path: None, + prometheus_endpoint: None, + prometheus_selector: None, + connector_rpc_endpoint: None, + privatelink_endpoint_default_tags: None, + config_path: "".to_string(), + backend: Some(MetaBackend::Sql), + barrier_interval_ms: None, + sstable_size_mb: None, + block_size_kb: None, + bloom_false_positive: None, + state_store: Some(DEFAULT_SINGLE_NODE_STATE_STORE_URL.clone()), + data_directory: Some("hummock_001".to_string()), + do_not_config_object_storage_lifecycle: None, + backup_storage_url: None, + backup_storage_directory: None, + heap_profiling_dir: None, + } + } - fn check(actual: impl Debug, expect: Expect) { - let actual = format!("{:#?}", actual); - expect.assert_eq(&actual); + pub fn default_compute_opts() -> ComputeNodeOpts { + ComputeNodeOpts { + listen_addr: "0.0.0.0:5688".to_string(), + advertise_addr: Some("0.0.0.0:5688".to_string()), + prometheus_listener_addr: "0.0.0.0:1250".to_string(), + meta_address: "http://0.0.0.0:5690".parse().unwrap(), + connector_rpc_endpoint: None, + connector_rpc_sink_payload_format: None, + config_path: "".to_string(), + total_memory_bytes: default_total_memory_bytes(), + parallelism: default_parallelism(), + role: Default::default(), + metrics_level: None, + data_file_cache_dir: None, + meta_file_cache_dir: None, + async_stack_trace: Some(AsyncStackTraceOption::ReleaseVerbose), + heap_profiling_dir: None, + } } - #[test] - fn test_parse_opt_args() {} + fn default_compactor_opts() -> CompactorOpts { + CompactorOpts { + listen_addr: "0.0.0.0:6660".to_string(), + advertise_addr: Some("0.0.0.0:6660".to_string()), + port: None, + prometheus_listener_addr: "0.0.0.0:1250".to_string(), + meta_address: "http://0.0.0.0:5690".parse().unwrap(), + compaction_worker_threads_number: None, + config_path: "".to_string(), + metrics_level: None, + async_stack_trace: None, + heap_profiling_dir: None, + compactor_mode: None, + proxy_rpc_endpoint: "".to_string(), + } + } } diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index ed7696131a437..980897d5636e7 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -74,14 +74,12 @@ pub mod opts; pub mod range; pub mod row; pub mod session_config; -pub mod single_process_config; pub mod system_param; pub mod telemetry; pub mod test_utils; pub mod transaction; pub mod types; pub mod vnode_mapping; - pub mod test_prelude { pub use super::array::{DataChunkTestExt, StreamChunkTestExt}; pub use super::catalog::test_utils::ColumnDescTestExt; diff --git a/src/common/src/single_process_config.rs b/src/common/src/single_process_config.rs deleted file mode 100644 index 4c272b6742e04..0000000000000 --- a/src/common/src/single_process_config.rs +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! This module defines default configurations for single node mode. - -use std::sync::LazyLock; - -use home::home_dir; - -pub static DEFAULT_STORE_DIRECTORY: LazyLock = LazyLock::new(|| { - let mut home_path = home_dir().unwrap(); - home_path.push(".risingwave"); - let home_path = home_path.to_str().unwrap(); - home_path.to_string() -}); - -pub static DEFAULT_SINGLE_NODE_SQLITE_PATH: LazyLock = - LazyLock::new(|| format!("{}/meta_store/single_node.db", &*DEFAULT_STORE_DIRECTORY)); - -pub static DEFAULT_SINGLE_NODE_SQL_ENDPOINT: LazyLock = - LazyLock::new(|| format!("sqlite://{}?mode=rwc", *DEFAULT_SINGLE_NODE_SQLITE_PATH)); - -pub fn make_single_node_sql_endpoint(store_directory: &String) -> String { - format!( - "sqlite://{}/meta_store/single_node.db?mode=rwc", - store_directory - ) -} - -pub static DEFAULT_SINGLE_NODE_STATE_STORE_PATH: LazyLock = - LazyLock::new(|| format!("{}/state_store", DEFAULT_STORE_DIRECTORY.clone())); - -pub static DEFAULT_SINGLE_NODE_STATE_STORE_URL: LazyLock = LazyLock::new(|| { - format!( - "hummock+fs://{}", - DEFAULT_SINGLE_NODE_STATE_STORE_PATH.clone() - ) -}); - -pub fn make_single_node_state_store_url(store_directory: &String) -> String { - format!("hummock+fs://{}/state_store", store_directory) -} diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index 41c18bdf13355..3673997d2a128 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -39,7 +39,6 @@ use risingwave_common::util::meta_addr::MetaAddressStrategy; use risingwave_common::util::resource_util::cpu::total_cpu_available; use risingwave_common::util::resource_util::memory::system_memory_available_bytes; use serde::{Deserialize, Serialize}; -use tonic::IntoStreamingRequest; /// If `total_memory_bytes` is not specified, the default memory limit will be set to /// the system memory limit multiplied by this proportion @@ -134,28 +133,6 @@ pub struct ComputeNodeOpts { pub heap_profiling_dir: Option, } -impl ComputeNodeOpts { - pub fn new_for_single_node() -> Self { - Self { - listen_addr: "0.0.0.0:5688".to_string(), - advertise_addr: Some("0.0.0.0:5688".to_string()), - prometheus_listener_addr: "0.0.0.0:1250".to_string(), - meta_address: "http://0.0.0.0:5690".parse().unwrap(), - connector_rpc_endpoint: None, - connector_rpc_sink_payload_format: None, - config_path: "".to_string(), - total_memory_bytes: default_total_memory_bytes(), - parallelism: default_parallelism(), - role: Default::default(), - metrics_level: None, - data_file_cache_dir: None, - meta_file_cache_dir: None, - async_stack_trace: Some(AsyncStackTraceOption::ReleaseVerbose), - heap_profiling_dir: None, - } - } -} - impl risingwave_common::opts::Opts for ComputeNodeOpts { fn name() -> &'static str { "compute" @@ -248,14 +225,14 @@ pub fn start(opts: ComputeNodeOpts) -> Pin + Send>> }) } -fn default_total_memory_bytes() -> usize { +pub fn default_total_memory_bytes() -> usize { (system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize } -fn default_parallelism() -> usize { +pub fn default_parallelism() -> usize { total_cpu_available().ceil() as usize } -fn default_role() -> Role { +pub fn default_role() -> Role { Role::Both } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index d59a0071d6916..805221327f016 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -143,22 +143,6 @@ pub struct FrontendOpts { pub enable_barrier_read: Option, } -impl FrontendOpts { - pub fn new_for_single_node() -> Self { - Self { - listen_addr: "0.0.0.0:4566".to_string(), - advertise_addr: Some("0.0.0.0:4566".to_string()), - port: None, - meta_addr: "http://0.0.0.0:5690".parse().unwrap(), - prometheus_listener_addr: "0.0.0.0:1250".to_string(), - health_check_listener_addr: "0.0.0.0:6786".to_string(), - config_path: "".to_string(), - metrics_level: None, - enable_barrier_read: None, - } - } -} - impl risingwave_common::opts::Opts for FrontendOpts { fn name() -> &'static str { "frontend" diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 3c50aa0df8b89..2e770fb841ada 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -24,9 +24,6 @@ use clap::Parser; pub use error::{MetaError, MetaResult}; use redact::Secret; use risingwave_common::config::OverrideConfig; -use risingwave_common::single_process_config::{ - DEFAULT_SINGLE_NODE_SQL_ENDPOINT, DEFAULT_SINGLE_NODE_STATE_STORE_URL, -}; use risingwave_common::util::meta_addr::MetaAddressStrategy; use risingwave_common::util::resource_util; use risingwave_common::{GIT_SHA, RW_VERSION}; @@ -42,10 +39,10 @@ use crate::manager::MetaOpts; #[command(version, about = "The central metadata management service")] pub struct MetaNodeOpts { #[clap(long, env = "RW_VPC_ID")] - vpc_id: Option, + pub vpc_id: Option, #[clap(long, env = "RW_VPC_SECURITY_GROUP_ID")] - security_group_id: Option, + pub security_group_id: Option, // TODO: use `SocketAddr` #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5690")] @@ -60,7 +57,7 @@ pub struct MetaNodeOpts { pub advertise_addr: String, #[clap(long, env = "RW_DASHBOARD_HOST")] - dashboard_host: Option, + pub dashboard_host: Option, /// We will start a http server at this address via `MetricsManager`. /// Then the prometheus instance will poll the metrics from this address. @@ -68,38 +65,38 @@ pub struct MetaNodeOpts { pub prometheus_listener_addr: Option, #[clap(long, env = "RW_ETCD_ENDPOINTS", default_value_t = String::from(""))] - etcd_endpoints: String, + pub etcd_endpoints: String, /// Enable authentication with etcd. By default disabled. #[clap(long, env = "RW_ETCD_AUTH")] - etcd_auth: bool, + pub etcd_auth: bool, /// Username of etcd, required when --etcd-auth is enabled. #[clap(long, env = "RW_ETCD_USERNAME", default_value = "")] - etcd_username: String, + pub etcd_username: String, /// Password of etcd, required when --etcd-auth is enabled. #[clap(long, env = "RW_ETCD_PASSWORD", default_value = "")] - etcd_password: Secret, + pub etcd_password: Secret, /// Endpoint of the SQL service, make it non-option when SQL service is required. #[clap(long, env = "RW_SQL_ENDPOINT")] pub sql_endpoint: Option, #[clap(long, env = "RW_DASHBOARD_UI_PATH")] - dashboard_ui_path: Option, + pub dashboard_ui_path: Option, /// The HTTP REST-API address of the Prometheus instance associated to this cluster. /// This address is used to serve PromQL queries to Prometheus. /// It is also used by Grafana Dashboard Service to fetch metrics and visualize them. #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")] - prometheus_endpoint: Option, + pub prometheus_endpoint: Option, /// The additional selector used when querying Prometheus. /// /// The format is same as PromQL. Example: `instance="foo",namespace="bar"` #[clap(long, env = "RW_PROMETHEUS_SELECTOR")] - prometheus_selector: Option, + pub prometheus_selector: Option, /// Endpoint of the connector node, there will be a sidecar connector node /// colocated with Meta node in the cloud environment @@ -120,27 +117,27 @@ pub struct MetaNodeOpts { #[clap(long, env = "RW_BACKEND", value_enum)] #[override_opts(path = meta.backend)] - backend: Option, + pub backend: Option, /// The interval of periodic barrier. #[clap(long, env = "RW_BARRIER_INTERVAL_MS")] #[override_opts(path = system.barrier_interval_ms)] - barrier_interval_ms: Option, + pub barrier_interval_ms: Option, /// Target size of the Sstable. #[clap(long, env = "RW_SSTABLE_SIZE_MB")] #[override_opts(path = system.sstable_size_mb)] - sstable_size_mb: Option, + pub sstable_size_mb: Option, /// Size of each block in bytes in SST. #[clap(long, env = "RW_BLOCK_SIZE_KB")] #[override_opts(path = system.block_size_kb)] - block_size_kb: Option, + pub block_size_kb: Option, /// False positive probability of bloom filter. #[clap(long, env = "RW_BLOOM_FALSE_POSITIVE")] #[override_opts(path = system.bloom_false_positive)] - bloom_false_positive: Option, + pub bloom_false_positive: Option, /// State store url #[clap(long, env = "RW_STATE_STORE")] @@ -155,17 +152,17 @@ pub struct MetaNodeOpts { /// Whether config object storage bucket lifecycle to purge stale data. #[clap(long, env = "RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE")] #[override_opts(path = meta.do_not_config_object_storage_lifecycle)] - do_not_config_object_storage_lifecycle: Option, + pub do_not_config_object_storage_lifecycle: Option, /// Remote storage url for storing snapshots. #[clap(long, env = "RW_BACKUP_STORAGE_URL")] #[override_opts(path = system.backup_storage_url)] - backup_storage_url: Option, + pub backup_storage_url: Option, /// Remote directory for storing snapshots. #[clap(long, env = "RW_BACKUP_STORAGE_DIRECTORY")] #[override_opts(path = system.backup_storage_directory)] - backup_storage_directory: Option, + pub backup_storage_directory: Option, /// Enable heap profile dump when memory usage is high. #[clap(long, env = "RW_HEAP_PROFILING_DIR")] @@ -173,41 +170,6 @@ pub struct MetaNodeOpts { pub heap_profiling_dir: Option, } -impl MetaNodeOpts { - pub fn new_for_single_node() -> Self { - MetaNodeOpts { - vpc_id: None, - security_group_id: None, - listen_addr: "0.0.0.0:5690".to_string(), - advertise_addr: "0.0.0.0:5690".to_string(), - dashboard_host: Some("0.0.0.0:5691".to_string()), - prometheus_listener_addr: Some("0.0.0.0:1250".to_string()), - etcd_endpoints: Default::default(), - etcd_auth: false, - etcd_username: Default::default(), - etcd_password: Default::default(), - sql_endpoint: Some(DEFAULT_SINGLE_NODE_SQL_ENDPOINT.clone()), - dashboard_ui_path: None, - prometheus_endpoint: None, - prometheus_selector: None, - connector_rpc_endpoint: None, - privatelink_endpoint_default_tags: None, - config_path: "".to_string(), - backend: Some(MetaBackend::Sql), - barrier_interval_ms: None, - sstable_size_mb: None, - block_size_kb: None, - bloom_false_positive: None, - state_store: Some(DEFAULT_SINGLE_NODE_STATE_STORE_URL.clone()), - data_directory: Some("hummock_001".to_string()), - do_not_config_object_storage_lifecycle: None, - backup_storage_url: None, - backup_storage_directory: None, - heap_profiling_dir: None, - } - } -} - impl risingwave_common::opts::Opts for MetaNodeOpts { fn name() -> &'static str { "meta" diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index f212261d9dd48..2204ffe0af7ef 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -92,25 +92,6 @@ pub struct CompactorOpts { pub proxy_rpc_endpoint: String, } -impl CompactorOpts { - pub fn new_for_single_node() -> Self { - Self { - listen_addr: "0.0.0.0:6660".to_string(), - advertise_addr: Some("0.0.0.0:6660".to_string()), - port: None, - prometheus_listener_addr: "0.0.0.0:1250".to_string(), - meta_address: "http://0.0.0.0:5690".parse().unwrap(), - compaction_worker_threads_number: None, - config_path: "".to_string(), - metrics_level: None, - async_stack_trace: None, - heap_profiling_dir: None, - compactor_mode: None, - proxy_rpc_endpoint: "".to_string(), - } - } -} - impl risingwave_common::opts::Opts for CompactorOpts { fn name() -> &'static str { "compactor"