diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index 1539bbe720f5b..e9173abefe1df 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -240,10 +240,10 @@ fn standalone(opts: StandaloneOpts) { /// We will start a standalone instance, with all nodes in the same process. fn single_node(opts: SingleNodeOpts) { opts.create_store_directories().unwrap(); - let opts = risingwave_cmd_all::normalize_single_node_opts(&opts); + let opts = risingwave_cmd_all::map_single_node_opts_to_standalone_opts(&opts); let settings = risingwave_rt::LoggerSettings::from_opts(&opts) .with_target("risingwave_storage", Level::WARN) .with_thread_name(true); risingwave_rt::init_risingwave_logger(settings); - risingwave_rt::main_okk(risingwave_cmd_all::single_node(opts)).unwrap(); + risingwave_rt::main_okk(risingwave_cmd_all::standalone(opts)).unwrap(); } diff --git a/src/cmd_all/src/single_node.rs b/src/cmd_all/src/single_node.rs index f77b2588f75d8..98852cca64b60 100644 --- a/src/cmd_all/src/single_node.rs +++ b/src/cmd_all/src/single_node.rs @@ -14,16 +14,15 @@ use std::sync::LazyLock; -use anyhow::Result; use clap::Parser; use home::home_dir; use risingwave_common::config::{AsyncStackTraceOption, MetaBackend}; -use risingwave_common::util::meta_addr::MetaAddressStrategy; use risingwave_compactor::CompactorOpts; use risingwave_compute::{default_parallelism, default_total_memory_bytes, ComputeNodeOpts}; use risingwave_frontend::FrontendOpts; use risingwave_meta_node::MetaNodeOpts; -use tokio::signal; + +use crate::ParsedStandaloneOpts; pub static DEFAULT_STORE_DIRECTORY: LazyLock = LazyLock::new(|| { let mut home_path = home_dir().unwrap(); @@ -95,7 +94,7 @@ pub fn make_single_node_state_store_url(store_directory: &String) -> String { format!("hummock+fs://{}/state_store", store_directory) } -pub fn normalize_single_node_opts(opts: &SingleNodeOpts) -> NormalizedSingleNodeOpts { +pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedStandaloneOpts { let mut meta_opts = SingleNodeOpts::default_meta_opts(); let mut compute_opts = SingleNodeOpts::default_compute_opts(); let mut frontend_opts = SingleNodeOpts::default_frontend_opts(); @@ -135,7 +134,7 @@ pub fn normalize_single_node_opts(opts: &SingleNodeOpts) -> NormalizedSingleNode if let Some(compactor_addr) = &opts.compactor_addr { compactor_opts.listen_addr = compactor_addr.clone(); } - NormalizedSingleNodeOpts { + ParsedStandaloneOpts { meta_opts: Some(meta_opts), compute_opts: Some(compute_opts), frontend_opts: Some(frontend_opts), @@ -241,83 +240,3 @@ impl SingleNodeOpts { Ok(()) } } - -#[derive(Debug)] -pub struct NormalizedSingleNodeOpts { - pub meta_opts: Option, - pub compute_opts: Option, - pub frontend_opts: Option, - pub compactor_opts: Option, -} - -impl risingwave_common::opts::Opts for NormalizedSingleNodeOpts { - fn name() -> &'static str { - "single_node" - } - - fn meta_addr(&self) -> MetaAddressStrategy { - if let Some(opts) = self.meta_opts.as_ref() { - opts.meta_addr() - } else if let Some(opts) = self.compute_opts.as_ref() { - opts.meta_addr() - } else if let Some(opts) = self.frontend_opts.as_ref() { - opts.meta_addr() - } else if let Some(opts) = self.compactor_opts.as_ref() { - opts.meta_addr() - } else { - unreachable!("at least one service should be specified as checked during parsing") - } - } -} - -/// For `single_node` mode, we can configure and start multiple services in one process. -/// `single_node` mode is meant to be used by our cloud service and docker, -/// where we can configure and start multiple services in one process. -pub async fn single_node( - NormalizedSingleNodeOpts { - meta_opts, - compute_opts, - frontend_opts, - compactor_opts, - }: NormalizedSingleNodeOpts, -) -> Result<()> { - tracing::info!("launching Risingwave in single_node mode"); - - if let Some(opts) = meta_opts { - tracing::info!("starting meta-node thread with cli args: {:?}", opts); - - let _meta_handle = tokio::spawn(async move { - risingwave_meta_node::start(opts).await; - tracing::warn!("meta is stopped, shutdown all nodes"); - }); - // wait for the service to be ready - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - if let Some(opts) = compute_opts { - tracing::info!("starting compute-node thread with cli args: {:?}", opts); - let _compute_handle = tokio::spawn(async move { risingwave_compute::start(opts).await }); - } - if let Some(opts) = frontend_opts { - tracing::info!("starting frontend-node thread with cli args: {:?}", opts); - let _frontend_handle = tokio::spawn(async move { risingwave_frontend::start(opts).await }); - } - if let Some(opts) = compactor_opts { - tracing::info!("starting compactor-node thread with cli args: {:?}", opts); - let _compactor_handle = - tokio::spawn(async move { risingwave_compactor::start(opts).await }); - } - - // wait for log messages to be flushed - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - eprintln!("-------------------------------"); - eprintln!("RisingWave single_node mode is ready."); - - // TODO: should we join all handles? - // Currently, not all services can be shutdown gracefully, just quit on Ctrl-C now. - // TODO(kwannoel): Why can't be shutdown gracefully? Is it that the service just does not - // support it? - signal::ctrl_c().await.unwrap(); - tracing::info!("Ctrl+C received, now exiting"); - - Ok(()) -}