feat(spark): OpenLineage 1.24.2 upgrade (datahub-project#11830)
treff7es authored Nov 19, 2024
1 parent 85c8e60 commit 1f396e8
Showing 13 changed files with 89 additions and 53 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -56,7 +56,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '15.5.2'
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.19.0'
ext.openLineageVersion = '1.24.2'
ext.logbackClassicJava8 = '1.2.12'

ext.docker_registry = 'acryldata'
2 changes: 2 additions & 0 deletions entity-registry/build.gradle
@@ -25,6 +25,8 @@ dependencies {
because("previous versions are vulnerable to CVE-2022-25857")
}
}
api project(path: ':li-utils')
api project(path: ':li-utils', configuration: "dataTemplate")
dataModel project(':li-utils')
annotationProcessor externalDependency.lombok
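The new `api` declarations re-export li-utils (and its dataTemplate configuration) on the compile classpath of modules that consume entity-registry; this pairs with the datahub-client change below, where entity-registry and datahub-event are now declared with `api` as well.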

1 change: 1 addition & 0 deletions metadata-integration/java/acryl-spark-lineage/README.md
@@ -165,6 +165,7 @@ information like tokens.
| spark.datahub.rest.server | | http://localhost:8080 | Datahub server url eg:<http://localhost:8080> |
| spark.datahub.rest.token | | | Authentication token. |
| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! |
| spark.datahub.rest.disable_chunked_encoding | | false | Disable chunked transfer encoding. In some environments chunked encoding causes issues; this option lets you turn it off (see the example below). |
| spark.datahub.rest.max_retries | | 0 | Number of times a request is retried if it fails |
| spark.datahub.rest.retry_interval | | 10 | Number of seconds to wait between retries |
| spark.datahub.file.filename | | | The file where metadata will be written if file emitter is set |
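A minimal, hypothetical sketch of wiring the new option into a Spark job; the listener class and server URL follow the usual examples in this README and are not part of this diff:

```java
import org.apache.spark.sql.SparkSession;

public class ChunkedEncodingExample {
  public static void main(String[] args) {
    // Assumed listener/server values; only disable_chunked_encoding is new here.
    SparkSession spark = SparkSession.builder()
        .appName("lineage-chunked-encoding-example")
        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .config("spark.datahub.rest.server", "http://localhost:8080")
        .config("spark.datahub.rest.disable_chunked_encoding", "true")
        .getOrCreate();

    spark.range(10).count(); // any job; lineage is emitted by the listener
    spark.stop();
  }
}
```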
35 changes: 18 additions & 17 deletions metadata-integration/java/acryl-spark-lineage/build.gradle
@@ -1,7 +1,7 @@
plugins {
id("com.palantir.git-version") apply false
}
apply plugin: 'java'
apply plugin: 'java-library'
apply plugin: 'com.github.johnrengelman.shadow'
apply plugin: 'signing'
apply plugin: 'io.codearte.nexus-staging'
@@ -51,8 +51,8 @@ dependencies {

implementation project(':metadata-integration:java:openlineage-converter')

implementation project(path: ':metadata-integration:java:datahub-client', configuration: 'shadow')
implementation project(path: ':metadata-integration:java:openlineage-converter', configuration: 'shadow')
implementation project(path: ':metadata-integration:java:datahub-client')
implementation project(path: ':metadata-integration:java:openlineage-converter')

//implementation "io.acryl:datahub-client:0.10.2"
implementation "io.openlineage:openlineage-spark_2.12:$openLineageVersion"
@@ -91,6 +91,8 @@ shadowJar {
zip64 = true
archiveClassifier = ''
mergeServiceFiles()
project.configurations.implementation.canBeResolved = true
configurations = [project.configurations.implementation]

def exclude_modules = project
.configurations
@@ -106,6 +108,8 @@ shadowJar {
exclude(dependency {
exclude_modules.contains(it.name)
})
exclude(dependency("org.slf4j::"))
exclude("org/apache/commons/logging/**")
}

// preventing java multi-release JAR leakage
@@ -123,39 +127,36 @@ shadowJar {
relocate 'com.sun.activation', 'io.acryl.shaded.com.sun.activation'
relocate 'com.sun.codemodel', 'io.acryl.shaded.com.sun.codemodel'
relocate 'com.sun.mail', 'io.acryl.shaded.com.sun.mail'
relocate 'com.fasterxml.jackson', 'datahub.spark2.shaded.jackson'
relocate 'org.slf4j', 'datahub.spark2.shaded.org.slf4j'
//
relocate 'org.apache.hc', 'io.acryl.shaded.http'
relocate 'org.apache.commons.codec', 'datahub.spark2.shaded.o.a.c.codec'
relocate 'org.apache.commons.compress', 'datahub.spark2.shaded.o.a.c.compress'
relocate 'org.apache.commons.lang3', 'datahub.spark2.shaded.o.a.c.lang3'
relocate 'org.apache.commons.codec', 'io.acryl.shaded.org.apache.commons.codec'
relocate 'org.apache.commons.compress', 'io.acryl.shaded.org.apache.commons.compress'
relocate 'org.apache.commons.lang3', 'io.acryl.shaded.org.apache.commons.lang3'
relocate 'mozilla', 'datahub.spark2.shaded.mozilla'
relocate 'com.typesafe', 'datahub.spark2.shaded.typesafe'
relocate 'io.opentracing', 'datahub.spark2.shaded.io.opentracing'
relocate 'io.netty', 'datahub.spark2.shaded.io.netty'
relocate 'ch.randelshofer', 'datahub.spark2.shaded.ch.randelshofer'
relocate 'ch.qos', 'datahub.spark2.shaded.ch.qos'
relocate 'com.typesafe', 'io.acryl.shaded.com.typesafe'
relocate 'io.opentracing', 'io.acryl.shaded.io.opentracing'
relocate 'io.netty', 'io.acryl.shaded.io.netty'
relocate 'ch.randelshofer', 'io.acryl.shaded.ch.randelshofer'
relocate 'ch.qos', 'io.acryl.shaded.ch.qos'
relocate 'org.springframework', 'io.acryl.shaded.org.springframework'
relocate 'com.fasterxml.jackson', 'io.acryl.shaded.jackson'
relocate 'org.yaml', 'io.acryl.shaded.org.yaml' // Required for shading snakeyaml
relocate 'net.jcip.annotations', 'io.acryl.shaded.annotations'
relocate 'javassist', 'io.acryl.shaded.javassist'
relocate 'edu.umd.cs.findbugs', 'io.acryl.shaded.findbugs'
relocate 'org.antlr', 'io.acryl.shaded.org.antlr'
relocate 'antlr', 'io.acryl.shaded.antlr'
//relocate 'org.antlr', 'io.acryl.shaded.org.antlr'
//relocate 'antlr', 'io.acryl.shaded.antlr'
relocate 'com.google.common', 'io.acryl.shaded.com.google.common'
relocate 'org.apache.commons', 'io.acryl.shaded.org.apache.commons'
relocate 'org.reflections', 'io.acryl.shaded.org.reflections'
relocate 'st4hidden', 'io.acryl.shaded.st4hidden'
relocate 'org.stringtemplate', 'io.acryl.shaded.org.stringtemplate'
relocate 'org.abego.treelayout', 'io.acryl.shaded.treelayout'
relocate 'org.slf4j', 'io.acryl.shaded.slf4j'
relocate 'javax.annotation', 'io.acryl.shaded.javax.annotation'
relocate 'com.github.benmanes.caffeine', 'io.acryl.shaded.com.github.benmanes.caffeine'
relocate 'org.checkerframework', 'io.acryl.shaded.org.checkerframework'
relocate 'com.google.errorprone', 'io.acryl.shaded.com.google.errorprone'
relocate 'com.sun.jna', 'io.acryl.shaded.com.sun.jna'

}
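As an illustration of what the relocation rules above mean for consumers of the shadow jar (the package name is taken from the `com.fasterxml.jackson` rule; this is purely a sketch):

```java
public class ShadedJacksonCheck {
  public static void main(String[] args) throws ClassNotFoundException {
    // With the acryl-spark-lineage shadow jar on the classpath, the bundled
    // Jackson copy resolves under the relocated package, so it cannot clash
    // with a Jackson version already present in the Spark application.
    Class<?> shaded = Class.forName("io.acryl.shaded.jackson.databind.ObjectMapper");
    System.out.println(shaded.getName());
  }
}
```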

checkShadowJar {
@@ -120,7 +120,9 @@ public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
boolean disableSslVerification =
sparkConf.hasPath(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY)
&& sparkConf.getBoolean(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY);

boolean disableChunkedEncoding =
sparkConf.hasPath(SparkConfigParser.REST_DISABLE_CHUNKED_ENCODING)
&& sparkConf.getBoolean(SparkConfigParser.REST_DISABLE_CHUNKED_ENCODING);
int retry_interval_in_sec =
sparkConf.hasPath(SparkConfigParser.RETRY_INTERVAL_IN_SEC)
? sparkConf.getInt(SparkConfigParser.RETRY_INTERVAL_IN_SEC)
@@ -150,6 +152,7 @@ public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
.disableSslVerification(disableSslVerification)
.maxRetries(max_retries)
.retryIntervalSec(retry_interval_in_sec)
.disableChunkedEncoding(disableChunkedEncoding)
.build();
return Optional.of(new RestDatahubEmitterConfig(restEmitterConf));
case "kafka":
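The new `disableChunkedEncoding` flag is simply threaded through to the `RestEmitterConfig` builder. A minimal sketch of constructing the emitter directly with that flag; the `server`/`token` builder methods and their values are assumed from the existing emitter API, not shown in this diff:

```java
import datahub.client.rest.RestEmitter;
import datahub.client.rest.RestEmitterConfig;

public class EmitterExample {
  public static void main(String[] args) {
    RestEmitterConfig conf = RestEmitterConfig.builder()
        .server("http://localhost:8080")            // assumed builder method
        .token(System.getenv("DATAHUB_GMS_TOKEN"))  // assumed builder method
        .disableChunkedEncoding(true)               // new in this change
        .maxRetries(3)
        .retryIntervalSec(10)
        .build();
    RestEmitter emitter = new RestEmitter(conf);
    System.out.println("Emitter created: " + emitter);
  }
}
```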
@@ -374,7 +377,8 @@ private static void initializeMetrics(OpenLineageConfig openLineageConfig) {
String disabledFacets;
if (openLineageConfig.getFacetsConfig() != null
&& openLineageConfig.getFacetsConfig().getDisabledFacets() != null) {
disabledFacets = String.join(";", openLineageConfig.getFacetsConfig().getDisabledFacets());
disabledFacets =
String.join(";", openLineageConfig.getFacetsConfig().getEffectiveDisabledFacets());
} else {
disabledFacets = "";
}
@@ -30,6 +30,8 @@ public class SparkConfigParser {
public static final String GMS_AUTH_TOKEN = "rest.token";
public static final String FILE_EMITTER_FILE_NAME = "file.filename";
public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
public static final String REST_DISABLE_CHUNKED_ENCODING = "rest.disable_chunked_encoding";

public static final String MAX_RETRIES = "rest.max_retries";
public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec";
public static final String KAFKA_MCP_TOPIC = "kafka.mcp_topic";
@@ -5,14 +5,13 @@

package io.openlineage.spark.agent.util;

import static io.openlineage.spark.agent.lifecycle.ExecutionContext.CAMEL_TO_SNAKE_CASE;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import datahub.spark.conf.SparkLineageConf;
import io.datahubproject.openlineage.dataset.HdfsPathDataset;
import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.Versions;
import io.openlineage.spark.api.naming.NameNormalizer;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -21,7 +20,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
@@ -186,7 +184,7 @@ public static OpenLineage.ParentRunFacet parentRunFacet(
.run(new OpenLineage.ParentRunFacetRunBuilder().runId(parentRunId).build())
.job(
new OpenLineage.ParentRunFacetJobBuilder()
.name(parentJob.replaceAll(CAMEL_TO_SNAKE_CASE, "_$1").toLowerCase(Locale.ROOT))
.name(NameNormalizer.normalize(parentJob))
.namespace(parentJobNamespace)
.build())
.build();
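The parent job name is now normalized via OpenLineage's `NameNormalizer` rather than the previous `CAMEL_TO_SNAKE_CASE` regex. A small sketch of the expected behaviour (exact output is determined by OpenLineage):

```java
import io.openlineage.spark.api.naming.NameNormalizer;

public class NormalizeExample {
  public static void main(String[] args) {
    // A camel-cased parent job name is expected to come back snake_cased,
    // e.g. roughly "my_parent_job_name".
    System.out.println(NameNormalizer.normalize("myParentJobName"));
  }
}
```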
@@ -287,8 +285,6 @@ public static boolean safeIsDefinedAt(PartialFunction pfn, Object x) {
* @param pfn
* @param x
* @return
* @param <T>
* @param <D>
*/
public static <T, D> List<T> safeApply(PartialFunction<D, List<T>> pfn, D x) {
try {
@@ -7,6 +7,7 @@

import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
@@ -18,6 +19,7 @@
import org.apache.spark.rdd.MapPartitionsRDD;
import org.apache.spark.rdd.ParallelCollectionRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.execution.datasources.FilePartition;
import org.apache.spark.sql.execution.datasources.FileScanRDD;
import scala.Tuple2;
import scala.collection.immutable.Seq;
@@ -90,7 +92,7 @@ public boolean isDefinedAt(Object rdd) {
@SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
public Stream<Path> extract(FileScanRDD rdd) {
return ScalaConversionUtils.fromSeq(rdd.filePartitions()).stream()
.flatMap(fp -> Arrays.stream(fp.files()))
.flatMap((FilePartition fp) -> Arrays.stream(fp.files()))
.map(
f -> {
if ("3.4".compareTo(package$.MODULE$.SPARK_VERSION()) <= 0) {
@@ -115,21 +117,25 @@ public boolean isDefinedAt(Object rdd) {

@Override
public Stream<Path> extract(ParallelCollectionRDD rdd) {
int SEQ_LIMIT = 1000;
AtomicBoolean loggingDone = new AtomicBoolean(false);
try {
Object data = FieldUtils.readField(rdd, "data", true);
log.debug("ParallelCollectionRDD data: {}", data);
if (data instanceof Seq) {
return ScalaConversionUtils.fromSeq((Seq) data).stream()
if ((data instanceof Seq) && ((Seq) data).head() instanceof Tuple2) {
// exit if the first element is invalid
Seq data_slice = (Seq) ((Seq) data).slice(0, SEQ_LIMIT);
return ScalaConversionUtils.fromSeq(data_slice).stream()
.map(
el -> {
Path path = null;
if (el instanceof Tuple2) {
// we're able to extract path
path = parentOf(((Tuple2) el)._1.toString());
log.debug("Found input {}", path);
} else {
// Change to debug to silence error
log.debug("unable to extract Path from {}", el.getClass().getCanonicalName());
} else if (!loggingDone.get()) {
log.warn("unable to extract Path from {}", el.getClass().getCanonicalName());
loggingDone.set(true);
}
return path;
})
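With this change the `ParallelCollectionRDD` extractor only inspects the first `SEQ_LIMIT` (1,000) elements of the backing collection, skips the collection entirely when its first element is not a `Tuple2`, and emits the "unable to extract Path" warning at most once per invocation instead of once per element.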
28 changes: 20 additions & 8 deletions metadata-integration/java/datahub-client/build.gradle
@@ -1,6 +1,6 @@
plugins {
id("com.palantir.git-version") apply false
id 'java'
id 'java-library'
id 'com.github.johnrengelman.shadow'
id 'jacoco'
id 'signing'
@@ -12,11 +12,13 @@ apply from: "../versioning.gradle"
import org.apache.tools.ant.filters.ReplaceTokens


jar.enabled = false // Since we only want to build shadow jars, disabling the regular jar creation
jar {
archiveClassifier = "lib"
}

dependencies {
implementation project(':entity-registry')
implementation project(':metadata-integration:java:datahub-event')
api project(':entity-registry')
api project(':metadata-integration:java:datahub-event')
implementation(externalDependency.kafkaAvroSerializer) {
exclude group: "org.apache.avro"
}
Expand All @@ -33,7 +35,7 @@ dependencies {
implementation externalDependency.jacksonDataBind
runtimeOnly externalDependency.jna

implementation externalDependency.slf4jApi
api externalDependency.slf4jApi
compileOnly externalDependency.lombok
annotationProcessor externalDependency.lombok
// VisibleForTesting
@@ -78,6 +80,11 @@ shadowJar {
// https://github.com/johnrengelman/shadow/issues/729
exclude('module-info.class', 'META-INF/versions/**',
'**/LICENSE', '**/LICENSE*.txt', '**/NOTICE', '**/NOTICE.txt', 'licenses/**', 'log4j2.*', 'log4j.*')
dependencies {
exclude(dependency("org.slf4j::"))
exclude(dependency("antlr::"))
exclude("org/apache/commons/logging/**")
}
mergeServiceFiles()
// we relocate namespaces manually, because we want to know exactly which libs we are exposing and why
// we can move to automatic relocation using ConfigureShadowRelocation after we get to a good place on these first
@@ -88,15 +95,20 @@ shadowJar {
relocate 'javassist', 'datahub.shaded.javassist'
relocate 'edu.umd.cs.findbugs', 'datahub.shaded.findbugs'
relocate 'org.antlr', 'datahub.shaded.org.antlr'
relocate 'antlr', 'datahub.shaded.antlr'
//relocate 'antlr', 'datahub.shaded.antlr'
relocate 'com.google.common', 'datahub.shaded.com.google.common'
relocate 'org.apache.commons', 'datahub.shaded.org.apache.commons'
relocate 'org.apache.commons.codec', 'datahub.shaded.org.apache.commons.codec'
relocate 'org.apache.commons.compress', 'datahub.shaded.org.apache.commons.compress'
relocate 'org.apache.commons.lang3', 'datahub.shaded.org.apache.commons.lang3'
relocate 'org.apache.commons.lang', 'datahub.shaded.org.apache.commons.lang'
relocate 'org.apache.commons.cli', 'datahub.shaded.org.apache.commons.cli'
relocate 'org.apache.commons.text', 'datahub.shaded.org.apache.commons.text'
relocate 'org.apache.commons.io', 'datahub.shaded.org.apache.commons.io'
relocate 'org.apache.maven', 'datahub.shaded.org.apache.maven'
relocate 'org.reflections', 'datahub.shaded.org.reflections'
relocate 'st4hidden', 'datahub.shaded.st4hidden'
relocate 'org.stringtemplate', 'datahub.shaded.org.stringtemplate'
relocate 'org.abego.treelayout', 'datahub.shaded.treelayout'
relocate 'org.slf4j', 'datahub.shaded.slf4j'
relocate 'javax.annotation', 'datahub.shaded.javax.annotation'
relocate 'com.github.benmanes.caffeine', 'datahub.shaded.com.github.benmanes.caffeine'
relocate 'org.checkerframework', 'datahub.shaded.org.checkerframework'
@@ -48,7 +48,6 @@ public boolean retryRequest(

@Override
public boolean retryRequest(HttpResponse response, int execCount, HttpContext context) {
log.warn("Retrying request due to error: {}", response);
return super.retryRequest(response, execCount, context);
}
}
@@ -1,6 +1,7 @@
package datahub.client.rest;

import static com.linkedin.metadata.Constants.*;
import static org.apache.hc.core5.http.HttpHeaders.*;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.StreamReadConstraints;
@@ -18,6 +19,7 @@
import datahub.event.UpsertAspectRequest;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
@@ -26,6 +28,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.concurrent.ThreadSafe;
@@ -97,17 +100,20 @@ public RestEmitter(RestEmitterConfig config) {
this.config = config;
HttpAsyncClientBuilder httpClientBuilder = this.config.getAsyncHttpClientBuilder();
httpClientBuilder.setRetryStrategy(new DatahubHttpRequestRetryStrategy());

// Override httpClient settings with RestEmitter configs if present
if (config.getTimeoutSec() != null) {
httpClientBuilder.setDefaultRequestConfig(
RequestConfig.custom()
.setConnectionRequestTimeout(
config.getTimeoutSec() * 1000, java.util.concurrent.TimeUnit.MILLISECONDS)
.setResponseTimeout(
config.getTimeoutSec() * 1000, java.util.concurrent.TimeUnit.MILLISECONDS)
.build());
if ((config.getTimeoutSec() != null) || (config.isDisableChunkedEncoding())) {
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
// Override httpClient settings with RestEmitter configs if present
if (config.getTimeoutSec() != null) {
requestConfigBuilder
.setConnectionRequestTimeout(config.getTimeoutSec() * 1000, TimeUnit.MILLISECONDS)
.setResponseTimeout(config.getTimeoutSec() * 1000, TimeUnit.MILLISECONDS);
}
if (config.isDisableChunkedEncoding()) {
requestConfigBuilder.setContentCompressionEnabled(false);
}
httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
}

PoolingAsyncClientConnectionManagerBuilder poolingAsyncClientConnectionManagerBuilder =
PoolingAsyncClientConnectionManagerBuilder.create();

@@ -223,8 +229,13 @@ private Future<MetadataWriteResponse> postGeneric(
if (this.config.getToken() != null) {
simpleRequestBuilder.setHeader("Authorization", "Bearer " + this.config.getToken());
}
if (this.config.isDisableChunkedEncoding()) {
byte[] payloadBytes = payloadJson.getBytes(StandardCharsets.UTF_8);
simpleRequestBuilder.setBody(payloadBytes, ContentType.APPLICATION_JSON);
} else {
simpleRequestBuilder.setBody(payloadJson, ContentType.APPLICATION_JSON);
}

simpleRequestBuilder.setBody(payloadJson, ContentType.APPLICATION_JSON);
AtomicReference<MetadataWriteResponse> responseAtomicReference = new AtomicReference<>();
CountDownLatch responseLatch = new CountDownLatch(1);
FutureCallback<SimpleHttpResponse> httpCallback =
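When chunked encoding is disabled, the payload is first materialised as a UTF-8 byte array before being set on the request, presumably so the client sends a body of known length (an explicit Content-Length) rather than streaming it with chunked Transfer-Encoding; the constructor change above likewise turns off content compression for that configuration.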