diff --git a/.github/actions/setup-greptimedb-cluster/action.yml b/.github/actions/setup-greptimedb-cluster/action.yml index 93d8c569c95d..eaf0032c7715 100644 --- a/.github/actions/setup-greptimedb-cluster/action.yml +++ b/.github/actions/setup-greptimedb-cluster/action.yml @@ -57,6 +57,7 @@ runs: greptime/greptimedb-cluster \ --create-namespace \ -n my-greptimedb \ + --values ./.github/actions/setup-greptimedb-cluster/values.yaml \ --wait \ --wait-for-jobs - name: Wait for GreptimeDB diff --git a/.github/actions/setup-greptimedb-cluster/values.yaml b/.github/actions/setup-greptimedb-cluster/values.yaml new file mode 100644 index 000000000000..b7ac1eb86e17 --- /dev/null +++ b/.github/actions/setup-greptimedb-cluster/values.yaml @@ -0,0 +1,18 @@ +meta: + config: |- + [runtime] + read_rt_size = 8 + write_rt_size = 8 + bg_rt_size = 8 +datanode: + config: |- + [runtime] + read_rt_size = 8 + write_rt_size = 8 + bg_rt_size = 8 +frontend: + config: |- + [runtime] + read_rt_size = 8 + write_rt_size = 8 + bg_rt_size = 8 \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 44ad55172287..306bccd24f43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2080,9 +2080,11 @@ dependencies = [ "common-macro", "common-telemetry", "lazy_static", + "num_cpus", "once_cell", "paste", "prometheus", + "serde", "snafu 0.8.3", "tokio", "tokio-metrics", @@ -11049,8 +11051,7 @@ dependencies = [ [[package]] name = "tokio-metrics-collector" version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d767da47381602cc481653456823b3ebb600e83d5dd4e0293da9b5566c6c00f0" +source = "git+https://github.com/MichaelScofield/tokio-metrics-collector.git?rev=89d692d5753d28564a7aac73c6ac5aba22243ba0#89d692d5753d28564a7aac73c6ac5aba22243ba0" dependencies = [ "lazy_static", "parking_lot 0.12.3", diff --git a/config/config.md b/config/config.md index 912e8ca7508c..5c4878c9d37d 100644 --- a/config/config.md +++ b/config/config.md @@ -13,6 +13,10 @@ | `mode` | String | `standalone` | The running mode of the datanode. It can be `standalone` or `distributed`. | | `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. | | `default_timezone` | String | `None` | The default timezone of the server. | +| `runtime` | -- | -- | The runtime options. | +| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | +| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. | +| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. | | `http` | -- | -- | The HTTP server options. | | `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. | | `http.timeout` | String | `30s` | HTTP request timeout. | @@ -154,6 +158,10 @@ | --- | -----| ------- | ----------- | | `mode` | String | `standalone` | The running mode of the datanode. It can be `standalone` or `distributed`. | | `default_timezone` | String | `None` | The default timezone of the server. | +| `runtime` | -- | -- | The runtime options. | +| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | +| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. | +| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. | | `heartbeat` | -- | -- | The heartbeat options. | | `heartbeat.interval` | String | `18s` | Interval for sending heartbeat messages to the metasrv. | | `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. | @@ -240,6 +248,10 @@ | `use_memory_store` | Bool | `false` | Store data in memory. | | `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. | | `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. | +| `runtime` | -- | -- | The runtime options. | +| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | +| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. | +| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. | | `procedure` | -- | -- | Procedure storage options. | | `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. | | `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially | @@ -300,6 +312,10 @@ | `rpc_max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. | | `rpc_max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. | | `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. | +| `runtime` | -- | -- | The runtime options. | +| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | +| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. | +| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. | | `heartbeat` | -- | -- | The heartbeat options. | | `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. | | `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index d1849048778c..3a20d3ac5f16 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -32,6 +32,15 @@ rpc_max_send_message_size = "512MB" ## Enable telemetry to collect anonymous usage data. enable_telemetry = true +## The runtime options. +[runtime] +## The number of threads to execute the runtime for global read operations. +read_rt_size = 8 +## The number of threads to execute the runtime for global write operations. +write_rt_size = 8 +## The number of threads to execute the runtime for global background operations. +bg_rt_size = 8 + ## The heartbeat options. [heartbeat] ## Interval for sending heartbeat messages to the metasrv. diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 728a3099f837..4f4bd5bf3d3d 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -5,6 +5,15 @@ mode = "standalone" ## +toml2docs:none-default default_timezone = "UTC" +## The runtime options. +[runtime] +## The number of threads to execute the runtime for global read operations. +read_rt_size = 8 +## The number of threads to execute the runtime for global write operations. +write_rt_size = 8 +## The number of threads to execute the runtime for global background operations. +bg_rt_size = 8 + ## The heartbeat options. [heartbeat] ## Interval for sending heartbeat messages to the metasrv. diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index bc6a5d119342..239533bd5886 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -25,6 +25,15 @@ enable_telemetry = true ## If it's not empty, the metasrv will store all data with this key prefix. store_key_prefix = "" +## The runtime options. +[runtime] +## The number of threads to execute the runtime for global read operations. +read_rt_size = 8 +## The number of threads to execute the runtime for global write operations. +write_rt_size = 8 +## The number of threads to execute the runtime for global background operations. +bg_rt_size = 8 + ## Procedure storage options. [procedure] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 8386c7e1e61a..d6fcc3e8943e 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -8,6 +8,15 @@ enable_telemetry = true ## +toml2docs:none-default default_timezone = "UTC" +## The runtime options. +[runtime] +## The number of threads to execute the runtime for global read operations. +read_rt_size = 8 +## The number of threads to execute the runtime for global write operations. +write_rt_size = 8 +## The number of threads to execute the runtime for global background operations. +bg_rt_size = 8 + ## The HTTP server options. [http] ## The address to bind the HTTP server. diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 3c189f2c3d07..d8680ed5294e 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -23,7 +23,6 @@ use common_telemetry::info; use common_telemetry::logging::TracingOptions; use common_version::{short_version, version}; use common_wal::config::DatanodeWalConfig; -use datanode::config::DatanodeOptions; use datanode::datanode::{Datanode, DatanodeBuilder}; use datanode::service::DatanodeServiceBuilder; use meta_client::MetaClientOptions; @@ -34,11 +33,13 @@ use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu, }; -use crate::options::GlobalOptions; +use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{log_versions, App}; pub const APP_NAME: &str = "greptime-datanode"; +type DatanodeOptions = GreptimeOptions; + pub struct Instance { datanode: Datanode, @@ -97,7 +98,9 @@ impl Command { } pub fn load_options(&self, global_options: &GlobalOptions) -> Result { - self.subcmd.load_options(global_options) + match &self.subcmd { + SubCommand::Start(cmd) => cmd.load_options(global_options), + } } } @@ -112,12 +115,6 @@ impl SubCommand { SubCommand::Start(cmd) => cmd.build(opts).await, } } - - fn load_options(&self, global_options: &GlobalOptions) -> Result { - match self { - SubCommand::Start(cmd) => cmd.load_options(global_options), - } - } } #[derive(Debug, Parser, Default)] @@ -146,22 +143,25 @@ struct StartCommand { impl StartCommand { fn load_options(&self, global_options: &GlobalOptions) -> Result { - self.merge_with_cli_options( - global_options, - DatanodeOptions::load_layered_options( - self.config_file.as_deref(), - self.env_prefix.as_ref(), - ) - .context(LoadLayeredConfigSnafu)?, + let mut opts = DatanodeOptions::load_layered_options( + self.config_file.as_deref(), + self.env_prefix.as_ref(), ) + .context(LoadLayeredConfigSnafu)?; + + self.merge_with_cli_options(global_options, &mut opts)?; + + Ok(opts) } // The precedence order is: cli > config file > environment variables > default values. fn merge_with_cli_options( &self, global_options: &GlobalOptions, - mut opts: DatanodeOptions, - ) -> Result { + opts: &mut DatanodeOptions, + ) -> Result<()> { + let opts = &mut opts.component; + if let Some(dir) = &global_options.log_dir { opts.logging.dir.clone_from(dir); } @@ -231,25 +231,28 @@ impl StartCommand { // Disable dashboard in datanode. opts.http.disable_dashboard = true; - Ok(opts) + Ok(()) } - async fn build(&self, mut opts: DatanodeOptions) -> Result { + async fn build(&self, opts: DatanodeOptions) -> Result { + common_runtime::init_global_runtimes(&opts.runtime); + let guard = common_telemetry::init_global_logging( APP_NAME, - &opts.logging, - &opts.tracing, - opts.node_id.map(|x| x.to_string()), + &opts.component.logging, + &opts.component.tracing, + opts.component.node_id.map(|x| x.to_string()), ); log_versions(version!(), short_version!()); + info!("Datanode start command: {:#?}", self); + info!("Datanode options: {:#?}", opts); + + let mut opts = opts.component; let plugins = plugins::setup_datanode_plugins(&mut opts) .await .context(StartDatanodeSnafu)?; - info!("Datanode start command: {:#?}", self); - info!("Datanode options: {:#?}", opts); - let node_id = opts .node_id .context(MissingConfigSnafu { msg: "'node_id'" })?; @@ -353,7 +356,7 @@ mod tests { ..Default::default() }; - let options = cmd.load_options(&GlobalOptions::default()).unwrap(); + let options = cmd.load_options(&Default::default()).unwrap().component; assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr); assert_eq!(Some(42), options.node_id); @@ -414,7 +417,8 @@ mod tests { fn test_try_from_cmd() { let opt = StartCommand::default() .load_options(&GlobalOptions::default()) - .unwrap(); + .unwrap() + .component; assert_eq!(Mode::Standalone, opt.mode); let opt = (StartCommand { @@ -423,7 +427,8 @@ mod tests { ..Default::default() }) .load_options(&GlobalOptions::default()) - .unwrap(); + .unwrap() + .component; assert_eq!(Mode::Distributed, opt.mode); assert!((StartCommand { @@ -454,7 +459,8 @@ mod tests { #[cfg(feature = "tokio-console")] tokio_console_addr: None, }) - .unwrap(); + .unwrap() + .component; let logging_opt = options.logging; assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir); @@ -536,7 +542,7 @@ mod tests { ..Default::default() }; - let opts = command.load_options(&GlobalOptions::default()).unwrap(); + let opts = command.load_options(&Default::default()).unwrap().component; // Should be read from env, env > default values. let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else { @@ -562,7 +568,10 @@ mod tests { assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir"); // Should be default value. - assert_eq!(opts.http.addr, DatanodeOptions::default().http.addr); + assert_eq!( + opts.http.addr, + DatanodeOptions::default().component.http.addr + ); }, ); } diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index a3e744e9c7ec..a7781e37a2ed 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -29,7 +29,6 @@ use common_telemetry::info; use common_telemetry::logging::TracingOptions; use common_time::timezone::set_default_timezone; use common_version::{short_version, version}; -use frontend::frontend::FrontendOptions; use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; use frontend::heartbeat::HeartbeatTask; use frontend::instance::builder::FrontendBuilder; @@ -44,9 +43,11 @@ use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MissingConfigSnafu, Result, StartFrontendSnafu, }; -use crate::options::GlobalOptions; +use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{log_versions, App}; +type FrontendOptions = GreptimeOptions; + pub struct Instance { frontend: FeInstance, @@ -164,22 +165,25 @@ pub struct StartCommand { impl StartCommand { fn load_options(&self, global_options: &GlobalOptions) -> Result { - self.merge_with_cli_options( - global_options, - FrontendOptions::load_layered_options( - self.config_file.as_deref(), - self.env_prefix.as_ref(), - ) - .context(LoadLayeredConfigSnafu)?, + let mut opts = FrontendOptions::load_layered_options( + self.config_file.as_deref(), + self.env_prefix.as_ref(), ) + .context(LoadLayeredConfigSnafu)?; + + self.merge_with_cli_options(global_options, &mut opts)?; + + Ok(opts) } // The precedence order is: cli > config file > environment variables > default values. fn merge_with_cli_options( &self, global_options: &GlobalOptions, - mut opts: FrontendOptions, - ) -> Result { + opts: &mut FrontendOptions, + ) -> Result<()> { + let opts = &mut opts.component; + if let Some(dir) = &global_options.log_dir { opts.logging.dir.clone_from(dir); } @@ -242,26 +246,29 @@ impl StartCommand { opts.user_provider.clone_from(&self.user_provider); - Ok(opts) + Ok(()) } - async fn build(&self, mut opts: FrontendOptions) -> Result { + async fn build(&self, opts: FrontendOptions) -> Result { + common_runtime::init_global_runtimes(&opts.runtime); + let guard = common_telemetry::init_global_logging( APP_NAME, - &opts.logging, - &opts.tracing, - opts.node_id.clone(), + &opts.component.logging, + &opts.component.tracing, + opts.component.node_id.clone(), ); log_versions(version!(), short_version!()); + info!("Frontend start command: {:#?}", self); + info!("Frontend options: {:#?}", opts); + + let mut opts = opts.component; #[allow(clippy::unnecessary_mut_passed)] let plugins = plugins::setup_frontend_plugins(&mut opts) .await .context(StartFrontendSnafu)?; - info!("Frontend start command: {:#?}", self); - info!("Frontend options: {:#?}", opts); - set_default_timezone(opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?; let meta_client_options = opts.meta_client.as_ref().context(MissingConfigSnafu { @@ -380,14 +387,14 @@ mod tests { ..Default::default() }; - let opts = command.load_options(&GlobalOptions::default()).unwrap(); + let opts = command.load_options(&Default::default()).unwrap().component; assert_eq!(opts.http.addr, "127.0.0.1:1234"); assert_eq!(ReadableSize::mb(64), opts.http.body_limit); assert_eq!(opts.mysql.addr, "127.0.0.1:5678"); assert_eq!(opts.postgres.addr, "127.0.0.1:5432"); - let default_opts = FrontendOptions::default(); + let default_opts = FrontendOptions::default().component; assert_eq!(opts.grpc.addr, default_opts.grpc.addr); assert!(opts.mysql.enable); @@ -428,7 +435,8 @@ mod tests { ..Default::default() }; - let fe_opts = command.load_options(&GlobalOptions::default()).unwrap(); + let fe_opts = command.load_options(&Default::default()).unwrap().component; + assert_eq!(Mode::Distributed, fe_opts.mode); assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr); assert_eq!(Duration::from_secs(30), fe_opts.http.timeout); @@ -442,7 +450,7 @@ mod tests { #[tokio::test] async fn test_try_from_start_command_to_anymap() { - let mut fe_opts = FrontendOptions { + let mut fe_opts = frontend::frontend::FrontendOptions { http: HttpOptions { disable_dashboard: false, ..Default::default() @@ -479,7 +487,8 @@ mod tests { #[cfg(feature = "tokio-console")] tokio_console_addr: None, }) - .unwrap(); + .unwrap() + .component; let logging_opt = options.logging; assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir); @@ -557,7 +566,7 @@ mod tests { ..Default::default() }; - let fe_opts = command.load_options(&GlobalOptions::default()).unwrap(); + let fe_opts = command.load_options(&Default::default()).unwrap().component; // Should be read from env, env > default values. assert_eq!(fe_opts.mysql.runtime_size, 11); diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 8648f220f3ac..3b89fdce112e 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -21,14 +21,15 @@ use common_telemetry::info; use common_telemetry::logging::TracingOptions; use common_version::{short_version, version}; use meta_srv::bootstrap::MetasrvInstance; -use meta_srv::metasrv::MetasrvOptions; use snafu::ResultExt; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu}; -use crate::options::GlobalOptions; +use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{log_versions, App}; +type MetasrvOptions = GreptimeOptions; + pub const APP_NAME: &str = "greptime-metasrv"; pub struct Instance { @@ -139,22 +140,25 @@ struct StartCommand { impl StartCommand { fn load_options(&self, global_options: &GlobalOptions) -> Result { - self.merge_with_cli_options( - global_options, - MetasrvOptions::load_layered_options( - self.config_file.as_deref(), - self.env_prefix.as_ref(), - ) - .context(LoadLayeredConfigSnafu)?, + let mut opts = MetasrvOptions::load_layered_options( + self.config_file.as_deref(), + self.env_prefix.as_ref(), ) + .context(LoadLayeredConfigSnafu)?; + + self.merge_with_cli_options(global_options, &mut opts)?; + + Ok(opts) } // The precedence order is: cli > config file > environment variables > default values. fn merge_with_cli_options( &self, global_options: &GlobalOptions, - mut opts: MetasrvOptions, - ) -> Result { + opts: &mut MetasrvOptions, + ) -> Result<()> { + let opts = &mut opts.component; + if let Some(dir) = &global_options.log_dir { opts.logging.dir.clone_from(dir); } @@ -217,21 +221,28 @@ impl StartCommand { // Disable dashboard in metasrv. opts.http.disable_dashboard = true; - Ok(opts) + Ok(()) } - async fn build(&self, mut opts: MetasrvOptions) -> Result { - let guard = - common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None); + async fn build(&self, opts: MetasrvOptions) -> Result { + common_runtime::init_global_runtimes(&opts.runtime); + + let guard = common_telemetry::init_global_logging( + APP_NAME, + &opts.component.logging, + &opts.component.tracing, + None, + ); log_versions(version!(), short_version!()); + info!("Metasrv start command: {:#?}", self); + info!("Metasrv options: {:#?}", opts); + + let mut opts = opts.component; let plugins = plugins::setup_metasrv_plugins(&mut opts) .await .context(StartMetaServerSnafu)?; - info!("Metasrv start command: {:#?}", self); - info!("Metasrv options: {:#?}", opts); - let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None) .await .context(error::BuildMetaServerSnafu)?; @@ -266,7 +277,7 @@ mod tests { ..Default::default() }; - let options = cmd.load_options(&GlobalOptions::default()).unwrap(); + let options = cmd.load_options(&Default::default()).unwrap().component; assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs); assert_eq!(SelectorType::LoadBased, options.selector); @@ -299,7 +310,7 @@ mod tests { ..Default::default() }; - let options = cmd.load_options(&GlobalOptions::default()).unwrap(); + let options = cmd.load_options(&Default::default()).unwrap().component; assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); assert_eq!("127.0.0.1:3002".to_string(), options.server_addr); assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs); @@ -349,7 +360,8 @@ mod tests { #[cfg(feature = "tokio-console")] tokio_console_addr: None, }) - .unwrap(); + .unwrap() + .component; let logging_opt = options.logging; assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir); @@ -406,7 +418,7 @@ mod tests { ..Default::default() }; - let opts = command.load_options(&GlobalOptions::default()).unwrap(); + let opts = command.load_options(&Default::default()).unwrap().component; // Should be read from env, env > default values. assert_eq!(opts.bind_addr, "127.0.0.1:14002"); diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 03ccbc536247..26ac9203a225 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -13,6 +13,9 @@ // limitations under the License. use clap::Parser; +use common_config::Configurable; +use common_runtime::global::RuntimeOptions; +use serde::{Deserialize, Serialize}; #[derive(Parser, Default, Debug, Clone)] pub struct GlobalOptions { @@ -29,3 +32,22 @@ pub struct GlobalOptions { #[arg(global = true)] pub tokio_console_addr: Option, } + +// TODO(LFC): Move logging and tracing options into global options, like the runtime options. +/// All the options of GreptimeDB. +#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)] +#[serde(default)] +pub struct GreptimeOptions { + /// The runtime options. + pub runtime: RuntimeOptions, + + /// The options of each component (like Datanode or Standalone) of GreptimeDB. + #[serde(flatten)] + pub component: T, +} + +impl Configurable for GreptimeOptions { + fn env_list_keys() -> Option<&'static [&'static str]> { + T::env_list_keys() + } +} diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 90958baf1048..e1ac35c98b06 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -67,7 +67,7 @@ use crate::error::{ ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu, }; -use crate::options::GlobalOptions; +use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{log_versions, App}; pub const APP_NAME: &str = "greptime-standalone"; @@ -79,11 +79,14 @@ pub struct Command { } impl Command { - pub async fn build(&self, opts: StandaloneOptions) -> Result { + pub async fn build(&self, opts: GreptimeOptions) -> Result { self.subcmd.build(opts).await } - pub fn load_options(&self, global_options: &GlobalOptions) -> Result { + pub fn load_options( + &self, + global_options: &GlobalOptions, + ) -> Result> { self.subcmd.load_options(global_options) } } @@ -94,20 +97,23 @@ enum SubCommand { } impl SubCommand { - async fn build(&self, opts: StandaloneOptions) -> Result { + async fn build(&self, opts: GreptimeOptions) -> Result { match self { SubCommand::Start(cmd) => cmd.build(opts).await, } } - fn load_options(&self, global_options: &GlobalOptions) -> Result { + fn load_options( + &self, + global_options: &GlobalOptions, + ) -> Result> { match self { SubCommand::Start(cmd) => cmd.load_options(global_options), } } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(default)] pub struct StandaloneOptions { pub mode: Mode, @@ -161,7 +167,7 @@ impl Default for StandaloneOptions { } } -impl Configurable<'_> for StandaloneOptions { +impl Configurable for StandaloneOptions { fn env_list_keys() -> Option<&'static [&'static str]> { Some(&["wal.broker_endpoints"]) } @@ -291,23 +297,27 @@ pub struct StartCommand { } impl StartCommand { - fn load_options(&self, global_options: &GlobalOptions) -> Result { - self.merge_with_cli_options( - global_options, - StandaloneOptions::load_layered_options( - self.config_file.as_deref(), - self.env_prefix.as_ref(), - ) - .context(LoadLayeredConfigSnafu)?, + fn load_options( + &self, + global_options: &GlobalOptions, + ) -> Result> { + let mut opts = GreptimeOptions::::load_layered_options( + self.config_file.as_deref(), + self.env_prefix.as_ref(), ) + .context(LoadLayeredConfigSnafu)?; + + self.merge_with_cli_options(global_options, &mut opts.component)?; + + Ok(opts) } // The precedence order is: cli > config file > environment variables > default values. pub fn merge_with_cli_options( &self, global_options: &GlobalOptions, - mut opts: StandaloneOptions, - ) -> Result { + opts: &mut StandaloneOptions, + ) -> Result<()> { // Should always be standalone mode. opts.mode = Mode::Standalone; @@ -369,20 +379,27 @@ impl StartCommand { opts.user_provider.clone_from(&self.user_provider); - Ok(opts) + Ok(()) } #[allow(unreachable_code)] #[allow(unused_variables)] #[allow(clippy::diverging_sub_expression)] - async fn build(&self, opts: StandaloneOptions) -> Result { - let guard = - common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None); + async fn build(&self, opts: GreptimeOptions) -> Result { + common_runtime::init_global_runtimes(&opts.runtime); + + let guard = common_telemetry::init_global_logging( + APP_NAME, + &opts.component.logging, + &opts.component.tracing, + None, + ); log_versions(version!(), short_version!()); info!("Standalone start command: {:#?}", self); - info!("Building standalone instance with {opts:#?}"); + info!("Standalone options: {opts:#?}"); + let opts = opts.component; let mut fe_opts = opts.frontend_options(); #[allow(clippy::unnecessary_mut_passed)] let fe_plugins = plugins::setup_frontend_plugins(&mut fe_opts) // mut ref is MUST, DO NOT change it @@ -664,7 +681,10 @@ mod tests { ..Default::default() }; - let options = cmd.load_options(&GlobalOptions::default()).unwrap(); + let options = cmd + .load_options(&GlobalOptions::default()) + .unwrap() + .component; let fe_opts = options.frontend_options(); let dn_opts = options.datanode_options(); let logging_opts = options.logging; @@ -725,7 +745,8 @@ mod tests { #[cfg(feature = "tokio-console")] tokio_console_addr: None, }) - .unwrap(); + .unwrap() + .component; assert_eq!("/tmp/greptimedb/test/logs", opts.logging.dir); assert_eq!("debug", opts.logging.level.unwrap()); @@ -787,7 +808,7 @@ mod tests { ..Default::default() }; - let opts = command.load_options(&GlobalOptions::default()).unwrap(); + let opts = command.load_options(&Default::default()).unwrap().component; // Should be read from env, env > default values. assert_eq!(opts.logging.dir, "/other/log/dir"); diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs new file mode 100644 index 000000000000..80075b846e51 --- /dev/null +++ b/src/cmd/tests/load_config_test.rs @@ -0,0 +1,218 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use cmd::options::GreptimeOptions; +use cmd::standalone::StandaloneOptions; +use common_config::Configurable; +use common_runtime::global::RuntimeOptions; +use common_telemetry::logging::LoggingOptions; +use common_wal::config::raft_engine::RaftEngineConfig; +use common_wal::config::{DatanodeWalConfig, StandaloneWalConfig}; +use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig}; +use frontend::frontend::FrontendOptions; +use frontend::service_config::datanode::DatanodeClientOptions; +use meta_client::MetaClientOptions; +use meta_srv::metasrv::MetasrvOptions; +use meta_srv::selector::SelectorType; +use mito2::config::MitoConfig; +use servers::export_metrics::ExportMetricsOption; + +#[test] +fn test_load_datanode_example_config() { + let example_config = common_test_util::find_workspace_path("config/datanode.example.toml"); + let options = + GreptimeOptions::::load_layered_options(example_config.to_str(), "") + .unwrap(); + + let expected = GreptimeOptions:: { + runtime: RuntimeOptions { + read_rt_size: 8, + write_rt_size: 8, + bg_rt_size: 8, + }, + component: DatanodeOptions { + node_id: Some(42), + rpc_hostname: Some("127.0.0.1".to_string()), + meta_client: Some(MetaClientOptions { + metasrv_addrs: vec!["127.0.0.1:3002".to_string()], + timeout: Duration::from_secs(3), + heartbeat_timeout: Duration::from_millis(500), + ddl_timeout: Duration::from_secs(10), + connect_timeout: Duration::from_secs(1), + tcp_nodelay: true, + metadata_cache_max_capacity: 100000, + metadata_cache_ttl: Duration::from_secs(600), + metadata_cache_tti: Duration::from_secs(300), + }), + wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig { + dir: Some("/tmp/greptimedb/wal".to_string()), + sync_period: Some(Duration::from_secs(10)), + ..Default::default() + }), + storage: StorageConfig { + data_home: "/tmp/greptimedb/".to_string(), + ..Default::default() + }, + region_engine: vec![RegionEngineConfig::Mito(MitoConfig { + num_workers: 8, + auto_flush_interval: Duration::from_secs(3600), + scan_parallelism: 0, + ..Default::default() + })], + logging: LoggingOptions { + level: Some("info".to_string()), + otlp_endpoint: Some("".to_string()), + tracing_sample_ratio: Some(Default::default()), + ..Default::default() + }, + export_metrics: ExportMetricsOption { + self_import: Some(Default::default()), + remote_write: Some(Default::default()), + ..Default::default() + }, + ..Default::default() + }, + }; + + assert_eq!(options, expected); +} + +#[test] +fn test_load_frontend_example_config() { + let example_config = common_test_util::find_workspace_path("config/frontend.example.toml"); + let options = + GreptimeOptions::::load_layered_options(example_config.to_str(), "") + .unwrap(); + let expected = GreptimeOptions:: { + runtime: RuntimeOptions { + read_rt_size: 8, + write_rt_size: 8, + bg_rt_size: 8, + }, + component: FrontendOptions { + default_timezone: Some("UTC".to_string()), + meta_client: Some(MetaClientOptions { + metasrv_addrs: vec!["127.0.0.1:3002".to_string()], + timeout: Duration::from_secs(3), + heartbeat_timeout: Duration::from_millis(500), + ddl_timeout: Duration::from_secs(10), + connect_timeout: Duration::from_secs(1), + tcp_nodelay: true, + metadata_cache_max_capacity: 100000, + metadata_cache_ttl: Duration::from_secs(600), + metadata_cache_tti: Duration::from_secs(300), + }), + logging: LoggingOptions { + level: Some("info".to_string()), + otlp_endpoint: Some("".to_string()), + tracing_sample_ratio: Some(Default::default()), + ..Default::default() + }, + datanode: frontend::service_config::DatanodeOptions { + client: DatanodeClientOptions { + connect_timeout: Duration::from_secs(10), + tcp_nodelay: true, + }, + }, + export_metrics: ExportMetricsOption { + self_import: Some(Default::default()), + remote_write: Some(Default::default()), + ..Default::default() + }, + ..Default::default() + }, + }; + assert_eq!(options, expected); +} + +#[test] +fn test_load_metasrv_example_config() { + let example_config = common_test_util::find_workspace_path("config/metasrv.example.toml"); + let options = + GreptimeOptions::::load_layered_options(example_config.to_str(), "") + .unwrap(); + let expected = GreptimeOptions:: { + runtime: RuntimeOptions { + read_rt_size: 8, + write_rt_size: 8, + bg_rt_size: 8, + }, + component: MetasrvOptions { + selector: SelectorType::LeaseBased, + data_home: "/tmp/metasrv/".to_string(), + logging: LoggingOptions { + dir: "/tmp/greptimedb/logs".to_string(), + level: Some("info".to_string()), + otlp_endpoint: Some("".to_string()), + tracing_sample_ratio: Some(Default::default()), + ..Default::default() + }, + export_metrics: ExportMetricsOption { + self_import: Some(Default::default()), + remote_write: Some(Default::default()), + ..Default::default() + }, + ..Default::default() + }, + }; + assert_eq!(options, expected); +} + +#[test] +fn test_load_standalone_example_config() { + let example_config = common_test_util::find_workspace_path("config/standalone.example.toml"); + let options = + GreptimeOptions::::load_layered_options(example_config.to_str(), "") + .unwrap(); + let expected = GreptimeOptions:: { + runtime: RuntimeOptions { + read_rt_size: 8, + write_rt_size: 8, + bg_rt_size: 8, + }, + component: StandaloneOptions { + default_timezone: Some("UTC".to_string()), + wal: StandaloneWalConfig::RaftEngine(RaftEngineConfig { + dir: Some("/tmp/greptimedb/wal".to_string()), + sync_period: Some(Duration::from_secs(10)), + ..Default::default() + }), + region_engine: vec![RegionEngineConfig::Mito(MitoConfig { + num_workers: 8, + auto_flush_interval: Duration::from_secs(3600), + scan_parallelism: 0, + ..Default::default() + })], + storage: StorageConfig { + data_home: "/tmp/greptimedb/".to_string(), + ..Default::default() + }, + logging: LoggingOptions { + level: Some("info".to_string()), + otlp_endpoint: Some("".to_string()), + tracing_sample_ratio: Some(Default::default()), + ..Default::default() + }, + export_metrics: ExportMetricsOption { + self_import: Some(Default::default()), + remote_write: Some(Default::default()), + ..Default::default() + }, + ..Default::default() + }, + }; + assert_eq!(options, expected); +} diff --git a/src/common/config/src/config.rs b/src/common/config/src/config.rs index c21735a059ea..e0816fbd5671 100644 --- a/src/common/config/src/config.rs +++ b/src/common/config/src/config.rs @@ -13,7 +13,8 @@ // limitations under the License. use config::{Environment, File, FileFormat}; -use serde::{Deserialize, Serialize}; +use serde::de::DeserializeOwned; +use serde::Serialize; use snafu::ResultExt; use crate::error::{LoadLayeredConfigSnafu, Result, SerdeJsonSnafu, TomlFormatSnafu}; @@ -25,7 +26,7 @@ pub const ENV_VAR_SEP: &str = "__"; pub const ENV_LIST_SEP: &str = ","; /// Configuration trait defines the common interface for configuration that can be loaded from multiple sources and serialized to TOML. -pub trait Configurable<'de>: Serialize + Deserialize<'de> + Default + Sized { +pub trait Configurable: Serialize + DeserializeOwned + Default + Sized { /// Load the configuration from multiple sources and merge them. /// The precedence order is: config file > environment variables > default values. /// `env_prefix` is the prefix of environment variables, e.g. "FRONTEND__xxx". @@ -128,7 +129,7 @@ mod tests { } } - impl Configurable<'_> for TestDatanodeConfig { + impl Configurable for TestDatanodeConfig { fn env_list_keys() -> Option<&'static [&'static str]> { Some(&["meta_client.metasrv_addrs"]) } diff --git a/src/common/runtime/Cargo.toml b/src/common/runtime/Cargo.toml index a6da1f571fc2..e5fa276c4bf1 100644 --- a/src/common/runtime/Cargo.toml +++ b/src/common/runtime/Cargo.toml @@ -13,13 +13,15 @@ common-error.workspace = true common-macro.workspace = true common-telemetry.workspace = true lazy_static.workspace = true +num_cpus.workspace = true once_cell.workspace = true paste.workspace = true prometheus.workspace = true +serde.workspace = true snafu.workspace = true tokio.workspace = true tokio-metrics = "0.3" -tokio-metrics-collector = "0.2" +tokio-metrics-collector = { git = "https://github.com/MichaelScofield/tokio-metrics-collector.git", rev = "89d692d5753d28564a7aac73c6ac5aba22243ba0" } tokio-util.workspace = true [dev-dependencies] diff --git a/src/common/runtime/src/global.rs b/src/common/runtime/src/global.rs index 51bad13107c7..6b21851e1680 100644 --- a/src/common/runtime/src/global.rs +++ b/src/common/runtime/src/global.rs @@ -19,6 +19,7 @@ use std::sync::{Mutex, Once}; use common_telemetry::info; use once_cell::sync::Lazy; use paste::paste; +use serde::{Deserialize, Serialize}; use crate::{Builder, JoinHandle, Runtime}; @@ -26,6 +27,28 @@ const READ_WORKERS: usize = 8; const WRITE_WORKERS: usize = 8; const BG_WORKERS: usize = 8; +/// The options for the global runtimes. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct RuntimeOptions { + /// The number of threads to execute the runtime for global read operations. + pub read_rt_size: usize, + /// The number of threads to execute the runtime for global write operations. + pub write_rt_size: usize, + /// The number of threads to execute the runtime for global background operations. + pub bg_rt_size: usize, +} + +impl Default for RuntimeOptions { + fn default() -> Self { + let cpus = num_cpus::get(); + Self { + read_rt_size: cpus, + write_rt_size: cpus, + bg_rt_size: cpus, + } + } +} + pub fn create_runtime(runtime_name: &str, thread_name: &str, worker_threads: usize) -> Runtime { info!("Creating runtime with runtime_name: {runtime_name}, thread_name: {thread_name}, work_threads: {worker_threads}."); Builder::default() @@ -112,18 +135,26 @@ static CONFIG_RUNTIMES: Lazy> = /// # Panics /// Panics when the global runtimes are already initialized. /// You should call this function before using any runtime functions. -pub fn init_global_runtimes( - read: Option, - write: Option, - background: Option, -) { +pub fn init_global_runtimes(options: &RuntimeOptions) { static START: Once = Once::new(); START.call_once(move || { let mut c = CONFIG_RUNTIMES.lock().unwrap(); assert!(!c.already_init, "Global runtimes already initialized"); - c.read_runtime = read; - c.write_runtime = write; - c.bg_runtime = background; + c.read_runtime = Some(create_runtime( + "global-read", + "global-read-worker", + options.read_rt_size, + )); + c.write_runtime = Some(create_runtime( + "global-write", + "global-write-worker", + options.write_rt_size, + )); + c.bg_runtime = Some(create_runtime( + "global-bg", + "global-bg-worker", + options.bg_rt_size, + )); }); } diff --git a/src/common/runtime/src/lib.rs b/src/common/runtime/src/lib.rs index 08baed46cbd3..ba6f74c96cc6 100644 --- a/src/common/runtime/src/lib.rs +++ b/src/common/runtime/src/lib.rs @@ -13,7 +13,7 @@ // limitations under the License. pub mod error; -mod global; +pub mod global; mod metrics; mod repeated_task; pub mod runtime; diff --git a/src/common/test-util/Cargo.toml b/src/common/test-util/Cargo.toml index 2b66dd45ce3a..b8084a2a8b3e 100644 --- a/src/common/test-util/Cargo.toml +++ b/src/common/test-util/Cargo.toml @@ -8,7 +8,7 @@ license.workspace = true workspace = true [dependencies] -client.workspace = true +client = { workspace = true, features = ["testing"] } common-query.workspace = true common-recordbatch.workspace = true once_cell.workspace = true diff --git a/src/common/test-util/src/recordbatch.rs b/src/common/test-util/src/recordbatch.rs index 47c949d40715..eb666e167a31 100644 --- a/src/common/test-util/src/recordbatch.rs +++ b/src/common/test-util/src/recordbatch.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use client::Database; use common_query::OutputData; use common_recordbatch::util; @@ -29,3 +30,25 @@ pub async fn check_output_stream(output: OutputData, expected: &str) { let pretty_print = recordbatches.pretty_print().unwrap(); assert_eq!(pretty_print, expected, "actual: \n{}", pretty_print); } + +pub async fn execute_and_check_output(db: &Database, sql: &str, expected: ExpectedOutput<'_>) { + let output = db.sql(sql).await.unwrap(); + let output = output.data; + + match (&output, expected) { + (OutputData::AffectedRows(x), ExpectedOutput::AffectedRows(y)) => { + assert_eq!( + *x, y, + r#" +expected: {y} +actual: {x} +"# + ) + } + (OutputData::RecordBatches(_), ExpectedOutput::QueryResult(x)) + | (OutputData::Stream(_), ExpectedOutput::QueryResult(x)) => { + check_output_stream(output, x).await + } + _ => panic!(), + } +} diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index ec278d3c4247..7e76c7d68169 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -15,7 +15,7 @@ //! Datanode configurations use common_base::readable_size::ReadableSize; -use common_base::secrets::SecretString; +use common_base::secrets::{ExposeSecret, SecretString}; use common_config::Configurable; use common_grpc::channel_manager::{ DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, @@ -38,7 +38,7 @@ pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::mb(256); const DEFAULT_DATA_HOME: &str = "/tmp/greptimedb"; /// Object storage config -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(tag = "type")] pub enum ObjectStoreConfig { File(FileConfig), @@ -61,7 +61,7 @@ impl ObjectStoreConfig { } /// Storage engine config -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(default)] pub struct StorageConfig { /// The working directory of database @@ -85,7 +85,7 @@ impl Default for StorageConfig { #[serde(default)] pub struct FileConfig {} -#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)] #[serde(default)] pub struct ObjectStorageCacheConfig { /// The local file cache directory @@ -109,6 +109,18 @@ pub struct S3Config { pub cache: ObjectStorageCacheConfig, } +impl PartialEq for S3Config { + fn eq(&self, other: &Self) -> bool { + self.bucket == other.bucket + && self.root == other.root + && self.access_key_id.expose_secret() == other.access_key_id.expose_secret() + && self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret() + && self.endpoint == other.endpoint + && self.region == other.region + && self.cache == other.cache + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct OssConfig { @@ -123,6 +135,17 @@ pub struct OssConfig { pub cache: ObjectStorageCacheConfig, } +impl PartialEq for OssConfig { + fn eq(&self, other: &Self) -> bool { + self.bucket == other.bucket + && self.root == other.root + && self.access_key_id.expose_secret() == other.access_key_id.expose_secret() + && self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret() + && self.endpoint == other.endpoint + && self.cache == other.cache + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct AzblobConfig { @@ -138,6 +161,18 @@ pub struct AzblobConfig { pub cache: ObjectStorageCacheConfig, } +impl PartialEq for AzblobConfig { + fn eq(&self, other: &Self) -> bool { + self.container == other.container + && self.root == other.root + && self.account_name.expose_secret() == other.account_name.expose_secret() + && self.account_key.expose_secret() == other.account_key.expose_secret() + && self.endpoint == other.endpoint + && self.sas_token == other.sas_token + && self.cache == other.cache + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct GcsConfig { @@ -151,6 +186,17 @@ pub struct GcsConfig { pub cache: ObjectStorageCacheConfig, } +impl PartialEq for GcsConfig { + fn eq(&self, other: &Self) -> bool { + self.root == other.root + && self.bucket == other.bucket + && self.scope == other.scope + && self.credential_path.expose_secret() == other.credential_path.expose_secret() + && self.endpoint == other.endpoint + && self.cache == other.cache + } +} + impl Default for S3Config { fn default() -> Self { Self { @@ -211,7 +257,7 @@ impl Default for ObjectStoreConfig { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(default)] pub struct DatanodeOptions { pub mode: Mode, @@ -267,7 +313,7 @@ impl Default for DatanodeOptions { } } -impl Configurable<'_> for DatanodeOptions { +impl Configurable for DatanodeOptions { fn env_list_keys() -> Option<&'static [&'static str]> { Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"]) } diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index f0dfac1c7d5c..7907ff20ffe0 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -74,7 +74,7 @@ impl Default for FrontendOptions { } } -impl Configurable<'_> for FrontendOptions { +impl Configurable for FrontendOptions { fn env_list_keys() -> Option<&'static [&'static str]> { Some(&["meta_client.metasrv_addrs"]) } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index a1cc8934270f..c04770313a11 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -188,7 +188,7 @@ impl Instance { pub fn build_servers( &mut self, - opts: impl Into + for<'de> Configurable<'de>, + opts: impl Into + Configurable, servers: ServerHandlers, ) -> Result<()> { let opts: FrontendOptions = opts.into(); diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index f5a0afb53016..268bc7db4ae6 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -39,7 +39,7 @@ use crate::service_config::GrpcOptions; pub struct Services where - T: Into + for<'de> Configurable<'de> + Clone, + T: Into + Configurable + Clone, U: FrontendInstance, { opts: T, @@ -51,7 +51,7 @@ where impl Services where - T: Into + for<'de> Configurable<'de> + Clone, + T: Into + Configurable + Clone, U: FrontendInstance, { pub fn new(opts: T, instance: Arc, plugins: Plugins) -> Self { diff --git a/src/frontend/src/service_config/datanode.rs b/src/frontend/src/service_config/datanode.rs index ccf2b2ebf4c7..3b4de67b48c1 100644 --- a/src/frontend/src/service_config/datanode.rs +++ b/src/frontend/src/service_config/datanode.rs @@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] pub struct DatanodeOptions { - client: DatanodeClientOptions, + pub client: DatanodeClientOptions, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 76fb794f797c..ce812cfba80f 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -148,7 +148,7 @@ impl Default for MetasrvOptions { } } -impl Configurable<'_> for MetasrvOptions { +impl Configurable for MetasrvOptions { fn env_list_keys() -> Option<&'static [&'static str]> { Some(&["wal.broker_endpoints"]) } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 7cbd640820b1..4c1b4641c9f4 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -21,16 +21,13 @@ use std::time::Duration; use auth::UserProviderRef; use axum::Router; use catalog::kvbackend::KvBackendCatalogManager; -use client::Database; use common_base::secrets::ExposeSecret; use common_config::Configurable; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; -use common_query::OutputData; use common_runtime::Builder as RuntimeBuilder; use common_telemetry::warn; use common_test_util::ports; -use common_test_util::recordbatch::{check_output_stream, ExpectedOutput}; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use common_wal::config::DatanodeWalConfig; use datanode::config::{ @@ -690,25 +687,3 @@ where test(endpoints).await } - -pub async fn execute_and_check_output(db: &Database, sql: &str, expected: ExpectedOutput<'_>) { - let output = db.sql(sql).await.unwrap(); - let output = output.data; - - match (&output, expected) { - (OutputData::AffectedRows(x), ExpectedOutput::AffectedRows(y)) => { - assert_eq!( - *x, y, - r#" -expected: {y} -actual: {x} -"# - ) - } - (OutputData::RecordBatches(_), ExpectedOutput::QueryResult(x)) - | (OutputData::Stream(_), ExpectedOutput::QueryResult(x)) => { - check_output_stream(output, x).await - } - _ => panic!(), - } -} diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 7d1f9d57768f..33332170db16 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -28,6 +28,7 @@ use common_grpc::channel_manager::ClientTlsOption; use common_query::Output; use common_recordbatch::RecordBatches; use common_runtime::Runtime; +use common_test_util::find_workspace_path; use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::GrpcServerConfig; use servers::http::prometheus::{ @@ -732,10 +733,7 @@ async fn to_batch(output: Output) -> String { } pub async fn test_grpc_tls_config(store_type: StorageType) { - let comm_dir = std::path::PathBuf::from_iter([ - std::env!("CARGO_RUSTC_CURRENT_DIR"), - "src/common/grpc/tests/tls", - ]); + let comm_dir = find_workspace_path("/src/common/grpc/tests/tls"); let ca_path = comm_dir.join("ca.pem").to_str().unwrap().to_string(); let server_cert_path = comm_dir.join("server.pem").to_str().unwrap().to_string(); let server_key_path = comm_dir.join("server.key").to_str().unwrap().to_string();