Skip to content

Commit

Permalink
feat(cmd_all): support single_node mode (#14951)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Feb 5, 2024
1 parent 94363df commit 896881d
Show file tree
Hide file tree
Showing 10 changed files with 309 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/cmd_all/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ anyhow = "1"
clap = { version = "4", features = ["cargo", "derive"] }
console = "0.15"
const-str = "0.5"
home = "0.5"
prometheus = { version = "0.13" }
risingwave_cmd = { workspace = true }
risingwave_common = { workspace = true }
Expand Down
43 changes: 41 additions & 2 deletions src/cmd_all/src/bin/risingwave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
use std::str::FromStr;

use anyhow::Result;
use clap::error::ErrorKind;
use clap::{command, ArgMatches, Args, Command, FromArgMatches};
use risingwave_cmd::{compactor, compute, ctl, frontend, meta};
use risingwave_cmd_all::{PlaygroundOpts, StandaloneOpts};
use risingwave_cmd_all::{PlaygroundOpts, SingleNodeOpts, StandaloneOpts};
use risingwave_common::git_sha;
use risingwave_compactor::CompactorOpts;
use risingwave_compute::ComputeNodeOpts;
Expand Down Expand Up @@ -98,7 +99,14 @@ enum Component {
Compactor,
Ctl,
Playground,
/// Used by cloud to bundle different components into a single node.
/// It exposes the low level configuration options of each node.
Standalone,
/// Used by users to run a single node.
/// The low level configuration options are hidden.
/// We only expose high-level configuration options,
/// which map across multiple nodes.
SingleNode,
}

impl Component {
Expand All @@ -117,6 +125,7 @@ impl Component {
Self::Ctl => ctl(parse_opts(matches)),
Self::Playground => playground(parse_opts(matches)),
Self::Standalone => standalone(parse_opts(matches)),
Self::SingleNode => single_node(parse_opts(matches)),
}
}

Expand All @@ -130,6 +139,7 @@ impl Component {
Component::Ctl => vec!["risectl"],
Component::Playground => vec!["play"],
Component::Standalone => vec![],
Component::SingleNode => vec!["single-node", "single"],
}
}

Expand All @@ -143,6 +153,7 @@ impl Component {
Component::Ctl => CtlOpts::augment_args(cmd),
Component::Playground => PlaygroundOpts::augment_args(cmd),
Component::Standalone => StandaloneOpts::augment_args(cmd),
Component::SingleNode => SingleNodeOpts::augment_args(cmd),
}
}

Expand Down Expand Up @@ -179,7 +190,23 @@ fn main() -> Result<()> {
.subcommands(Component::commands()),
);

let matches = command.get_matches();
let matches = match command.try_get_matches() {
Ok(m) => m,
Err(e) if e.kind() == ErrorKind::MissingSubcommand => {
// `$ ./risingwave`
// NOTE(kwannoel): This is a hack to make `risingwave`
// work as an alias of `risingwave single-process`.
// If invocation is not a multicall and there's no subcommand,
// we will try to invoke it as a single node.
let command = Component::SingleNode.augment_args(risingwave());
let matches = command.get_matches();
Component::SingleNode.start(&matches);
return Ok(());
}
Err(e) => {
e.exit();
}
};

let multicall = matches.subcommand().unwrap();
let argv_1 = multicall.1.subcommand();
Expand Down Expand Up @@ -207,3 +234,15 @@ fn standalone(opts: StandaloneOpts) {
risingwave_rt::init_risingwave_logger(settings);
risingwave_rt::main_okk(risingwave_cmd_all::standalone(opts)).unwrap();
}

/// For single node, the internals are just a config mapping from its
/// high level options to standalone mode node-level options.
/// We will start a standalone instance, with all nodes in the same process.
fn single_node(opts: SingleNodeOpts) {
let opts = risingwave_cmd_all::map_single_node_opts_to_standalone_opts(&opts);
let settings = risingwave_rt::LoggerSettings::from_opts(&opts)
.with_target("risingwave_storage", Level::WARN)
.with_thread_name(true);
risingwave_rt::init_risingwave_logger(settings);
risingwave_rt::main_okk(risingwave_cmd_all::standalone(opts)).unwrap();
}
3 changes: 3 additions & 0 deletions src/cmd_all/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ mod common;
pub mod playground;
mod standalone;

pub mod single_node;

pub use playground::*;
pub use single_node::*;
pub use standalone::*;

risingwave_expr_impl::enable!();
2 changes: 1 addition & 1 deletion src/cmd_all/src/playground.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ fn get_services(profile: &str) -> (Vec<RisingWaveService>, bool) {
}

#[derive(Debug, Clone, Parser)]
#[command(about = "The quick way to start a RisingWave cluster for playing around")]
#[command(about = "The quick way to start an in-memory RisingWave cluster for playing around")]
pub struct PlaygroundOpts {
/// The profile to use.
#[clap(short, long, env = "PLAYGROUND_PROFILE", default_value = "playground")]
Expand Down
229 changes: 229 additions & 0 deletions src/cmd_all/src/single_node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;

use clap::Parser;
use home::home_dir;
use risingwave_common::config::{AsyncStackTraceOption, MetaBackend};
use risingwave_compactor::CompactorOpts;
use risingwave_compute::{default_parallelism, default_total_memory_bytes, ComputeNodeOpts};
use risingwave_frontend::FrontendOpts;
use risingwave_meta_node::MetaNodeOpts;

use crate::ParsedStandaloneOpts;

pub static DEFAULT_STORE_DIRECTORY: LazyLock<String> = LazyLock::new(|| {
let mut home_path = home_dir().unwrap();
home_path.push(".risingwave");
let home_path = home_path.to_str().unwrap();
home_path.to_string()
});

pub static DEFAULT_SINGLE_NODE_SQLITE_PATH: LazyLock<String> =
LazyLock::new(|| format!("{}/meta_store/single_node.db", &*DEFAULT_STORE_DIRECTORY));

pub static DEFAULT_SINGLE_NODE_SQL_ENDPOINT: LazyLock<String> =
LazyLock::new(|| format!("sqlite://{}?mode=rwc", *DEFAULT_SINGLE_NODE_SQLITE_PATH));

pub static DEFAULT_SINGLE_NODE_STATE_STORE_PATH: LazyLock<String> =
LazyLock::new(|| format!("{}/state_store", DEFAULT_STORE_DIRECTORY.clone()));

pub static DEFAULT_SINGLE_NODE_STATE_STORE_URL: LazyLock<String> = LazyLock::new(|| {
format!(
"hummock+fs://{}",
DEFAULT_SINGLE_NODE_STATE_STORE_PATH.clone()
)
});

#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
#[command(
version,
about = "[default] The Single Node mode. Start all services in one process, with process-level options. This will be executed if no subcommand is specified"
)]
/// Here we define our own defaults for the single node mode.
pub struct SingleNodeOpts {
/// The address prometheus polls metrics from.
#[clap(long, env = "RW_SINGLE_NODE_PROMETHEUS_LISTENER_ADDR")]
prometheus_listener_addr: Option<String>,

/// The path to the cluster configuration file.
#[clap(long, env = "RW_SINGLE_NODE_CONFIG_PATH")]
config_path: Option<String>,

/// The store directory used by meta store and object store.
#[clap(long, env = "RW_SINGLE_NODE_STORE_DIRECTORY")]
store_directory: Option<String>,

/// The address of the meta node.
#[clap(long, env = "RW_SINGLE_NODE_META_ADDR")]
meta_addr: Option<String>,

/// The address of the compute node
#[clap(long, env = "RW_SINGLE_NODE_COMPUTE_ADDR")]
compute_addr: Option<String>,

/// The address of the frontend node
#[clap(long, env = "RW_SINGLE_NODE_FRONTEND_ADDR")]
frontend_addr: Option<String>,

/// The address of the compactor node
#[clap(long, env = "RW_SINGLE_NODE_COMPACTOR_ADDR")]
compactor_addr: Option<String>,
}

pub fn make_single_node_sql_endpoint(store_directory: &String) -> String {
format!(
"sqlite://{}/meta_store/single_node.db?mode=rwc",
store_directory
)
}

pub fn make_single_node_state_store_url(store_directory: &String) -> String {
format!("hummock+fs://{}/state_store", store_directory)
}

pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedStandaloneOpts {
let mut meta_opts = SingleNodeOpts::default_meta_opts();
let mut compute_opts = SingleNodeOpts::default_compute_opts();
let mut frontend_opts = SingleNodeOpts::default_frontend_opts();
let mut compactor_opts = SingleNodeOpts::default_compactor_opts();
if let Some(prometheus_listener_addr) = &opts.prometheus_listener_addr {
meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone());
compute_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
frontend_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
compactor_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
}
if let Some(config_path) = &opts.config_path {
meta_opts.config_path = config_path.clone();
compute_opts.config_path = config_path.clone();
frontend_opts.config_path = config_path.clone();
compactor_opts.config_path = config_path.clone();
}
if let Some(store_directory) = &opts.store_directory {
let state_store_url = make_single_node_state_store_url(store_directory);
let meta_store_endpoint = make_single_node_sql_endpoint(store_directory);
meta_opts.state_store = Some(state_store_url);
meta_opts.sql_endpoint = Some(meta_store_endpoint);
}
if let Some(meta_addr) = &opts.meta_addr {
meta_opts.listen_addr = meta_addr.clone();
meta_opts.advertise_addr = meta_addr.clone();

compute_opts.meta_address = meta_addr.parse().unwrap();
frontend_opts.meta_addr = meta_addr.parse().unwrap();
compactor_opts.meta_address = meta_addr.parse().unwrap();
}
if let Some(compute_addr) = &opts.compute_addr {
compute_opts.listen_addr = compute_addr.clone();
}
if let Some(frontend_addr) = &opts.frontend_addr {
frontend_opts.listen_addr = frontend_addr.clone();
}
if let Some(compactor_addr) = &opts.compactor_addr {
compactor_opts.listen_addr = compactor_addr.clone();
}
ParsedStandaloneOpts {
meta_opts: Some(meta_opts),
compute_opts: Some(compute_opts),
frontend_opts: Some(frontend_opts),
compactor_opts: Some(compactor_opts),
}
}

impl SingleNodeOpts {
fn default_frontend_opts() -> FrontendOpts {
FrontendOpts {
listen_addr: "0.0.0.0:4566".to_string(),
advertise_addr: Some("0.0.0.0:4566".to_string()),
port: None,
meta_addr: "http://0.0.0.0:5690".parse().unwrap(),
prometheus_listener_addr: "0.0.0.0:1250".to_string(),
health_check_listener_addr: "0.0.0.0:6786".to_string(),
config_path: "".to_string(),
metrics_level: None,
enable_barrier_read: None,
}
}

fn default_meta_opts() -> MetaNodeOpts {
MetaNodeOpts {
vpc_id: None,
security_group_id: None,
listen_addr: "0.0.0.0:5690".to_string(),
advertise_addr: "0.0.0.0:5690".to_string(),
dashboard_host: Some("0.0.0.0:5691".to_string()),
prometheus_listener_addr: Some("0.0.0.0:1250".to_string()),
etcd_endpoints: Default::default(),
etcd_auth: false,
etcd_username: Default::default(),
etcd_password: Default::default(),
sql_endpoint: Some(DEFAULT_SINGLE_NODE_SQL_ENDPOINT.clone()),
dashboard_ui_path: None,
prometheus_endpoint: None,
prometheus_selector: None,
connector_rpc_endpoint: None,
privatelink_endpoint_default_tags: None,
config_path: "".to_string(),
backend: Some(MetaBackend::Sql),
barrier_interval_ms: None,
sstable_size_mb: None,
block_size_kb: None,
bloom_false_positive: None,
state_store: Some(DEFAULT_SINGLE_NODE_STATE_STORE_URL.clone()),
data_directory: Some("hummock_001".to_string()),
do_not_config_object_storage_lifecycle: None,
backup_storage_url: None,
backup_storage_directory: None,
heap_profiling_dir: None,
}
}

pub fn default_compute_opts() -> ComputeNodeOpts {
ComputeNodeOpts {
listen_addr: "0.0.0.0:5688".to_string(),
advertise_addr: Some("0.0.0.0:5688".to_string()),
prometheus_listener_addr: "0.0.0.0:1250".to_string(),
meta_address: "http://0.0.0.0:5690".parse().unwrap(),
connector_rpc_endpoint: None,
connector_rpc_sink_payload_format: None,
config_path: "".to_string(),
total_memory_bytes: default_total_memory_bytes(),
parallelism: default_parallelism(),
role: Default::default(),
metrics_level: None,
data_file_cache_dir: None,
meta_file_cache_dir: None,
async_stack_trace: Some(AsyncStackTraceOption::ReleaseVerbose),
heap_profiling_dir: None,
}
}

fn default_compactor_opts() -> CompactorOpts {
CompactorOpts {
listen_addr: "0.0.0.0:6660".to_string(),
advertise_addr: Some("0.0.0.0:6660".to_string()),
port: None,
prometheus_listener_addr: "0.0.0.0:1250".to_string(),
meta_address: "http://0.0.0.0:5690".parse().unwrap(),
compaction_worker_threads_number: None,
config_path: "".to_string(),
metrics_level: None,
async_stack_trace: None,
heap_profiling_dir: None,
compactor_mode: None,
proxy_rpc_endpoint: "".to_string(),
}
}
}
8 changes: 8 additions & 0 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ use tokio::signal;
use crate::common::osstrs;

#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
#[command(
version,
about = "The Standalone mode allows users to start multiple services in one process, it exposes node-level options for each service",
hide = true
)]
pub struct StandaloneOpts {
/// Compute node options
/// If missing, compute node won't start
Expand Down Expand Up @@ -161,6 +166,9 @@ pub fn parse_standalone_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts
}
}

/// For `standalone` mode, we can configure and start multiple services in one process.
/// `standalone` mode is meant to be used by our cloud service and docker,
/// where we can configure and start multiple services in one process.
pub async fn standalone(
ParsedStandaloneOpts {
meta_opts,
Expand Down
1 change: 0 additions & 1 deletion src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ pub mod test_utils;
pub mod transaction;
pub mod types;
pub mod vnode_mapping;

pub mod test_prelude {
pub use super::array::{DataChunkTestExt, StreamChunkTestExt};
pub use super::catalog::test_utils::ColumnDescTestExt;
Expand Down
Loading

0 comments on commit 896881d

Please sign in to comment.