diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 0c5cd78003e7f..40694d951b956 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -956,8 +956,8 @@ impl SystemConfig { backup_storage_directory: self.backup_storage_directory, max_concurrent_creating_streaming_jobs: self.max_concurrent_creating_streaming_jobs, pause_on_next_bootstrap: self.pause_on_next_bootstrap, - enable_tracing: Some(true), // okay? - telemetry_enabled: None, // deprecated + enable_tracing: self.enable_tracing, + telemetry_enabled: None, // deprecated } } } diff --git a/src/common/src/system_param/common.rs b/src/common/src/system_param/common.rs index 8531273454e26..d1f40f9cda728 100644 --- a/src/common/src/system_param/common.rs +++ b/src/common/src/system_param/common.rs @@ -1,13 +1,17 @@ use std::sync::Mutex; use super::reader::SystemParamsReader; -use crate::util::tracing::toggle_otel_layer; +use crate::util::tracing::layer::toggle_otel_layer; +/// Node-independent handler for system parameter changes. +/// +/// Currently, it is only used to enable or disable the distributed tracing layer. pub struct CommonHandler { last_params: Mutex>, } impl CommonHandler { + /// Create a new handler with the initial parameters. pub fn new(initial: SystemParamsReader) -> Self { let this = Self { last_params: None.into(), @@ -16,6 +20,7 @@ impl CommonHandler { this } + /// Handle the change of system parameters. // TODO: directly call this method with the difference of old and new params. pub fn handle_change(&self, new_params: SystemParamsReader) { let mut last_params = self.last_params.lock().unwrap(); diff --git a/src/common/src/system_param/local_manager.rs b/src/common/src/system_param/local_manager.rs index 94c671dd33980..25363640bc4eb 100644 --- a/src/common/src/system_param/local_manager.rs +++ b/src/common/src/system_param/local_manager.rs @@ -45,6 +45,7 @@ impl LocalSystemParamsManager { let params = Arc::new(ArcSwap::from_pointee(initial_params.clone())); let (tx, _) = channel(params.clone()); + // Spawn a task to run the common handler. tokio::spawn({ let mut rx = tx.subscribe(); async move { diff --git a/src/common/src/util/tracing.rs b/src/common/src/util/tracing.rs index e6f8e52e2df39..e7da6e8e7d580 100644 --- a/src/common/src/util/tracing.rs +++ b/src/common/src/util/tracing.rs @@ -12,30 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod layer; + use std::collections::HashMap; use std::pin::Pin; -use std::sync::OnceLock; use std::task::{Context, Poll}; use opentelemetry::propagation::TextMapPropagator; use opentelemetry::sdk::propagation::TraceContextPropagator; use tracing_opentelemetry::OpenTelemetrySpanExt; -static TOGGLE_OTEL_LAYER: OnceLock> = OnceLock::new(); - -pub fn set_toggle_otel_layer_fn(f: impl Fn(bool) + Sync + Send + 'static) { - TOGGLE_OTEL_LAYER - .set(Box::new(f)) - .ok() - .expect("toggle otel layer fn set twice"); -} - -pub fn toggle_otel_layer(enabled: bool) { - if let Some(f) = TOGGLE_OTEL_LAYER.get() { - f(enabled); - } -} - /// Context for tracing used for propagating tracing information in a distributed system. /// /// Generally, the caller of a service should create a tracing context from the current tracing span diff --git a/src/common/src/util/tracing/layer.rs b/src/common/src/util/tracing/layer.rs new file mode 100644 index 0000000000000..a5268a55dc90e --- /dev/null +++ b/src/common/src/util/tracing/layer.rs @@ -0,0 +1,32 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::OnceLock; + +static TOGGLE_OTEL_LAYER: OnceLock> = OnceLock::new(); + +/// Set the function to toggle the opentelemetry tracing layer. Panics if called twice. +pub fn set_toggle_otel_layer_fn(f: impl Fn(bool) + Sync + Send + 'static) { + TOGGLE_OTEL_LAYER + .set(Box::new(f)) + .ok() + .expect("toggle otel layer fn set twice"); +} + +/// Toggle the opentelemetry tracing layer. +pub fn toggle_otel_layer(enabled: bool) { + if let Some(f) = TOGGLE_OTEL_LAYER.get() { + f(enabled); + } +} diff --git a/src/meta/src/controller/system_param.rs b/src/meta/src/controller/system_param.rs index 21ec154ee7dc4..fbcce97a97d9b 100644 --- a/src/meta/src/controller/system_param.rs +++ b/src/meta/src/controller/system_param.rs @@ -45,7 +45,7 @@ pub struct SystemParamsController { notification_manager: NotificationManagerRef, // Cached parameters. params: RwLock, - + /// Common handler for system params. common_handler: CommonHandler, } diff --git a/src/meta/src/manager/system_param/mod.rs b/src/meta/src/manager/system_param/mod.rs index d78e006ab4b52..14d0e311a2d89 100644 --- a/src/meta/src/manager/system_param/mod.rs +++ b/src/meta/src/manager/system_param/mod.rs @@ -44,7 +44,7 @@ pub struct SystemParamsManager { notification_manager: NotificationManagerRef, // Cached parameters. params: RwLock, - + /// Common handler for system params. common_handler: CommonHandler, } diff --git a/src/utils/runtime/src/logger.rs b/src/utils/runtime/src/logger.rs index 6e26e56a10a29..a463985ad4def 100644 --- a/src/utils/runtime/src/logger.rs +++ b/src/utils/runtime/src/logger.rs @@ -20,6 +20,7 @@ use risingwave_common::metrics::MetricsLayer; use risingwave_common::util::deployment::Deployment; use risingwave_common::util::env_var::env_var_is_true; use risingwave_common::util::query_log::*; +use risingwave_common::util::tracing::layer::set_toggle_otel_layer_fn; use thiserror_ext::AsReport; use tracing::level_filters::LevelFilter as Level; use tracing_subscriber::filter::{FilterFn, Targets}; @@ -133,6 +134,11 @@ impl LoggerSettings { } } +/// Create a filter that disables all events or spans. +fn disabled_filter() -> filter::Targets { + filter::Targets::new() +} + /// Init logger for RisingWave binaries. /// /// ## Environment variables to configure logger dynamically @@ -436,15 +442,16 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { .unwrap() }; - // Disable all events and spans by default. - let (reload_filter, reload_handle) = reload::Layer::new(filter::Targets::new()); + // Disable by filtering out all events or spans by default. + // It'll be enabled with `toggle_otel_layer` based on the system parameter `enable_tracing` later. + let (reload_filter, reload_handle) = reload::Layer::new(disabled_filter()); - risingwave_common::util::tracing::set_toggle_otel_layer_fn(move |enabled: bool| { + set_toggle_otel_layer_fn(move |enabled: bool| { let result = reload_handle.modify(|f| { *f = if enabled { default_filter.clone() } else { - filter::Targets::new() + disabled_filter() } });