Skip to content

Commit

Permalink
Handle batch log processing in a dedicated background thread (#2436)
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomsonTan authored Dec 19, 2024
1 parent 0605341 commit 938893c
Show file tree
Hide file tree
Showing 11 changed files with 485 additions and 99 deletions.
3 changes: 1 addition & 2 deletions opentelemetry-appender-log/examples/logs-basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use log::{error, info, warn, Level};
use opentelemetry_appender_log::OpenTelemetryLogBridge;
use opentelemetry_sdk::logs::{BatchLogProcessor, LoggerProvider};
use opentelemetry_sdk::runtime;
use opentelemetry_stdout::LogExporter;

#[tokio::main]
Expand All @@ -16,7 +15,7 @@ async fn main() {
let exporter = LogExporter::default();
//Create a LoggerProvider and register the exporter
let logger_provider = LoggerProvider::builder()
.with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build())
.with_log_processor(BatchLogProcessor::builder(exporter).build())
.build();

// Setup Log Appender for the log crate.
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ mod tests {
async fn batch_processor_no_deadlock() {
let exporter: ReentrantLogExporter = ReentrantLogExporter;
let logger_provider = LoggerProvider::builder()
.with_batch_exporter(exporter.clone(), opentelemetry_sdk::runtime::Tokio)
.with_batch_exporter(exporter.clone())
.build();

let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/examples/basic-otlp-http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn init_logs() -> Result<sdklogs::LoggerProvider, opentelemetry_sdk::logs::LogEr
.build()?;

Ok(LoggerProvider::builder()
.with_batch_exporter(exporter, runtime::Tokio)
.with_batch_exporter(exporter)
.with_resource(RESOURCE.clone())
.build())
}
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/examples/basic-otlp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn init_logs() -> Result<opentelemetry_sdk::logs::LoggerProvider, LogError> {

Ok(LoggerProvider::builder()
.with_resource(RESOURCE.clone())
.with_batch_exporter(exporter, runtime::Tokio)
.with_batch_exporter(exporter)
.build())
}

Expand Down
7 changes: 5 additions & 2 deletions opentelemetry-otlp/tests/integration_test/tests/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use log::{info, Level};
use opentelemetry_appender_log::OpenTelemetryLogBridge;
use opentelemetry_otlp::LogExporter;
use opentelemetry_sdk::logs::LoggerProvider;
use opentelemetry_sdk::{logs as sdklogs, runtime, Resource};
use opentelemetry_sdk::{logs as sdklogs, Resource};
use std::fs::File;
use std::os::unix::fs::MetadataExt;
use std::time::Duration;
Expand All @@ -28,7 +28,7 @@ fn init_logs() -> Result<sdklogs::LoggerProvider> {
let exporter = exporter_builder.build()?;

Ok(LoggerProvider::builder()
.with_batch_exporter(exporter, runtime::Tokio)
.with_batch_exporter(exporter)
.with_resource(
Resource::builder_empty()
.with_service_name("logs-integration-test")
Expand All @@ -48,6 +48,9 @@ pub async fn test_logs() -> Result<()> {
log::set_max_level(Level::Info.to_level_filter());

info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);

// TODO: remove below wait before calling logger_provider.shutdown()
tokio::time::sleep(Duration::from_secs(10)).await;
let _ = logger_provider.shutdown();

tokio::time::sleep(Duration::from_secs(10)).await;
Expand Down
53 changes: 52 additions & 1 deletion opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@

- Bump msrv to 1.75.0.


- *Breaking* : [#2314](https://github.com/open-telemetry/opentelemetry-rust/pull/2314)
- The LogRecord struct has been updated:
- All fields are now pub(crate) instead of pub.
Expand All @@ -105,6 +104,58 @@
- Upgrade the tracing crate used for internal logging to version 0.1.40 or later. This is necessary because the internal logging macros utilize the name field as
metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/open-telemetry/opentelemetry-rust/pull/2418)

- **Breaking** [#2436](https://github.com/open-telemetry/opentelemetry-rust/pull/2436)

`BatchLogProcessor` no longer requires an async runtime by default. Instead, a dedicated
background thread is created to do the batch processing and exporting.

For users who prefer the previous behavior of relying on a specific
`Runtime`, they can do so by enabling the feature flag
**`experimental_logs_batch_log_processor_with_async_runtime`**.

1. *Default Implementation, requires no async runtime* (**Recommended**) The
new default implementation does not require a runtime argument. Replace the
builder method accordingly:
- *Before:*
```rust
let logger_provider = LoggerProvider::builder()
.with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build())
.build();
```

- *After:*
```rust
let logger_provider = LoggerProvider::builder()
.with_log_processor(BatchLogProcessor::builder(exporter).build())
.build();
```

2. *Async Runtime Support*
If your application cannot spin up new threads or you prefer using async
runtimes, enable the
"experimental_logs_batch_log_processor_with_async_runtime" feature flag and
adjust code as below.

- *Before:*
```rust
let logger_provider = LoggerProvider::builder()
.with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build())
.build();
```

- *After:*
```rust
let logger_provider = LoggerProvider::builder()
.with_log_processor(BatchLogProcessorWithAsyncRuntime::builder(exporter, runtime::Tokio).build())
.build();
```

*Requirements:*
- Enable the feature flag:
`experimental_logs_batch_log_processor_with_async_runtime`.
- Continue enabling one of the async runtime feature flags: `rt-tokio`,
`rt-tokio-current-thread`, or `rt-async-std`.

## 0.27.1

Released 2024-Nov-27
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ internal-logs = ["tracing"]
experimental_metrics_periodic_reader_no_runtime = ["metrics"]
experimental_metrics_periodicreader_with_async_runtime = ["metrics"]
spec_unstable_metrics_views = ["metrics"]
experimental_logs_batch_log_processor_with_async_runtime = ["logs"]

[[bench]]
name = "context"
Expand Down
10 changes: 3 additions & 7 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource};
use crate::{export::logs::LogExporter, Resource};
use crate::{logs::LogError, logs::LogResult};
use opentelemetry::{otel_debug, otel_info, trace::TraceContextExt, Context, InstrumentationScope};

Expand Down Expand Up @@ -194,12 +194,8 @@ impl Builder {
}

/// The `LogExporter` setup using a default `BatchLogProcessor` that this provider should use.
pub fn with_batch_exporter<T: LogExporter + 'static, R: RuntimeChannel>(
self,
exporter: T,
runtime: R,
) -> Self {
let batch = BatchLogProcessor::builder(exporter, runtime).build();
pub fn with_batch_exporter<T: LogExporter + 'static>(self, exporter: T) -> Self {
let batch = BatchLogProcessor::builder(exporter).build();
self.with_log_processor(batch)
}

Expand Down
Loading

0 comments on commit 938893c

Please sign in to comment.