Skip to content

Commit

Permalink
Avoid redundant shutdown in TracerProvider::drop when already shut do…
Browse files Browse the repository at this point in the history
…wn (#2197)

Co-authored-by: Cijo Thomas <[email protected]>
Co-authored-by: Utkarsh Umesan Pillai <[email protected]>
Co-authored-by: Cijo Thomas <[email protected]>
  • Loading branch information
4 people authored Oct 23, 2024
1 parent 2bb53b4 commit a47b429
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 36 deletions.
262 changes: 226 additions & 36 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,67 @@
//! # Trace Provider SDK
//!
//! ## Tracer Creation
//!
//! New [`Tracer`] instances are always created through a [`TracerProvider`].
//!
//! All configuration objects and extension points (span processors,
//! propagators) are provided by the [`TracerProvider`]. [`Tracer`] instances do
//! not duplicate this data to avoid that different [`Tracer`] instances
//! of the [`TracerProvider`] have different versions of these data.
/// # Trace Provider SDK
///
/// The `TracerProvider` handles the creation and management of [`Tracer`] instances and coordinates
/// span processing. It serves as the central configuration point for tracing, ensuring consistency
/// across all [`Tracer`] instances it creates.
///
/// ## Tracer Creation
///
/// New [`Tracer`] instances are always created through a `TracerProvider`. These `Tracer`s share
/// a common configuration, which includes the [`Resource`], span processors, sampling strategies,
/// and span limits. This avoids the need for each `Tracer` to maintain its own version of these
/// configurations, ensuring uniform behavior across all instances.
///
/// ## Cloning and Shutdown
///
/// The `TracerProvider` is designed to be clonable. Cloning a `TracerProvider` creates a
/// new reference to the same provider, not a new instance. Dropping the last reference
/// to the `TracerProvider` will automatically trigger its shutdown. During shutdown, the provider
/// will flush all remaining spans, ensuring they are passed to the configured processors.
/// Users can also manually trigger shutdown using the [`shutdown`](TracerProvider::shutdown)
/// method, which will ensure the same behavior.
///
/// Once shut down, the `TracerProvider` transitions into a disabled state. In this state, further
/// operations on its associated `Tracer` instances will result in no-ops, ensuring that no spans
/// are processed or exported after shutdown.
///
/// ## Span Processing and Force Flush
///
/// The `TracerProvider` manages the lifecycle of span processors, which are responsible for
/// collecting, processing, and exporting spans. The [`force_flush`](TracerProvider::force_flush) method
/// invoked at any time will trigger an immediate flush of all pending spans (if any) to the exporters.
/// This will block the user thread till all the spans are passed to exporters.
///
/// # Examples
///
/// ```
/// use opentelemetry::global;
/// use opentelemetry_sdk::trace::TracerProvider;
/// use opentelemetry::trace::Tracer;
///
/// fn init_tracing() -> TracerProvider {
/// let provider = TracerProvider::default();
///
/// // Set the provider to be used globally
/// let _ = global::set_tracer_provider(provider.clone());
///
/// provider
/// }
///
/// fn main() {
/// let provider = init_tracing();
///
/// // create tracer..
/// let tracer = global::tracer("example/client");
///
/// // create span...
/// let span = tracer
/// .span_builder("test_span")
/// .start(&tracer);
///
/// // Explicitly shut down the provider
/// provider.shutdown();
/// }
/// ```
use crate::runtime::RuntimeChannel;
use crate::trace::{
BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer,
Expand All @@ -16,7 +70,7 @@ use crate::{export::trace::SpanExporter, trace::SpanProcessor};
use crate::{InstrumentationLibrary, Resource};
use once_cell::sync::{Lazy, OnceCell};
use opentelemetry::trace::TraceError;
use opentelemetry::{global, trace::TraceResult};
use opentelemetry::{otel_debug, trace::TraceResult};
use std::borrow::Cow;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand All @@ -36,36 +90,60 @@ static NOOP_TRACER_PROVIDER: Lazy<TracerProvider> = Lazy::new(|| TracerProvider
span_limits: SpanLimits::default(),
resource: Cow::Owned(Resource::empty()),
},
is_shutdown: AtomicBool::new(true),
}),
is_shutdown: Arc::new(AtomicBool::new(true)),
});

/// TracerProvider inner type
#[derive(Debug)]
pub(crate) struct TracerProviderInner {
processors: Vec<Box<dyn SpanProcessor>>,
config: crate::trace::Config,
is_shutdown: AtomicBool,
}

impl Drop for TracerProviderInner {
fn drop(&mut self) {
for processor in &mut self.processors {
impl TracerProviderInner {
/// Crate-private shutdown method to be called both from explicit shutdown
/// and from Drop when the last reference is released.
pub(crate) fn shutdown(&self) -> Vec<TraceError> {
let mut errs = vec![];
for processor in &self.processors {
if let Err(err) = processor.shutdown() {
global::handle_error(err);
// Log at debug level because:
// - The error is also returned to the user for handling (if applicable)
// - Or the error occurs during `TracerProviderInner::Drop` as part of telemetry shutdown,
// which is non-actionable by the user
otel_debug!(name: "TracerProvider.Drop.ShutdownError",
error = format!("{err}"));
errs.push(err);
}
}
errs
}
}

impl Drop for TracerProviderInner {
fn drop(&mut self) {
if !self.is_shutdown.load(Ordering::Relaxed) {
let _ = self.shutdown(); // errors are handled within shutdown
} else {
otel_debug!(
name: "TracerProvider.Drop.AlreadyShutdown"
);
}
}
}

/// Creator and registry of named [`Tracer`] instances.
///
/// `TracerProvider` is lightweight container holding pointers to `SpanProcessor` and other components.
/// Cloning and dropping them will not stop the span processing. To stop span processing, users
/// must either call `shutdown` method explicitly, or drop every clone of `TracerProvider`.
/// `TracerProvider` is a container holding pointers to `SpanProcessor` and other components.
/// Cloning a `TracerProvider` instance and dropping it will not stop span processing. To stop span processing, users
/// must either call the `shutdown` method explicitly or allow the last reference to the `TracerProvider`
/// to be dropped. When the last reference is dropped, the shutdown process will be automatically triggered
/// to ensure proper cleanup.
#[derive(Clone, Debug)]
pub struct TracerProvider {
inner: Arc<TracerProviderInner>,
is_shutdown: Arc<AtomicBool>,
}

impl Default for TracerProvider {
Expand All @@ -79,7 +157,6 @@ impl TracerProvider {
pub(crate) fn new(inner: TracerProviderInner) -> Self {
TracerProvider {
inner: Arc::new(inner),
is_shutdown: Arc::new(AtomicBool::new(false)),
}
}

Expand All @@ -101,7 +178,7 @@ impl TracerProvider {
/// true if the provider has been shutdown
/// Don't start span or export spans when provider is shutdown
pub(crate) fn is_shutdown(&self) -> bool {
self.is_shutdown.load(Ordering::Relaxed)
self.inner.is_shutdown.load(Ordering::Relaxed)
}

/// Force flush all remaining spans in span processors and return results.
Expand Down Expand Up @@ -153,28 +230,20 @@ impl TracerProvider {
/// Note that shut down doesn't means the TracerProvider has dropped
pub fn shutdown(&self) -> TraceResult<()> {
if self
.inner
.is_shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
// propagate the shutdown signal to processors
// it's up to the processor to properly block new spans after shutdown
let mut errs = vec![];
for processor in &self.inner.processors {
if let Err(err) = processor.shutdown() {
errs.push(err);
}
}

let errs = self.inner.shutdown();
if errs.is_empty() {
Ok(())
} else {
Err(TraceError::Other(format!("{errs:?}").into()))
}
} else {
Err(TraceError::Other(
"tracer provider already shut down".into(),
))
Err(TraceError::TracerProviderAlreadyShutdown)
}
}
}
Expand Down Expand Up @@ -215,7 +284,7 @@ impl opentelemetry::trace::TracerProvider for TracerProvider {
}

fn library_tracer(&self, library: Arc<InstrumentationLibrary>) -> Self::Tracer {
if self.is_shutdown.load(Ordering::Relaxed) {
if self.inner.is_shutdown.load(Ordering::Relaxed) {
return Tracer::new(library, NOOP_TRACER_PROVIDER.clone());
}
Tracer::new(library, self.clone())
Expand Down Expand Up @@ -292,7 +361,12 @@ impl Builder {
p.set_resource(config.resource.as_ref());
}

TracerProvider::new(TracerProviderInner { processors, config })
let is_shutdown = AtomicBool::new(false);
TracerProvider::new(TracerProviderInner {
processors,
config,
is_shutdown,
})
}
}

Expand Down Expand Up @@ -391,6 +465,7 @@ mod tests {
Box::from(TestSpanProcessor::new(false)),
],
config: Default::default(),
is_shutdown: AtomicBool::new(false),
});

let results = tracer_provider.force_flush();
Expand Down Expand Up @@ -534,6 +609,7 @@ mod tests {
let tracer_provider = super::TracerProvider::new(TracerProviderInner {
processors: vec![Box::from(processor)],
config: Default::default(),
is_shutdown: AtomicBool::new(false),
});

let test_tracer_1 = tracer_provider.tracer("test1");
Expand All @@ -554,14 +630,128 @@ mod tests {

// after shutdown we should get noop tracer
let noop_tracer = tracer_provider.tracer("noop");

// noop tracer cannot start anything
let _ = noop_tracer.start("test");
assert!(assert_handle.started_span_count(2));
// noop tracer's tracer provider should be shutdown
assert!(noop_tracer.provider().is_shutdown.load(Ordering::SeqCst));
assert!(noop_tracer.provider().is_shutdown());

// existing tracer becomes noops after shutdown
let _ = test_tracer_1.start("test");
assert!(assert_handle.started_span_count(2));

// also existing tracer's tracer provider are in shutdown state
assert!(test_tracer_1.provider().is_shutdown());
}

#[derive(Debug)]
struct CountingShutdownProcessor {
shutdown_count: Arc<AtomicU32>,
}

impl CountingShutdownProcessor {
fn new(shutdown_count: Arc<AtomicU32>) -> Self {
CountingShutdownProcessor { shutdown_count }
}
}

impl SpanProcessor for CountingShutdownProcessor {
fn on_start(&self, _span: &mut Span, _cx: &Context) {
// No operation needed for this processor
}

fn on_end(&self, _span: SpanData) {
// No operation needed for this processor
}

fn force_flush(&self) -> TraceResult<()> {
Ok(())
}

fn shutdown(&self) -> TraceResult<()> {
self.shutdown_count.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}

#[test]
fn drop_test_with_multiple_providers() {
let shutdown_count = Arc::new(AtomicU32::new(0));

{
// Create a shared TracerProviderInner and use it across multiple providers
let shared_inner = Arc::new(TracerProviderInner {
processors: vec![Box::new(CountingShutdownProcessor::new(
shutdown_count.clone(),
))],
config: Config::default(),
is_shutdown: AtomicBool::new(false),
});

{
let tracer_provider1 = super::TracerProvider {
inner: shared_inner.clone(),
};
let tracer_provider2 = super::TracerProvider {
inner: shared_inner.clone(),
};

let tracer1 = tracer_provider1.tracer("test-tracer1");
let tracer2 = tracer_provider2.tracer("test-tracer2");

let _span1 = tracer1.start("span1");
let _span2 = tracer2.start("span2");

// TracerProviderInner should not be dropped yet, since both providers and `shared_inner`
// are still holding a reference.
}
// At this point, both `tracer_provider1` and `tracer_provider2` are dropped,
// but `shared_inner` still holds a reference, so `TracerProviderInner` is NOT dropped yet.
assert_eq!(shutdown_count.load(Ordering::SeqCst), 0);
}
// Verify shutdown was called during the drop of the shared TracerProviderInner
assert_eq!(shutdown_count.load(Ordering::SeqCst), 1);
}

#[test]
fn drop_after_shutdown_test_with_multiple_providers() {
let shutdown_count = Arc::new(AtomicU32::new(0));

// Create a shared TracerProviderInner and use it across multiple providers
let shared_inner = Arc::new(TracerProviderInner {
processors: vec![Box::new(CountingShutdownProcessor::new(
shutdown_count.clone(),
))],
config: Config::default(),
is_shutdown: AtomicBool::new(false),
});

// Create a scope to test behavior when providers are dropped
{
let tracer_provider1 = super::TracerProvider {
inner: shared_inner.clone(),
};
let tracer_provider2 = super::TracerProvider {
inner: shared_inner.clone(),
};

// Explicitly shut down the tracer provider
let shutdown_result = tracer_provider1.shutdown();
assert!(shutdown_result.is_ok());

// Verify that shutdown was called exactly once
assert_eq!(shutdown_count.load(Ordering::SeqCst), 1);

// TracerProvider2 should observe the shutdown state but not trigger another shutdown
let shutdown_result2 = tracer_provider2.shutdown();
assert!(shutdown_result2.is_err());
assert_eq!(shutdown_count.load(Ordering::SeqCst), 1);

// Both tracer providers will be dropped at the end of this scope
}

// Verify that shutdown was only called once, even after drop
assert_eq!(shutdown_count.load(Ordering::SeqCst), 1);
}
}
4 changes: 4 additions & 0 deletions opentelemetry/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ pub enum TraceError {
#[error("Exporting timed out after {} seconds", .0.as_secs())]
ExportTimedOut(time::Duration),

/// already shutdown error
#[error("TracerProvider already shutdown")]
TracerProviderAlreadyShutdown,

/// Other errors propagated from trace SDK that weren't covered above
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
Expand Down

0 comments on commit a47b429

Please sign in to comment.