diff --git a/Cargo.lock b/Cargo.lock index ca6e6644b505a..678571eddd03a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8686,6 +8686,7 @@ dependencies = [ "console", "const-str", "expect-test", + "home", "madsim-tokio", "prometheus", "risingwave_cmd", diff --git a/src/cmd_all/Cargo.toml b/src/cmd_all/Cargo.toml index f5a08e6c4b688..bb57fbfe88a09 100644 --- a/src/cmd_all/Cargo.toml +++ b/src/cmd_all/Cargo.toml @@ -23,6 +23,7 @@ anyhow = "1" clap = { version = "4", features = ["cargo", "derive"] } console = "0.15" const-str = "0.5" +home = "0.5" prometheus = { version = "0.13" } risingwave_cmd = { workspace = true } risingwave_common = { workspace = true } diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index a1e3a1b5f7063..2c167fc1bdc20 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -17,9 +17,10 @@ use std::str::FromStr; use anyhow::Result; +use clap::error::ErrorKind; use clap::{command, ArgMatches, Args, Command, FromArgMatches}; use risingwave_cmd::{compactor, compute, ctl, frontend, meta}; -use risingwave_cmd_all::{PlaygroundOpts, StandaloneOpts}; +use risingwave_cmd_all::{PlaygroundOpts, SingleNodeOpts, StandaloneOpts}; use risingwave_common::git_sha; use risingwave_compactor::CompactorOpts; use risingwave_compute::ComputeNodeOpts; @@ -98,7 +99,14 @@ enum Component { Compactor, Ctl, Playground, + /// Used by cloud to bundle different components into a single node. + /// It exposes the low level configuration options of each node. Standalone, + /// Used by users to run a single node. + /// The low level configuration options are hidden. + /// We only expose high-level configuration options, + /// which map across multiple nodes. + SingleNode, } impl Component { @@ -117,6 +125,7 @@ impl Component { Self::Ctl => ctl(parse_opts(matches)), Self::Playground => playground(parse_opts(matches)), Self::Standalone => standalone(parse_opts(matches)), + Self::SingleNode => single_node(parse_opts(matches)), } } @@ -130,6 +139,7 @@ impl Component { Component::Ctl => vec!["risectl"], Component::Playground => vec!["play"], Component::Standalone => vec![], + Component::SingleNode => vec!["single-node", "single"], } } @@ -143,6 +153,7 @@ impl Component { Component::Ctl => CtlOpts::augment_args(cmd), Component::Playground => PlaygroundOpts::augment_args(cmd), Component::Standalone => StandaloneOpts::augment_args(cmd), + Component::SingleNode => SingleNodeOpts::augment_args(cmd), } } @@ -179,7 +190,23 @@ fn main() -> Result<()> { .subcommands(Component::commands()), ); - let matches = command.get_matches(); + let matches = match command.try_get_matches() { + Ok(m) => m, + Err(e) if e.kind() == ErrorKind::MissingSubcommand => { + // `$ ./risingwave` + // NOTE(kwannoel): This is a hack to make `risingwave` + // work as an alias of `risingwave single-process`. + // If invocation is not a multicall and there's no subcommand, + // we will try to invoke it as a single node. + let command = Component::SingleNode.augment_args(risingwave()); + let matches = command.get_matches(); + Component::SingleNode.start(&matches); + return Ok(()); + } + Err(e) => { + e.exit(); + } + }; let multicall = matches.subcommand().unwrap(); let argv_1 = multicall.1.subcommand(); @@ -207,3 +234,15 @@ fn standalone(opts: StandaloneOpts) { risingwave_rt::init_risingwave_logger(settings); risingwave_rt::main_okk(risingwave_cmd_all::standalone(opts)).unwrap(); } + +/// For single node, the internals are just a config mapping from its +/// high level options to standalone mode node-level options. +/// We will start a standalone instance, with all nodes in the same process. +fn single_node(opts: SingleNodeOpts) { + let opts = risingwave_cmd_all::map_single_node_opts_to_standalone_opts(&opts); + let settings = risingwave_rt::LoggerSettings::from_opts(&opts) + .with_target("risingwave_storage", Level::WARN) + .with_thread_name(true); + risingwave_rt::init_risingwave_logger(settings); + risingwave_rt::main_okk(risingwave_cmd_all::standalone(opts)).unwrap(); +} diff --git a/src/cmd_all/src/lib.rs b/src/cmd_all/src/lib.rs index 94d4fd7ae3929..54ee3243bc662 100644 --- a/src/cmd_all/src/lib.rs +++ b/src/cmd_all/src/lib.rs @@ -18,7 +18,10 @@ mod common; pub mod playground; mod standalone; +pub mod single_node; + pub use playground::*; +pub use single_node::*; pub use standalone::*; risingwave_expr_impl::enable!(); diff --git a/src/cmd_all/src/playground.rs b/src/cmd_all/src/playground.rs index 70039264f6ef7..1b03048d5d6e0 100644 --- a/src/cmd_all/src/playground.rs +++ b/src/cmd_all/src/playground.rs @@ -137,7 +137,7 @@ fn get_services(profile: &str) -> (Vec, bool) { } #[derive(Debug, Clone, Parser)] -#[command(about = "The quick way to start a RisingWave cluster for playing around")] +#[command(about = "The quick way to start an in-memory RisingWave cluster for playing around")] pub struct PlaygroundOpts { /// The profile to use. #[clap(short, long, env = "PLAYGROUND_PROFILE", default_value = "playground")] diff --git a/src/cmd_all/src/single_node.rs b/src/cmd_all/src/single_node.rs new file mode 100644 index 0000000000000..b89f861f6e4fd --- /dev/null +++ b/src/cmd_all/src/single_node.rs @@ -0,0 +1,229 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::LazyLock; + +use clap::Parser; +use home::home_dir; +use risingwave_common::config::{AsyncStackTraceOption, MetaBackend}; +use risingwave_compactor::CompactorOpts; +use risingwave_compute::{default_parallelism, default_total_memory_bytes, ComputeNodeOpts}; +use risingwave_frontend::FrontendOpts; +use risingwave_meta_node::MetaNodeOpts; + +use crate::ParsedStandaloneOpts; + +pub static DEFAULT_STORE_DIRECTORY: LazyLock = LazyLock::new(|| { + let mut home_path = home_dir().unwrap(); + home_path.push(".risingwave"); + let home_path = home_path.to_str().unwrap(); + home_path.to_string() +}); + +pub static DEFAULT_SINGLE_NODE_SQLITE_PATH: LazyLock = + LazyLock::new(|| format!("{}/meta_store/single_node.db", &*DEFAULT_STORE_DIRECTORY)); + +pub static DEFAULT_SINGLE_NODE_SQL_ENDPOINT: LazyLock = + LazyLock::new(|| format!("sqlite://{}?mode=rwc", *DEFAULT_SINGLE_NODE_SQLITE_PATH)); + +pub static DEFAULT_SINGLE_NODE_STATE_STORE_PATH: LazyLock = + LazyLock::new(|| format!("{}/state_store", DEFAULT_STORE_DIRECTORY.clone())); + +pub static DEFAULT_SINGLE_NODE_STATE_STORE_URL: LazyLock = LazyLock::new(|| { + format!( + "hummock+fs://{}", + DEFAULT_SINGLE_NODE_STATE_STORE_PATH.clone() + ) +}); + +#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)] +#[command( + version, + about = "[default] The Single Node mode. Start all services in one process, with process-level options. This will be executed if no subcommand is specified" +)] +/// Here we define our own defaults for the single node mode. +pub struct SingleNodeOpts { + /// The address prometheus polls metrics from. + #[clap(long, env = "RW_SINGLE_NODE_PROMETHEUS_LISTENER_ADDR")] + prometheus_listener_addr: Option, + + /// The path to the cluster configuration file. + #[clap(long, env = "RW_SINGLE_NODE_CONFIG_PATH")] + config_path: Option, + + /// The store directory used by meta store and object store. + #[clap(long, env = "RW_SINGLE_NODE_STORE_DIRECTORY")] + store_directory: Option, + + /// The address of the meta node. + #[clap(long, env = "RW_SINGLE_NODE_META_ADDR")] + meta_addr: Option, + + /// The address of the compute node + #[clap(long, env = "RW_SINGLE_NODE_COMPUTE_ADDR")] + compute_addr: Option, + + /// The address of the frontend node + #[clap(long, env = "RW_SINGLE_NODE_FRONTEND_ADDR")] + frontend_addr: Option, + + /// The address of the compactor node + #[clap(long, env = "RW_SINGLE_NODE_COMPACTOR_ADDR")] + compactor_addr: Option, +} + +pub fn make_single_node_sql_endpoint(store_directory: &String) -> String { + format!( + "sqlite://{}/meta_store/single_node.db?mode=rwc", + store_directory + ) +} + +pub fn make_single_node_state_store_url(store_directory: &String) -> String { + format!("hummock+fs://{}/state_store", store_directory) +} + +pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedStandaloneOpts { + let mut meta_opts = SingleNodeOpts::default_meta_opts(); + let mut compute_opts = SingleNodeOpts::default_compute_opts(); + let mut frontend_opts = SingleNodeOpts::default_frontend_opts(); + let mut compactor_opts = SingleNodeOpts::default_compactor_opts(); + if let Some(prometheus_listener_addr) = &opts.prometheus_listener_addr { + meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone()); + compute_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); + frontend_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); + compactor_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); + } + if let Some(config_path) = &opts.config_path { + meta_opts.config_path = config_path.clone(); + compute_opts.config_path = config_path.clone(); + frontend_opts.config_path = config_path.clone(); + compactor_opts.config_path = config_path.clone(); + } + if let Some(store_directory) = &opts.store_directory { + let state_store_url = make_single_node_state_store_url(store_directory); + let meta_store_endpoint = make_single_node_sql_endpoint(store_directory); + meta_opts.state_store = Some(state_store_url); + meta_opts.sql_endpoint = Some(meta_store_endpoint); + } + if let Some(meta_addr) = &opts.meta_addr { + meta_opts.listen_addr = meta_addr.clone(); + meta_opts.advertise_addr = meta_addr.clone(); + + compute_opts.meta_address = meta_addr.parse().unwrap(); + frontend_opts.meta_addr = meta_addr.parse().unwrap(); + compactor_opts.meta_address = meta_addr.parse().unwrap(); + } + if let Some(compute_addr) = &opts.compute_addr { + compute_opts.listen_addr = compute_addr.clone(); + } + if let Some(frontend_addr) = &opts.frontend_addr { + frontend_opts.listen_addr = frontend_addr.clone(); + } + if let Some(compactor_addr) = &opts.compactor_addr { + compactor_opts.listen_addr = compactor_addr.clone(); + } + ParsedStandaloneOpts { + meta_opts: Some(meta_opts), + compute_opts: Some(compute_opts), + frontend_opts: Some(frontend_opts), + compactor_opts: Some(compactor_opts), + } +} + +impl SingleNodeOpts { + fn default_frontend_opts() -> FrontendOpts { + FrontendOpts { + listen_addr: "0.0.0.0:4566".to_string(), + advertise_addr: Some("0.0.0.0:4566".to_string()), + port: None, + meta_addr: "http://0.0.0.0:5690".parse().unwrap(), + prometheus_listener_addr: "0.0.0.0:1250".to_string(), + health_check_listener_addr: "0.0.0.0:6786".to_string(), + config_path: "".to_string(), + metrics_level: None, + enable_barrier_read: None, + } + } + + fn default_meta_opts() -> MetaNodeOpts { + MetaNodeOpts { + vpc_id: None, + security_group_id: None, + listen_addr: "0.0.0.0:5690".to_string(), + advertise_addr: "0.0.0.0:5690".to_string(), + dashboard_host: Some("0.0.0.0:5691".to_string()), + prometheus_listener_addr: Some("0.0.0.0:1250".to_string()), + etcd_endpoints: Default::default(), + etcd_auth: false, + etcd_username: Default::default(), + etcd_password: Default::default(), + sql_endpoint: Some(DEFAULT_SINGLE_NODE_SQL_ENDPOINT.clone()), + dashboard_ui_path: None, + prometheus_endpoint: None, + prometheus_selector: None, + connector_rpc_endpoint: None, + privatelink_endpoint_default_tags: None, + config_path: "".to_string(), + backend: Some(MetaBackend::Sql), + barrier_interval_ms: None, + sstable_size_mb: None, + block_size_kb: None, + bloom_false_positive: None, + state_store: Some(DEFAULT_SINGLE_NODE_STATE_STORE_URL.clone()), + data_directory: Some("hummock_001".to_string()), + do_not_config_object_storage_lifecycle: None, + backup_storage_url: None, + backup_storage_directory: None, + heap_profiling_dir: None, + } + } + + pub fn default_compute_opts() -> ComputeNodeOpts { + ComputeNodeOpts { + listen_addr: "0.0.0.0:5688".to_string(), + advertise_addr: Some("0.0.0.0:5688".to_string()), + prometheus_listener_addr: "0.0.0.0:1250".to_string(), + meta_address: "http://0.0.0.0:5690".parse().unwrap(), + connector_rpc_endpoint: None, + connector_rpc_sink_payload_format: None, + config_path: "".to_string(), + total_memory_bytes: default_total_memory_bytes(), + parallelism: default_parallelism(), + role: Default::default(), + metrics_level: None, + data_file_cache_dir: None, + meta_file_cache_dir: None, + async_stack_trace: Some(AsyncStackTraceOption::ReleaseVerbose), + heap_profiling_dir: None, + } + } + + fn default_compactor_opts() -> CompactorOpts { + CompactorOpts { + listen_addr: "0.0.0.0:6660".to_string(), + advertise_addr: Some("0.0.0.0:6660".to_string()), + port: None, + prometheus_listener_addr: "0.0.0.0:1250".to_string(), + meta_address: "http://0.0.0.0:5690".parse().unwrap(), + compaction_worker_threads_number: None, + config_path: "".to_string(), + metrics_level: None, + async_stack_trace: None, + heap_profiling_dir: None, + compactor_mode: None, + proxy_rpc_endpoint: "".to_string(), + } + } +} diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index 24b90ad613da4..6714da2a37eee 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -25,6 +25,11 @@ use tokio::signal; use crate::common::osstrs; #[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)] +#[command( + version, + about = "The Standalone mode allows users to start multiple services in one process, it exposes node-level options for each service", + hide = true +)] pub struct StandaloneOpts { /// Compute node options /// If missing, compute node won't start @@ -161,6 +166,9 @@ pub fn parse_standalone_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts } } +/// For `standalone` mode, we can configure and start multiple services in one process. +/// `standalone` mode is meant to be used by our cloud service and docker, +/// where we can configure and start multiple services in one process. pub async fn standalone( ParsedStandaloneOpts { meta_opts, diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index 20428599b1039..980897d5636e7 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -80,7 +80,6 @@ pub mod test_utils; pub mod transaction; pub mod types; pub mod vnode_mapping; - pub mod test_prelude { pub use super::array::{DataChunkTestExt, StreamChunkTestExt}; pub use super::catalog::test_utils::ColumnDescTestExt; diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index b3125b76052a6..f011ad11be23e 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -223,14 +223,14 @@ pub fn start(opts: ComputeNodeOpts) -> Pin + Send>> }) } -fn default_total_memory_bytes() -> usize { +pub fn default_total_memory_bytes() -> usize { (system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize } -fn default_parallelism() -> usize { +pub fn default_parallelism() -> usize { total_cpu_available().ceil() as usize } -fn default_role() -> Role { +pub fn default_role() -> Role { Role::Both } diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 618420a89da14..55df5fcff89f3 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -39,10 +39,10 @@ use crate::manager::MetaOpts; #[command(version, about = "The central metadata management service")] pub struct MetaNodeOpts { #[clap(long, env = "RW_VPC_ID")] - vpc_id: Option, + pub vpc_id: Option, #[clap(long, env = "RW_VPC_SECURITY_GROUP_ID")] - security_group_id: Option, + pub security_group_id: Option, // TODO: use `SocketAddr` #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5690")] @@ -54,45 +54,45 @@ pub struct MetaNodeOpts { /// It will serve as a unique identifier in cluster /// membership and leader election. Must be specified for etcd backend. #[clap(long, env = "RW_ADVERTISE_ADDR")] - advertise_addr: String, + pub advertise_addr: String, #[clap(long, env = "RW_DASHBOARD_HOST")] - dashboard_host: Option, + pub dashboard_host: Option, #[clap(long, env = "RW_PROMETHEUS_HOST")] pub prometheus_host: Option, #[clap(long, env = "RW_ETCD_ENDPOINTS", default_value_t = String::from(""))] - etcd_endpoints: String, + pub etcd_endpoints: String, /// Enable authentication with etcd. By default disabled. #[clap(long, env = "RW_ETCD_AUTH")] - etcd_auth: bool, + pub etcd_auth: bool, /// Username of etcd, required when --etcd-auth is enabled. #[clap(long, env = "RW_ETCD_USERNAME", default_value = "")] - etcd_username: String, + pub etcd_username: String, /// Password of etcd, required when --etcd-auth is enabled. #[clap(long, env = "RW_ETCD_PASSWORD", default_value = "")] - etcd_password: Secret, + pub etcd_password: Secret, /// Endpoint of the SQL service, make it non-option when SQL service is required. #[clap(long, env = "RW_SQL_ENDPOINT")] - sql_endpoint: Option, + pub sql_endpoint: Option, #[clap(long, env = "RW_DASHBOARD_UI_PATH")] - dashboard_ui_path: Option, + pub dashboard_ui_path: Option, /// For dashboard service to fetch cluster info. #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")] - prometheus_endpoint: Option, + pub prometheus_endpoint: Option, /// The additional selector used when querying Prometheus. /// /// The format is same as PromQL. Example: `instance="foo",namespace="bar"` #[clap(long, env = "RW_PROMETHEUS_SELECTOR")] - prometheus_selector: Option, + pub prometheus_selector: Option, /// Endpoint of the connector node, there will be a sidecar connector node /// colocated with Meta node in the cloud environment @@ -113,52 +113,52 @@ pub struct MetaNodeOpts { #[clap(long, env = "RW_BACKEND", value_enum)] #[override_opts(path = meta.backend)] - backend: Option, + pub backend: Option, /// The interval of periodic barrier. #[clap(long, env = "RW_BARRIER_INTERVAL_MS")] #[override_opts(path = system.barrier_interval_ms)] - barrier_interval_ms: Option, + pub barrier_interval_ms: Option, /// Target size of the Sstable. #[clap(long, env = "RW_SSTABLE_SIZE_MB")] #[override_opts(path = system.sstable_size_mb)] - sstable_size_mb: Option, + pub sstable_size_mb: Option, /// Size of each block in bytes in SST. #[clap(long, env = "RW_BLOCK_SIZE_KB")] #[override_opts(path = system.block_size_kb)] - block_size_kb: Option, + pub block_size_kb: Option, /// False positive probability of bloom filter. #[clap(long, env = "RW_BLOOM_FALSE_POSITIVE")] #[override_opts(path = system.bloom_false_positive)] - bloom_false_positive: Option, + pub bloom_false_positive: Option, /// State store url #[clap(long, env = "RW_STATE_STORE")] #[override_opts(path = system.state_store)] - state_store: Option, + pub state_store: Option, /// Remote directory for storing data and metadata objects. #[clap(long, env = "RW_DATA_DIRECTORY")] #[override_opts(path = system.data_directory)] - data_directory: Option, + pub data_directory: Option, /// Whether config object storage bucket lifecycle to purge stale data. #[clap(long, env = "RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE")] #[override_opts(path = meta.do_not_config_object_storage_lifecycle)] - do_not_config_object_storage_lifecycle: Option, + pub do_not_config_object_storage_lifecycle: Option, /// Remote storage url for storing snapshots. #[clap(long, env = "RW_BACKUP_STORAGE_URL")] #[override_opts(path = system.backup_storage_url)] - backup_storage_url: Option, + pub backup_storage_url: Option, /// Remote directory for storing snapshots. #[clap(long, env = "RW_BACKUP_STORAGE_DIRECTORY")] #[override_opts(path = system.backup_storage_directory)] - backup_storage_directory: Option, + pub backup_storage_directory: Option, /// Enable heap profile dump when memory usage is high. #[clap(long, env = "RW_HEAP_PROFILING_DIR")]