From 9cce7b985c488a378aaeb7a065611c4bdf299396 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Sat, 18 May 2024 15:29:30 +0800 Subject: [PATCH] refactor: init once for common_telemetry::init_global_logging and remove its return value which is unused --- src/cmd/src/cli.rs | 8 +- src/cmd/src/datanode.rs | 2 +- src/cmd/src/frontend.rs | 2 +- src/cmd/src/metasrv.rs | 3 +- src/cmd/src/standalone.rs | 3 +- src/common/telemetry/src/logging.rs | 253 ++++++++++++++-------------- 6 files changed, 130 insertions(+), 141 deletions(-) diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index 2f1ee2b67297..c0f21981bfb5 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -83,13 +83,7 @@ pub struct Command { impl Command { pub async fn build(&self, opts: LoggingOptions) -> Result { - let _guard = common_telemetry::init_global_logging( - APP_NAME, - &opts, - &TracingOptions::default(), - None, - ); - + common_telemetry::init_global_logging(APP_NAME, &opts, &TracingOptions::default(), None); self.cmd.build().await } diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index f28f1195a6b3..90850baf021e 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -227,7 +227,7 @@ impl StartCommand { } async fn build(&self, mut opts: DatanodeOptions) -> Result { - let _guard = common_telemetry::init_global_logging( + common_telemetry::init_global_logging( APP_NAME, &opts.logging, &opts.tracing, diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index afbc9054abc6..9a46b4e9b15d 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -240,7 +240,7 @@ impl StartCommand { } async fn build(&self, mut opts: FrontendOptions) -> Result { - let _guard = common_telemetry::init_global_logging( + common_telemetry::init_global_logging( APP_NAME, &opts.logging, &opts.tracing, diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 2d56661feb48..43115b0283fb 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -213,8 +213,7 @@ impl StartCommand { } async fn build(&self, mut opts: MetasrvOptions) -> Result { - let _guard = - common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None); + common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None); let plugins = plugins::setup_metasrv_plugins(&mut opts) .await diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index b049352ccecb..8f36a993cdb8 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -374,8 +374,7 @@ impl StartCommand { #[allow(unused_variables)] #[allow(clippy::diverging_sub_expression)] async fn build(&self, opts: StandaloneOptions) -> Result { - let _guard = - common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None); + common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None); info!("Standalone start command: {:#?}", self); info!("Building standalone instance with {opts:#?}"); diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs index f155b15e2976..3a5153cdadb9 100644 --- a/src/common/telemetry/src/logging.rs +++ b/src/common/telemetry/src/logging.rs @@ -84,7 +84,7 @@ pub fn init_default_ut_logging() { static START: Once = Once::new(); START.call_once(|| { - let mut g = GLOBAL_UT_LOG_GUARD.as_ref().lock().unwrap(); + let _g = GLOBAL_UT_LOG_GUARD.as_ref().lock().unwrap(); // When running in Github's actions, env "UNITTEST_LOG_DIR" is set to a directory other // than "/tmp". @@ -101,12 +101,7 @@ pub fn init_default_ut_logging() { level: Some(level), ..Default::default() }; - *g = Some(init_global_logging( - "unittest", - &opts, - &TracingOptions::default(), - None - )); + init_global_logging("unittest", &opts, &TracingOptions::default(), None); crate::info!("logs dir = {}", dir); }); @@ -123,140 +118,142 @@ pub fn init_global_logging( opts: &LoggingOptions, tracing_opts: &TracingOptions, node_id: Option, -) -> Vec { - let mut guards = vec![]; - let dir = &opts.dir; - let level = &opts.level; - let enable_otlp_tracing = opts.enable_otlp_tracing; - - // Enable log compatible layer to convert log record to tracing span. - LogTracer::init().expect("log tracer must be valid"); - - // stdout log layer. - let stdout_logging_layer = if opts.append_stdout { - let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout()); - guards.push(stdout_guard); - - Some( - Layer::new() - .with_writer(stdout_writer) - .with_ansi(atty::is(atty::Stream::Stdout)), - ) - } else { - None - }; - - // file log layer. - let rolling_appender = RollingFileAppender::new(Rotation::HOURLY, dir, app_name); - let (rolling_writer, rolling_writer_guard) = tracing_appender::non_blocking(rolling_appender); - let file_logging_layer = Layer::new().with_writer(rolling_writer).with_ansi(false); - guards.push(rolling_writer_guard); +) { + static START: Once = Once::new(); + START.call_once(|| { + let mut guards = vec![]; + let dir = &opts.dir; + let level = &opts.level; + let enable_otlp_tracing = opts.enable_otlp_tracing; - // error file log layer. - let err_rolling_appender = - RollingFileAppender::new(Rotation::HOURLY, dir, format!("{}-{}", app_name, "err")); - let (err_rolling_writer, err_rolling_writer_guard) = - tracing_appender::non_blocking(err_rolling_appender); - let err_file_logging_layer = Layer::new() - .with_writer(err_rolling_writer) - .with_ansi(false); - guards.push(err_rolling_writer_guard); + // Enable log compatible layer to convert log record to tracing span. + LogTracer::init().expect("log tracer must be valid"); - // resolve log level settings from: - // - options from command line or config files - // - environment variable: RUST_LOG - // - default settings - let rust_log_env = std::env::var(EnvFilter::DEFAULT_ENV).ok(); - let targets_string = level - .as_deref() - .or(rust_log_env.as_deref()) - .unwrap_or(DEFAULT_LOG_TARGETS); - let filter = targets_string - .parse::() - .expect("error parsing log level string"); - let sampler = opts - .tracing_sample_ratio - .as_ref() - .map(create_sampler) - .map(Sampler::ParentBased) - .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))); - // Must enable 'tokio_unstable' cfg to use this feature. - // For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start` - #[cfg(feature = "tokio-console")] - let subscriber = { - let tokio_console_layer = if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr - { - let addr: std::net::SocketAddr = tokio_console_addr.parse().unwrap_or_else(|e| { - panic!("Invalid binding address '{tokio_console_addr}' for tokio-console: {e}"); - }); - println!("tokio-console listening on {addr}"); + // stdout log layer. + let stdout_logging_layer = if opts.append_stdout { + let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout()); + guards.push(stdout_guard); Some( - console_subscriber::ConsoleLayer::builder() - .server_addr(addr) - .spawn(), + Layer::new() + .with_writer(stdout_writer) + .with_ansi(atty::is(atty::Stream::Stdout)), ) } else { None }; - let stdout_logging_layer = stdout_logging_layer.map(|x| x.with_filter(filter.clone())); + // file log layer. + let rolling_appender = RollingFileAppender::new(Rotation::HOURLY, dir, app_name); + let (rolling_writer, rolling_writer_guard) = + tracing_appender::non_blocking(rolling_appender); + let file_logging_layer = Layer::new().with_writer(rolling_writer).with_ansi(false); + guards.push(rolling_writer_guard); - let file_logging_layer = file_logging_layer.with_filter(filter); + // error file log layer. + let err_rolling_appender = + RollingFileAppender::new(Rotation::HOURLY, dir, format!("{}-{}", app_name, "err")); + let (err_rolling_writer, err_rolling_writer_guard) = + tracing_appender::non_blocking(err_rolling_appender); + let err_file_logging_layer = Layer::new() + .with_writer(err_rolling_writer) + .with_ansi(false); + guards.push(err_rolling_writer_guard); - Registry::default() - .with(tokio_console_layer) - .with(stdout_logging_layer) - .with(file_logging_layer) - .with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR)) - }; + // resolve log level settings from: + // - options from command line or config files + // - environment variable: RUST_LOG + // - default settings + let rust_log_env = std::env::var(EnvFilter::DEFAULT_ENV).ok(); + let targets_string = level + .as_deref() + .or(rust_log_env.as_deref()) + .unwrap_or(DEFAULT_LOG_TARGETS); + let filter = targets_string + .parse::() + .expect("error parsing log level string"); + let sampler = opts + .tracing_sample_ratio + .as_ref() + .map(create_sampler) + .map(Sampler::ParentBased) + .unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))); + // Must enable 'tokio_unstable' cfg to use this feature. + // For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start` + #[cfg(feature = "tokio-console")] + let subscriber = { + let tokio_console_layer = + if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr { + let addr: std::net::SocketAddr = tokio_console_addr.parse().unwrap_or_else(|e| { + panic!("Invalid binding address '{tokio_console_addr}' for tokio-console: {e}"); + }); + println!("tokio-console listening on {addr}"); - // consume the `tracing_opts`, to avoid "unused" warnings - let _ = tracing_opts; + Some( + console_subscriber::ConsoleLayer::builder() + .server_addr(addr) + .spawn(), + ) + } else { + None + }; - #[cfg(not(feature = "tokio-console"))] - let subscriber = Registry::default() - .with(filter) - .with(stdout_logging_layer) - .with(file_logging_layer) - .with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR)); + let stdout_logging_layer = stdout_logging_layer.map(|x| x.with_filter(filter.clone())); - if enable_otlp_tracing { - global::set_text_map_propagator(TraceContextPropagator::new()); - // otlp exporter - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter( - opentelemetry_otlp::new_exporter().tonic().with_endpoint( - opts.otlp_endpoint - .as_ref() - .map(|e| format!("http://{}", e)) - .unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()), - ), - ) - .with_trace_config( - opentelemetry_sdk::trace::config() - .with_sampler(sampler) - .with_resource(opentelemetry_sdk::Resource::new(vec![ - KeyValue::new(resource::SERVICE_NAME, app_name.to_string()), - KeyValue::new( - resource::SERVICE_INSTANCE_ID, - node_id.unwrap_or("none".to_string()), - ), - KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")), - KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()), - ])), - ) - .install_batch(opentelemetry_sdk::runtime::Tokio) - .expect("otlp tracer install failed"); - let tracing_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer)); - let subscriber = subscriber.with(tracing_layer); - tracing::subscriber::set_global_default(subscriber) - .expect("error setting global tracing subscriber"); - } else { - tracing::subscriber::set_global_default(subscriber) - .expect("error setting global tracing subscriber"); - } + let file_logging_layer = file_logging_layer.with_filter(filter); + + Registry::default() + .with(tokio_console_layer) + .with(stdout_logging_layer) + .with(file_logging_layer) + .with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR)) + }; + + // consume the `tracing_opts`, to avoid "unused" warnings + let _ = tracing_opts; - guards + #[cfg(not(feature = "tokio-console"))] + let subscriber = Registry::default() + .with(filter) + .with(stdout_logging_layer) + .with(file_logging_layer) + .with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR)); + + if enable_otlp_tracing { + global::set_text_map_propagator(TraceContextPropagator::new()); + // otlp exporter + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter( + opentelemetry_otlp::new_exporter().tonic().with_endpoint( + opts.otlp_endpoint + .as_ref() + .map(|e| format!("http://{}", e)) + .unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()), + ), + ) + .with_trace_config( + opentelemetry_sdk::trace::config() + .with_sampler(sampler) + .with_resource(opentelemetry_sdk::Resource::new(vec![ + KeyValue::new(resource::SERVICE_NAME, app_name.to_string()), + KeyValue::new( + resource::SERVICE_INSTANCE_ID, + node_id.unwrap_or("none".to_string()), + ), + KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")), + KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()), + ])), + ) + .install_batch(opentelemetry_sdk::runtime::Tokio) + .expect("otlp tracer install failed"); + let tracing_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer)); + let subscriber = subscriber.with(tracing_layer); + tracing::subscriber::set_global_default(subscriber) + .expect("error setting global tracing subscriber"); + } else { + tracing::subscriber::set_global_default(subscriber) + .expect("error setting global tracing subscriber"); + } + }); }