diff --git a/opentelemetry-appender-log/examples/logs-basic.rs b/opentelemetry-appender-log/examples/logs-basic.rs index dc5bacc813..e1faf255b7 100644 --- a/opentelemetry-appender-log/examples/logs-basic.rs +++ b/opentelemetry-appender-log/examples/logs-basic.rs @@ -7,7 +7,6 @@ use log::{error, info, warn, Level}; use opentelemetry_appender_log::OpenTelemetryLogBridge; use opentelemetry_sdk::logs::{BatchLogProcessor, LoggerProvider}; -use opentelemetry_sdk::runtime; use opentelemetry_stdout::LogExporter; #[tokio::main] @@ -16,7 +15,7 @@ async fn main() { let exporter = LogExporter::default(); //Create a LoggerProvider and register the exporter let logger_provider = LoggerProvider::builder() - .with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build()) + .with_log_processor(BatchLogProcessor::builder(exporter).build()) .build(); // Setup Log Appender for the log crate. diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index 9ba43f7d65..a8354822e1 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -296,7 +296,7 @@ mod tests { async fn batch_processor_no_deadlock() { let exporter: ReentrantLogExporter = ReentrantLogExporter; let logger_provider = LoggerProvider::builder() - .with_batch_exporter(exporter.clone(), opentelemetry_sdk::runtime::Tokio) + .with_batch_exporter(exporter.clone()) .build(); let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider); diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs index ff06ce6b49..6b3dee3f07 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs @@ -37,7 +37,7 @@ fn init_logs() -> Result Result { Ok(LoggerProvider::builder() .with_resource(RESOURCE.clone()) - .with_batch_exporter(exporter, runtime::Tokio) + .with_batch_exporter(exporter) .build()) } diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs index 8498a24913..5ae774cc76 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs @@ -8,7 +8,7 @@ use log::{info, Level}; use opentelemetry_appender_log::OpenTelemetryLogBridge; use opentelemetry_otlp::LogExporter; use opentelemetry_sdk::logs::LoggerProvider; -use opentelemetry_sdk::{logs as sdklogs, runtime, Resource}; +use opentelemetry_sdk::{logs as sdklogs, Resource}; use std::fs::File; use std::os::unix::fs::MetadataExt; use std::time::Duration; @@ -28,7 +28,7 @@ fn init_logs() -> Result { let exporter = exporter_builder.build()?; Ok(LoggerProvider::builder() - .with_batch_exporter(exporter, runtime::Tokio) + .with_batch_exporter(exporter) .with_resource( Resource::builder_empty() .with_service_name("logs-integration-test") @@ -48,6 +48,9 @@ pub async fn test_logs() -> Result<()> { log::set_max_level(Level::Info.to_level_filter()); info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99); + + // TODO: remove below wait before calling logger_provider.shutdown() + tokio::time::sleep(Duration::from_secs(10)).await; let _ = logger_provider.shutdown(); tokio::time::sleep(Duration::from_secs(10)).await; diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 06b5a76703..61991a9b75 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -95,7 +95,6 @@ - Bump msrv to 1.75.0. - - *Breaking* : [#2314](https://github.com/open-telemetry/opentelemetry-rust/pull/2314) - The LogRecord struct has been updated: - All fields are now pub(crate) instead of pub. @@ -105,6 +104,58 @@ - Upgrade the tracing crate used for internal logging to version 0.1.40 or later. This is necessary because the internal logging macros utilize the name field as metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/open-telemetry/opentelemetry-rust/pull/2418) +- **Breaking** [#2436](https://github.com/open-telemetry/opentelemetry-rust/pull/2436) + + `BatchLogProcessor` no longer requires an async runtime by default. Instead, a dedicated + background thread is created to do the batch processing and exporting. + + For users who prefer the previous behavior of relying on a specific + `Runtime`, they can do so by enabling the feature flag + **`experimental_logs_batch_log_processor_with_async_runtime`**. + + 1. *Default Implementation, requires no async runtime* (**Recommended**) The + new default implementation does not require a runtime argument. Replace the + builder method accordingly: + - *Before:* + ```rust + let logger_provider = LoggerProvider::builder() + .with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build()) + .build(); + ``` + + - *After:* + ```rust + let logger_provider = LoggerProvider::builder() + .with_log_processor(BatchLogProcessor::builder(exporter).build()) + .build(); + ``` + + 2. *Async Runtime Support* + If your application cannot spin up new threads or you prefer using async + runtimes, enable the + "experimental_logs_batch_log_processor_with_async_runtime" feature flag and + adjust code as below. + + - *Before:* + ```rust + let logger_provider = LoggerProvider::builder() + .with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build()) + .build(); + ``` + + - *After:* + ```rust + let logger_provider = LoggerProvider::builder() + .with_log_processor(BatchLogProcessorWithAsyncRuntime::builder(exporter, runtime::Tokio).build()) + .build(); + ``` + + *Requirements:* + - Enable the feature flag: + `experimental_logs_batch_log_processor_with_async_runtime`. + - Continue enabling one of the async runtime feature flags: `rt-tokio`, + `rt-tokio-current-thread`, or `rt-async-std`. + ## 0.27.1 Released 2024-Nov-27 diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 88bb5af6ac..2c3643dee1 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -56,6 +56,7 @@ internal-logs = ["tracing"] experimental_metrics_periodic_reader_no_runtime = ["metrics"] experimental_metrics_periodicreader_with_async_runtime = ["metrics"] spec_unstable_metrics_views = ["metrics"] +experimental_logs_batch_log_processor_with_async_runtime = ["logs"] [[bench]] name = "context" diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 18f0fbb228..479ca36dd2 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -1,5 +1,5 @@ use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext}; -use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource}; +use crate::{export::logs::LogExporter, Resource}; use crate::{logs::LogError, logs::LogResult}; use opentelemetry::{otel_debug, otel_info, trace::TraceContextExt, Context, InstrumentationScope}; @@ -194,12 +194,8 @@ impl Builder { } /// The `LogExporter` setup using a default `BatchLogProcessor` that this provider should use. - pub fn with_batch_exporter( - self, - exporter: T, - runtime: R, - ) -> Self { - let batch = BatchLogProcessor::builder(exporter, runtime).build(); + pub fn with_batch_exporter(self, exporter: T) -> Self { + let batch = BatchLogProcessor::builder(exporter).build(); self.with_log_processor(batch) } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 0c95eae1e8..831ec1e522 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,14 +1,10 @@ use crate::{ export::logs::{ExportResult, LogBatch, LogExporter}, logs::{LogError, LogRecord, LogResult}, - runtime::{RuntimeChannel, TrySend}, Resource, }; -use futures_channel::oneshot; -use futures_util::{ - future::{self, Either}, - {pin_mut, stream, StreamExt as _}, -}; +use std::sync::mpsc::{self, RecvTimeoutError, SyncSender}; + #[cfg(feature = "spec_unstable_logs_enabled")] use opentelemetry::logs::Severity; use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope}; @@ -19,7 +15,19 @@ use std::{ fmt::{self, Debug, Formatter}, str::FromStr, sync::Arc, + thread, time::Duration, + time::Instant, +}; + +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +use crate::runtime::{RuntimeChannel, TrySend}; +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +use futures_channel::oneshot; +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +use futures_util::{ + future::{self, Either}, + {pin_mut, stream, StreamExt as _}, }; /// Delay interval between two consecutive exports. @@ -150,10 +158,345 @@ impl LogProcessor for SimpleLogProcessor { } } +/// Messages sent between application thread and batch log processor's work thread. +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +enum BatchMessage { + /// Export logs, usually called when the log is emitted. + ExportLog(Box<(LogRecord, InstrumentationScope)>), + /// Flush the current buffer to the backend, it can be triggered by + /// pre configured interval or a call to `force_push` function. + // Flush(Option>), + /// ForceFlush flushes the current buffer to the backend. + ForceFlush(mpsc::SyncSender), + /// Shut down the worker thread, push all logs in buffer to the backend. + Shutdown(mpsc::SyncSender), + /// Set the resource for the exporter. + SetResource(Arc), +} + +/// A [`LogProcessor`] that buffers log records and reports +/// them at a pre-configured interval from a dedicated background thread. +pub struct BatchLogProcessor { + message_sender: SyncSender, + handle: Mutex>>, + forceflush_timeout: Duration, + shutdown_timeout: Duration, + is_shutdown: AtomicBool, + + // Track dropped logs - we'll log this at shutdown + dropped_logs_count: AtomicUsize, + + // Track the maximum queue size that was configured for this processor + max_queue_size: usize, +} + +impl Debug for BatchLogProcessor { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("BatchLogProcessor") + .field("message_sender", &self.message_sender) + .finish() + } +} + +impl LogProcessor for BatchLogProcessor { + fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) { + // noop after shutdown + if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + otel_warn!( + name: "BatchLogProcessor.Emit.ProcessorShutdown", + message = "BatchLogProcessor has been shutdown. No further logs will be emitted." + ); + return; + } + + let result = self + .message_sender + .try_send(BatchMessage::ExportLog(Box::new(( + record.clone(), + instrumentation.clone(), + )))); + + // TODO - Implement throttling to prevent error flooding when the queue is full or closed. + if result.is_err() { + // Increment dropped logs count. The first time we have to drop a log, + // emit a warning. + if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 { + otel_warn!(name: "BatchLogProcessor.LogDroppingStarted", + message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped."); + } + } + } + + fn force_flush(&self) -> LogResult<()> { + if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + return LogResult::Err(LogError::Other( + "BatchLogProcessor is already shutdown".into(), + )); + } + let (sender, receiver) = mpsc::sync_channel(1); + self.message_sender + .try_send(BatchMessage::ForceFlush(sender)) + .map_err(|err| LogError::Other(err.into()))?; + + receiver + .recv_timeout(self.forceflush_timeout) + .map_err(|err| { + if err == RecvTimeoutError::Timeout { + LogError::ExportTimedOut(self.forceflush_timeout) + } else { + LogError::Other(err.into()) + } + })? + } + + fn shutdown(&self) -> LogResult<()> { + // test and set is_shutdown flag if it is not set + if self + .is_shutdown + .swap(true, std::sync::atomic::Ordering::Relaxed) + { + otel_warn!( + name: "BatchLogProcessor.Shutdown.ProcessorShutdown", + message = "BatchLogProcessor has been shutdown. No further logs will be emitted." + ); + return LogResult::Err(LogError::AlreadyShutdown( + "BatchLogProcessor is already shutdown".into(), + )); + } + + let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed); + let max_queue_size = self.max_queue_size; + if dropped_logs > 0 { + otel_warn!( + name: "BatchLogProcessor.LogsDropped", + dropped_logs_count = dropped_logs, + max_queue_size = max_queue_size, + message = "Logs were dropped due to a queue being full or other error. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals." + ); + } + + let (sender, receiver) = mpsc::sync_channel(1); + self.message_sender + .try_send(BatchMessage::Shutdown(sender)) + .map_err(|err| LogError::Other(err.into()))?; + + receiver + .recv_timeout(self.shutdown_timeout) + .map(|_| { + // join the background thread after receiving back the shutdown signal + if let Some(handle) = self.handle.lock().unwrap().take() { + handle.join().unwrap(); + } + LogResult::Ok(()) + }) + .map_err(|err| match err { + RecvTimeoutError::Timeout => { + otel_error!( + name: "BatchLogProcessor.Shutdown.Timeout", + message = "BatchLogProcessor shutdown timing out." + ); + LogError::ExportTimedOut(self.shutdown_timeout) + } + _ => { + otel_error!( + name: "BatchLogProcessor.Shutdown.Error", + error = format!("{}", err) + ); + LogError::Other(err.into()) + } + })? + } + + fn set_resource(&self, resource: &Resource) { + let resource = Arc::new(resource.clone()); + let _ = self + .message_sender + .try_send(BatchMessage::SetResource(resource)); + } +} + +impl BatchLogProcessor { + pub(crate) fn new(mut exporter: Box, config: BatchConfig) -> Self { + let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size); + let max_queue_size = config.max_queue_size; + + let handle = thread::spawn(move || { + let mut last_export_time = Instant::now(); + let mut logs = Vec::new(); + logs.reserve(config.max_export_batch_size); + + loop { + let remaining_time_option = config + .scheduled_delay + .checked_sub(last_export_time.elapsed()); + let remaining_time = match remaining_time_option { + Some(remaining_time) => remaining_time, + None => config.scheduled_delay, + }; + + match message_receiver.recv_timeout(remaining_time) { + Ok(BatchMessage::ExportLog(log)) => { + logs.push(log); + if logs.len() == config.max_export_batch_size + || last_export_time.elapsed() >= config.scheduled_delay + { + let _ = export_with_timeout_sync( + remaining_time, + exporter.as_mut(), + logs.split_off(0), + &mut last_export_time, + ); + } + } + Ok(BatchMessage::ForceFlush(sender)) => { + let result = export_with_timeout_sync( + remaining_time, + exporter.as_mut(), + logs.split_off(0), + &mut last_export_time, + ); + let _ = sender.send(result); + } + Ok(BatchMessage::Shutdown(sender)) => { + let result = export_with_timeout_sync( + remaining_time, + exporter.as_mut(), + logs.split_off(0), + &mut last_export_time, + ); + let _ = sender.send(result); + + // + // break out the loop and return from the current background thread. + // + break; + } + Ok(BatchMessage::SetResource(resource)) => { + exporter.set_resource(&resource); + } + Err(RecvTimeoutError::Timeout) => { + let _ = export_with_timeout_sync( + remaining_time, + exporter.as_mut(), + logs.split_off(0), + &mut last_export_time, + ); + } + Err(err) => { + // TODO: this should not happen! Log the error and continue for now. + otel_error!( + name: "BatchLogProcessor.InternalError", + error = format!("{}", err) + ); + } + } + } + }); + + // Return batch processor with link to worker + BatchLogProcessor { + message_sender, + handle: Mutex::new(Some(handle)), + forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable + shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable + is_shutdown: AtomicBool::new(false), + dropped_logs_count: AtomicUsize::new(0), + max_queue_size, + } + } + + /// Create a new batch processor builder + pub fn builder(exporter: E) -> BatchLogProcessorBuilder + where + E: LogExporter, + { + BatchLogProcessorBuilder { + exporter, + config: Default::default(), + } + } +} + +#[allow(clippy::vec_box)] +fn export_with_timeout_sync( + _: Duration, // TODO, enforcing timeout in exporter. + exporter: &mut E, + batch: Vec>, + last_export_time: &mut Instant, +) -> ExportResult +where + E: LogExporter + ?Sized, +{ + *last_export_time = Instant::now(); + + if batch.is_empty() { + return LogResult::Ok(()); + } + + let log_vec: Vec<(&LogRecord, &InstrumentationScope)> = batch + .iter() + .map(|log_data| (&log_data.0, &log_data.1)) + .collect(); + let export = exporter.export(LogBatch::new(log_vec.as_slice())); + let export_result = futures_executor::block_on(export); + + match export_result { + Ok(_) => LogResult::Ok(()), + Err(err) => { + otel_error!( + name: "BatchLogProcessor.ExportError", + error = format!("{}", err) + ); + LogResult::Err(err) + } + } +} + +/// +/// A builder for creating [`BatchLogProcessor`] instances. +/// +#[derive(Debug)] +pub struct BatchLogProcessorBuilder { + exporter: E, + config: BatchConfig, +} + +impl BatchLogProcessorBuilder +where + E: LogExporter + 'static, +{ + /// Set the BatchConfig for [`BatchLogProcessorBuilder`] + pub fn with_batch_config(self, config: BatchConfig) -> Self { + BatchLogProcessorBuilder { config, ..self } + } + + /// Build a batch processor + pub fn build(self) -> BatchLogProcessor { + BatchLogProcessor::new(Box::new(self.exporter), self.config) + } +} + +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +enum BatchMessageWithAsyncRuntime { + /// Export logs, usually called when the log is emitted. + ExportLog((LogRecord, InstrumentationScope)), + /// Flush the current buffer to the backend, it can be triggered by + /// pre configured interval or a call to `force_push` function. + Flush(Option>), + /// Shut down the worker thread, push all logs in buffer to the backend. + Shutdown(oneshot::Sender), + /// Set the resource for the exporter. + SetResource(Arc), +} + /// A [`LogProcessor`] that asynchronously buffers log records and reports /// them at a pre-configured interval. -pub struct BatchLogProcessor { - message_sender: R::Sender, +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +pub struct BatchLogProcessorWithAsyncRuntime { + message_sender: R::Sender, // Track dropped logs - we'll log this at shutdown dropped_logs_count: AtomicUsize, @@ -162,7 +505,8 @@ pub struct BatchLogProcessor { max_queue_size: usize, } -impl Debug for BatchLogProcessor { +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +impl Debug for BatchLogProcessorWithAsyncRuntime { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("BatchLogProcessor") .field("message_sender", &self.message_sender) @@ -170,12 +514,15 @@ impl Debug for BatchLogProcessor { } } -impl LogProcessor for BatchLogProcessor { +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +impl LogProcessor for BatchLogProcessorWithAsyncRuntime { fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) { - let result = self.message_sender.try_send(BatchMessage::ExportLog(( - record.clone(), - instrumentation.clone(), - ))); + let result = self + .message_sender + .try_send(BatchMessageWithAsyncRuntime::ExportLog(( + record.clone(), + instrumentation.clone(), + ))); // TODO - Implement throttling to prevent error flooding when the queue is full or closed. if result.is_err() { @@ -191,7 +538,7 @@ impl LogProcessor for BatchLogProcessor { fn force_flush(&self) -> LogResult<()> { let (res_sender, res_receiver) = oneshot::channel(); self.message_sender - .try_send(BatchMessage::Flush(Some(res_sender))) + .try_send(BatchMessageWithAsyncRuntime::Flush(Some(res_sender))) .map_err(|err| LogError::Other(err.into()))?; futures_executor::block_on(res_receiver) @@ -213,7 +560,7 @@ impl LogProcessor for BatchLogProcessor { let (res_sender, res_receiver) = oneshot::channel(); self.message_sender - .try_send(BatchMessage::Shutdown(res_sender)) + .try_send(BatchMessageWithAsyncRuntime::Shutdown(res_sender)) .map_err(|err| LogError::Other(err.into()))?; futures_executor::block_on(res_receiver) @@ -225,11 +572,12 @@ impl LogProcessor for BatchLogProcessor { let resource = Arc::new(resource.clone()); let _ = self .message_sender - .try_send(BatchMessage::SetResource(resource)); + .try_send(BatchMessageWithAsyncRuntime::SetResource(resource)); } } -impl BatchLogProcessor { +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +impl BatchLogProcessorWithAsyncRuntime { pub(crate) fn new(mut exporter: Box, config: BatchConfig, runtime: R) -> Self { let (message_sender, message_receiver) = runtime.batch_message_channel(config.max_queue_size); @@ -243,7 +591,7 @@ impl BatchLogProcessor { let ticker = inner_runtime .interval(config.scheduled_delay) .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. - .map(|_| BatchMessage::Flush(None)); + .map(|_| BatchMessageWithAsyncRuntime::Flush(None)); let timeout_runtime = inner_runtime.clone(); let mut logs = Vec::new(); let mut messages = Box::pin(stream::select(message_receiver, ticker)); @@ -251,7 +599,7 @@ impl BatchLogProcessor { while let Some(message) = messages.next().await { match message { // Log has finished, add to buffer of pending logs. - BatchMessage::ExportLog(log) => { + BatchMessageWithAsyncRuntime::ExportLog(log) => { logs.push(log); if logs.len() == config.max_export_batch_size { let result = export_with_timeout( @@ -271,7 +619,7 @@ impl BatchLogProcessor { } } // Log batch interval time reached or a force flush has been invoked, export current spans. - BatchMessage::Flush(res_channel) => { + BatchMessageWithAsyncRuntime::Flush(res_channel) => { let result = export_with_timeout( config.max_export_timeout, exporter.as_mut(), @@ -290,7 +638,7 @@ impl BatchLogProcessor { } } // Stream has terminated or processor is shutdown, return to finish execution. - BatchMessage::Shutdown(ch) => { + BatchMessageWithAsyncRuntime::Shutdown(ch) => { let result = export_with_timeout( config.max_export_timeout, exporter.as_mut(), @@ -310,7 +658,7 @@ impl BatchLogProcessor { break; } // propagate the resource - BatchMessage::SetResource(resource) => { + BatchMessageWithAsyncRuntime::SetResource(resource) => { exporter.set_resource(&resource); } } @@ -318,7 +666,7 @@ impl BatchLogProcessor { })); // Return batch processor with link to worker - BatchLogProcessor { + BatchLogProcessorWithAsyncRuntime { message_sender, dropped_logs_count: AtomicUsize::new(0), max_queue_size, @@ -326,11 +674,11 @@ impl BatchLogProcessor { } /// Create a new batch processor builder - pub fn builder(exporter: E, runtime: R) -> BatchLogProcessorBuilder + pub fn builder(exporter: E, runtime: R) -> BatchLogProcessorWithAsyncRuntimeBuilder where E: LogExporter, { - BatchLogProcessorBuilder { + BatchLogProcessorWithAsyncRuntimeBuilder { exporter, config: Default::default(), runtime, @@ -338,6 +686,7 @@ impl BatchLogProcessor { } } +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] async fn export_with_timeout( time_out: Duration, exporter: &mut E, @@ -370,6 +719,7 @@ where /// Batch log processor configuration. /// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`]. #[derive(Debug)] +#[allow(dead_code)] pub struct BatchConfig { /// The maximum queue size to buffer logs for delayed processing. If the /// queue gets full it drops the logs. The default value of is 2048. @@ -507,46 +857,33 @@ impl BatchConfigBuilder { } } -/// A builder for creating [`BatchLogProcessor`] instances. +/// A builder for creating [`BatchLogProcessorWithAsyncRuntime`] instances. /// #[derive(Debug)] -pub struct BatchLogProcessorBuilder { +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +pub struct BatchLogProcessorWithAsyncRuntimeBuilder { exporter: E, config: BatchConfig, runtime: R, } -impl BatchLogProcessorBuilder +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +impl BatchLogProcessorWithAsyncRuntimeBuilder where E: LogExporter + 'static, R: RuntimeChannel, { - /// Set the BatchConfig for [`BatchLogProcessorBuilder`] + /// Set the BatchConfig for [`BatchLogProcessorWithAsyncRuntimeBuilder`] pub fn with_batch_config(self, config: BatchConfig) -> Self { - BatchLogProcessorBuilder { config, ..self } + BatchLogProcessorWithAsyncRuntimeBuilder { config, ..self } } /// Build a batch processor - pub fn build(self) -> BatchLogProcessor { - BatchLogProcessor::new(Box::new(self.exporter), self.config, self.runtime) + pub fn build(self) -> BatchLogProcessorWithAsyncRuntime { + BatchLogProcessorWithAsyncRuntime::new(Box::new(self.exporter), self.config, self.runtime) } } -/// Messages sent between application thread and batch log processor's work thread. -#[allow(clippy::large_enum_variant)] -#[derive(Debug)] -enum BatchMessage { - /// Export logs, usually called when the log is emitted. - ExportLog((LogRecord, InstrumentationScope)), - /// Flush the current buffer to the backend, it can be triggered by - /// pre configured interval or a call to `force_push` function. - Flush(Option>), - /// Shut down the worker thread, push all logs in buffer to the backend. - Shutdown(oneshot::Sender), - /// Set the resource for the exporter. - SetResource(Arc), -} - #[cfg(all(test, feature = "testing", feature = "logs"))] mod tests { use super::{ @@ -565,7 +902,6 @@ mod tests { }, BatchConfig, BatchConfigBuilder, LogProcessor, LoggerProvider, SimpleLogProcessor, }, - runtime, testing::logs::InMemoryLogExporter, Resource, }; @@ -579,6 +915,11 @@ mod tests { use std::sync::{Arc, Mutex}; use std::time::Duration; + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] + use super::BatchLogProcessorWithAsyncRuntime; + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] + use crate::runtime; + #[derive(Debug, Clone)] struct MockLogExporter { resource: Arc>>, @@ -712,8 +1053,7 @@ mod tests { (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")), ]; temp_env::with_vars(env_vars.clone(), || { - let builder = - BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio); + let builder = BatchLogProcessor::builder(InMemoryLogExporter::default()); assert_eq!(builder.config.max_export_batch_size, 500); assert_eq!( @@ -733,8 +1073,7 @@ mod tests { env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120"))); temp_env::with_vars(env_vars, || { - let builder = - BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio); + let builder = BatchLogProcessor::builder(InMemoryLogExporter::default()); assert_eq!(builder.config.max_export_batch_size, 120); assert_eq!(builder.config.max_queue_size, 120); }); @@ -749,8 +1088,8 @@ mod tests { .with_max_queue_size(4) .build(); - let builder = BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio) - .with_batch_config(expected); + let builder = + BatchLogProcessor::builder(InMemoryLogExporter::default()).with_batch_config(expected); let actual = &builder.config; assert_eq!(actual.max_export_batch_size, 1); @@ -787,11 +1126,7 @@ mod tests { let exporter = MockLogExporter { resource: Arc::new(Mutex::new(None)), }; - let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), - BatchConfig::default(), - runtime::Tokio, - ); + let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); let provider = LoggerProvider::builder() .with_log_processor(processor) .with_resource( @@ -806,7 +1141,10 @@ mod tests { .build(), ) .build(); - tokio::time::sleep(Duration::from_secs(2)).await; // set resource in batch span processor is not blocking. Should we make it blocking? + + // wait for the batch processor to process the resource. + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5); let _ = provider.shutdown(); } @@ -818,11 +1156,7 @@ mod tests { let exporter = InMemoryLogExporterBuilder::default() .keep_records_on_shutdown() .build(); - let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), - BatchConfig::default(), - runtime::Tokio, - ); + let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); let mut record = LogRecord::default(); let instrumentation = InstrumentationScope::default(); @@ -859,11 +1193,21 @@ mod tests { assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) } + #[tokio::test(flavor = "current_thread")] + async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() { + let exporter = InMemoryLogExporterBuilder::default().build(); + let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); + + processor.shutdown().unwrap(); + } + + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] #[tokio::test(flavor = "current_thread")] #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"] - async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_multi_thread() { + async fn test_batch_log_processor_with_async_runtime_shutdown_under_async_runtime_current_flavor_multi_thread( + ) { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new( + let processor = BatchLogProcessorWithAsyncRuntime::new( Box::new(exporter.clone()), BatchConfig::default(), runtime::Tokio, @@ -878,11 +1222,7 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), - BatchConfig::default(), - runtime::TokioCurrentThread, - ); + let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); processor.shutdown().unwrap(); } @@ -890,11 +1230,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), - BatchConfig::default(), - runtime::Tokio, - ); + let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); processor.shutdown().unwrap(); } @@ -902,11 +1238,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), - BatchConfig::default(), - runtime::TokioCurrentThread, - ); + let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); processor.shutdown().unwrap(); } diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 3643f7ecf2..b1341b9e96 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -12,6 +12,11 @@ pub use log_processor::{ }; pub use record::{LogRecord, TraceContext}; +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +pub use log_processor::{ + BatchLogProcessorWithAsyncRuntime, BatchLogProcessorWithAsyncRuntimeBuilder, +}; + #[cfg(all(test, feature = "testing"))] mod tests { use super::*; diff --git a/opentelemetry/CHANGELOG.md b/opentelemetry/CHANGELOG.md index f82bbd9638..fa94f47be1 100644 --- a/opentelemetry/CHANGELOG.md +++ b/opentelemetry/CHANGELOG.md @@ -59,7 +59,6 @@ let counter = meter.u64_counter("my_counter").build(); These changes shouldn't directly affect the users of OpenTelemetry crate, as these constructs are used in SDK and Exporters. If you are an author of an sdk component/plug-in, like an exporter etc. please use these types from sdk. Refer [CHANGELOG.md](https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry-sdk/CHANGELOG.md) for more details, under same version section. - **Breaking** [2291](https://github.com/open-telemetry/opentelemetry-rust/pull/2291) Rename `logs_level_enabled flag` to `spec_unstable_logs_enabled`. Please enable this updated flag if the feature is needed. This flag will be removed once the feature is stabilized in the specifications. - ## v0.26.0 Released 2024-Sep-30