diff --git a/Cargo.lock b/Cargo.lock index 3572c04287aa..2fe59305bd03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1662,6 +1662,7 @@ dependencies = [ "tikv-jemallocator", "tokio", "toml 0.8.12", + "tracing-appender", ] [[package]] diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 10ee2c0f9c59..c1a7dcdaec9c 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -74,6 +74,7 @@ substrait.workspace = true table.workspace = true tokio.workspace = true toml.workspace = true +tracing-appender = "0.2" [target.'cfg(not(windows))'.dependencies] tikv-jemallocator = "0.5" diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index 2f1ee2b67297..cf578e410fa6 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -31,6 +31,7 @@ use async_trait::async_trait; use bench::BenchTableMetadataCommand; use clap::Parser; use common_telemetry::logging::{LoggingOptions, TracingOptions}; +use tracing_appender::non_blocking::WorkerGuard; // pub use repl::Repl; use upgrade::UpgradeCommand; @@ -48,11 +49,15 @@ pub trait Tool: Send + Sync { pub struct Instance { tool: Box, + + // Keep the logging guard to prevent the worker from being dropped. + #[allow(dead_code)] + guard: Vec, } impl Instance { - fn new(tool: Box) -> Self { - Self { tool } + fn new(tool: Box, guard: Vec) -> Self { + Self { tool, guard } } } @@ -83,14 +88,14 @@ pub struct Command { impl Command { pub async fn build(&self, opts: LoggingOptions) -> Result { - let _guard = common_telemetry::init_global_logging( + let guard = common_telemetry::init_global_logging( APP_NAME, &opts, &TracingOptions::default(), None, ); - self.cmd.build().await + self.cmd.build(guard).await } pub fn load_options(&self, global_options: &GlobalOptions) -> Result { @@ -115,12 +120,12 @@ enum SubCommand { } impl SubCommand { - async fn build(&self) -> Result { + async fn build(&self, guard: Vec) -> Result { match self { // SubCommand::Attach(cmd) => cmd.build().await, - SubCommand::Upgrade(cmd) => cmd.build().await, - SubCommand::Bench(cmd) => cmd.build().await, - SubCommand::Export(cmd) => cmd.build().await, + SubCommand::Upgrade(cmd) => cmd.build(guard).await, + SubCommand::Bench(cmd) => cmd.build(guard).await, + SubCommand::Export(cmd) => cmd.build(guard).await, } } } diff --git a/src/cmd/src/cli/bench.rs b/src/cmd/src/cli/bench.rs index e441e7643d98..7f0acfe378bf 100644 --- a/src/cmd/src/cli/bench.rs +++ b/src/cmd/src/cli/bench.rs @@ -30,6 +30,7 @@ use datatypes::schema::{ColumnSchema, RawSchema}; use rand::Rng; use store_api::storage::RegionNumber; use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType}; +use tracing_appender::non_blocking::WorkerGuard; use self::metadata::TableMetadataBencher; use crate::cli::{Instance, Tool}; @@ -61,7 +62,7 @@ pub struct BenchTableMetadataCommand { } impl BenchTableMetadataCommand { - pub async fn build(&self) -> Result { + pub async fn build(&self, guard: Vec) -> Result { let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr], 128) .await .unwrap(); @@ -72,7 +73,7 @@ impl BenchTableMetadataCommand { table_metadata_manager, count: self.count, }; - Ok(Instance::new(Box::new(tool))) + Ok(Instance::new(Box::new(tool), guard)) } } diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 24b608689d0e..b83ac16bfefe 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -30,6 +30,7 @@ use tokio::fs::File; use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::sync::Semaphore; use tokio::time::Instant; +use tracing_appender::non_blocking::WorkerGuard; use crate::cli::{Instance, Tool}; use crate::error::{ @@ -80,7 +81,7 @@ pub struct ExportCommand { } impl ExportCommand { - pub async fn build(&self) -> Result { + pub async fn build(&self, guard: Vec) -> Result { let (catalog, schema) = split_database(&self.database)?; let auth_header = if let Some(basic) = &self.auth_basic { @@ -90,15 +91,18 @@ impl ExportCommand { None }; - Ok(Instance::new(Box::new(Export { - addr: self.addr.clone(), - catalog, - schema, - output_dir: self.output_dir.clone(), - parallelism: self.export_jobs, - target: self.target.clone(), - auth_header, - }))) + Ok(Instance::new( + Box::new(Export { + addr: self.addr.clone(), + catalog, + schema, + output_dir: self.output_dir.clone(), + parallelism: self.export_jobs, + target: self.target.clone(), + auth_header, + }), + guard, + )) } } diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs index ca362c8db4d9..f6f0d525c152 100644 --- a/src/cmd/src/cli/upgrade.rs +++ b/src/cmd/src/cli/upgrade.rs @@ -40,6 +40,7 @@ use etcd_client::Client; use futures::TryStreamExt; use prost::Message; use snafu::ResultExt; +use tracing_appender::non_blocking::WorkerGuard; use v1_helper::{CatalogKey as v1CatalogKey, SchemaKey as v1SchemaKey, TableGlobalValue}; use crate::cli::{Instance, Tool}; @@ -63,7 +64,7 @@ pub struct UpgradeCommand { } impl UpgradeCommand { - pub async fn build(&self) -> Result { + pub async fn build(&self, guard: Vec) -> Result { let client = Client::connect([&self.etcd_addr], None) .await .context(ConnectEtcdSnafu { @@ -77,7 +78,7 @@ impl UpgradeCommand { skip_schema_keys: self.skip_schema_keys, skip_table_route_keys: self.skip_table_route_keys, }; - Ok(Instance::new(Box::new(tool))) + Ok(Instance::new(Box::new(tool), guard)) } } diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 9eb821dff56a..971ec0606864 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -29,6 +29,7 @@ use datanode::service::DatanodeServiceBuilder; use meta_client::MetaClientOptions; use servers::Mode; use snafu::{OptionExt, ResultExt}; +use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu, @@ -40,11 +41,15 @@ pub const APP_NAME: &str = "greptime-datanode"; pub struct Instance { datanode: Datanode, + + // Keep the logging guard to prevent the worker from being dropped. + #[allow(dead_code)] + guard: Vec, } impl Instance { - pub fn new(datanode: Datanode) -> Self { - Self { datanode } + pub fn new(datanode: Datanode, guard: Vec) -> Self { + Self { datanode, guard } } pub fn datanode_mut(&mut self) -> &mut Datanode { @@ -228,7 +233,7 @@ impl StartCommand { } async fn build(&self, mut opts: DatanodeOptions) -> Result { - let _guard = common_telemetry::init_global_logging( + let guard = common_telemetry::init_global_logging( APP_NAME, &opts.logging, &opts.tracing, @@ -274,7 +279,7 @@ impl StartCommand { .context(StartDatanodeSnafu)?; datanode.setup_services(services); - Ok(Instance::new(datanode)) + Ok(Instance::new(datanode, guard)) } } diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 159f50399cbb..ca188a9003af 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -42,6 +42,7 @@ use meta_client::MetaClientOptions; use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::{OptionExt, ResultExt}; +use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MissingConfigSnafu, Result, StartFrontendSnafu, @@ -51,13 +52,17 @@ use crate::{log_versions, App}; pub struct Instance { frontend: FeInstance, + + // Keep the logging guard to prevent the worker from being dropped. + #[allow(dead_code)] + guard: Vec, } pub const APP_NAME: &str = "greptime-frontend"; impl Instance { - pub fn new(frontend: FeInstance) -> Self { - Self { frontend } + pub fn new(frontend: FeInstance, guard: Vec) -> Self { + Self { frontend, guard } } pub fn mut_inner(&mut self) -> &mut FeInstance { @@ -241,7 +246,7 @@ impl StartCommand { } async fn build(&self, mut opts: FrontendOptions) -> Result { - let _guard = common_telemetry::init_global_logging( + let guard = common_telemetry::init_global_logging( APP_NAME, &opts.logging, &opts.tracing, @@ -358,7 +363,7 @@ impl StartCommand { .build_servers(opts, servers) .context(StartFrontendSnafu)?; - Ok(Instance::new(instance)) + Ok(Instance::new(instance, guard)) } } diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 57157762ed4c..201fff2c6be7 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -23,6 +23,7 @@ use common_version::{short_version, version}; use meta_srv::bootstrap::MetasrvInstance; use meta_srv::metasrv::MetasrvOptions; use snafu::ResultExt; +use tracing_appender::non_blocking::WorkerGuard; use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu}; use crate::options::GlobalOptions; @@ -32,11 +33,15 @@ pub const APP_NAME: &str = "greptime-metasrv"; pub struct Instance { instance: MetasrvInstance, + + // Keep the logging guard to prevent the worker from being dropped. + #[allow(dead_code)] + guard: Vec, } impl Instance { - fn new(instance: MetasrvInstance) -> Self { - Self { instance } + fn new(instance: MetasrvInstance, guard: Vec) -> Self { + Self { instance, guard } } } @@ -214,7 +219,7 @@ impl StartCommand { } async fn build(&self, mut opts: MetasrvOptions) -> Result { - let _guard = + let guard = common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None); log_versions(version!(), short_version!()); @@ -234,7 +239,7 @@ impl StartCommand { .await .context(error::BuildMetaServerSnafu)?; - Ok(Instance::new(instance)) + Ok(Instance::new(instance, guard)) } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index ba1660167ca6..9fb6ceec0d8b 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -62,6 +62,7 @@ use servers::http::HttpOptions; use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::{OptionExt, ResultExt}; +use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ BuildCacheRegistrySnafu, CacheRequiredSnafu, CreateDirSnafu, IllegalConfigSnafu, @@ -210,6 +211,10 @@ pub struct Instance { frontend: FeInstance, procedure_manager: ProcedureManagerRef, wal_options_allocator: WalOptionsAllocatorRef, + + // Keep the logging guard to prevent the worker from being dropped. + #[allow(dead_code)] + guard: Vec, } #[async_trait] @@ -375,7 +380,7 @@ impl StartCommand { #[allow(unused_variables)] #[allow(clippy::diverging_sub_expression)] async fn build(&self, opts: StandaloneOptions) -> Result { - let _guard = + let guard = common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None); log_versions(version!(), short_version!()); @@ -521,6 +526,7 @@ impl StartCommand { frontend, procedure_manager, wal_options_allocator, + guard, }) }