feat(spark/openlineage): Use Openlineage 1.13.1 in Spark Plugin (datahub-project#10433)

- Use Openlineage 1.13.1 in Spark Plugin
- Add retry option to datahub client and Spark Plugin
- Add OpenLineage integration doc
treff7es authored May 7, 2024
1 parent 71759f9 commit d08f36f
Showing 25 changed files with 785 additions and 1,195 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -54,7 +54,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '12.16.1'
ext.googleJavaFormatVersion = '1.18.1'
-ext.openLineageVersion = '1.5.0'
+ext.openLineageVersion = '1.13.1'
ext.logbackClassicJava8 = '1.2.12'

ext.docker_registry = 'acryldata'
15 changes: 13 additions & 2 deletions docs-website/filterTagIndexes.json
@@ -77,6 +77,17 @@
"Features": ""
}
},
+{
+"Path": "docs/lineage/dagster",
+"imgPath": "img/logos/platforms/dagster.svg",
+"Title": "Dagster",
+"Description": "Dagster is a next-generation open source orchestration platform for the development, production, and observation of data assets.",
+"tags": {
+"Platform Type": "Orchestrator",
+"Connection Type": "Pull",
+"Features": "Stateful Ingestion, UI Ingestion, Status Aspect"
+}
+},
{
"Path": "docs/generated/ingestion/sources/databricks",
"imgPath": "img/logos/platforms/databricks.png",
@@ -433,7 +444,7 @@
"Path": "docs/generated/ingestion/sources/hive-metastore",
"imgPath": "img/logos/platforms/presto.svg",
"Title": "Hive Metastore",
-"Description": "Presto on Hive is a data tool that allows users to query and analyze large datasets stored in Hive using SQL-like syntax.",
+"Description": "Hive Metastore (HMS) is a service that stores metadata related to Hive, Presto, Trino and other services in a backend Relational Database Management System (RDBMS).",
"tags": {
"Platform Type": "Datastore",
"Connection Type": "Pull",
@@ -551,7 +562,7 @@
}
},
{
-"Path": "docs/metadata-integration/java/spark-lineage",
+"Path": "docs/metadata-integration/java/spark-lineage-beta",
"imgPath": "img/logos/platforms/spark.svg",
"Title": "Spark",
"Description": "Spark is a data processing tool that enables fast and efficient processing of large-scale data sets using distributed computing.",
5 changes: 5 additions & 0 deletions docs-website/sidebars.js
@@ -317,6 +317,11 @@ module.exports = {
id: "docs/lineage/dagster",
label: "Dagster",
},
+{
+type: "doc",
+id: "docs/lineage/openlineage",
+label: "OpenLineage",
+},
{
type: "doc",
id: "metadata-integration/java/spark-lineage/README",
11 changes: 11 additions & 0 deletions docs-website/static/img/logos/platforms/dagster.svg
92 changes: 92 additions & 0 deletions docs/lineage/openlineage.md
@@ -0,0 +1,92 @@
# OpenLineage

DataHub now supports [OpenLineage](https://openlineage.io/) integration. With this support, DataHub can ingest and display lineage information from various data processing frameworks, giving users a comprehensive view of their data pipelines.

## Features

- **REST Endpoint Support**: DataHub now includes a REST endpoint that can understand OpenLineage events. This allows users to send lineage information directly to DataHub, enabling easy integration with various data processing frameworks.

- **[Spark Event Listener Plugin](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta)**: DataHub provides a Spark Event Listener plugin that seamlessly integrates OpenLineage's Spark plugin. This plugin enhances DataHub's OpenLineage support by offering additional features such as PathSpec support, column-level lineage, patch support and more.

## OpenLineage Support with DataHub

### 1. REST Endpoint Support

DataHub's REST endpoint allows users to send OpenLineage events directly to DataHub. This enables easy integration with various data processing frameworks, providing users with a centralized location for viewing and managing data lineage information.

For Spark and Airflow, we recommend using the Spark Lineage plugin or DataHub's Airflow plugin instead, as both integrate more tightly with DataHub.

#### How to Use

To send OpenLineage messages to DataHub using the REST endpoint, simply make a POST request to the following endpoint:

```
POST GMS_SERVER_HOST:GMS_PORT/openapi/openlineage/api/v1/lineage
```

Include the OpenLineage message in the request body in JSON format.

Example:

```json
{
  "eventType": "START",
  "eventTime": "2020-12-28T19:52:00.001+10:00",
  "run": {
    "runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
  },
  "job": {
    "namespace": "workshop",
    "name": "process_taxes"
  },
  "inputs": [
    {
      "namespace": "postgres://workshop-db:None",
      "name": "workshop.public.taxes",
      "facets": {
        "dataSource": {
          "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.10.0/integration/airflow",
          "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/DataSourceDatasetFacet",
          "name": "postgres://workshop-db:None",
          "uri": "workshop-db"
        }
      }
    }
  ],
  "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client"
}
```
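
The request described above can be sketched in Java with the JDK's built-in `java.net.http` API. This is a minimal sketch rather than DataHub's own client: the class and method names are hypothetical, the lineage path is passed in rather than hard-coded, and the `Authorization: Bearer` header assumes token-based authentication is enabled on your DataHub instance.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that builds (but does not send) an OpenLineage POST request. */
public final class OpenLineagePostSketch {
  private OpenLineagePostSketch() {}

  /**
   * @param gmsBaseUrl  e.g. "http://GMS_SERVER_HOST:GMS_PORT" (placeholder, not a real host)
   * @param lineagePath the lineage endpoint path documented above
   * @param token       access token, or null/empty when authentication is disabled (assumption)
   * @param eventJson   the OpenLineage event serialized as JSON
   */
  public static HttpRequest buildRequest(
      String gmsBaseUrl, String lineagePath, String token, String eventJson) {
    // Normalize the slash between base URL and path so callers can pass either form.
    String base =
        gmsBaseUrl.endsWith("/") ? gmsBaseUrl.substring(0, gmsBaseUrl.length() - 1) : gmsBaseUrl;
    String path = lineagePath.startsWith("/") ? lineagePath : "/" + lineagePath;

    HttpRequest.Builder builder =
        HttpRequest.newBuilder(URI.create(base + path))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(eventJson, StandardCharsets.UTF_8));

    // Only attach credentials when a token was supplied.
    if (token != null && !token.isEmpty()) {
      builder.header("Authorization", "Bearer " + token);
    }
    return builder.build();
  }
}
```

To send the event, pass the returned request to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` and check the response status code.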
##### How to set up Airflow
Follow the [Airflow OpenLineage provider guide](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html) to set up your Airflow DAGs to send lineage information to DataHub.
The transport should look like this:
```json
{
  "type": "http",
  "url": "https://GMS_SERVER_HOST:GMS_PORT/openapi/openlineage/",
  "endpoint": "api/v1/lineage",
  "auth": {
    "type": "api_key",
    "api_key": "your-datahub-api-key"
  }
}
```

#### Known Limitations
The REST endpoint has the limitations listed below. For Spark and Airflow, the Spark Lineage plugin or DataHub's Airflow plugin provides tighter integration with DataHub.

- **[PathSpec](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta/#configuring-hdfs-based-dataset-urns) Support**: While the REST endpoint supports OpenLineage messages, full [PathSpec](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta/#configuring-hdfs-based-dataset-urns) support is not yet available.

- **Column-level Lineage**: DataHub's current OpenLineage support does not provide full column-level lineage tracking.

This list is not exhaustive.

### 2. Spark Event Listener Plugin

DataHub's Spark Event Listener plugin enhances OpenLineage support by providing additional features such as PathSpec support, column-level lineage, and more.

#### How to Use

See the [Spark Lineage plugin guide](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta) for setup instructions.

## References

- [OpenLineage](https://openlineage.io/)
- [DataHub OpenAPI Guide](../api/openapi/openapi-usage-guide.md)
- [DataHub Spark Lineage Plugin](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta)
4 changes: 2 additions & 2 deletions metadata-integration/java/datahub-client/build.gradle
@@ -27,7 +27,7 @@ dependencies {
}
}

-compileOnly externalDependency.httpAsyncClient
+compileOnly externalDependency.httpClient
implementation externalDependency.jacksonDataBind
runtimeOnly externalDependency.jna

@@ -41,7 +41,7 @@ dependencies {
testImplementation externalDependency.mockServer
testImplementation externalDependency.mockServerClient
testImplementation externalDependency.testContainers
-testImplementation externalDependency.httpAsyncClient
+testImplementation externalDependency.httpClient
testRuntimeOnly externalDependency.logbackClassic
}

@@ -7,16 +7,16 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import lombok.SneakyThrows;
-import org.apache.http.HttpResponse;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;

public class MetadataResponseFuture implements Future<MetadataWriteResponse> {
-private final Future<HttpResponse> requestFuture;
+private final Future<SimpleHttpResponse> requestFuture;
private final AtomicReference<MetadataWriteResponse> responseReference;
private final CountDownLatch responseLatch;
private final ResponseMapper mapper;

public MetadataResponseFuture(
-Future<HttpResponse> underlyingFuture,
+Future<SimpleHttpResponse> underlyingFuture,
AtomicReference<MetadataWriteResponse> responseAtomicReference,
CountDownLatch responseLatch) {
this.requestFuture = underlyingFuture;
@@ -25,7 +25,8 @@ public MetadataResponseFuture(
this.mapper = null;
}

-public MetadataResponseFuture(Future<HttpResponse> underlyingFuture, ResponseMapper mapper) {
+public MetadataResponseFuture(
+Future<SimpleHttpResponse> underlyingFuture, ResponseMapper mapper) {
this.requestFuture = underlyingFuture;
this.responseReference = null;
this.responseLatch = null;
@@ -50,7 +51,7 @@ public boolean isDone() {
@SneakyThrows
@Override
public MetadataWriteResponse get() throws InterruptedException, ExecutionException {
-HttpResponse response = requestFuture.get();
+SimpleHttpResponse response = requestFuture.get();
if (mapper != null) {
return mapper.map(response);
} else {
@@ -63,7 +64,7 @@ public MetadataWriteResponse get() throws InterruptedException, ExecutionExcepti
@Override
public MetadataWriteResponse get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
-HttpResponse response = requestFuture.get(timeout, unit);
+SimpleHttpResponse response = requestFuture.get(timeout, unit);
if (mapper != null) {
return mapper.map(response);
} else {
@@ -75,6 +76,6 @@ public MetadataWriteResponse get(long timeout, TimeUnit unit)

@FunctionalInterface
public interface ResponseMapper {
-MetadataWriteResponse map(HttpResponse httpResponse);
+MetadataWriteResponse map(SimpleHttpResponse httpResponse);
}
}
@@ -0,0 +1,54 @@
package datahub.client.rest;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.UnknownHostException;
import java.util.Arrays;
import javax.net.ssl.SSLException;
import lombok.extern.slf4j.Slf4j;
import org.apache.hc.client5.http.impl.DefaultHttpRequestRetryStrategy;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue;

@Slf4j
public class DatahubHttpRequestRetryStrategy extends DefaultHttpRequestRetryStrategy {
  public DatahubHttpRequestRetryStrategy() {
    this(1, TimeValue.ofSeconds(10));
  }

  public DatahubHttpRequestRetryStrategy(int maxRetries, TimeValue retryInterval) {
    super(
        maxRetries,
        retryInterval,
        Arrays.asList(
            InterruptedIOException.class,
            UnknownHostException.class,
            ConnectException.class,
            ConnectionClosedException.class,
            NoRouteToHostException.class,
            SSLException.class),
        Arrays.asList(
            HttpStatus.SC_TOO_MANY_REQUESTS,
            HttpStatus.SC_SERVICE_UNAVAILABLE,
            HttpStatus.SC_INTERNAL_SERVER_ERROR));
  }

  @Override
  public boolean retryRequest(
      HttpRequest request, IOException exception, int execCount, HttpContext context) {
    log.warn("Checking if retry is needed: {}", execCount);
    return super.retryRequest(request, exception, execCount, context);
  }

  @Override
  public boolean retryRequest(HttpResponse response, int execCount, HttpContext context) {
    log.warn("Retrying request due to error: {}", response);
    return super.retryRequest(response, execCount, context);
  }
}
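
For readers unfamiliar with HttpClient 5 retry strategies, the decision logic this class inherits can be approximated without the library. The sketch below is a simplified, JDK-only stand-in, not the DataHub or Apache implementation: the class and method names are hypothetical, and it assumes (as documented for `DefaultHttpRequestRetryStrategy`) that the exception list passed to the parent constructor names the I/O exceptions that are not retried, while the status-code list names the responses that are. It ignores request idempotency and `Retry-After` handling.

```java
import java.io.IOException;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified model of a retry decision; not the HttpClient 5 implementation. */
public final class RetryDecisionSketch {
  private final int maxRetries;
  private final List<Class<? extends IOException>> nonRetriableExceptions;
  private final Set<Integer> retriableStatusCodes;

  public RetryDecisionSketch(
      int maxRetries,
      List<Class<? extends IOException>> nonRetriableExceptions,
      Set<Integer> retriableStatusCodes) {
    this.maxRetries = maxRetries;
    this.nonRetriableExceptions = nonRetriableExceptions;
    this.retriableStatusCodes = retriableStatusCodes;
  }

  /** Retry after an I/O failure unless the budget is spent or the exception type is excluded. */
  public boolean retryOnException(IOException exception, int execCount) {
    if (execCount > maxRetries) {
      return false;
    }
    for (Class<? extends IOException> excluded : nonRetriableExceptions) {
      if (excluded.isInstance(exception)) {
        return false;
      }
    }
    return true;
  }

  /** Retry after a response only while the budget allows and the status code is listed. */
  public boolean retryOnStatus(int statusCode, int execCount) {
    return execCount <= maxRetries && retriableStatusCodes.contains(statusCode);
  }
}
```

With `maxRetries = 1` and status codes 429, 500 and 503, as in the defaults above, a 503 on the first attempt is retried once and a 404 is never retried.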