diff --git a/src/cmd_all/src/common.rs b/src/cmd_all/src/common.rs index 4d9a47e7d4716..c7df4e5821360 100644 --- a/src/cmd_all/src/common.rs +++ b/src/cmd_all/src/common.rs @@ -22,6 +22,7 @@ pub enum RisingWaveService { Compute(Vec), Meta(Vec), Frontend(Vec), + #[allow(dead_code)] Compactor(Vec), ConnectorNode(Vec), } diff --git a/src/cmd_all/src/standalone.rs b/src/cmd_all/src/standalone.rs index 2a9039c53cfa1..f7c8068cf33b9 100644 --- a/src/cmd_all/src/standalone.rs +++ b/src/cmd_all/src/standalone.rs @@ -14,43 +14,112 @@ use anyhow::Result; use clap::Parser; +use risingwave_compactor::CompactorOpts; +use risingwave_compute::ComputeNodeOpts; +use risingwave_frontend::FrontendOpts; +use risingwave_meta::MetaNodeOpts; use shell_words::split; use tokio::signal; -use crate::common::{osstrs, RisingWaveService}; +use crate::common::osstrs; #[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)] pub struct StandaloneOpts { /// Compute node options - #[clap(short, long, env = "STANDALONE_COMPUTE_OPTS")] + /// If missing, compute node won't start + #[clap(short, long, env = "RW_STANDALONE_COMPUTE_OPTS")] compute_opts: Option, - #[clap(short, long, env = "STANDALONE_META_OPTS")] + #[clap(short, long, env = "RW_STANDALONE_META_OPTS")] /// Meta node options + /// If missing, meta node won't start meta_opts: Option, - #[clap(short, long, env = "STANDALONE_FRONTEND_OPTS")] + #[clap(short, long, env = "RW_STANDALONE_FRONTEND_OPTS")] /// Frontend node options + /// If missing, frontend node won't start frontend_opts: Option, - #[clap(long, env = "STANDALONE_COMPACTOR_OPTS")] - /// Frontend node options + #[clap(long, env = "RW_STANDALONE_COMPACTOR_OPTS")] + /// Compactor node options + /// If missing compactor node won't start compactor_opts: Option, + + #[clap(long, env = "RW_STANDALONE_PROMETHEUS_LISTENER_ADDR")] + /// Prometheus listener address + /// If present, it will override prometheus listener address for + /// Frontend, Compute and Compactor nodes + prometheus_listener_addr: Option, + + #[clap(long, env = "RW_STANDALONE_CONFIG_PATH")] + /// Path to the config file + /// If present, it will override config path for + /// Frontend, Compute and Compactor nodes + config_path: Option, } #[derive(Debug)] pub struct ParsedStandaloneOpts { - pub meta_opts: Option>, - pub compute_opts: Option>, - pub frontend_opts: Option>, - pub compactor_opts: Option>, + pub meta_opts: Option, + pub compute_opts: Option, + pub frontend_opts: Option, + pub compactor_opts: Option, } fn parse_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts { - let meta_opts = opts.meta_opts.as_ref().map(|s| split(s).unwrap()); - let compute_opts = opts.compute_opts.as_ref().map(|s| split(s).unwrap()); - let frontend_opts = opts.frontend_opts.as_ref().map(|s| split(s).unwrap()); - let compactor_opts = opts.compactor_opts.as_ref().map(|s| split(s).unwrap()); + let meta_opts = opts.meta_opts.as_ref().map(|s| { + let mut s = split(s).unwrap(); + s.insert(0, "meta-node".into()); + s + }); + let mut meta_opts = meta_opts.map(|o| MetaNodeOpts::parse_from(osstrs(o))); + + let compute_opts = opts.compute_opts.as_ref().map(|s| { + let mut s = split(s).unwrap(); + s.insert(0, "compute-node".into()); + s + }); + let mut compute_opts = compute_opts.map(|o| ComputeNodeOpts::parse_from(osstrs(o))); + + let frontend_opts = opts.frontend_opts.as_ref().map(|s| { + let mut s = split(s).unwrap(); + s.insert(0, "frontend-node".into()); + s + }); + let mut frontend_opts = frontend_opts.map(|o| FrontendOpts::parse_from(osstrs(o))); + + let compactor_opts = opts.compactor_opts.as_ref().map(|s| { + let mut s = split(s).unwrap(); + s.insert(0, "compactor-node".into()); + s + }); + let mut compactor_opts = compactor_opts.map(|o| CompactorOpts::parse_from(osstrs(o))); + + if let Some(config_path) = opts.config_path.as_ref() { + if let Some(meta_opts) = meta_opts.as_mut() { + meta_opts.config_path = config_path.clone(); + } + if let Some(compute_opts) = compute_opts.as_mut() { + compute_opts.config_path = config_path.clone(); + } + if let Some(frontend_opts) = frontend_opts.as_mut() { + frontend_opts.config_path = config_path.clone(); + } + if let Some(compactor_opts) = compactor_opts.as_mut() { + compactor_opts.config_path = config_path.clone(); + } + } + if let Some(prometheus_listener_addr) = opts.prometheus_listener_addr.as_ref() { + if let Some(compute_opts) = compute_opts.as_mut() { + compute_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); + } + if let Some(frontend_opts) = frontend_opts.as_mut() { + frontend_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); + } + if let Some(compactor_opts) = compactor_opts.as_mut() { + compactor_opts.prometheus_listener_addr = prometheus_listener_addr.clone(); + } + } ParsedStandaloneOpts { meta_opts, compute_opts, @@ -59,72 +128,38 @@ fn parse_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts { } } -fn get_services(opts: &StandaloneOpts) -> Vec { +pub async fn standalone(opts: StandaloneOpts) -> Result<()> { + tracing::info!("launching Risingwave in standalone mode"); + let ParsedStandaloneOpts { meta_opts, compute_opts, frontend_opts, compactor_opts, - } = parse_opt_args(opts); - let mut services = vec![]; - if let Some(meta_opts) = meta_opts { - services.push(RisingWaveService::Meta(osstrs(meta_opts))); - } - if let Some(compute_opts) = compute_opts { - services.push(RisingWaveService::Compute(osstrs(compute_opts))); + } = parse_opt_args(&opts); + + if let Some(opts) = meta_opts { + tracing::info!("starting meta-node thread with cli args: {:?}", opts); + + let _meta_handle = tokio::spawn(async move { + risingwave_meta::start(opts).await; + tracing::warn!("meta is stopped, shutdown all nodes"); + }); + // wait for the service to be ready + tokio::time::sleep(std::time::Duration::from_secs(1)).await; } - if let Some(frontend_opts) = frontend_opts { - services.push(RisingWaveService::Frontend(osstrs(frontend_opts))); + if let Some(opts) = compute_opts { + tracing::info!("starting compute-node thread with cli args: {:?}", opts); + let _compute_handle = tokio::spawn(async move { risingwave_compute::start(opts).await }); } - if let Some(compactor_opts) = compactor_opts { - services.push(RisingWaveService::Compactor(osstrs(compactor_opts))); + if let Some(opts) = frontend_opts { + tracing::info!("starting frontend-node thread with cli args: {:?}", opts); + let _frontend_handle = tokio::spawn(async move { risingwave_frontend::start(opts).await }); } - services -} - -pub async fn standalone(opts: StandaloneOpts) -> Result<()> { - tracing::info!("launching Risingwave in standalone mode"); - - let services = get_services(&opts); - - for service in services { - match service { - RisingWaveService::Meta(mut opts) => { - opts.insert(0, "meta-node".into()); - tracing::info!("starting meta-node thread with cli args: {:?}", opts); - let opts = risingwave_meta::MetaNodeOpts::parse_from(opts); - let _meta_handle = tokio::spawn(async move { - risingwave_meta::start(opts).await; - tracing::warn!("meta is stopped, shutdown all nodes"); - }); - // wait for the service to be ready - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - RisingWaveService::Compute(mut opts) => { - opts.insert(0, "compute-node".into()); - tracing::info!("starting compute-node thread with cli args: {:?}", opts); - let opts = risingwave_compute::ComputeNodeOpts::parse_from(opts); - let _compute_handle = - tokio::spawn(async move { risingwave_compute::start(opts).await }); - } - RisingWaveService::Frontend(mut opts) => { - opts.insert(0, "frontend-node".into()); - tracing::info!("starting frontend-node thread with cli args: {:?}", opts); - let opts = risingwave_frontend::FrontendOpts::parse_from(opts); - let _frontend_handle = - tokio::spawn(async move { risingwave_frontend::start(opts).await }); - } - RisingWaveService::Compactor(mut opts) => { - opts.insert(0, "compactor-node".into()); - tracing::info!("starting compactor-node thread with cli args: {:?}", opts); - let opts = risingwave_compactor::CompactorOpts::parse_from(opts); - let _compactor_handle = - tokio::spawn(async move { risingwave_compactor::start(opts).await }); - } - RisingWaveService::ConnectorNode(_) => { - panic!("Connector node unsupported in Risingwave standalone mode."); - } - } + if let Some(opts) = compactor_opts { + tracing::info!("starting compactor-node thread with cli args: {:?}", opts); + let _compactor_handle = + tokio::spawn(async move { risingwave_compactor::start(opts).await }); } // wait for log messages to be flushed @@ -159,16 +194,20 @@ mod test { fn test_parse_opt_args() { // Test parsing into standalone-level opts. let raw_opts = " ---compute-opts=--listen-address 127.0.0.1 --port 8000 ---meta-opts=--data-dir \"some path with spaces\" --port 8001 ---frontend-opts=--some-option +--compute-opts=--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10 +--meta-opts=--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001 +--frontend-opts=--config-path=src/config/original.toml +--prometheus-listener-addr=127.0.0.1:1234 +--config-path=src/config/test.toml "; let actual = StandaloneOpts::parse_from(raw_opts.lines()); let opts = StandaloneOpts { - compute_opts: Some("--listen-address 127.0.0.1 --port 8000".into()), - meta_opts: Some("--data-dir \"some path with spaces\" --port 8001".into()), - frontend_opts: Some("--some-option".into()), + compute_opts: Some("--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10".into()), + meta_opts: Some("--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001".into()), + frontend_opts: Some("--config-path=src/config/original.toml".into()), compactor_opts: None, + prometheus_listener_addr: Some("127.0.0.1:1234".into()), + config_path: Some("src/config/test.toml".into()), }; assert_eq!(actual, opts); @@ -179,25 +218,77 @@ mod test { expect![[r#" ParsedStandaloneOpts { meta_opts: Some( - [ - "--data-dir", - "some path with spaces", - "--port", - "8001", - ], + MetaNodeOpts { + vpc_id: None, + security_group_id: None, + listen_addr: "127.0.0.1:8001", + advertise_addr: "127.0.0.1:9999", + dashboard_host: None, + prometheus_host: None, + etcd_endpoints: "", + etcd_auth: false, + etcd_username: "", + etcd_password: "", + sql_endpoint: None, + dashboard_ui_path: None, + prometheus_endpoint: None, + connector_rpc_endpoint: None, + privatelink_endpoint_default_tags: None, + config_path: "src/config/test.toml", + backend: None, + barrier_interval_ms: None, + sstable_size_mb: None, + block_size_kb: None, + bloom_false_positive: None, + state_store: None, + data_directory: Some( + "some path with spaces", + ), + do_not_config_object_storage_lifecycle: None, + backup_storage_url: None, + backup_storage_directory: None, + object_store_streaming_read_timeout_ms: None, + object_store_streaming_upload_timeout_ms: None, + object_store_upload_timeout_ms: None, + object_store_read_timeout_ms: None, + heap_profiling_dir: None, + }, ), compute_opts: Some( - [ - "--listen-address", - "127.0.0.1", - "--port", - "8000", - ], + ComputeNodeOpts { + listen_addr: "127.0.0.1:8000", + advertise_addr: None, + prometheus_listener_addr: "127.0.0.1:1234", + meta_address: "http://127.0.0.1:5690", + connector_rpc_endpoint: None, + connector_rpc_sink_payload_format: None, + config_path: "src/config/test.toml", + total_memory_bytes: 34359738368, + parallelism: 10, + role: Both, + metrics_level: None, + data_file_cache_dir: None, + meta_file_cache_dir: None, + async_stack_trace: None, + heap_profiling_dir: None, + object_store_streaming_read_timeout_ms: None, + object_store_streaming_upload_timeout_ms: None, + object_store_upload_timeout_ms: None, + object_store_read_timeout_ms: None, + }, ), frontend_opts: Some( - [ - "--some-option", - ], + FrontendOpts { + listen_addr: "127.0.0.1:4566", + advertise_addr: None, + port: None, + meta_addr: "http://127.0.0.1:5690", + prometheus_listener_addr: "127.0.0.1:1234", + health_check_listener_addr: "127.0.0.1:6786", + config_path: "src/config/test.toml", + metrics_level: None, + enable_barrier_read: None, + }, ), compactor_opts: None, }"#]],