Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cmd_all): provide common options config-path and prometheus-listen-addr #12867

Merged
merged 5 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/cmd_all/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub enum RisingWaveService {
Compute(Vec<OsString>),
Meta(Vec<OsString>),
Frontend(Vec<OsString>),
#[allow(dead_code)]
Compactor(Vec<OsString>),
ConnectorNode(Vec<OsString>),
}
Expand Down
277 changes: 184 additions & 93 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,112 @@

use anyhow::Result;
use clap::Parser;
use risingwave_compactor::CompactorOpts;
use risingwave_compute::ComputeNodeOpts;
use risingwave_frontend::FrontendOpts;
use risingwave_meta::MetaNodeOpts;
use shell_words::split;
use tokio::signal;

use crate::common::{osstrs, RisingWaveService};
use crate::common::osstrs;

#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
pub struct StandaloneOpts {
/// Compute node options
#[clap(short, long, env = "STANDALONE_COMPUTE_OPTS")]
/// If missing, compute node won't start
#[clap(short, long, env = "RW_STANDALONE_COMPUTE_OPTS")]
compute_opts: Option<String>,

#[clap(short, long, env = "STANDALONE_META_OPTS")]
#[clap(short, long, env = "RW_STANDALONE_META_OPTS")]
/// Meta node options
/// If missing, meta node won't start
meta_opts: Option<String>,

#[clap(short, long, env = "STANDALONE_FRONTEND_OPTS")]
#[clap(short, long, env = "RW_STANDALONE_FRONTEND_OPTS")]
/// Frontend node options
/// If missing, frontend node won't start
frontend_opts: Option<String>,

#[clap(long, env = "STANDALONE_COMPACTOR_OPTS")]
/// Frontend node options
#[clap(long, env = "RW_STANDALONE_COMPACTOR_OPTS")]
/// Compactor node options
/// If missing compactor node won't start
compactor_opts: Option<String>,

#[clap(long, env = "RW_STANDALONE_PROMETHEUS_LISTENER_ADDR")]
/// Prometheus listener address
/// If present, it will override prometheus listener address for
/// Frontend, Compute and Compactor nodes
prometheus_listener_addr: Option<String>,

#[clap(long, env = "RW_STANDALONE_CONFIG_PATH")]
/// Path to the config file
/// If present, it will override config path for
/// Frontend, Compute and Compactor nodes
config_path: Option<String>,
}

#[derive(Debug)]
pub struct ParsedStandaloneOpts {
pub meta_opts: Option<Vec<String>>,
pub compute_opts: Option<Vec<String>>,
pub frontend_opts: Option<Vec<String>>,
pub compactor_opts: Option<Vec<String>>,
pub meta_opts: Option<MetaNodeOpts>,
pub compute_opts: Option<ComputeNodeOpts>,
pub frontend_opts: Option<FrontendOpts>,
pub compactor_opts: Option<CompactorOpts>,
}

fn parse_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts {
let meta_opts = opts.meta_opts.as_ref().map(|s| split(s).unwrap());
let compute_opts = opts.compute_opts.as_ref().map(|s| split(s).unwrap());
let frontend_opts = opts.frontend_opts.as_ref().map(|s| split(s).unwrap());
let compactor_opts = opts.compactor_opts.as_ref().map(|s| split(s).unwrap());
let meta_opts = opts.meta_opts.as_ref().map(|s| {
let mut s = split(s).unwrap();
s.insert(0, "meta-node".into());
s
});
let mut meta_opts = meta_opts.map(|o| MetaNodeOpts::parse_from(osstrs(o)));

let compute_opts = opts.compute_opts.as_ref().map(|s| {
let mut s = split(s).unwrap();
s.insert(0, "compute-node".into());
s
});
let mut compute_opts = compute_opts.map(|o| ComputeNodeOpts::parse_from(osstrs(o)));

let frontend_opts = opts.frontend_opts.as_ref().map(|s| {
let mut s = split(s).unwrap();
s.insert(0, "frontend-node".into());
s
});
let mut frontend_opts = frontend_opts.map(|o| FrontendOpts::parse_from(osstrs(o)));

let compactor_opts = opts.compactor_opts.as_ref().map(|s| {
let mut s = split(s).unwrap();
s.insert(0, "compactor-node".into());
s
});
let mut compactor_opts = compactor_opts.map(|o| CompactorOpts::parse_from(osstrs(o)));

if let Some(config_path) = opts.config_path.as_ref() {
if let Some(meta_opts) = meta_opts.as_mut() {
meta_opts.config_path = config_path.clone();
}
if let Some(compute_opts) = compute_opts.as_mut() {
compute_opts.config_path = config_path.clone();
}
if let Some(frontend_opts) = frontend_opts.as_mut() {
frontend_opts.config_path = config_path.clone();
}
if let Some(compactor_opts) = compactor_opts.as_mut() {
compactor_opts.config_path = config_path.clone();
}
}
if let Some(prometheus_listener_addr) = opts.prometheus_listener_addr.as_ref() {
if let Some(compute_opts) = compute_opts.as_mut() {
compute_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
}
if let Some(frontend_opts) = frontend_opts.as_mut() {
frontend_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
}
if let Some(compactor_opts) = compactor_opts.as_mut() {
compactor_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
}
}
ParsedStandaloneOpts {
meta_opts,
compute_opts,
Expand All @@ -59,72 +128,38 @@ fn parse_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts {
}
}

fn get_services(opts: &StandaloneOpts) -> Vec<RisingWaveService> {
pub async fn standalone(opts: StandaloneOpts) -> Result<()> {
tracing::info!("launching Risingwave in standalone mode");

let ParsedStandaloneOpts {
meta_opts,
compute_opts,
frontend_opts,
compactor_opts,
} = parse_opt_args(opts);
let mut services = vec![];
if let Some(meta_opts) = meta_opts {
services.push(RisingWaveService::Meta(osstrs(meta_opts)));
}
if let Some(compute_opts) = compute_opts {
services.push(RisingWaveService::Compute(osstrs(compute_opts)));
} = parse_opt_args(&opts);

if let Some(opts) = meta_opts {
tracing::info!("starting meta-node thread with cli args: {:?}", opts);

let _meta_handle = tokio::spawn(async move {
risingwave_meta::start(opts).await;
tracing::warn!("meta is stopped, shutdown all nodes");
});
// wait for the service to be ready
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
if let Some(frontend_opts) = frontend_opts {
services.push(RisingWaveService::Frontend(osstrs(frontend_opts)));
if let Some(opts) = compute_opts {
tracing::info!("starting compute-node thread with cli args: {:?}", opts);
let _compute_handle = tokio::spawn(async move { risingwave_compute::start(opts).await });
}
if let Some(compactor_opts) = compactor_opts {
services.push(RisingWaveService::Compactor(osstrs(compactor_opts)));
if let Some(opts) = frontend_opts {
tracing::info!("starting frontend-node thread with cli args: {:?}", opts);
let _frontend_handle = tokio::spawn(async move { risingwave_frontend::start(opts).await });
}
services
}

pub async fn standalone(opts: StandaloneOpts) -> Result<()> {
tracing::info!("launching Risingwave in standalone mode");

let services = get_services(&opts);

for service in services {
match service {
RisingWaveService::Meta(mut opts) => {
opts.insert(0, "meta-node".into());
tracing::info!("starting meta-node thread with cli args: {:?}", opts);
let opts = risingwave_meta::MetaNodeOpts::parse_from(opts);
let _meta_handle = tokio::spawn(async move {
risingwave_meta::start(opts).await;
tracing::warn!("meta is stopped, shutdown all nodes");
});
// wait for the service to be ready
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
RisingWaveService::Compute(mut opts) => {
opts.insert(0, "compute-node".into());
tracing::info!("starting compute-node thread with cli args: {:?}", opts);
let opts = risingwave_compute::ComputeNodeOpts::parse_from(opts);
let _compute_handle =
tokio::spawn(async move { risingwave_compute::start(opts).await });
}
RisingWaveService::Frontend(mut opts) => {
opts.insert(0, "frontend-node".into());
tracing::info!("starting frontend-node thread with cli args: {:?}", opts);
let opts = risingwave_frontend::FrontendOpts::parse_from(opts);
let _frontend_handle =
tokio::spawn(async move { risingwave_frontend::start(opts).await });
}
RisingWaveService::Compactor(mut opts) => {
opts.insert(0, "compactor-node".into());
tracing::info!("starting compactor-node thread with cli args: {:?}", opts);
let opts = risingwave_compactor::CompactorOpts::parse_from(opts);
let _compactor_handle =
tokio::spawn(async move { risingwave_compactor::start(opts).await });
}
RisingWaveService::ConnectorNode(_) => {
panic!("Connector node unsupported in Risingwave standalone mode.");
}
}
if let Some(opts) = compactor_opts {
tracing::info!("starting compactor-node thread with cli args: {:?}", opts);
let _compactor_handle =
tokio::spawn(async move { risingwave_compactor::start(opts).await });
}

// wait for log messages to be flushed
Expand Down Expand Up @@ -159,16 +194,20 @@ mod test {
fn test_parse_opt_args() {
// Test parsing into standalone-level opts.
let raw_opts = "
--compute-opts=--listen-address 127.0.0.1 --port 8000
--meta-opts=--data-dir \"some path with spaces\" --port 8001
--frontend-opts=--some-option
--compute-opts=--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10
--meta-opts=--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001
--frontend-opts=--config-path=src/config/original.toml
--prometheus-listener-addr=127.0.0.1:1234
--config-path=src/config/test.toml
";
let actual = StandaloneOpts::parse_from(raw_opts.lines());
let opts = StandaloneOpts {
compute_opts: Some("--listen-address 127.0.0.1 --port 8000".into()),
meta_opts: Some("--data-dir \"some path with spaces\" --port 8001".into()),
frontend_opts: Some("--some-option".into()),
compute_opts: Some("--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10".into()),
meta_opts: Some("--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001".into()),
frontend_opts: Some("--config-path=src/config/original.toml".into()),
compactor_opts: None,
prometheus_listener_addr: Some("127.0.0.1:1234".into()),
config_path: Some("src/config/test.toml".into()),
};
assert_eq!(actual, opts);

Expand All @@ -179,25 +218,77 @@ mod test {
expect![[r#"
ParsedStandaloneOpts {
meta_opts: Some(
[
"--data-dir",
"some path with spaces",
"--port",
"8001",
],
MetaNodeOpts {
vpc_id: None,
security_group_id: None,
listen_addr: "127.0.0.1:8001",
advertise_addr: "127.0.0.1:9999",
dashboard_host: None,
prometheus_host: None,
etcd_endpoints: "",
etcd_auth: false,
etcd_username: "",
etcd_password: "",
sql_endpoint: None,
dashboard_ui_path: None,
prometheus_endpoint: None,
connector_rpc_endpoint: None,
privatelink_endpoint_default_tags: None,
config_path: "src/config/test.toml",
backend: None,
barrier_interval_ms: None,
sstable_size_mb: None,
block_size_kb: None,
bloom_false_positive: None,
state_store: None,
data_directory: Some(
"some path with spaces",
),
do_not_config_object_storage_lifecycle: None,
backup_storage_url: None,
backup_storage_directory: None,
object_store_streaming_read_timeout_ms: None,
object_store_streaming_upload_timeout_ms: None,
object_store_upload_timeout_ms: None,
object_store_read_timeout_ms: None,
heap_profiling_dir: None,
},
),
compute_opts: Some(
[
"--listen-address",
"127.0.0.1",
"--port",
"8000",
],
ComputeNodeOpts {
listen_addr: "127.0.0.1:8000",
advertise_addr: None,
prometheus_listener_addr: "127.0.0.1:1234",
meta_address: "http://127.0.0.1:5690",
connector_rpc_endpoint: None,
connector_rpc_sink_payload_format: None,
config_path: "src/config/test.toml",
total_memory_bytes: 34359738368,
parallelism: 10,
role: Both,
metrics_level: None,
data_file_cache_dir: None,
meta_file_cache_dir: None,
async_stack_trace: None,
heap_profiling_dir: None,
object_store_streaming_read_timeout_ms: None,
object_store_streaming_upload_timeout_ms: None,
object_store_upload_timeout_ms: None,
object_store_read_timeout_ms: None,
},
),
frontend_opts: Some(
[
"--some-option",
],
FrontendOpts {
listen_addr: "127.0.0.1:4566",
advertise_addr: None,
port: None,
meta_addr: "http://127.0.0.1:5690",
prometheus_listener_addr: "127.0.0.1:1234",
health_check_listener_addr: "127.0.0.1:6786",
config_path: "src/config/test.toml",
metrics_level: None,
enable_barrier_read: None,
},
),
compactor_opts: None,
}"#]],
Expand Down
Loading