From 1f396e87c1c48ad7b8a9996dc94c227ffd53e876 Mon Sep 17 00:00:00 2001
From: Tamas Nemeth
Date: Wed, 20 Nov 2024 00:13:24 +0100
Subject: [PATCH] feat(spark): OpenLineage 1.24.2 upgrade (#11830)

---
 build.gradle                                  |  2 +-
 entity-registry/build.gradle                  |  2 ++
 .../java/acryl-spark-lineage/README.md        |  1 +
 .../java/acryl-spark-lineage/build.gradle     | 35 ++++++++++---------
 .../datahub/spark/DatahubSparkListener.java   |  8 +++--
 .../datahub/spark/conf/SparkConfigParser.java |  2 ++
 .../spark/agent/util/PlanUtils.java           |  8 ++---
 .../spark/agent/util/RddPathUtils.java        | 18 ++++++----
 .../java/datahub-client/build.gradle          | 28 ++++++++++-----
 .../rest/DatahubHttpRequestRetryStrategy.java |  1 -
 .../java/datahub/client/rest/RestEmitter.java | 33 +++++++++++------
 .../client/rest/RestEmitterConfig.java        |  2 ++
 .../java/openlineage-converter/build.gradle   |  2 +-
 13 files changed, 89 insertions(+), 53 deletions(-)

diff --git a/build.gradle b/build.gradle
index 6e6dadb7ebfa34..9ee756d41e11ef 100644
--- a/build.gradle
+++ b/build.gradle
@@ -56,7 +56,7 @@ buildscript {
   ext.hazelcastVersion = '5.3.6'
   ext.ebeanVersion = '15.5.2'
   ext.googleJavaFormatVersion = '1.18.1'
-  ext.openLineageVersion = '1.19.0'
+  ext.openLineageVersion = '1.24.2'
   ext.logbackClassicJava8 = '1.2.12'
 
   ext.docker_registry = 'acryldata'
diff --git a/entity-registry/build.gradle b/entity-registry/build.gradle
index 2dedea1f16d99c..22e5b601d39db2 100644
--- a/entity-registry/build.gradle
+++ b/entity-registry/build.gradle
@@ -25,6 +25,8 @@ dependencies {
       because("previous versions are vulnerable to CVE-2022-25857")
     }
   }
+  api project(path: ':li-utils')
+  api project(path: ':li-utils', configuration: "dataTemplate")
   dataModel project(':li-utils')
 
   annotationProcessor externalDependency.lombok
diff --git a/metadata-integration/java/acryl-spark-lineage/README.md b/metadata-integration/java/acryl-spark-lineage/README.md
index bd0a58b635b483..267e979b0fa073 100644
--- a/metadata-integration/java/acryl-spark-lineage/README.md
+++ b/metadata-integration/java/acryl-spark-lineage/README.md
@@ -165,6 +165,7 @@ information like tokens.
 | spark.datahub.rest.server | | http://localhost:8080 | Datahub server url eg: |
 | spark.datahub.rest.token | | | Authentication token. |
 | spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! |
+| spark.datahub.rest.disable_chunked_encoding | | false | Disable chunked transfer encoding. In some environments chunked encoding causes issues; with this config option it can be disabled. |
 | spark.datahub.rest.max_retries | | 0 | Number of times a request retried if failed |
 | spark.datahub.rest.retry_interval | | 10 | Number of seconds to wait between retries |
 | spark.datahub.file.filename | | | The file where metadata will be written if file emitter is set |
diff --git a/metadata-integration/java/acryl-spark-lineage/build.gradle b/metadata-integration/java/acryl-spark-lineage/build.gradle
index 6620c34021ac4a..3f83e5657bbf4d 100644
--- a/metadata-integration/java/acryl-spark-lineage/build.gradle
+++ b/metadata-integration/java/acryl-spark-lineage/build.gradle
@@ -1,7 +1,7 @@
 plugins {
   id("com.palantir.git-version") apply false
 }
-apply plugin: 'java'
+apply plugin: 'java-library'
 apply plugin: 'com.github.johnrengelman.shadow'
 apply plugin: 'signing'
 apply plugin: 'io.codearte.nexus-staging'
@@ -51,8 +51,8 @@ dependencies {
 
   implementation project(':metadata-integration:java:openlineage-converter')
 
-  implementation project(path: ':metadata-integration:java:datahub-client', configuration: 'shadow')
-  implementation project(path: ':metadata-integration:java:openlineage-converter', configuration: 'shadow')
+  implementation project(path: ':metadata-integration:java:datahub-client')
+  implementation project(path: ':metadata-integration:java:openlineage-converter')
 
   //implementation "io.acryl:datahub-client:0.10.2"
   implementation "io.openlineage:openlineage-spark_2.12:$openLineageVersion"
@@ -91,6 +91,8 @@ shadowJar {
   zip64 = true
   archiveClassifier = ''
   mergeServiceFiles()
+  project.configurations.implementation.canBeResolved = true
+  configurations = [project.configurations.implementation]
 
   def exclude_modules = project
       .configurations
@@ -106,6 +108,8 @@
     exclude(dependency {
       exclude_modules.contains(it.name)
     })
+    exclude(dependency("org.slf4j::"))
+    exclude("org/apache/commons/logging/**")
   }
 
   // preventing java multi-release JAR leakage
@@ -123,39 +127,36 @@
   relocate 'com.sun.activation', 'io.acryl.shaded.com.sun.activation'
   relocate 'com.sun.codemodel', 'io.acryl.shaded.com.sun.codemodel'
   relocate 'com.sun.mail', 'io.acryl.shaded.com.sun.mail'
-  relocate 'com.fasterxml.jackson', 'datahub.spark2.shaded.jackson'
-  relocate 'org.slf4j', 'datahub.spark2.shaded.org.slf4j'
   // relocate 'org.apache.hc', 'io.acryl.shaded.http'
-  relocate 'org.apache.commons.codec', 'datahub.spark2.shaded.o.a.c.codec'
-  relocate 'org.apache.commons.compress', 'datahub.spark2.shaded.o.a.c.compress'
-  relocate 'org.apache.commons.lang3', 'datahub.spark2.shaded.o.a.c.lang3'
+  relocate 'org.apache.commons.codec', 'io.acryl.shaded.org.apache.commons.codec'
+  relocate 'org.apache.commons.compress', 'io.acryl.shaded.org.apache.commons.compress'
+  relocate 'org.apache.commons.lang3', 'io.acryl.shaded.org.apache.commons.lang3'
   relocate 'mozilla', 'datahub.spark2.shaded.mozilla'
-  relocate 'com.typesafe', 'datahub.spark2.shaded.typesafe'
-  relocate 'io.opentracing', 'datahub.spark2.shaded.io.opentracing'
-  relocate 'io.netty', 'datahub.spark2.shaded.io.netty'
-  relocate 'ch.randelshofer', 'datahub.spark2.shaded.ch.randelshofer'
-  relocate 'ch.qos', 'datahub.spark2.shaded.ch.qos'
+  relocate 'com.typesafe', 'io.acryl.shaded.com.typesafe'
+  relocate 'io.opentracing', 'io.acryl.shaded.io.opentracing'
+  relocate 'io.netty', 'io.acryl.shaded.io.netty'
+  relocate 'ch.randelshofer', 'io.acryl.shaded.ch.randelshofer'
+  relocate 'ch.qos', 'io.acryl.shaded.ch.qos'
   relocate 'org.springframework', 'io.acryl.shaded.org.springframework'
   relocate 'com.fasterxml.jackson', 'io.acryl.shaded.jackson'
   relocate 'org.yaml', 'io.acryl.shaded.org.yaml' // Required for shading snakeyaml
   relocate 'net.jcip.annotations', 'io.acryl.shaded.annotations'
   relocate 'javassist', 'io.acryl.shaded.javassist'
   relocate 'edu.umd.cs.findbugs', 'io.acryl.shaded.findbugs'
-  relocate 'org.antlr', 'io.acryl.shaded.org.antlr'
-  relocate 'antlr', 'io.acryl.shaded.antlr'
+  //relocate 'org.antlr', 'io.acryl.shaded.org.antlr'
+  //relocate 'antlr', 'io.acryl.shaded.antlr'
   relocate 'com.google.common', 'io.acryl.shaded.com.google.common'
-  relocate 'org.apache.commons', 'io.acryl.shaded.org.apache.commons'
   relocate 'org.reflections', 'io.acryl.shaded.org.reflections'
   relocate 'st4hidden', 'io.acryl.shaded.st4hidden'
   relocate 'org.stringtemplate', 'io.acryl.shaded.org.stringtemplate'
   relocate 'org.abego.treelayout', 'io.acryl.shaded.treelayout'
-  relocate 'org.slf4j', 'io.acryl.shaded.slf4j'
   relocate 'javax.annotation', 'io.acryl.shaded.javax.annotation'
   relocate 'com.github.benmanes.caffeine', 'io.acryl.shaded.com.github.benmanes.caffeine'
   relocate 'org.checkerframework', 'io.acryl.shaded.org.checkerframework'
   relocate 'com.google.errorprone', 'io.acryl.shaded.com.google.errorprone'
   relocate 'com.sun.jna', 'io.acryl.shaded.com.sun.jna'
+
 }
 
 checkShadowJar {
diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
index ee0938edb50454..b594f6bae954fa 100644
--- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
+++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
@@ -120,7 +120,9 @@ public Optional<Emitter> initializeEmitter(Config sparkConf) {
     boolean disableSslVerification =
         sparkConf.hasPath(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY)
             && sparkConf.getBoolean(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY);
-
+    boolean disableChunkedEncoding =
+        sparkConf.hasPath(SparkConfigParser.REST_DISABLE_CHUNKED_ENCODING)
+            && sparkConf.getBoolean(SparkConfigParser.REST_DISABLE_CHUNKED_ENCODING);
     int retry_interval_in_sec =
         sparkConf.hasPath(SparkConfigParser.RETRY_INTERVAL_IN_SEC)
             ? sparkConf.getInt(SparkConfigParser.RETRY_INTERVAL_IN_SEC)
@@ -150,6 +152,7 @@ public Optional<Emitter> initializeEmitter(Config sparkConf) {
             .disableSslVerification(disableSslVerification)
             .maxRetries(max_retries)
             .retryIntervalSec(retry_interval_in_sec)
+            .disableChunkedEncoding(disableChunkedEncoding)
             .build();
         return Optional.of(new RestDatahubEmitterConfig(restEmitterConf));
       case "kafka":
@@ -374,7 +377,8 @@ private static void initializeMetrics(OpenLineageConfig openLineageConfig) {
     String disabledFacets;
     if (openLineageConfig.getFacetsConfig() != null
         && openLineageConfig.getFacetsConfig().getDisabledFacets() != null) {
-      disabledFacets = String.join(";", openLineageConfig.getFacetsConfig().getDisabledFacets());
+      disabledFacets =
+          String.join(";", openLineageConfig.getFacetsConfig().getEffectiveDisabledFacets());
     } else {
       disabledFacets = "";
     }
diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java
index 45ec5365d09b36..3860285083c4bb 100644
--- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java
+++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java
@@ -30,6 +30,8 @@ public class SparkConfigParser {
   public static final String GMS_AUTH_TOKEN = "rest.token";
   public static final String FILE_EMITTER_FILE_NAME = "file.filename";
   public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
+  public static final String REST_DISABLE_CHUNKED_ENCODING = "rest.disable_chunked_encoding";
+
   public static final String MAX_RETRIES = "rest.max_retries";
   public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec";
   public static final String KAFKA_MCP_TOPIC = "kafka.mcp_topic";
diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java
index d46d741d155b8b..5f87df2a65d6c2 100644
--- a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java
+++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java
@@ -5,14 +5,13 @@
 
 package io.openlineage.spark.agent.util;
 
-import static io.openlineage.spark.agent.lifecycle.ExecutionContext.CAMEL_TO_SNAKE_CASE;
-
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import datahub.spark.conf.SparkLineageConf;
 import io.datahubproject.openlineage.dataset.HdfsPathDataset;
 import io.openlineage.client.OpenLineage;
 import io.openlineage.spark.agent.Versions;
+import io.openlineage.spark.api.naming.NameNormalizer;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -21,7 +20,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Locale;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
@@ -186,7 +184,7 @@ public static OpenLineage.ParentRunFacet parentRunFacet(
         .run(new OpenLineage.ParentRunFacetRunBuilder().runId(parentRunId).build())
         .job(
             new OpenLineage.ParentRunFacetJobBuilder()
-                .name(parentJob.replaceAll(CAMEL_TO_SNAKE_CASE, "_$1").toLowerCase(Locale.ROOT))
+                .name(NameNormalizer.normalize(parentJob))
                 .namespace(parentJobNamespace)
                 .build())
         .build();
@@ -287,8 +285,6 @@ public static boolean safeIsDefinedAt(PartialFunction pfn, Object x) {
    * @param pfn
    * @param x
    * @return
-   * @param <T>
-   * @param <D>
    */
   public static <T, D> List<T> safeApply(PartialFunction<D, List<T>> pfn, D x) {
     try {
diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java
index 62005bf15f8505..6ef7403362a909 100644
--- a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java
+++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java
@@ -7,6 +7,7 @@
 
 import java.util.Arrays;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Stream;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.FieldUtils;
@@ -18,6 +19,7 @@
 import org.apache.spark.rdd.MapPartitionsRDD;
 import org.apache.spark.rdd.ParallelCollectionRDD;
 import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.execution.datasources.FilePartition;
 import org.apache.spark.sql.execution.datasources.FileScanRDD;
 import scala.Tuple2;
 import scala.collection.immutable.Seq;
@@ -90,7 +92,7 @@ public boolean isDefinedAt(Object rdd) {
     @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
     public Stream extract(FileScanRDD rdd) {
       return ScalaConversionUtils.fromSeq(rdd.filePartitions()).stream()
-          .flatMap(fp -> Arrays.stream(fp.files()))
+          .flatMap((FilePartition fp) -> Arrays.stream(fp.files()))
           .map(
               f -> {
                 if ("3.4".compareTo(package$.MODULE$.SPARK_VERSION()) <= 0) {
@@ -115,11 +117,15 @@ public boolean isDefinedAt(Object rdd) {
 
     @Override
     public Stream extract(ParallelCollectionRDD rdd) {
+      int SEQ_LIMIT = 1000;
+      AtomicBoolean loggingDone = new AtomicBoolean(false);
       try {
         Object data = FieldUtils.readField(rdd, "data", true);
         log.debug("ParallelCollectionRDD data: {}", data);
-        if (data instanceof Seq) {
-          return ScalaConversionUtils.fromSeq((Seq) data).stream()
+        if ((data instanceof Seq) && ((Seq) data).head() instanceof Tuple2) {
+          // exit if the first element is invalid
+          Seq data_slice = (Seq) ((Seq) data).slice(0, SEQ_LIMIT);
+          return ScalaConversionUtils.fromSeq(data_slice).stream()
               .map(
                   el -> {
                     Path path = null;
@@ -127,9 +133,9 @@ public Stream extract(ParallelCollectionRDD rdd) {
                       // we're able to extract path
                       path = parentOf(((Tuple2) el)._1.toString());
                       log.debug("Found input {}", path);
-                    } else {
-                      // Change to debug to silence error
-                      log.debug("unable to extract Path from {}", el.getClass().getCanonicalName());
+                    } else if (!loggingDone.get()) {
+                      log.warn("unable to extract Path from {}", el.getClass().getCanonicalName());
+                      loggingDone.set(true);
                     }
                     return path;
                   })
diff --git a/metadata-integration/java/datahub-client/build.gradle b/metadata-integration/java/datahub-client/build.gradle
index d9087347e1b5c6..1bdc848d0385b1 100644
--- a/metadata-integration/java/datahub-client/build.gradle
+++ b/metadata-integration/java/datahub-client/build.gradle
@@ -1,6 +1,6 @@
 plugins {
   id("com.palantir.git-version") apply false
-  id 'java'
+  id 'java-library'
   id 'com.github.johnrengelman.shadow'
   id 'jacoco'
   id 'signing'
@@ -12,11 +12,13 @@ apply from: "../versioning.gradle"
 
 import org.apache.tools.ant.filters.ReplaceTokens
 
-jar.enabled = false // Since we only want to build shadow jars, disabling the regular jar creation
+jar {
+  archiveClassifier = "lib"
+}
 
 dependencies {
-  implementation project(':entity-registry')
-  implementation project(':metadata-integration:java:datahub-event')
+  api project(':entity-registry')
+  api project(':metadata-integration:java:datahub-event')
   implementation(externalDependency.kafkaAvroSerializer) {
     exclude group: "org.apache.avro"
   }
@@ -33,7 +35,7 @@ dependencies {
   implementation externalDependency.jacksonDataBind
   runtimeOnly externalDependency.jna
 
-  implementation externalDependency.slf4jApi
+  api externalDependency.slf4jApi
   compileOnly externalDependency.lombok
   annotationProcessor externalDependency.lombok
   // VisibleForTesting
@@ -78,6 +80,11 @@ shadowJar {
   // https://github.com/johnrengelman/shadow/issues/729
   exclude('module-info.class', 'META-INF/versions/**',
       '**/LICENSE', '**/LICENSE*.txt', '**/NOTICE', '**/NOTICE.txt', 'licenses/**', 'log4j2.*', 'log4j.*')
+  dependencies {
+    exclude(dependency("org.slf4j::"))
+    exclude(dependency("antlr::"))
+    exclude("org/apache/commons/logging/**")
+  }
   mergeServiceFiles()
   // we relocate namespaces manually, because we want to know exactly which libs we are exposing and why
   // we can move to automatic relocation using ConfigureShadowRelocation after we get to a good place on these first
@@ -88,15 +95,20 @@ shadowJar {
   relocate 'javassist', 'datahub.shaded.javassist'
   relocate 'edu.umd.cs.findbugs', 'datahub.shaded.findbugs'
   relocate 'org.antlr', 'datahub.shaded.org.antlr'
-  relocate 'antlr', 'datahub.shaded.antlr'
+  //relocate 'antlr', 'datahub.shaded.antlr'
   relocate 'com.google.common', 'datahub.shaded.com.google.common'
-  relocate 'org.apache.commons', 'datahub.shaded.org.apache.commons'
+  relocate 'org.apache.commons.codec', 'datahub.shaded.org.apache.commons.codec'
+  relocate 'org.apache.commons.compress', 'datahub.shaded.org.apache.commons.compress'
+  relocate 'org.apache.commons.lang3', 'datahub.shaded.org.apache.commons.lang3'
+  relocate 'org.apache.commons.lang', 'datahub.shaded.org.apache.commons.lang'
+  relocate 'org.apache.commons.cli', 'datahub.shaded.org.apache.commons.cli'
+  relocate 'org.apache.commons.text', 'datahub.shaded.org.apache.commons.text'
+  relocate 'org.apache.commons.io', 'datahub.shaded.org.apache.commons.io'
   relocate 'org.apache.maven', 'datahub.shaded.org.apache.maven'
   relocate 'org.reflections', 'datahub.shaded.org.reflections'
   relocate 'st4hidden', 'datahub.shaded.st4hidden'
   relocate 'org.stringtemplate', 'datahub.shaded.org.stringtemplate'
   relocate 'org.abego.treelayout', 'datahub.shaded.treelayout'
-  relocate 'org.slf4j', 'datahub.shaded.slf4j'
   relocate 'javax.annotation', 'datahub.shaded.javax.annotation'
   relocate 'com.github.benmanes.caffeine', 'datahub.shaded.com.github.benmanes.caffeine'
   relocate 'org.checkerframework', 'datahub.shaded.org.checkerframework'
diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/DatahubHttpRequestRetryStrategy.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/DatahubHttpRequestRetryStrategy.java
index 71a4b93baf48f4..50c0277c98b03b 100644
--- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/DatahubHttpRequestRetryStrategy.java
+++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/DatahubHttpRequestRetryStrategy.java
@@ -48,7 +48,6 @@ public boolean retryRequest(
 
   @Override
   public boolean retryRequest(HttpResponse response, int execCount, HttpContext context) {
-    log.warn("Retrying request due to error: {}", response);
     return super.retryRequest(response, execCount, context);
   }
 }
diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java
index e1017372be124b..d70c5baf10879d 100644
--- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java
+++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java
@@ -1,6 +1,7 @@
 package datahub.client.rest;
 
 import static com.linkedin.metadata.Constants.*;
+import static org.apache.hc.core5.http.HttpHeaders.*;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.StreamReadConstraints;
@@ -18,6 +19,7 @@
 import datahub.event.UpsertAspectRequest;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.security.KeyManagementException;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
@@ -26,6 +28,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import javax.annotation.concurrent.ThreadSafe;
@@ -97,17 +100,20 @@ public RestEmitter(RestEmitterConfig config) {
     this.config = config;
     HttpAsyncClientBuilder httpClientBuilder = this.config.getAsyncHttpClientBuilder();
     httpClientBuilder.setRetryStrategy(new DatahubHttpRequestRetryStrategy());
-
-    // Override httpClient settings with RestEmitter configs if present
-    if (config.getTimeoutSec() != null) {
-      httpClientBuilder.setDefaultRequestConfig(
-          RequestConfig.custom()
-              .setConnectionRequestTimeout(
-                  config.getTimeoutSec() * 1000, java.util.concurrent.TimeUnit.MILLISECONDS)
-              .setResponseTimeout(
-                  config.getTimeoutSec() * 1000, java.util.concurrent.TimeUnit.MILLISECONDS)
-              .build());
+    if ((config.getTimeoutSec() != null) || (config.isDisableChunkedEncoding())) {
+      RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+      // Override httpClient settings with RestEmitter configs if present
+      if (config.getTimeoutSec() != null) {
+        requestConfigBuilder
+            .setConnectionRequestTimeout(config.getTimeoutSec() * 1000, TimeUnit.MILLISECONDS)
+            .setResponseTimeout(config.getTimeoutSec() * 1000, TimeUnit.MILLISECONDS);
+      }
+      if (config.isDisableChunkedEncoding()) {
+        requestConfigBuilder.setContentCompressionEnabled(false);
+      }
+      httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
     }
+
 
     PoolingAsyncClientConnectionManagerBuilder poolingAsyncClientConnectionManagerBuilder =
         PoolingAsyncClientConnectionManagerBuilder.create();
@@ -223,8 +229,13 @@ private Future<MetadataWriteResponse> postGeneric(
     if (this.config.getToken() != null) {
       simpleRequestBuilder.setHeader("Authorization", "Bearer " + this.config.getToken());
     }
+    if (this.config.isDisableChunkedEncoding()) {
+      byte[] payloadBytes = payloadJson.getBytes(StandardCharsets.UTF_8);
+      simpleRequestBuilder.setBody(payloadBytes, ContentType.APPLICATION_JSON);
+    } else {
+      simpleRequestBuilder.setBody(payloadJson, ContentType.APPLICATION_JSON);
+    }
 
-    simpleRequestBuilder.setBody(payloadJson, ContentType.APPLICATION_JSON);
     AtomicReference<MetadataWriteResponse> responseAtomicReference = new AtomicReference<>();
     CountDownLatch responseLatch = new CountDownLatch(1);
     FutureCallback<SimpleHttpResponse> httpCallback =
diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitterConfig.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitterConfig.java
index e28ad4ed660f0b..55c11aab0ebf3c 100644
--- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitterConfig.java
+++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitterConfig.java
@@ -30,6 +30,8 @@ public class RestEmitterConfig {
   Integer timeoutSec;
   @Builder.Default boolean disableSslVerification = false;
 
+  @Builder.Default boolean disableChunkedEncoding = false;
+
   @Builder.Default int maxRetries = 0;
 
   @Builder.Default int retryIntervalSec = 10;
diff --git a/metadata-integration/java/openlineage-converter/build.gradle b/metadata-integration/java/openlineage-converter/build.gradle
index 2e04881ab5ccda..d149104f089b36 100644
--- a/metadata-integration/java/openlineage-converter/build.gradle
+++ b/metadata-integration/java/openlineage-converter/build.gradle
@@ -1,4 +1,4 @@
-apply plugin: 'java'
+apply plugin: 'java-library'
 apply plugin: 'com.github.johnrengelman.shadow'
 apply plugin: 'signing'
 apply plugin: 'maven-publish'
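
Reviewer note: the new `spark.datahub.rest.disable_chunked_encoding` key documented in the README maps to the `REST_DISABLE_CHUNKED_ENCODING` constant added to `SparkConfigParser` and is forwarded to the emitter in `DatahubSparkListener.initializeEmitter`. A minimal sketch of a job configuration that exercises it — the `spark.extraListeners` wiring follows the integration's README, while the app name, server URL, and retry count are placeholders:

```java
import org.apache.spark.SparkConf;

public class LineageJobConfigSketch {
  public static void main(String[] args) {
    // Spark-side keys are the SparkConfigParser constants prefixed with "spark.datahub.".
    SparkConf conf =
        new SparkConf()
            .setAppName("lineage-example") // placeholder app name
            .set("spark.extraListeners", "datahub.spark.DatahubSparkListener")
            .set("spark.datahub.rest.server", "http://localhost:8080")
            // New in this patch: turn off chunked transfer encoding for proxies or
            // gateways that reject Transfer-Encoding: chunked.
            .set("spark.datahub.rest.disable_chunked_encoding", "true")
            .set("spark.datahub.rest.max_retries", "3");
  }
}
```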
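On the client side, `RestEmitterConfig` gains a `disableChunkedEncoding` builder field (default `false`). When it is set, `RestEmitter.postGeneric` buffers the JSON payload into a `byte[]` and passes it to `setBody` with an explicit `ContentType`, so the request carries a `Content-Length` header instead of `Transfer-Encoding: chunked`. A minimal usage sketch, assuming the pre-existing `server` builder field of `RestEmitterConfig` (only `disableChunkedEncoding`, `maxRetries`, and `retryIntervalSec` appear in this patch):

```java
import datahub.client.rest.RestEmitter;
import datahub.client.rest.RestEmitterConfig;

public class ChunkedEncodingSketch {
  public static void main(String[] args) {
    RestEmitterConfig config =
        RestEmitterConfig.builder()
            .server("http://localhost:8080") // assumed field, mirrors spark.datahub.rest.server
            .disableChunkedEncoding(true) // option added in this patch
            .maxRetries(0)
            .retryIntervalSec(10)
            .build();

    // With the flag enabled, the emitter sends the payload as
    // setBody(payloadJson.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON)
    // rather than the string overload, avoiding chunked transfer encoding.
    RestEmitter emitter = new RestEmitter(config);
  }
}
```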