diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 4614b2a7806f..8bbaba4aa162 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -13,51 +13,25 @@ // limitations under the License. use clap::Parser; -use common_config::KvBackendConfig; use common_telemetry::logging::{LoggingOptions, TracingOptions}; -use common_wal::config::MetasrvWalConfig; use config::{Config, Environment, File, FileFormat}; -use datanode::config::{DatanodeOptions, ProcedureConfig}; -use frontend::error::{Result as FeResult, TomlFormatSnafu}; -use frontend::frontend::{FrontendOptions, TomlSerializable}; +use datanode::config::DatanodeOptions; +use frontend::frontend::FrontendOptions; use meta_srv::metasrv::MetasrvOptions; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use crate::error::{LoadLayeredConfigSnafu, Result, SerdeJsonSnafu}; +use crate::standalone::StandaloneOptions; pub const ENV_VAR_SEP: &str = "__"; pub const ENV_LIST_SEP: &str = ","; -/// Options mixed up from datanode, frontend and metasrv. -#[derive(Serialize, Debug, Clone)] -pub struct MixOptions { - pub data_home: String, - pub procedure: ProcedureConfig, - pub metadata_store: KvBackendConfig, - pub frontend: FrontendOptions, - pub datanode: DatanodeOptions, - pub logging: LoggingOptions, - pub wal_meta: MetasrvWalConfig, -} - -impl From for FrontendOptions { - fn from(value: MixOptions) -> Self { - value.frontend - } -} - -impl TomlSerializable for MixOptions { - fn to_toml(&self) -> FeResult { - toml::to_string(self).context(TomlFormatSnafu) - } -} - pub enum Options { Datanode(Box), Frontend(Box), Metasrv(Box), - Standalone(Box), + Standalone(Box), Cli(Box), } @@ -158,10 +132,9 @@ impl Options { pub fn node_id(&self) -> Option { match self { - Options::Metasrv(_) | Options::Cli(_) => None, + Options::Metasrv(_) | Options::Cli(_) | Options::Standalone(_) => None, Options::Datanode(opt) => opt.node_id.map(|x| x.to_string()), Options::Frontend(opt) => opt.node_id.clone(), - Options::Standalone(opt) => opt.frontend.node_id.clone(), } } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index e55e98afab46..0c514512f69c 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -40,6 +40,7 @@ use common_wal::config::StandaloneWalConfig; use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig}; use datanode::datanode::{Datanode, DatanodeBuilder}; use file_engine::config::EngineConfig as FileEngineConfig; +use frontend::error::{Result as FeResult, TomlFormatSnafu}; use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager}; @@ -61,7 +62,7 @@ use crate::error::{ Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu, }; -use crate::options::{GlobalOptions, MixOptions, Options}; +use crate::options::{GlobalOptions, Options}; use crate::App; #[derive(Parser)] @@ -71,7 +72,7 @@ pub struct Command { } impl Command { - pub async fn build(self, opts: MixOptions) -> Result { + pub async fn build(self, opts: StandaloneOptions) -> Result { self.subcmd.build(opts).await } @@ -86,7 +87,7 @@ enum SubCommand { } impl SubCommand { - async fn build(self, opts: MixOptions) -> Result { + async fn build(self, opts: StandaloneOptions) -> Result { match self { SubCommand::Start(cmd) => cmd.build(opts).await, } @@ -158,37 +159,43 @@ impl Default for StandaloneOptions { } impl StandaloneOptions { - fn frontend_options(self) -> FrontendOptions { + pub fn frontend_options(&self) -> FrontendOptions { + let cloned_opts = self.clone(); FrontendOptions { - mode: self.mode, - default_timezone: self.default_timezone, - http: self.http, - grpc: self.grpc, - mysql: self.mysql, - postgres: self.postgres, - opentsdb: self.opentsdb, - influxdb: self.influxdb, - prom_store: self.prom_store, + mode: cloned_opts.mode, + default_timezone: cloned_opts.default_timezone, + http: cloned_opts.http, + grpc: cloned_opts.grpc, + mysql: cloned_opts.mysql, + postgres: cloned_opts.postgres, + opentsdb: cloned_opts.opentsdb, + influxdb: cloned_opts.influxdb, + prom_store: cloned_opts.prom_store, meta_client: None, - logging: self.logging, - user_provider: self.user_provider, + logging: cloned_opts.logging, + user_provider: cloned_opts.user_provider, // Handle the export metrics task run by standalone to frontend for execution - export_metrics: self.export_metrics, + export_metrics: cloned_opts.export_metrics, ..Default::default() } } - fn datanode_options(self) -> DatanodeOptions { + pub fn datanode_options(&self) -> DatanodeOptions { + let cloned_opts = self.clone(); DatanodeOptions { node_id: Some(0), - enable_telemetry: self.enable_telemetry, - wal: self.wal.into(), - storage: self.storage, - region_engine: self.region_engine, - rpc_addr: self.grpc.addr, + enable_telemetry: cloned_opts.enable_telemetry, + wal: cloned_opts.wal.into(), + storage: cloned_opts.storage, + region_engine: cloned_opts.region_engine, + rpc_addr: cloned_opts.grpc.addr, ..Default::default() } } + + pub fn to_toml(&self) -> FeResult { + toml::to_string(self).context(TomlFormatSnafu) + } } pub struct Instance { @@ -277,20 +284,12 @@ pub struct StartCommand { impl StartCommand { fn load_options(&self, global_options: &GlobalOptions) -> Result { - let opts: StandaloneOptions = Options::load_layered_options( + let mut opts: StandaloneOptions = Options::load_layered_options( self.config_file.as_deref(), self.env_prefix.as_ref(), StandaloneOptions::env_list_keys(), )?; - self.convert_options(global_options, opts) - } - - pub fn convert_options( - &self, - global_options: &GlobalOptions, - mut opts: StandaloneOptions, - ) -> Result { opts.mode = Mode::Standalone; if let Some(dir) = &global_options.log_dir { @@ -323,8 +322,7 @@ impl StartCommand { msg: format!( "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}", ), - } - .fail(); + }.fail(); } opts.grpc.addr.clone_from(addr) } @@ -347,47 +345,32 @@ impl StartCommand { opts.user_provider.clone_from(&self.user_provider); - let metadata_store = opts.metadata_store.clone(); - let procedure = opts.procedure.clone(); - let frontend = opts.clone().frontend_options(); - let logging = opts.logging.clone(); - let wal_meta = opts.wal.clone().into(); - let datanode = opts.datanode_options().clone(); - - Ok(Options::Standalone(Box::new(MixOptions { - procedure, - metadata_store, - data_home: datanode.storage.data_home.to_string(), - frontend, - datanode, - logging, - wal_meta, - }))) + Ok(Options::Standalone(Box::new(opts))) } #[allow(unreachable_code)] #[allow(unused_variables)] #[allow(clippy::diverging_sub_expression)] - async fn build(self, opts: MixOptions) -> Result { + async fn build(self, opts: StandaloneOptions) -> Result { info!("Standalone start command: {:#?}", self); info!("Building standalone instance with {opts:#?}"); - let mut fe_opts = opts.frontend; + let mut fe_opts = opts.frontend_options(); #[allow(clippy::unnecessary_mut_passed)] let fe_plugins = plugins::setup_frontend_plugins(&mut fe_opts) // mut ref is MUST, DO NOT change it .await .context(StartFrontendSnafu)?; - let dn_opts = opts.datanode; + let dn_opts = opts.datanode_options(); set_default_timezone(fe_opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?; + let data_home = &dn_opts.storage.data_home; // Ensure the data_home directory exists. - fs::create_dir_all(path::Path::new(&opts.data_home)).context(CreateDirSnafu { - dir: &opts.data_home, - })?; + fs::create_dir_all(path::Path::new(data_home)) + .context(CreateDirSnafu { dir: data_home })?; - let metadata_dir = metadata_store_dir(&opts.data_home); + let metadata_dir = metadata_store_dir(data_home); let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components( metadata_dir, opts.metadata_store.clone(), @@ -424,7 +407,7 @@ impl StartCommand { .build(), ); let wal_options_allocator = Arc::new(WalOptionsAllocator::new( - opts.wal_meta.clone(), + opts.wal.into(), kv_backend.clone(), )); let table_metadata_manager = @@ -621,8 +604,8 @@ mod tests { else { unreachable!() }; - let fe_opts = options.frontend; - let dn_opts = options.datanode; + let fe_opts = options.frontend_options(); + let dn_opts = options.datanode_options(); let logging_opts = options.logging; assert_eq!(Mode::Standalone, fe_opts.mode); assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr); @@ -759,11 +742,12 @@ mod tests { assert_eq!(opts.logging.level.as_ref().unwrap(), "debug"); // Should be read from cli, cli > config file > env > default values. - assert_eq!(opts.frontend.http.addr, "127.0.0.1:14000"); - assert_eq!(ReadableSize::mb(64), opts.frontend.http.body_limit); + let fe_opts = opts.frontend_options(); + assert_eq!(fe_opts.http.addr, "127.0.0.1:14000"); + assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit); // Should be default value. - assert_eq!(opts.frontend.grpc.addr, GrpcOptions::default().addr); + assert_eq!(fe_opts.grpc.addr, GrpcOptions::default().addr); }, ); } diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 6c4993b835bc..072763782750 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -75,6 +75,25 @@ impl From for MetasrvWalConfig { } } +impl From for StandaloneWalConfig { + fn from(config: MetasrvWalConfig) -> Self { + match config { + MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()), + MetasrvWalConfig::Kafka(config) => Self::Kafka(StandaloneKafkaConfig { + broker_endpoints: config.broker_endpoints, + num_topics: config.num_topics, + selector_type: config.selector_type, + topic_name_prefix: config.topic_name_prefix, + num_partitions: config.num_partitions, + replication_factor: config.replication_factor, + create_topic_timeout: config.create_topic_timeout, + backoff: config.backoff, + ..Default::default() + }), + } + } +} + impl From for DatanodeWalConfig { fn from(config: StandaloneWalConfig) -> Self { match config { diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 6b15b4fad999..56e3f7c0ab58 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use catalog::kvbackend::KvBackendCatalogManager; -use cmd::options::MixOptions; +use cmd::standalone::StandaloneOptions; use common_base::Plugins; use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_config::KvBackendConfig; @@ -32,10 +32,8 @@ use common_meta::sequence::SequenceBuilder; use common_meta::wal_options_allocator::WalOptionsAllocator; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; -use common_telemetry::logging::LoggingOptions; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use datanode::datanode::DatanodeBuilder; -use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager}; use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; @@ -45,7 +43,7 @@ use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, Test pub struct GreptimeDbStandalone { pub instance: Arc, - pub mix_options: MixOptions, + pub opts: StandaloneOptions, pub guard: TestGuard, // Used in rebuild. pub kv_backend: KvBackendRef, @@ -115,13 +113,13 @@ impl GreptimeDbStandaloneBuilder { &self, kv_backend: KvBackendRef, guard: TestGuard, - mix_options: MixOptions, + opts: StandaloneOptions, procedure_manager: ProcedureManagerRef, register_procedure_loaders: bool, ) -> GreptimeDbStandalone { let plugins = self.plugin.clone().unwrap_or_default(); - let datanode = DatanodeBuilder::new(mix_options.datanode.clone(), plugins.clone()) + let datanode = DatanodeBuilder::new(opts.datanode_options(), plugins.clone()) .with_kv_backend(kv_backend.clone()) .build() .await @@ -154,7 +152,7 @@ impl GreptimeDbStandaloneBuilder { .build(), ); let wal_options_allocator = Arc::new(WalOptionsAllocator::new( - mix_options.wal_meta.clone(), + opts.wal.clone().into(), kv_backend.clone(), )); let table_metadata_allocator = Arc::new(TableMetadataAllocator::new( @@ -202,7 +200,7 @@ impl GreptimeDbStandaloneBuilder { GreptimeDbStandalone { instance: Arc::new(instance), - mix_options, + opts, guard, kv_backend, procedure_manager, @@ -231,18 +229,15 @@ impl GreptimeDbStandaloneBuilder { .await .unwrap(); - let wal_meta = self.metasrv_wal_config.clone(); - let mix_options = MixOptions { - data_home: opts.storage.data_home.to_string(), + let standalone_opts = StandaloneOptions { + storage: opts.storage, procedure: procedure_config, metadata_store: kv_backend_config, - frontend: FrontendOptions::default(), - datanode: opts, - logging: LoggingOptions::default(), - wal_meta, + wal: self.metasrv_wal_config.clone().into(), + ..StandaloneOptions::default() }; - self.build_with(kv_backend, guard, mix_options, procedure_manager, true) + self.build_with(kv_backend, guard, standalone_opts, procedure_manager, true) .await } } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index e347c30e401b..3f9b44154166 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -33,7 +33,6 @@ use datanode::config::{ AzblobConfig, DatanodeOptions, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config, StorageConfig, }; -use frontend::frontend::TomlSerializable; use frontend::instance::Instance; use frontend::service_config::{MysqlOptions, PostgresOptions}; use futures::future::BoxFuture; @@ -392,7 +391,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router None, ) .with_metrics_handler(MetricsHandler) - .with_greptime_config_options(instance.mix_options.datanode.to_toml_string()) + .with_greptime_config_options(instance.opts.datanode_options().to_toml_string()) .build(); (http_server.build(http_server.make_app()), instance.guard) } @@ -425,7 +424,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider( ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()), Some(instance.instance.clone()), ) - .with_greptime_config_options(instance.mix_options.to_toml().unwrap()); + .with_greptime_config_options(instance.opts.to_toml().unwrap()); if let Some(user_provider) = user_provider { http_server = http_server.with_user_provider(user_provider); @@ -464,7 +463,7 @@ pub async fn setup_test_prom_app_with_frontend( ) .with_prom_handler(frontend_ref.clone(), true, is_strict_mode) .with_prometheus_handler(frontend_ref) - .with_greptime_config_options(instance.mix_options.datanode.to_toml_string()) + .with_greptime_config_options(instance.opts.datanode_options().to_toml_string()) .build(); let app = http_server.build(http_server.make_app()); (app, instance.guard) diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index 01ca50f29f50..c052b668d8bc 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -106,7 +106,7 @@ impl MockInstanceBuilder { unreachable!() }; let GreptimeDbStandalone { - mix_options, + opts, guard, kv_backend, procedure_manager, @@ -114,7 +114,7 @@ impl MockInstanceBuilder { } = instance; MockInstanceImpl::Standalone( builder - .build_with(kv_backend, guard, mix_options, procedure_manager, false) + .build_with(kv_backend, guard, opts, procedure_manager, false) .await, ) } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 7ffd4dc4c6a4..2471e0a92059 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -729,103 +729,54 @@ pub async fn test_config_api(store_type: StorageType) { let expected_toml_str = format!( r#" -[procedure] -max_retry_times = 3 -retry_delay = "500ms" - -[metadata_store] -file_size = "256MiB" -purge_threshold = "4GiB" - -[frontend] mode = "standalone" +enable_telemetry = true -[frontend.heartbeat] -interval = "18s" -retry_interval = "3s" - -[frontend.http] +[http] addr = "127.0.0.1:4000" timeout = "30s" body_limit = "64MiB" is_strict_mode = false -[frontend.grpc] +[grpc] addr = "127.0.0.1:4001" runtime_size = 8 max_recv_message_size = "512MiB" max_send_message_size = "512MiB" -[frontend.mysql] +[mysql] enable = true addr = "127.0.0.1:4002" runtime_size = 2 -[frontend.mysql.tls] +[mysql.tls] mode = "disable" cert_path = "" key_path = "" watch = false -[frontend.postgres] +[postgres] enable = true addr = "127.0.0.1:4003" runtime_size = 2 -[frontend.postgres.tls] +[postgres.tls] mode = "disable" cert_path = "" key_path = "" watch = false -[frontend.opentsdb] +[opentsdb] enable = true -[frontend.influxdb] +[influxdb] enable = true -[frontend.prom_store] +[prom_store] enable = true with_metric_engine = true -[frontend.otlp] -enable = true - -[frontend.logging] -enable_otlp_tracing = false -append_stdout = true - -[frontend.datanode.client] -timeout = "10s" -connect_timeout = "1s" -tcp_nodelay = true - -[frontend.export_metrics] -enable = false -write_interval = "30s" - -[datanode] -mode = "standalone" -node_id = 0 -require_lease_before_startup = true -init_regions_in_background = false -rpc_addr = "127.0.0.1:3001" -rpc_runtime_size = 8 -rpc_max_recv_message_size = "512MiB" -rpc_max_send_message_size = "512MiB" -enable_telemetry = true - -[datanode.heartbeat] -interval = "3s" -retry_interval = "3s" - -[datanode.http] -addr = "127.0.0.1:4000" -timeout = "30s" -body_limit = "64MiB" -is_strict_mode = false - -[datanode.wal] +[wal] provider = "raft_engine" file_size = "256MiB" purge_threshold = "4GiB" @@ -835,13 +786,25 @@ sync_write = false enable_log_recycle = true prefill_log_files = false -[datanode.storage] +[storage] type = "{}" providers = [] -[[datanode.region_engine]] +[metadata_store] +file_size = "256MiB" +purge_threshold = "4GiB" + +[procedure] +max_retry_times = 3 +retry_delay = "500ms" + +[logging] +enable_otlp_tracing = false +append_stdout = true -[datanode.region_engine.mito] +[[region_engine]] + +[region_engine.mito] worker_channel_size = 128 worker_request_batch_size = 64 manifest_checkpoint_distance = 10 @@ -855,7 +818,7 @@ sst_write_buffer_size = "8MiB" parallel_scan_channel_size = 32 allow_stale_entries = false -[datanode.region_engine.mito.inverted_index] +[region_engine.mito.inverted_index] create_on_flush = "auto" create_on_compaction = "auto" apply_on_query = "auto" @@ -863,29 +826,20 @@ write_buffer_size = "8MiB" mem_threshold_on_create = "64.0MiB" intermediate_path = "" -[datanode.region_engine.mito.memtable] +[region_engine.mito.memtable] type = "time_series" -[[datanode.region_engine]] - -[datanode.region_engine.file] +[[region_engine]] -[datanode.logging] -enable_otlp_tracing = false -append_stdout = true +[region_engine.file] -[datanode.export_metrics] +[export_metrics] enable = false -write_interval = "30s" - -[logging] -enable_otlp_tracing = false -append_stdout = true - -[wal_meta] -provider = "raft_engine""#, - store_type, - ); +write_interval = "30s""#, + store_type + ) + .trim() + .to_string(); let body_text = drop_lines_with_inconsistent_results(res_get.text().await); assert_eq!(body_text, expected_toml_str); }