Skip to content

Commit

Permalink
Revert "separate single_node from standalone mode"
Browse files Browse the repository at this point in the history
This reverts commit fecbdbe.
  • Loading branch information
kwannoel committed Feb 21, 2024
1 parent 6435778 commit 17f6ee4
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 87 deletions.
4 changes: 2 additions & 2 deletions src/cmd_all/src/bin/risingwave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,10 @@ fn standalone(opts: StandaloneOpts) {
/// We will start a standalone instance, with all nodes in the same process.
fn single_node(opts: SingleNodeOpts) {
opts.create_store_directories().unwrap();
let opts = risingwave_cmd_all::normalize_single_node_opts(&opts);
let opts = risingwave_cmd_all::map_single_node_opts_to_standalone_opts(&opts);
let settings = risingwave_rt::LoggerSettings::from_opts(&opts)
.with_target("risingwave_storage", Level::WARN)
.with_thread_name(true);
risingwave_rt::init_risingwave_logger(settings);
risingwave_rt::main_okk(risingwave_cmd_all::single_node(opts)).unwrap();
risingwave_rt::main_okk(risingwave_cmd_all::standalone(opts)).unwrap();
}
89 changes: 4 additions & 85 deletions src/cmd_all/src/single_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@

use std::sync::LazyLock;

use anyhow::Result;
use clap::Parser;
use home::home_dir;
use risingwave_common::config::{AsyncStackTraceOption, MetaBackend};
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_compactor::CompactorOpts;
use risingwave_compute::{default_parallelism, default_total_memory_bytes, ComputeNodeOpts};
use risingwave_frontend::FrontendOpts;
use risingwave_meta_node::MetaNodeOpts;
use tokio::signal;

use crate::ParsedStandaloneOpts;

pub static DEFAULT_STORE_DIRECTORY: LazyLock<String> = LazyLock::new(|| {
let mut home_path = home_dir().unwrap();
Expand Down Expand Up @@ -95,7 +94,7 @@ pub fn make_single_node_state_store_url(store_directory: &String) -> String {
format!("hummock+fs://{}/state_store", store_directory)
}

pub fn normalize_single_node_opts(opts: &SingleNodeOpts) -> NormalizedSingleNodeOpts {
pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedStandaloneOpts {
let mut meta_opts = SingleNodeOpts::default_meta_opts();
let mut compute_opts = SingleNodeOpts::default_compute_opts();
let mut frontend_opts = SingleNodeOpts::default_frontend_opts();
Expand Down Expand Up @@ -135,7 +134,7 @@ pub fn normalize_single_node_opts(opts: &SingleNodeOpts) -> NormalizedSingleNode
if let Some(compactor_addr) = &opts.compactor_addr {
compactor_opts.listen_addr = compactor_addr.clone();
}
NormalizedSingleNodeOpts {
ParsedStandaloneOpts {
meta_opts: Some(meta_opts),
compute_opts: Some(compute_opts),
frontend_opts: Some(frontend_opts),
Expand Down Expand Up @@ -241,83 +240,3 @@ impl SingleNodeOpts {
Ok(())
}
}

#[derive(Debug)]
pub struct NormalizedSingleNodeOpts {
pub meta_opts: Option<MetaNodeOpts>,
pub compute_opts: Option<ComputeNodeOpts>,
pub frontend_opts: Option<FrontendOpts>,
pub compactor_opts: Option<CompactorOpts>,
}

impl risingwave_common::opts::Opts for NormalizedSingleNodeOpts {
fn name() -> &'static str {
"single_node"
}

fn meta_addr(&self) -> MetaAddressStrategy {
if let Some(opts) = self.meta_opts.as_ref() {
opts.meta_addr()
} else if let Some(opts) = self.compute_opts.as_ref() {
opts.meta_addr()
} else if let Some(opts) = self.frontend_opts.as_ref() {
opts.meta_addr()
} else if let Some(opts) = self.compactor_opts.as_ref() {
opts.meta_addr()
} else {
unreachable!("at least one service should be specified as checked during parsing")
}
}
}

/// For `single_node` mode, we can configure and start multiple services in one process.
/// `single_node` mode is meant to be used by our cloud service and docker,
/// where we can configure and start multiple services in one process.
pub async fn single_node(
NormalizedSingleNodeOpts {
meta_opts,
compute_opts,
frontend_opts,
compactor_opts,
}: NormalizedSingleNodeOpts,
) -> Result<()> {
tracing::info!("launching Risingwave in single_node mode");

if let Some(opts) = meta_opts {
tracing::info!("starting meta-node thread with cli args: {:?}", opts);

let _meta_handle = tokio::spawn(async move {
risingwave_meta_node::start(opts).await;
tracing::warn!("meta is stopped, shutdown all nodes");
});
// wait for the service to be ready
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
if let Some(opts) = compute_opts {
tracing::info!("starting compute-node thread with cli args: {:?}", opts);
let _compute_handle = tokio::spawn(async move { risingwave_compute::start(opts).await });
}
if let Some(opts) = frontend_opts {
tracing::info!("starting frontend-node thread with cli args: {:?}", opts);
let _frontend_handle = tokio::spawn(async move { risingwave_frontend::start(opts).await });
}
if let Some(opts) = compactor_opts {
tracing::info!("starting compactor-node thread with cli args: {:?}", opts);
let _compactor_handle =
tokio::spawn(async move { risingwave_compactor::start(opts).await });
}

// wait for log messages to be flushed
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
eprintln!("-------------------------------");
eprintln!("RisingWave single_node mode is ready.");

// TODO: should we join all handles?
// Currently, not all services can be shutdown gracefully, just quit on Ctrl-C now.
// TODO(kwannoel): Why can't be shutdown gracefully? Is it that the service just does not
// support it?
signal::ctrl_c().await.unwrap();
tracing::info!("Ctrl+C received, now exiting");

Ok(())
}

0 comments on commit 17f6ee4

Please sign in to comment.