Skip to content

Commit

Permalink
resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb committed Dec 19, 2024
1 parent 5c8c644 commit d1f3cbb
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};
use opentelemetry::logs::Severity;
use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};

use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::{cmp::min, env, sync::Mutex};
use std::{
fmt::{self, Debug, Formatter},
Expand Down Expand Up @@ -318,7 +319,10 @@ impl LogProcessor for BatchLogProcessor {
}

impl BatchLogProcessor {
pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig) -> Self {
pub(crate) fn new<E>(mut exporter: E, config: BatchConfig) -> Self
where
E: LogExporter + Send + Sync + 'static,
{
let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size);
let max_queue_size = config.max_queue_size;

Expand Down Expand Up @@ -346,7 +350,7 @@ impl BatchLogProcessor {
{
let _ = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
&mut exporter,
logs.split_off(0),
&mut last_export_time,
);
Expand All @@ -355,7 +359,7 @@ impl BatchLogProcessor {
Ok(BatchMessage::ForceFlush(sender)) => {
let result = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
&mut exporter,
logs.split_off(0),
&mut last_export_time,
);
Expand All @@ -364,7 +368,7 @@ impl BatchLogProcessor {
Ok(BatchMessage::Shutdown(sender)) => {
let result = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
&mut exporter,
logs.split_off(0),
&mut last_export_time,
);
Expand All @@ -381,7 +385,7 @@ impl BatchLogProcessor {
Err(RecvTimeoutError::Timeout) => {
let _ = export_with_timeout_sync(
remaining_time,
exporter.as_mut(),
&mut exporter,
logs.split_off(0),
&mut last_export_time,
);
Expand Down Expand Up @@ -442,7 +446,8 @@ where
.iter()
.map(|log_data| (&log_data.0, &log_data.1))
.collect();
let export = exporter.export(LogBatch::new(log_vec.as_slice()));
let log_batch = LogBatch::new(log_vec.as_slice());
let export = exporter.export(&log_batch);
let export_result = futures_executor::block_on(export);

match export_result {
Expand Down Expand Up @@ -477,7 +482,7 @@ where

/// Build a batch processor
pub fn build(self) -> BatchLogProcessor {
BatchLogProcessor::new(Box::new(self.exporter), self.config)
BatchLogProcessor::new(self.exporter, self.config)
}
}

Expand Down Expand Up @@ -569,7 +574,7 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessorWithAsyncRuntime<R> {

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
impl<R: RuntimeChannel> BatchLogProcessorWithAsyncRuntime<R> {
pub(crate) fn new<E>(mut exporter: E, config: BatchConfig, runtime: R) -> Self
pub(crate) fn new<E>(mut exporter: E, config: BatchConfig, runtime: R) -> Self
where
E: LogExporter + Send + Sync + 'static,
{
Expand Down Expand Up @@ -1125,7 +1130,7 @@ mod tests {
let exporter = MockLogExporter {
resource: Arc::new(Mutex::new(None)),
};
let processor = BatchLogProcessor::new(exporter.clone(),, BatchConfig::default());
let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());
let provider = LoggerProvider::builder()
.with_log_processor(processor)
.with_resource(
Expand Down Expand Up @@ -1195,7 +1200,7 @@ mod tests {
#[tokio::test(flavor = "current_thread")]
async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default());
let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());

processor.shutdown().unwrap();
}
Expand Down

0 comments on commit d1f3cbb

Please sign in to comment.