Skip to content

Commit

Permalink
feat: flatten single-node args (#15611)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored Mar 15, 2024
1 parent e73fc89 commit 9ec0a48
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 212 deletions.
3 changes: 1 addition & 2 deletions src/cmd_all/src/bin/risingwave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ fn standalone(opts: StandaloneOpts) {
/// high level options to standalone mode node-level options.
/// We will start a standalone instance, with all nodes in the same process.
fn single_node(opts: SingleNodeOpts) {
opts.create_store_directories().unwrap();
let opts = risingwave_cmd_all::map_single_node_opts_to_standalone_opts(&opts);
let opts = risingwave_cmd_all::map_single_node_opts_to_standalone_opts(opts);
let settings = risingwave_rt::LoggerSettings::from_opts(&opts)
.with_target("risingwave_storage", Level::WARN)
.with_thread_name(true);
Expand Down
297 changes: 126 additions & 171 deletions src/cmd_all/src/single_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,42 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;

use anyhow::Result;
use clap::Parser;
use home::home_dir;
use risingwave_common::config::{AsyncStackTraceOption, MetaBackend};
use risingwave_common::config::MetaBackend;
use risingwave_compactor::CompactorOpts;
use risingwave_compute::{default_parallelism, default_total_memory_bytes, ComputeNodeOpts};
use risingwave_compute::ComputeNodeOpts;
use risingwave_frontend::FrontendOpts;
use risingwave_meta_node::MetaNodeOpts;

use crate::ParsedStandaloneOpts;

pub static DEFAULT_STORE_DIRECTORY: LazyLock<String> = LazyLock::new(|| {
let mut home_path = home_dir().unwrap();
home_path.push(".risingwave");
let home_path = home_path.to_str().unwrap();
home_path.to_string()
});

pub static DEFAULT_SINGLE_NODE_SQLITE_PATH: LazyLock<String> =
LazyLock::new(|| format!("{}/meta_store/single_node.db", &*DEFAULT_STORE_DIRECTORY));

pub static DEFAULT_SINGLE_NODE_SQL_ENDPOINT: LazyLock<String> =
LazyLock::new(|| format!("sqlite://{}?mode=rwc", *DEFAULT_SINGLE_NODE_SQLITE_PATH));

pub static DEFAULT_SINGLE_NODE_STATE_STORE_PATH: LazyLock<String> =
LazyLock::new(|| format!("{}/state_store", DEFAULT_STORE_DIRECTORY.clone()));

pub static DEFAULT_SINGLE_NODE_STATE_STORE_URL: LazyLock<String> = LazyLock::new(|| {
format!(
"hummock+fs://{}",
DEFAULT_SINGLE_NODE_STATE_STORE_PATH.clone()
)
});

#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
#[command(
version,
Expand All @@ -65,41 +39,76 @@ pub struct SingleNodeOpts {

/// The store directory used by meta store and object store.
#[clap(long, env = "RW_SINGLE_NODE_STORE_DIRECTORY")]
pub store_directory: Option<String>,

/// The address of the meta node.
#[clap(long, env = "RW_SINGLE_NODE_META_ADDR")]
meta_addr: Option<String>,

/// The address of the compute node
#[clap(long, env = "RW_SINGLE_NODE_COMPUTE_ADDR")]
compute_addr: Option<String>,

/// The address of the frontend node
#[clap(long, env = "RW_SINGLE_NODE_FRONTEND_ADDR")]
frontend_addr: Option<String>,
store_directory: Option<String>,

/// The address of the compactor node
#[clap(long, env = "RW_SINGLE_NODE_COMPACTOR_ADDR")]
compactor_addr: Option<String>,
#[clap(flatten)]
node_opts: NodeSpecificOpts,
}

pub fn make_single_node_sql_endpoint(store_directory: &String) -> String {
format!(
"sqlite://{}/meta_store/single_node.db?mode=rwc",
store_directory
)
/// # Node-Specific Options
///
/// ## Which node-specific options should be here?
///
/// This only includes the options displayed in the CLI parameters.
///
/// 1. An option that will be forced to override by single-node deployment should not be here.
/// - e.g. `meta_addr` will be automatically set to localhost.
/// 2. An option defined in the config file and hidden in the command-line help should not be here.
/// - e.g. `etcd_endpoints` is encouraged to be set from config file or environment variables.
/// 3. An option that is only for cloud deployment should not be here.
/// - e.g. `proxy_rpc_endpoint` is used in cloud deployment only, and should not be used by open-source users.
///
/// ## How to add an new option here?
///
/// Options for the other nodes are defined with following convention:
///
/// 1. The option name is the same as the definition in node's `Opts` struct. May add a prefix to avoid conflicts when necessary.
/// 2. The option doesn't have a default value and must be `Option<T>`, so that the default value in the node's `Opts` struct can be used.
/// 3. The option doesn't need to read from environment variables, which will be done in the node's `Opts` struct.
#[derive(Eq, PartialOrd, PartialEq, Debug, Clone, Parser)]
pub struct NodeSpecificOpts {
// ------- Compute Node Options -------
/// Total available memory for the compute node in bytes. Used by both computing and storage.
#[clap(long)]
pub total_memory_bytes: Option<usize>,

/// The parallelism that the compute node will register to the scheduler of the meta service.
#[clap(long)]
pub parallelism: Option<usize>,

// ------- Frontend Node Options -------
/// The address that this service listens to.
/// Usually the localhost + desired port.
#[clap(long)]
pub listen_addr: Option<String>,

// ------- Meta Node Options -------
/// The HTTP REST-API address of the Prometheus instance associated to this cluster.
/// This address is used to serve PromQL queries to Prometheus.
/// It is also used by Grafana Dashboard Service to fetch metrics and visualize them.
#[clap(long)]
pub prometheus_endpoint: Option<String>,

/// The additional selector used when querying Prometheus.
///
/// The format is same as PromQL. Example: `instance="foo",namespace="bar"`
#[clap(long)]
pub prometheus_selector: Option<String>,

// ------- Compactor Node Options -------
#[clap(long)]
pub compaction_worker_threads_number: Option<usize>,
}

pub fn make_single_node_state_store_url(store_directory: &String) -> String {
format!("hummock+fs://{}/state_store", store_directory)
}
pub fn map_single_node_opts_to_standalone_opts(opts: SingleNodeOpts) -> ParsedStandaloneOpts {
// Parse from empty strings to get the default values.
// Note that environment variables will be used if they are set.
let empty_args = vec![] as Vec<String>;
let mut meta_opts = MetaNodeOpts::parse_from(&empty_args);
let mut compute_opts = ComputeNodeOpts::parse_from(&empty_args);
let mut frontend_opts = FrontendOpts::parse_from(&empty_args);
let mut compactor_opts = CompactorOpts::parse_from(&empty_args);

pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedStandaloneOpts {
let mut meta_opts = SingleNodeOpts::default_meta_opts();
let mut compute_opts = SingleNodeOpts::default_compute_opts();
let mut frontend_opts = SingleNodeOpts::default_frontend_opts();
let mut compactor_opts = SingleNodeOpts::default_compactor_opts();
if let Some(prometheus_listener_addr) = &opts.prometheus_listener_addr {
meta_opts.prometheus_listener_addr = Some(prometheus_listener_addr.clone());
compute_opts.prometheus_listener_addr = prometheus_listener_addr.clone();
Expand All @@ -112,132 +121,78 @@ pub fn map_single_node_opts_to_standalone_opts(opts: &SingleNodeOpts) -> ParsedS
frontend_opts.config_path = config_path.clone();
compactor_opts.config_path = config_path.clone();
}
if let Some(store_directory) = &opts.store_directory {
let state_store_url = make_single_node_state_store_url(store_directory);
let meta_store_endpoint = make_single_node_sql_endpoint(store_directory);

let store_directory = opts.store_directory.unwrap_or_else(|| {
let mut home_path = home_dir().unwrap();
home_path.push(".risingwave");
home_path.to_str().unwrap().to_string()
});

// Set state store for meta (if not set)
if meta_opts.state_store.is_none() {
let state_store_dir = format!("{}/state_store", &store_directory);
std::fs::create_dir_all(&state_store_dir).unwrap();
let state_store_url = format!("hummock+fs://{}", &state_store_dir);
meta_opts.state_store = Some(state_store_url);
meta_opts.sql_endpoint = Some(meta_store_endpoint);
}
if let Some(meta_addr) = &opts.meta_addr {
meta_opts.listen_addr = meta_addr.clone();
meta_opts.advertise_addr = meta_addr.clone();

compute_opts.meta_address = meta_addr.parse().unwrap();
frontend_opts.meta_addr = meta_addr.parse().unwrap();
compactor_opts.meta_address = meta_addr.parse().unwrap();
// FIXME: otherwise it reports: missing system param "data_directory", but I think it should be set by this way...
meta_opts.data_directory = Some("hummock_001".to_string());
}
if let Some(compute_addr) = &opts.compute_addr {
compute_opts.listen_addr = compute_addr.clone();

// Set meta store for meta (if not set)
let meta_backend_is_set = match meta_opts.backend {
Some(MetaBackend::Etcd) => !meta_opts.etcd_endpoints.is_empty(),
Some(MetaBackend::Sql) => meta_opts.sql_endpoint.is_some(),
Some(MetaBackend::Mem) => true,
None => false,
};
if !meta_backend_is_set {
meta_opts.backend = Some(MetaBackend::Sql);
let meta_store_dir = format!("{}/meta_store", &store_directory);
std::fs::create_dir_all(&meta_store_dir).unwrap();
let meta_store_endpoint = format!("sqlite://{}/single_node.db?mode=rwc", &meta_store_dir);
meta_opts.sql_endpoint = Some(meta_store_endpoint);
}
if let Some(frontend_addr) = &opts.frontend_addr {

// Set listen addresses (force to override)
meta_opts.listen_addr = "0.0.0.0:5690".to_string();
meta_opts.advertise_addr = "127.0.0.1:5690".to_string();
compute_opts.listen_addr = "0.0.0.0:5688".to_string();
compactor_opts.listen_addr = "0.0.0.0:6660".to_string();
if let Some(frontend_addr) = &opts.node_opts.listen_addr {
frontend_opts.listen_addr = frontend_addr.clone();
}
if let Some(compactor_addr) = &opts.compactor_addr {
compactor_opts.listen_addr = compactor_addr.clone();

// Set Meta addresses for all nodes (force to override)
let meta_addr = "http://127.0.0.1:5690".to_string();
compute_opts.meta_address = meta_addr.parse().unwrap();
frontend_opts.meta_addr = meta_addr.parse().unwrap();
compactor_opts.meta_address = meta_addr.parse().unwrap();

// Apply node-specific options
if let Some(total_memory_bytes) = opts.node_opts.total_memory_bytes {
compute_opts.total_memory_bytes = total_memory_bytes;
}
ParsedStandaloneOpts {
meta_opts: Some(meta_opts),
compute_opts: Some(compute_opts),
frontend_opts: Some(frontend_opts),
compactor_opts: Some(compactor_opts),
if let Some(parallelism) = opts.node_opts.parallelism {
compute_opts.parallelism = parallelism;
}
}

// Defaults
impl SingleNodeOpts {
fn default_frontend_opts() -> FrontendOpts {
FrontendOpts {
listen_addr: "0.0.0.0:4566".to_string(),
advertise_addr: Some("0.0.0.0:4566".to_string()),
port: None,
meta_addr: "http://0.0.0.0:5690".parse().unwrap(),
prometheus_listener_addr: "0.0.0.0:1250".to_string(),
health_check_listener_addr: "0.0.0.0:6786".to_string(),
config_path: "".to_string(),
metrics_level: None,
enable_barrier_read: None,
}
if let Some(listen_addr) = opts.node_opts.listen_addr {
frontend_opts.listen_addr = listen_addr;
}

fn default_meta_opts() -> MetaNodeOpts {
MetaNodeOpts {
vpc_id: None,
security_group_id: None,
listen_addr: "0.0.0.0:5690".to_string(),
advertise_addr: "0.0.0.0:5690".to_string(),
dashboard_host: Some("0.0.0.0:5691".to_string()),
prometheus_listener_addr: Some("0.0.0.0:1250".to_string()),
etcd_endpoints: Default::default(),
etcd_auth: false,
etcd_username: Default::default(),
etcd_password: Default::default(),
sql_endpoint: Some(DEFAULT_SINGLE_NODE_SQL_ENDPOINT.clone()),
dashboard_ui_path: None,
prometheus_endpoint: None,
prometheus_selector: None,
connector_rpc_endpoint: None,
privatelink_endpoint_default_tags: None,
config_path: "".to_string(),
backend: Some(MetaBackend::Sql),
barrier_interval_ms: None,
sstable_size_mb: None,
block_size_kb: None,
bloom_false_positive: None,
state_store: Some(DEFAULT_SINGLE_NODE_STATE_STORE_URL.clone()),
data_directory: Some("hummock_001".to_string()),
do_not_config_object_storage_lifecycle: None,
backup_storage_url: None,
backup_storage_directory: None,
heap_profiling_dir: None,
}
if let Some(prometheus_endpoint) = opts.node_opts.prometheus_endpoint {
meta_opts.prometheus_endpoint = Some(prometheus_endpoint);
}

pub fn default_compute_opts() -> ComputeNodeOpts {
ComputeNodeOpts {
listen_addr: "0.0.0.0:5688".to_string(),
advertise_addr: Some("0.0.0.0:5688".to_string()),
prometheus_listener_addr: "0.0.0.0:1250".to_string(),
meta_address: "http://0.0.0.0:5690".parse().unwrap(),
connector_rpc_endpoint: None,
connector_rpc_sink_payload_format: None,
config_path: "".to_string(),
total_memory_bytes: default_total_memory_bytes(),
parallelism: default_parallelism(),
role: Default::default(),
metrics_level: None,
data_file_cache_dir: None,
meta_file_cache_dir: None,
async_stack_trace: Some(AsyncStackTraceOption::ReleaseVerbose),
heap_profiling_dir: None,
}
if let Some(prometheus_selector) = opts.node_opts.prometheus_selector {
meta_opts.prometheus_selector = Some(prometheus_selector);
}

fn default_compactor_opts() -> CompactorOpts {
CompactorOpts {
listen_addr: "0.0.0.0:6660".to_string(),
advertise_addr: Some("0.0.0.0:6660".to_string()),
port: None,
prometheus_listener_addr: "0.0.0.0:1250".to_string(),
meta_address: "http://0.0.0.0:5690".parse().unwrap(),
compaction_worker_threads_number: None,
config_path: "".to_string(),
metrics_level: None,
async_stack_trace: None,
heap_profiling_dir: None,
compactor_mode: None,
proxy_rpc_endpoint: "".to_string(),
}
if let Some(n) = opts.node_opts.compaction_worker_threads_number {
compactor_opts.compaction_worker_threads_number = Some(n);
}
}

impl SingleNodeOpts {
pub fn create_store_directories(&self) -> Result<()> {
let store_directory = self
.store_directory
.as_ref()
.unwrap_or_else(|| &*DEFAULT_STORE_DIRECTORY);
std::fs::create_dir_all(format!("{}/meta_store", store_directory))?;
std::fs::create_dir_all(format!("{}/state_store", store_directory))?;
Ok(())
ParsedStandaloneOpts {
meta_opts: Some(meta_opts),
compute_opts: Some(compute_opts),
frontend_opts: Some(frontend_opts),
compactor_opts: Some(compactor_opts),
}
}
4 changes: 2 additions & 2 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,6 @@ mod test {
ParsedStandaloneOpts {
meta_opts: Some(
MetaNodeOpts {
vpc_id: None,
security_group_id: None,
listen_addr: "127.0.0.1:8001",
advertise_addr: "127.0.0.1:9999",
dashboard_host: None,
Expand All @@ -279,6 +277,8 @@ mod test {
prometheus_selector: None,
connector_rpc_endpoint: None,
privatelink_endpoint_default_tags: None,
vpc_id: None,
security_group_id: None,
config_path: "src/config/test.toml",
backend: None,
barrier_interval_ms: None,
Expand Down
Loading

0 comments on commit 9ec0a48

Please sign in to comment.