Skip to content

Commit

Permalink
Remove mut self reference from LogExporter::export() method. (#2380)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Dec 5, 2024
1 parent 957659f commit 96b7acc
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 29 deletions.
2 changes: 1 addition & 1 deletion opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct NoopExporter {

#[async_trait]
impl LogExporter for NoopExporter {
async fn export(&mut self, _: LogBatch<'_>) -> LogResult<()> {
async fn export(&self, _: LogBatch<'_>) -> LogResult<()> {
LogResult::Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ mod tests {

#[async_trait]
impl LogExporter for ReentrantLogExporter {
async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()> {
async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> {
// This will cause a deadlock as the export itself creates a log
// while still within the lock of the SimpleLogProcessor.
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::OtlpHttpClient;

#[async_trait]
impl LogExporter for OtlpHttpClient {
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
let client = self
.client
.lock()
Expand Down
11 changes: 7 additions & 4 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;

use super::BoxInterceptor;
use tokio::sync::Mutex;

pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
Expand All @@ -20,7 +21,7 @@ pub(crate) struct TonicLogsClient {

struct ClientInner {
client: LogsServiceClient<Channel>,
interceptor: BoxInterceptor,
interceptor: Mutex<BoxInterceptor>,
}

impl fmt::Debug for TonicLogsClient {
Expand All @@ -45,7 +46,7 @@ impl TonicLogsClient {
TonicLogsClient {
inner: Some(ClientInner {
client,
interceptor,
interceptor: Mutex::new(interceptor),
}),
resource: Default::default(),
}
Expand All @@ -54,11 +55,13 @@ impl TonicLogsClient {

#[async_trait]
impl LogExporter for TonicLogsClient {
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
let (mut client, metadata, extensions) = match &mut self.inner {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| LogError::Other(Box::new(e)))?
.into_parts();
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl LogExporter {

#[async_trait]
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
self.client.export(batch).await
}

Expand Down
7 changes: 7 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
- `ResourceDetector.detect()` no longer supports timeout option.
- `opentelemetry::global::shutdown_tracer_provider()` Removed from the API, should now use `tracer_provider.shutdown()` see [#2369](https://github.com/open-telemetry/opentelemetry-rust/pull/2369) for a migration example. "Tracer provider" is cheaply cloneable, so users are encouraged to set a clone of it as the global (ex: `global::set_tracer_provider(provider.clone()))`, so that instrumentations and other components can obtain tracers from `global::tracer()`. The tracer_provider must be kept around to call shutdown on it at the end of application (ex: `tracer_provider.shutdown()`)

- *Breaking* The LogExporter::export() method no longer requires a mutable reference to self.:
Before:
async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()>
After:
async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()>
Custom exporters will need to internally synchronize any mutable state, if applicable.

## 0.27.1

Released 2024-Nov-27
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub trait LogExporter: Send + Sync + Debug {
/// A `LogResult<()>`, which is a result type indicating either a successful export (with
/// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed.
///
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()>;
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()>;
/// Shuts down the exporter.
fn shutdown(&mut self) {}
#[cfg(feature = "spec_unstable_logs_enabled")]
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
.exporter
.lock()
.map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into()))
.and_then(|mut exporter| {
.and_then(|exporter| {
let log_tuple = &[(record as &LogRecord, instrumentation)];
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
});
Expand Down Expand Up @@ -586,7 +586,7 @@ mod tests {

#[async_trait]
impl LogExporter for MockLogExporter {
async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()> {
async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> {
Ok(())
}

Expand Down Expand Up @@ -1093,7 +1093,7 @@ mod tests {

#[async_trait::async_trait]
impl LogExporter for LogExporterThatRequiresTokio {
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
// Simulate minimal dependency on tokio by sleeping asynchronously for a short duration
tokio::time::sleep(Duration::from_millis(50)).await;

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl InMemoryLogExporter {

#[async_trait]
impl LogExporter for InMemoryLogExporter {
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
let mut logs_guard = self.logs.lock().map_err(LogError::from)?;
for (log_record, instrumentation) in batch.iter() {
let owned_log = OwnedLogData {
Expand Down
14 changes: 9 additions & 5 deletions opentelemetry-stdout/src/logs/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,21 @@ use opentelemetry_sdk::export::logs::LogBatch;
use opentelemetry_sdk::logs::LogResult;
use opentelemetry_sdk::Resource;
use std::sync::atomic;
use std::sync::atomic::Ordering;

/// An OpenTelemetry exporter that writes Logs to stdout on export.
pub struct LogExporter {
resource: Resource,
is_shutdown: atomic::AtomicBool,
resource_emitted: bool,
resource_emitted: atomic::AtomicBool,
}

impl Default for LogExporter {
fn default() -> Self {
LogExporter {
resource: Resource::default(),
is_shutdown: atomic::AtomicBool::new(false),
resource_emitted: false,
resource_emitted: atomic::AtomicBool::new(false),
}
}
}
Expand All @@ -32,15 +33,18 @@ impl fmt::Debug for LogExporter {
#[async_trait]
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
/// Export spans to stdout
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
if self.is_shutdown.load(atomic::Ordering::SeqCst) {
return Err("exporter is shut down".into());
} else {
println!("Logs");
if self.resource_emitted {
if self
.resource_emitted
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
print_logs(batch);
} else {
self.resource_emitted = true;
println!("Resource");
if let Some(schema_url) = self.resource.schema_url() {
println!("\t Resource SchemaUrl: {:?}", schema_url);
Expand Down
2 changes: 2 additions & 0 deletions stress/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ tracing-subscriber = { workspace = true, features = ["registry", "std"] }
num-format = "0.4.4"
sysinfo = { version = "0.32", optional = true }
libc = "=0.2.164" # https://github.com/GuillaumeGomez/sysinfo/issues/1392
async-trait = "0.1.51"
futures-executor = { workspace = true }

[features]
stats = ["sysinfo"]
36 changes: 25 additions & 11 deletions stress/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,38 @@
~31 M/sec
Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs,
~44 M /sec
~40 M /sec
*/

use opentelemetry::InstrumentationScope;
use opentelemetry_appender_tracing::layer;
use opentelemetry_sdk::logs::{LogProcessor, LoggerProvider};
use opentelemetry_sdk::export::logs::{LogBatch, LogExporter};
use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LogResult, LoggerProvider};
use tracing::error;
use tracing_subscriber::prelude::*;

mod throughput;
use async_trait::async_trait;

#[derive(Debug, Clone)]
struct MockLogExporter;

#[async_trait]
impl LogExporter for MockLogExporter {
async fn export(&self, _: LogBatch<'_>) -> LogResult<()> {
LogResult::Ok(())
}
}

#[derive(Debug)]
pub struct NoOpLogProcessor;

impl LogProcessor for NoOpLogProcessor {
fn emit(
&self,
_record: &mut opentelemetry_sdk::logs::LogRecord,
_scope: &InstrumentationScope,
) {
pub struct MockLogProcessor {
exporter: MockLogExporter,
}

impl LogProcessor for MockLogProcessor {
fn emit(&self, record: &mut opentelemetry_sdk::logs::LogRecord, scope: &InstrumentationScope) {
let log_tuple = &[(record as &LogRecord, scope)];
let _ = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple)));
}

fn force_flush(&self) -> opentelemetry_sdk::logs::LogResult<()> {
Expand All @@ -40,7 +52,9 @@ impl LogProcessor for NoOpLogProcessor {
fn main() {
// LoggerProvider with a no-op processor.
let provider: LoggerProvider = LoggerProvider::builder()
.with_log_processor(NoOpLogProcessor {})
.with_log_processor(MockLogProcessor {
exporter: MockLogExporter {},
})
.build();

// Use the OpenTelemetryTracingBridge to test the throughput of the appender-tracing.
Expand Down

0 comments on commit 96b7acc

Please sign in to comment.