Skip to content

Commit

Permalink
Implement context_propagation_only setting (#3358)
Browse files Browse the repository at this point in the history
  • Loading branch information
JonasKunz authored Nov 3, 2023
1 parent e4c5043 commit 337f174
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Use subheadings with the "=====" level for adding notes for unreleased changes:
* Added protection against invalid timestamps provided by manual instrumentation - {pull}3363[#3363]
* Added support for AWS SDK 2.21 - {pull}3373[#3373]
* Capture bucket and object key to Lambda transaction as OTel attributes - `aws.s3.bueckt`, `aws.s3.key` - {pull}3364[#3364]
* Added `context_propagation_only` configuration option - {pull}3358[#3358]
[float]
===== Bug fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,17 @@ public String toString(Collection<String> value) {
"is available somewhere in the classpath where it never gets loaded unless this matching is applied.")
.buildWithDefault(true);

private final ConfigurationOption<Boolean> contextPropagationOnly = ConfigurationOption.booleanOption()
.key("context_propagation_only")
.configurationCategory(CORE_CATEGORY)
.description("When set to true, disables log sending, metrics and trace collection.\n" +
"Trace context propagation and log correlation will stay active.\n"
+"Note that in contrast to <<config-disable-send, `disable_send`>> the agent will still" +
" connect to the APM-server for fetching configuration updates and health checks.")
.dynamic(true)
.tags("added[1.44.0]")
.buildWithDefault(false);

private final ConfigurationOption<List<WildcardMatcher>> classesExcludedFromInstrumentation = ConfigurationOption
.builder(new ValueConverter<List<WildcardMatcher>>() {

Expand Down Expand Up @@ -1147,6 +1158,10 @@ public List<WildcardMatcher> getBaggageToAttach() {
return baggateToAttach.get();
}

public boolean isContextPropagationOnly() {
return contextPropagationOnly.get();
}

public enum CloudProvider {
AUTO,
AWS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ public <T, C> Transaction startChildTransaction(@Nullable C headerCarrier, Heade
return startChildTransaction(headerCarrier, headersGetter, sampler, epochMicros, currentContext().getBaggage(), initiatingClassLoader);
}

@Nullable
private <T, C> Transaction startChildTransaction(@Nullable C headerCarrier, HeaderGetter<T, C> headersGetter, Sampler sampler,
long epochMicros, Baggage baseBaggage, @Nullable ClassLoader initiatingClassLoader) {
Transaction transaction = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class TracerConfiguration extends ConfigurationOptionProvider {
"When recording, the agent instruments incoming HTTP requests, tracks errors and collects and sends metrics.\n" +
"When not recording, the agent works as a noop, not collecting data and not communicating with the APM sever,\n" +
"except for polling the central configuration endpoint.\n" +
"Note that trace context propagation, baggage and log correlation will also be disabled when recording is disabled.\n"+
"As this is a reversible switch, agent threads are not being killed when inactivated, but they will be \n" +
"mostly idle in this state, so the overhead should be negligible.\n" +
"\n" +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package co.elastic.apm.agent.impl.transaction;

import co.elastic.apm.agent.tracer.pooling.Recyclable;
import co.elastic.apm.agent.impl.ElasticApmTracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;

public abstract class AbstractRefCountedContext<T extends AbstractRefCountedContext<T>> extends ElasticContext<T> implements Recyclable {
private static final Logger logger = LoggerFactory.getLogger(AbstractRefCountedContext.class);

private final AtomicInteger references = new AtomicInteger();

protected AbstractRefCountedContext(ElasticApmTracer tracer) {
super(tracer);
}

@Override
public void incrementReferences() {
int referenceCount = references.incrementAndGet();
if (logger.isDebugEnabled()) {
logger.debug("increment references to {} ({})", this, referenceCount);
if (logger.isTraceEnabled()) {
logger.trace("incrementing references at",
new RuntimeException("This is an expected exception. Is just used to record where the reference count has been incremented."));
}
}
}

@Override
public void decrementReferences() {
int referenceCount = references.decrementAndGet();
if (logger.isDebugEnabled()) {
logger.debug("decrement references to {} ({})", this, referenceCount);
if (logger.isTraceEnabled()) {
logger.trace("decrementing references at",
new RuntimeException("This is an expected exception. Is just used to record where the reference count has been decremented."));
}
}
if (referenceCount == 0) {
recycle();
}
}

public boolean isReferenced() {
return references.get() > 0;
}

public int getReferenceCount() {
return references.get();
}

protected abstract void recycle();

@Override
public void resetState() {
references.set(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public abstract class AbstractSpan<T extends AbstractSpan<T>> extends ElasticContext<T> implements Recyclable, co.elastic.apm.agent.tracer.AbstractSpan<T> {
public abstract class AbstractSpan<T extends AbstractSpan<T>> extends AbstractRefCountedContext<T> implements Recyclable, co.elastic.apm.agent.tracer.AbstractSpan<T> {
private static final Logger logger = LoggerFactory.getLogger(AbstractSpan.class);
private static final Logger oneTimeDuplicatedEndLogger = LoggerUtils.logOnce(logger);
private static final Logger oneTimeMaxSpanLinksLogger = LoggerUtils.logOnce(logger);
Expand All @@ -59,7 +59,6 @@ public abstract class AbstractSpan<T extends AbstractSpan<T>> extends ElasticCon
protected final AtomicLong endTimestamp = new AtomicLong();

private ChildDurationTimer childDurations = new ChildDurationTimer();
protected AtomicInteger references = new AtomicInteger();
protected volatile boolean finished = true;
private int namePriority = PRIORITY_DEFAULT;
private boolean discardRequested = false;
Expand Down Expand Up @@ -125,10 +124,6 @@ public abstract class AbstractSpan<T extends AbstractSpan<T>> extends ElasticCon

private final Map<String, Object> otelAttributes = new HashMap<>();

public int getReferenceCount() {
return references.get();
}

@Override
public T requestDiscarding() {
this.discardRequested = true;
Expand Down Expand Up @@ -203,10 +198,6 @@ public AbstractSpan(ElasticApmTracer tracer) {
collectBreakdownMetrics = selfTimeCollectionEnabled && breakdownMetricsEnabled;
}

public boolean isReferenced() {
return references.get() > 0;
}

@Override
public boolean isFinished() {
return finished;
Expand Down Expand Up @@ -434,6 +425,7 @@ public List<TraceContext> getSpanLinks() {

@Override
public void resetState() {
super.resetState();
finished = true;
name.setLength(0);
type = null;
Expand All @@ -443,7 +435,6 @@ public void resetState() {
traceContext.resetState();
baggage = Baggage.EMPTY;
childDurations.resetState();
references.set(0);
namePriority = PRIORITY_DEFAULT;
discardRequested = false;
isExit = false;
Expand Down Expand Up @@ -666,35 +657,6 @@ void onChildEnd(long epochMicros) {
}
}

@Override
public void incrementReferences() {
int referenceCount = references.incrementAndGet();
if (logger.isDebugEnabled()) {
logger.debug("increment references to {} ({})", this, referenceCount);
if (logger.isTraceEnabled()) {
logger.trace("incrementing references at",
new RuntimeException("This is an expected exception. Is just used to record where the reference count has been incremented."));
}
}
}

@Override
public void decrementReferences() {
int referenceCount = references.decrementAndGet();
if (logger.isDebugEnabled()) {
logger.debug("decrement references to {} ({})", this, referenceCount);
if (logger.isTraceEnabled()) {
logger.trace("decrementing references at",
new RuntimeException("This is an expected exception. Is just used to record where the reference count has been decremented."));
}
}
if (referenceCount == 0) {
recycle();
}
}

protected abstract void recycle();

@Override
public void setNonDiscardable() {
getTraceContext().setNonDiscardable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package co.elastic.apm.agent.report;

import co.elastic.apm.agent.configuration.CoreConfiguration;
import co.elastic.apm.agent.impl.error.ErrorCapture;
import co.elastic.apm.agent.impl.transaction.Span;
import co.elastic.apm.agent.impl.transaction.Transaction;
Expand Down Expand Up @@ -132,8 +133,11 @@ public void translateTo(ReportingEvent event, long sequence, byte[] bytes) {

private final PartialTransactionReporter partialTransactionReporter;

private final CoreConfiguration coreConfiguration;

public ApmServerReporter(boolean dropTransactionIfQueueFull,
ReporterConfiguration reporterConfiguration,
CoreConfiguration coreConfiguration,
ReportingEventHandler reportingEventHandler,
ReporterMonitor monitor,
ApmServerClient apmServer,
Expand All @@ -144,6 +148,7 @@ public ApmServerReporter(boolean dropTransactionIfQueueFull,
this.dropTransactionIfQueueFull = dropTransactionIfQueueFull;
this.syncReport = reporterConfiguration.isReportSynchronously();
this.monitor = monitor;
this.coreConfiguration = coreConfiguration;
disruptor = new Disruptor<>(
new TransactionEventFactory(),
MathUtils.getNextPowerOf2(reporterConfiguration.getMaxQueueSize()),
Expand All @@ -164,7 +169,9 @@ public void start() {

@Override
public void reportPartialTransaction(Transaction transaction) {
partialTransactionReporter.reportPartialTransaction(transaction);
if (!coreConfiguration.isContextPropagationOnly()) {
partialTransactionReporter.reportPartialTransaction(transaction);
}
}

@Override
Expand Down Expand Up @@ -340,6 +347,10 @@ long getQueueElementCount() {
}

private <E> boolean tryAddEventToRingBuffer(E event, EventTranslatorOneArg<ReportingEvent, E> eventTranslator, ReportingEvent.ReportingEventType targetType) {
if(coreConfiguration.isContextPropagationOnly()) {
logger.debug("Dropping event {} because of context_propagation_only", event.getClass().getSimpleName(), event);
return false;
}
long capacity = getQueueCapacity();
monitor.eventCreated(targetType, capacity, getQueueElementCount());
if (dropTransactionIfQueueFull) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package co.elastic.apm.agent.report;

import co.elastic.apm.agent.configuration.CoreConfiguration;
import co.elastic.apm.agent.objectpool.ObjectPoolFactory;
import co.elastic.apm.agent.report.processor.ProcessorEventHandler;
import co.elastic.apm.agent.report.serialize.DslJsonSerializer;
Expand All @@ -34,8 +35,9 @@ public Reporter createReporter(ConfigurationRegistry configurationRegistry,
ObjectPoolFactory poolFactory) {

ReporterConfiguration reporterConfiguration = configurationRegistry.getConfig(ReporterConfiguration.class);
CoreConfiguration coreConfig = configurationRegistry.getConfig(CoreConfiguration.class);
ReportingEventHandler reportingEventHandler = getReportingEventHandler(configurationRegistry, reporterConfiguration, payloadSerializer, apmServerClient);
return new ApmServerReporter(true, reporterConfiguration, reportingEventHandler, monitor, apmServerClient, payloadSerializer, poolFactory);
return new ApmServerReporter(true, reporterConfiguration, coreConfig, reportingEventHandler, monitor, apmServerClient, payloadSerializer, poolFactory);
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package co.elastic.apm.agent.report;

import co.elastic.apm.agent.MockTracer;
import co.elastic.apm.agent.configuration.CoreConfiguration;
import co.elastic.apm.agent.impl.ElasticApmTracer;
import co.elastic.apm.agent.impl.error.ErrorCapture;
import co.elastic.apm.agent.impl.metadata.MetaDataMock;
Expand All @@ -31,6 +32,7 @@
import co.elastic.apm.agent.objectpool.ObjectPoolFactory;
import co.elastic.apm.agent.report.processor.ProcessorEventHandler;
import co.elastic.apm.agent.report.serialize.DslJsonSerializer;
import co.elastic.apm.agent.report.serialize.SerializationConstants;
import co.elastic.apm.agent.tracer.configuration.TimeDuration;
import io.undertow.Undertow;
import io.undertow.server.HttpHandler;
Expand Down Expand Up @@ -66,6 +68,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

class ApmServerReporterIntegrationTest {

Expand All @@ -81,6 +84,8 @@ class ApmServerReporterIntegrationTest {
private volatile int statusCode = HttpStatus.OK_200;
private volatile int acceptedEventCount = 0;
private ReporterConfiguration reporterConfiguration;

private CoreConfiguration coreConfiguration;
private ApmServerReporter reporter;

private ReporterMonitor mockMonitor;
Expand Down Expand Up @@ -137,6 +142,8 @@ void setUp() throws Exception {

ConfigurationRegistry config = tracer.getConfigurationRegistry();
reporterConfiguration = config.getConfig(ReporterConfiguration.class);
coreConfiguration = config.getConfig(CoreConfiguration.class);
SerializationConstants.init(coreConfiguration);

// mockito mocking does not seem to reliably work here
// thus we rely on mutable test state instead of having different mocking strategies.
Expand All @@ -163,7 +170,7 @@ void setUp() throws Exception {
payloadSerializer,
apmServerClient);
mockMonitor = Mockito.mock(ReporterMonitor.class);
reporter = new ApmServerReporter(false, reporterConfiguration, v2handler, mockMonitor, apmServerClient, payloadSerializer, new ObjectPoolFactory());
reporter = new ApmServerReporter(false, reporterConfiguration, coreConfiguration, v2handler, mockMonitor, apmServerClient, payloadSerializer, new ObjectPoolFactory());
reporter.start();
}

Expand Down Expand Up @@ -194,6 +201,22 @@ void testReportTransaction() {
verify(mockMonitor).requestFinished(eq(payload), eq(1L), gt(0L), eq(true));
}


@Test
void testContextPropagationOnlyRespected() {
doReturn(true).when(coreConfiguration).isContextPropagationOnly();

reporter.reportPartialTransaction(new Transaction(tracer));
reporter.report(new Transaction(tracer));
assertThat(reporter.flush(5, TimeUnit.SECONDS, false)).isTrue();
assertThat(reporter.getDropped()).isEqualTo(0);
assertThat(receivedIntakeApiCalls.get()).isEqualTo(0);
assertThat(receivedIntakeApiCallsWithFlushParam.get()).isEqualTo(0);
assertThat(reporter.getReported()).isEqualTo(0);

verifyNoInteractions(mockMonitor);
}

@Test
void testReportTransaction_withFlushRequest() {
reporter.report(new Transaction(tracer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package co.elastic.apm.agent.esrestclient.v6_4;

import co.elastic.apm.agent.bci.ElasticApmAgent;
import co.elastic.apm.agent.configuration.CoreConfiguration;
import co.elastic.apm.agent.configuration.SpyConfiguration;
import co.elastic.apm.agent.impl.ElasticApmTracer;
import co.elastic.apm.agent.impl.ElasticApmTracerBuilder;
Expand Down Expand Up @@ -126,6 +127,7 @@ public static void startElasticsearchContainerAndClient() throws IOException {

final ConfigurationRegistry configurationRegistry = SpyConfiguration.createSpyConfig();
ReporterConfiguration reporterConfiguration = configurationRegistry.getConfig(ReporterConfiguration.class);
CoreConfiguration coreConfiguration = configurationRegistry.getConfig(CoreConfiguration.class);
doReturn(0).when(reporterConfiguration).getMaxQueueSize();
StacktraceConfiguration stacktraceConfiguration = configurationRegistry.getConfig(StacktraceConfiguration.class);
doReturn(30).when(stacktraceConfiguration).getStackTraceLimit();
Expand All @@ -145,7 +147,7 @@ public static void startElasticsearchContainerAndClient() throws IOException {
processorEventHandler,
payloadSerializer,
apmServerClient);
realReporter = new ApmServerReporter(true, reporterConfiguration, v2handler, ReporterMonitor.NOOP, apmServerClient, payloadSerializer, new ObjectPoolFactory());
realReporter = new ApmServerReporter(true, reporterConfiguration, coreConfiguration, v2handler, ReporterMonitor.NOOP, apmServerClient, payloadSerializer, new ObjectPoolFactory());
realReporter.start();

tracer = new ElasticApmTracerBuilder()
Expand Down
Loading

0 comments on commit 337f174

Please sign in to comment.