-
Notifications
You must be signed in to change notification settings - Fork 129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Signals processing #1014
Signals processing #1014
Changes from 8 commits
7f9a765
27a8f2c
f86d71f
238080c
e683579
91a3c25
8a48848
1af3867
68f84f8
5d7aaff
4f1014e
9f5c6d9
266472f
33443fe
0d6edb3
8f61209
1faffe9
90fbc9d
8c93d6c
e044d7c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
# Processors | ||
|
||
This module provides tools to intercept and process signals globally. | ||
|
||
## Component owners | ||
|
||
- [Cesar Munoz](https://github.com/LikeTheSalad), Elastic | ||
- ? | ||
|
||
Learn more about component owners in [component_owners.yml](../.github/component_owners.yml). |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
plugins { | ||
id("otel.java-conventions") | ||
id("otel.publish-conventions") | ||
} | ||
|
||
description = "Tools to intercept and process signals globally." | ||
otelJava.moduleName.set("io.opentelemetry.contrib.processors") | ||
|
||
java { | ||
sourceCompatibility = JavaVersion.VERSION_1_8 | ||
targetCompatibility = JavaVersion.VERSION_1_8 | ||
} | ||
|
||
dependencies { | ||
api("io.opentelemetry:opentelemetry-sdk") | ||
testImplementation("io.opentelemetry:opentelemetry-sdk-testing") | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.interceptor; | ||
|
||
import io.opentelemetry.contrib.interceptor.common.Interceptable; | ||
import io.opentelemetry.sdk.common.CompletableResultCode; | ||
import io.opentelemetry.sdk.logs.data.LogRecordData; | ||
import io.opentelemetry.sdk.logs.export.LogRecordExporter; | ||
import java.util.Collection; | ||
|
||
/** Intercepts logs before delegating them to the real exporter. */ | ||
public class InterceptableLogRecordExporter extends Interceptable<LogRecordData> | ||
implements LogRecordExporter { | ||
private final LogRecordExporter delegate; | ||
|
||
public InterceptableLogRecordExporter(LogRecordExporter delegate) { | ||
this.delegate = delegate; | ||
} | ||
|
||
@Override | ||
public CompletableResultCode export(Collection<LogRecordData> logs) { | ||
return delegate.export(interceptAll(logs)); | ||
} | ||
|
||
@Override | ||
public CompletableResultCode flush() { | ||
return delegate.flush(); | ||
} | ||
|
||
@Override | ||
public CompletableResultCode shutdown() { | ||
return delegate.shutdown(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.interceptor; | ||
|
||
import io.opentelemetry.contrib.interceptor.common.Interceptable; | ||
import io.opentelemetry.sdk.common.CompletableResultCode; | ||
import io.opentelemetry.sdk.metrics.InstrumentType; | ||
import io.opentelemetry.sdk.metrics.data.AggregationTemporality; | ||
import io.opentelemetry.sdk.metrics.data.MetricData; | ||
import io.opentelemetry.sdk.metrics.export.MetricExporter; | ||
import java.util.Collection; | ||
|
||
/** Intercepts metrics before delegating them to the real exporter. */ | ||
public class InterceptableMetricExporter extends Interceptable<MetricData> | ||
implements MetricExporter { | ||
private final MetricExporter delegate; | ||
|
||
public InterceptableMetricExporter(MetricExporter delegate) { | ||
this.delegate = delegate; | ||
} | ||
|
||
@Override | ||
public CompletableResultCode export(Collection<MetricData> metrics) { | ||
return delegate.export(interceptAll(metrics)); | ||
} | ||
|
||
@Override | ||
public CompletableResultCode flush() { | ||
return delegate.flush(); | ||
} | ||
|
||
@Override | ||
public CompletableResultCode shutdown() { | ||
return delegate.shutdown(); | ||
} | ||
|
||
@Override | ||
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) { | ||
return delegate.getAggregationTemporality(instrumentType); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.interceptor; | ||
|
||
import io.opentelemetry.contrib.interceptor.common.Interceptable; | ||
import io.opentelemetry.sdk.common.CompletableResultCode; | ||
import io.opentelemetry.sdk.trace.data.SpanData; | ||
import io.opentelemetry.sdk.trace.export.SpanExporter; | ||
import java.util.Collection; | ||
|
||
/** Intercepts spans before delegating them to the real exporter. */ | ||
public class InterceptableSpanExporter extends Interceptable<SpanData> implements SpanExporter { | ||
private final SpanExporter delegate; | ||
|
||
public InterceptableSpanExporter(SpanExporter delegate) { | ||
this.delegate = delegate; | ||
} | ||
|
||
@Override | ||
public CompletableResultCode export(Collection<SpanData> spans) { | ||
return delegate.export(interceptAll(spans)); | ||
} | ||
|
||
@Override | ||
public CompletableResultCode flush() { | ||
return delegate.flush(); | ||
} | ||
|
||
@Override | ||
public CompletableResultCode shutdown() { | ||
return delegate.shutdown(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.interceptor.api; | ||
|
||
/** | ||
* Intercepts a signal before it gets exported. The signal can get updated and/or filtered out based | ||
* on each interceptor implementation. | ||
*/ | ||
public interface Interceptor<T> { | ||
|
||
/** | ||
* Intercepts a signal. | ||
* | ||
* @param item The signal object. | ||
* @return The received signal modified (or null for excluding this signal from getting exported). | ||
* If there's no operation needed to be done for a specific signal, it should be returned as | ||
* is. | ||
*/ | ||
T intercept(T item); | ||
LikeTheSalad marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.interceptor.common; | ||
|
||
import io.opentelemetry.contrib.interceptor.api.Interceptor; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
|
||
/** Base class to reuse the code related to intercepting signals. */ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I strongly believe that we should not use object inheritance as a mechanism for reusing code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Generally speaking, I understand the benefits of composition over inheritance, although I still believe that there are cases where inheritance makes more sense. I think the problem comes when people start adding more than one responsibility to a base class. That being said though, after my recent changes to this PR I realized that it was actually better not to use inheritance for this use case 😅 so thanks for your input! Though I guess my point is that the best approach might vary depending on the use case. |
||
public class Interceptable<T> { | ||
private final Set<Interceptor<T>> interceptors = new HashSet<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I could think of a few contrived scenarios where order might matter. I'd use a list. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense, I've added the changes into ComposableInterceptor. |
||
|
||
public void addInterceptor(Interceptor<T> interceptor) { | ||
interceptors.add(interceptor); | ||
} | ||
|
||
protected Collection<T> interceptAll(Collection<T> items) { | ||
List<T> result = new ArrayList<>(); | ||
|
||
for (T item : items) { | ||
T intercepted = intercept(item); | ||
if (intercepted != null) { | ||
result.add(intercepted); | ||
} | ||
} | ||
|
||
return result; | ||
} | ||
|
||
private T intercept(T item) { | ||
T intercepted = item; | ||
for (Interceptor<T> interceptor : interceptors) { | ||
intercepted = interceptor.intercept(intercepted); | ||
if (intercepted == null) { | ||
break; | ||
} | ||
} | ||
return intercepted; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.contrib.interceptor; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
|
||
import io.opentelemetry.api.common.AttributeKey; | ||
import io.opentelemetry.api.common.Attributes; | ||
import io.opentelemetry.api.common.AttributesBuilder; | ||
import io.opentelemetry.api.logs.Logger; | ||
import io.opentelemetry.api.logs.Severity; | ||
import io.opentelemetry.api.trace.SpanContext; | ||
import io.opentelemetry.sdk.common.InstrumentationScopeInfo; | ||
import io.opentelemetry.sdk.logs.SdkLoggerProvider; | ||
import io.opentelemetry.sdk.logs.data.Body; | ||
import io.opentelemetry.sdk.logs.data.LogRecordData; | ||
import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor; | ||
import io.opentelemetry.sdk.resources.Resource; | ||
import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter; | ||
import java.util.List; | ||
import javax.annotation.Nullable; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
|
||
class InterceptableLogRecordExporterTest { | ||
private InMemoryLogRecordExporter memoryLogRecordExporter; | ||
private Logger logger; | ||
private InterceptableLogRecordExporter interceptable; | ||
|
||
@BeforeEach | ||
public void setUp() { | ||
LikeTheSalad marked this conversation as resolved.
Show resolved
Hide resolved
|
||
memoryLogRecordExporter = InMemoryLogRecordExporter.create(); | ||
interceptable = new InterceptableLogRecordExporter(memoryLogRecordExporter); | ||
logger = | ||
SdkLoggerProvider.builder() | ||
.addLogRecordProcessor(SimpleLogRecordProcessor.create(interceptable)) | ||
.build() | ||
.get("TestScope"); | ||
} | ||
|
||
@Test | ||
public void verifyLogModification() { | ||
interceptable.addInterceptor( | ||
item -> { | ||
ModifiableLogRecordData modified = new ModifiableLogRecordData(item); | ||
modified.attributes.put("global.attr", "from interceptor"); | ||
return modified; | ||
}); | ||
|
||
logger | ||
.logRecordBuilder() | ||
.setBody("One log") | ||
.setAttribute(AttributeKey.stringKey("local.attr"), "local") | ||
.emit(); | ||
|
||
List<LogRecordData> finishedLogRecordItems = | ||
memoryLogRecordExporter.getFinishedLogRecordItems(); | ||
assertEquals(1, finishedLogRecordItems.size()); | ||
LogRecordData logRecordData = finishedLogRecordItems.get(0); | ||
assertEquals(2, logRecordData.getAttributes().size()); | ||
assertEquals( | ||
"from interceptor", | ||
logRecordData.getAttributes().get(AttributeKey.stringKey("global.attr"))); | ||
assertEquals("local", logRecordData.getAttributes().get(AttributeKey.stringKey("local.attr"))); | ||
} | ||
|
||
@Test | ||
public void verifyLogFiltering() { | ||
interceptable.addInterceptor( | ||
item -> { | ||
if (item.getBody().asString().contains("deleted")) { | ||
return null; | ||
} | ||
return item; | ||
}); | ||
|
||
logger.logRecordBuilder().setBody("One log").emit(); | ||
logger.logRecordBuilder().setBody("This log will be deleted").emit(); | ||
logger.logRecordBuilder().setBody("Another log").emit(); | ||
|
||
List<LogRecordData> finishedLogRecordItems = | ||
memoryLogRecordExporter.getFinishedLogRecordItems(); | ||
assertEquals(2, finishedLogRecordItems.size()); | ||
assertEquals("One log", finishedLogRecordItems.get(0).getBody().asString()); | ||
assertEquals("Another log", finishedLogRecordItems.get(1).getBody().asString()); | ||
} | ||
|
||
private static class ModifiableLogRecordData implements LogRecordData { | ||
private final LogRecordData delegate; | ||
private final AttributesBuilder attributes = Attributes.builder(); | ||
|
||
private ModifiableLogRecordData(LogRecordData delegate) { | ||
this.delegate = delegate; | ||
} | ||
|
||
@Override | ||
public Resource getResource() { | ||
return delegate.getResource(); | ||
} | ||
|
||
@Override | ||
public InstrumentationScopeInfo getInstrumentationScopeInfo() { | ||
return delegate.getInstrumentationScopeInfo(); | ||
} | ||
|
||
@Override | ||
public long getTimestampEpochNanos() { | ||
return delegate.getTimestampEpochNanos(); | ||
} | ||
|
||
@Override | ||
public long getObservedTimestampEpochNanos() { | ||
return delegate.getObservedTimestampEpochNanos(); | ||
} | ||
|
||
@Override | ||
public SpanContext getSpanContext() { | ||
return delegate.getSpanContext(); | ||
} | ||
|
||
@Override | ||
public Severity getSeverity() { | ||
return delegate.getSeverity(); | ||
} | ||
|
||
@Nullable | ||
@Override | ||
public String getSeverityText() { | ||
return delegate.getSeverityText(); | ||
} | ||
|
||
@Override | ||
public Body getBody() { | ||
return delegate.getBody(); | ||
} | ||
|
||
@Override | ||
public Attributes getAttributes() { | ||
return attributes.putAll(delegate.getAttributes()).build(); | ||
} | ||
|
||
@Override | ||
public int getTotalAttributeCount() { | ||
return delegate.getTotalAttributeCount(); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can add me here. I think the literal
?
breaks things.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! You're added now.