Skip to content

Commit

Permalink
Use static dispatch in SimpleLogProcessor (#2338)
Browse files Browse the repository at this point in the history
Co-authored-by: Lalit Kumar Bhasin <[email protected]>
  • Loading branch information
cijothomas and lalitb authored Dec 2, 2024
1 parent a3c469b commit 6b71301
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 18 deletions.
6 changes: 6 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## vNext

- *Breaking* SimpleLogProcessor modified to be generic over `LogExporter` to
avoid dynamic dispatch to invoke exporter. If you were using
`with_simple_exporter` to add `LogExporter` with SimpleLogProcessor, this is a
transparent change.
[#2338](https://github.com/open-telemetry/opentelemetry-rust/pull/2338)

## 0.27.1

Released 2024-Nov-27
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl Builder {
/// The `LogExporter` that this provider should use.
pub fn with_simple_exporter<T: LogExporter + 'static>(self, exporter: T) -> Self {
let mut processors = self.processors;
processors.push(Box::new(SimpleLogProcessor::new(Box::new(exporter))));
processors.push(Box::new(SimpleLogProcessor::new(exporter)));

Builder { processors, ..self }
}
Expand Down
32 changes: 16 additions & 16 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,21 @@ pub trait LogProcessor: Send + Sync + Debug {
/// debugging and testing. For scenarios requiring higher
/// performance/throughput, consider using [BatchLogProcessor].
#[derive(Debug)]
pub struct SimpleLogProcessor {
exporter: Mutex<Box<dyn LogExporter>>,
pub struct SimpleLogProcessor<T: LogExporter> {
exporter: Mutex<T>,
is_shutdown: AtomicBool,
}

impl SimpleLogProcessor {
pub(crate) fn new(exporter: Box<dyn LogExporter>) -> Self {
impl<T: LogExporter> SimpleLogProcessor<T> {
pub(crate) fn new(exporter: T) -> Self {
SimpleLogProcessor {
exporter: Mutex::new(exporter),
is_shutdown: AtomicBool::new(false),
}
}
}

impl LogProcessor for SimpleLogProcessor {
impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) {
// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
Expand Down Expand Up @@ -764,7 +764,7 @@ mod tests {
let exporter = MockLogExporter {
resource: Arc::new(Mutex::new(None)),
};
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let processor = SimpleLogProcessor::new(exporter.clone());
let _ = LoggerProvider::builder()
.with_log_processor(processor)
.with_resource(Resource::new(vec![
Expand Down Expand Up @@ -832,7 +832,7 @@ mod tests {
let exporter = InMemoryLogExporterBuilder::default()
.keep_records_on_shutdown()
.build();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let processor = SimpleLogProcessor::new(exporter.clone());

let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
Expand Down Expand Up @@ -1013,7 +1013,7 @@ mod tests {
#[test]
fn test_simple_processor_sync_exporter_without_runtime() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let processor = SimpleLogProcessor::new(exporter.clone());

let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
Expand All @@ -1026,7 +1026,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_simple_processor_sync_exporter_with_runtime() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let processor = SimpleLogProcessor::new(exporter.clone());

let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
Expand All @@ -1039,7 +1039,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn test_simple_processor_sync_exporter_with_multi_thread_runtime() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = Arc::new(SimpleLogProcessor::new(Box::new(exporter.clone())));
let processor = Arc::new(SimpleLogProcessor::new(exporter.clone()));

let mut handles = vec![];
for _ in 0..10 {
Expand All @@ -1062,7 +1062,7 @@ mod tests {
#[tokio::test(flavor = "current_thread")]
async fn test_simple_processor_sync_exporter_with_current_thread_runtime() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let processor = SimpleLogProcessor::new(exporter.clone());

let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
Expand Down Expand Up @@ -1109,7 +1109,7 @@ mod tests {
// Use `catch_unwind` to catch the panic caused by missing Tokio runtime
let result = std::panic::catch_unwind(|| {
let exporter = LogExporterThatRequiresTokio::new();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let processor = SimpleLogProcessor::new(exporter.clone());

let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
Expand Down Expand Up @@ -1158,7 +1158,7 @@ mod tests {
// tasks nor the exporter can proceed.
async fn test_simple_processor_async_exporter_with_all_runtime_worker_threads_blocked() {
let exporter = LogExporterThatRequiresTokio::new();
let processor = Arc::new(SimpleLogProcessor::new(Box::new(exporter.clone())));
let processor = Arc::new(SimpleLogProcessor::new(exporter.clone()));

let concurrent_emit = 4; // number of worker threads

Expand Down Expand Up @@ -1189,7 +1189,7 @@ mod tests {
// tasks occupy the runtime.
async fn test_simple_processor_async_exporter_with_runtime() {
let exporter = LogExporterThatRequiresTokio::new();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let processor = SimpleLogProcessor::new(exporter.clone());

let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
Expand All @@ -1208,7 +1208,7 @@ mod tests {
async fn test_simple_processor_async_exporter_with_multi_thread_runtime() {
let exporter = LogExporterThatRequiresTokio::new();

let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let processor = SimpleLogProcessor::new(exporter.clone());

let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
Expand All @@ -1228,7 +1228,7 @@ mod tests {
async fn test_simple_processor_async_exporter_with_current_thread_runtime() {
let exporter = LogExporterThatRequiresTokio::new();

let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let processor = SimpleLogProcessor::new(exporter.clone());

let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ mod tests {
let exporter: InMemoryLogExporter = InMemoryLogExporter::default();
let logger_provider = LoggerProvider::builder()
.with_resource(resource.clone())
.with_log_processor(SimpleLogProcessor::new(Box::new(exporter.clone())))
.with_log_processor(SimpleLogProcessor::new(exporter.clone()))
.build();

// Act
Expand Down

0 comments on commit 6b71301

Please sign in to comment.