Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cmd_all): support single_node mode #14951

Merged
merged 17 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/cmd_all/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ anyhow = "1"
clap = { version = "4", features = ["cargo", "derive"] }
console = "0.15"
const-str = "0.5"
home = "0.5"
prometheus = { version = "0.13" }
risingwave_cmd = { workspace = true }
risingwave_common = { workspace = true }
Expand Down
43 changes: 41 additions & 2 deletions src/cmd_all/src/bin/risingwave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
use std::str::FromStr;

use anyhow::Result;
use clap::error::ErrorKind;
use clap::{command, ArgMatches, Args, Command, FromArgMatches};
use risingwave_cmd::{compactor, compute, ctl, frontend, meta};
use risingwave_cmd_all::{PlaygroundOpts, StandaloneOpts};
use risingwave_cmd_all::{PlaygroundOpts, SingleNodeOpts, StandaloneOpts};
use risingwave_common::git_sha;
use risingwave_compactor::CompactorOpts;
use risingwave_compute::ComputeNodeOpts;
Expand Down Expand Up @@ -98,7 +99,14 @@ enum Component {
Compactor,
Ctl,
Playground,
/// Used by cloud to bundle different components into a single node.
/// It exposes the low level configuration options of each node.
Standalone,
/// Used by users to run a single node.
/// The low level configuration options are hidden.
/// We only expose high-level configuration options,
/// which map across multiple nodes.
SingleNode,
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
}

impl Component {
Expand All @@ -117,6 +125,7 @@ impl Component {
Self::Ctl => ctl(parse_opts(matches)),
Self::Playground => playground(parse_opts(matches)),
Self::Standalone => standalone(parse_opts(matches)),
Self::SingleNode => single_node(parse_opts(matches)),
}
}

Expand All @@ -130,6 +139,7 @@ impl Component {
Component::Ctl => vec!["risectl"],
Component::Playground => vec!["play"],
Component::Standalone => vec![],
Component::SingleNode => vec!["single-node", "single"],
}
}

Expand All @@ -143,6 +153,7 @@ impl Component {
Component::Ctl => CtlOpts::augment_args(cmd),
Component::Playground => PlaygroundOpts::augment_args(cmd),
Component::Standalone => StandaloneOpts::augment_args(cmd),
Component::SingleNode => SingleNodeOpts::augment_args(cmd),
}
}

Expand Down Expand Up @@ -179,7 +190,23 @@ fn main() -> Result<()> {
.subcommands(Component::commands()),
);

let matches = command.get_matches();
let matches = match command.try_get_matches() {
Ok(m) => m,
Err(e) if e.kind() == ErrorKind::MissingSubcommand => {
// `$ ./risingwave`
// NOTE(kwannoel): This is a hack to make `risingwave`
// work as an alias of `risingwave single-process`.
// If invocation is not a multicall and there's no subcommand,
// we will try to invoke it as a single node.
let command = Component::SingleNode.augment_args(risingwave());
let matches = command.get_matches();
Component::SingleNode.start(&matches);
return Ok(());
}
Err(e) => {
e.exit();
}
};

let multicall = matches.subcommand().unwrap();
let argv_1 = multicall.1.subcommand();
Expand Down Expand Up @@ -207,3 +234,15 @@ fn standalone(opts: StandaloneOpts) {
risingwave_rt::init_risingwave_logger(settings);
risingwave_rt::main_okk(risingwave_cmd_all::standalone(opts)).unwrap();
}

/// For single node, the internals are just a config mapping from its
/// high level options to standalone mode node-level options.
/// We will start a standalone instance, with all nodes in the same process.
fn single_node(opts: SingleNodeOpts) {
let opts = risingwave_cmd_all::map_single_node_opts_to_standalone_opts(&opts);
let settings = risingwave_rt::LoggerSettings::from_opts(&opts)
.with_target("risingwave_storage", Level::WARN)
.with_thread_name(true);
risingwave_rt::init_risingwave_logger(settings);
risingwave_rt::main_okk(risingwave_cmd_all::standalone(opts)).unwrap();
}
3 changes: 3 additions & 0 deletions src/cmd_all/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ mod common;
pub mod playground;
mod standalone;

pub mod single_node;

pub use playground::*;
pub use single_node::*;
pub use standalone::*;

risingwave_expr_impl::enable!();
231 changes: 231 additions & 0 deletions src/cmd_all/src/single_node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;

use clap::Parser;
use home::home_dir;
use risingwave_common::config::{AsyncStackTraceOption, MetaBackend};
use risingwave_compactor::CompactorOpts;
use risingwave_compute::{default_parallelism, default_total_memory_bytes, ComputeNodeOpts};
use risingwave_frontend::FrontendOpts;
use risingwave_meta_node::MetaNodeOpts;

use crate::ParsedStandaloneOpts;

pub static DEFAULT_STORE_DIRECTORY: LazyLock<String> = LazyLock::new(|| {
let mut home_path = home_dir().unwrap();
home_path.push(".risingwave");
let home_path = home_path.to_str().unwrap();
home_path.to_string()
});

pub static DEFAULT_SINGLE_NODE_SQLITE_PATH: LazyLock<String> =
LazyLock::new(|| format!("{}/meta_store/single_node.db", &*DEFAULT_STORE_DIRECTORY));

pub static DEFAULT_SINGLE_NODE_SQL_ENDPOINT: LazyLock<String> =
LazyLock::new(|| format!("sqlite://{}?mode=rwc", *DEFAULT_SINGLE_NODE_SQLITE_PATH));

pub static DEFAULT_SINGLE_NODE_STATE_STORE_PATH: LazyLock<String> =
LazyLock::new(|| format!("{}/state_store", DEFAULT_STORE_DIRECTORY.clone()));

pub static DEFAULT_SINGLE_NODE_STATE_STORE_URL: LazyLock<String> = LazyLock::new(|| {
format!(
"hummock+fs://{}",
DEFAULT_SINGLE_NODE_STATE_STORE_PATH.clone()
)
});

#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
#[command(
version,
about = "[default] The Single Node mode. Start all services in one process, with process-level options. This will be executed if no subcommand is specified"
)]
/// Here we define our own defaults for the single node mode.
pub struct SingleNodeOpts {
/// The prometheus address used by the single-node cluster.
/// If you have a prometheus instance,
/// it will poll the metrics from this address.
#[clap(long, env = "RW_SINGLE_NODE_PROMETHEUS_LISTENER_ADDR")]
prometheus_listener_addr: Option<String>,

/// The path to the cluster configuration file.
#[clap(long, env = "RW_SINGLE_NODE_CONFIG_PATH")]
config_path: Option<String>,

/// The store directory used by meta store and object store.
#[clap(long, env = "RW_SINGLE_NODE_STORE_DIRECTORY")]
store_directory: Option<String>,

/// The address of the meta node.
#[clap(long, env = "RW_SINGLE_NODE_META_ADDR")]
meta_addr: Option<String>,

/// The address of the compute node
#[clap(long, env = "RW_SINGLE_NODE_COMPUTE_ADDR")]
compute_addr: Option<String>,

/// The address of the frontend node
#[clap(long, env = "RW_SINGLE_NODE_FRONTEND_ADDR")]
frontend_addr: Option<String>,

/// The address of the compactor node
#[clap(long, env = "RW_SINGLE_NODE_COMPACTOR_ADDR")]
compactor_addr: Option<String>,
}

pub fn make_single_node_sql_endpoint(store_directory: &String) -> String {
format!(
"sqlite://{}/meta_store/single_node.db?mode=rwc",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"sqlite://{}/meta_store/single_node.db?mode=rwc",
"sqlite://{}/meta_store.db?mode=rwc",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm why this change? IMO this structure looks more consistent:

noelkwan@Noels-MacBook-Pro risingwave % tree ~/.risingwave/ 
/Users/noelkwan/.risingwave/
├── meta_store
│   └── single_node.db
└── state_store
    └── hummock_001
        ├── checkpoint
        │   └── 0
        └── cluster_id
            └── 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Than this:

noelkwan@Noels-MacBook-Pro risingwave % tree ~/.risingwave/ 
/Users/noelkwan/.risingwave/
├── single_node.db
└── state_store
    └── hummock_001
        ├── checkpoint
        │   └── 0
        └── cluster_id
            └── 0

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Eric's idea is:

/Users/noelkwan/.risingwave/
├── meta_store.db
└── state_store
    └── hummock_001
        ├── checkpoint
        │   └── 0
        └── cluster_id
            └── 0

store_directory
)
}

pub fn make_single_node_state_store_url(store_directory: &String) -> String {
format!("hummock+fs://{}/state_store", store_directory)
}

pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedStandaloneOpts {
let mut meta_opts = SingleNodeOpts::default_meta_opts();
let mut compute_opts = SingleNodeOpts::default_compute_opts();
let mut frontend_opts = SingleNodeOpts::default_frontend_opts();
let mut compactor_opts = SingleNodeOpts::default_compactor_opts();
if let Some(prometheus_listener_addr) = &opts.prometheus_listener_addr {
meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone());
compute_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
frontend_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
compactor_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
}
if let Some(config_path) = &opts.config_path {
meta_opts.config_path = config_path.clone();
compute_opts.config_path = config_path.clone();
frontend_opts.config_path = config_path.clone();
compactor_opts.config_path = config_path.clone();
}
if let Some(store_directory) = &opts.store_directory {
let state_store_url = make_single_node_state_store_url(store_directory);
let meta_store_endpoint = make_single_node_sql_endpoint(store_directory);
meta_opts.state_store = Some(state_store_url);
meta_opts.sql_endpoint = Some(meta_store_endpoint);
}
if let Some(meta_addr) = &opts.meta_addr {
meta_opts.listen_addr = meta_addr.clone();
meta_opts.advertise_addr = meta_addr.clone();

compute_opts.meta_address = meta_addr.parse().unwrap();
frontend_opts.meta_addr = meta_addr.parse().unwrap();
compactor_opts.meta_address = meta_addr.parse().unwrap();
}
if let Some(compute_addr) = &opts.compute_addr {
compute_opts.listen_addr = compute_addr.clone();
}
if let Some(frontend_addr) = &opts.frontend_addr {
frontend_opts.listen_addr = frontend_addr.clone();
}
if let Some(compactor_addr) = &opts.compactor_addr {
compactor_opts.listen_addr = compactor_addr.clone();
}
ParsedStandaloneOpts {
meta_opts: Some(meta_opts),
compute_opts: Some(compute_opts),
frontend_opts: Some(frontend_opts),
compactor_opts: Some(compactor_opts),
}
}

impl SingleNodeOpts {
fn default_frontend_opts() -> FrontendOpts {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO these methods could be painful to maintain. I'd suggest specifying the mapped fields only and leave other fields default with ..Default::default().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm not sure how to expose clap's defaults. e.g. the default values specified by default_value.

I think I tried ..Default::default() for FrontendOpts previously, but it gives an error that the trait is not implemented for FrontendOpts.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. Thought it's serde.

FrontendOpts {
listen_addr: "0.0.0.0:4566".to_string(),
advertise_addr: Some("0.0.0.0:4566".to_string()),
port: None,
meta_addr: "http://0.0.0.0:5690".parse().unwrap(),
prometheus_listener_addr: "0.0.0.0:1250".to_string(),
health_check_listener_addr: "0.0.0.0:6786".to_string(),
config_path: "".to_string(),
metrics_level: None,
enable_barrier_read: None,
}
}

fn default_meta_opts() -> MetaNodeOpts {
MetaNodeOpts {
vpc_id: None,
security_group_id: None,
listen_addr: "0.0.0.0:5690".to_string(),
advertise_addr: "0.0.0.0:5690".to_string(),
dashboard_host: Some("0.0.0.0:5691".to_string()),
prometheus_listener_addr: Some("0.0.0.0:1250".to_string()),
etcd_endpoints: Default::default(),
etcd_auth: false,
etcd_username: Default::default(),
etcd_password: Default::default(),
sql_endpoint: Some(DEFAULT_SINGLE_NODE_SQL_ENDPOINT.clone()),
dashboard_ui_path: None,
prometheus_endpoint: None,
prometheus_selector: None,
connector_rpc_endpoint: None,
privatelink_endpoint_default_tags: None,
config_path: "".to_string(),
backend: Some(MetaBackend::Sql),
barrier_interval_ms: None,
sstable_size_mb: None,
block_size_kb: None,
bloom_false_positive: None,
state_store: Some(DEFAULT_SINGLE_NODE_STATE_STORE_URL.clone()),
data_directory: Some("hummock_001".to_string()),
do_not_config_object_storage_lifecycle: None,
backup_storage_url: None,
backup_storage_directory: None,
heap_profiling_dir: None,
}
}

pub fn default_compute_opts() -> ComputeNodeOpts {
ComputeNodeOpts {
listen_addr: "0.0.0.0:5688".to_string(),
advertise_addr: Some("0.0.0.0:5688".to_string()),
prometheus_listener_addr: "0.0.0.0:1250".to_string(),
meta_address: "http://0.0.0.0:5690".parse().unwrap(),
connector_rpc_endpoint: None,
connector_rpc_sink_payload_format: None,
config_path: "".to_string(),
total_memory_bytes: default_total_memory_bytes(),
parallelism: default_parallelism(),
role: Default::default(),
metrics_level: None,
data_file_cache_dir: None,
meta_file_cache_dir: None,
async_stack_trace: Some(AsyncStackTraceOption::ReleaseVerbose),
heap_profiling_dir: None,
}
}

fn default_compactor_opts() -> CompactorOpts {
CompactorOpts {
listen_addr: "0.0.0.0:6660".to_string(),
advertise_addr: Some("0.0.0.0:6660".to_string()),
port: None,
prometheus_listener_addr: "0.0.0.0:1250".to_string(),
meta_address: "http://0.0.0.0:5690".parse().unwrap(),
compaction_worker_threads_number: None,
config_path: "".to_string(),
metrics_level: None,
async_stack_trace: None,
heap_profiling_dir: None,
compactor_mode: None,
proxy_rpc_endpoint: "".to_string(),
}
}
}
7 changes: 7 additions & 0 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ use tokio::signal;
use crate::common::osstrs;

#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
#[command(
version,
about = "The Standalone mode allows users to start multiple services in one process, it exposes node-level options for each service"
)]
pub struct StandaloneOpts {
/// Compute node options
/// If missing, compute node won't start
Expand Down Expand Up @@ -161,6 +165,9 @@ pub fn parse_standalone_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts
}
}

/// For `standalone` mode, we can configure and start multiple services in one process.
/// `standalone` mode is meant to be used by our cloud service and docker,
/// where we can configure and start multiple services in one process.
pub async fn standalone(
ParsedStandaloneOpts {
meta_opts,
Expand Down
1 change: 0 additions & 1 deletion src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ pub mod test_utils;
pub mod transaction;
pub mod types;
pub mod vnode_mapping;

pub mod test_prelude {
pub use super::array::{DataChunkTestExt, StreamChunkTestExt};
pub use super::catalog::test_utils::ColumnDescTestExt;
Expand Down
6 changes: 3 additions & 3 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,14 +225,14 @@ pub fn start(opts: ComputeNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>>
})
}

fn default_total_memory_bytes() -> usize {
pub fn default_total_memory_bytes() -> usize {
(system_memory_available_bytes() as f64 * DEFAULT_MEMORY_PROPORTION) as usize
}

fn default_parallelism() -> usize {
pub fn default_parallelism() -> usize {
total_cpu_available().ceil() as usize
}

fn default_role() -> Role {
pub fn default_role() -> Role {
Role::Both
}
Loading
Loading