initializeEmitter(Config sparkConf) {
if (token != null) {"REST Emitter Configuration: Token {}", "XXXXX");
if (disableSslVerification) {
log.warn("REST Emitter Configuration: ssl verification will be disabled.");
RestEmitterConfig restEmitterConf =
+ .maxRetries(max_retries)
+ .retryIntervalSec(retry_interval_in_sec)
return Optional.of(new RestDatahubEmitterConfig(restEmitterConf));
} else {
@@ -145,7 +187,12 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
if (datahubConf.hasPath(STREAMING_JOB) && (datahubConf.getBoolean(STREAMING_JOB))) {
- emitter.emitCoalesced();
+ if (emitter != null) {
+ emitter.emitCoalesced();
+ } else {
+ log.warn("Emitter is not initialized, unable to emit coalesced events");
+ }
long elapsedTime = System.currentTimeMillis() - startTime;
log.debug("onApplicationEnd completed successfully in {} ms", elapsedTime);
@@ -170,6 +217,8 @@ public void onJobEnd(SparkListenerJobEnd jobEnd) {
public void onJobStart(SparkListenerJobStart jobStart) {
long startTime = System.currentTimeMillis();
+ initializeContextFactoryIfNotInitialized();
log.debug("Job start called");
@@ -227,4 +276,72 @@ public void onOtherEvent(SparkListenerEvent event) {
log.debug("onOtherEvent completed successfully in {} ms", elapsedTime);
+ private static void initializeMetrics(OpenLineageConfig openLineageConfig) {
+ meterRegistry =
+ MicrometerProvider.addMeterRegistryFromConfig(openLineageConfig.getMetricsConfig());
+ String disabledFacets;
+ if (openLineageConfig.getFacetsConfig() != null
+ && openLineageConfig.getFacetsConfig().getDisabledFacets() != null) {
+ disabledFacets = String.join(";", openLineageConfig.getFacetsConfig().getDisabledFacets());
+ } else {
+ disabledFacets = "";
+ }
+ meterRegistry
+ .config()
+ .commonTags(
+ Tags.of(
+ Tag.of("openlineage.spark.integration.version", Versions.getVersion()),
+ Tag.of("openlineage.spark.version", sparkVersion),
+ Tag.of("openlineage.spark.disabled.facets", disabledFacets)));
+ ((CompositeMeterRegistry) meterRegistry)
+ .getRegistries()
+ .forEach(
+ r ->
+ r.config()
+ .commonTags(
+ Tags.of(
+ Tag.of("openlineage.spark.integration.version", Versions.getVersion()),
+ Tag.of("openlineage.spark.version", sparkVersion),
+ Tag.of("openlineage.spark.disabled.facets", disabledFacets))));
+ }
+ private void initializeContextFactoryIfNotInitialized() {
+ if (contextFactory != null || isDisabled) {
+ return;
+ }
+ asJavaOptional(activeSparkContext.apply())
+ .ifPresent(context -> initializeContextFactoryIfNotInitialized(context.appName()));
+ }
+ private void initializeContextFactoryIfNotInitialized(String appName) {
+ if (contextFactory != null || isDisabled) {
+ return;
+ }
+ SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
+ if (sparkEnv == null) {
+ log.warn(
+ "OpenLineage listener instantiated, but no configuration could be found. "
+ + "Lineage events will not be collected");
+ return;
+ }
+ initializeContextFactoryIfNotInitialized(sparkEnv.conf(), appName);
+ }
+ private void initializeContextFactoryIfNotInitialized(SparkConf sparkConf, String appName) {
+ if (contextFactory != null || isDisabled) {
+ return;
+ }
+ try {
+ SparkOpenLineageConfig config = ArgumentParser.parse(sparkConf);
+ // Needs to be done before initializing OpenLineageClient
+ initializeMetrics(config);
+ emitter = new DatahubEventEmitter(config, appName);
+ contextFactory = new ContextFactory(emitter, meterRegistry, config);
+ circuitBreaker = new CircuitBreakerFactory(config.getCircuitBreaker()).build();
+ OpenLineageSparkListener.init(contextFactory);
+ } catch (URISyntaxException e) {
+ log.error("Unable to parse OpenLineage endpoint. Lineage events will not be collected", e);
+ }
+ }
diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/ b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/
index 7e10f51feb38a4..f1af56ff888d3c 100644
--- a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/
+++ b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/
@@ -29,6 +29,9 @@ public class SparkConfigParser {
public static final String GMS_URL_KEY = "rest.server";
public static final String GMS_AUTH_TOKEN = "rest.token";
public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
+ public static final String MAX_RETRIES = "rest.max_retries";
+ public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec";
public static final String COALESCE_KEY = "coalesce_jobs";
public static final String PATCH_ENABLED = "patch.enabled";
@@ -304,7 +307,7 @@ public static boolean isCoalesceEnabled(Config datahubConfig) {
public static boolean isPatchEnabled(Config datahubConfig) {
if (!datahubConfig.hasPath(PATCH_ENABLED)) {
- return true;
+ return false;
return datahubConfig.hasPath(PATCH_ENABLED) && datahubConfig.getBoolean(PATCH_ENABLED);
diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/lifecycle/ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/lifecycle/
deleted file mode 100644
index 99643592dc200e..00000000000000
--- a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/lifecycle/
+++ /dev/null
@@ -1,493 +0,0 @@
-/* Copyright 2018-2023 contributors to the OpenLineage project
-/* SPDX-License-Identifier: Apache-2.0
-package io.openlineage.spark.agent.lifecycle;
-import static io.openlineage.client.OpenLineageClientUtils.mergeFacets;
-import static io.openlineage.spark.agent.util.ScalaConversionUtils.fromSeq;
-import static io.openlineage.spark.agent.util.ScalaConversionUtils.toScalaFn;
-import io.openlineage.client.OpenLineage;
-import io.openlineage.client.OpenLineage.DatasetFacet;
-import io.openlineage.client.OpenLineage.DatasetFacets;
-import io.openlineage.client.OpenLineage.InputDataset;
-import io.openlineage.client.OpenLineage.InputDatasetFacet;
-import io.openlineage.client.OpenLineage.InputDatasetInputFacets;
-import io.openlineage.client.OpenLineage.JobBuilder;
-import io.openlineage.client.OpenLineage.JobFacet;
-import io.openlineage.client.OpenLineage.OutputDataset;
-import io.openlineage.client.OpenLineage.OutputDatasetFacet;
-import io.openlineage.client.OpenLineage.OutputDatasetOutputFacets;
-import io.openlineage.client.OpenLineage.ParentRunFacet;
-import io.openlineage.client.OpenLineage.RunEvent;
-import io.openlineage.client.OpenLineage.RunEventBuilder;
-import io.openlineage.client.OpenLineage.RunFacet;
-import io.openlineage.client.OpenLineage.RunFacets;
-import io.openlineage.client.OpenLineage.RunFacetsBuilder;
-import io.openlineage.spark.agent.hooks.HookUtils;
-import io.openlineage.spark.agent.lifecycle.plan.column.ColumnLevelLineageUtils;
-import io.openlineage.spark.agent.lifecycle.plan.column.ColumnLevelLineageVisitor;
-import io.openlineage.spark.agent.util.FacetUtils;
-import io.openlineage.spark.agent.util.PlanUtils;
-import io.openlineage.spark.agent.util.ScalaConversionUtils;
-import io.openlineage.spark.api.CustomFacetBuilder;
-import io.openlineage.spark.api.OpenLineageContext;
-import io.openlineage.spark.api.OpenLineageEventHandlerFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import lombok.AllArgsConstructor;
-import lombok.NonNull;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.scheduler.ActiveJob;
-import org.apache.spark.scheduler.JobFailed;
-import org.apache.spark.scheduler.SparkListenerJobEnd;
-import org.apache.spark.scheduler.SparkListenerJobStart;
-import org.apache.spark.scheduler.SparkListenerStageCompleted;
-import org.apache.spark.scheduler.SparkListenerStageSubmitted;
-import org.apache.spark.scheduler.Stage;
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
-import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
-import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
-import scala.Function1;
-import scala.PartialFunction;
- * Event handler that accepts various {@link org.apache.spark.scheduler.SparkListener} events and
- * helps build up an {@link RunEvent} by passing event components to partial functions that know how
- * to convert those event components into {@link RunEvent} properties.
- *
- * The event types that can be consumed to generate @link OpenLineage.RunEvent} properties have
- * no common supertype, so the generic argument for the function input is simply {@link Object}. The
- * types of arguments that may be found include
- *
- *
- * - {@link org.apache.spark.scheduler.StageInfo}
- *
- {@link Stage}
- *
- {@link RDD}
- *
- {@link ActiveJob}
- *
- {@link org.apache.spark.sql.execution.QueryExecution}
- *
- *
- * These components are extracted from various {@link org.apache.spark.scheduler.SparkListener}
- * events, such as {@link SparkListenerStageCompleted}, {@link SparkListenerJobStart}, and {@link
- * org.apache.spark.scheduler.SparkListenerTaskEnd}.
- *
- *
{@link RDD} chains will be _flattened_ so each `RDD` dependency is passed to the builders one
- * at a time. This means a builder can directly specify the type of {@link RDD} it handles, such as
- * a {@link org.apache.spark.rdd.HadoopRDD} or a {@link
- * org.apache.spark.sql.execution.datasources.FileScanRDD}, without having to check the dependencies
- * of every {@link org.apache.spark.rdd.MapPartitionsRDD} or {@link
- * org.apache.spark.sql.execution.SQLExecutionRDD}.
- *
- *
Any {@link RunFacet}s and {@link JobFacet}s returned by the {@link CustomFacetBuilder}s are
- * appended to the {@link OpenLineage.Run} and {@link OpenLineage.Job}, respectively.
- *
- *
If any {@link OpenLineage.InputDatasetBuilder}s or {@link
- * OpenLineage.OutputDatasetBuilder}s are returned from the partial functions, the {@link
- * #inputDatasetBuilders} or {@link #outputDatasetBuilders} will be invoked using the same input
- * arguments in order to construct any {@link InputDatasetFacet}s or {@link OutputDatasetFacet}s to
- * the returned dataset. {@link InputDatasetFacet}s and {@link OutputDatasetFacet}s will be attached
- * to any {@link OpenLineage.InputDatasetBuilder} or {@link OpenLineage.OutputDatasetBuilder}
- * found for the event. This is because facets may be constructed from generic information that is
- * not specifically tied to a Dataset. For example, {@link
- * OpenLineage.OutputStatisticsOutputDatasetFacet}s are created from {@link
- * org.apache.spark.executor.TaskMetrics} attached to the last {@link
- * org.apache.spark.scheduler.StageInfo} for a given job execution. However, the {@link
- * OutputDataset} is constructed by reading the {@link LogicalPlan}. There's no way to tie the
- * output metrics in the {@link org.apache.spark.scheduler.StageInfo} to the {@link OutputDataset}
- * in the {@link LogicalPlan} except by inference. Similarly, input metrics can be found in the
- * {@link org.apache.spark.scheduler.StageInfo} for the stage that reads a dataset and the {@link
- * InputDataset} can usually be constructed by walking the {@link RDD} dependency tree for that
- * {@link Stage} and finding a {@link org.apache.spark.sql.execution.datasources.FileScanRDD} or
- * other concrete implementation. But while there is typically only one {@link InputDataset} read in
- * a given stage, there's no guarantee of that and the {@link org.apache.spark.executor.TaskMetrics}
- * in the {@link org.apache.spark.scheduler.StageInfo} won't disambiguate.
- *
- *
If a facet needs to be attached to a specific dataset, the user must take care to construct
- * both the Dataset and the Facet in the same builder.
- */
-class OpenLineageRunEventBuilder {
- @NonNull private final OpenLineageContext openLineageContext;
- @NonNull
- private final Collection>> inputDatasetBuilders;
- @NonNull
- private final Collection>>
- inputDatasetQueryPlanVisitors;
- @NonNull
- private final Collection>> outputDatasetBuilders;
- @NonNull
- private final Collection>>
- outputDatasetQueryPlanVisitors;
- @NonNull
- private final Collection> datasetFacetBuilders;
- @NonNull
- private final Collection>
- inputDatasetFacetBuilders;
- @NonNull
- private final Collection>
- outputDatasetFacetBuilders;
- @NonNull private final Collection> runFacetBuilders;
- @NonNull private final Collection> jobFacetBuilders;
- @NonNull private final Collection columnLineageVisitors;
- private final UnknownEntryFacetListener unknownEntryFacetListener =
- UnknownEntryFacetListener.getInstance();
- private final Map jobMap = new HashMap<>();
- private final Map stageMap = new HashMap<>();
- OpenLineageRunEventBuilder(OpenLineageContext context, OpenLineageEventHandlerFactory factory) {
- this(
- context,
- factory.createInputDatasetBuilder(context),
- factory.createInputDatasetQueryPlanVisitors(context),
- factory.createOutputDatasetBuilder(context),
- factory.createOutputDatasetQueryPlanVisitors(context),
- factory.createDatasetFacetBuilders(context),
- factory.createInputDatasetFacetBuilders(context),
- factory.createOutputDatasetFacetBuilders(context),
- factory.createRunFacetBuilders(context),
- factory.createJobFacetBuilders(context),
- factory.createColumnLevelLineageVisitors(context));
- }
- /**
- * Add an {@link ActiveJob} and all of its {@link Stage}s to the maps so we can look them up by id
- * later.
- *
- * @param job
- */
- void registerJob(ActiveJob job) {
- jobMap.put(job.jobId(), job);
- stageMap.put(job.finalStage().id(), job.finalStage());
- job.finalStage()
- .parents()
- .forall(
- toScalaFn(
- stage -> {
- stageMap.put(, stage);
- return true;
- }));
- }
- RunEvent buildRun(
- Optional parentRunFacet,
- RunEventBuilder runEventBuilder,
- JobBuilder jobBuilder,
- SparkListenerStageSubmitted event) {
- Stage stage = stageMap.get(event.stageInfo().stageId());
- RDD> rdd = stage.rdd();
- List