Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cmd_all): support single_node mode #14951

Merged
merged 17 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/cmd_all/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ anyhow = "1"
clap = { version = "4", features = ["cargo", "derive"] }
console = "0.15"
const-str = "0.5"
home = "0.5"
prometheus = { version = "0.13" }
risingwave_cmd = { workspace = true }
risingwave_common = { workspace = true }
Expand Down
35 changes: 33 additions & 2 deletions src/cmd_all/src/bin/risingwave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
use std::str::FromStr;

use anyhow::Result;
use clap::error::ErrorKind;
use clap::{command, ArgMatches, Args, Command, FromArgMatches};
use risingwave_cmd::{compactor, compute, ctl, frontend, meta};
use risingwave_cmd_all::{PlaygroundOpts, StandaloneOpts};
use risingwave_cmd_all::{PlaygroundOpts, SingleNodeOpts, StandaloneOpts};
use risingwave_common::git_sha;
use risingwave_compactor::CompactorOpts;
use risingwave_compute::ComputeNodeOpts;
Expand Down Expand Up @@ -98,7 +99,14 @@ enum Component {
Compactor,
Ctl,
Playground,
/// Used by cloud to bundle different components into a single node.
/// It exposes the low level configuration options of each node.
Standalone,
/// Used by users to run a single node.
/// The low level configuration options are hidden.
/// We only expose high-level configuration options,
/// which map across multiple nodes.
SingleNode,
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
}

impl Component {
Expand All @@ -117,6 +125,7 @@ impl Component {
Self::Ctl => ctl(parse_opts(matches)),
Self::Playground => playground(parse_opts(matches)),
Self::Standalone => standalone(parse_opts(matches)),
Self::SingleNode => single_node(parse_opts(matches)),
}
}

Expand All @@ -130,6 +139,7 @@ impl Component {
Component::Ctl => vec!["risectl"],
Component::Playground => vec!["play"],
Component::Standalone => vec![],
Component::SingleNode => vec![],
}
}

Expand All @@ -143,6 +153,7 @@ impl Component {
Component::Ctl => CtlOpts::augment_args(cmd),
Component::Playground => PlaygroundOpts::augment_args(cmd),
Component::Standalone => StandaloneOpts::augment_args(cmd),
Component::SingleNode => SingleNodeOpts::augment_args(cmd),
}
}

Expand Down Expand Up @@ -179,7 +190,18 @@ fn main() -> Result<()> {
.subcommands(Component::commands()),
);

let matches = command.get_matches();
let matches = match command.try_get_matches() {
Ok(m) => m,
Err(e) if e.kind() == ErrorKind::MissingSubcommand => {
let command = Component::SingleNode.augment_args(risingwave());
let matches = command.get_matches();
Component::SingleNode.start(&matches);
return Ok(());
}
Err(e) => {
e.exit();
}
};

let multicall = matches.subcommand().unwrap();
let argv_1 = multicall.1.subcommand();
Expand Down Expand Up @@ -207,3 +229,12 @@ fn standalone(opts: StandaloneOpts) {
risingwave_rt::init_risingwave_logger(settings);
risingwave_rt::main_okk(risingwave_cmd_all::standalone(opts)).unwrap();
}

fn single_node(opts: SingleNodeOpts) {
let opts = risingwave_cmd_all::map_single_node_opts_to_standalone_opts(&opts);
let settings = risingwave_rt::LoggerSettings::from_opts(&opts)
.with_target("risingwave_storage", Level::WARN)
.with_thread_name(true);
risingwave_rt::init_risingwave_logger(settings);
risingwave_rt::main_okk(risingwave_cmd_all::standalone(opts)).unwrap();
}
2 changes: 2 additions & 0 deletions src/cmd_all/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::env::home_dir;
use std::ffi::OsString;
use std::sync::LazyLock;

pub fn osstrs<T: Into<OsString> + AsRef<std::ffi::OsStr>>(s: impl AsRef<[T]>) -> Vec<OsString> {
s.as_ref().iter().map(OsString::from).collect()
Expand Down
3 changes: 3 additions & 0 deletions src/cmd_all/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ mod common;
pub mod playground;
mod standalone;

pub mod single_node;

pub use playground::*;
pub use single_node::*;
pub use standalone::*;

risingwave_expr_impl::enable!();
125 changes: 125 additions & 0 deletions src/cmd_all/src/single_node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str::FromStr;

use anyhow::Result;
use clap::Parser;
use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_compactor::CompactorOpts;
use risingwave_compute::ComputeNodeOpts;
use risingwave_frontend::FrontendOpts;
use risingwave_meta_node::MetaNodeOpts;
use shell_words::split;
use tokio::signal;

use crate::common::osstrs;
use crate::ParsedStandaloneOpts;

#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
#[command(
version,
about = "[DEFAULT] The Single Node mode. Start all services in one process, with process-level options. This will be executed if no subcommand is specified"
)]
/// Here we define our own defaults for the single node mode.
pub struct SingleNodeOpts {
#[clap(long, env = "RW_SINGLE_NODE_PROMETHEUS_LISTENER_ADDR")]
prometheus_listener_addr: Option<String>,

#[clap(long, env = "RW_SINGLE_NODE_CONFIG_PATH")]
config_path: Option<String>,

/// The data directory used by meta store and object store.
#[clap(long, env = "RW_SINGLE_NODE_DATA_DIRECTORY")]
data_directory: Option<String>,

/// The address of the meta node.
#[clap(long, env = "RW_SINGLE_NODE_META_ADDR")]
meta_addr: Option<String>,

/// The address of the compute node
#[clap(long, env = "RW_SINGLE_NODE_COMPUTE_ADDR")]
compute_addr: Option<String>,

/// The address of the frontend node
#[clap(long, env = "RW_SINGLE_NODE_FRONTEND_ADDR")]
frontend_addr: Option<String>,

/// The address of the compactor node
#[clap(long, env = "RW_SINGLE_NODE_COMPACTOR_ADDR")]
compactor_addr: Option<String>,
}

pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedStandaloneOpts {
let mut meta_opts = MetaNodeOpts::new_for_single_node();
let mut compute_opts = ComputeNodeOpts::new_for_single_node();
let mut frontend_opts = FrontendOpts::new_for_single_node();
let mut compactor_opts = CompactorOpts::new_for_single_node();
if let Some(prometheus_listener_addr) = &opts.prometheus_listener_addr {
meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone());
compute_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
frontend_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
compactor_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
}
if let Some(config_path) = &opts.config_path {
meta_opts.config_path = config_path.clone();
compute_opts.config_path = config_path.clone();
frontend_opts.config_path = config_path.clone();
compactor_opts.config_path = config_path.clone();
}
// TODO(kwannoel): Also update state store URL
if let Some(data_directory) = &opts.data_directory {
meta_opts.data_directory = Some(data_directory.clone());
}
if let Some(meta_addr) = &opts.meta_addr {
meta_opts.listen_addr = meta_addr.clone();
meta_opts.advertise_addr = meta_addr.clone();

compute_opts.meta_address = meta_addr.parse().unwrap();
frontend_opts.meta_addr = meta_addr.parse().unwrap();
compactor_opts.meta_address = meta_addr.parse().unwrap();
}
if let Some(compute_addr) = &opts.compute_addr {
compute_opts.listen_addr = compute_addr.clone();
}
if let Some(frontend_addr) = &opts.frontend_addr {
frontend_opts.listen_addr = frontend_addr.clone();
}
if let Some(compactor_addr) = &opts.compactor_addr {
compactor_opts.listen_addr = compactor_addr.clone();
}
ParsedStandaloneOpts {
meta_opts: Some(meta_opts),
compute_opts: Some(compute_opts),
frontend_opts: Some(frontend_opts),
compactor_opts: Some(compactor_opts),
}
}

#[cfg(test)]
mod test {
use std::fmt::Debug;

use expect_test::{expect, Expect};

use super::*;

fn check(actual: impl Debug, expect: Expect) {
let actual = format!("{:#?}", actual);
expect.assert_eq(&actual);
}

#[test]
fn test_parse_opt_args() {}
}
12 changes: 12 additions & 0 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ use tokio::signal;
use crate::common::osstrs;

#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
#[command(
version,
about = "The Standalone mode allows users to start multiple services in one process, it exposes node-level options for each service"
)]
pub struct StandaloneOpts {
/// Compute node options
/// If missing, compute node won't start
Expand Down Expand Up @@ -161,6 +165,14 @@ pub fn parse_standalone_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts
}
}

/// For `standalone` mode, we can configure and start multiple services in one process.
/// Note that this is different from `single` mode, where we start
/// pre-defined services all-in-one process,
/// the pre-defined services are not configurable.
/// `single` mode is meant to be user-facing, where users can just use `./risingwave`
/// to start the service.
/// `standalone` mode is meant to be used by our cloud service, where we can configure
/// and start multiple services in one process.
pub async fn standalone(
ParsedStandaloneOpts {
meta_opts,
Expand Down
1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ fs-err = "2"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
governor = { version = "0.6", default-features = false, features = ["std"] }
hex = "0.4.3"
home = "0.5"
http = "0.2"
humantime = "2.1"
hyper = "0.14"
Expand Down
1 change: 1 addition & 0 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub mod opts;
pub mod range;
pub mod row;
pub mod session_config;
pub mod single_process_config;
pub mod system_param;
pub mod telemetry;
pub mod test_utils;
Expand Down
46 changes: 46 additions & 0 deletions src/common/src/single_process_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module defines default configurations for single node mode.

use std::sync::LazyLock;

use home::home_dir;

pub static DEFAULT_DATA_DIRECTORY: LazyLock<String> = LazyLock::new(|| {
let mut home_path = home_dir().unwrap();
home_path.push(".risingwave");
let home_path = home_path.to_str().unwrap();
home_path.to_string()
});

pub static DEFAULT_SINGLE_NODE_SQLITE_PATH: LazyLock<String> = LazyLock::new(|| {
format!(
"{}/meta_store/single_node.db",
DEFAULT_DATA_DIRECTORY.clone()
)
});

pub static DEFAULT_SINGLE_NODE_SQL_ENDPOINT: LazyLock<String> =
LazyLock::new(|| format!("sqlite://{}?mode=rwc", *DEFAULT_SINGLE_NODE_SQLITE_PATH));

pub static DEFAULT_SINGLE_NODE_STATE_STORE_PATH: LazyLock<String> =
LazyLock::new(|| format!("{}/state_store", DEFAULT_DATA_DIRECTORY.clone()));

pub static DEFAULT_SINGLE_NODE_STATE_STORE_URL: LazyLock<String> = LazyLock::new(|| {
format!(
"hummock+fs://{}",
DEFAULT_SINGLE_NODE_STATE_STORE_PATH.clone()
)
});
23 changes: 23 additions & 0 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use serde::{Deserialize, Serialize};
use tonic::IntoStreamingRequest;

/// If `total_memory_bytes` is not specified, the default memory limit will be set to
/// the system memory limit multiplied by this proportion
Expand Down Expand Up @@ -133,6 +134,28 @@ pub struct ComputeNodeOpts {
pub heap_profiling_dir: Option<String>,
}

impl ComputeNodeOpts {
pub fn new_for_single_node() -> Self {
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
Self {
listen_addr: "0.0.0.0:5688".to_string(),
advertise_addr: Some("0.0.0.0:5688".to_string()),
prometheus_listener_addr: "0.0.0.0:1250".to_string(),
meta_address: "http://0.0.0.0:5690".parse().unwrap(),
connector_rpc_endpoint: None,
connector_rpc_sink_payload_format: None,
config_path: "".to_string(),
total_memory_bytes: default_total_memory_bytes(),
parallelism: default_parallelism(),
role: Default::default(),
metrics_level: None,
data_file_cache_dir: None,
meta_file_cache_dir: None,
async_stack_trace: Some(AsyncStackTraceOption::ReleaseVerbose),
heap_profiling_dir: None,
}
}
}

impl risingwave_common::opts::Opts for ComputeNodeOpts {
fn name() -> &'static str {
"compute"
Expand Down
Loading
Loading