Skip to content

Commit

Permalink
refine docs
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jan 12, 2024
1 parent 392c03c commit 376846c
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 25 deletions.
4 changes: 2 additions & 2 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -956,8 +956,8 @@ impl SystemConfig {
backup_storage_directory: self.backup_storage_directory,
max_concurrent_creating_streaming_jobs: self.max_concurrent_creating_streaming_jobs,
pause_on_next_bootstrap: self.pause_on_next_bootstrap,
enable_tracing: Some(true), // okay?
telemetry_enabled: None, // deprecated
enable_tracing: self.enable_tracing,
telemetry_enabled: None, // deprecated
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/common/src/system_param/common.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use std::sync::Mutex;

use super::reader::SystemParamsReader;
use crate::util::tracing::toggle_otel_layer;
use crate::util::tracing::layer::toggle_otel_layer;

/// Node-independent handler for system parameter changes.
///
/// Currently, it is only used to enable or disable the distributed tracing layer.
pub struct CommonHandler {
last_params: Mutex<Option<SystemParamsReader>>,
}

impl CommonHandler {
/// Create a new handler with the initial parameters.
pub fn new(initial: SystemParamsReader) -> Self {
let this = Self {
last_params: None.into(),
Expand All @@ -16,6 +20,7 @@ impl CommonHandler {
this
}

/// Handle the change of system parameters.
// TODO: directly call this method with the difference of old and new params.
pub fn handle_change(&self, new_params: SystemParamsReader) {
let mut last_params = self.last_params.lock().unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/common/src/system_param/local_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl LocalSystemParamsManager {
let params = Arc::new(ArcSwap::from_pointee(initial_params.clone()));
let (tx, _) = channel(params.clone());

// Spawn a task to run the common handler.
tokio::spawn({
let mut rx = tx.subscribe();
async move {
Expand Down
18 changes: 2 additions & 16 deletions src/common/src/util/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod layer;

use std::collections::HashMap;
use std::pin::Pin;
use std::sync::OnceLock;
use std::task::{Context, Poll};

use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use tracing_opentelemetry::OpenTelemetrySpanExt;

static TOGGLE_OTEL_LAYER: OnceLock<Box<dyn Fn(bool) + Sync + Send>> = OnceLock::new();

pub fn set_toggle_otel_layer_fn(f: impl Fn(bool) + Sync + Send + 'static) {
TOGGLE_OTEL_LAYER
.set(Box::new(f))
.ok()
.expect("toggle otel layer fn set twice");
}

pub fn toggle_otel_layer(enabled: bool) {
if let Some(f) = TOGGLE_OTEL_LAYER.get() {
f(enabled);
}
}

/// Context for tracing used for propagating tracing information in a distributed system.
///
/// Generally, the caller of a service should create a tracing context from the current tracing span
Expand Down
32 changes: 32 additions & 0 deletions src/common/src/util/tracing/layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::OnceLock;

static TOGGLE_OTEL_LAYER: OnceLock<Box<dyn Fn(bool) + Sync + Send>> = OnceLock::new();

/// Set the function to toggle the opentelemetry tracing layer. Panics if called twice.
pub fn set_toggle_otel_layer_fn(f: impl Fn(bool) + Sync + Send + 'static) {
TOGGLE_OTEL_LAYER
.set(Box::new(f))
.ok()
.expect("toggle otel layer fn set twice");
}

/// Toggle the opentelemetry tracing layer.
pub fn toggle_otel_layer(enabled: bool) {
if let Some(f) = TOGGLE_OTEL_LAYER.get() {
f(enabled);
}
}
2 changes: 1 addition & 1 deletion src/meta/src/controller/system_param.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub struct SystemParamsController {
notification_manager: NotificationManagerRef,
// Cached parameters.
params: RwLock<PbSystemParams>,

/// Common handler for system params.
common_handler: CommonHandler,
}

Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct SystemParamsManager {
notification_manager: NotificationManagerRef,
// Cached parameters.
params: RwLock<SystemParams>,

/// Common handler for system params.
common_handler: CommonHandler,
}

Expand Down
15 changes: 11 additions & 4 deletions src/utils/runtime/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_common::metrics::MetricsLayer;
use risingwave_common::util::deployment::Deployment;
use risingwave_common::util::env_var::env_var_is_true;
use risingwave_common::util::query_log::*;
use risingwave_common::util::tracing::layer::set_toggle_otel_layer_fn;
use thiserror_ext::AsReport;
use tracing::level_filters::LevelFilter as Level;
use tracing_subscriber::filter::{FilterFn, Targets};
Expand Down Expand Up @@ -133,6 +134,11 @@ impl LoggerSettings {
}
}

/// Create a filter that disables all events or spans.
fn disabled_filter() -> filter::Targets {
filter::Targets::new()
}

/// Init logger for RisingWave binaries.
///
/// ## Environment variables to configure logger dynamically
Expand Down Expand Up @@ -436,15 +442,16 @@ pub fn init_risingwave_logger(settings: LoggerSettings) {
.unwrap()
};

// Disable all events and spans by default.
let (reload_filter, reload_handle) = reload::Layer::new(filter::Targets::new());
// Disable by filtering out all events or spans by default.
// It'll be enabled with `toggle_otel_layer` based on the system parameter `enable_tracing` later.
let (reload_filter, reload_handle) = reload::Layer::new(disabled_filter());

risingwave_common::util::tracing::set_toggle_otel_layer_fn(move |enabled: bool| {
set_toggle_otel_layer_fn(move |enabled: bool| {
let result = reload_handle.modify(|f| {
*f = if enabled {
default_filter.clone()
} else {
filter::Targets::new()
disabled_filter()
}
});

Expand Down

0 comments on commit 376846c

Please sign in to comment.