Skip to content

Commit

Permalink
feat(cmd_all): provide common options config-path and `prometheus-l…
Browse files Browse the repository at this point in the history
…isten-addr` (#12867)
  • Loading branch information
kwannoel authored Oct 17, 2023
1 parent b39aef3 commit d2ec83c
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 93 deletions.
1 change: 1 addition & 0 deletions src/cmd_all/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub enum RisingWaveService {
Compute(Vec<OsString>),
Meta(Vec<OsString>),
Frontend(Vec<OsString>),
#[allow(dead_code)]
Compactor(Vec<OsString>),
ConnectorNode(Vec<OsString>),
}
Expand Down
277 changes: 184 additions & 93 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,112 @@

use anyhow::Result;
use clap::Parser;
use risingwave_compactor::CompactorOpts;
use risingwave_compute::ComputeNodeOpts;
use risingwave_frontend::FrontendOpts;
use risingwave_meta::MetaNodeOpts;
use shell_words::split;
use tokio::signal;

use crate::common::{osstrs, RisingWaveService};
use crate::common::osstrs;

#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
pub struct StandaloneOpts {
/// Compute node options
#[clap(short, long, env = "STANDALONE_COMPUTE_OPTS")]
/// If missing, compute node won't start
#[clap(short, long, env = "RW_STANDALONE_COMPUTE_OPTS")]
compute_opts: Option<String>,

#[clap(short, long, env = "STANDALONE_META_OPTS")]
#[clap(short, long, env = "RW_STANDALONE_META_OPTS")]
/// Meta node options
/// If missing, meta node won't start
meta_opts: Option<String>,

#[clap(short, long, env = "STANDALONE_FRONTEND_OPTS")]
#[clap(short, long, env = "RW_STANDALONE_FRONTEND_OPTS")]
/// Frontend node options
/// If missing, frontend node won't start
frontend_opts: Option<String>,

#[clap(long, env = "STANDALONE_COMPACTOR_OPTS")]
/// Frontend node options
#[clap(long, env = "RW_STANDALONE_COMPACTOR_OPTS")]
/// Compactor node options
/// If missing compactor node won't start
compactor_opts: Option<String>,

#[clap(long, env = "RW_STANDALONE_PROMETHEUS_LISTENER_ADDR")]
/// Prometheus listener address
/// If present, it will override prometheus listener address for
/// Frontend, Compute and Compactor nodes
prometheus_listener_addr: Option<String>,

#[clap(long, env = "RW_STANDALONE_CONFIG_PATH")]
/// Path to the config file
/// If present, it will override config path for
/// Frontend, Compute and Compactor nodes
config_path: Option<String>,
}

#[derive(Debug)]
pub struct ParsedStandaloneOpts {
pub meta_opts: Option<Vec<String>>,
pub compute_opts: Option<Vec<String>>,
pub frontend_opts: Option<Vec<String>>,
pub compactor_opts: Option<Vec<String>>,
pub meta_opts: Option<MetaNodeOpts>,
pub compute_opts: Option<ComputeNodeOpts>,
pub frontend_opts: Option<FrontendOpts>,
pub compactor_opts: Option<CompactorOpts>,
}

fn parse_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts {
let meta_opts = opts.meta_opts.as_ref().map(|s| split(s).unwrap());
let compute_opts = opts.compute_opts.as_ref().map(|s| split(s).unwrap());
let frontend_opts = opts.frontend_opts.as_ref().map(|s| split(s).unwrap());
let compactor_opts = opts.compactor_opts.as_ref().map(|s| split(s).unwrap());
let meta_opts = opts.meta_opts.as_ref().map(|s| {
let mut s = split(s).unwrap();
s.insert(0, "meta-node".into());
s
});
let mut meta_opts = meta_opts.map(|o| MetaNodeOpts::parse_from(osstrs(o)));

let compute_opts = opts.compute_opts.as_ref().map(|s| {
let mut s = split(s).unwrap();
s.insert(0, "compute-node".into());
s
});
let mut compute_opts = compute_opts.map(|o| ComputeNodeOpts::parse_from(osstrs(o)));

let frontend_opts = opts.frontend_opts.as_ref().map(|s| {
let mut s = split(s).unwrap();
s.insert(0, "frontend-node".into());
s
});
let mut frontend_opts = frontend_opts.map(|o| FrontendOpts::parse_from(osstrs(o)));

let compactor_opts = opts.compactor_opts.as_ref().map(|s| {
let mut s = split(s).unwrap();
s.insert(0, "compactor-node".into());
s
});
let mut compactor_opts = compactor_opts.map(|o| CompactorOpts::parse_from(osstrs(o)));

if let Some(config_path) = opts.config_path.as_ref() {
if let Some(meta_opts) = meta_opts.as_mut() {
meta_opts.config_path = config_path.clone();
}
if let Some(compute_opts) = compute_opts.as_mut() {
compute_opts.config_path = config_path.clone();
}
if let Some(frontend_opts) = frontend_opts.as_mut() {
frontend_opts.config_path = config_path.clone();
}
if let Some(compactor_opts) = compactor_opts.as_mut() {
compactor_opts.config_path = config_path.clone();
}
}
if let Some(prometheus_listener_addr) = opts.prometheus_listener_addr.as_ref() {
if let Some(compute_opts) = compute_opts.as_mut() {
compute_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
}
if let Some(frontend_opts) = frontend_opts.as_mut() {
frontend_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
}
if let Some(compactor_opts) = compactor_opts.as_mut() {
compactor_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
}
}
ParsedStandaloneOpts {
meta_opts,
compute_opts,
Expand All @@ -59,72 +128,38 @@ fn parse_opt_args(opts: &StandaloneOpts) -> ParsedStandaloneOpts {
}
}

fn get_services(opts: &StandaloneOpts) -> Vec<RisingWaveService> {
pub async fn standalone(opts: StandaloneOpts) -> Result<()> {
tracing::info!("launching Risingwave in standalone mode");

let ParsedStandaloneOpts {
meta_opts,
compute_opts,
frontend_opts,
compactor_opts,
} = parse_opt_args(opts);
let mut services = vec![];
if let Some(meta_opts) = meta_opts {
services.push(RisingWaveService::Meta(osstrs(meta_opts)));
}
if let Some(compute_opts) = compute_opts {
services.push(RisingWaveService::Compute(osstrs(compute_opts)));
} = parse_opt_args(&opts);

if let Some(opts) = meta_opts {
tracing::info!("starting meta-node thread with cli args: {:?}", opts);

let _meta_handle = tokio::spawn(async move {
risingwave_meta::start(opts).await;
tracing::warn!("meta is stopped, shutdown all nodes");
});
// wait for the service to be ready
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
if let Some(frontend_opts) = frontend_opts {
services.push(RisingWaveService::Frontend(osstrs(frontend_opts)));
if let Some(opts) = compute_opts {
tracing::info!("starting compute-node thread with cli args: {:?}", opts);
let _compute_handle = tokio::spawn(async move { risingwave_compute::start(opts).await });
}
if let Some(compactor_opts) = compactor_opts {
services.push(RisingWaveService::Compactor(osstrs(compactor_opts)));
if let Some(opts) = frontend_opts {
tracing::info!("starting frontend-node thread with cli args: {:?}", opts);
let _frontend_handle = tokio::spawn(async move { risingwave_frontend::start(opts).await });
}
services
}

pub async fn standalone(opts: StandaloneOpts) -> Result<()> {
tracing::info!("launching Risingwave in standalone mode");

let services = get_services(&opts);

for service in services {
match service {
RisingWaveService::Meta(mut opts) => {
opts.insert(0, "meta-node".into());
tracing::info!("starting meta-node thread with cli args: {:?}", opts);
let opts = risingwave_meta::MetaNodeOpts::parse_from(opts);
let _meta_handle = tokio::spawn(async move {
risingwave_meta::start(opts).await;
tracing::warn!("meta is stopped, shutdown all nodes");
});
// wait for the service to be ready
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
RisingWaveService::Compute(mut opts) => {
opts.insert(0, "compute-node".into());
tracing::info!("starting compute-node thread with cli args: {:?}", opts);
let opts = risingwave_compute::ComputeNodeOpts::parse_from(opts);
let _compute_handle =
tokio::spawn(async move { risingwave_compute::start(opts).await });
}
RisingWaveService::Frontend(mut opts) => {
opts.insert(0, "frontend-node".into());
tracing::info!("starting frontend-node thread with cli args: {:?}", opts);
let opts = risingwave_frontend::FrontendOpts::parse_from(opts);
let _frontend_handle =
tokio::spawn(async move { risingwave_frontend::start(opts).await });
}
RisingWaveService::Compactor(mut opts) => {
opts.insert(0, "compactor-node".into());
tracing::info!("starting compactor-node thread with cli args: {:?}", opts);
let opts = risingwave_compactor::CompactorOpts::parse_from(opts);
let _compactor_handle =
tokio::spawn(async move { risingwave_compactor::start(opts).await });
}
RisingWaveService::ConnectorNode(_) => {
panic!("Connector node unsupported in Risingwave standalone mode.");
}
}
if let Some(opts) = compactor_opts {
tracing::info!("starting compactor-node thread with cli args: {:?}", opts);
let _compactor_handle =
tokio::spawn(async move { risingwave_compactor::start(opts).await });
}

// wait for log messages to be flushed
Expand Down Expand Up @@ -159,16 +194,20 @@ mod test {
fn test_parse_opt_args() {
// Test parsing into standalone-level opts.
let raw_opts = "
--compute-opts=--listen-address 127.0.0.1 --port 8000
--meta-opts=--data-dir \"some path with spaces\" --port 8001
--frontend-opts=--some-option
--compute-opts=--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10
--meta-opts=--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001
--frontend-opts=--config-path=src/config/original.toml
--prometheus-listener-addr=127.0.0.1:1234
--config-path=src/config/test.toml
";
let actual = StandaloneOpts::parse_from(raw_opts.lines());
let opts = StandaloneOpts {
compute_opts: Some("--listen-address 127.0.0.1 --port 8000".into()),
meta_opts: Some("--data-dir \"some path with spaces\" --port 8001".into()),
frontend_opts: Some("--some-option".into()),
compute_opts: Some("--listen-addr 127.0.0.1:8000 --total-memory-bytes 34359738368 --parallelism 10".into()),
meta_opts: Some("--advertise-addr 127.0.0.1:9999 --data-directory \"some path with spaces\" --listen-addr 127.0.0.1:8001".into()),
frontend_opts: Some("--config-path=src/config/original.toml".into()),
compactor_opts: None,
prometheus_listener_addr: Some("127.0.0.1:1234".into()),
config_path: Some("src/config/test.toml".into()),
};
assert_eq!(actual, opts);

Expand All @@ -179,25 +218,77 @@ mod test {
expect![[r#"
ParsedStandaloneOpts {
meta_opts: Some(
[
"--data-dir",
"some path with spaces",
"--port",
"8001",
],
MetaNodeOpts {
vpc_id: None,
security_group_id: None,
listen_addr: "127.0.0.1:8001",
advertise_addr: "127.0.0.1:9999",
dashboard_host: None,
prometheus_host: None,
etcd_endpoints: "",
etcd_auth: false,
etcd_username: "",
etcd_password: "",
sql_endpoint: None,
dashboard_ui_path: None,
prometheus_endpoint: None,
connector_rpc_endpoint: None,
privatelink_endpoint_default_tags: None,
config_path: "src/config/test.toml",
backend: None,
barrier_interval_ms: None,
sstable_size_mb: None,
block_size_kb: None,
bloom_false_positive: None,
state_store: None,
data_directory: Some(
"some path with spaces",
),
do_not_config_object_storage_lifecycle: None,
backup_storage_url: None,
backup_storage_directory: None,
object_store_streaming_read_timeout_ms: None,
object_store_streaming_upload_timeout_ms: None,
object_store_upload_timeout_ms: None,
object_store_read_timeout_ms: None,
heap_profiling_dir: None,
},
),
compute_opts: Some(
[
"--listen-address",
"127.0.0.1",
"--port",
"8000",
],
ComputeNodeOpts {
listen_addr: "127.0.0.1:8000",
advertise_addr: None,
prometheus_listener_addr: "127.0.0.1:1234",
meta_address: "http://127.0.0.1:5690",
connector_rpc_endpoint: None,
connector_rpc_sink_payload_format: None,
config_path: "src/config/test.toml",
total_memory_bytes: 34359738368,
parallelism: 10,
role: Both,
metrics_level: None,
data_file_cache_dir: None,
meta_file_cache_dir: None,
async_stack_trace: None,
heap_profiling_dir: None,
object_store_streaming_read_timeout_ms: None,
object_store_streaming_upload_timeout_ms: None,
object_store_upload_timeout_ms: None,
object_store_read_timeout_ms: None,
},
),
frontend_opts: Some(
[
"--some-option",
],
FrontendOpts {
listen_addr: "127.0.0.1:4566",
advertise_addr: None,
port: None,
meta_addr: "http://127.0.0.1:5690",
prometheus_listener_addr: "127.0.0.1:1234",
health_check_listener_addr: "127.0.0.1:6786",
config_path: "src/config/test.toml",
metrics_level: None,
enable_barrier_read: None,
},
),
compactor_opts: None,
}"#]],
Expand Down

0 comments on commit d2ec83c

Please sign in to comment.