Skip to content

Commit

Permalink
Minor followups to LogProcessor (#2464)
Browse files Browse the repository at this point in the history
  • Loading branch information
cijothomas authored Dec 23, 2024
1 parent 1f35467 commit ef49833
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 14 deletions.
2 changes: 1 addition & 1 deletion opentelemetry-otlp/examples/basic-otlp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use opentelemetry_sdk::logs::LogError;
use opentelemetry_sdk::logs::LoggerProvider;
use opentelemetry_sdk::metrics::MetricError;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::{runtime, trace as sdktrace, Resource};
use opentelemetry_sdk::{trace as sdktrace, Resource};
use std::error::Error;
use tracing::info;
use tracing_subscriber::prelude::*;
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//!
//! Defines a [LogExporter] to send logs via the OpenTelemetry Protocol (OTLP)
#[cfg(feature = "grpc-tonic")]
use opentelemetry::otel_debug;
use std::fmt::Debug;

Expand Down
15 changes: 11 additions & 4 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};

#[cfg(feature = "spec_unstable_logs_enabled")]
use opentelemetry::logs::Severity;
use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};
use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, InstrumentationScope};

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cmp::min, env, sync::Mutex};
Expand Down Expand Up @@ -207,7 +207,6 @@ impl LogProcessor for BatchLogProcessor {
instrumentation.clone(),
))));

// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
if result.is_err() {
// Increment dropped logs count. The first time we have to drop a log,
// emit a warning.
Expand Down Expand Up @@ -317,9 +316,14 @@ impl BatchLogProcessor {
let handle = thread::Builder::new()
.name("OpenTelemetry.Logs.BatchProcessor".to_string())
.spawn(move || {
otel_info!(
name: "BatchLogProcessor.ThreadStarted",
interval_in_millisecs = config.scheduled_delay.as_millis(),
max_export_batch_size = config.max_export_batch_size,
max_queue_size = max_queue_size,
);
let mut last_export_time = Instant::now();
let mut logs = Vec::new();
logs.reserve(config.max_export_batch_size);
let mut logs = Vec::with_capacity(config.max_export_batch_size);

loop {
let remaining_time_option = config
Expand Down Expand Up @@ -387,6 +391,9 @@ impl BatchLogProcessor {
}
}
}
otel_info!(
name: "BatchLogProcessor.ThreadStopped"
);
})
.expect("Thread spawn failed."); //TODO: Handle thread spawn failure

Expand Down
18 changes: 13 additions & 5 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
use crate::export::trace::{SpanData, SpanExporter};
use crate::resource::Resource;
use crate::trace::Span;
use opentelemetry::otel_error;
use opentelemetry::{otel_debug, otel_warn};
use opentelemetry::{otel_error, otel_info};
use opentelemetry::{
trace::{TraceError, TraceResult},
Context,
Expand Down Expand Up @@ -258,8 +258,14 @@ impl BatchSpanProcessor {
let (message_sender, message_receiver) = sync_channel(config.max_queue_size);

let handle = thread::Builder::new()
.name("BatchSpanProcessorThread".to_string())
.name("OpenTelemetry.Traces.BatchProcessor".to_string())
.spawn(move || {
otel_info!(
name: "BatchSpanProcessor.ThreadStarted",
interval_in_millisecs = config.scheduled_delay.as_millis(),
max_export_batch_size = config.max_export_batch_size,
max_queue_size = config.max_queue_size,
);
let mut spans = Vec::with_capacity(config.max_export_batch_size);
let mut last_export_time = Instant::now();

Expand Down Expand Up @@ -321,6 +327,9 @@ impl BatchSpanProcessor {
}
}
}
otel_info!(
name: "BatchSpanProcessor.ThreadStopped"
);
})
.expect("Failed to spawn thread"); //TODO: Handle thread spawn failure

Expand Down Expand Up @@ -363,13 +372,12 @@ impl SpanProcessor for BatchSpanProcessor {
}
let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));

// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
if result.is_err() {
// Increment dropped span count. The first time we have to drop a span,
// emit a warning.
if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 {
otel_warn!(name: "BatchSpanProcessorDedicatedThread.SpanDroppingStarted",
message = "BatchSpanProcessorDedicatedThread dropped a Span due to queue full/internal errors. No further span will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions scripts/integration_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ if [ -d "$TEST_DIR" ]; then
# Run tests with the reqwest-client feature
echo
echo ####
echo Integration Tests: Reqwest Client
echo "Integration Tests: Reqwest Client (Disabled now)"
echo ####
echo
# TODO: reqwest client is not supported with thread based processor and reader. Enable this test once it is supported.
#cargo test --no-default-features --features "reqwest-client","internal-logs"

# Run tests with the reqwest-client feature
# Run tests with the reqwest-blocking-client feature
echo
echo ####
echo Integration Tests: Reqwest Blocking Client
Expand All @@ -33,10 +33,10 @@ if [ -d "$TEST_DIR" ]; then
# Run tests with the hyper-client feature
echo
echo ####
echo Integration Tests: Hyper Client
echo "Integration Tests: Hyper Client (Disabled now)"
echo ####
echo
# TODO: reqwest client is not supported with thread based processor and reader. Enable this test once it is supported.
# TODO: hyper client is not supported with thread based processor and reader. Enable this test once it is supported.
#cargo test --no-default-features --features "hyper-client","internal-logs"
else
echo "Directory $TEST_DIR does not exist. Skipping tests."
Expand Down

0 comments on commit ef49833

Please sign in to comment.