diff --git a/examples/tracing-grpc/src/client.rs b/examples/tracing-grpc/src/client.rs
index c871e9ca8d..29a9353621 100644
--- a/examples/tracing-grpc/src/client.rs
+++ b/examples/tracing-grpc/src/client.rs
@@ -22,7 +22,7 @@ fn init_tracer() -> sdktrace::TracerProvider {
 
 struct MetadataMap<'a>(&'a mut tonic::metadata::MetadataMap);
 
-impl<'a> Injector for MetadataMap<'a> {
+impl Injector for MetadataMap<'_> {
     /// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs
     fn set(&mut self, key: &str, value: String) {
         if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) {
diff --git a/examples/tracing-grpc/src/server.rs b/examples/tracing-grpc/src/server.rs
index aadb77b6e6..24a9e09481 100644
--- a/examples/tracing-grpc/src/server.rs
+++ b/examples/tracing-grpc/src/server.rs
@@ -29,7 +29,7 @@ pub mod hello_world {
 
 struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap);
 
-impl<'a> Extractor for MetadataMap<'a> {
+impl Extractor for MetadataMap<'_> {
     /// Get a value for a key from the MetadataMap. If the value can't be converted to &str, returns None
     fn get(&self, key: &str) -> Option<&str> {
         self.0.get(key).and_then(|metadata| metadata.to_str().ok())
diff --git a/opentelemetry-appender-log/src/lib.rs b/opentelemetry-appender-log/src/lib.rs
index 2cc8b1e0fe..81ec10d129 100644
--- a/opentelemetry-appender-log/src/lib.rs
+++ b/opentelemetry-appender-log/src/lib.rs
@@ -239,7 +239,7 @@ mod any_value {
     pub(crate) fn serialize(value: log::kv::Value) -> Option<AnyValue> {
         struct ValueVisitor(Option<AnyValue>);
 
-        impl<'kvs> log::kv::VisitValue<'kvs> for ValueVisitor {
+        impl log::kv::VisitValue<'_> for ValueVisitor {
            fn visit_any(&mut self, value: log::kv::Value) -> Result<(), log::kv::Error> {
                self.0 = Some(AnyValue::String(StringValue::from(value.to_string())));
 
diff --git a/opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs b/opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs
index 8caa49393d..887ecb2a67 100644
--- a/opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs
+++ b/opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs
@@ -1,3 +1,4 @@
+use anyhow::Result;
 use opentelemetry_proto::tonic::logs::v1::{LogRecord, LogsData, ResourceLogs};
 use std::fs::File;
 
@@ -96,9 +97,9 @@ impl std::fmt::Debug for LogRecordWrapper {
 }
 
 // read a file contains ResourceSpans in json format
-pub fn read_logs_from_json(file: File) -> Vec<ResourceLogs> {
+pub fn read_logs_from_json(file: File) -> Result<Vec<ResourceLogs>> {
     let reader = std::io::BufReader::new(file);
-    let log_data: LogsData = serde_json::from_reader(reader).unwrap();
-    log_data.resource_logs
+    let log_data: LogsData = serde_json::from_reader(reader)?;
+    Ok(log_data.resource_logs)
 }
diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs
index a20f84475f..39eb88a1e4 100644
--- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs
+++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs
@@ -38,19 +38,33 @@ fn init_logs() -> Result {
 mod logtests {
     use super::*;
     use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
+    use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
     use std::{fs::File, time::Duration};
+    use tracing::info;
+    use tracing_subscriber::layer::SubscriberExt;
+
     #[test]
     #[should_panic(expected = "assertion `left == right` failed: body does not match")]
     pub fn test_assert_logs_eq_failure() {
-        let left = read_logs_from_json(File::open("./expected/logs.json").unwrap());
-        let right = read_logs_from_json(File::open("./expected/failed_logs.json").unwrap());
+        let left = read_logs_from_json(
+            File::open("./expected/logs.json").expect("failed to open expected file"),
+        )
+        .expect("Failed to read logs from expected file");
+
+        let right = read_logs_from_json(
+            File::open("./expected/failed_logs.json")
+                .expect("failed to open expected failed log file"),
+        )
+        .expect("Failed to read logs from expected failed log file");
 
         LogsAsserter::new(right, left).assert();
     }
 
     #[test]
-    pub fn test_assert_logs_eq() {
-        let logs = read_logs_from_json(File::open("./expected/logs.json").unwrap());
+    pub fn test_assert_logs_eq() -> Result<()> {
+        let logs = read_logs_from_json(File::open("./expected/logs.json")?)?;
         LogsAsserter::new(logs.clone(), logs).assert();
+
+        Ok(())
     }
 
     #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
@@ -84,15 +98,44 @@ mod logtests {
         Ok(())
     }
+
+    #[test]
+    #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
+    pub fn logs_batch_non_tokio_main() -> Result<()> {
+        // Initialize the logger provider inside a tokio runtime
+        // as this allows tonic client to capture the runtime,
+        // but actual export occurs from the dedicated std::thread
+        // created by BatchLogProcessor.
+        let rt = tokio::runtime::Runtime::new()?;
+        let logger_provider = rt.block_on(async {
+            // While we're here setup our collector container too, as this needs tokio to run
+            test_utils::start_collector_container().await?;
+            init_logs()
+        })?;
+
+        info!("LoggerProvider created");
+        let layer = OpenTelemetryTracingBridge::new(&logger_provider);
+        let subscriber = tracing_subscriber::registry().with(layer);
+        {
+            let _guard = tracing::subscriber::set_default(subscriber);
+            info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);
+        }
+        let _ = logger_provider.shutdown();
+        // tokio::time::sleep(Duration::from_secs(10)).await;
+        assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json");
+
+        Ok(())
+    }
 }
 
-pub fn assert_logs_results(result: &str, expected: &str) {
-    let left = read_logs_from_json(File::open(expected).unwrap());
-    let right = read_logs_from_json(File::open(result).unwrap());
+pub fn assert_logs_results(result: &str, expected: &str) -> Result<()> {
+    let left = read_logs_from_json(File::open(expected)?)?;
+    let right = read_logs_from_json(File::open(result)?)?;
 
     LogsAsserter::new(left, right).assert();
 
-    assert!(File::open(result).unwrap().metadata().unwrap().size() > 0)
+    assert!(File::open(result).unwrap().metadata().unwrap().size() > 0);
+    Ok(())
 }
 
 ///
diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs
index dab1dde2ac..8ced33163b 100644
--- a/opentelemetry-sdk/src/logs/log_processor.rs
+++ b/opentelemetry-sdk/src/logs/log_processor.rs
@@ -1646,4 +1646,176 @@ mod tests {
 
         assert_eq!(exporter.len(), 1);
     }
+
+    #[test]
+    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
+    fn test_build_batch_log_processor_builder_rt() {
+        let mut env_vars = vec![
+            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
+            (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
+            (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
+        ];
+        temp_env::with_vars(env_vars.clone(), || {
+            let builder = BatchLogProcessorWithAsyncRuntime::builder(
+                InMemoryLogExporter::default(),
+                runtime::Tokio,
+            );
+
+            assert_eq!(builder.config.max_export_batch_size, 500);
+            assert_eq!(
+                builder.config.scheduled_delay,
+                Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
+            );
+            assert_eq!(
+                builder.config.max_queue_size,
+                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
+            );
+            assert_eq!(
+                builder.config.max_export_timeout,
+                Duration::from_millis(2046)
+            );
+        });
+
+        env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));
+
+        temp_env::with_vars(env_vars, || {
+            let builder = BatchLogProcessorWithAsyncRuntime::builder(
+                InMemoryLogExporter::default(),
+                runtime::Tokio,
+            );
+            assert_eq!(builder.config.max_export_batch_size, 120);
+            assert_eq!(builder.config.max_queue_size, 120);
+        });
+    }
+
+    #[test]
+    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
+    fn test_build_batch_log_processor_builder_rt_with_custom_config() {
+        let expected = BatchConfigBuilder::default()
+            .with_max_export_batch_size(1)
+            .with_scheduled_delay(Duration::from_millis(2))
+            .with_max_export_timeout(Duration::from_millis(3))
+            .with_max_queue_size(4)
+            .build();
+
+        let builder = BatchLogProcessorWithAsyncRuntime::builder(
+            InMemoryLogExporter::default(),
+            runtime::Tokio,
+        )
+        .with_batch_config(expected);
+
+        let actual = &builder.config;
+        assert_eq!(actual.max_export_batch_size, 1);
+        assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
+        assert_eq!(actual.max_export_timeout, Duration::from_millis(3));
+        assert_eq!(actual.max_queue_size, 4);
+    }
+
+    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn test_set_resource_batch_processor_rt() {
+        let exporter = MockLogExporter {
+            resource: Arc::new(Mutex::new(None)),
+        };
+        let processor = BatchLogProcessorWithAsyncRuntime::new(
+            Box::new(exporter.clone()),
+            BatchConfig::default(),
+            runtime::Tokio,
+        );
+        let provider = LoggerProvider::builder()
+            .with_log_processor(processor)
+            .with_resource(Resource::new(vec![
+                KeyValue::new("k1", "v1"),
+                KeyValue::new("k2", "v3"),
+                KeyValue::new("k3", "v3"),
+                KeyValue::new("k4", "v4"),
+                KeyValue::new("k5", "v5"),
+            ]))
+            .build();
+        tokio::time::sleep(Duration::from_secs(2)).await; // set resource in batch span processor is not blocking. Should we make it blocking?
+        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
+        let _ = provider.shutdown();
+    }
+
+    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_batch_shutdown_rt() {
+        // assert we will receive an error
+        // setup
+        let exporter = InMemoryLogExporterBuilder::default()
+            .keep_records_on_shutdown()
+            .build();
+        let processor = BatchLogProcessorWithAsyncRuntime::new(
+            Box::new(exporter.clone()),
+            BatchConfig::default(),
+            runtime::Tokio,
+        );
+
+        let mut record = LogRecord::default();
+        let instrumentation = InstrumentationScope::default();
+
+        processor.emit(&mut record, &instrumentation);
+        processor.force_flush().unwrap();
+        processor.shutdown().unwrap();
+        // todo: expect to see errors here. How should we assert this?
+        processor.emit(&mut record, &instrumentation);
+        assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
+    }
+
+    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
+    #[tokio::test(flavor = "current_thread")]
+    #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"]
+    async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_multi_thread() {
+        let exporter = InMemoryLogExporterBuilder::default().build();
+        let processor = BatchLogProcessorWithAsyncRuntime::new(
+            Box::new(exporter.clone()),
+            BatchConfig::default(),
+            runtime::Tokio,
+        );
+
+        //
+        // deadlock happens in shutdown with tokio current_thread runtime
+        //
+        processor.shutdown().unwrap();
+    }
+
+    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
+    #[tokio::test(flavor = "current_thread")]
+    async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_current_thread()
+    {
+        let exporter = InMemoryLogExporterBuilder::default().build();
+        let processor = BatchLogProcessorWithAsyncRuntime::new(
+            Box::new(exporter.clone()),
+            BatchConfig::default(),
+            runtime::TokioCurrentThread,
+        );
+
+        processor.shutdown().unwrap();
+    }
+
+    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_multi_thread() {
+        let exporter = InMemoryLogExporterBuilder::default().build();
+        let processor = BatchLogProcessorWithAsyncRuntime::new(
+            Box::new(exporter.clone()),
+            BatchConfig::default(),
+            runtime::Tokio,
+        );
+
+        processor.shutdown().unwrap();
+    }
+
+    #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_current_thread() {
+        let exporter = InMemoryLogExporterBuilder::default().build();
+        let processor = BatchLogProcessorWithAsyncRuntime::new(
+            Box::new(exporter.clone()),
+            BatchConfig::default(),
+            runtime::TokioCurrentThread,
+        );
+
+        processor.shutdown().unwrap();
+    }
 }
diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
index 4ed62e90a1..cbc16bdfa3 100644
--- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
+++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
@@ -25,7 +25,7 @@ use std::sync::{Arc, Mutex};
 /// let exporter: InMemoryLogExporter = InMemoryLogExporter::default();
 /// //Create a LoggerProvider and register the exporter
 /// let logger_provider = LoggerProvider::builder()
-///     .with_log_processor(BatchLogProcessor::builder(exporter.clone(), runtime::Tokio).build())
+///     .with_log_processor(BatchLogProcessor::builder(exporter.clone()).build())
 ///     .build();
 /// // Setup Log Appenders and emit logs. (Not shown here)
 /// logger_provider.force_flush();
@@ -84,7 +84,7 @@ pub struct LogDataWithResource {
 /// let exporter: InMemoryLogExporter = InMemoryLogExporterBuilder::default().build();
 /// //Create a LoggerProvider and register the exporter
 /// let logger_provider = LoggerProvider::builder()
-///     .with_log_processor(BatchLogProcessor::builder(exporter.clone(), runtime::Tokio).build())
+///     .with_log_processor(BatchLogProcessor::builder(exporter.clone()).build())
 ///     .build();
 /// // Setup Log Appenders and emit logs. (Not shown here)
 /// logger_provider.force_flush();
diff --git a/opentelemetry-zipkin/src/exporter/mod.rs b/opentelemetry-zipkin/src/exporter/mod.rs
index 889061a40e..bb18c1473b 100644
--- a/opentelemetry-zipkin/src/exporter/mod.rs
+++ b/opentelemetry-zipkin/src/exporter/mod.rs
@@ -54,6 +54,7 @@ pub struct ZipkinPipelineBuilder {
 impl Default for ZipkinPipelineBuilder {
     fn default() -> Self {
         let timeout = env::get_timeout();
+
         ZipkinPipelineBuilder {
             #[cfg(feature = "reqwest-blocking-client")]
             client: Some(Arc::new(
diff --git a/stress/src/throughput.rs b/stress/src/throughput.rs
index 8116f904ee..131762fab7 100644
--- a/stress/src/throughput.rs
+++ b/stress/src/throughput.rs
@@ -140,8 +140,8 @@ struct UnsafeSlice<'a> {
     slice: &'a [UnsafeCell<WorkerStats>],
 }
 
-unsafe impl<'a> Send for UnsafeSlice<'a> {}
-unsafe impl<'a> Sync for UnsafeSlice<'a> {}
+unsafe impl Send for UnsafeSlice<'_> {}
+unsafe impl Sync for UnsafeSlice<'_> {}
 
 impl<'a> UnsafeSlice<'a> {
     fn new(slice: &'a mut [WorkerStats]) -> Self {
@@ -155,7 +155,7 @@ impl<'a> UnsafeSlice<'a> {
     #[inline(always)]
     unsafe fn increment(&self, i: usize) {
         let value = self.slice[i].get();
-        (*value).count = (*value).count + 1;
+        (*value).count += 1;
     }
 
     #[inline(always)]