Skip to content

Commit

Permalink
Merge branch 'main' into log-attributes-update
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Dec 19, 2024
2 parents c8fdd05 + c617be7 commit 814fafd
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 19 deletions.
2 changes: 1 addition & 1 deletion examples/tracing-grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn init_tracer() -> sdktrace::TracerProvider {

struct MetadataMap<'a>(&'a mut tonic::metadata::MetadataMap);

impl<'a> Injector for MetadataMap<'a> {
impl Injector for MetadataMap<'_> {
/// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs
fn set(&mut self, key: &str, value: String) {
if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) {
Expand Down
2 changes: 1 addition & 1 deletion examples/tracing-grpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub mod hello_world {

struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap);

impl<'a> Extractor for MetadataMap<'a> {
impl Extractor for MetadataMap<'_> {
/// Get a value for a key from the MetadataMap. If the value can't be converted to &str, returns None
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|metadata| metadata.to_str().ok())
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-appender-log/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ mod any_value {
pub(crate) fn serialize(value: log::kv::Value) -> Option<AnyValue> {
struct ValueVisitor(Option<AnyValue>);

impl<'kvs> log::kv::VisitValue<'kvs> for ValueVisitor {
impl log::kv::VisitValue<'_> for ValueVisitor {
fn visit_any(&mut self, value: log::kv::Value) -> Result<(), log::kv::Error> {
self.0 = Some(AnyValue::String(StringValue::from(value.to_string())));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Result;
use opentelemetry_proto::tonic::logs::v1::{LogRecord, LogsData, ResourceLogs};
use std::fs::File;

Expand Down Expand Up @@ -96,9 +97,9 @@ impl std::fmt::Debug for LogRecordWrapper {
}

// Reads a file containing ResourceLogs in JSON format.
pub fn read_logs_from_json(file: File) -> Vec<ResourceLogs> {
pub fn read_logs_from_json(file: File) -> Result<Vec<ResourceLogs>> {
let reader = std::io::BufReader::new(file);

let log_data: LogsData = serde_json::from_reader(reader).unwrap();
log_data.resource_logs
let log_data: LogsData = serde_json::from_reader(reader)?;
Ok(log_data.resource_logs)
}
59 changes: 51 additions & 8 deletions opentelemetry-otlp/tests/integration_test/tests/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,33 @@ fn init_logs() -> Result<sdklogs::LoggerProvider> {
mod logtests {
use super::*;
use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use std::{fs::File, time::Duration};
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;

#[test]
#[should_panic(expected = "assertion `left == right` failed: body does not match")]
pub fn test_assert_logs_eq_failure() {
let left = read_logs_from_json(File::open("./expected/logs.json").unwrap());
let right = read_logs_from_json(File::open("./expected/failed_logs.json").unwrap());
let left = read_logs_from_json(
File::open("./expected/logs.json").expect("failed to open expected file"),
)
.expect("Failed to read logs from expected file");

let right = read_logs_from_json(
File::open("./expected/failed_logs.json")
.expect("failed to open expected failed log file"),
)
.expect("Failed to read logs from expected failed log file");
LogsAsserter::new(right, left).assert();
}

#[test]
pub fn test_assert_logs_eq() {
let logs = read_logs_from_json(File::open("./expected/logs.json").unwrap());
pub fn test_assert_logs_eq() -> Result<()> {
let logs = read_logs_from_json(File::open("./expected/logs.json")?)?;
LogsAsserter::new(logs.clone(), logs).assert();

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
Expand Down Expand Up @@ -84,15 +98,44 @@ mod logtests {

Ok(())
}

#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn logs_batch_non_tokio_main() -> Result<()> {
    // Initialize the logger provider inside a tokio runtime
    // as this allows tonic client to capture the runtime,
    // but actual export occurs from the dedicated std::thread
    // created by BatchLogProcessor.
    let rt = tokio::runtime::Runtime::new()?;
    let logger_provider = rt.block_on(async {
        // While we're here setup our collector container too, as this needs tokio to run
        test_utils::start_collector_container().await?;
        init_logs()
    })?;

    info!("LoggerProvider created");
    let layer = OpenTelemetryTracingBridge::new(&logger_provider);
    let subscriber = tracing_subscriber::registry().with(layer);
    // Scope the default subscriber so it is torn down before shutdown/assertions.
    {
        let _guard = tracing::subscriber::set_default(subscriber);
        info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);
    }
    let _ = logger_provider.shutdown();

    // Propagate any failure from the log comparison: `assert_logs_results`
    // returns Result<()>, and the original dropped it on the floor, so an
    // I/O or parse error there would have been silently ignored.
    assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json")?;

    Ok(())
}
}

pub fn assert_logs_results(result: &str, expected: &str) {
let left = read_logs_from_json(File::open(expected).unwrap());
let right = read_logs_from_json(File::open(result).unwrap());
pub fn assert_logs_results(result: &str, expected: &str) -> Result<()> {
let left = read_logs_from_json(File::open(expected)?)?;
let right = read_logs_from_json(File::open(result)?)?;

LogsAsserter::new(left, right).assert();

assert!(File::open(result).unwrap().metadata().unwrap().size() > 0)
assert!(File::open(result).unwrap().metadata().unwrap().size() > 0);
Ok(())
}

///
Expand Down
172 changes: 172 additions & 0 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1646,4 +1646,176 @@ mod tests {

assert_eq!(exporter.len(), 1);
}

/// Builder must honor the OTEL_BLRP_* environment variables, fall back to
/// defaults for values that fail to parse, and clamp the export batch size
/// to the configured queue size.
#[test]
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
fn test_build_batch_log_processor_builder_rt() {
    // Helper: a fresh builder over an in-memory exporter on the Tokio runtime.
    let make_builder = || {
        BatchLogProcessorWithAsyncRuntime::builder(InMemoryLogExporter::default(), runtime::Tokio)
    };

    // Batch size and timeout are valid; the schedule delay is deliberately
    // unparseable so the default must be used instead.
    let mut env_vars = vec![
        (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
        (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
        (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
    ];

    temp_env::with_vars(env_vars.clone(), || {
        let builder = make_builder();
        assert_eq!(builder.config.max_export_batch_size, 500);
        assert_eq!(
            builder.config.scheduled_delay,
            Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
        );
        assert_eq!(
            builder.config.max_queue_size,
            OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
        );
        assert_eq!(
            builder.config.max_export_timeout,
            Duration::from_millis(2046)
        );
    });

    // A queue size smaller than the requested batch size (120 < 500) must
    // clamp the batch size down to the queue size.
    env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));

    temp_env::with_vars(env_vars, || {
        let builder = make_builder();
        assert_eq!(builder.config.max_export_batch_size, 120);
        assert_eq!(builder.config.max_queue_size, 120);
    });
}

/// An explicitly built BatchConfig passed via `with_batch_config` must be
/// adopted by the builder field-for-field.
#[test]
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
fn test_build_batch_log_processor_builder_rt_with_custom_config() {
    // Construct a config with four distinct, easily identifiable values.
    let custom = BatchConfigBuilder::default()
        .with_max_export_batch_size(1)
        .with_scheduled_delay(Duration::from_millis(2))
        .with_max_export_timeout(Duration::from_millis(3))
        .with_max_queue_size(4)
        .build();

    let builder =
        BatchLogProcessorWithAsyncRuntime::builder(InMemoryLogExporter::default(), runtime::Tokio)
            .with_batch_config(custom);

    // Every field must round-trip unchanged.
    let cfg = &builder.config;
    assert_eq!(cfg.max_export_batch_size, 1);
    assert_eq!(cfg.scheduled_delay, Duration::from_millis(2));
    assert_eq!(cfg.max_export_timeout, Duration::from_millis(3));
    assert_eq!(cfg.max_queue_size, 4);
}

/// A Resource set on the LoggerProvider should reach the exporter through the
/// batch processor's set_resource path.
/// NOTE(review): propagation appears to be asynchronous — the 2s sleep below
/// papers over that; confirm whether set_resource should instead block.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_set_resource_batch_processor_rt() {
    // Exporter stub that records the Resource it is handed.
    let exporter = MockLogExporter {
        resource: Arc::new(Mutex::new(None)),
    };
    let processor = BatchLogProcessorWithAsyncRuntime::new(
        Box::new(exporter.clone()),
        BatchConfig::default(),
        runtime::Tokio,
    );
    // Five distinct resource entries; the count is asserted below.
    let provider = LoggerProvider::builder()
        .with_log_processor(processor)
        .with_resource(Resource::new(vec![
            KeyValue::new("k1", "v1"),
            KeyValue::new("k2", "v3"),
            KeyValue::new("k3", "v3"),
            KeyValue::new("k4", "v4"),
            KeyValue::new("k5", "v5"),
        ]))
        .build();
    tokio::time::sleep(Duration::from_secs(2)).await; // set resource in batch span processor is not blocking. Should we make it blocking?
    assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
    let _ = provider.shutdown();
}

/// Records emitted before shutdown must survive it (exporter is configured to
/// keep records on shutdown), and an emit after shutdown must not add a record.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "multi_thread")]
async fn test_batch_shutdown_rt() {
    // assert we will receive an error
    // setup
    let exporter = InMemoryLogExporterBuilder::default()
        .keep_records_on_shutdown()
        .build();
    let processor = BatchLogProcessorWithAsyncRuntime::new(
        Box::new(exporter.clone()),
        BatchConfig::default(),
        runtime::Tokio,
    );

    let mut record = LogRecord::default();
    let instrumentation = InstrumentationScope::default();

    // One record before shutdown; force_flush ensures it reaches the exporter.
    processor.emit(&mut record, &instrumentation);
    processor.force_flush().unwrap();
    processor.shutdown().unwrap();
    // todo: expect to see errors here. How should we assert this?
    // This emit happens after shutdown and must be dropped — the assertion
    // below only passes if exactly the pre-shutdown record was kept.
    processor.emit(&mut record, &instrumentation);
    assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
}

/// Regression guard for a known deadlock: shutting down a Tokio-runtime batch
/// processor from a current_thread test flavor hangs — hence the #[ignore];
/// see the linked issue for details.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "current_thread")]
#[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"]
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_multi_thread() {
    let exporter = InMemoryLogExporterBuilder::default().build();
    let processor = BatchLogProcessorWithAsyncRuntime::new(
        Box::new(exporter.clone()),
        BatchConfig::default(),
        runtime::Tokio,
    );

    //
    // deadlock happens in shutdown with tokio current_thread runtime
    //
    processor.shutdown().unwrap();
}

/// Shutdown must complete cleanly when both the test and the processor use the
/// current-thread Tokio runtime flavor.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "current_thread")]
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_current_thread()
{
    // Pair an in-memory exporter with a current-thread-runtime processor.
    let log_exporter = InMemoryLogExporterBuilder::default().build();
    let boxed_exporter = Box::new(log_exporter.clone());
    let batch_processor = BatchLogProcessorWithAsyncRuntime::new(
        boxed_exporter,
        BatchConfig::default(),
        runtime::TokioCurrentThread,
    );

    // The only assertion is that shutdown neither errors nor hangs.
    batch_processor.shutdown().unwrap();
}

/// Shutdown must complete cleanly when both the test and the processor use the
/// multi-thread Tokio runtime flavor.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "multi_thread")]
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_multi_thread() {
    // Pair an in-memory exporter with a Tokio-runtime processor.
    let log_exporter = InMemoryLogExporterBuilder::default().build();
    let boxed_exporter = Box::new(log_exporter.clone());
    let batch_processor = BatchLogProcessorWithAsyncRuntime::new(
        boxed_exporter,
        BatchConfig::default(),
        runtime::Tokio,
    );

    // The only assertion is that shutdown neither errors nor hangs.
    batch_processor.shutdown().unwrap();
}

/// Shutdown must complete cleanly when a multi-thread test drives a processor
/// built for the current-thread Tokio runtime flavor.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
#[tokio::test(flavor = "multi_thread")]
async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_current_thread() {
    // Pair an in-memory exporter with a current-thread-runtime processor.
    let log_exporter = InMemoryLogExporterBuilder::default().build();
    let boxed_exporter = Box::new(log_exporter.clone());
    let batch_processor = BatchLogProcessorWithAsyncRuntime::new(
        boxed_exporter,
        BatchConfig::default(),
        runtime::TokioCurrentThread,
    );

    // The only assertion is that shutdown neither errors nor hangs.
    batch_processor.shutdown().unwrap();
}
}
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::sync::{Arc, Mutex};
/// let exporter: InMemoryLogExporter = InMemoryLogExporter::default();
/// //Create a LoggerProvider and register the exporter
/// let logger_provider = LoggerProvider::builder()
/// .with_log_processor(BatchLogProcessor::builder(exporter.clone(), runtime::Tokio).build())
/// .with_log_processor(BatchLogProcessor::builder(exporter.clone()).build())
/// .build();
/// // Setup Log Appenders and emit logs. (Not shown here)
/// logger_provider.force_flush();
Expand Down Expand Up @@ -84,7 +84,7 @@ pub struct LogDataWithResource {
/// let exporter: InMemoryLogExporter = InMemoryLogExporterBuilder::default().build();
/// //Create a LoggerProvider and register the exporter
/// let logger_provider = LoggerProvider::builder()
/// .with_log_processor(BatchLogProcessor::builder(exporter.clone(), runtime::Tokio).build())
/// .with_log_processor(BatchLogProcessor::builder(exporter.clone()).build())
/// .build();
/// // Setup Log Appenders and emit logs. (Not shown here)
/// logger_provider.force_flush();
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-zipkin/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub struct ZipkinPipelineBuilder {
impl Default for ZipkinPipelineBuilder {
fn default() -> Self {
let timeout = env::get_timeout();

ZipkinPipelineBuilder {
#[cfg(feature = "reqwest-blocking-client")]
client: Some(Arc::new(
Expand Down
6 changes: 3 additions & 3 deletions stress/src/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ struct UnsafeSlice<'a> {
slice: &'a [UnsafeCell<WorkerStats>],
}

unsafe impl<'a> Send for UnsafeSlice<'a> {}
unsafe impl<'a> Sync for UnsafeSlice<'a> {}
unsafe impl Send for UnsafeSlice<'_> {}
unsafe impl Sync for UnsafeSlice<'_> {}

impl<'a> UnsafeSlice<'a> {
fn new(slice: &'a mut [WorkerStats]) -> Self {
Expand All @@ -155,7 +155,7 @@ impl<'a> UnsafeSlice<'a> {
#[inline(always)]
unsafe fn increment(&self, i: usize) {
let value = self.slice[i].get();
(*value).count = (*value).count + 1;
(*value).count += 1;
}

#[inline(always)]
Expand Down

0 comments on commit 814fafd

Please sign in to comment.