From 181e16a11aed99d19caee1161039b81e48220ba0 Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Thu, 14 Dec 2023 11:44:29 +0800 Subject: [PATCH] refactor: make instance started separately (#2911) * refactor: make instance started separately, to support further integrated into other binaries * fix: resolve PR comments * fix: resolve PR comments --- src/cmd/src/bin/greptime.rs | 205 +++++++----------------------- src/cmd/src/cli.rs | 52 +++++--- src/cmd/src/cli/bench.rs | 2 +- src/cmd/src/cli/export.rs | 2 +- src/cmd/src/cli/upgrade.rs | 2 +- src/cmd/src/datanode.rs | 62 +++++---- src/cmd/src/frontend.rs | 60 +++++---- src/cmd/src/lib.rs | 101 +++++++++++++++ src/cmd/src/metasrv.rs | 56 +++++--- src/cmd/src/options.rs | 31 ++++- src/cmd/src/standalone.rs | 69 +++++----- src/datanode/src/region_server.rs | 17 ++- 12 files changed, 372 insertions(+), 287 deletions(-) diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index efaf26ec0fc9..d8f19df4b613 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -16,79 +16,12 @@ use std::fmt; -use clap::Parser; +use clap::{FromArgMatches, Parser, Subcommand}; use cmd::error::Result; -use cmd::options::{Options, TopLevelOptions}; -use cmd::{cli, datanode, frontend, metasrv, standalone}; -use common_telemetry::logging::{error, info, TracingOptions}; - -lazy_static::lazy_static! { - static ref APP_VERSION: prometheus::IntGaugeVec = - prometheus::register_int_gauge_vec!("app_version", "app version", &["short_version", "version"]).unwrap(); -} - -#[derive(Parser)] -#[clap(name = "greptimedb", version = print_version())] -struct Command { - #[clap(long)] - log_dir: Option, - #[clap(long)] - log_level: Option, - #[clap(subcommand)] - subcmd: SubCommand, - - #[cfg(feature = "tokio-console")] - #[clap(long)] - tokio_console_addr: Option, -} - -pub enum Application { - Datanode(datanode::Instance), - Frontend(frontend::Instance), - Metasrv(metasrv::Instance), - Standalone(standalone::Instance), - Cli(cli::Instance), -} - -impl Application { - async fn start(&mut self) -> Result<()> { - match self { - Application::Datanode(instance) => instance.start().await, - Application::Frontend(instance) => instance.start().await, - Application::Metasrv(instance) => instance.start().await, - Application::Standalone(instance) => instance.start().await, - Application::Cli(instance) => instance.start().await, - } - } - - async fn stop(&self) -> Result<()> { - match self { - Application::Datanode(instance) => instance.stop().await, - Application::Frontend(instance) => instance.stop().await, - Application::Metasrv(instance) => instance.stop().await, - Application::Standalone(instance) => instance.stop().await, - Application::Cli(instance) => instance.stop().await, - } - } -} - -impl Command { - async fn build(self, opts: Options) -> Result { - self.subcmd.build(opts).await - } - - fn load_options(&self) -> Result { - let top_level_opts = self.top_level_options(); - self.subcmd.load_options(top_level_opts) - } - - fn top_level_options(&self) -> TopLevelOptions { - TopLevelOptions { - log_dir: self.log_dir.clone(), - log_level: self.log_level.clone(), - } - } -} +use cmd::options::{CliOptions, Options}; +use cmd::{ + cli, datanode, frontend, greptimedb_cli, log_versions, metasrv, standalone, start_app, App, +}; #[derive(Parser)] enum SubCommand { @@ -105,40 +38,41 @@ enum SubCommand { } impl SubCommand { - async fn build(self, opts: Options) -> Result { - match (self, opts) { + async fn build(self, opts: Options) -> Result> { + let app: Box = match (self, opts) { (SubCommand::Datanode(cmd), Options::Datanode(dn_opts)) => { let app = cmd.build(*dn_opts).await?; - Ok(Application::Datanode(app)) + Box::new(app) as _ } (SubCommand::Frontend(cmd), Options::Frontend(fe_opts)) => { let app = cmd.build(*fe_opts).await?; - Ok(Application::Frontend(app)) + Box::new(app) as _ } (SubCommand::Metasrv(cmd), Options::Metasrv(meta_opts)) => { let app = cmd.build(*meta_opts).await?; - Ok(Application::Metasrv(app)) + Box::new(app) as _ } (SubCommand::Standalone(cmd), Options::Standalone(opts)) => { let app = cmd.build(*opts).await?; - Ok(Application::Standalone(app)) + Box::new(app) as _ } (SubCommand::Cli(cmd), Options::Cli(_)) => { let app = cmd.build().await?; - Ok(Application::Cli(app)) + Box::new(app) as _ } _ => unreachable!(), - } + }; + Ok(app) } - fn load_options(&self, top_level_opts: TopLevelOptions) -> Result { + fn load_options(&self, cli_options: &CliOptions) -> Result { match self { - SubCommand::Datanode(cmd) => cmd.load_options(top_level_opts), - SubCommand::Frontend(cmd) => cmd.load_options(top_level_opts), - SubCommand::Metasrv(cmd) => cmd.load_options(top_level_opts), - SubCommand::Standalone(cmd) => cmd.load_options(top_level_opts), - SubCommand::Cli(cmd) => cmd.load_options(top_level_opts), + SubCommand::Datanode(cmd) => cmd.load_options(cli_options), + SubCommand::Frontend(cmd) => cmd.load_options(cli_options), + SubCommand::Metasrv(cmd) => cmd.load_options(cli_options), + SubCommand::Standalone(cmd) => cmd.load_options(cli_options), + SubCommand::Cli(cmd) => cmd.load_options(cli_options), } } } @@ -155,90 +89,41 @@ impl fmt::Display for SubCommand { } } -fn print_version() -> &'static str { - concat!( - "\nbranch: ", - env!("GIT_BRANCH"), - "\ncommit: ", - env!("GIT_COMMIT"), - "\ndirty: ", - env!("GIT_DIRTY"), - "\nversion: ", - env!("CARGO_PKG_VERSION") - ) -} - -fn short_version() -> &'static str { - env!("CARGO_PKG_VERSION") -} - -// {app_name}-{branch_name}-{commit_short} -// The branch name (tag) of a release build should already contain the short -// version so the full version doesn't concat the short version explicitly. -fn full_version() -> &'static str { - concat!( - "greptimedb-", - env!("GIT_BRANCH"), - "-", - env!("GIT_COMMIT_SHORT") - ) -} - -fn log_env_flags() { - info!("command line arguments"); - for argument in std::env::args() { - info!("argument: {}", argument); - } -} - #[cfg(not(windows))] #[global_allocator] static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; #[tokio::main] async fn main() -> Result<()> { - let cmd = Command::parse(); - let app_name = &cmd.subcmd.to_string(); - - let opts = cmd.load_options()?; - let logging_opts = opts.logging_options(); - let tracing_opts = TracingOptions { - #[cfg(feature = "tokio-console")] - tokio_console_addr: cmd.tokio_console_addr.clone(), + common_telemetry::set_panic_hook(); + + let cli = greptimedb_cli(); + + let cli = SubCommand::augment_subcommands(cli); + + let args = cli.get_matches(); + + let subcmd = match SubCommand::from_arg_matches(&args) { + Ok(subcmd) => subcmd, + Err(e) => e.exit(), }; - common_telemetry::set_panic_hook(); - let _guard = - common_telemetry::init_global_logging(app_name, logging_opts, tracing_opts, opts.node_id()); - - // Report app version as gauge. - APP_VERSION - .with_label_values(&[short_version(), full_version()]) - .inc(); - - // Log version and argument flags. - info!( - "short_version: {}, full_version: {}", - short_version(), - full_version() + let app_name = subcmd.to_string(); + + let cli_options = CliOptions::new(&args); + + let opts = subcmd.load_options(&cli_options)?; + + let _guard = common_telemetry::init_global_logging( + &app_name, + opts.logging_options(), + cli_options.tracing_options(), + opts.node_id(), ); - log_env_flags(); - let mut app = cmd.build(opts).await?; + log_versions(); - tokio::select! { - result = app.start() => { - if let Err(err) = result { - error!(err; "Fatal error occurs!"); - } - } - _ = tokio::signal::ctrl_c() => { - if let Err(err) = app.stop().await { - error!(err; "Fatal error occurs!"); - } - info!("Goodbye!"); - } - } + let app = subcmd.build(opts).await?; - Ok(()) + start_app(app).await } diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index ca76e6591ae8..409c8e4d54d0 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -13,9 +13,15 @@ // limitations under the License. mod bench; + +// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373 +#[allow(unused)] mod cmd; mod export; mod helper; + +// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373 +#[allow(unused)] mod repl; // TODO(weny): Removes it #[allow(deprecated)] @@ -30,27 +36,35 @@ use upgrade::UpgradeCommand; use self::export::ExportCommand; use crate::error::Result; -use crate::options::{Options, TopLevelOptions}; +use crate::options::{CliOptions, Options}; +use crate::App; #[async_trait] -pub trait Tool { +pub trait Tool: Send + Sync { async fn do_work(&self) -> Result<()>; } -pub enum Instance { - Repl(Repl), - Tool(Box), +pub struct Instance { + tool: Box, } impl Instance { - pub async fn start(&mut self) -> Result<()> { - match self { - Instance::Repl(repl) => repl.run().await, - Instance::Tool(tool) => tool.do_work().await, - } + fn new(tool: Box) -> Self { + Self { tool } + } +} + +#[async_trait] +impl App for Instance { + fn name(&self) -> &str { + "greptime-cli" + } + + async fn start(&mut self) -> Result<()> { + self.tool.do_work().await } - pub async fn stop(&self) -> Result<()> { + async fn stop(&self) -> Result<()> { Ok(()) } } @@ -66,14 +80,15 @@ impl Command { self.cmd.build().await } - pub fn load_options(&self, top_level_opts: TopLevelOptions) -> Result { + pub fn load_options(&self, cli_options: &CliOptions) -> Result { let mut logging_opts = LoggingOptions::default(); - if let Some(dir) = top_level_opts.log_dir { - logging_opts.dir = dir; - } - if top_level_opts.log_level.is_some() { - logging_opts.level = top_level_opts.log_level; + + if let Some(dir) = &cli_options.log_dir { + logging_opts.dir = dir.clone(); } + + logging_opts.level = cli_options.log_level.clone(); + Ok(Options::Cli(Box::new(logging_opts))) } } @@ -110,7 +125,6 @@ pub(crate) struct AttachCommand { impl AttachCommand { #[allow(dead_code)] async fn build(self) -> Result { - let repl = Repl::try_new(&self).await?; - Ok(Instance::Repl(repl)) + unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373") } } diff --git a/src/cmd/src/cli/bench.rs b/src/cmd/src/cli/bench.rs index 7ec5956adca7..481d22d0b536 100644 --- a/src/cmd/src/cli/bench.rs +++ b/src/cmd/src/cli/bench.rs @@ -69,7 +69,7 @@ impl BenchTableMetadataCommand { table_metadata_manager, count: self.count, }; - Ok(Instance::Tool(Box::new(tool))) + Ok(Instance::new(Box::new(tool))) } } diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 5932df399cfc..79c1694a1c21 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -105,7 +105,7 @@ impl ExportCommand { })); } - Ok(Instance::Tool(Box::new(Export { + Ok(Instance::new(Box::new(Export { client: database_client, catalog, schema, diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs index 6a996bca6b20..ac52e94b2b5e 100644 --- a/src/cmd/src/cli/upgrade.rs +++ b/src/cmd/src/cli/upgrade.rs @@ -76,7 +76,7 @@ impl UpgradeCommand { skip_schema_keys: self.skip_schema_keys, skip_table_route_keys: self.skip_table_route_keys, }; - Ok(Instance::Tool(Box::new(tool))) + Ok(Instance::new(Box::new(tool))) } } diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index e1d3163d0164..0d069fa5e296 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; use catalog::kvbackend::MetaKvBackend; use clap::Parser; use common_telemetry::logging; @@ -25,14 +26,26 @@ use servers::Mode; use snafu::{OptionExt, ResultExt}; use crate::error::{MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu}; -use crate::options::{Options, TopLevelOptions}; +use crate::options::{CliOptions, Options}; +use crate::App; pub struct Instance { datanode: Datanode, } impl Instance { - pub async fn start(&mut self) -> Result<()> { + fn new(datanode: Datanode) -> Self { + Self { datanode } + } +} + +#[async_trait] +impl App for Instance { + fn name(&self) -> &str { + "greptime-datanode" + } + + async fn start(&mut self) -> Result<()> { plugins::start_datanode_plugins(self.datanode.plugins()) .await .context(StartDatanodeSnafu)?; @@ -40,7 +53,7 @@ impl Instance { self.datanode.start().await.context(StartDatanodeSnafu) } - pub async fn stop(&self) -> Result<()> { + async fn stop(&self) -> Result<()> { self.datanode .shutdown() .await @@ -59,8 +72,8 @@ impl Command { self.subcmd.build(opts).await } - pub fn load_options(&self, top_level_opts: TopLevelOptions) -> Result { - self.subcmd.load_options(top_level_opts) + pub fn load_options(&self, cli_options: &CliOptions) -> Result { + self.subcmd.load_options(cli_options) } } @@ -76,9 +89,9 @@ impl SubCommand { } } - fn load_options(&self, top_level_opts: TopLevelOptions) -> Result { + fn load_options(&self, cli_options: &CliOptions) -> Result { match self { - SubCommand::Start(cmd) => cmd.load_options(top_level_opts), + SubCommand::Start(cmd) => cmd.load_options(cli_options), } } } @@ -108,19 +121,19 @@ struct StartCommand { } impl StartCommand { - fn load_options(&self, top_level_opts: TopLevelOptions) -> Result { + fn load_options(&self, cli_options: &CliOptions) -> Result { let mut opts: DatanodeOptions = Options::load_layered_options( self.config_file.as_deref(), self.env_prefix.as_ref(), DatanodeOptions::env_list_keys(), )?; - if let Some(dir) = top_level_opts.log_dir { - opts.logging.dir = dir; + if let Some(dir) = &cli_options.log_dir { + opts.logging.dir = dir.clone(); } - if top_level_opts.log_level.is_some() { - opts.logging.level = top_level_opts.log_level; + if cli_options.log_level.is_some() { + opts.logging.level = cli_options.log_level.clone(); } if let Some(addr) = &self.rpc_addr { @@ -204,7 +217,7 @@ impl StartCommand { .await .context(StartDatanodeSnafu)?; - Ok(Instance { datanode }) + Ok(Instance::new(datanode)) } } @@ -219,7 +232,7 @@ mod tests { use servers::Mode; use super::*; - use crate::options::ENV_VAR_SEP; + use crate::options::{CliOptions, ENV_VAR_SEP}; #[test] fn test_read_from_config_file() { @@ -274,8 +287,7 @@ mod tests { ..Default::default() }; - let Options::Datanode(options) = cmd.load_options(TopLevelOptions::default()).unwrap() - else { + let Options::Datanode(options) = cmd.load_options(&CliOptions::default()).unwrap() else { unreachable!() }; @@ -331,7 +343,7 @@ mod tests { #[test] fn test_try_from_cmd() { if let Options::Datanode(opt) = StartCommand::default() - .load_options(TopLevelOptions::default()) + .load_options(&CliOptions::default()) .unwrap() { assert_eq!(Mode::Standalone, opt.mode) @@ -342,7 +354,7 @@ mod tests { metasrv_addr: Some(vec!["127.0.0.1:3002".to_string()]), ..Default::default() }) - .load_options(TopLevelOptions::default()) + .load_options(&CliOptions::default()) .unwrap() { assert_eq!(Mode::Distributed, opt.mode) @@ -352,7 +364,7 @@ mod tests { metasrv_addr: Some(vec!["127.0.0.1:3002".to_string()]), ..Default::default() }) - .load_options(TopLevelOptions::default()) + .load_options(&CliOptions::default()) .is_err()); // Providing node_id but leave metasrv_addr absent is ok since metasrv_addr has default value @@ -360,18 +372,21 @@ mod tests { node_id: Some(42), ..Default::default() }) - .load_options(TopLevelOptions::default()) + .load_options(&CliOptions::default()) .is_ok()); } #[test] - fn test_top_level_options() { + fn test_load_log_options_from_cli() { let cmd = StartCommand::default(); let options = cmd - .load_options(TopLevelOptions { + .load_options(&CliOptions { log_dir: Some("/tmp/greptimedb/test/logs".to_string()), log_level: Some("debug".to_string()), + + #[cfg(feature = "tokio-console")] + tokio_console_addr: None, }) .unwrap(); @@ -454,8 +469,7 @@ mod tests { ..Default::default() }; - let Options::Datanode(opts) = - command.load_options(TopLevelOptions::default()).unwrap() + let Options::Datanode(opts) = command.load_options(&CliOptions::default()).unwrap() else { unreachable!() }; diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 3362785da822..f863b41863eb 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; use catalog::kvbackend::CachedMetaKvBackend; use clap::Parser; use client::client_manager::DatanodeClients; @@ -32,14 +33,26 @@ use servers::Mode; use snafu::{OptionExt, ResultExt}; use crate::error::{self, MissingConfigSnafu, Result, StartFrontendSnafu}; -use crate::options::{Options, TopLevelOptions}; +use crate::options::{CliOptions, Options}; +use crate::App; pub struct Instance { frontend: FeInstance, } impl Instance { - pub async fn start(&mut self) -> Result<()> { + fn new(frontend: FeInstance) -> Self { + Self { frontend } + } +} + +#[async_trait] +impl App for Instance { + fn name(&self) -> &str { + "greptime-frontend" + } + + async fn start(&mut self) -> Result<()> { plugins::start_frontend_plugins(self.frontend.plugins().clone()) .await .context(StartFrontendSnafu)?; @@ -47,7 +60,7 @@ impl Instance { self.frontend.start().await.context(StartFrontendSnafu) } - pub async fn stop(&self) -> Result<()> { + async fn stop(&self) -> Result<()> { self.frontend .shutdown() .await @@ -66,8 +79,8 @@ impl Command { self.subcmd.build(opts).await } - pub fn load_options(&self, top_level_opts: TopLevelOptions) -> Result { - self.subcmd.load_options(top_level_opts) + pub fn load_options(&self, cli_options: &CliOptions) -> Result { + self.subcmd.load_options(cli_options) } } @@ -83,9 +96,9 @@ impl SubCommand { } } - fn load_options(&self, top_level_opts: TopLevelOptions) -> Result { + fn load_options(&self, cli_options: &CliOptions) -> Result { match self { - SubCommand::Start(cmd) => cmd.load_options(top_level_opts), + SubCommand::Start(cmd) => cmd.load_options(cli_options), } } } @@ -125,19 +138,19 @@ pub struct StartCommand { } impl StartCommand { - fn load_options(&self, top_level_opts: TopLevelOptions) -> Result { + fn load_options(&self, cli_options: &CliOptions) -> Result { let mut opts: FrontendOptions = Options::load_layered_options( self.config_file.as_deref(), self.env_prefix.as_ref(), FrontendOptions::env_list_keys(), )?; - if let Some(dir) = top_level_opts.log_dir { - opts.logging.dir = dir; + if let Some(dir) = &cli_options.log_dir { + opts.logging.dir = dir.clone(); } - if top_level_opts.log_level.is_some() { - opts.logging.level = top_level_opts.log_level; + if cli_options.log_level.is_some() { + opts.logging.level = cli_options.log_level.clone(); } let tls_opts = TlsOption::new( @@ -241,7 +254,7 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; - Ok(Instance { frontend: instance }) + Ok(Instance::new(instance)) } } @@ -257,7 +270,7 @@ mod tests { use servers::http::HttpOptions; use super::*; - use crate::options::ENV_VAR_SEP; + use crate::options::{CliOptions, ENV_VAR_SEP}; #[test] fn test_try_from_start_command() { @@ -271,8 +284,7 @@ mod tests { ..Default::default() }; - let Options::Frontend(opts) = command.load_options(TopLevelOptions::default()).unwrap() - else { + let Options::Frontend(opts) = command.load_options(&CliOptions::default()).unwrap() else { unreachable!() }; @@ -324,7 +336,7 @@ mod tests { ..Default::default() }; - let Options::Frontend(fe_opts) = command.load_options(TopLevelOptions::default()).unwrap() + let Options::Frontend(fe_opts) = command.load_options(&CliOptions::default()).unwrap() else { unreachable!() }; @@ -363,16 +375,19 @@ mod tests { } #[test] - fn test_top_level_options() { + fn test_load_log_options_from_cli() { let cmd = StartCommand { disable_dashboard: Some(false), ..Default::default() }; let options = cmd - .load_options(TopLevelOptions { + .load_options(&CliOptions { log_dir: Some("/tmp/greptimedb/test/logs".to_string()), log_level: Some("debug".to_string()), + + #[cfg(feature = "tokio-console")] + tokio_console_addr: None, }) .unwrap(); @@ -452,11 +467,8 @@ mod tests { ..Default::default() }; - let top_level_opts = TopLevelOptions { - log_dir: None, - log_level: Some("error".to_string()), - }; - let Options::Frontend(fe_opts) = command.load_options(top_level_opts).unwrap() + let Options::Frontend(fe_opts) = + command.load_options(&CliOptions::default()).unwrap() else { unreachable!() }; diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index b10af430954e..66143492936d 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -14,6 +14,10 @@ #![feature(assert_matches)] +use async_trait::async_trait; +use clap::arg; +use common_telemetry::{error, info}; + pub mod cli; pub mod datanode; pub mod error; @@ -21,3 +25,100 @@ pub mod frontend; pub mod metasrv; pub mod options; pub mod standalone; + +lazy_static::lazy_static! { + static ref APP_VERSION: prometheus::IntGaugeVec = + prometheus::register_int_gauge_vec!("app_version", "app version", &["short_version", "version"]).unwrap(); +} + +#[async_trait] +pub trait App { + fn name(&self) -> &str; + + async fn start(&mut self) -> error::Result<()>; + + async fn stop(&self) -> error::Result<()>; +} + +pub async fn start_app(mut app: Box) -> error::Result<()> { + let name = app.name().to_string(); + + tokio::select! { + result = app.start() => { + if let Err(err) = result { + error!(err; "Failed to start app {name}!"); + } + } + _ = tokio::signal::ctrl_c() => { + if let Err(err) = app.stop().await { + error!(err; "Failed to stop app {name}!"); + } + info!("Goodbye!"); + } + } + + Ok(()) +} + +pub fn log_versions() { + // Report app version as gauge. + APP_VERSION + .with_label_values(&[short_version(), full_version()]) + .inc(); + + // Log version and argument flags. + info!( + "short_version: {}, full_version: {}", + short_version(), + full_version() + ); + + log_env_flags(); +} + +pub fn greptimedb_cli() -> clap::Command { + let cmd = clap::Command::new("greptimedb") + .version(print_version()) + .subcommand_required(true); + + #[cfg(feature = "tokio-console")] + let cmd = cmd.arg(arg!(--"tokio-console-addr"[TOKIO_CONSOLE_ADDR])); + + cmd.args([arg!(--"log-dir"[LOG_DIR]), arg!(--"log-level"[LOG_LEVEL])]) +} + +fn print_version() -> &'static str { + concat!( + "\nbranch: ", + env!("GIT_BRANCH"), + "\ncommit: ", + env!("GIT_COMMIT"), + "\ndirty: ", + env!("GIT_DIRTY"), + "\nversion: ", + env!("CARGO_PKG_VERSION") + ) +} + +fn short_version() -> &'static str { + env!("CARGO_PKG_VERSION") +} + +// {app_name}-{branch_name}-{commit_short} +// The branch name (tag) of a release build should already contain the short +// version so the full version doesn't concat the short version explicitly. +fn full_version() -> &'static str { + concat!( + "greptimedb-", + env!("GIT_BRANCH"), + "-", + env!("GIT_COMMIT_SHORT") + ) +} + +fn log_env_flags() { + info!("command line arguments"); + for argument in std::env::args() { + info!("argument: {}", argument); + } +} diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 78b99bc0ac53..ba41649a8e9c 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -14,6 +14,7 @@ use std::time::Duration; +use async_trait::async_trait; use clap::Parser; use common_telemetry::logging; use meta_srv::bootstrap::MetaSrvInstance; @@ -21,21 +22,34 @@ use meta_srv::metasrv::MetaSrvOptions; use snafu::ResultExt; use crate::error::{self, Result, StartMetaServerSnafu}; -use crate::options::{Options, TopLevelOptions}; +use crate::options::{CliOptions, Options}; +use crate::App; pub struct Instance { instance: MetaSrvInstance, } impl Instance { - pub async fn start(&mut self) -> Result<()> { + fn new(instance: MetaSrvInstance) -> Self { + Self { instance } + } +} + +#[async_trait] +impl App for Instance { + fn name(&self) -> &str { + "greptime-metasrv" + } + + async fn start(&mut self) -> Result<()> { plugins::start_meta_srv_plugins(self.instance.plugins()) .await .context(StartMetaServerSnafu)?; + self.instance.start().await.context(StartMetaServerSnafu) } - pub async fn stop(&self) -> Result<()> { + async fn stop(&self) -> Result<()> { self.instance .shutdown() .await @@ -54,8 +68,8 @@ impl Command { self.subcmd.build(opts).await } - pub fn load_options(&self, top_level_opts: TopLevelOptions) -> Result { - self.subcmd.load_options(top_level_opts) + pub fn load_options(&self, cli_options: &CliOptions) -> Result { + self.subcmd.load_options(cli_options) } } @@ -71,9 +85,9 @@ impl SubCommand { } } - fn load_options(&self, top_level_opts: TopLevelOptions) -> Result { + fn load_options(&self, cli_options: &CliOptions) -> Result { match self { - SubCommand::Start(cmd) => cmd.load_options(top_level_opts), + SubCommand::Start(cmd) => cmd.load_options(cli_options), } } } @@ -106,19 +120,19 @@ struct StartCommand { } impl StartCommand { - fn load_options(&self, top_level_opts: TopLevelOptions) -> Result { + fn load_options(&self, cli_options: &CliOptions) -> Result { let mut opts: MetaSrvOptions = Options::load_layered_options( self.config_file.as_deref(), self.env_prefix.as_ref(), None, )?; - if let Some(dir) = top_level_opts.log_dir { - opts.logging.dir = dir; + if let Some(dir) = &cli_options.log_dir { + opts.logging.dir = dir.clone(); } - if top_level_opts.log_level.is_some() { - opts.logging.level = top_level_opts.log_level; + if cli_options.log_level.is_some() { + opts.logging.level = cli_options.log_level.clone(); } if let Some(addr) = &self.bind_addr { @@ -182,7 +196,7 @@ impl StartCommand { .await .context(error::BuildMetaServerSnafu)?; - Ok(Instance { instance }) + Ok(Instance::new(instance)) } } @@ -206,8 +220,7 @@ mod tests { ..Default::default() }; - let Options::Metasrv(options) = cmd.load_options(TopLevelOptions::default()).unwrap() - else { + let Options::Metasrv(options) = cmd.load_options(&CliOptions::default()).unwrap() else { unreachable!() }; assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); @@ -242,8 +255,7 @@ mod tests { ..Default::default() }; - let Options::Metasrv(options) = cmd.load_options(TopLevelOptions::default()).unwrap() - else { + let Options::Metasrv(options) = cmd.load_options(&CliOptions::default()).unwrap() else { unreachable!() }; assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); @@ -274,7 +286,7 @@ mod tests { } #[test] - fn test_top_level_options() { + fn test_load_log_options_from_cli() { let cmd = StartCommand { bind_addr: Some("127.0.0.1:3002".to_string()), server_addr: Some("127.0.0.1:3002".to_string()), @@ -284,9 +296,12 @@ mod tests { }; let options = cmd - .load_options(TopLevelOptions { + .load_options(&CliOptions { log_dir: Some("/tmp/greptimedb/test/logs".to_string()), log_level: Some("debug".to_string()), + + #[cfg(feature = "tokio-console")] + tokio_console_addr: None, }) .unwrap(); @@ -345,8 +360,7 @@ mod tests { ..Default::default() }; - let Options::Metasrv(opts) = - command.load_options(TopLevelOptions::default()).unwrap() + let Options::Metasrv(opts) = command.load_options(&CliOptions::default()).unwrap() else { unreachable!() }; diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 5e730de7d2c5..39a3d94e6de3 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use clap::ArgMatches; use common_config::KvBackendConfig; -use common_telemetry::logging::LoggingOptions; +use common_telemetry::logging::{LoggingOptions, TracingOptions}; use config::{Config, Environment, File, FileFormat}; use datanode::config::{DatanodeOptions, ProcedureConfig}; use frontend::error::{Result as FeResult, TomlFormatSnafu}; @@ -28,7 +29,7 @@ pub const ENV_VAR_SEP: &str = "__"; pub const ENV_LIST_SEP: &str = ","; /// Options mixed up from datanode, frontend and metasrv. -#[derive(Serialize, Debug)] +#[derive(Serialize, Debug, Clone)] pub struct MixOptions { pub data_home: String, pub procedure: ProcedureConfig, @@ -58,10 +59,32 @@ pub enum Options { Cli(Box), } -#[derive(Clone, Debug, Default)] -pub struct TopLevelOptions { +#[derive(Default)] +pub struct CliOptions { pub log_dir: Option, pub log_level: Option, + + #[cfg(feature = "tokio-console")] + pub tokio_console_addr: Option, +} + +impl CliOptions { + pub fn new(args: &ArgMatches) -> Self { + Self { + log_dir: args.get_one::("log-dir").cloned(), + log_level: args.get_one::("log-level").cloned(), + + #[cfg(feature = "tokio-console")] + tokio_console_addr: args.get_one::("tokio-console-addr").cloned(), + } + } + + pub fn tracing_options(&self) -> TracingOptions { + TracingOptions { + #[cfg(feature = "tokio-console")] + tokio_console_addr: self.tokio_console_addr.clone(), + } + } } impl Options { diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 63b1a8914806..e92d6a36f78b 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -15,11 +15,12 @@ use std::sync::Arc; use std::{fs, path}; +use async_trait::async_trait; use clap::Parser; use common_config::{metadata_store_dir, KvBackendConfig, WalConfig}; use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::datanode_manager::DatanodeManagerRef; -use common_meta::ddl::DdlTaskExecutorRef; +use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef}; use common_meta::ddl_manager::DdlManager; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; @@ -49,7 +50,8 @@ use crate::error::{ ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, StopProcedureManagerSnafu, }; -use crate::options::{MixOptions, Options, TopLevelOptions}; +use crate::options::{CliOptions, MixOptions, Options}; +use crate::App; #[derive(Parser)] pub struct Command { @@ -62,8 +64,8 @@ impl Command { self.subcmd.build(opts).await } - pub fn load_options(&self, top_level_options: TopLevelOptions) -> Result { - self.subcmd.load_options(top_level_options) + pub fn load_options(&self, cli_options: &CliOptions) -> Result { + self.subcmd.load_options(cli_options) } } @@ -79,9 +81,9 @@ impl SubCommand { } } - fn load_options(&self, top_level_options: TopLevelOptions) -> Result { + fn load_options(&self, cli_options: &CliOptions) -> Result { match self { - SubCommand::Start(cmd) => cmd.load_options(top_level_options), + SubCommand::Start(cmd) => cmd.load_options(cli_options), } } } @@ -171,8 +173,13 @@ pub struct Instance { procedure_manager: ProcedureManagerRef, } -impl Instance { - pub async fn start(&mut self) -> Result<()> { +#[async_trait] +impl App for Instance { + fn name(&self) -> &str { + "greptime-standalone" + } + + async fn start(&mut self) -> Result<()> { self.datanode.start_telemetry(); self.procedure_manager @@ -184,7 +191,7 @@ impl Instance { Ok(()) } - pub async fn stop(&self) -> Result<()> { + async fn stop(&self) -> Result<()> { self.frontend .shutdown() .await @@ -206,7 +213,7 @@ impl Instance { } #[derive(Debug, Default, Parser)] -struct StartCommand { +pub struct StartCommand { #[clap(long)] http_addr: Option, #[clap(long)] @@ -220,7 +227,7 @@ struct StartCommand { #[clap(short, long)] influxdb_enable: bool, #[clap(short, long)] - config_file: Option, + pub config_file: Option, #[clap(long)] tls_mode: Option, #[clap(long)] @@ -230,14 +237,14 @@ struct StartCommand { #[clap(long)] user_provider: Option, #[clap(long, default_value = "GREPTIMEDB_STANDALONE")] - env_prefix: String, + pub env_prefix: String, /// The working home directory of this standalone instance. #[clap(long)] data_home: Option, } impl StartCommand { - fn load_options(&self, top_level_options: TopLevelOptions) -> Result { + fn load_options(&self, cli_options: &CliOptions) -> Result { let mut opts: StandaloneOptions = Options::load_layered_options( self.config_file.as_deref(), self.env_prefix.as_ref(), @@ -246,12 +253,12 @@ impl StartCommand { opts.mode = Mode::Standalone; - if let Some(dir) = top_level_options.log_dir { - opts.logging.dir = dir; + if let Some(dir) = &cli_options.log_dir { + opts.logging.dir = dir.clone(); } - if top_level_options.log_level.is_some() { - opts.logging.level = top_level_options.log_level; + if cli_options.log_level.is_some() { + opts.logging.level = cli_options.log_level.clone(); } let tls_opts = TlsOption::new( @@ -357,10 +364,14 @@ impl StartCommand { let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); + let table_meta_allocator = + Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())); + let ddl_task_executor = Self::create_ddl_task_executor( kv_backend.clone(), procedure_manager.clone(), datanode_manager.clone(), + table_meta_allocator, ) .await?; @@ -382,10 +393,11 @@ impl StartCommand { }) } - async fn create_ddl_task_executor( + pub async fn create_ddl_task_executor( kv_backend: KvBackendRef, procedure_manager: ProcedureManagerRef, datanode_manager: DatanodeManagerRef, + table_meta_allocator: TableMetadataAllocatorRef, ) -> Result { let table_metadata_manager = Self::create_table_metadata_manager(kv_backend.clone()).await?; @@ -396,7 +408,7 @@ impl StartCommand { datanode_manager, Arc::new(DummyCacheInvalidator), table_metadata_manager, - Arc::new(StandaloneTableMetadataCreator::new(kv_backend)), + table_meta_allocator, Arc::new(MemoryRegionKeeper::default()), ) .context(InitDdlManagerSnafu)?, @@ -432,7 +444,7 @@ mod tests { use servers::Mode; use super::*; - use crate::options::ENV_VAR_SEP; + use crate::options::{CliOptions, ENV_VAR_SEP}; #[tokio::test] async fn test_try_from_start_command_to_anymap() { @@ -515,8 +527,7 @@ mod tests { ..Default::default() }; - let Options::Standalone(options) = cmd.load_options(TopLevelOptions::default()).unwrap() - else { + let Options::Standalone(options) = cmd.load_options(&CliOptions::default()).unwrap() else { unreachable!() }; let fe_opts = options.frontend; @@ -561,16 +572,19 @@ mod tests { } #[test] - fn test_top_level_options() { + fn test_load_log_options_from_cli() { let cmd = StartCommand { user_provider: Some("static_user_provider:cmd:test=test".to_string()), ..Default::default() }; let Options::Standalone(opts) = cmd - .load_options(TopLevelOptions { + .load_options(&CliOptions { log_dir: Some("/tmp/greptimedb/test/logs".to_string()), log_level: Some("debug".to_string()), + + #[cfg(feature = "tokio-console")] + tokio_console_addr: None, }) .unwrap() else { @@ -637,11 +651,8 @@ mod tests { ..Default::default() }; - let top_level_opts = TopLevelOptions { - log_dir: None, - log_level: None, - }; - let Options::Standalone(opts) = command.load_options(top_level_opts).unwrap() + let Options::Standalone(opts) = + command.load_options(&CliOptions::default()).unwrap() else { unreachable!() }; diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index b257bf57e6f7..6b820b85356d 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -399,9 +399,20 @@ impl RegionServerInner { } async fn stop(&self) -> Result<()> { - for region in self.region_map.iter() { - let region_id = *region.key(); - let engine = region.value(); + // Calling async functions while iterating inside the Dashmap could easily cause the Rust + // complains "higher-ranked lifetime error". Rust can't prove some future is legit. + // Possible related issue: https://github.com/rust-lang/rust/issues/102211 + // + // The walkaround is to put the async functions in the `common_runtime::spawn_bg`. Or like + // it here, collect the values first then use later separately. + + let regions = self + .region_map + .iter() + .map(|x| (*x.key(), x.value().clone())) + .collect::>(); + + for (region_id, engine) in regions { let closed = engine .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) .await;