From 3940e2cbca27a5f5e0da177181ddfa0f11e35694 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 31 Jan 2024 20:45:55 +0800 Subject: [PATCH 01/16] add hack to support risingwave as entrypoint --- src/cmd_all/src/bin/risingwave.rs | 12 +++++++++++- src/cmd_all/src/single.rs | 0 src/cmd_all/src/standalone.rs | 8 ++++++++ 3 files changed, 19 insertions(+), 1 deletion(-) create mode 100644 src/cmd_all/src/single.rs diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index a1e3a1b5f7063..18947816884ce 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -18,6 +18,7 @@ use std::str::FromStr; use anyhow::Result; use clap::{command, ArgMatches, Args, Command, FromArgMatches}; +use clap::error::ErrorKind; use risingwave_cmd::{compactor, compute, ctl, frontend, meta}; use risingwave_cmd_all::{PlaygroundOpts, StandaloneOpts}; use risingwave_common::git_sha; @@ -179,7 +180,16 @@ fn main() -> Result<()> { .subcommands(Component::commands()), ); - let matches = command.get_matches(); + let matches = match command.try_get_matches() { + Ok(m) => m, + Err(e) if e.kind() == ErrorKind::MissingSubcommand => { + println!("jackpot!"); + return Ok(()); + } + Err(e) => { + e.exit(); + } + }; let multicall = matches.subcommand().unwrap(); let argv_1 = multicall.1.subcommand(); diff --git a/src/cmd_all/src/single.rs b/src/cmd_all/src/single.rs new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index 8765db18a07cd..bdd662324ee01 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -161,6 +161,14 @@ pub fn parse_standalone_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts } } +/// For `standalone` mode, we can configure and start multiple services in one process. +/// Note that this is different from `single` mode, where we start +/// pre-defined services all-in-one process, +/// the pre-defined services are not configurable. +/// `single` mode is meant to be user-facing, where users can just use `./risingwave` +/// to start the service. +/// `standalone` mode is meant to be used by our cloud service, where we can configure +/// and start multiple services in one process. pub async fn standalone( ParsedStandaloneOpts { meta_opts, From 8901a4b766a35dc9d7141b7ac772d5fca6f4d4af Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 1 Feb 2024 13:35:39 +0800 Subject: [PATCH 02/16] add single_node interface + skeleton file --- src/cmd_all/src/bin/risingwave.rs | 27 ++- src/cmd_all/src/lib.rs | 3 + src/cmd_all/src/single_node.rs | 345 ++++++++++++++++++++++++++++++ src/cmd_all/src/standalone.rs | 4 + 4 files changed, 376 insertions(+), 3 deletions(-) create mode 100644 src/cmd_all/src/single_node.rs diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index 18947816884ce..48c8386a54cfa 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -17,10 +17,10 @@ use std::str::FromStr; use anyhow::Result; -use clap::{command, ArgMatches, Args, Command, FromArgMatches}; use clap::error::ErrorKind; +use clap::{command, ArgMatches, Args, Command, FromArgMatches}; use risingwave_cmd::{compactor, compute, ctl, frontend, meta}; -use risingwave_cmd_all::{PlaygroundOpts, StandaloneOpts}; +use risingwave_cmd_all::{PlaygroundOpts, SingleNodeOpts, StandaloneOpts}; use risingwave_common::git_sha; use risingwave_compactor::CompactorOpts; use risingwave_compute::ComputeNodeOpts; @@ -99,7 +99,14 @@ enum Component { Compactor, Ctl, Playground, + /// Used by cloud to bundle different components into a single node. + /// It exposes the low level configuration options of each node. Standalone, + /// Used by users to run a single node. + /// The low level configuration options are hidden. + /// We only expose high-level configuration options, + /// which map across multiple nodes. + SingleNode, } impl Component { @@ -118,6 +125,7 @@ impl Component { Self::Ctl => ctl(parse_opts(matches)), Self::Playground => playground(parse_opts(matches)), Self::Standalone => standalone(parse_opts(matches)), + Self::SingleNode => single_node(parse_opts(matches)), } } @@ -131,6 +139,7 @@ impl Component { Component::Ctl => vec!["risectl"], Component::Playground => vec!["play"], Component::Standalone => vec![], + Component::SingleNode => vec![], } } @@ -144,6 +153,7 @@ impl Component { Component::Ctl => CtlOpts::augment_args(cmd), Component::Playground => PlaygroundOpts::augment_args(cmd), Component::Standalone => StandaloneOpts::augment_args(cmd), + Component::SingleNode => SingleNodeOpts::augment_args(cmd), } } @@ -183,7 +193,9 @@ fn main() -> Result<()> { let matches = match command.try_get_matches() { Ok(m) => m, Err(e) if e.kind() == ErrorKind::MissingSubcommand => { - println!("jackpot!"); + let command = Component::SingleNode.augment_args(risingwave()); + let matches = command.get_matches(); + Component::SingleNode.start(&matches); return Ok(()); } Err(e) => { @@ -217,3 +229,12 @@ fn standalone(opts: StandaloneOpts) { risingwave_rt::init_risingwave_logger(settings); risingwave_rt::main_okk(risingwave_cmd_all::standalone(opts)).unwrap(); } + +fn single_node(opts: SingleNodeOpts) { + let opts = risingwave_cmd_all::parse_single_node_opt_args(&opts); + let settings = risingwave_rt::LoggerSettings::from_opts(&opts) + .with_target("risingwave_storage", Level::WARN) + .with_thread_name(true); + risingwave_rt::init_risingwave_logger(settings); + risingwave_rt::main_okk(risingwave_cmd_all::single_node(opts)).unwrap(); +} diff --git a/src/cmd_all/src/lib.rs b/src/cmd_all/src/lib.rs index 94d4fd7ae3929..54ee3243bc662 100644 --- a/src/cmd_all/src/lib.rs +++ b/src/cmd_all/src/lib.rs @@ -18,7 +18,10 @@ mod common; pub mod playground; mod standalone; +pub mod single_node; + pub use playground::*; +pub use single_node::*; pub use standalone::*; risingwave_expr_impl::enable!(); diff --git a/src/cmd_all/src/single_node.rs b/src/cmd_all/src/single_node.rs new file mode 100644 index 0000000000000..19098d6e75c9b --- /dev/null +++ b/src/cmd_all/src/single_node.rs @@ -0,0 +1,345 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Result; +use clap::Parser; +use risingwave_common::util::meta_addr::MetaAddressStrategy; +use risingwave_compactor::CompactorOpts; +use risingwave_compute::ComputeNodeOpts; +use risingwave_frontend::FrontendOpts; +use risingwave_meta_node::MetaNodeOpts; +use shell_words::split; +use tokio::signal; + +use crate::common::osstrs; + +#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)] +#[command( +version, +about = "[DEFAULT] The Single Node mode. Start all services in one process, with process-level options. This will be executed if no subcommand is specified" +)] +pub struct SingleNodeOpts { + /// Compute node options + /// If missing, compute node won't start + #[clap(short, long, env = "RW_STANDALONE_COMPUTE_OPTS")] + compute_opts: Option, + + #[clap(short, long, env = "RW_STANDALONE_META_OPTS")] + /// Meta node options + /// If missing, meta node won't start + meta_opts: Option, + + #[clap(short, long, env = "RW_STANDALONE_FRONTEND_OPTS")] + /// Frontend node options + /// If missing, frontend node won't start + frontend_opts: Option, + + #[clap(long, env = "RW_STANDALONE_COMPACTOR_OPTS")] + /// Compactor node options + /// If missing compactor node won't start + compactor_opts: Option, + + #[clap(long, env = "RW_STANDALONE_PROMETHEUS_LISTENER_ADDR")] + /// Prometheus listener address + /// If present, it will override prometheus listener address for + /// Frontend, Compute and Compactor nodes + prometheus_listener_addr: Option, + + #[clap(long, env = "RW_STANDALONE_CONFIG_PATH")] + /// Path to the config file + /// If present, it will override config path for + /// Frontend, Compute and Compactor nodes + config_path: Option, +} + +#[derive(Debug)] +pub struct ParsedSingleNodeOpts { + pub meta_opts: Option, + pub compute_opts: Option, + pub frontend_opts: Option, + pub compactor_opts: Option, +} + +impl risingwave_common::opts::Opts for ParsedSingleNodeOpts { + fn name() -> &'static str { + "single_node" + } + + fn meta_addr(&self) -> MetaAddressStrategy { + if let Some(opts) = self.meta_opts.as_ref() { + opts.meta_addr() + } else if let Some(opts) = self.compute_opts.as_ref() { + opts.meta_addr() + } else if let Some(opts) = self.frontend_opts.as_ref() { + opts.meta_addr() + } else if let Some(opts) = self.compactor_opts.as_ref() { + opts.meta_addr() + } else { + unreachable!("at least one service should be specified as checked during parsing") + } + } +} + +pub fn parse_single_node_opt_args(opts: &SingleNodeOpts) -> ParsedSingleNodeOpts { + let meta_opts = opts.meta_opts.as_ref().map(|s| { + let mut s = split(s).unwrap(); + s.insert(0, "meta-node".into()); + s + }); + let mut meta_opts = meta_opts.map(|o| MetaNodeOpts::parse_from(osstrs(o))); + + let compute_opts = opts.compute_opts.as_ref().map(|s| { + let mut s = split(s).unwrap(); + s.insert(0, "compute-node".into()); + s + }); + let mut compute_opts = compute_opts.map(|o| ComputeNodeOpts::parse_from(osstrs(o))); + + let frontend_opts = opts.frontend_opts.as_ref().map(|s| { + let mut s = split(s).unwrap(); + s.insert(0, "frontend-node".into()); + s + }); + let mut frontend_opts = frontend_opts.map(|o| FrontendOpts::parse_from(osstrs(o))); + + let compactor_opts = opts.compactor_opts.as_ref().map(|s| { + let mut s = split(s).unwrap(); + s.insert(0, "compactor-node".into()); + s + }); + let mut compactor_opts = compactor_opts.map(|o| CompactorOpts::parse_from(osstrs(o))); + + if let Some(config_path) = opts.config_path.as_ref() { + if let Some(meta_opts) = meta_opts.as_mut() { + meta_opts.config_path = config_path.clone(); + } + if let Some(compute_opts) = compute_opts.as_mut() { + compute_opts.config_path = config_path.clone(); + } + if let Some(frontend_opts) = frontend_opts.as_mut() { + frontend_opts.config_path = config_path.clone(); + } + if let Some(compactor_opts) = compactor_opts.as_mut() { + compactor_opts.config_path = config_path.clone(); + } + } + if let Some(prometheus_listener_addr) = opts.prometheus_listener_addr.as_ref() { + if let Some(compute_opts) = compute_opts.as_mut() { + compute_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); + } + if let Some(frontend_opts) = frontend_opts.as_mut() { + frontend_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); + } + if let Some(compactor_opts) = compactor_opts.as_mut() { + compactor_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); + } + if let Some(meta_opts) = meta_opts.as_mut() { + meta_opts.prometheus_host = Some(prometheus_listener_addr.clone()); + } + } + + if meta_opts.is_none() + && compute_opts.is_none() + && frontend_opts.is_none() + && compactor_opts.is_none() + { + panic!("No service is specified to start."); + } + + ParsedSingleNodeOpts { + meta_opts, + compute_opts, + frontend_opts, + compactor_opts, + } +} + +/// For `single_node` mode, we can configure and start multiple services in one process. +/// Note that this is different from `single` mode, where we start +/// pre-defined services all-in-one process, +/// the pre-defined services are not configurable. +/// `single` mode is meant to be user-facing, where users can just use `./risingwave` +/// to start the service. +/// `single_node` mode is meant to be used by our cloud service, where we can configure +/// and start multiple services in one process. +pub async fn single_node( + ParsedSingleNodeOpts { + meta_opts, + compute_opts, + frontend_opts, + compactor_opts, + }: ParsedSingleNodeOpts, +) -> Result<()> { + tracing::info!("launching Risingwave in single_node mode"); + + if let Some(opts) = meta_opts { + tracing::info!("starting meta-node thread with cli args: {:?}", opts); + + let _meta_handle = tokio::spawn(async move { + risingwave_meta_node::start(opts).await; + tracing::warn!("meta is stopped, shutdown all nodes"); + }); + // wait for the service to be ready + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + if let Some(opts) = compute_opts { + tracing::info!("starting compute-node thread with cli args: {:?}", opts); + let _compute_handle = tokio::spawn(async move { risingwave_compute::start(opts).await }); + } + if let Some(opts) = frontend_opts { + tracing::info!("starting frontend-node thread with cli args: {:?}", opts); + let _frontend_handle = tokio::spawn(async move { risingwave_frontend::start(opts).await }); + } + if let Some(opts) = compactor_opts { + tracing::info!("starting compactor-node thread with cli args: {:?}", opts); + let _compactor_handle = + tokio::spawn(async move { risingwave_compactor::start(opts).await }); + } + + // wait for log messages to be flushed + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + eprintln!("-------------------------------"); + eprintln!("RisingWave single_node mode is ready."); + + // TODO: should we join all handles? + // Currently, not all services can be shutdown gracefully, just quit on Ctrl-C now. + // TODO(kwannoel): Why can't be shutdown gracefully? Is it that the service just does not + // support it? + signal::ctrl_c().await.unwrap(); + tracing::info!("Ctrl+C received, now exiting"); + + Ok(()) +} + +#[cfg(test)] +mod test { + use std::fmt::Debug; + + use expect_test::{expect, Expect}; + + use super::*; + + fn check(actual: impl Debug, expect: Expect) { + let actual = format!("{:#?}", actual); + expect.assert_eq(&actual); + } + + #[test] + fn test_parse_opt_args() { + // Test parsing into single_node-level opts. + let raw_opts = " +--compute-opts=--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 +--meta-opts=--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --etcd-password 1234 +--frontend-opts=--config-path=src/config/original.toml +--prometheus-listener-addr=127.0.0.1:1234 +--config-path=src/config/test.toml +"; + let actual = SingleNodeOpts::parse_from(raw_opts.lines()); + let opts = SingleNodeOpts { + compute_opts: Some("--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10".into()), + meta_opts: Some("--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --etcd-password 1234".into()), + frontend_opts: Some("--config-path=src/config/original.toml".into()), + compactor_opts: None, + prometheus_listener_addr: Some("127.0.0.1:1234".into()), + config_path: Some("src/config/test.toml".into()), + }; + assert_eq!(actual, opts); + + // Test parsing into node-level opts. + let actual = parse_single_node_opt_args(&opts); + check( + actual, + expect![[r#" + ParsedSingleNodeOpts { + meta_opts: Some( + MetaNodeOpts { + vpc_id: None, + security_group_id: None, + listen_addr: "127.0.0.1:8001", + advertise_addr: "127.0.0.1:9999", + dashboard_host: None, + prometheus_host: Some( + "127.0.0.1:1234", + ), + etcd_endpoints: "", + etcd_auth: false, + etcd_username: "", + etcd_password: [REDACTED alloc::string::String], + sql_endpoint: None, + dashboard_ui_path: None, + prometheus_endpoint: None, + prometheus_selector: None, + connector_rpc_endpoint: None, + privatelink_endpoint_default_tags: None, + config_path: "src/config/test.toml", + backend: None, + barrier_interval_ms: None, + sstable_size_mb: None, + block_size_kb: None, + bloom_false_positive: None, + state_store: None, + data_directory: Some( + "some path with spaces", + ), + do_not_config_object_storage_lifecycle: None, + backup_storage_url: None, + backup_storage_directory: None, + heap_profiling_dir: None, + }, + ), + compute_opts: Some( + ComputeNodeOpts { + listen_addr: "127.0.0.1:8000", + advertise_addr: None, + prometheus_listener_addr: "127.0.0.1:1234", + meta_address: List( + [ + http://127.0.0.1:5690/, + ], + ), + connector_rpc_endpoint: None, + connector_rpc_sink_payload_format: None, + config_path: "src/config/test.toml", + total_memory_bytes: 34359738368, + parallelism: 10, + role: Both, + metrics_level: None, + data_file_cache_dir: None, + meta_file_cache_dir: None, + async_stack_trace: None, + heap_profiling_dir: None, + }, + ), + frontend_opts: Some( + FrontendOpts { + listen_addr: "127.0.0.1:4566", + advertise_addr: None, + port: None, + meta_addr: List( + [ + http://127.0.0.1:5690/, + ], + ), + prometheus_listener_addr: "127.0.0.1:1234", + health_check_listener_addr: "127.0.0.1:6786", + config_path: "src/config/test.toml", + metrics_level: None, + enable_barrier_read: None, + }, + ), + compactor_opts: None, + }"#]], + ); + } +} diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index bdd662324ee01..9f943c5c9faaf 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -25,6 +25,10 @@ use tokio::signal; use crate::common::osstrs; #[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)] +#[command( + version, + about = "The Standalone mode allows users to start multiple services in one process, it exposes node-level options for each service" +)] pub struct StandaloneOpts { /// Compute node options /// If missing, compute node won't start From 803d90e937672b7f9a7f0de25343497ad81d976b Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 1 Feb 2024 14:51:41 +0800 Subject: [PATCH 03/16] refactor away parse logic --- src/cmd_all/src/bin/risingwave.rs | 4 +- src/cmd_all/src/single_node.rs | 302 +----------------------------- 2 files changed, 11 insertions(+), 295 deletions(-) diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index 48c8386a54cfa..8b99093fb23da 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -231,10 +231,10 @@ fn standalone(opts: StandaloneOpts) { } fn single_node(opts: SingleNodeOpts) { - let opts = risingwave_cmd_all::parse_single_node_opt_args(&opts); + let opts = risingwave_cmd_all::map_single_node_opts_to_standalone_opts(&opts); let settings = risingwave_rt::LoggerSettings::from_opts(&opts) .with_target("risingwave_storage", Level::WARN) .with_thread_name(true); risingwave_rt::init_risingwave_logger(settings); - risingwave_rt::main_okk(risingwave_cmd_all::single_node(opts)).unwrap(); + risingwave_rt::main_okk(risingwave_cmd_all::standalone(opts)).unwrap(); } diff --git a/src/cmd_all/src/single_node.rs b/src/cmd_all/src/single_node.rs index 19098d6e75c9b..9f102ea714a10 100644 --- a/src/cmd_all/src/single_node.rs +++ b/src/cmd_all/src/single_node.rs @@ -23,203 +23,24 @@ use shell_words::split; use tokio::signal; use crate::common::osstrs; +use crate::ParsedStandaloneOpts; #[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)] #[command( -version, -about = "[DEFAULT] The Single Node mode. Start all services in one process, with process-level options. This will be executed if no subcommand is specified" + version, + about = "[DEFAULT] The Single Node mode. Start all services in one process, with process-level options. This will be executed if no subcommand is specified" )] +/// Here we define our own defaults for the single node mode. pub struct SingleNodeOpts { - /// Compute node options - /// If missing, compute node won't start - #[clap(short, long, env = "RW_STANDALONE_COMPUTE_OPTS")] - compute_opts: Option, - - #[clap(short, long, env = "RW_STANDALONE_META_OPTS")] - /// Meta node options - /// If missing, meta node won't start - meta_opts: Option, - - #[clap(short, long, env = "RW_STANDALONE_FRONTEND_OPTS")] - /// Frontend node options - /// If missing, frontend node won't start - frontend_opts: Option, - - #[clap(long, env = "RW_STANDALONE_COMPACTOR_OPTS")] - /// Compactor node options - /// If missing compactor node won't start - compactor_opts: Option, - - #[clap(long, env = "RW_STANDALONE_PROMETHEUS_LISTENER_ADDR")] - /// Prometheus listener address - /// If present, it will override prometheus listener address for - /// Frontend, Compute and Compactor nodes + #[clap(long, env = "RW_SINGLE_NODE_PROMETHEUS_LISTENER_ADDR")] prometheus_listener_addr: Option, - #[clap(long, env = "RW_STANDALONE_CONFIG_PATH")] - /// Path to the config file - /// If present, it will override config path for - /// Frontend, Compute and Compactor nodes + #[clap(long, env = "RW_SINGLE_NODE_CONFIG_PATH")] config_path: Option, } -#[derive(Debug)] -pub struct ParsedSingleNodeOpts { - pub meta_opts: Option, - pub compute_opts: Option, - pub frontend_opts: Option, - pub compactor_opts: Option, -} - -impl risingwave_common::opts::Opts for ParsedSingleNodeOpts { - fn name() -> &'static str { - "single_node" - } - - fn meta_addr(&self) -> MetaAddressStrategy { - if let Some(opts) = self.meta_opts.as_ref() { - opts.meta_addr() - } else if let Some(opts) = self.compute_opts.as_ref() { - opts.meta_addr() - } else if let Some(opts) = self.frontend_opts.as_ref() { - opts.meta_addr() - } else if let Some(opts) = self.compactor_opts.as_ref() { - opts.meta_addr() - } else { - unreachable!("at least one service should be specified as checked during parsing") - } - } -} - -pub fn parse_single_node_opt_args(opts: &SingleNodeOpts) -> ParsedSingleNodeOpts { - let meta_opts = opts.meta_opts.as_ref().map(|s| { - let mut s = split(s).unwrap(); - s.insert(0, "meta-node".into()); - s - }); - let mut meta_opts = meta_opts.map(|o| MetaNodeOpts::parse_from(osstrs(o))); - - let compute_opts = opts.compute_opts.as_ref().map(|s| { - let mut s = split(s).unwrap(); - s.insert(0, "compute-node".into()); - s - }); - let mut compute_opts = compute_opts.map(|o| ComputeNodeOpts::parse_from(osstrs(o))); - - let frontend_opts = opts.frontend_opts.as_ref().map(|s| { - let mut s = split(s).unwrap(); - s.insert(0, "frontend-node".into()); - s - }); - let mut frontend_opts = frontend_opts.map(|o| FrontendOpts::parse_from(osstrs(o))); - - let compactor_opts = opts.compactor_opts.as_ref().map(|s| { - let mut s = split(s).unwrap(); - s.insert(0, "compactor-node".into()); - s - }); - let mut compactor_opts = compactor_opts.map(|o| CompactorOpts::parse_from(osstrs(o))); - - if let Some(config_path) = opts.config_path.as_ref() { - if let Some(meta_opts) = meta_opts.as_mut() { - meta_opts.config_path = config_path.clone(); - } - if let Some(compute_opts) = compute_opts.as_mut() { - compute_opts.config_path = config_path.clone(); - } - if let Some(frontend_opts) = frontend_opts.as_mut() { - frontend_opts.config_path = config_path.clone(); - } - if let Some(compactor_opts) = compactor_opts.as_mut() { - compactor_opts.config_path = config_path.clone(); - } - } - if let Some(prometheus_listener_addr) = opts.prometheus_listener_addr.as_ref() { - if let Some(compute_opts) = compute_opts.as_mut() { - compute_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); - } - if let Some(frontend_opts) = frontend_opts.as_mut() { - frontend_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); - } - if let Some(compactor_opts) = compactor_opts.as_mut() { - compactor_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); - } - if let Some(meta_opts) = meta_opts.as_mut() { - meta_opts.prometheus_host = Some(prometheus_listener_addr.clone()); - } - } - - if meta_opts.is_none() - && compute_opts.is_none() - && frontend_opts.is_none() - && compactor_opts.is_none() - { - panic!("No service is specified to start."); - } - - ParsedSingleNodeOpts { - meta_opts, - compute_opts, - frontend_opts, - compactor_opts, - } -} - -/// For `single_node` mode, we can configure and start multiple services in one process. -/// Note that this is different from `single` mode, where we start -/// pre-defined services all-in-one process, -/// the pre-defined services are not configurable. -/// `single` mode is meant to be user-facing, where users can just use `./risingwave` -/// to start the service. -/// `single_node` mode is meant to be used by our cloud service, where we can configure -/// and start multiple services in one process. -pub async fn single_node( - ParsedSingleNodeOpts { - meta_opts, - compute_opts, - frontend_opts, - compactor_opts, - }: ParsedSingleNodeOpts, -) -> Result<()> { - tracing::info!("launching Risingwave in single_node mode"); - - if let Some(opts) = meta_opts { - tracing::info!("starting meta-node thread with cli args: {:?}", opts); - - let _meta_handle = tokio::spawn(async move { - risingwave_meta_node::start(opts).await; - tracing::warn!("meta is stopped, shutdown all nodes"); - }); - // wait for the service to be ready - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - if let Some(opts) = compute_opts { - tracing::info!("starting compute-node thread with cli args: {:?}", opts); - let _compute_handle = tokio::spawn(async move { risingwave_compute::start(opts).await }); - } - if let Some(opts) = frontend_opts { - tracing::info!("starting frontend-node thread with cli args: {:?}", opts); - let _frontend_handle = tokio::spawn(async move { risingwave_frontend::start(opts).await }); - } - if let Some(opts) = compactor_opts { - tracing::info!("starting compactor-node thread with cli args: {:?}", opts); - let _compactor_handle = - tokio::spawn(async move { risingwave_compactor::start(opts).await }); - } - - // wait for log messages to be flushed - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - eprintln!("-------------------------------"); - eprintln!("RisingWave single_node mode is ready."); - - // TODO: should we join all handles? - // Currently, not all services can be shutdown gracefully, just quit on Ctrl-C now. - // TODO(kwannoel): Why can't be shutdown gracefully? Is it that the service just does not - // support it? - signal::ctrl_c().await.unwrap(); - tracing::info!("Ctrl+C received, now exiting"); - - Ok(()) +pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedStandaloneOpts { + todo!() } #[cfg(test)] @@ -236,110 +57,5 @@ mod test { } #[test] - fn test_parse_opt_args() { - // Test parsing into single_node-level opts. - let raw_opts = " ---compute-opts=--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 ---meta-opts=--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --etcd-password 1234 ---frontend-opts=--config-path=src/config/original.toml ---prometheus-listener-addr=127.0.0.1:1234 ---config-path=src/config/test.toml -"; - let actual = SingleNodeOpts::parse_from(raw_opts.lines()); - let opts = SingleNodeOpts { - compute_opts: Some("--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10".into()), - meta_opts: Some("--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 --etcd-password 1234".into()), - frontend_opts: Some("--config-path=src/config/original.toml".into()), - compactor_opts: None, - prometheus_listener_addr: Some("127.0.0.1:1234".into()), - config_path: Some("src/config/test.toml".into()), - }; - assert_eq!(actual, opts); - - // Test parsing into node-level opts. - let actual = parse_single_node_opt_args(&opts); - check( - actual, - expect![[r#" - ParsedSingleNodeOpts { - meta_opts: Some( - MetaNodeOpts { - vpc_id: None, - security_group_id: None, - listen_addr: "127.0.0.1:8001", - advertise_addr: "127.0.0.1:9999", - dashboard_host: None, - prometheus_host: Some( - "127.0.0.1:1234", - ), - etcd_endpoints: "", - etcd_auth: false, - etcd_username: "", - etcd_password: [REDACTED alloc::string::String], - sql_endpoint: None, - dashboard_ui_path: None, - prometheus_endpoint: None, - prometheus_selector: None, - connector_rpc_endpoint: None, - privatelink_endpoint_default_tags: None, - config_path: "src/config/test.toml", - backend: None, - barrier_interval_ms: None, - sstable_size_mb: None, - block_size_kb: None, - bloom_false_positive: None, - state_store: None, - data_directory: Some( - "some path with spaces", - ), - do_not_config_object_storage_lifecycle: None, - backup_storage_url: None, - backup_storage_directory: None, - heap_profiling_dir: None, - }, - ), - compute_opts: Some( - ComputeNodeOpts { - listen_addr: "127.0.0.1:8000", - advertise_addr: None, - prometheus_listener_addr: "127.0.0.1:1234", - meta_address: List( - [ - http://127.0.0.1:5690/, - ], - ), - connector_rpc_endpoint: None, - connector_rpc_sink_payload_format: None, - config_path: "src/config/test.toml", - total_memory_bytes: 34359738368, - parallelism: 10, - role: Both, - metrics_level: None, - data_file_cache_dir: None, - meta_file_cache_dir: None, - async_stack_trace: None, - heap_profiling_dir: None, - }, - ), - frontend_opts: Some( - FrontendOpts { - listen_addr: "127.0.0.1:4566", - advertise_addr: None, - port: None, - meta_addr: List( - [ - http://127.0.0.1:5690/, - ], - ), - prometheus_listener_addr: "127.0.0.1:1234", - health_check_listener_addr: "127.0.0.1:6786", - config_path: "src/config/test.toml", - metrics_level: None, - enable_barrier_read: None, - }, - ), - compactor_opts: None, - }"#]], - ); - } + fn test_parse_opt_args() {} } From fab3887a51d38c69b10d9dd1f798725047b76bc4 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 1 Feb 2024 15:55:41 +0800 Subject: [PATCH 04/16] interim commit add data directory --- src/cmd_all/src/single_node.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/cmd_all/src/single_node.rs b/src/cmd_all/src/single_node.rs index 9f102ea714a10..5747a594e3599 100644 --- a/src/cmd_all/src/single_node.rs +++ b/src/cmd_all/src/single_node.rs @@ -37,6 +37,9 @@ pub struct SingleNodeOpts { #[clap(long, env = "RW_SINGLE_NODE_CONFIG_PATH")] config_path: Option, + + #[clap(long, env = "RW_SINGLE_NODE_META_ADDR", default_value = "")] + data_directory: Option, } pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedStandaloneOpts { From 6250c197d5ec4a683cc7e0dcc78884cd3f890f2a Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 2 Feb 2024 10:36:31 +0800 Subject: [PATCH 05/16] add standalone defaults --- src/cmd_all/src/single_node.rs | 65 +++++++++++++++++++++++++++++++- src/compute/src/lib.rs | 23 +++++++++++ src/frontend/src/lib.rs | 16 ++++++++ src/meta/node/src/lib.rs | 41 +++++++++++++++++++- src/storage/compactor/src/lib.rs | 19 ++++++++++ 5 files changed, 160 insertions(+), 4 deletions(-) diff --git a/src/cmd_all/src/single_node.rs b/src/cmd_all/src/single_node.rs index 5747a594e3599..e80c3183e6819 100644 --- a/src/cmd_all/src/single_node.rs +++ b/src/cmd_all/src/single_node.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::str::FromStr; + use anyhow::Result; use clap::Parser; use risingwave_common::util::meta_addr::MetaAddressStrategy; @@ -38,12 +40,71 @@ pub struct SingleNodeOpts { #[clap(long, env = "RW_SINGLE_NODE_CONFIG_PATH")] config_path: Option, - #[clap(long, env = "RW_SINGLE_NODE_META_ADDR", default_value = "")] + /// The data directory used by meta store and object store. + #[clap(long, env = "RW_SINGLE_NODE_DATA_DIRECTORY", default_value = "")] data_directory: Option, + + /// The address of the meta node. + #[clap(long, env = "RW_SINGLE_NODE_META_ADDR", default_value = "")] + meta_addr: Option, + + /// The address of the compute node + #[clap(long, env = "RW_SINGLE_NODE_COMPUTE_ADDR", default_value = "")] + compute_addr: Option, + + /// The address of the frontend node + #[clap(long, env = "RW_SINGLE_NODE_FRONTEND_ADDR", default_value = "")] + frontend_addr: Option, + + /// The address of the compactor node + #[clap(long, env = "RW_SINGLE_NODE_COMPACTOR_ADDR", default_value = "")] + compactor_addr: Option, } pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedStandaloneOpts { - todo!() + let mut meta_opts = MetaNodeOpts::new_for_single_node(); + let mut compute_opts = ComputeNodeOpts::new_for_single_node(); + let mut frontend_opts = FrontendOpts::new_for_single_node(); + let mut compactor_opts = CompactorOpts::new_for_single_node(); + if let Some(prometheus_listener_addr) = &opts.prometheus_listener_addr { + meta_opts.prometheus_host = Some(prometheus_listener_addr.clone()); + compute_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); + frontend_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); + compactor_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); + } + if let Some(config_path) = &opts.config_path { + meta_opts.config_path = config_path.clone(); + compute_opts.config_path = config_path.clone(); + frontend_opts.config_path = config_path.clone(); + compactor_opts.config_path = config_path.clone(); + } + // TODO(kwannoel): Also update state store URL + if let Some(data_directory) = &opts.data_directory { + meta_opts.data_directory = Some(data_directory.clone()); + } + if let Some(meta_addr) = &opts.meta_addr { + meta_opts.listen_addr = meta_addr.clone(); + meta_opts.advertise_addr = meta_addr.clone(); + + compute_opts.meta_address = meta_addr.parse().unwrap(); + frontend_opts.meta_addr = meta_addr.parse().unwrap(); + compactor_opts.meta_address = meta_addr.parse().unwrap(); + } + if let Some(compute_addr) = &opts.compute_addr { + compute_opts.listen_addr = compute_addr.clone(); + } + if let Some(frontend_addr) = &opts.frontend_addr { + frontend_opts.listen_addr = frontend_addr.clone(); + } + if let Some(compactor_addr) = &opts.compactor_addr { + compactor_opts.listen_addr = compactor_addr.clone(); + } + ParsedStandaloneOpts { + meta_opts: Some(meta_opts), + compute_opts: Some(compute_opts), + frontend_opts: Some(frontend_opts), + compactor_opts: Some(compactor_opts), + } } #[cfg(test)] diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index b713aa9951622..183d8ff5e99cd 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -39,6 +39,7 @@ use risingwave_common::util::meta_addr::MetaAddressStrategy; use risingwave_common::util::resource_util::cpu::total_cpu_available; use risingwave_common::util::resource_util::memory::system_memory_available_bytes; use serde::{Deserialize, Serialize}; +use tonic::IntoStreamingRequest; /// If `total_memory_bytes` is not specified, the default memory limit will be set to /// the system memory limit multiplied by this proportion @@ -133,6 +134,28 @@ pub struct ComputeNodeOpts { pub heap_profiling_dir: Option, } +impl ComputeNodeOpts { + pub fn new_for_single_node() -> Self { + Self { + listen_addr: "".to_string(), + advertise_addr: Default::default(), + prometheus_listener_addr: Default::default(), + meta_address: "http://0.0.0.0:5690".parse().unwrap(), + connector_rpc_endpoint: None, + connector_rpc_sink_payload_format: None, + config_path: "".to_string(), + total_memory_bytes: default_total_memory_bytes(), + parallelism: default_parallelism(), + role: Default::default(), + metrics_level: None, + data_file_cache_dir: None, + meta_file_cache_dir: None, + async_stack_trace: Some(AsyncStackTraceOption::ReleaseVerbose), + heap_profiling_dir: None, + } + } +} + impl risingwave_common::opts::Opts for ComputeNodeOpts { fn name() -> &'static str { "compute" diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 805221327f016..1586212c5a7cc 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -143,6 +143,22 @@ pub struct FrontendOpts { pub enable_barrier_read: Option, } +impl FrontendOpts { + pub fn new_for_single_node() -> Self { + Self { + listen_addr: "0.0.0.0:4566".to_string(), + advertise_addr: Some("0.0.0.0:4566".to_string()), + port: None, + meta_addr: "http://0.0.0.0:5690".parse().unwrap(), + prometheus_listener_addr: "".to_string(), + health_check_listener_addr: "0.0.0.0:6786".to_string(), + config_path: "".to_string(), + metrics_level: None, + enable_barrier_read: None, + } + } +} + impl risingwave_common::opts::Opts for FrontendOpts { fn name() -> &'static str { "frontend" diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 1fe65aaaf1633..3eff07fbe81e0 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -54,7 +54,7 @@ pub struct MetaNodeOpts { /// It will serve as a unique identifier in cluster /// membership and leader election. Must be specified for etcd backend. #[clap(long, env = "RW_ADVERTISE_ADDR")] - advertise_addr: String, + pub advertise_addr: String, #[clap(long, env = "RW_DASHBOARD_HOST")] dashboard_host: Option, @@ -147,7 +147,7 @@ pub struct MetaNodeOpts { /// Remote directory for storing data and metadata objects. #[clap(long, env = "RW_DATA_DIRECTORY")] #[override_opts(path = system.data_directory)] - data_directory: Option, + pub data_directory: Option, /// Whether config object storage bucket lifecycle to purge stale data. #[clap(long, env = "RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE")] @@ -170,6 +170,43 @@ pub struct MetaNodeOpts { pub heap_profiling_dir: Option, } +impl MetaNodeOpts { + pub fn new_for_single_node() -> Self { + MetaNodeOpts { + vpc_id: None, + security_group_id: None, + listen_addr: "0.0.0.0:5690".to_string(), + advertise_addr: "0.0.0.0:5690".to_string(), + dashboard_host: Some("0.0.0.0:5691".to_string()), + prometheus_host: None, + etcd_endpoints: Default::default(), + etcd_auth: false, + etcd_username: Default::default(), + etcd_password: Default::default(), + sql_endpoint: None, + dashboard_ui_path: None, + prometheus_endpoint: None, + prometheus_selector: None, + connector_rpc_endpoint: None, + privatelink_endpoint_default_tags: None, + config_path: "".to_string(), + backend: None, + barrier_interval_ms: None, + sstable_size_mb: None, + block_size_kb: None, + bloom_false_positive: None, + // TODO: Update the state store + state_store: None, + // TODO: Update the data directory + data_directory: None, + do_not_config_object_storage_lifecycle: None, + backup_storage_url: None, + backup_storage_directory: None, + heap_profiling_dir: None, + } + } +} + impl risingwave_common::opts::Opts for MetaNodeOpts { fn name() -> &'static str { "meta" diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index 2204ffe0af7ef..e274440ba2072 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -92,6 +92,25 @@ pub struct CompactorOpts { pub proxy_rpc_endpoint: String, } +impl CompactorOpts { + pub fn new_for_single_node() -> Self { + Self { + listen_addr: "0.0.0.0:6660".to_string(), + advertise_addr: Some("0.0.0.0:6660".to_string()), + port: None, + prometheus_listener_addr: "".to_string(), + meta_address: "http://0.0.0.0:5690".parse().unwrap(), + compaction_worker_threads_number: None, + config_path: "".to_string(), + metrics_level: None, + async_stack_trace: None, + heap_profiling_dir: None, + compactor_mode: None, + proxy_rpc_endpoint: "".to_string(), + } + } +} + impl risingwave_common::opts::Opts for CompactorOpts { fn name() -> &'static str { "compactor" From cb500b43bbaa6781c154baafae9e76ed935b9503 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 2 Feb 2024 11:27:55 +0800 Subject: [PATCH 06/16] add sql endpoint defaults --- Cargo.lock | 2 ++ src/cmd_all/Cargo.toml | 1 + src/cmd_all/src/common.rs | 2 ++ src/common/Cargo.toml | 1 + src/common/src/lib.rs | 1 + src/common/src/single_process_config.rs | 29 +++++++++++++++++++++++++ src/meta/node/src/lib.rs | 6 ++++- 7 files changed, 41 insertions(+), 1 deletion(-) create mode 100644 src/common/src/single_process_config.rs diff --git a/Cargo.lock b/Cargo.lock index ac02d4713c73e..c1df2408bfcda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8686,6 +8686,7 @@ dependencies = [ "console", "const-str", "expect-test", + "home", "madsim-tokio", "prometheus", "risingwave_cmd", @@ -8748,6 +8749,7 @@ dependencies = [ "futures", "governor", "hex", + "home", "http 0.2.9", "http-body", "humantime", diff --git a/src/cmd_all/Cargo.toml b/src/cmd_all/Cargo.toml index f5a08e6c4b688..bb57fbfe88a09 100644 --- a/src/cmd_all/Cargo.toml +++ b/src/cmd_all/Cargo.toml @@ -23,6 +23,7 @@ anyhow = "1" clap = { version = "4", features = ["cargo", "derive"] } console = "0.15" const-str = "0.5" +home = "0.5" prometheus = { version = "0.13" } risingwave_cmd = { workspace = true } risingwave_common = { workspace = true } diff --git a/src/cmd_all/src/common.rs b/src/cmd_all/src/common.rs index 68e9090578ab0..4347dde1563f1 100644 --- a/src/cmd_all/src/common.rs +++ b/src/cmd_all/src/common.rs @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::env::home_dir; use std::ffi::OsString; +use std::sync::LazyLock; pub fn osstrs + AsRef>(s: impl AsRef<[T]>) -> Vec { s.as_ref().iter().map(OsString::from).collect() diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 30ecaa87a8d1a..2bbb91a17c233 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -49,6 +49,7 @@ fs-err = "2" futures = { version = "0.3", default-features = false, features = ["alloc"] } governor = { version = "0.6", default-features = false, features = ["std"] } hex = "0.4.3" +home = "0.5" http = "0.2" humantime = "2.1" hyper = "0.14" diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index 20428599b1039..ed7696131a437 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -74,6 +74,7 @@ pub mod opts; pub mod range; pub mod row; pub mod session_config; +pub mod single_process_config; pub mod system_param; pub mod telemetry; pub mod test_utils; diff --git a/src/common/src/single_process_config.rs b/src/common/src/single_process_config.rs new file mode 100644 index 0000000000000..f51093cdadbd0 --- /dev/null +++ b/src/common/src/single_process_config.rs @@ -0,0 +1,29 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This module defines default configurations for single node mode. + +use std::sync::LazyLock; + +use home::home_dir; + +pub static DEFAULT_DATA_DIRECTORY: LazyLock = LazyLock::new(|| { + let mut home_path = home_dir().unwrap(); + home_path.push(".risingwave"); + let home_path = home_path.to_str().unwrap(); + home_path.to_string() +}); + +pub static DEFAULT_SINGLE_NODE_SQLITE_PATH: LazyLock = + LazyLock::new(|| format!("{}/single_node.db", DEFAULT_DATA_DIRECTORY.clone())); diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 3eff07fbe81e0..b311c9ca987ca 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -24,6 +24,7 @@ use clap::Parser; pub use error::{MetaError, MetaResult}; use redact::Secret; use risingwave_common::config::OverrideConfig; +use risingwave_common::single_process_config::DEFAULT_SINGLE_NODE_SQLITE_PATH; use risingwave_common::util::meta_addr::MetaAddressStrategy; use risingwave_common::util::resource_util; use risingwave_common::{GIT_SHA, RW_VERSION}; @@ -183,7 +184,10 @@ impl MetaNodeOpts { etcd_auth: false, etcd_username: Default::default(), etcd_password: Default::default(), - sql_endpoint: None, + sql_endpoint: Some(format!( + "sqlite://{}?mode=rwc", + *DEFAULT_SINGLE_NODE_SQLITE_PATH, + )), dashboard_ui_path: None, prometheus_endpoint: None, prometheus_selector: None, From b952d8d7d117796c33089d7cee6f5b320c05eb42 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 2 Feb 2024 13:59:51 +0800 Subject: [PATCH 07/16] register state store, meta store --- src/common/src/single_process_config.rs | 21 +++++++++++++++++++-- src/meta/node/src/lib.rs | 13 ++++++------- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/src/common/src/single_process_config.rs b/src/common/src/single_process_config.rs index f51093cdadbd0..b5c486e92a36f 100644 --- a/src/common/src/single_process_config.rs +++ b/src/common/src/single_process_config.rs @@ -25,5 +25,22 @@ pub static DEFAULT_DATA_DIRECTORY: LazyLock = LazyLock::new(|| { home_path.to_string() }); -pub static DEFAULT_SINGLE_NODE_SQLITE_PATH: LazyLock = - LazyLock::new(|| format!("{}/single_node.db", DEFAULT_DATA_DIRECTORY.clone())); +pub static DEFAULT_SINGLE_NODE_SQLITE_PATH: LazyLock = LazyLock::new(|| { + format!( + "{}/meta_store/single_node.db", + DEFAULT_DATA_DIRECTORY.clone() + ) +}); + +pub static DEFAULT_SINGLE_NODE_SQL_ENDPOINT: LazyLock = + LazyLock::new(|| format!("sqlite://{}?mode=rwc", *DEFAULT_SINGLE_NODE_SQLITE_PATH)); + +pub static DEFAULT_SINGLE_NODE_STATE_STORE_PATH: LazyLock = + LazyLock::new(|| format!("{}/state_store", DEFAULT_DATA_DIRECTORY.clone())); + +pub static DEFAULT_SINGLE_NODE_STATE_STORE_URL: LazyLock = LazyLock::new(|| { + format!( + "hummock+fs://{}", + DEFAULT_SINGLE_NODE_STATE_STORE_PATH.clone() + ) +}); diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index b311c9ca987ca..32cfe52b7d96a 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -24,7 +24,9 @@ use clap::Parser; pub use error::{MetaError, MetaResult}; use redact::Secret; use risingwave_common::config::OverrideConfig; -use risingwave_common::single_process_config::DEFAULT_SINGLE_NODE_SQLITE_PATH; +use risingwave_common::single_process_config::{ + DEFAULT_SINGLE_NODE_SQL_ENDPOINT, DEFAULT_SINGLE_NODE_STATE_STORE_URL, +}; use risingwave_common::util::meta_addr::MetaAddressStrategy; use risingwave_common::util::resource_util; use risingwave_common::{GIT_SHA, RW_VERSION}; @@ -184,10 +186,7 @@ impl MetaNodeOpts { etcd_auth: false, etcd_username: Default::default(), etcd_password: Default::default(), - sql_endpoint: Some(format!( - "sqlite://{}?mode=rwc", - *DEFAULT_SINGLE_NODE_SQLITE_PATH, - )), + sql_endpoint: Some(DEFAULT_SINGLE_NODE_SQL_ENDPOINT.clone()), dashboard_ui_path: None, prometheus_endpoint: None, prometheus_selector: None, @@ -200,9 +199,9 @@ impl MetaNodeOpts { block_size_kb: None, bloom_false_positive: None, // TODO: Update the state store - state_store: None, + state_store: Some(DEFAULT_SINGLE_NODE_STATE_STORE_URL.clone()), // TODO: Update the data directory - data_directory: None, + data_directory: Some("hummock_001".to_string()), do_not_config_object_storage_lifecycle: None, backup_storage_url: None, backup_storage_directory: None, From f8d4af7eb20fa7b860c41077951b0131be058bf4 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 2 Feb 2024 14:34:21 +0800 Subject: [PATCH 08/16] fix several bugs for unsupplied parameters --- src/cmd_all/src/single_node.rs | 10 +++++----- src/compute/src/lib.rs | 6 +++--- src/frontend/src/lib.rs | 2 +- src/meta/node/src/lib.rs | 2 +- src/storage/compactor/src/lib.rs | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/cmd_all/src/single_node.rs b/src/cmd_all/src/single_node.rs index e80c3183e6819..104b88db5a3f7 100644 --- a/src/cmd_all/src/single_node.rs +++ b/src/cmd_all/src/single_node.rs @@ -41,23 +41,23 @@ pub struct SingleNodeOpts { config_path: Option, /// The data directory used by meta store and object store. - #[clap(long, env = "RW_SINGLE_NODE_DATA_DIRECTORY", default_value = "")] + #[clap(long, env = "RW_SINGLE_NODE_DATA_DIRECTORY")] data_directory: Option, /// The address of the meta node. - #[clap(long, env = "RW_SINGLE_NODE_META_ADDR", default_value = "")] + #[clap(long, env = "RW_SINGLE_NODE_META_ADDR")] meta_addr: Option, /// The address of the compute node - #[clap(long, env = "RW_SINGLE_NODE_COMPUTE_ADDR", default_value = "")] + #[clap(long, env = "RW_SINGLE_NODE_COMPUTE_ADDR")] compute_addr: Option, /// The address of the frontend node - #[clap(long, env = "RW_SINGLE_NODE_FRONTEND_ADDR", default_value = "")] + #[clap(long, env = "RW_SINGLE_NODE_FRONTEND_ADDR")] frontend_addr: Option, /// The address of the compactor node - #[clap(long, env = "RW_SINGLE_NODE_COMPACTOR_ADDR", default_value = "")] + #[clap(long, env = "RW_SINGLE_NODE_COMPACTOR_ADDR")] compactor_addr: Option, } diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index 183d8ff5e99cd..41c18bdf13355 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -137,9 +137,9 @@ pub struct ComputeNodeOpts { impl ComputeNodeOpts { pub fn new_for_single_node() -> Self { Self { - listen_addr: "".to_string(), - advertise_addr: Default::default(), - prometheus_listener_addr: Default::default(), + listen_addr: "0.0.0.0:5688".to_string(), + advertise_addr: Some("0.0.0.0:5688".to_string()), + prometheus_listener_addr: "0.0.0.0:1250".to_string(), meta_address: "http://0.0.0.0:5690".parse().unwrap(), connector_rpc_endpoint: None, connector_rpc_sink_payload_format: None, diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 1586212c5a7cc..d59a0071d6916 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -150,7 +150,7 @@ impl FrontendOpts { advertise_addr: Some("0.0.0.0:4566".to_string()), port: None, meta_addr: "http://0.0.0.0:5690".parse().unwrap(), - prometheus_listener_addr: "".to_string(), + prometheus_listener_addr: "0.0.0.0:1250".to_string(), health_check_listener_addr: "0.0.0.0:6786".to_string(), config_path: "".to_string(), metrics_level: None, diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 32cfe52b7d96a..2b193170bbebe 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -181,7 +181,7 @@ impl MetaNodeOpts { listen_addr: "0.0.0.0:5690".to_string(), advertise_addr: "0.0.0.0:5690".to_string(), dashboard_host: Some("0.0.0.0:5691".to_string()), - prometheus_host: None, + prometheus_host: Some("0.0.0.0:1250".to_string()), etcd_endpoints: Default::default(), etcd_auth: false, etcd_username: Default::default(), diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index e274440ba2072..f212261d9dd48 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -98,7 +98,7 @@ impl CompactorOpts { listen_addr: "0.0.0.0:6660".to_string(), advertise_addr: Some("0.0.0.0:6660".to_string()), port: None, - prometheus_listener_addr: "".to_string(), + prometheus_listener_addr: "0.0.0.0:1250".to_string(), meta_address: "http://0.0.0.0:5690".parse().unwrap(), compaction_worker_threads_number: None, config_path: "".to_string(), From d84d0761de7be8523f4109c8586ce8d22cec871d Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 2 Feb 2024 14:45:39 +0800 Subject: [PATCH 09/16] fix conflict change + add --backend option --- src/cmd_all/src/single_node.rs | 2 +- src/meta/node/src/lib.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cmd_all/src/single_node.rs b/src/cmd_all/src/single_node.rs index 104b88db5a3f7..4f3bb7e5f695a 100644 --- a/src/cmd_all/src/single_node.rs +++ b/src/cmd_all/src/single_node.rs @@ -67,7 +67,7 @@ pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedS let mut frontend_opts = FrontendOpts::new_for_single_node(); let mut compactor_opts = CompactorOpts::new_for_single_node(); if let Some(prometheus_listener_addr) = &opts.prometheus_listener_addr { - meta_opts.prometheus_host = Some(prometheus_listener_addr.clone()); + meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone()); compute_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); frontend_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); compactor_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 2b193170bbebe..47b1a022f09fa 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -181,7 +181,7 @@ impl MetaNodeOpts { listen_addr: "0.0.0.0:5690".to_string(), advertise_addr: "0.0.0.0:5690".to_string(), dashboard_host: Some("0.0.0.0:5691".to_string()), - prometheus_host: Some("0.0.0.0:1250".to_string()), + prometheus_listener_addr: Some("0.0.0.0:1250".to_string()), etcd_endpoints: Default::default(), etcd_auth: false, etcd_username: Default::default(), @@ -193,7 +193,7 @@ impl MetaNodeOpts { connector_rpc_endpoint: None, privatelink_endpoint_default_tags: None, config_path: "".to_string(), - backend: None, + backend: Some(MetaBackend::Sql), barrier_interval_ms: None, sstable_size_mb: None, block_size_kb: None, From 4f77db02f90dc070a523ce72ae7417fe09962f22 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 2 Feb 2024 15:01:53 +0800 Subject: [PATCH 10/16] rm empty --- src/cmd_all/src/single.rs | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 src/cmd_all/src/single.rs diff --git a/src/cmd_all/src/single.rs b/src/cmd_all/src/single.rs deleted file mode 100644 index e69de29bb2d1d..0000000000000 From a57cf07362cf5aa32a613efe3393a409fee8d925 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 2 Feb 2024 15:30:54 +0800 Subject: [PATCH 11/16] allow config store_directory --- src/cmd_all/src/single_node.rs | 22 ++++++++++++++++------ src/common/src/single_process_config.rs | 23 +++++++++++++++-------- src/meta/node/src/lib.rs | 6 ++---- 3 files changed, 33 insertions(+), 18 deletions(-) diff --git a/src/cmd_all/src/single_node.rs b/src/cmd_all/src/single_node.rs index 4f3bb7e5f695a..0dffaeec877ed 100644 --- a/src/cmd_all/src/single_node.rs +++ b/src/cmd_all/src/single_node.rs @@ -16,6 +16,9 @@ use std::str::FromStr; use anyhow::Result; use clap::Parser; +use risingwave_common::single_process_config::{ + make_single_node_sql_endpoint, make_single_node_state_store_url, +}; use risingwave_common::util::meta_addr::MetaAddressStrategy; use risingwave_compactor::CompactorOpts; use risingwave_compute::ComputeNodeOpts; @@ -30,19 +33,23 @@ use crate::ParsedStandaloneOpts; #[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)] #[command( version, - about = "[DEFAULT] The Single Node mode. Start all services in one process, with process-level options. This will be executed if no subcommand is specified" + about = "[default] The Single Node mode. Start all services in one process, with process-level options. This will be executed if no subcommand is specified" )] /// Here we define our own defaults for the single node mode. pub struct SingleNodeOpts { + /// The prometheus address used by the single-node cluster. + /// If you have a prometheus instance, + /// it will poll the metrics from this address. #[clap(long, env = "RW_SINGLE_NODE_PROMETHEUS_LISTENER_ADDR")] prometheus_listener_addr: Option, + /// The path to the cluster configuration file. #[clap(long, env = "RW_SINGLE_NODE_CONFIG_PATH")] config_path: Option, - /// The data directory used by meta store and object store. - #[clap(long, env = "RW_SINGLE_NODE_DATA_DIRECTORY")] - data_directory: Option, + /// The store directory used by meta store and object store. + #[clap(long, env = "RW_SINGLE_NODE_STORE_DIRECTORY")] + store_directory: Option, /// The address of the meta node. #[clap(long, env = "RW_SINGLE_NODE_META_ADDR")] @@ -79,8 +86,11 @@ pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedS compactor_opts.config_path = config_path.clone(); } // TODO(kwannoel): Also update state store URL - if let Some(data_directory) = &opts.data_directory { - meta_opts.data_directory = Some(data_directory.clone()); + if let Some(store_directory) = &opts.store_directory { + let state_store_url = make_single_node_state_store_url(store_directory); + let meta_store_endpoint = make_single_node_sql_endpoint(store_directory); + meta_opts.state_store = Some(state_store_url); + meta_opts.sql_endpoint = Some(meta_store_endpoint); } if let Some(meta_addr) = &opts.meta_addr { meta_opts.listen_addr = meta_addr.clone(); diff --git a/src/common/src/single_process_config.rs b/src/common/src/single_process_config.rs index b5c486e92a36f..4c272b6742e04 100644 --- a/src/common/src/single_process_config.rs +++ b/src/common/src/single_process_config.rs @@ -18,25 +18,28 @@ use std::sync::LazyLock; use home::home_dir; -pub static DEFAULT_DATA_DIRECTORY: LazyLock = LazyLock::new(|| { +pub static DEFAULT_STORE_DIRECTORY: LazyLock = LazyLock::new(|| { let mut home_path = home_dir().unwrap(); home_path.push(".risingwave"); let home_path = home_path.to_str().unwrap(); home_path.to_string() }); -pub static DEFAULT_SINGLE_NODE_SQLITE_PATH: LazyLock = LazyLock::new(|| { - format!( - "{}/meta_store/single_node.db", - DEFAULT_DATA_DIRECTORY.clone() - ) -}); +pub static DEFAULT_SINGLE_NODE_SQLITE_PATH: LazyLock = + LazyLock::new(|| format!("{}/meta_store/single_node.db", &*DEFAULT_STORE_DIRECTORY)); pub static DEFAULT_SINGLE_NODE_SQL_ENDPOINT: LazyLock = LazyLock::new(|| format!("sqlite://{}?mode=rwc", *DEFAULT_SINGLE_NODE_SQLITE_PATH)); +pub fn make_single_node_sql_endpoint(store_directory: &String) -> String { + format!( + "sqlite://{}/meta_store/single_node.db?mode=rwc", + store_directory + ) +} + pub static DEFAULT_SINGLE_NODE_STATE_STORE_PATH: LazyLock = - LazyLock::new(|| format!("{}/state_store", DEFAULT_DATA_DIRECTORY.clone())); + LazyLock::new(|| format!("{}/state_store", DEFAULT_STORE_DIRECTORY.clone())); pub static DEFAULT_SINGLE_NODE_STATE_STORE_URL: LazyLock = LazyLock::new(|| { format!( @@ -44,3 +47,7 @@ pub static DEFAULT_SINGLE_NODE_STATE_STORE_URL: LazyLock = LazyLock::new DEFAULT_SINGLE_NODE_STATE_STORE_PATH.clone() ) }); + +pub fn make_single_node_state_store_url(store_directory: &String) -> String { + format!("hummock+fs://{}/state_store", store_directory) +} diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 47b1a022f09fa..3c50aa0df8b89 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -84,7 +84,7 @@ pub struct MetaNodeOpts { /// Endpoint of the SQL service, make it non-option when SQL service is required. #[clap(long, env = "RW_SQL_ENDPOINT")] - sql_endpoint: Option, + pub sql_endpoint: Option, #[clap(long, env = "RW_DASHBOARD_UI_PATH")] dashboard_ui_path: Option, @@ -145,7 +145,7 @@ pub struct MetaNodeOpts { /// State store url #[clap(long, env = "RW_STATE_STORE")] #[override_opts(path = system.state_store)] - state_store: Option, + pub state_store: Option, /// Remote directory for storing data and metadata objects. #[clap(long, env = "RW_DATA_DIRECTORY")] @@ -198,9 +198,7 @@ impl MetaNodeOpts { sstable_size_mb: None, block_size_kb: None, bloom_false_positive: None, - // TODO: Update the state store state_store: Some(DEFAULT_SINGLE_NODE_STATE_STORE_URL.clone()), - // TODO: Update the data directory data_directory: Some("hummock_001".to_string()), do_not_config_object_storage_lifecycle: None, backup_storage_url: None, From 8b3831939fbd5c22f1c8cb9b5546928676c3b36e Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 2 Feb 2024 16:18:32 +0800 Subject: [PATCH 12/16] move single node defaults configs into its module --- src/cmd_all/src/common.rs | 2 - src/cmd_all/src/single_node.rs | 148 +++++++++++++++++++----- src/common/src/lib.rs | 2 - src/common/src/single_process_config.rs | 53 --------- src/compute/src/lib.rs | 29 +---- src/frontend/src/lib.rs | 16 --- src/meta/node/src/lib.rs | 74 +++--------- src/storage/compactor/src/lib.rs | 19 --- 8 files changed, 143 insertions(+), 200 deletions(-) delete mode 100644 src/common/src/single_process_config.rs diff --git a/src/cmd_all/src/common.rs b/src/cmd_all/src/common.rs index 4347dde1563f1..68e9090578ab0 100644 --- a/src/cmd_all/src/common.rs +++ b/src/cmd_all/src/common.rs @@ -12,9 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::env::home_dir; use std::ffi::OsString; -use std::sync::LazyLock; pub fn osstrs + AsRef>(s: impl AsRef<[T]>) -> Vec { s.as_ref().iter().map(OsString::from).collect() diff --git a/src/cmd_all/src/single_node.rs b/src/cmd_all/src/single_node.rs index 0dffaeec877ed..19f79128ded22 100644 --- a/src/cmd_all/src/single_node.rs +++ b/src/cmd_all/src/single_node.rs @@ -12,24 +12,41 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::str::FromStr; +use std::sync::LazyLock; -use anyhow::Result; use clap::Parser; -use risingwave_common::single_process_config::{ - make_single_node_sql_endpoint, make_single_node_state_store_url, -}; -use risingwave_common::util::meta_addr::MetaAddressStrategy; +use home::home_dir; +use risingwave_common::config::{AsyncStackTraceOption, MetaBackend}; use risingwave_compactor::CompactorOpts; -use risingwave_compute::ComputeNodeOpts; +use risingwave_compute::{default_parallelism, default_total_memory_bytes, ComputeNodeOpts}; use risingwave_frontend::FrontendOpts; use risingwave_meta_node::MetaNodeOpts; -use shell_words::split; -use tokio::signal; -use crate::common::osstrs; use crate::ParsedStandaloneOpts; +pub static DEFAULT_STORE_DIRECTORY: LazyLock = LazyLock::new(|| { + let mut home_path = home_dir().unwrap(); + home_path.push(".risingwave"); + let home_path = home_path.to_str().unwrap(); + home_path.to_string() +}); + +pub static DEFAULT_SINGLE_NODE_SQLITE_PATH: LazyLock = + LazyLock::new(|| format!("{}/meta_store/single_node.db", &*DEFAULT_STORE_DIRECTORY)); + +pub static DEFAULT_SINGLE_NODE_SQL_ENDPOINT: LazyLock = + LazyLock::new(|| format!("sqlite://{}?mode=rwc", *DEFAULT_SINGLE_NODE_SQLITE_PATH)); + +pub static DEFAULT_SINGLE_NODE_STATE_STORE_PATH: LazyLock = + LazyLock::new(|| format!("{}/state_store", DEFAULT_STORE_DIRECTORY.clone())); + +pub static DEFAULT_SINGLE_NODE_STATE_STORE_URL: LazyLock = LazyLock::new(|| { + format!( + "hummock+fs://{}", + DEFAULT_SINGLE_NODE_STATE_STORE_PATH.clone() + ) +}); + #[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)] #[command( version, @@ -68,11 +85,22 @@ pub struct SingleNodeOpts { compactor_addr: Option, } +pub fn make_single_node_sql_endpoint(store_directory: &String) -> String { + format!( + "sqlite://{}/meta_store/single_node.db?mode=rwc", + store_directory + ) +} + +pub fn make_single_node_state_store_url(store_directory: &String) -> String { + format!("hummock+fs://{}/state_store", store_directory) +} + pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedStandaloneOpts { - let mut meta_opts = MetaNodeOpts::new_for_single_node(); - let mut compute_opts = ComputeNodeOpts::new_for_single_node(); - let mut frontend_opts = FrontendOpts::new_for_single_node(); - let mut compactor_opts = CompactorOpts::new_for_single_node(); + let mut meta_opts = SingleNodeOpts::default_meta_opts(); + let mut compute_opts = SingleNodeOpts::default_compute_opts(); + let mut frontend_opts = SingleNodeOpts::default_frontend_opts(); + let mut compactor_opts = SingleNodeOpts::default_compactor_opts(); if let Some(prometheus_listener_addr) = &opts.prometheus_listener_addr { meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone()); compute_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); @@ -85,7 +113,6 @@ pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedS frontend_opts.config_path = config_path.clone(); compactor_opts.config_path = config_path.clone(); } - // TODO(kwannoel): Also update state store URL if let Some(store_directory) = &opts.store_directory { let state_store_url = make_single_node_state_store_url(store_directory); let meta_store_endpoint = make_single_node_sql_endpoint(store_directory); @@ -117,19 +144,88 @@ pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedS } } -#[cfg(test)] -mod test { - use std::fmt::Debug; - - use expect_test::{expect, Expect}; +impl SingleNodeOpts { + fn default_frontend_opts() -> FrontendOpts { + FrontendOpts { + listen_addr: "0.0.0.0:4566".to_string(), + advertise_addr: Some("0.0.0.0:4566".to_string()), + port: None, + meta_addr: "http://0.0.0.0:5690".parse().unwrap(), + prometheus_listener_addr: "0.0.0.0:1250".to_string(), + health_check_listener_addr: "0.0.0.0:6786".to_string(), + config_path: "".to_string(), + metrics_level: None, + enable_barrier_read: None, + } + } - use super::*; + fn default_meta_opts() -> MetaNodeOpts { + MetaNodeOpts { + vpc_id: None, + security_group_id: None, + listen_addr: "0.0.0.0:5690".to_string(), + advertise_addr: "0.0.0.0:5690".to_string(), + dashboard_host: Some("0.0.0.0:5691".to_string()), + prometheus_listener_addr: Some("0.0.0.0:1250".to_string()), + etcd_endpoints: Default::default(), + etcd_auth: false, + etcd_username: Default::default(), + etcd_password: Default::default(), + sql_endpoint: Some(DEFAULT_SINGLE_NODE_SQL_ENDPOINT.clone()), + dashboard_ui_path: None, + prometheus_endpoint: None, + prometheus_selector: None, + connector_rpc_endpoint: None, + privatelink_endpoint_default_tags: None, + config_path: "".to_string(), + backend: Some(MetaBackend::Sql), + barrier_interval_ms: None, + sstable_size_mb: None, + block_size_kb: None, + bloom_false_positive: None, + state_store: Some(DEFAULT_SINGLE_NODE_STATE_STORE_URL.clone()), + data_directory: Some("hummock_001".to_string()), + do_not_config_object_storage_lifecycle: None, + backup_storage_url: None, + backup_storage_directory: None, + heap_profiling_dir: None, + } + } - fn check(actual: impl Debug, expect: Expect) { - let actual = format!("{:#?}", actual); - expect.assert_eq(&actual); + pub fn default_compute_opts() -> ComputeNodeOpts { + ComputeNodeOpts { + listen_addr: "0.0.0.0:5688".to_string(), + advertise_addr: Some("0.0.0.0:5688".to_string()), + prometheus_listener_addr: "0.0.0.0:1250".to_string(), + meta_address: "http://0.0.0.0:5690".parse().unwrap(), + connector_rpc_endpoint: None, + connector_rpc_sink_payload_format: None, + config_path: "".to_string(), + total_memory_bytes: default_total_memory_bytes(), + parallelism: default_parallelism(), + role: Default::default(), + metrics_level: None, + data_file_cache_dir: None, + meta_file_cache_dir: None, + async_stack_trace: Some(AsyncStackTraceOption::ReleaseVerbose), + heap_profiling_dir: None, + } } - #[test] - fn test_parse_opt_args() {} + fn default_compactor_opts() -> CompactorOpts { + CompactorOpts { + listen_addr: "0.0.0.0:6660".to_string(), + advertise_addr: Some("0.0.0.0:6660".to_string()), + port: None, + prometheus_listener_addr: "0.0.0.0:1250".to_string(), + meta_address: "http://0.0.0.0:5690".parse().unwrap(), + compaction_worker_threads_number: None, + config_path: "".to_string(), + metrics_level: None, + async_stack_trace: None, + heap_profiling_dir: None, + compactor_mode: None, + proxy_rpc_endpoint: "".to_string(), + } + } } diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index ed7696131a437..980897d5636e7 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -74,14 +74,12 @@ pub mod opts; pub mod range; pub mod row; pub mod session_config; -pub mod single_process_config; pub mod system_param; pub mod telemetry; pub mod test_utils; pub mod transaction; pub mod types; pub mod vnode_mapping; - pub mod test_prelude { pub use super::array::{DataChunkTestExt, StreamChunkTestExt}; pub use super::catalog::test_utils::ColumnDescTestExt; diff --git a/src/common/src/single_process_config.rs b/src/common/src/single_process_config.rs deleted file mode 100644 index 4c272b6742e04..0000000000000 --- a/src/common/src/single_process_config.rs +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! This module defines default configurations for single node mode. - -use std::sync::LazyLock; - -use home::home_dir; - -pub static DEFAULT_STORE_DIRECTORY: LazyLock = LazyLock::new(|| { - let mut home_path = home_dir().unwrap(); - home_path.push(".risingwave"); - let home_path = home_path.to_str().unwrap(); - home_path.to_string() -}); - -pub static DEFAULT_SINGLE_NODE_SQLITE_PATH: LazyLock = - LazyLock::new(|| format!("{}/meta_store/single_node.db", &*DEFAULT_STORE_DIRECTORY)); - -pub static DEFAULT_SINGLE_NODE_SQL_ENDPOINT: LazyLock = - LazyLock::new(|| format!("sqlite://{}?mode=rwc", *DEFAULT_SINGLE_NODE_SQLITE_PATH)); - -pub fn make_single_node_sql_endpoint(store_directory: &String) -> String { - format!( - "sqlite://{}/meta_store/single_node.db?mode=rwc", - store_directory - ) -} - -pub static DEFAULT_SINGLE_NODE_STATE_STORE_PATH: LazyLock = - LazyLock::new(|| format!("{}/state_store", DEFAULT_STORE_DIRECTORY.clone())); - -pub static DEFAULT_SINGLE_NODE_STATE_STORE_URL: LazyLock = LazyLock::new(|| { - format!( - "hummock+fs://{}", - DEFAULT_SINGLE_NODE_STATE_STORE_PATH.clone() - ) -}); - -pub fn make_single_node_state_store_url(store_directory: &String) -> String { - format!("hummock+fs://{}/state_store", store_directory) -} diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index 41c18bdf13355..3673997d2a128 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -39,7 +39,6 @@ use risingwave_common::util::meta_addr::MetaAddressStrategy; use risingwave_common::util::resource_util::cpu::total_cpu_available; use risingwave_common::util::resource_util::memory::system_memory_available_bytes; use serde::{Deserialize, Serialize}; -use tonic::IntoStreamingRequest; /// If `total_memory_bytes` is not specified, the default memory limit will be set to /// the system memory limit multiplied by this proportion @@ -134,28 +133,6 @@ pub struct ComputeNodeOpts { pub heap_profiling_dir: Option, } -impl ComputeNodeOpts { - pub fn new_for_single_node() -> Self { - Self { - listen_addr: "0.0.0.0:5688".to_string(), - advertise_addr: Some("0.0.0.0:5688".to_string()), - prometheus_listener_addr: "0.0.0.0:1250".to_string(), - meta_address: "http://0.0.0.0:5690".parse().unwrap(), - connector_rpc_endpoint: None, - connector_rpc_sink_payload_format: None, - config_path: "".to_string(), - total_memory_bytes: default_total_memory_bytes(), - parallelism: default_parallelism(), - role: Default::default(), - metrics_level: None, - data_file_cache_dir: None, - meta_file_cache_dir: None, - async_stack_trace: Some(AsyncStackTraceOption::ReleaseVerbose), - heap_profiling_dir: None, - } - } -} - impl risingwave_common::opts::Opts for ComputeNodeOpts { fn name() -> &'static str { "compute" @@ -248,14 +225,14 @@ pub fn start(opts: ComputeNodeOpts) -> Pin + Send>> }) } -fn default_total_memory_bytes() -> usize { +pub fn default_total_memory_bytes() -> usize { (system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize } -fn default_parallelism() -> usize { +pub fn default_parallelism() -> usize { total_cpu_available().ceil() as usize } -fn default_role() -> Role { +pub fn default_role() -> Role { Role::Both } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index d59a0071d6916..805221327f016 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -143,22 +143,6 @@ pub struct FrontendOpts { pub enable_barrier_read: Option, } -impl FrontendOpts { - pub fn new_for_single_node() -> Self { - Self { - listen_addr: "0.0.0.0:4566".to_string(), - advertise_addr: Some("0.0.0.0:4566".to_string()), - port: None, - meta_addr: "http://0.0.0.0:5690".parse().unwrap(), - prometheus_listener_addr: "0.0.0.0:1250".to_string(), - health_check_listener_addr: "0.0.0.0:6786".to_string(), - config_path: "".to_string(), - metrics_level: None, - enable_barrier_read: None, - } - } -} - impl risingwave_common::opts::Opts for FrontendOpts { fn name() -> &'static str { "frontend" diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 3c50aa0df8b89..2e770fb841ada 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -24,9 +24,6 @@ use clap::Parser; pub use error::{MetaError, MetaResult}; use redact::Secret; use risingwave_common::config::OverrideConfig; -use risingwave_common::single_process_config::{ - DEFAULT_SINGLE_NODE_SQL_ENDPOINT, DEFAULT_SINGLE_NODE_STATE_STORE_URL, -}; use risingwave_common::util::meta_addr::MetaAddressStrategy; use risingwave_common::util::resource_util; use risingwave_common::{GIT_SHA, RW_VERSION}; @@ -42,10 +39,10 @@ use crate::manager::MetaOpts; #[command(version, about = "The central metadata management service")] pub struct MetaNodeOpts { #[clap(long, env = "RW_VPC_ID")] - vpc_id: Option, + pub vpc_id: Option, #[clap(long, env = "RW_VPC_SECURITY_GROUP_ID")] - security_group_id: Option, + pub security_group_id: Option, // TODO: use `SocketAddr` #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:5690")] @@ -60,7 +57,7 @@ pub struct MetaNodeOpts { pub advertise_addr: String, #[clap(long, env = "RW_DASHBOARD_HOST")] - dashboard_host: Option, + pub dashboard_host: Option, /// We will start a http server at this address via `MetricsManager`. /// Then the prometheus instance will poll the metrics from this address. @@ -68,38 +65,38 @@ pub struct MetaNodeOpts { pub prometheus_listener_addr: Option, #[clap(long, env = "RW_ETCD_ENDPOINTS", default_value_t = String::from(""))] - etcd_endpoints: String, + pub etcd_endpoints: String, /// Enable authentication with etcd. By default disabled. #[clap(long, env = "RW_ETCD_AUTH")] - etcd_auth: bool, + pub etcd_auth: bool, /// Username of etcd, required when --etcd-auth is enabled. #[clap(long, env = "RW_ETCD_USERNAME", default_value = "")] - etcd_username: String, + pub etcd_username: String, /// Password of etcd, required when --etcd-auth is enabled. #[clap(long, env = "RW_ETCD_PASSWORD", default_value = "")] - etcd_password: Secret, + pub etcd_password: Secret, /// Endpoint of the SQL service, make it non-option when SQL service is required. #[clap(long, env = "RW_SQL_ENDPOINT")] pub sql_endpoint: Option, #[clap(long, env = "RW_DASHBOARD_UI_PATH")] - dashboard_ui_path: Option, + pub dashboard_ui_path: Option, /// The HTTP REST-API address of the Prometheus instance associated to this cluster. /// This address is used to serve PromQL queries to Prometheus. /// It is also used by Grafana Dashboard Service to fetch metrics and visualize them. #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")] - prometheus_endpoint: Option, + pub prometheus_endpoint: Option, /// The additional selector used when querying Prometheus. /// /// The format is same as PromQL. Example: `instance="foo",namespace="bar"` #[clap(long, env = "RW_PROMETHEUS_SELECTOR")] - prometheus_selector: Option, + pub prometheus_selector: Option, /// Endpoint of the connector node, there will be a sidecar connector node /// colocated with Meta node in the cloud environment @@ -120,27 +117,27 @@ pub struct MetaNodeOpts { #[clap(long, env = "RW_BACKEND", value_enum)] #[override_opts(path = meta.backend)] - backend: Option, + pub backend: Option, /// The interval of periodic barrier. #[clap(long, env = "RW_BARRIER_INTERVAL_MS")] #[override_opts(path = system.barrier_interval_ms)] - barrier_interval_ms: Option, + pub barrier_interval_ms: Option, /// Target size of the Sstable. #[clap(long, env = "RW_SSTABLE_SIZE_MB")] #[override_opts(path = system.sstable_size_mb)] - sstable_size_mb: Option, + pub sstable_size_mb: Option, /// Size of each block in bytes in SST. #[clap(long, env = "RW_BLOCK_SIZE_KB")] #[override_opts(path = system.block_size_kb)] - block_size_kb: Option, + pub block_size_kb: Option, /// False positive probability of bloom filter. #[clap(long, env = "RW_BLOOM_FALSE_POSITIVE")] #[override_opts(path = system.bloom_false_positive)] - bloom_false_positive: Option, + pub bloom_false_positive: Option, /// State store url #[clap(long, env = "RW_STATE_STORE")] @@ -155,17 +152,17 @@ pub struct MetaNodeOpts { /// Whether config object storage bucket lifecycle to purge stale data. #[clap(long, env = "RW_DO_NOT_CONFIG_BUCKET_LIFECYCLE")] #[override_opts(path = meta.do_not_config_object_storage_lifecycle)] - do_not_config_object_storage_lifecycle: Option, + pub do_not_config_object_storage_lifecycle: Option, /// Remote storage url for storing snapshots. #[clap(long, env = "RW_BACKUP_STORAGE_URL")] #[override_opts(path = system.backup_storage_url)] - backup_storage_url: Option, + pub backup_storage_url: Option, /// Remote directory for storing snapshots. #[clap(long, env = "RW_BACKUP_STORAGE_DIRECTORY")] #[override_opts(path = system.backup_storage_directory)] - backup_storage_directory: Option, + pub backup_storage_directory: Option, /// Enable heap profile dump when memory usage is high. #[clap(long, env = "RW_HEAP_PROFILING_DIR")] @@ -173,41 +170,6 @@ pub struct MetaNodeOpts { pub heap_profiling_dir: Option, } -impl MetaNodeOpts { - pub fn new_for_single_node() -> Self { - MetaNodeOpts { - vpc_id: None, - security_group_id: None, - listen_addr: "0.0.0.0:5690".to_string(), - advertise_addr: "0.0.0.0:5690".to_string(), - dashboard_host: Some("0.0.0.0:5691".to_string()), - prometheus_listener_addr: Some("0.0.0.0:1250".to_string()), - etcd_endpoints: Default::default(), - etcd_auth: false, - etcd_username: Default::default(), - etcd_password: Default::default(), - sql_endpoint: Some(DEFAULT_SINGLE_NODE_SQL_ENDPOINT.clone()), - dashboard_ui_path: None, - prometheus_endpoint: None, - prometheus_selector: None, - connector_rpc_endpoint: None, - privatelink_endpoint_default_tags: None, - config_path: "".to_string(), - backend: Some(MetaBackend::Sql), - barrier_interval_ms: None, - sstable_size_mb: None, - block_size_kb: None, - bloom_false_positive: None, - state_store: Some(DEFAULT_SINGLE_NODE_STATE_STORE_URL.clone()), - data_directory: Some("hummock_001".to_string()), - do_not_config_object_storage_lifecycle: None, - backup_storage_url: None, - backup_storage_directory: None, - heap_profiling_dir: None, - } - } -} - impl risingwave_common::opts::Opts for MetaNodeOpts { fn name() -> &'static str { "meta" diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index f212261d9dd48..2204ffe0af7ef 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -92,25 +92,6 @@ pub struct CompactorOpts { pub proxy_rpc_endpoint: String, } -impl CompactorOpts { - pub fn new_for_single_node() -> Self { - Self { - listen_addr: "0.0.0.0:6660".to_string(), - advertise_addr: Some("0.0.0.0:6660".to_string()), - port: None, - prometheus_listener_addr: "0.0.0.0:1250".to_string(), - meta_address: "http://0.0.0.0:5690".parse().unwrap(), - compaction_worker_threads_number: None, - config_path: "".to_string(), - metrics_level: None, - async_stack_trace: None, - heap_profiling_dir: None, - compactor_mode: None, - proxy_rpc_endpoint: "".to_string(), - } - } -} - impl risingwave_common::opts::Opts for CompactorOpts { fn name() -> &'static str { "compactor" From 4b75db3deb5480226f5601d15270a555a19a6d20 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 2 Feb 2024 16:19:45 +0800 Subject: [PATCH 13/16] add some aliases --- src/cmd_all/src/bin/risingwave.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index 8b99093fb23da..fb1f120990812 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -139,7 +139,7 @@ impl Component { Component::Ctl => vec!["risectl"], Component::Playground => vec!["play"], Component::Standalone => vec![], - Component::SingleNode => vec![], + Component::SingleNode => vec!["single-node", "single"], } } From 2340acbd7c49f2cb02407b4d11d542a79da53079 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 2 Feb 2024 16:32:20 +0800 Subject: [PATCH 14/16] docs --- Cargo.lock | 1 - src/cmd_all/src/bin/risingwave.rs | 8 ++++++++ src/cmd_all/src/standalone.rs | 9 ++------- src/common/Cargo.toml | 1 - 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c1df2408bfcda..5cd3c0ce8cb0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8749,7 +8749,6 @@ dependencies = [ "futures", "governor", "hex", - "home", "http 0.2.9", "http-body", "humantime", diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index fb1f120990812..2c167fc1bdc20 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -193,6 +193,11 @@ fn main() -> Result<()> { let matches = match command.try_get_matches() { Ok(m) => m, Err(e) if e.kind() == ErrorKind::MissingSubcommand => { + // `$ ./risingwave` + // NOTE(kwannoel): This is a hack to make `risingwave` + // work as an alias of `risingwave single-process`. + // If invocation is not a multicall and there's no subcommand, + // we will try to invoke it as a single node. let command = Component::SingleNode.augment_args(risingwave()); let matches = command.get_matches(); Component::SingleNode.start(&matches); @@ -230,6 +235,9 @@ fn standalone(opts: StandaloneOpts) { risingwave_rt::main_okk(risingwave_cmd_all::standalone(opts)).unwrap(); } +/// For single node, the internals are just a config mapping from its +/// high level options to standalone mode node-level options. +/// We will start a standalone instance, with all nodes in the same process. fn single_node(opts: SingleNodeOpts) { let opts = risingwave_cmd_all::map_single_node_opts_to_standalone_opts(&opts); let settings = risingwave_rt::LoggerSettings::from_opts(&opts) diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index 9f943c5c9faaf..3092ad5f2be66 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -166,13 +166,8 @@ pub fn parse_standalone_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts } /// For `standalone` mode, we can configure and start multiple services in one process. -/// Note that this is different from `single` mode, where we start -/// pre-defined services all-in-one process, -/// the pre-defined services are not configurable. -/// `single` mode is meant to be user-facing, where users can just use `./risingwave` -/// to start the service. -/// `standalone` mode is meant to be used by our cloud service, where we can configure -/// and start multiple services in one process. +/// `standalone` mode is meant to be used by our cloud service and docker, +/// where we can configure and start multiple services in one process. pub async fn standalone( ParsedStandaloneOpts { meta_opts, diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 2bbb91a17c233..30ecaa87a8d1a 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -49,7 +49,6 @@ fs-err = "2" futures = { version = "0.3", default-features = false, features = ["alloc"] } governor = { version = "0.6", default-features = false, features = ["std"] } hex = "0.4.3" -home = "0.5" http = "0.2" humantime = "2.1" hyper = "0.14" From 76f1c082ebb84d80a0cb4c3f2b36893589a39bd4 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 2 Feb 2024 16:37:02 +0800 Subject: [PATCH 15/16] docs --- src/cmd_all/src/single_node.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/cmd_all/src/single_node.rs b/src/cmd_all/src/single_node.rs index 19f79128ded22..b89f861f6e4fd 100644 --- a/src/cmd_all/src/single_node.rs +++ b/src/cmd_all/src/single_node.rs @@ -54,9 +54,7 @@ pub static DEFAULT_SINGLE_NODE_STATE_STORE_URL: LazyLock = LazyLock::new )] /// Here we define our own defaults for the single node mode. pub struct SingleNodeOpts { - /// The prometheus address used by the single-node cluster. - /// If you have a prometheus instance, - /// it will poll the metrics from this address. + /// The address prometheus polls metrics from. #[clap(long, env = "RW_SINGLE_NODE_PROMETHEUS_LISTENER_ADDR")] prometheus_listener_addr: Option, From d2aef0fc94ac8217a6d1571efacd3635bab11d48 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 2 Feb 2024 18:04:07 +0800 Subject: [PATCH 16/16] hide standalone, update docs for playground --- src/cmd_all/src/playground.rs | 2 +- src/cmd_all/src/standalone.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/cmd_all/src/playground.rs b/src/cmd_all/src/playground.rs index 70039264f6ef7..1b03048d5d6e0 100644 --- a/src/cmd_all/src/playground.rs +++ b/src/cmd_all/src/playground.rs @@ -137,7 +137,7 @@ fn get_services(profile: &str) -> (Vec, bool) { } #[derive(Debug, Clone, Parser)] -#[command(about = "The quick way to start a RisingWave cluster for playing around")] +#[command(about = "The quick way to start an in-memory RisingWave cluster for playing around")] pub struct PlaygroundOpts { /// The profile to use. #[clap(short, long, env = "PLAYGROUND_PROFILE", default_value = "playground")] diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index 3092ad5f2be66..33e61e5b41f2b 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -27,7 +27,8 @@ use crate::common::osstrs; #[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)] #[command( version, - about = "The Standalone mode allows users to start multiple services in one process, it exposes node-level options for each service" + about = "The Standalone mode allows users to start multiple services in one process, it exposes node-level options for each service", + hide = true )] pub struct StandaloneOpts { /// Compute node options