Skip to content

Commit

Permalink
chore: bring back BatchLogProcessorWithAsyncRuntime unit tests (#2457)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Dec 19, 2024
1 parent 0fc0764 commit c617be7
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 2 deletions.
172 changes: 172 additions & 0 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1580,4 +1580,176 @@ mod tests {

assert_eq!(exporter.len(), 1);
}

#[test]
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
fn test_build_batch_log_processor_builder_rt() {
let mut env_vars = vec![
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
(OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
(OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
];
temp_env::with_vars(env_vars.clone(), || {
let builder = BatchLogProcessorWithAsyncRuntime::builder(
InMemoryLogExporter::default(),
runtime::Tokio,
);

assert_eq!(builder.config.max_export_batch_size, 500);
assert_eq!(
builder.config.scheduled_delay,
Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
);
assert_eq!(
builder.config.max_queue_size,
OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
);
assert_eq!(
builder.config.max_export_timeout,
Duration::from_millis(2046)
);
});

env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));

temp_env::with_vars(env_vars, || {
let builder = BatchLogProcessorWithAsyncRuntime::builder(
InMemoryLogExporter::default(),
runtime::Tokio,
);
assert_eq!(builder.config.max_export_batch_size, 120);
assert_eq!(builder.config.max_queue_size, 120);
});
}

#[test]
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
fn test_build_batch_log_processor_builder_rt_with_custom_config() {
let expected = BatchConfigBuilder::default()
.with_max_export_batch_size(1)
.with_scheduled_delay(Duration::from_millis(2))
.with_max_export_timeout(Duration::from_millis(3))
.with_max_queue_size(4)
.build();

let builder = BatchLogProcessorWithAsyncRuntime::builder(
InMemoryLogExporter::default(),
runtime::Tokio,
)
.with_batch_config(expected);

let actual = &builder.config;
assert_eq!(actual.max_export_batch_size, 1);
assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
assert_eq!(actual.max_export_timeout, Duration::from_millis(3));
assert_eq!(actual.max_queue_size, 4);
}

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_set_resource_batch_processor_rt() {
let exporter = MockLogExporter {
resource: Arc::new(Mutex::new(None)),
};
let processor = BatchLogProcessorWithAsyncRuntime::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::Tokio,
);
let provider = LoggerProvider::builder()
.with_log_processor(processor)
.with_resource(Resource::new(vec![
KeyValue::new("k1", "v1"),
KeyValue::new("k2", "v3"),
KeyValue::new("k3", "v3"),
KeyValue::new("k4", "v4"),
KeyValue::new("k5", "v5"),
]))
.build();
tokio::time::sleep(Duration::from_secs(2)).await; // set resource in batch span processor is not blocking. Should we make it blocking?
assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
let _ = provider.shutdown();
}

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "multi_thread")]
async fn test_batch_shutdown_rt() {
// assert we will receive an error
// setup
let exporter = InMemoryLogExporterBuilder::default()
.keep_records_on_shutdown()
.build();
let processor = BatchLogProcessorWithAsyncRuntime::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::Tokio,
);

let mut record = LogRecord::default();
let instrumentation = InstrumentationScope::default();

processor.emit(&mut record, &instrumentation);
processor.force_flush().unwrap();
processor.shutdown().unwrap();
// todo: expect to see errors here. How should we assert this?
processor.emit(&mut record, &instrumentation);
assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
}

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "current_thread")]
#[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"]
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_multi_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessorWithAsyncRuntime::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::Tokio,
);

//
// deadloack happens in shutdown with tokio current_thread runtime
//
processor.shutdown().unwrap();
}

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "current_thread")]
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_current_thread()
{
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessorWithAsyncRuntime::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::TokioCurrentThread,
);

processor.shutdown().unwrap();
}

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "multi_thread")]
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_multi_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessorWithAsyncRuntime::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::Tokio,
);

processor.shutdown().unwrap();
}

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "multi_thread")]
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_current_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessorWithAsyncRuntime::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::TokioCurrentThread,
);

processor.shutdown().unwrap();
}
}
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::sync::{Arc, Mutex};
/// let exporter: InMemoryLogExporter = InMemoryLogExporter::default();
/// //Create a LoggerProvider and register the exporter
/// let logger_provider = LoggerProvider::builder()
/// .with_log_processor(BatchLogProcessor::builder(exporter.clone(), runtime::Tokio).build())
/// .with_log_processor(BatchLogProcessor::builder(exporter.clone()).build())
/// .build();
/// // Setup Log Appenders and emit logs. (Not shown here)
/// logger_provider.force_flush();
Expand Down Expand Up @@ -84,7 +84,7 @@ pub struct LogDataWithResource {
/// let exporter: InMemoryLogExporter = InMemoryLogExporterBuilder::default().build();
/// //Create a LoggerProvider and register the exporter
/// let logger_provider = LoggerProvider::builder()
/// .with_log_processor(BatchLogProcessor::builder(exporter.clone(), runtime::Tokio).build())
/// .with_log_processor(BatchLogProcessor::builder(exporter.clone()).build())
/// .build();
/// // Setup Log Appenders and emit logs. (Not shown here)
/// logger_provider.force_flush();
Expand Down

0 comments on commit c617be7

Please sign in to comment.