From acf16ed451f721b33b6baa94958399b9bcab6ac5 Mon Sep 17 00:00:00 2001
From: Cijo Thomas
Date: Wed, 18 Dec 2024 18:33:15 -0800
Subject: [PATCH] Tweaks to logs integration test (#2453)

---
 .../tests/integration_test/Cargo.toml         |   5 +-
 .../expected/failed_logs.json                 |   4 +-
 .../tests/integration_test/expected/logs.json |   4 +-
 .../integration_test/src/logs_asserter.rs     |   2 +
 .../tests/integration_test/tests/logs.rs      |  88 ++++++------
 opentelemetry-sdk/src/logs/log_processor.rs   | 129 +++++++++---------
 6 files changed, 124 insertions(+), 108 deletions(-)

diff --git a/opentelemetry-otlp/tests/integration_test/Cargo.toml b/opentelemetry-otlp/tests/integration_test/Cargo.toml
index 60b4869f1e..5314c1fe61 100644
--- a/opentelemetry-otlp/tests/integration_test/Cargo.toml
+++ b/opentelemetry-otlp/tests/integration_test/Cargo.toml
@@ -8,7 +8,6 @@ publish = false
 opentelemetry = { path = "../../../opentelemetry", features = [] }
 opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "testing"] }
 opentelemetry-proto = { path = "../../../opentelemetry-proto", features = ["gen-tonic-messages", "trace", "logs", "metrics", "with-serde"] }
-log = { workspace = true }
 tokio = { version = "1.0", features = ["full"] }
 serde_json = "1"
 testcontainers = { version = "0.23.1", features = ["http_wait"]}
@@ -16,10 +15,10 @@ once_cell.workspace = true
 anyhow = "1.0.94"
 ctor = "0.2.9"
 tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] }
-tracing = "0.1.41"
+tracing = {workspace = true}
 
 [target.'cfg(unix)'.dependencies]
-opentelemetry-appender-log = { path = "../../../opentelemetry-appender-log", default-features = false}
+opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false}
 opentelemetry-otlp = { path = "../../../opentelemetry-otlp", default-features = false }
 opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-conventions" }
 
diff --git a/opentelemetry-otlp/tests/integration_test/expected/failed_logs.json b/opentelemetry-otlp/tests/integration_test/expected/failed_logs.json
index 923316dfed..97d5fc407d 100644
--- a/opentelemetry-otlp/tests/integration_test/expected/failed_logs.json
+++ b/opentelemetry-otlp/tests/integration_test/expected/failed_logs.json
@@ -14,8 +14,8 @@
       "scopeLogs": [
         {
           "scope": {
-            "name": "opentelemetry-log-appender",
-            "version": "0.3.0"
+            "name": "my-target",
+            "version": ""
           },
           "logRecords": [
             {
diff --git a/opentelemetry-otlp/tests/integration_test/expected/logs.json b/opentelemetry-otlp/tests/integration_test/expected/logs.json
index 4653189c82..5c1ee6cf2b 100644
--- a/opentelemetry-otlp/tests/integration_test/expected/logs.json
+++ b/opentelemetry-otlp/tests/integration_test/expected/logs.json
@@ -14,8 +14,8 @@
       "scopeLogs": [
         {
           "scope": {
-            "name": "opentelemetry-log-appender",
-            "version": "0.3.0"
+            "name": "my-target",
+            "version": ""
           },
           "logRecords": [
             {
diff --git a/opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs b/opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs
index da045691f5..8caa49393d 100644
--- a/opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs
+++ b/opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs
@@ -43,6 +43,8 @@ impl LogsAsserter {
             let result_scope_logs = &result_resource_logs.scope_logs[i];
             let expected_scope_logs = &expected_resource_logs.scope_logs[i];
 
+            assert_eq!(result_scope_logs.scope, expected_scope_logs.scope);
+
             results_logs.extend(result_scope_logs.log_records.clone());
             expected_logs.extend(expected_scope_logs.log_records.clone());
         }
diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs
index 5ae774cc76..a20f84475f 100644
--- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs
+++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs
@@ -4,14 +4,11 @@ use anyhow::Result;
 use ctor::dtor;
 use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
 use integration_test_runner::test_utils;
-use log::{info, Level};
-use opentelemetry_appender_log::OpenTelemetryLogBridge;
 use opentelemetry_otlp::LogExporter;
 use opentelemetry_sdk::logs::LoggerProvider;
 use opentelemetry_sdk::{logs as sdklogs, Resource};
 use std::fs::File;
 use std::os::unix::fs::MetadataExt;
-use std::time::Duration;
 
 fn init_logs() -> Result<sdklogs::LoggerProvider> {
     let exporter_builder = LogExporter::builder();
@@ -37,27 +34,56 @@ fn init_logs() -> Result<sdklogs::LoggerProvider> {
         .build())
 }
 
-#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
-pub async fn test_logs() -> Result<()> {
-    // Make sure the container is running
-    test_utils::start_collector_container().await?;
-
-    let logger_provider = init_logs().unwrap();
-    let otel_log_appender = OpenTelemetryLogBridge::new(&logger_provider);
-    log::set_boxed_logger(Box::new(otel_log_appender))?;
-    log::set_max_level(Level::Info.to_level_filter());
-
-    info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);
-
-    // TODO: remove below wait before calling logger_provider.shutdown()
-    tokio::time::sleep(Duration::from_secs(10)).await;
-    let _ = logger_provider.shutdown();
-
-    tokio::time::sleep(Duration::from_secs(10)).await;
-
-    assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json");
-
-    Ok(())
+#[cfg(test)]
+mod logtests {
+    use super::*;
+    use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
+    use std::{fs::File, time::Duration};
+    #[test]
+    #[should_panic(expected = "assertion `left == right` failed: body does not match")]
+    pub fn test_assert_logs_eq_failure() {
+        let left = read_logs_from_json(File::open("./expected/logs.json").unwrap());
+        let right = read_logs_from_json(File::open("./expected/failed_logs.json").unwrap());
+        LogsAsserter::new(right, left).assert();
+    }
+
+    #[test]
+    pub fn test_assert_logs_eq() {
+        let logs = read_logs_from_json(File::open("./expected/logs.json").unwrap());
+        LogsAsserter::new(logs.clone(), logs).assert();
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    #[cfg(not(feature = "hyper-client"))]
+    #[cfg(not(feature = "reqwest-client"))]
+    pub async fn test_logs() -> Result<()> {
+        // Make sure the container is running
+
+        use integration_test_runner::test_utils;
+        use opentelemetry_appender_tracing::layer;
+        use tracing::info;
+        use tracing_subscriber::layer::SubscriberExt;
+
+        use crate::{assert_logs_results, init_logs};
+        test_utils::start_collector_container().await?;
+
+        let logger_provider = init_logs().unwrap();
+        let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
+        let subscriber = tracing_subscriber::registry().with(layer);
+        {
+            let _guard = tracing::subscriber::set_default(subscriber);
+            info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);
+        }
+        // TODO: remove below wait before calling logger_provider.shutdown()
+        // tokio::time::sleep(Duration::from_secs(10)).await;
+        let _ = logger_provider.shutdown();
+
+        tokio::time::sleep(Duration::from_secs(10)).await;
+
+        assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json");
+
+        Ok(())
+    }
 }
 
 pub fn assert_logs_results(result: &str, expected: &str) {
@@ -69,20 +95,6 @@ pub fn assert_logs_results(result: &str, expected: &str) {
     assert!(File::open(result).unwrap().metadata().unwrap().size() > 0)
 }
 
-#[test]
-#[should_panic(expected = "assertion `left == right` failed: body does not match")]
-pub fn test_assert_logs_eq_failure() {
-    let left = read_logs_from_json(File::open("./expected/logs.json").unwrap());
-    let right = read_logs_from_json(File::open("./expected/failed_logs.json").unwrap());
-    LogsAsserter::new(right, left).assert();
-}
-
-#[test]
-pub fn test_assert_logs_eq() {
-    let logs = read_logs_from_json(File::open("./expected/logs.json").unwrap());
-    LogsAsserter::new(logs.clone(), logs).assert();
-}
-
 ///
 /// Make sure we stop the collector container, otherwise it will sit around hogging our
 /// ports and subsequent test runs will fail.
diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs
index 831ec1e522..cff0d44be2 100644
--- a/opentelemetry-sdk/src/logs/log_processor.rs
+++ b/opentelemetry-sdk/src/logs/log_processor.rs
@@ -321,78 +321,81 @@ impl BatchLogProcessor {
         let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size);
         let max_queue_size = config.max_queue_size;
 
-        let handle = thread::spawn(move || {
-            let mut last_export_time = Instant::now();
-            let mut logs = Vec::new();
-            logs.reserve(config.max_export_batch_size);
-
-            loop {
-                let remaining_time_option = config
-                    .scheduled_delay
-                    .checked_sub(last_export_time.elapsed());
-                let remaining_time = match remaining_time_option {
-                    Some(remaining_time) => remaining_time,
-                    None => config.scheduled_delay,
-                };
-
-                match message_receiver.recv_timeout(remaining_time) {
-                    Ok(BatchMessage::ExportLog(log)) => {
-                        logs.push(log);
-                        if logs.len() == config.max_export_batch_size
-                            || last_export_time.elapsed() >= config.scheduled_delay
-                        {
-                            let _ = export_with_timeout_sync(
+        let handle = thread::Builder::new()
+            .name("OpenTelemetry.Logs.BatchProcessor".to_string())
+            .spawn(move || {
+                let mut last_export_time = Instant::now();
+                let mut logs = Vec::new();
+                logs.reserve(config.max_export_batch_size);
+
+                loop {
+                    let remaining_time_option = config
+                        .scheduled_delay
+                        .checked_sub(last_export_time.elapsed());
+                    let remaining_time = match remaining_time_option {
+                        Some(remaining_time) => remaining_time,
+                        None => config.scheduled_delay,
+                    };
+
+                    match message_receiver.recv_timeout(remaining_time) {
+                        Ok(BatchMessage::ExportLog(log)) => {
+                            logs.push(log);
+                            if logs.len() == config.max_export_batch_size
+                                || last_export_time.elapsed() >= config.scheduled_delay
+                            {
+                                let _ = export_with_timeout_sync(
+                                    remaining_time,
+                                    exporter.as_mut(),
+                                    logs.split_off(0),
+                                    &mut last_export_time,
+                                );
+                            }
+                        }
+                        Ok(BatchMessage::ForceFlush(sender)) => {
+                            let result = export_with_timeout_sync(
                                 remaining_time,
                                 exporter.as_mut(),
                                 logs.split_off(0),
                                 &mut last_export_time,
                             );
+                            let _ = sender.send(result);
                         }
-                    }
-                    Ok(BatchMessage::ForceFlush(sender)) => {
-                        let result = export_with_timeout_sync(
-                            remaining_time,
-                            exporter.as_mut(),
-                            logs.split_off(0),
-                            &mut last_export_time,
-                        );
-                        let _ = sender.send(result);
-                    }
-                    Ok(BatchMessage::Shutdown(sender)) => {
-                        let result = export_with_timeout_sync(
-                            remaining_time,
-                            exporter.as_mut(),
-                            logs.split_off(0),
-                            &mut last_export_time,
-                        );
-                        let _ = sender.send(result);
+                        Ok(BatchMessage::Shutdown(sender)) => {
+                            let result = export_with_timeout_sync(
+                                remaining_time,
+                                exporter.as_mut(),
+                                logs.split_off(0),
+                                &mut last_export_time,
+                            );
+                            let _ = sender.send(result);
 
-                        //
-                        // break out the loop and return from the current background thread.
-                        //
-                        break;
-                    }
-                    Ok(BatchMessage::SetResource(resource)) => {
-                        exporter.set_resource(&resource);
-                    }
-                    Err(RecvTimeoutError::Timeout) => {
-                        let _ = export_with_timeout_sync(
-                            remaining_time,
-                            exporter.as_mut(),
-                            logs.split_off(0),
-                            &mut last_export_time,
-                        );
-                    }
-                    Err(err) => {
-                        // TODO: this should not happen! Log the error and continue for now.
-                        otel_error!(
-                            name: "BatchLogProcessor.InternalError",
-                            error = format!("{}", err)
-                        );
+                            //
+                            // break out the loop and return from the current background thread.
+                            //
+                            break;
+                        }
+                        Ok(BatchMessage::SetResource(resource)) => {
+                            exporter.set_resource(&resource);
+                        }
+                        Err(RecvTimeoutError::Timeout) => {
+                            let _ = export_with_timeout_sync(
+                                remaining_time,
+                                exporter.as_mut(),
+                                logs.split_off(0),
+                                &mut last_export_time,
+                            );
+                        }
+                        Err(err) => {
+                            // TODO: this should not happen! Log the error and continue for now.
+                            otel_error!(
+                                name: "BatchLogProcessor.InternalError",
+                                error = format!("{}", err)
+                            );
+                        }
                     }
                 }
-            }
-        });
+            })
+            .expect("Thread spawn failed."); //TODO: Handle thread spawn failure
 
         // Return batch processor with link to worker
         BatchLogProcessor {