From 80bc39d0f45f2e1a0c9a3ebfd3ed5ef45bbaca5a Mon Sep 17 00:00:00 2001 From: zyy17 Date: Tue, 21 May 2024 18:52:21 +0800 Subject: [PATCH] fix: avoid logging guard drop --- Cargo.lock | 1 + src/cmd/Cargo.toml | 1 + src/cmd/src/cli.rs | 24 ++++++++++++++++-------- src/cmd/src/cli/bench.rs | 5 +++-- src/cmd/src/cli/export.rs | 24 ++++++++++++++---------- src/cmd/src/cli/upgrade.rs | 5 +++-- src/cmd/src/datanode.rs | 16 ++++++++++++---- src/cmd/src/frontend.rs | 16 ++++++++++++---- src/cmd/src/metasrv.rs | 16 ++++++++++++---- src/cmd/src/standalone.rs | 8 +++++++- 10 files changed, 81 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3572c04287aa..2fe59305bd03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1662,6 +1662,7 @@ dependencies = [ "tikv-jemallocator", "tokio", "toml 0.8.12", + "tracing-appender", ] [[package]] diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 10ee2c0f9c59..c1a7dcdaec9c 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -74,6 +74,7 @@ substrait.workspace = true table.workspace = true tokio.workspace = true toml.workspace = true +tracing-appender = "0.2" [target.'cfg(not(windows))'.dependencies] tikv-jemallocator = "0.5" diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index 2f1ee2b67297..d97d9544545f 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -31,6 +31,7 @@ use async_trait::async_trait; use bench::BenchTableMetadataCommand; use clap::Parser; use common_telemetry::logging::{LoggingOptions, TracingOptions}; +use tracing_appender::non_blocking::WorkerGuard; // pub use repl::Repl; use upgrade::UpgradeCommand; @@ -48,11 +49,18 @@ pub trait Tool: Send + Sync { pub struct Instance { tool: Box, + + // Keep the logging guard to prevent the worker from being dropped. + #[allow(dead_code)] + _guard: Vec, } impl Instance { - fn new(tool: Box) -> Self { - Self { tool } + fn new(tool: Box, guard: Vec) -> Self { + Self { + tool, + _guard: guard, + } } } @@ -83,14 +91,14 @@ pub struct Command { impl Command { pub async fn build(&self, opts: LoggingOptions) -> Result { - let _guard = common_telemetry::init_global_logging( + let guard = common_telemetry::init_global_logging( APP_NAME, &opts, &TracingOptions::default(), None, ); - self.cmd.build().await + self.cmd.build(guard).await } pub fn load_options(&self, global_options: &GlobalOptions) -> Result { @@ -115,12 +123,12 @@ enum SubCommand { } impl SubCommand { - async fn build(&self) -> Result { + async fn build(&self, guard: Vec) -> Result { match self { // SubCommand::Attach(cmd) => cmd.build().await, - SubCommand::Upgrade(cmd) => cmd.build().await, - SubCommand::Bench(cmd) => cmd.build().await, - SubCommand::Export(cmd) => cmd.build().await, + SubCommand::Upgrade(cmd) => cmd.build(guard).await, + SubCommand::Bench(cmd) => cmd.build(guard).await, + SubCommand::Export(cmd) => cmd.build(guard).await, } } } diff --git a/src/cmd/src/cli/bench.rs b/src/cmd/src/cli/bench.rs index e441e7643d98..7f0acfe378bf 100644 --- a/src/cmd/src/cli/bench.rs +++ b/src/cmd/src/cli/bench.rs @@ -30,6 +30,7 @@ use datatypes::schema::{ColumnSchema, RawSchema}; use rand::Rng; use store_api::storage::RegionNumber; use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType}; +use tracing_appender::non_blocking::WorkerGuard; use self::metadata::TableMetadataBencher; use crate::cli::{Instance, Tool}; @@ -61,7 +62,7 @@ pub struct BenchTableMetadataCommand { } impl BenchTableMetadataCommand { - pub async fn build(&self) -> Result { + pub async fn build(&self, guard: Vec) -> Result { let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr], 128) .await .unwrap(); @@ -72,7 +73,7 @@ impl BenchTableMetadataCommand { table_metadata_manager, count: self.count, }; - Ok(Instance::new(Box::new(tool))) + Ok(Instance::new(Box::new(tool), guard)) } } diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 24b608689d0e..b83ac16bfefe 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -30,6 +30,7 @@ use tokio::fs::File; use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::sync::Semaphore; use tokio::time::Instant; +use tracing_appender::non_blocking::WorkerGuard; use crate::cli::{Instance, Tool}; use crate::error::{ @@ -80,7 +81,7 @@ pub struct ExportCommand { } impl ExportCommand { - pub async fn build(&self) -> Result { + pub async fn build(&self, guard: Vec) -> Result { let (catalog, schema) = split_database(&self.database)?; let auth_header = if let Some(basic) = &self.auth_basic { @@ -90,15 +91,18 @@ impl ExportCommand { None }; - Ok(Instance::new(Box::new(Export { - addr: self.addr.clone(), - catalog, - schema, - output_dir: self.output_dir.clone(), - parallelism: self.export_jobs, - target: self.target.clone(), - auth_header, - }))) + Ok(Instance::new( + Box::new(Export { + addr: self.addr.clone(), + catalog, + schema, + output_dir: self.output_dir.clone(), + parallelism: self.export_jobs, + target: self.target.clone(), + auth_header, + }), + guard, + )) } } diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs index ca362c8db4d9..f6f0d525c152 100644 --- a/src/cmd/src/cli/upgrade.rs +++ b/src/cmd/src/cli/upgrade.rs @@ -40,6 +40,7 @@ use etcd_client::Client; use futures::TryStreamExt; use prost::Message; use snafu::ResultExt; +use tracing_appender::non_blocking::WorkerGuard; use v1_helper::{CatalogKey as v1CatalogKey, SchemaKey as v1SchemaKey, TableGlobalValue}; use crate::cli::{Instance, Tool}; @@ -63,7 +64,7 @@ pub struct UpgradeCommand { } impl UpgradeCommand { - pub async fn build(&self) -> Result { + pub async fn build(&self, guard: Vec) -> Result { let client = Client::connect([&self.etcd_addr], None) .await .context(ConnectEtcdSnafu { @@ -77,7 +78,7 @@ impl UpgradeCommand { skip_schema_keys: self.skip_schema_keys, skip_table_route_keys: self.skip_table_route_keys, }; - Ok(Instance::new(Box::new(tool))) + Ok(Instance::new(Box::new(tool), guard)) } } diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 9eb821dff56a..0479d6811cd8 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -29,6 +29,7 @@ use datanode::service::DatanodeServiceBuilder; use meta_client::MetaClientOptions; use servers::Mode; use snafu::{OptionExt, ResultExt}; +use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu, @@ -40,11 +41,18 @@ pub const APP_NAME: &str = "greptime-datanode"; pub struct Instance { datanode: Datanode, + + // Keep the logging guard to prevent the worker from being dropped. + #[allow(dead_code)] + _guard: Vec, } impl Instance { - pub fn new(datanode: Datanode) -> Self { - Self { datanode } + pub fn new(datanode: Datanode, guard: Vec) -> Self { + Self { + datanode, + _guard: guard, + } } pub fn datanode_mut(&mut self) -> &mut Datanode { @@ -228,7 +236,7 @@ impl StartCommand { } async fn build(&self, mut opts: DatanodeOptions) -> Result { - let _guard = common_telemetry::init_global_logging( + let guard = common_telemetry::init_global_logging( APP_NAME, &opts.logging, &opts.tracing, @@ -274,7 +282,7 @@ impl StartCommand { .context(StartDatanodeSnafu)?; datanode.setup_services(services); - Ok(Instance::new(datanode)) + Ok(Instance::new(datanode, guard)) } } diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 159f50399cbb..39e37c483709 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -42,6 +42,7 @@ use meta_client::MetaClientOptions; use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::{OptionExt, ResultExt}; +use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MissingConfigSnafu, Result, StartFrontendSnafu, @@ -51,13 +52,20 @@ use crate::{log_versions, App}; pub struct Instance { frontend: FeInstance, + + // Keep the logging guard to prevent the worker from being dropped. + #[allow(dead_code)] + _guard: Vec, } pub const APP_NAME: &str = "greptime-frontend"; impl Instance { - pub fn new(frontend: FeInstance) -> Self { - Self { frontend } + pub fn new(frontend: FeInstance, guard: Vec) -> Self { + Self { + frontend, + _guard: guard, + } } pub fn mut_inner(&mut self) -> &mut FeInstance { @@ -241,7 +249,7 @@ impl StartCommand { } async fn build(&self, mut opts: FrontendOptions) -> Result { - let _guard = common_telemetry::init_global_logging( + let guard = common_telemetry::init_global_logging( APP_NAME, &opts.logging, &opts.tracing, @@ -358,7 +366,7 @@ impl StartCommand { .build_servers(opts, servers) .context(StartFrontendSnafu)?; - Ok(Instance::new(instance)) + Ok(Instance::new(instance, guard)) } } diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 57157762ed4c..0e63bef31ade 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -23,6 +23,7 @@ use common_version::{short_version, version}; use meta_srv::bootstrap::MetasrvInstance; use meta_srv::metasrv::MetasrvOptions; use snafu::ResultExt; +use tracing_appender::non_blocking::WorkerGuard; use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu}; use crate::options::GlobalOptions; @@ -32,11 +33,18 @@ pub const APP_NAME: &str = "greptime-metasrv"; pub struct Instance { instance: MetasrvInstance, + + // Keep the logging guard to prevent the worker from being dropped. + #[allow(dead_code)] + _guard: Vec, } impl Instance { - fn new(instance: MetasrvInstance) -> Self { - Self { instance } + fn new(instance: MetasrvInstance, guard: Vec) -> Self { + Self { + instance, + _guard: guard, + } } } @@ -214,7 +222,7 @@ impl StartCommand { } async fn build(&self, mut opts: MetasrvOptions) -> Result { - let _guard = + let guard = common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None); log_versions(version!(), short_version!()); @@ -234,7 +242,7 @@ impl StartCommand { .await .context(error::BuildMetaServerSnafu)?; - Ok(Instance::new(instance)) + Ok(Instance::new(instance, guard)) } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index ba1660167ca6..0b7a1335507e 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -62,6 +62,7 @@ use servers::http::HttpOptions; use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::{OptionExt, ResultExt}; +use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ BuildCacheRegistrySnafu, CacheRequiredSnafu, CreateDirSnafu, IllegalConfigSnafu, @@ -210,6 +211,10 @@ pub struct Instance { frontend: FeInstance, procedure_manager: ProcedureManagerRef, wal_options_allocator: WalOptionsAllocatorRef, + + // Keep the logging guard to prevent the worker from being dropped. + #[allow(dead_code)] + _guard: Vec, } #[async_trait] @@ -375,7 +380,7 @@ impl StartCommand { #[allow(unused_variables)] #[allow(clippy::diverging_sub_expression)] async fn build(&self, opts: StandaloneOptions) -> Result { - let _guard = + let guard = common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None); log_versions(version!(), short_version!()); @@ -521,6 +526,7 @@ impl StartCommand { frontend, procedure_manager, wal_options_allocator, + _guard: guard, }) }