diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 3ae4d245c9..ec237102ed 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -9,7 +9,8 @@ use std::sync::mpsc::{self, RecvTimeoutError, SyncSender}; use opentelemetry::logs::Severity; use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope}; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::{cmp::min, env, sync::Mutex}; use std::{ fmt::{self, Debug, Formatter}, @@ -318,7 +319,10 @@ impl LogProcessor for BatchLogProcessor { } impl BatchLogProcessor { - pub(crate) fn new(mut exporter: Box, config: BatchConfig) -> Self { + pub(crate) fn new(mut exporter: E, config: BatchConfig) -> Self + where + E: LogExporter + Send + Sync + 'static, + { let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size); let max_queue_size = config.max_queue_size; @@ -346,7 +350,7 @@ impl BatchLogProcessor { { let _ = export_with_timeout_sync( remaining_time, - exporter.as_mut(), + &mut exporter, logs.split_off(0), &mut last_export_time, ); @@ -355,7 +359,7 @@ impl BatchLogProcessor { Ok(BatchMessage::ForceFlush(sender)) => { let result = export_with_timeout_sync( remaining_time, - exporter.as_mut(), + &mut exporter, logs.split_off(0), &mut last_export_time, ); @@ -364,7 +368,7 @@ impl BatchLogProcessor { Ok(BatchMessage::Shutdown(sender)) => { let result = export_with_timeout_sync( remaining_time, - exporter.as_mut(), + &mut exporter, logs.split_off(0), &mut last_export_time, ); @@ -381,7 +385,7 @@ impl BatchLogProcessor { Err(RecvTimeoutError::Timeout) => { let _ = export_with_timeout_sync( remaining_time, - exporter.as_mut(), + &mut exporter, logs.split_off(0), &mut last_export_time, ); @@ -442,7 +446,8 @@ where .iter() .map(|log_data| (&log_data.0, &log_data.1)) .collect(); - let export = exporter.export(LogBatch::new(log_vec.as_slice())); + let log_batch = LogBatch::new(log_vec.as_slice()); + let export = exporter.export(&log_batch); let export_result = futures_executor::block_on(export); match export_result { @@ -477,7 +482,7 @@ where /// Build a batch processor pub fn build(self) -> BatchLogProcessor { - BatchLogProcessor::new(Box::new(self.exporter), self.config) + BatchLogProcessor::new(self.exporter, self.config) } } @@ -569,7 +574,7 @@ impl LogProcessor for BatchLogProcessorWithAsyncRuntime { #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] impl BatchLogProcessorWithAsyncRuntime { - pub(crate) fn new(mut exporter: E, config: BatchConfig, runtime: R) -> Self + pub(crate) fn new(mut exporter: E, config: BatchConfig, runtime: R) -> Self where E: LogExporter + Send + Sync + 'static, { @@ -1125,7 +1130,7 @@ mod tests { let exporter = MockLogExporter { resource: Arc::new(Mutex::new(None)), }; - let processor = BatchLogProcessor::new(exporter.clone(),, BatchConfig::default()); + let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); let provider = LoggerProvider::builder() .with_log_processor(processor) .with_resource( @@ -1195,7 +1200,7 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); + let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default()); processor.shutdown().unwrap(); }