feat(telemetry): cross-component async write tracing
* created TraceContext for OpenTelemetry spans
* added tracing headers/cookies to control logging of trace info
* support legacy Dropwizard tracing using OpenTelemetry
* added smoke tests for tracing conditions
david-leifker committed Jan 21, 2025
1 parent 262dd76 commit 18495bb
Showing 146 changed files with 6,929 additions and 1,805 deletions.
build.gradle — 7 changes: 5 additions & 2 deletions
@@ -38,7 +38,7 @@ buildscript {
  ext.springVersion = '6.1.14'
  ext.springBootVersion = '3.2.9'
  ext.springKafkaVersion = '3.1.6'
- ext.openTelemetryVersion = '1.18.0'
+ ext.openTelemetryVersion = '1.45.0'
  ext.neo4jVersion = '5.14.0'
  ext.neo4jTestVersion = '5.14.0'
  ext.neo4jApocVersion = '5.14.0'
@@ -218,7 +218,10 @@ project.ext.externalDependency = [
  'neo4jApocCore': 'org.neo4j.procedure:apoc-core:' + neo4jApocVersion,
  'neo4jApocCommon': 'org.neo4j.procedure:apoc-common:' + neo4jApocVersion,
  'opentelemetryApi': 'io.opentelemetry:opentelemetry-api:' + openTelemetryVersion,
- 'opentelemetryAnnotations': 'io.opentelemetry:opentelemetry-extension-annotations:' + openTelemetryVersion,
+ 'opentelemetrySdk': 'io.opentelemetry:opentelemetry-sdk:' + openTelemetryVersion,
+ 'opentelemetrySdkTrace': 'io.opentelemetry:opentelemetry-sdk-trace:' + openTelemetryVersion,
+ 'opentelemetryAutoConfig': 'io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:' + openTelemetryVersion,
+ 'opentelemetryAnnotations': 'io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations:2.11.0',
  'opentracingJdbc':'io.opentracing.contrib:opentracing-jdbc:0.2.15',
  'parquet': 'org.apache.parquet:parquet-avro:1.12.3',
  'parquetHadoop': 'org.apache.parquet:parquet-hadoop:1.13.1',
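Note: the dependency changes above track the upstream retirement of `opentelemetry-extension-annotations`; `@WithSpan` now ships in the separate `opentelemetry-instrumentation-annotations` artifact, which is why the import rewrites in the files below are so widespread. A minimal sketch of the annotation under its new package (the class and method here are hypothetical, not from this commit):

```java
// Hypothetical resolver demonstrating the relocated annotations; only the
// package changes, the annotation behaves as before.
import io.opentelemetry.instrumentation.annotations.SpanAttribute;
import io.opentelemetry.instrumentation.annotations.WithSpan;

public class ExampleResolver {
  // Annotation-based instrumentation wraps this method in a span named
  // "ExampleResolver.resolve" and records the argument as a span attribute.
  @WithSpan
  public String resolve(@SpanAttribute("urn") String urn) {
    return "resolved:" + urn;
  }
}
```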
@@ -12,7 +12,7 @@
  import com.linkedin.metadata.service.ViewService;
  import graphql.schema.DataFetcher;
  import graphql.schema.DataFetchingEnvironment;
- import io.opentelemetry.extension.annotations.WithSpan;
+ import io.opentelemetry.instrumentation.annotations.WithSpan;
  import java.util.List;
  import java.util.Map;
  import java.util.concurrent.CompletableFuture;
@@ -2,13 +2,14 @@

  import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;

- import com.codahale.metrics.Timer;
  import com.linkedin.datahub.graphql.QueryContext;
  import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
  import com.linkedin.datahub.graphql.exception.AuthorizationException;
  import com.linkedin.datahub.graphql.types.BatchMutableType;
  import com.linkedin.metadata.utils.metrics.MetricUtils;
  import graphql.schema.DataFetcher;
  import graphql.schema.DataFetchingEnvironment;
+ import io.datahubproject.metadata.context.OperationContext;
  import java.util.List;
  import java.util.concurrent.CompletableFuture;
  import org.slf4j.Logger;
@@ -33,25 +34,29 @@ public MutableTypeBatchResolver(final BatchMutableType<I, B, T> batchMutableType

  @Override
  public CompletableFuture<List<T>> get(DataFetchingEnvironment environment) throws Exception {
+   final QueryContext context = environment.getContext();
+   final OperationContext opContext = context.getOperationContext();

    final B[] input =
        bindArgument(environment.getArgument("input"), _batchMutableType.batchInputClass());

-   return GraphQLConcurrencyUtils.supplyAsync(
-       () -> {
-         Timer.Context timer = MetricUtils.timer(this.getClass(), "batchMutate").time();
-
-         try {
-           return _batchMutableType.batchUpdate(input, environment.getContext());
-         } catch (AuthorizationException e) {
-           throw e;
-         } catch (Exception e) {
-           _logger.error("Failed to perform batchUpdate", e);
-           throw new IllegalArgumentException(e);
-         } finally {
-           timer.stop();
-         }
-       },
-       this.getClass().getSimpleName(),
-       "get");
+   return opContext.withSpan(
+       "batchMutate",
+       () ->
+           GraphQLConcurrencyUtils.supplyAsync(
+               () -> {
+                 try {
+                   return _batchMutableType.batchUpdate(input, environment.getContext());
+                 } catch (AuthorizationException e) {
+                   throw e;
+                 } catch (Exception e) {
+                   _logger.error("Failed to perform batchUpdate", e);
+                   throw new IllegalArgumentException(e);
+                 }
+               },
+               this.getClass().getSimpleName(),
+               "get"),
+       MetricUtils.DROPWIZARD_METRIC,
+       "true");
  }
  }
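The diff above is the commit's recurring pattern: the Dropwizard `Timer.Context` try/finally is replaced by `opContext.withSpan(...)`, which times the work as an OpenTelemetry span and attaches `MetricUtils.DROPWIZARD_METRIC = "true"` so the legacy metric can still be derived. The helper itself is not shown in this excerpt; a rough sketch of what such a method could look like, assuming the trailing varargs are alternating attribute key/value pairs:

```java
// Sketch of a withSpan-style helper over the OpenTelemetry API; the real
// OperationContext.withSpan in this commit may differ. Assumes an injected Tracer.
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import java.util.function.Supplier;

public class SpanHelper {
  private final Tracer tracer;

  public SpanHelper(Tracer tracer) {
    this.tracer = tracer;
  }

  public <T> T withSpan(String name, Supplier<T> supplier, String... attributes) {
    Span span = tracer.spanBuilder(name).startSpan();
    // Trailing varargs are interpreted as alternating key/value attribute pairs,
    // matching call sites like withSpan("batchMutate", ..., DROPWIZARD_METRIC, "true").
    for (int i = 0; i + 1 < attributes.length; i += 2) {
      span.setAttribute(attributes[i], attributes[i + 1]);
    }
    try (Scope ignored = span.makeCurrent()) {
      return supplier.get();
    } catch (RuntimeException e) {
      span.recordException(e);
      span.setStatus(StatusCode.ERROR);
      throw e;
    } finally {
      span.end();
    }
  }
}
```

Note that the resolver wraps the `CompletableFuture` returned by `supplyAsync`, so this span covers task submission; tracing the asynchronous execution itself is presumably handled by the commit's cross-component context propagation.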
@@ -27,7 +27,7 @@
  import graphql.schema.DataFetcher;
  import graphql.schema.DataFetchingEnvironment;
  import io.datahubproject.metadata.context.OperationContext;
- import io.opentelemetry.extension.annotations.WithSpan;
+ import io.opentelemetry.instrumentation.annotations.WithSpan;
  import java.net.URISyntaxException;
  import java.util.Collections;
  import java.util.List;
@@ -19,7 +19,7 @@
  import com.linkedin.metadata.query.SearchFlags;
  import graphql.schema.DataFetcher;
  import graphql.schema.DataFetchingEnvironment;
- import io.opentelemetry.extension.annotations.WithSpan;
+ import io.opentelemetry.instrumentation.annotations.WithSpan;
  import java.util.Collections;
  import java.util.concurrent.CompletableFuture;
  import lombok.RequiredArgsConstructor;
@@ -5,7 +5,9 @@
  import com.linkedin.gms.factory.graphql.GraphQLEngineFactory;
  import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory;
  import com.linkedin.gms.factory.kafka.SimpleKafkaConsumerFactory;
+ import com.linkedin.gms.factory.kafka.trace.KafkaTraceReaderFactory;
  import com.linkedin.gms.factory.telemetry.ScheduledAnalyticsFactory;
+ import com.linkedin.gms.factory.trace.TraceServiceFactory;
  import org.springframework.boot.WebApplicationType;
  import org.springframework.boot.autoconfigure.SpringBootApplication;
  import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration;
@@ -30,7 +32,9 @@
  DataHubAuthorizerFactory.class,
  SimpleKafkaConsumerFactory.class,
  KafkaEventConsumerFactory.class,
- GraphQLEngineFactory.class
+ GraphQLEngineFactory.class,
+ KafkaTraceReaderFactory.class,
+ TraceServiceFactory.class
  })
  })
  public class UpgradeCliApplication {
@@ -195,6 +195,7 @@ protected OperationContext javaSystemOperationContext(
          .alternateValidation(
              configurationProvider.getFeatureFlags().isAlternateMCPValidation())
          .build(),
+     null,
      true);

  entityServiceAspectRetriever.setSystemOperationContext(systemOperationContext);
@@ -1,7 +1,6 @@
  package com.linkedin.datahub.upgrade.impl;

  import com.codahale.metrics.MetricRegistry;
- import com.codahale.metrics.Timer;
  import com.linkedin.datahub.upgrade.Upgrade;
  import com.linkedin.datahub.upgrade.UpgradeCleanupStep;
  import com.linkedin.datahub.upgrade.UpgradeContext;
@@ -119,44 +118,60 @@ private UpgradeResult executeInternal(UpgradeContext context) {
  }

  private UpgradeStepResult executeStepInternal(UpgradeContext context, UpgradeStep step) {
-   int retryCount = step.retryCount();
-   UpgradeStepResult result = null;
-   int maxAttempts = retryCount + 1;
-   for (int i = 0; i < maxAttempts; i++) {
-     try (Timer.Context completionTimer =
-         MetricUtils.timer(MetricRegistry.name(step.id(), "completionTime")).time()) {
-       try (Timer.Context executionTimer =
-           MetricUtils.timer(MetricRegistry.name(step.id(), "executionTime")).time()) {
-         result = step.executable().apply(context);
-       }
-
-       if (result == null) {
-         // Failed to even retrieve a result. Create a default failure result.
-         result = new DefaultUpgradeStepResult(step.id(), DataHubUpgradeState.FAILED);
-         context
-             .report()
-             .addLine(String.format("Retrying %s more times...", maxAttempts - (i + 1)));
-         MetricUtils.counter(MetricRegistry.name(step.id(), "retry")).inc();
-       }
-
-       if (DataHubUpgradeState.SUCCEEDED.equals(result.result())) {
-         MetricUtils.counter(MetricRegistry.name(step.id(), "succeeded")).inc();
-         break;
-       }
-     } catch (Exception e) {
-       log.error("Caught exception during attempt {} of Step with id {}", i, step.id(), e);
-       context
-           .report()
-           .addLine(
-               String.format(
-                   "Caught exception during attempt %s of Step with id %s: %s", i, step.id(), e));
-       MetricUtils.counter(MetricRegistry.name(step.id(), "failed")).inc();
-       result = new DefaultUpgradeStepResult(step.id(), DataHubUpgradeState.FAILED);
-       context.report().addLine(String.format("Retrying %s more times...", maxAttempts - (i + 1)));
-     }
-   }
-
-   return result;
+   return context
+       .opContext()
+       .withSpan(
+           "completionTime",
+           () -> {
+             int retryCount = step.retryCount();
+             UpgradeStepResult result = null;
+             int maxAttempts = retryCount + 1;
+             for (int i = 0; i < maxAttempts; i++) {
+               try {
+                 result =
+                     context
+                         .opContext()
+                         .withSpan(
+                             "executionTime",
+                             () -> step.executable().apply(context),
+                             "step.id",
+                             step.id(),
+                             MetricUtils.DROPWIZARD_NAME,
+                             MetricUtils.name(step.id(), "executionTime"));
+
+                 if (result == null) {
+                   // Failed to even retrieve a result. Create a default failure result.
+                   result = new DefaultUpgradeStepResult(step.id(), DataHubUpgradeState.FAILED);
+                   context
+                       .report()
+                       .addLine(String.format("Retrying %s more times...", maxAttempts - (i + 1)));
+                   MetricUtils.counter(MetricRegistry.name(step.id(), "retry")).inc();
+                 }
+
+                 if (DataHubUpgradeState.SUCCEEDED.equals(result.result())) {
+                   MetricUtils.counter(MetricRegistry.name(step.id(), "succeeded")).inc();
+                   break;
+                 }
+               } catch (Exception e) {
+                 log.error(
+                     "Caught exception during attempt {} of Step with id {}", i, step.id(), e);
+                 context
+                     .report()
+                     .addLine(
+                         String.format(
+                             "Caught exception during attempt %s of Step with id %s: %s",
+                             i, step.id(), e));
+                 MetricUtils.counter(MetricRegistry.name(step.id(), "failed")).inc();
+                 result = new DefaultUpgradeStepResult(step.id(), DataHubUpgradeState.FAILED);
+                 context
+                     .report()
+                     .addLine(String.format("Retrying %s more times...", maxAttempts - (i + 1)));
+               }
+             }
+             return result;
+           },
+           MetricUtils.DROPWIZARD_METRIC,
+           "true");
  }

  private void executeCleanupInternal(UpgradeContext context, UpgradeResult result) {
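The `MetricUtils.DROPWIZARD_METRIC` / `MetricUtils.DROPWIZARD_NAME` attributes above suggest that the "support legacy dropwizard tracing using opentelemetry" item from the commit message works by deriving Dropwizard timers from span data rather than recording them inline. One plausible shape for that bridge is a `SpanProcessor` that converts tagged spans into timer updates when they end; the attribute key strings and registry wiring below are illustrative assumptions, not this commit's actual code:

```java
// Hedged sketch: one way to bridge tagged spans back to Dropwizard timers.
import com.codahale.metrics.MetricRegistry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;
import java.util.concurrent.TimeUnit;

public class DropwizardBridgeSpanProcessor implements SpanProcessor {
  // Assumed attribute names; the real constants live in MetricUtils.
  private static final AttributeKey<String> DROPWIZARD_METRIC =
      AttributeKey.stringKey("dropwizard.metric");
  private static final AttributeKey<String> DROPWIZARD_NAME =
      AttributeKey.stringKey("dropwizard.name");

  private final MetricRegistry registry;

  public DropwizardBridgeSpanProcessor(MetricRegistry registry) {
    this.registry = registry;
  }

  @Override
  public void onStart(Context parentContext, ReadWriteSpan span) {}

  @Override
  public boolean isStartRequired() {
    return false;
  }

  @Override
  public void onEnd(ReadableSpan span) {
    // Only spans explicitly tagged for the legacy bridge become timers.
    if ("true".equals(span.getAttribute(DROPWIZARD_METRIC))) {
      String name = span.getAttribute(DROPWIZARD_NAME);
      registry
          .timer(name != null ? name : span.getName())
          .update(span.getLatencyNanos(), TimeUnit.NANOSECONDS);
    }
  }

  @Override
  public boolean isEndRequired() {
    return true;
  }
}
```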
@@ -64,6 +64,13 @@ static <T> T getAspect(Class<T> clazz, @Nullable RecordTemplate recordTemplate)
  @Nullable
  SystemMetadata getSystemMetadata();

+ /**
+  * Set system metadata on the item
+  *
+  * @param systemMetadata
+  */
+ void setSystemMetadata(@Nonnull SystemMetadata systemMetadata);
+
  /**
   * The entity's schema
   *
@@ -6,6 +6,7 @@
  import java.sql.Timestamp;
  import java.util.Optional;
  import javax.annotation.Nonnull;
+ import org.apache.commons.lang3.NotImplementedException;

  /**
   * An aspect along with system metadata and creation timestamp. Represents an aspect as stored in
@@ -36,4 +37,9 @@ default Optional<Long> getSystemMetadataVersion() {
        .map(SystemMetadata::getVersion)
        .map(Long::parseLong);
  }
+
+ @Override
+ default void setSystemMetadata(@Nonnull SystemMetadata systemMetadata) {
+   throw new NotImplementedException();
+ }
  }
@@ -37,6 +37,11 @@ default SystemMetadata getSystemMetadata() {
    return getMetadataChangeLog().getSystemMetadata();
  }

+ @Override
+ default void setSystemMetadata(@Nonnull SystemMetadata systemMetadata) {
+   getMetadataChangeLog().setSystemMetadata(systemMetadata);
+ }
+
  default SystemMetadata getPreviousSystemMetadata() {
    return getMetadataChangeLog().getPreviousSystemMetadata();
  }
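The three interface diffs above share one pattern: `setSystemMetadata` is declared on the shared item interface, defaulted to throw for read-only stored aspects, and delegated to the underlying `MetadataChangeLog` for MCL items; the next diff completes the picture by letting Lombok generate the setter on `ChangeItemImpl`. A condensed illustration with simplified names (only the shape is taken from the diffs):

```java
import com.linkedin.mxe.SystemMetadata; // DataHub model class, assumed on classpath
import lombok.Setter;
import org.apache.commons.lang3.NotImplementedException;

// Simplified read-side interface: the setter is opt-in, so read-only
// implementations fail loudly instead of silently dropping writes.
interface ReadItem {
  SystemMetadata getSystemMetadata();

  default void setSystemMetadata(SystemMetadata systemMetadata) {
    throw new NotImplementedException();
  }
}

// Mutable implementation: a Lombok @Setter on the field (as on
// ChangeItemImpl.systemMetadata in the next diff) satisfies the interface.
class MutableItem implements ReadItem {
  @Setter private SystemMetadata systemMetadata;

  @Override
  public SystemMetadata getSystemMetadata() {
    return systemMetadata;
  }
}
```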
@@ -116,7 +116,7 @@ public static <T extends RecordTemplate> Set<ChangeMCP> ofOneMCP(

private Urn urn;
private RecordTemplate recordTemplate;
private SystemMetadata systemMetadata;
@Setter private SystemMetadata systemMetadata;
private AuditStamp auditStamp;
private ChangeType changeType;
@Nonnull private final EntitySpec entitySpec;
(Diff truncated: the remaining changed files, of 146 total, are not shown here.)
