From 2f6490b636cc119cce1d48e1c5de6c5f09f364c2 Mon Sep 17 00:00:00 2001 From: Prashant Srivastava <50466688+srprash@users.noreply.github.com> Date: Tue, 12 Nov 2024 09:08:33 -0800 Subject: [PATCH] Adding AwsUnsampledOnlySpanProcessor to export batches of unsampled spans (#948) --- .../javaagent/providers/AwsAttributeKeys.java | 3 + .../AwsUnsampledOnlySpanProcessor.java | 85 +++++++++ .../AwsUnsampledOnlySpanProcessorBuilder.java | 46 +++++ .../AwsUnsampledOnlySpanProcessorTest.java | 163 ++++++++++++++++++ 4 files changed, 297 insertions(+) create mode 100644 awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsUnsampledOnlySpanProcessor.java create mode 100644 awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsUnsampledOnlySpanProcessorBuilder.java create mode 100644 awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsUnsampledOnlySpanProcessorTest.java diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAttributeKeys.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAttributeKeys.java index b73794b9d..f9791a31e 100644 --- a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAttributeKeys.java +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAttributeKeys.java @@ -70,6 +70,9 @@ private AwsAttributeKeys() {} static final AttributeKey AWS_LAMBDA_RESOURCE_ID = AttributeKey.stringKey("aws.lambda.resource_mapping.id"); + static final AttributeKey AWS_TRACE_FLAG_SAMPLED = + AttributeKey.booleanKey("aws.trace.flag.sampled"); + // use the same AWS Resource attribute name defined by OTel java auto-instr for aws_sdk_v_1_1 // TODO: all AWS specific attributes should be defined in semconv package and reused cross all // otel packages. Related sim - diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsUnsampledOnlySpanProcessor.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsUnsampledOnlySpanProcessor.java new file mode 100644 index 000000000..3848016f3 --- /dev/null +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsUnsampledOnlySpanProcessor.java @@ -0,0 +1,85 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers; + +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.ReadWriteSpan; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SpanProcessor; + +/** + * {@link SpanProcessor} that only exports unsampled spans in a batch via a delegated @{link + * BatchSpanProcessor}. The processor also adds an attribute to each processed span to indicate that + * it was sampled or not. + */ +final class AwsUnsampledOnlySpanProcessor implements SpanProcessor { + + private final SpanProcessor delegate; + + AwsUnsampledOnlySpanProcessor(SpanProcessor delegate) { + this.delegate = delegate; + } + + public static AwsUnsampledOnlySpanProcessorBuilder builder() { + return new AwsUnsampledOnlySpanProcessorBuilder(); + } + + @Override + public void onStart(Context parentContext, ReadWriteSpan span) { + if (!span.getSpanContext().isSampled()) { + span.setAttribute(AwsAttributeKeys.AWS_TRACE_FLAG_SAMPLED, false); + } + delegate.onStart(parentContext, span); + } + + @Override + public void onEnd(ReadableSpan span) { + if (!span.getSpanContext().isSampled()) { + delegate.onEnd(span); + } + } + + @Override + public boolean isStartRequired() { + return true; + } + + @Override + public boolean isEndRequired() { + return true; + } + + @Override + public CompletableResultCode shutdown() { + return delegate.shutdown(); + } + + @Override + public CompletableResultCode forceFlush() { + return delegate.forceFlush(); + } + + @Override + public void close() { + delegate.close(); + } + + // Visible for testing + SpanProcessor getDelegate() { + return delegate; + } +} diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsUnsampledOnlySpanProcessorBuilder.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsUnsampledOnlySpanProcessorBuilder.java new file mode 100644 index 000000000..89efbcf3b --- /dev/null +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsUnsampledOnlySpanProcessorBuilder.java @@ -0,0 +1,46 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers; + +import static java.util.Objects.requireNonNull; + +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; + +final class AwsUnsampledOnlySpanProcessorBuilder { + + // Default exporter is OtlpUdpSpanExporter with unsampled payload prefix + private SpanExporter exporter = + new OtlpUdpSpanExporterBuilder() + .setPayloadSampleDecision(TracePayloadSampleDecision.UNSAMPLED) + .build(); + + public AwsUnsampledOnlySpanProcessorBuilder setSpanExporter(SpanExporter exporter) { + requireNonNull(exporter, "exporter cannot be null"); + this.exporter = exporter; + return this; + } + + public AwsUnsampledOnlySpanProcessor build() { + BatchSpanProcessor bsp = + BatchSpanProcessor.builder(exporter).setExportUnsampledSpans(true).build(); + return new AwsUnsampledOnlySpanProcessor(bsp); + } + + SpanExporter getSpanExporter() { + return exporter; + } +} diff --git a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsUnsampledOnlySpanProcessorTest.java b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsUnsampledOnlySpanProcessorTest.java new file mode 100644 index 000000000..ba41740bd --- /dev/null +++ b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsUnsampledOnlySpanProcessorTest.java @@ -0,0 +1,163 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.*; + +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.ReadWriteSpan; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.util.ArrayList; +import java.util.Collection; +import org.junit.jupiter.api.Test; + +public class AwsUnsampledOnlySpanProcessorTest { + + @Test + public void testIsStartRequired() { + SpanProcessor processor = AwsUnsampledOnlySpanProcessor.builder().build(); + assertThat(processor.isStartRequired()).isTrue(); + } + + @Test + public void testIsEndRequired() { + SpanProcessor processor = AwsUnsampledOnlySpanProcessor.builder().build(); + assertThat(processor.isEndRequired()).isTrue(); + } + + @Test + public void testDefaultSpanProcessor() { + AwsUnsampledOnlySpanProcessorBuilder builder = AwsUnsampledOnlySpanProcessor.builder(); + AwsUnsampledOnlySpanProcessor unsampledSP = builder.build(); + + assertThat(builder.getSpanExporter()).isInstanceOf(OtlpUdpSpanExporter.class); + SpanProcessor delegate = unsampledSP.getDelegate(); + assertThat(delegate).isInstanceOf(BatchSpanProcessor.class); + BatchSpanProcessor delegateBsp = (BatchSpanProcessor) delegate; + String delegateBspString = delegateBsp.toString(); + assertThat(delegateBspString) + .contains( + "spanExporter=software.amazon.opentelemetry.javaagent.providers.OtlpUdpSpanExporter"); + assertThat(delegateBspString).contains("exportUnsampledSpans=true"); + } + + @Test + public void testSpanProcessorWithExporter() { + AwsUnsampledOnlySpanProcessorBuilder builder = + AwsUnsampledOnlySpanProcessor.builder().setSpanExporter(InMemorySpanExporter.create()); + AwsUnsampledOnlySpanProcessor unsampledSP = builder.build(); + + assertThat(builder.getSpanExporter()).isInstanceOf(InMemorySpanExporter.class); + SpanProcessor delegate = unsampledSP.getDelegate(); + assertThat(delegate).isInstanceOf(BatchSpanProcessor.class); + BatchSpanProcessor delegateBsp = (BatchSpanProcessor) delegate; + String delegateBspString = delegateBsp.toString(); + assertThat(delegateBspString) + .contains("spanExporter=io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter"); + assertThat(delegateBspString).contains("exportUnsampledSpans=true"); + } + + @Test + public void testStartAddsAttributeToSampledSpan() { + SpanContext mockSpanContext = mock(SpanContext.class); + when(mockSpanContext.isSampled()).thenReturn(true); + Context parentContextMock = mock(Context.class); + ReadWriteSpan spanMock = mock(ReadWriteSpan.class); + when(spanMock.getSpanContext()).thenReturn(mockSpanContext); + + AwsUnsampledOnlySpanProcessor processor = AwsUnsampledOnlySpanProcessor.builder().build(); + processor.onStart(parentContextMock, spanMock); + + // verify setAttribute was never called + verify(spanMock, never()).setAttribute(any(), anyBoolean()); + } + + @Test + public void testStartAddsAttributeToUnsampledSpan() { + SpanContext mockSpanContext = mock(SpanContext.class); + when(mockSpanContext.isSampled()).thenReturn(false); + Context parentContextMock = mock(Context.class); + ReadWriteSpan spanMock = mock(ReadWriteSpan.class); + when(spanMock.getSpanContext()).thenReturn(mockSpanContext); + + AwsUnsampledOnlySpanProcessor processor = AwsUnsampledOnlySpanProcessor.builder().build(); + processor.onStart(parentContextMock, spanMock); + + // verify setAttribute was called with the correct arguments + verify(spanMock, times(1)).setAttribute(AwsAttributeKeys.AWS_TRACE_FLAG_SAMPLED, false); + } + + @Test + public void testExportsOnlyUnsampledSpans() { + SpanExporter mockExporter = mock(SpanExporter.class); + when(mockExporter.export(anyCollection())).thenReturn(CompletableResultCode.ofSuccess()); + + TestDelegateProcessor delegate = new TestDelegateProcessor(); + AwsUnsampledOnlySpanProcessor processor = new AwsUnsampledOnlySpanProcessor(delegate); + + // unsampled span + SpanContext mockSpanContextUnsampled = mock(SpanContext.class); + when(mockSpanContextUnsampled.isSampled()).thenReturn(false); + ReadableSpan mockSpanUnsampled = mock(ReadableSpan.class); + when(mockSpanUnsampled.getSpanContext()).thenReturn(mockSpanContextUnsampled); + + // sampled span + SpanContext mockSpanContextSampled = mock(SpanContext.class); + when(mockSpanContextSampled.isSampled()).thenReturn(true); + ReadableSpan mockSpanSampled = mock(ReadableSpan.class); + when(mockSpanSampled.getSpanContext()).thenReturn(mockSpanContextSampled); + + processor.onEnd(mockSpanSampled); + processor.onEnd(mockSpanUnsampled); + + // validate that only the unsampled span was delegated + assertThat(delegate.getEndedSpans()).containsExactly(mockSpanUnsampled); + } + + private static class TestDelegateProcessor implements SpanProcessor { + // keep a queue of Readable spans added when onEnd is called + Collection endedSpans = new ArrayList<>(); + + @Override + public void onStart(Context parentContext, ReadWriteSpan span) {} + + @Override + public boolean isStartRequired() { + return false; + } + + @Override + public void onEnd(ReadableSpan span) { + endedSpans.add(span); + } + + @Override + public boolean isEndRequired() { + return false; + } + + public Collection getEndedSpans() { + return endedSpans; + } + } +}