From c617be744971ebbf7b5dc6a8ec2264cd1095afeb Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 19 Dec 2024 22:57:42 +0530 Subject: [PATCH] chore: bring back BatchLogProcessorWithAsyncRuntime unit tests (#2457) --- opentelemetry-sdk/src/logs/log_processor.rs | 172 ++++++++++++++++++ .../src/testing/logs/in_memory_exporter.rs | 4 +- 2 files changed, 174 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index cff0d44be2..47d410c381 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1580,4 +1580,176 @@ mod tests { assert_eq!(exporter.len(), 1); } + + #[test] + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] + fn test_build_batch_log_processor_builder_rt() { + let mut env_vars = vec![ + (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")), + (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")), + (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")), + ]; + temp_env::with_vars(env_vars.clone(), || { + let builder = BatchLogProcessorWithAsyncRuntime::builder( + InMemoryLogExporter::default(), + runtime::Tokio, + ); + + assert_eq!(builder.config.max_export_batch_size, 500); + assert_eq!( + builder.config.scheduled_delay, + Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT) + ); + assert_eq!( + builder.config.max_queue_size, + OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT + ); + assert_eq!( + builder.config.max_export_timeout, + Duration::from_millis(2046) + ); + }); + + env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120"))); + + temp_env::with_vars(env_vars, || { + let builder = BatchLogProcessorWithAsyncRuntime::builder( + InMemoryLogExporter::default(), + runtime::Tokio, + ); + assert_eq!(builder.config.max_export_batch_size, 120); + assert_eq!(builder.config.max_queue_size, 120); + }); + } + + #[test] + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] + fn test_build_batch_log_processor_builder_rt_with_custom_config() { + let expected = BatchConfigBuilder::default() + .with_max_export_batch_size(1) + .with_scheduled_delay(Duration::from_millis(2)) + .with_max_export_timeout(Duration::from_millis(3)) + .with_max_queue_size(4) + .build(); + + let builder = BatchLogProcessorWithAsyncRuntime::builder( + InMemoryLogExporter::default(), + runtime::Tokio, + ) + .with_batch_config(expected); + + let actual = &builder.config; + assert_eq!(actual.max_export_batch_size, 1); + assert_eq!(actual.scheduled_delay, Duration::from_millis(2)); + assert_eq!(actual.max_export_timeout, Duration::from_millis(3)); + assert_eq!(actual.max_queue_size, 4); + } + + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn test_set_resource_batch_processor_rt() { + let exporter = MockLogExporter { + resource: Arc::new(Mutex::new(None)), + }; + let processor = BatchLogProcessorWithAsyncRuntime::new( + Box::new(exporter.clone()), + BatchConfig::default(), + runtime::Tokio, + ); + let provider = LoggerProvider::builder() + .with_log_processor(processor) + .with_resource(Resource::new(vec![ + KeyValue::new("k1", "v1"), + KeyValue::new("k2", "v3"), + KeyValue::new("k3", "v3"), + KeyValue::new("k4", "v4"), + KeyValue::new("k5", "v5"), + ])) + .build(); + tokio::time::sleep(Duration::from_secs(2)).await; // set resource in batch span processor is not blocking. Should we make it blocking? + assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5); + let _ = provider.shutdown(); + } + + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] + #[tokio::test(flavor = "multi_thread")] + async fn test_batch_shutdown_rt() { + // assert we will receive an error + // setup + let exporter = InMemoryLogExporterBuilder::default() + .keep_records_on_shutdown() + .build(); + let processor = BatchLogProcessorWithAsyncRuntime::new( + Box::new(exporter.clone()), + BatchConfig::default(), + runtime::Tokio, + ); + + let mut record = LogRecord::default(); + let instrumentation = InstrumentationScope::default(); + + processor.emit(&mut record, &instrumentation); + processor.force_flush().unwrap(); + processor.shutdown().unwrap(); + // todo: expect to see errors here. How should we assert this? + processor.emit(&mut record, &instrumentation); + assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) + } + + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] + #[tokio::test(flavor = "current_thread")] + #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"] + async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_multi_thread() { + let exporter = InMemoryLogExporterBuilder::default().build(); + let processor = BatchLogProcessorWithAsyncRuntime::new( + Box::new(exporter.clone()), + BatchConfig::default(), + runtime::Tokio, + ); + + // + // deadloack happens in shutdown with tokio current_thread runtime + // + processor.shutdown().unwrap(); + } + + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] + #[tokio::test(flavor = "current_thread")] + async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_current_thread() + { + let exporter = InMemoryLogExporterBuilder::default().build(); + let processor = BatchLogProcessorWithAsyncRuntime::new( + Box::new(exporter.clone()), + BatchConfig::default(), + runtime::TokioCurrentThread, + ); + + processor.shutdown().unwrap(); + } + + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] + #[tokio::test(flavor = "multi_thread")] + async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_multi_thread() { + let exporter = InMemoryLogExporterBuilder::default().build(); + let processor = BatchLogProcessorWithAsyncRuntime::new( + Box::new(exporter.clone()), + BatchConfig::default(), + runtime::Tokio, + ); + + processor.shutdown().unwrap(); + } + + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] + #[tokio::test(flavor = "multi_thread")] + async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_current_thread() { + let exporter = InMemoryLogExporterBuilder::default().build(); + let processor = BatchLogProcessorWithAsyncRuntime::new( + Box::new(exporter.clone()), + BatchConfig::default(), + runtime::TokioCurrentThread, + ); + + processor.shutdown().unwrap(); + } } diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 4ed62e90a1..cbc16bdfa3 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -25,7 +25,7 @@ use std::sync::{Arc, Mutex}; /// let exporter: InMemoryLogExporter = InMemoryLogExporter::default(); /// //Create a LoggerProvider and register the exporter /// let logger_provider = LoggerProvider::builder() -/// .with_log_processor(BatchLogProcessor::builder(exporter.clone(), runtime::Tokio).build()) +/// .with_log_processor(BatchLogProcessor::builder(exporter.clone()).build()) /// .build(); /// // Setup Log Appenders and emit logs. (Not shown here) /// logger_provider.force_flush(); @@ -84,7 +84,7 @@ pub struct LogDataWithResource { /// let exporter: InMemoryLogExporter = InMemoryLogExporterBuilder::default().build(); /// //Create a LoggerProvider and register the exporter /// let logger_provider = LoggerProvider::builder() -/// .with_log_processor(BatchLogProcessor::builder(exporter.clone(), runtime::Tokio).build()) +/// .with_log_processor(BatchLogProcessor::builder(exporter.clone()).build()) /// .build(); /// // Setup Log Appenders and emit logs. (Not shown here) /// logger_provider.force_flush();