diff --git a/.github/workflows/docker-unified.yml b/.github/workflows/docker-unified.yml index 13c921e953c324..de3e0ca93e6b7e 100644 --- a/.github/workflows/docker-unified.yml +++ b/.github/workflows/docker-unified.yml @@ -58,7 +58,7 @@ jobs: echo "full_tag=$(get_tag)-full" >> $GITHUB_OUTPUT echo "unique_tag=$(get_unique_tag)" >> $GITHUB_OUTPUT echo "unique_slim_tag=$(get_unique_tag)-slim" >> $GITHUB_OUTPUT - echo "unique_full_tag=$(get_unique_tag)-full" >> $GITHUB_OUTPUT + echo "unique_full_tag=$(get_unique_tag)" >> $GITHUB_OUTPUT echo "python_release_version=$(get_python_docker_release_v)" >> $GITHUB_OUTPUT - name: Check whether publishing enabled id: publish @@ -501,7 +501,7 @@ jobs: platforms: linux/amd64,linux/arm64/v8 - name: Compute DataHub Ingestion (Base-Slim) Tag id: tag - run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head' }}" >> $GITHUB_OUTPUT + run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head-slim' }}" >> $GITHUB_OUTPUT datahub_ingestion_base_full_build: name: Build and Push DataHub Ingestion (Base-Full) Docker Image runs-on: ubuntu-latest @@ -567,13 +567,13 @@ jobs: datahub-ingestion: - 'docker/datahub-ingestion/**' - name: Build codegen - if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' }} + if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' || needs.setup.outputs.publish }} run: ./gradlew :metadata-ingestion:codegen - name: Download Base Image uses: ishworkh/docker-image-artifact-download@v1 if: ${{ needs.setup.outputs.publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }} with: - image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head' }} + image: ${{ 
env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head-slim' }} - name: Build and push Slim Image if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' || needs.setup.outputs.publish }} uses: ./.github/actions/docker-custom-build-and-push @@ -583,7 +583,7 @@ jobs: ${{ env.DATAHUB_INGESTION_IMAGE }} build-args: | BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }} - DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head' }} + DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head-slim' }} RELEASE_VERSION=${{ needs.setup.outputs.python_release_version }} APP_ENV=slim tags: ${{ needs.setup.outputs.slim_tag }} @@ -595,7 +595,7 @@ jobs: platforms: linux/amd64,linux/arm64/v8 - name: Compute Tag id: tag - run: echo "tag=${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && needs.setup.outputs.unique_slim_tag || 'head' }}" >> $GITHUB_OUTPUT + run: echo "tag=${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && needs.setup.outputs.unique_slim_tag || 'head-slim' }}" >> $GITHUB_OUTPUT datahub_ingestion_slim_scan: permissions: contents: read # for actions/checkout to fetch code @@ -650,7 +650,7 @@ jobs: datahub-ingestion: - 'docker/datahub-ingestion/**' - name: Build codegen - if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' }} + if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' || needs.setup.outputs.publish }} run: ./gradlew :metadata-ingestion:codegen - name: Download Base Image uses: ishworkh/docker-image-artifact-download@v1 diff --git a/build.gradle 
b/build.gradle index 07a0e6ad1f49fd..0a94991b131aac 100644 --- a/build.gradle +++ b/build.gradle @@ -289,6 +289,11 @@ subprojects { } // https://docs.gradle.org/current/userguide/performance.html maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1 + + if (project.configurations.getByName("testImplementation").getDependencies() + .any{ it.getName() == "testng" }) { + useTestNG() + } } afterEvaluate { diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/auth/ListAccessTokensResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/auth/ListAccessTokensResolverTest.java index 54b8d23bab301d..52d06f73dcfab9 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/auth/ListAccessTokensResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/auth/ListAccessTokensResolverTest.java @@ -1,5 +1,6 @@ package com.linkedin.datahub.graphql.resolvers.auth; +import com.datahub.authentication.Authentication; import com.google.common.collect.ImmutableList; import com.linkedin.datahub.graphql.QueryContext; import com.linkedin.datahub.graphql.TestUtils; @@ -8,6 +9,10 @@ import com.linkedin.datahub.graphql.generated.ListAccessTokenResult; import com.linkedin.entity.client.EntityClient; import com.linkedin.metadata.Constants; +import com.linkedin.metadata.query.SearchFlags; +import com.linkedin.metadata.query.filter.SortCriterion; +import com.linkedin.metadata.search.SearchEntityArray; +import com.linkedin.metadata.search.SearchResult; import graphql.schema.DataFetchingEnvironment; import java.util.Collections; import org.mockito.Mockito; @@ -36,14 +41,17 @@ public void testGetSuccess() throws Exception { Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input); final EntityClient mockClient = Mockito.mock(EntityClient.class); - Mockito.when(Mockito.eq(mockClient.filter( - 
Mockito.eq(Constants.ACCESS_TOKEN_ENTITY_NAME), - Mockito.eq(buildFilter(filters, Collections.emptyList())), - Mockito.notNull(), - Mockito.eq(input.getStart()), - Mockito.eq(input.getCount()), - Mockito.eq(getAuthentication(mockEnv))))) - .thenReturn(null); + final Authentication testAuth = getAuthentication(mockEnv); + Mockito.when(mockClient.search( + Mockito.eq(Constants.ACCESS_TOKEN_ENTITY_NAME), + Mockito.eq(""), + Mockito.eq(buildFilter(filters, Collections.emptyList())), + Mockito.any(SortCriterion.class), + Mockito.eq(input.getStart()), + Mockito.eq(input.getCount()), + Mockito.eq(testAuth), + Mockito.any(SearchFlags.class))) + .thenReturn(new SearchResult().setFrom(0).setNumEntities(0).setPageSize(0).setEntities(new SearchEntityArray())); final ListAccessTokensResolver resolver = new ListAccessTokensResolver(mockClient); final ListAccessTokenResult listAccessTokenResult = resolver.get(mockEnv).get(); diff --git a/docker/datahub-ingestion-base/Dockerfile b/docker/datahub-ingestion-base/Dockerfile index 3d47f796173704..564cc19cc9a5f9 100644 --- a/docker/datahub-ingestion-base/Dockerfile +++ b/docker/datahub-ingestion-base/Dockerfile @@ -1,7 +1,7 @@ ARG APP_ENV=full ARG BASE_IMAGE=base -FROM golang:1-alpine3.17 AS binary +FROM golang:1-alpine3.17 AS dockerize-binary ENV DOCKERIZE_VERSION v0.6.1 WORKDIR /go/src/github.com/jwilder @@ -41,7 +41,7 @@ RUN apt-get update && apt-get install -y -qq \ && rm -rf /var/lib/apt/lists/* /var/cache/apk/* # compiled against newer golang for security fixes -COPY --from=binary /go/bin/dockerize /usr/local/bin +COPY --from=dockerize-binary /go/bin/dockerize /usr/local/bin COPY ./docker/datahub-ingestion-base/base-requirements.txt requirements.txt COPY ./docker/datahub-ingestion-base/entrypoint.sh /entrypoint.sh diff --git a/docker/datahub-ingestion/Dockerfile b/docker/datahub-ingestion/Dockerfile index 8b726df5e88420..0132ceaa9b1a95 100644 --- a/docker/datahub-ingestion/Dockerfile +++ b/docker/datahub-ingestion/Dockerfile @@ 
-1,7 +1,7 @@ # Defining environment ARG APP_ENV=full ARG BASE_IMAGE=acryldata/datahub-ingestion-base -ARG DOCKER_VERSION=latest +ARG DOCKER_VERSION=head FROM $BASE_IMAGE:$DOCKER_VERSION as base USER 0 diff --git a/docker/datahub-ingestion/Dockerfile-slim-only b/docker/datahub-ingestion/Dockerfile-slim-only index 9ae116f839aa07..cb8c27ab463c48 100644 --- a/docker/datahub-ingestion/Dockerfile-slim-only +++ b/docker/datahub-ingestion/Dockerfile-slim-only @@ -1,6 +1,6 @@ # Defining environment ARG BASE_IMAGE=acryldata/datahub-ingestion-base -ARG DOCKER_VERSION=latest +ARG DOCKER_VERSION=head-slim FROM $BASE_IMAGE:$DOCKER_VERSION as base USER 0 diff --git a/docker/postgres-setup/init.sh b/docker/postgres-setup/init.sh index 6c0adc8c69bddf..afc9bdfe4c6688 100755 --- a/docker/postgres-setup/init.sh +++ b/docker/postgres-setup/init.sh @@ -1,8 +1,13 @@ #!/bin/sh export PGPASSWORD=$POSTGRES_PASSWORD +POSTGRES_CREATE_DB=${POSTGRES_CREATE_DB:-true} +POSTGRES_CREATE_DB_CONNECTION_DB=${POSTGRES_CREATE_DB_CONNECTION_DB:-postgres} + # workaround create database if not exists, check https://stackoverflow.com/a/36591842 -psql -U $POSTGRES_USERNAME -h $POSTGRES_HOST -p $POSTGRES_PORT -tc "SELECT 1 FROM pg_database WHERE datname = '${DATAHUB_DB_NAME}'" | grep -q 1 || psql -U $POSTGRES_USERNAME -h $POSTGRES_HOST -p $POSTGRES_PORT -c "CREATE DATABASE ${DATAHUB_DB_NAME}" +if [ "$POSTGRES_CREATE_DB" = true ]; then + psql -d "$POSTGRES_CREATE_DB_CONNECTION_DB" -U "$POSTGRES_USERNAME" -h "$POSTGRES_HOST" -p "$POSTGRES_PORT" -tc "SELECT 1 FROM pg_database WHERE datname = '${DATAHUB_DB_NAME}'" | grep -q 1 || psql -d "$POSTGRES_CREATE_DB_CONNECTION_DB" -U "$POSTGRES_USERNAME" -h "$POSTGRES_HOST" -p "$POSTGRES_PORT" -c "CREATE DATABASE ${DATAHUB_DB_NAME}" +fi sed -e "s/DATAHUB_DB_NAME/${DATAHUB_DB_NAME}/g" /init.sql | tee -a /tmp/init-final.sql -psql -d $DATAHUB_DB_NAME -U $POSTGRES_USERNAME -h $POSTGRES_HOST -p $POSTGRES_PORT < /tmp/init-final.sql +psql -d "$DATAHUB_DB_NAME" -U 
"$POSTGRES_USERNAME" -h "$POSTGRES_HOST" -p "$POSTGRES_PORT" < /tmp/init-final.sql diff --git a/docs-website/src/pages/docs/index.js b/docs-website/src/pages/docs/index.js index a0462091a046dc..0e8bfdcf3b9d7f 100644 --- a/docs-website/src/pages/docs/index.js +++ b/docs-website/src/pages/docs/index.js @@ -180,8 +180,8 @@ const quickLinkContent = [ { title: "Developer Guides", icon: , - description: "Interact with DataHub programmatically ", - to: "/docs/cli", + description: "Interact with DataHub programmatically", + to: "/docs/api/datahub-apis", }, { title: "Feature Guides", diff --git a/docs/authorization/policies.md b/docs/authorization/policies.md index 27d8b15e5a73a2..e3606f2a3e48d3 100644 --- a/docs/authorization/policies.md +++ b/docs/authorization/policies.md @@ -145,28 +145,31 @@ For example, the following resource filter will apply the policy to datasets, ch ```json { - "resource": { - "criteria": [ - { - "field": "resource_type", - "values": [ - "dataset", - "chart", - "dashboard" - ], - "condition": "EQUALS" - }, - { - "field": "domain", - "values": [ - "urn:li:domain:domain1" - ], - "condition": "EQUALS" + "resources": { + "filter": { + "criteria": [ + { + "field": "RESOURCE_TYPE", + "condition": "EQUALS", + "values": [ + "dataset", + "chart", + "dashboard" + ] + }, + { + "field": "DOMAIN", + "values": [ + "urn:li:domain:domain1" + ], + "condition": "EQUALS" + } + ] } - ] - } + } } ``` +Where `resources` is inside the `info` aspect of a Policy. Supported fields are as follows diff --git a/docs/datahub_lite.md b/docs/datahub_lite.md index 3918b8cee78303..de0a20eed1d01f 100644 --- a/docs/datahub_lite.md +++ b/docs/datahub_lite.md @@ -7,7 +7,6 @@ import TabItem from '@theme/TabItem'; DataHub Lite is a lightweight embeddable version of DataHub with no external dependencies. It is intended to enable local developer tooling use-cases such as simple access to metadata for scripts and other tools. 
DataHub Lite is compatible with the DataHub metadata format and all the ingestion connectors that DataHub supports. -It was built as a reaction to [recap](https://github.com/recap-cloud/recap) to prove that a similar lightweight system could be built within DataHub quite easily. Currently DataHub Lite uses DuckDB under the covers as its default storage layer, but that might change in the future. ## Features diff --git a/docs/modeling/extending-the-metadata-model.md b/docs/modeling/extending-the-metadata-model.md index be2d7d795de701..ba101be16b98e7 100644 --- a/docs/modeling/extending-the-metadata-model.md +++ b/docs/modeling/extending-the-metadata-model.md @@ -16,7 +16,6 @@ An important question that will arise once you've decided to extend the metadata

- The green lines represent pathways that will lead to lesser friction for you to maintain your code long term. The red lines represent higher risk of conflicts in the future. We are working hard to move the majority of model extension use-cases to no-code / low-code pathways to ensure that you can extend the core metadata model without having to maintain a custom fork of DataHub. We will refer to the two options as the **open-source fork** and **custom repository** approaches in the rest of the document below. @@ -92,10 +91,11 @@ the annotation model. Define the entity within an `entity-registry.yml` file. Depending on your approach, the location of this file may vary. More on that in steps [4](#step-4-choose-a-place-to-store-your-model-extension) and [5](#step-5-attaching-your-non-key-aspects-to-the-entity). Example: + ```yaml - - name: dashboard - doc: A container of related data assets. - keyAspect: dashboardKey +- name: dashboard + doc: A container of related data assets. + keyAspect: dashboardKey ``` - name: The entity name/type, this will be present as a part of the Urn. @@ -196,8 +196,8 @@ The Aspect has four key components: its properties, the @Aspect annotation, the can be defined as PDL primitives, enums, records, or collections ( see [pdl schema documentation](https://linkedin.github.io/rest.li/pdl_schema)) references to other entities, of type Urn or optionally `Urn` -- **@Aspect annotation**: Declares record is an Aspect and includes it when serializing an entity. Unlike the following - two annotations, @Aspect is applied to the entire record, rather than a specific field. Note, you can mark an aspect +- **@Aspect annotation**: Declares record is an Aspect and includes it when serializing an entity. Unlike the following + two annotations, @Aspect is applied to the entire record, rather than a specific field. Note, you can mark an aspect as a timeseries aspect. Check out this [doc](metadata-model.md#timeseries-aspects) for details. 
- **@Searchable annotation**: This annotation can be applied to any primitive field or a map field to indicate that it should be indexed in Elasticsearch and can be searched on. For a complete guide on using the search annotation, see @@ -205,7 +205,7 @@ The Aspect has four key components: its properties, the @Aspect annotation, the - **@Relationship annotation**: These annotations create edges between the Entity’s Urn and the destination of the annotated field when the entities are ingested. @Relationship annotations must be applied to fields of type Urn. In the case of DashboardInfo, the `charts` field is an Array of Urns. The @Relationship annotation cannot be applied - directly to an array of Urns. That’s why you see the use of an Annotation override (`”/*”:) to apply the @Relationship + directly to an array of Urns. That’s why you see the use of an Annotation override (`"/*":`) to apply the @Relationship annotation to the Urn directly. Read more about overrides in the annotation docs further down on this page. After you create your Aspect, you need to attach to all the entities that it applies to. @@ -231,7 +231,7 @@ entities: - keyAspect: dashBoardKey aspects: # the name of the aspect must be the same as that on the @Aspect annotation on the class - - dashboardInfo + - dashboardInfo ``` Previously, you were required to add all aspects for the entity into an Aspect union. You will see examples of this pattern throughout the code-base (e.g. `DatasetAspect`, `DashboardAspect` etc.). This is no longer required. @@ -251,14 +251,39 @@ Then, run `./gradlew build` from the repository root to rebuild Datahub with acc Then, re-deploy metadata-service (gms), and mae-consumer and mce-consumer (optionally if you are running them unbundled). See [docker development](../../docker/README.md) for details on how to deploy during development. 
This will allow Datahub to read and write your new entity or extensions to existing entities, along with serving search and graph queries for that entity type. -To emit proposals to ingest from the Datahub CLI tool, first install datahub cli -locally [following the instructions here](../../metadata-ingestion/developing.md). `./gradlew build` generated the avro -schemas your local ingestion cli tool uses earlier. After following the developing guide, you should be able to emit -your new event using the local datahub cli. +### (Optional) Step 7: Use custom models with the Python SDK + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + + + + +If you're purely using the custom models locally, you can use a local development-mode install of the DataHub CLI. + +Install the DataHub CLI locally by following the [developer instructions](../../metadata-ingestion/developing.md). +The `./gradlew build` command already generated the avro schemas for your local ingestion cli tool to use. +After following the developing guide, you should be able to emit your new event using the local DataHub CLI. + + + -Now you are ready to start ingesting metadata for your new entity! +If you want to use your custom models beyond your local machine without forking DataHub, then you can generate a custom model package that can be installed from other places. -### (Optional) Step 7: Extend the DataHub frontend to view your entity in GraphQL & React +This package should be installed alongside the base `acryl-datahub` package, and its metadata models will take precedence over the default ones. + +```bash +cd metadata-ingestion +../gradlew customPackageGenerate -Ppackage_name=my-company-datahub-models -Ppackage_version="0.0.1" +``` + +This will generate some Python build artifacts, which you can distribute within your team or publish to PyPI. +The command output will contain additional details and exact CLI commands you can use. 
+ + + + +### (Optional) Step 8: Extend the DataHub frontend to view your entity in GraphQL & React If you are extending an entity with additional aspects, and you can use the auto-render specifications to automatically render these aspects to your satisfaction, you do not need to write any custom code. @@ -301,9 +326,9 @@ It takes the following parameters: - **autoRender**: boolean (optional) - defaults to false. When set to true, the aspect will automatically be displayed on entity pages in a tab using a default renderer. **_This is currently only supported for Charts, Dashboards, DataFlows, DataJobs, Datasets, Domains, and GlossaryTerms_**. - **renderSpec**: RenderSpec (optional) - config for autoRender aspects that controls how they are displayed. **_This is currently only supported for Charts, Dashboards, DataFlows, DataJobs, Datasets, Domains, and GlossaryTerms_**. Contains three fields: - - **displayType**: One of `tabular`, `properties`. Tabular should be used for a list of data elements, properties for a single data bag. - - **displayName**: How the aspect should be referred to in the UI. Determines the name of the tab on the entity page. - - **key**: For `tabular` aspects only. Specifies the key in which the array to render may be found. + - **displayType**: One of `tabular`, `properties`. Tabular should be used for a list of data elements, properties for a single data bag. + - **displayName**: How the aspect should be referred to in the UI. Determines the name of the tab on the entity page. + - **key**: For `tabular` aspects only. Specifies the key in which the array to render may be found. ##### Example @@ -329,34 +354,34 @@ It takes the following parameters: Thus far, we have implemented 11 fieldTypes: - 1. *KEYWORD* - Short text fields that only support exact matches, often used only for filtering + 1. _KEYWORD_ - Short text fields that only support exact matches, often used only for filtering + + 2. 
_TEXT_ - Text fields delimited by spaces/slashes/periods. Default field type for string variables. - 2. *TEXT* - Text fields delimited by spaces/slashes/periods. Default field type for string variables. + 3. _TEXT_PARTIAL_ - Text fields delimited by spaces/slashes/periods with partial matching support. Note, partial + matching is expensive, so this field type should not be applied to fields with long values (like description) - 3. *TEXT_PARTIAL* - Text fields delimited by spaces/slashes/periods with partial matching support. Note, partial - matching is expensive, so this field type should not be applied to fields with long values (like description) + 4. _WORD_GRAM_ - Text fields delimited by spaces, slashes, periods, dashes, or underscores with partial matching AND + word gram support. That is, the text will be split by the delimiters and can be matched with delimited queries + matching two, three, or four length tokens in addition to single tokens. As with partial match, this type is + expensive, so should not be applied to fields with long values such as description. - 4. *WORD_GRAM* - Text fields delimited by spaces, slashes, periods, dashes, or underscores with partial matching AND - word gram support. That is, the text will be split by the delimiters and can be matched with delimited queries - matching two, three, or four length tokens in addition to single tokens. As with partial match, this type is - expensive, so should not be applied to fields with long values such as description. + 5. _BROWSE_PATH_ - Field type for browse paths. Applies specific mappings for slash delimited paths. - 5. *BROWSE_PATH* - Field type for browse paths. Applies specific mappings for slash delimited paths. + 6. _URN_ - Urn fields where each sub-component inside the urn is indexed. For instance, for a data platform urn like + "urn:li:dataplatform:kafka", it will index the platform name "kafka" and ignore the common components - 6. 
*URN* - Urn fields where each sub-component inside the urn is indexed. For instance, for a data platform urn like - "urn:li:dataplatform:kafka", it will index the platform name "kafka" and ignore the common components + 7. _URN_PARTIAL_ - Urn fields where each sub-component inside the urn is indexed with partial matching support. - 7. *URN_PARTIAL* - Urn fields where each sub-component inside the urn is indexed with partial matching support. + 8. _BOOLEAN_ - Boolean fields used for filtering. - 8. *BOOLEAN* - Boolean fields used for filtering. + 9. _COUNT_ - Count fields used for filtering. - 9. *COUNT* - Count fields used for filtering. - - 10. *DATETIME* - Datetime fields used to represent timestamps. + 10. _DATETIME_ - Datetime fields used to represent timestamps. - 11. *OBJECT* - Each property in an object will become an extra column in Elasticsearch and can be referenced as - `field.property` in queries. You should be careful to not use it on objects with many properties as it can cause a - mapping explosion in Elasticsearch. + 11. _OBJECT_ - Each property in an object will become an extra column in Elasticsearch and can be referenced as + `field.property` in queries. You should be careful to not use it on objects with many properties as it can cause a + mapping explosion in Elasticsearch. - **fieldName**: string (optional) - The name of the field in search index document. Defaults to the field name where the annotation resides. @@ -401,13 +426,13 @@ Now, when Datahub ingests Dashboards, it will index the Dashboard’s title in E Dashboards, that query will be used to search on the title index and matching Dashboards will be returned. Note, when @Searchable annotation is applied to a map, it will convert it into a list with "key.toString() -=value.toString()" as elements. This allows us to index map fields, while not increasing the number of columns indexed. +=value.toString()" as elements. 
This allows us to index map fields, while not increasing the number of columns indexed. This way, the keys can be queried by `aMapField:key1=value1`. -You can change this behavior by specifying the fieldType as OBJECT in the @Searchable annotation. It will put each key -into a column in Elasticsearch instead of an array of serialized kay-value pairs. This way the query would look more +You can change this behavior by specifying the fieldType as OBJECT in the @Searchable annotation. It will put each key +into a column in Elasticsearch instead of an array of serialized key-value pairs. This way the query would look more like `aMapField.key1:value1`. As this method will increase the number of columns with each unique key - large maps can -cause a mapping explosion in Elasticsearch. You should *not* use the object fieldType if you expect your maps to get +cause a mapping explosion in Elasticsearch. You should _not_ use the object fieldType if you expect your maps to get large. #### @Relationship diff --git a/metadata-ingestion/.gitignore b/metadata-ingestion/.gitignore index 673c8e0995872b..acc15c45988698 100644 --- a/metadata-ingestion/.gitignore +++ b/metadata-ingestion/.gitignore @@ -8,6 +8,7 @@ bq_credentials.json junit.*.xml /tmp *.bak +custom-package/ # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md index 3b1aae0b24f885..a0fef614528cbe 100644 --- a/metadata-ingestion/README.md +++ b/metadata-ingestion/README.md @@ -176,7 +176,7 @@ The `deploy` subcommand of the `ingest` command tree allows users to upload thei datahub ingest deploy -n -c recipe.yaml ``` -By default, no schedule is done unless explicitly configured with the `--schedule` parameter. Timezones are inferred from the system time, can be overriden with `--time-zone` flag. +By default, no schedule is done unless explicitly configured with the `--schedule` parameter. 
Schedule timezones are UTC by default and can be overridden with the `--time-zone` flag. ```shell datahub ingest deploy -n test --schedule "0 * * * *" --time-zone "Europe/London" -c recipe.yaml ``` diff --git a/metadata-ingestion/build.gradle b/metadata-ingestion/build.gradle index c20d98cbcbb58b..ea7990ab9c6608 100644 --- a/metadata-ingestion/build.gradle +++ b/metadata-ingestion/build.gradle @@ -62,6 +62,14 @@ task codegen(type: Exec, dependsOn: [environmentSetup, installPackage, ':metadat commandLine 'bash', '-c', "source ${venv_name}/bin/activate && ./scripts/codegen.sh" } +task customPackageGenerate(type: Exec, dependsOn: [environmentSetup, installPackage, ':metadata-events:mxe-schemas:build']) { + def package_name = project.findProperty('package_name') + def package_version = project.findProperty('package_version') + commandLine 'bash', '-c', + "source ${venv_name}/bin/activate && " + + "./scripts/custom_package_codegen.sh '${package_name}' '${package_version}'" +} + task install(dependsOn: [installPackage, codegen]) task installDev(type: Exec, dependsOn: [install]) { diff --git a/metadata-ingestion/docs/sources/kafka-connect/README.md b/metadata-ingestion/docs/sources/kafka-connect/README.md index 5031bff5a3fac0..e4f64c62914c57 100644 --- a/metadata-ingestion/docs/sources/kafka-connect/README.md +++ b/metadata-ingestion/docs/sources/kafka-connect/README.md @@ -21,4 +21,4 @@ This ingestion source maps the following Source System Concepts to DataHub Conce Works only for - Source connectors: JDBC, Debezium, Mongo and Generic connectors with user-defined lineage graph -- Sink connectors: BigQuery +- Sink connectors: BigQuery, Confluent S3, Snowflake diff --git a/metadata-ingestion/docs/sources/kafka/kafka.md b/metadata-ingestion/docs/sources/kafka/kafka.md index 2e8baa9516d17c..9fdfc3a3af1d02 100644 --- a/metadata-ingestion/docs/sources/kafka/kafka.md +++ b/metadata-ingestion/docs/sources/kafka/kafka.md @@ -130,3 +130,86 @@ message MessageWithMap { repeated Map1Entry 
map_1 = 1; } ``` + +### Enriching DataHub metadata with automated meta mapping + +:::note +Meta mapping is currently only available for Avro schemas +::: + +Avro schemas are permitted to have additional attributes not defined by the specification as arbitrary metadata. A common pattern is to utilize this for business metadata. The Kafka source has the ability to transform this directly into DataHub Owners, Tags and Terms. + +#### Simple tags + +If you simply have a list of tags embedded into an Avro schema (either at the top-level or for an individual field), you can use the `schema_tags_field` config. + +Example Avro schema: + +```json +{ + "name": "sampleRecord", + "type": "record", + "tags": ["tag1", "tag2"], + "fields": [{ + "name": "field_1", + "type": "string", + "tags": ["tag3", "tag4"] + }] +} +``` + +The name of the field containing a list of tags can be configured with the `schema_tags_field` property: + +```yaml +config: + schema_tags_field: tags +``` + +#### meta mapping + +You can also map specific Avro fields into Owners, Tags and Terms using meta +mapping. + +Example Avro schema: + +```json +{ + "name": "sampleRecord", + "type": "record", + "owning_team": "@Data-Science", + "data_tier": "Bronze", + "fields": [{ + "name": "field_1", + "type": "string", + "gdpr": { + "pii": true + } + }] +} +``` + +This can be mapped to DataHub metadata with `meta_mapping` config: + +```yaml +config: + meta_mapping: + owning_team: + match: "^@(.*)" + operation: "add_owner" + config: + owner_type: group + data_tier: + match: "Bronze|Silver|Gold" + operation: "add_term" + config: + term: "{{ $match }}" + field_meta_mapping: + gdpr.pii: + match: true + operation: "add_tag" + config: + tag: "pii" +``` + +The underlying implementation is similar to [dbt meta mapping](https://datahubproject.io/docs/generated/ingestion/sources/dbt#dbt-meta-automated-mappings), which has more detailed examples that can be used for reference. 
+ diff --git a/metadata-ingestion/docs/sources/looker/lookml_post.md b/metadata-ingestion/docs/sources/looker/lookml_post.md index 818cb681c4e90e..8ebbab4b9ed48d 100644 --- a/metadata-ingestion/docs/sources/looker/lookml_post.md +++ b/metadata-ingestion/docs/sources/looker/lookml_post.md @@ -2,11 +2,11 @@ :::note -The integration can use an SQL parser to try to parse the tables the views depends on. +The integration can use an SQL parser to try to parse the tables the views depends on. ::: -This parsing is disabled by default, but can be enabled by setting `parse_table_names_from_sql: True`. The default parser is based on the [`sqllineage`](https://pypi.org/project/sqllineage/) package. +This parsing is disabled by default, but can be enabled by setting `parse_table_names_from_sql: True`. The default parser is based on the [`sqllineage`](https://pypi.org/project/sqllineage/) package. As this package doesn't officially support all the SQL dialects that Looker supports, the result might not be correct. You can, however, implement a custom parser and take it into use by setting the `sql_parser` configuration value. A custom SQL parser must inherit from `datahub.utilities.sql_parser.SQLParser` and must be made available to Datahub by ,for example, installing it. The configuration then needs to be set to `module_name.ClassName` of the parser. @@ -15,12 +15,14 @@ and must be made available to Datahub by ,for example, installing it. The config Looker projects support organization as multiple git repos, with [remote includes that can refer to projects that are stored in a different repo](https://cloud.google.com/looker/docs/importing-projects#include_files_from_an_imported_project). If your Looker implementation uses multi-project setup, you can configure the LookML source to pull in metadata from your remote projects as well. 
If you are using local or remote dependencies, you will see include directives in your lookml files that look like this: + ``` include: "//e_flights/views/users.view.lkml" include: "//e_commerce/public/orders.view.lkml" ``` Also, you will see projects that are being referred to listed in your `manifest.lkml` file. Something like this: + ``` project_name: this_project @@ -34,9 +36,9 @@ remote_dependency: ga_360_block { } ``` - To ingest Looker repositories that are including files defined in other projects, you will need to use the `project_dependencies` directive within the configuration section. Consider the following scenario: + - Your primary project refers to a remote project called `my_remote_project` - The remote project is homed in the GitHub repo `my_org/my_remote_project` - You have provisioned a GitHub deploy key and stored the credential in the environment variable (or UI secret), `${MY_REMOTE_PROJECT_DEPLOY_KEY}` @@ -71,6 +73,23 @@ source: :::note -This is not the same as ingesting the remote project as a primary Looker project because DataHub will not be processing the model files that might live in the remote project. If you want to additionally include the views accessible via the models in the remote project, create a second recipe where your remote project is the primary project. +This is not the same as ingesting the remote project as a primary Looker project because DataHub will not be processing the model files that might live in the remote project. If you want to additionally include the views accessible via the models in the remote project, create a second recipe where your remote project is the primary project. ::: + +### Debugging LookML Parsing Errors + +If you see messages like `my_file.view.lkml': "failed to load view file: Unable to find a matching expression for '' on line 5"` in the failure logs, it indicates a parsing error for the LookML file. + +The first thing to check is that the Looker IDE can validate the file without issues. 
You can check this by clicking this "Validate LookML" button in the IDE when in development mode. + +If that's not the issue, it might be because DataHub's parser, which is based on the [joshtemple/lkml](https://github.com/joshtemple/lkml) library, is slightly more strict than the official Looker parser. +Note that there's currently only one known discrepancy between the two parsers, and it's related to using [multiple colon characters](https://github.com/joshtemple/lkml/issues/82) when defining parameters. + +To check if DataHub can parse your LookML file syntax, you can use the `lkml` CLI tool. If this raises an exception, DataHub will fail to parse the file. + +```sh +pip install lkml + +lkml path/to/my_file.view.lkml +``` diff --git a/metadata-ingestion/scripts/avro_codegen.py b/metadata-ingestion/scripts/avro_codegen.py index 29ffa571c0ac8f..a9b9b4b20f5ac8 100644 --- a/metadata-ingestion/scripts/avro_codegen.py +++ b/metadata-ingestion/scripts/avro_codegen.py @@ -343,8 +343,15 @@ class AspectBag(TypedDict, total=False): "schemas_path", type=click.Path(exists=True, file_okay=False), required=True ) @click.argument("outdir", type=click.Path(), required=True) +@click.option("--check-unused-aspects", is_flag=True, default=False) +@click.option("--enable-custom-loader/--disable-custom-loader", default=True) def generate( - entity_registry: str, pdl_path: str, schemas_path: str, outdir: str + entity_registry: str, + pdl_path: str, + schemas_path: str, + outdir: str, + check_unused_aspects: bool, + enable_custom_loader: bool, ) -> None: entities = load_entity_registry(Path(entity_registry)) schemas = load_schemas(schemas_path) @@ -388,10 +395,13 @@ def generate( aspect["Aspect"]["entityDoc"] = entity.doc # Check for unused aspects. We currently have quite a few.
- # unused_aspects = set(aspects.keys()) - set().union( - # {entity.keyAspect for entity in entities}, - # *(set(entity.aspects) for entity in entities), - # ) + if check_unused_aspects: + unused_aspects = set(aspects.keys()) - set().union( + {entity.keyAspect for entity in entities}, + *(set(entity.aspects) for entity in entities), + ) + if unused_aspects: + raise ValueError(f"Unused aspects: {unused_aspects}") merged_schema = merge_schemas(list(schemas.values())) write_schema_files(merged_schema, outdir) @@ -404,6 +414,35 @@ def generate( Path(outdir) / "schema_classes.py", ) + if enable_custom_loader: + # Move schema_classes.py -> _schema_classes.py + # and add a custom loader. + (Path(outdir) / "_schema_classes.py").write_text( + (Path(outdir) / "schema_classes.py").read_text() + ) + (Path(outdir) / "schema_classes.py").write_text( + """ +# This is a specialized shim layer that allows us to dynamically load custom models from elsewhere. + +import importlib +from typing import TYPE_CHECKING + +from datahub.utilities._custom_package_loader import get_custom_models_package + +_custom_package_path = get_custom_models_package() + +if TYPE_CHECKING or not _custom_package_path: + from ._schema_classes import * + + # Required explicitly because __all__ doesn't include _ prefixed names. + from ._schema_classes import _Aspect, __SCHEMA_TYPES +else: + _custom_package = importlib.import_module(_custom_package_path) + globals().update(_custom_package.__dict__) + +""" + ) + # Keep a copy of a few raw avsc files. 
required_avsc_schemas = {"MetadataChangeEvent", "MetadataChangeProposal"} schema_save_dir = Path(outdir) / "schemas" diff --git a/metadata-ingestion/scripts/custom_package_codegen.py b/metadata-ingestion/scripts/custom_package_codegen.py new file mode 100644 index 00000000000000..4a674550d49df0 --- /dev/null +++ b/metadata-ingestion/scripts/custom_package_codegen.py @@ -0,0 +1,119 @@ +import re +import subprocess +import sys +from pathlib import Path + +import avro_codegen +import click + +if sys.version_info < (3, 10): + from importlib_metadata import version +else: + from importlib.metadata import version + +_avrogen_version = version("avro-gen3") + +autogen_header = """# Autogenerated by datahub's custom_package_codegen.py +# DO NOT EDIT THIS FILE DIRECTLY +""" + + +def python_package_name_normalize(name): + return re.sub(r"[-_.]+", "_", name).lower() + + +@click.command() +@click.argument( + "entity_registry", type=click.Path(exists=True, dir_okay=False), required=True +) +@click.argument( + "pdl_path", type=click.Path(exists=True, file_okay=False), required=True +) +@click.argument( + "schemas_path", type=click.Path(exists=True, file_okay=False), required=True +) +@click.argument("outdir", type=click.Path(), required=True) +@click.argument("package_name", type=str, required=True) +@click.argument("package_version", type=str, required=True) +@click.pass_context +def generate( + ctx: click.Context, + entity_registry: str, + pdl_path: str, + schemas_path: str, + outdir: str, + package_name: str, + package_version: str, +) -> None: + package_path = Path(outdir) / package_name + if package_path.is_absolute(): + raise click.UsageError("outdir must be a relative path") + + python_package_name = python_package_name_normalize(package_name) + click.echo( + f"Generating distribution {package_name} (package name {python_package_name}) at {package_path}" + ) + + src_path = package_path / "src" / python_package_name + src_path.mkdir(parents=True) + + ctx.invoke( + 
avro_codegen.generate, + entity_registry=entity_registry, + pdl_path=pdl_path, + schemas_path=schemas_path, + outdir=str(src_path / "models"), + enable_custom_loader=False, + ) + + (src_path / "__init__.py").write_text( + f"""{autogen_header} +__package_name__ = "{package_name}" +__version__ = "{package_version}" +""" + ) + + (package_path / "setup.py").write_text( + f"""{autogen_header} +from setuptools import setup + +_package_name = "{package_name}" +_package_version = "{package_version}" + +setup( + name=_package_name, + version=_package_version, + install_requires=[ + "avro-gen3=={_avrogen_version}", + "acryl-datahub", + ], + entry_points={{ + "datahub.custom_packages": [ + "models={python_package_name}.models.schema_classes", + ], + }}, +) +""" + ) + + # TODO add a README.md? + click.echo("Building package...") + subprocess.run([sys.executable, "-m", "build", str(package_path)], check=True) + + click.echo() + click.secho(f"Generated package at {package_path}", fg="green") + click.echo( + "This package should be installed alongside the main acryl-datahub package." + ) + click.echo() + click.echo(f"Install the custom package locally with `pip install {package_path}`") + click.echo( + f"To enable others to use it, share the file at {package_path}/dist/*.whl and have them install it with `pip install <wheel file>.whl`" + ) + click.echo( + f"Alternatively, publish it to PyPI with `twine upload {package_path}/dist/*`" + ) + + +if __name__ == "__main__": + generate() diff --git a/metadata-ingestion/scripts/custom_package_codegen.sh b/metadata-ingestion/scripts/custom_package_codegen.sh new file mode 100755 index 00000000000000..aec6293a4ef450 --- /dev/null +++ b/metadata-ingestion/scripts/custom_package_codegen.sh @@ -0,0 +1,16 @@ +#!/bin/bash +set -euo pipefail + +OUTDIR=./custom-package +PACKAGE_NAME="${1:?package name is required}" +PACKAGE_VERSION="${2:?package version is required}" + +# Note: this assumes that datahub has already been built with `./gradlew build`. +DATAHUB_ROOT=..
+ +SCHEMAS_PDL="$DATAHUB_ROOT/metadata-models/src/main/pegasus/com/linkedin" +SCHEMAS_AVSC="$DATAHUB_ROOT/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin" +ENTITY_REGISTRY="$DATAHUB_ROOT/metadata-models/src/main/resources/entity-registry.yml" + +rm -r "$OUTDIR" 2>/dev/null || true +python scripts/custom_package_codegen.py "$ENTITY_REGISTRY" "$SCHEMAS_PDL" "$SCHEMAS_AVSC" "$OUTDIR" "$PACKAGE_NAME" "$PACKAGE_VERSION" diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index af1ea42a4c1e09..9c8bf2466181bc 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -36,10 +36,11 @@ def get_long_description(): "click-default-group", "PyYAML", "toml>=0.10.0", - "entrypoints", + # In Python 3.10+, importlib_metadata is included in the standard library. + "importlib_metadata>=4.0.0; python_version < '3.10'", "docker", "expandvars>=0.6.5", - "avro-gen3==0.7.10", + "avro-gen3==0.7.11", # "avro-gen3 @ git+https://github.com/acryldata/avro_gen@master#egg=avro-gen3", "avro>=1.10.2,<1.11", "python-dateutil>=2.8.0", @@ -404,7 +405,12 @@ def get_long_description(): "types-pkg_resources", "types-six", "types-python-dateutil", - "types-requests>=2.28.11.6", + # We need to avoid 2.31.0.5 and 2.31.0.4 due to + # https://github.com/python/typeshed/issues/10764. Once that + # issue is resolved, we can remove the upper bound and change it + # to a != constraint. + # We have a PR up to fix the underlying issue: https://github.com/python/typeshed/pull/10776.
+ "types-requests>=2.28.11.6,<=2.31.0.3", "types-toml", "types-PyMySQL", "types-PyYAML", @@ -425,7 +431,6 @@ def get_long_description(): "types-termcolor>=1.0.0", "types-Deprecated", "types-protobuf>=4.21.0.1", - "types-tzlocal", "sqlalchemy2-stubs", } @@ -509,6 +514,7 @@ def get_long_description(): "nifi", "vertica", "mode", + "kafka-connect", ] if plugin for dependency in plugins[plugin] @@ -647,6 +653,7 @@ def get_long_description(): "datahub = datahub.ingestion.reporting.datahub_ingestion_run_summary_provider:DatahubIngestionRunSummaryProvider", "file = datahub.ingestion.reporting.file_reporter:FileReporter", ], + "datahub.custom_packages": [], } @@ -713,6 +720,7 @@ def get_long_description(): ] ) ), + "cloud": ["acryl-datahub-cloud"], "dev": list(dev_requirements), "testing-utils": list(test_api_requirements), # To import `datahub.testing` "integration-tests": list(full_test_dev_requirements), diff --git a/metadata-ingestion/src/datahub/cli/check_cli.py b/metadata-ingestion/src/datahub/cli/check_cli.py index f20272ecd9dbfb..f7996900f7a7ad 100644 --- a/metadata-ingestion/src/datahub/cli/check_cli.py +++ b/metadata-ingestion/src/datahub/cli/check_cli.py @@ -131,7 +131,7 @@ def plugins(verbose: bool) -> None: """List the enabled ingestion plugins.""" click.secho("Sources:", bold=True) - click.echo(source_registry.summary(verbose=verbose)) + click.echo(source_registry.summary(verbose=verbose, col_width=25)) click.echo() click.secho("Sinks:", bold=True) click.echo(sink_registry.summary(verbose=verbose)) diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index 60bafaadc0ed48..22bcb1ec61add6 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -10,7 +10,6 @@ import click import click_spinner -import tzlocal from click_default_group import DefaultGroup from tabulate import tabulate @@ -256,17 +255,17 @@ async def 
run_ingestion_and_check_upgrade() -> int: @click.option( "--time-zone", type=str, - help=f"Timezone for the schedule. By default uses the timezone of the current system: {tzlocal.get_localzone_name()}.", + help="Timezone for the schedule in 'America/New_York' format. Uses UTC by default.", required=False, - default=tzlocal.get_localzone_name(), + default="UTC", ) def deploy( name: str, config: str, - urn: str, + urn: Optional[str], executor_id: str, - cli_version: str, - schedule: str, + cli_version: Optional[str], + schedule: Optional[str], time_zone: str, ) -> None: """ @@ -284,8 +283,6 @@ def deploy( resolve_env_vars=False, ) - graphql_query: str - variables: dict = { "urn": urn, "name": name, @@ -304,7 +301,7 @@ def deploy( exit() logger.info("Found recipe URN, will update recipe.") - graphql_query = textwrap.dedent( + graphql_query: str = textwrap.dedent( """ mutation updateIngestionSource( $urn: String!, diff --git a/metadata-ingestion/src/datahub/ingestion/api/registry.py b/metadata-ingestion/src/datahub/ingestion/api/registry.py index 56ea716948199e..7d8192aff83d59 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/registry.py +++ b/metadata-ingestion/src/datahub/ingestion/api/registry.py @@ -15,18 +15,17 @@ Union, ) -import entrypoints import typing_inspect from datahub import __package_name__ from datahub.configuration.common import ConfigurationError -T = TypeVar("T") +if sys.version_info < (3, 10): + from importlib_metadata import entry_points +else: + from importlib.metadata import entry_points -# TODO: The `entrypoints` library is in maintenance mode and is not actively developed. -# We should switch to importlib.metadata once we drop support for Python 3.7. -# See https://entrypoints.readthedocs.io/en/latest/ and -# https://docs.python.org/3/library/importlib.metadata.html. 
+T = TypeVar("T") def _is_importable(path: str) -> bool: @@ -141,16 +140,8 @@ def register_from_entrypoint(self, entry_point_key: str) -> None: self._entrypoints.append(entry_point_key) def _load_entrypoint(self, entry_point_key: str) -> None: - entry_point: entrypoints.EntryPoint - for entry_point in entrypoints.get_group_all(entry_point_key): - name = entry_point.name - - if entry_point.object_name is None: - path = entry_point.module_name - else: - path = f"{entry_point.module_name}:{entry_point.object_name}" - - self.register_lazy(name, path) + for entry_point in entry_points(group=entry_point_key): + self.register_lazy(entry_point.name, entry_point.value) def _materialize_entrypoints(self) -> None: for entry_point_key in self._entrypoints: diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py index 75de18e9037eea..4acf99a50e50ed 100644 --- a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py +++ b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py @@ -4,6 +4,7 @@ import avro.schema +from datahub.emitter import mce_builder from datahub.metadata.com.linkedin.pegasus2avro.schema import ( ArrayTypeClass, BooleanTypeClass, @@ -21,7 +22,7 @@ TimeTypeClass, UnionTypeClass, ) -from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass +from datahub.utilities.mapping import Constants, OperationProcessor """A helper file for Avro schema -> MCE schema transformations""" @@ -98,7 +99,14 @@ class AvroToMceSchemaConverter: "uuid": StringTypeClass, } - def __init__(self, is_key_schema: bool, default_nullable: bool = False) -> None: + def __init__( + self, + is_key_schema: bool, + default_nullable: bool = False, + meta_mapping_processor: Optional[OperationProcessor] = None, + schema_tags_field: Optional[str] = None, + tag_prefix: Optional[str] = None, + ) -> None: # Tracks the prefix name stack for nested name generation. 
self._prefix_name_stack: PrefixNameStack = [self.version_string] # Tracks the fields on the current path. @@ -112,6 +120,10 @@ def __init__(self, is_key_schema: bool, default_nullable: bool = False) -> None: if is_key_schema: # Helps maintain backwards-compatibility. Annotation for any field that is part of key-schema. self._prefix_name_stack.append("[key=True]") + # Meta mapping + self._meta_mapping_processor = meta_mapping_processor + self._schema_tags_field = schema_tags_field + self._tag_prefix = tag_prefix or "" # Map of avro schema type to the conversion handler self._avro_type_to_mce_converter_map: Dict[ avro.schema.Schema, @@ -317,7 +329,25 @@ def emit(self) -> Generator[SchemaField, None, None]: merged_props.update(self._schema.other_props) merged_props.update(schema.other_props) - tags = None + # Parse meta_mapping + meta_aspects: Dict[str, Any] = {} + if self._converter._meta_mapping_processor: + meta_aspects = self._converter._meta_mapping_processor.process( + merged_props + ) + + tags: List[str] = [] + if self._converter._schema_tags_field: + for tag in merged_props.get(self._converter._schema_tags_field, []): + tags.append(self._converter._tag_prefix + tag) + + meta_tags_aspect = meta_aspects.get(Constants.ADD_TAG_OPERATION) + if meta_tags_aspect: + tags += [ + tag_association.tag[len("urn:li:tag:") :] + for tag_association in meta_tags_aspect.tags + ] + if "deprecated" in merged_props: description = ( f"DEPRECATED: {merged_props['deprecated']}\n" @@ -325,9 +355,13 @@ def emit(self) -> Generator[SchemaField, None, None]: if description else "" ) - tags = GlobalTagsClass( - tags=[TagAssociationClass(tag="urn:li:tag:Deprecated")] - ) + tags.append("Deprecated") + + tags_aspect = None + if tags: + tags_aspect = mce_builder.make_global_tag_aspect_with_tag_list(tags) + + meta_terms_aspect = meta_aspects.get(Constants.ADD_TERM_OPERATION) logical_type_name: Optional[str] = ( # logicalType nested inside type @@ -349,7 +383,8 @@ def emit(self) ->
Generator[SchemaField, None, None]: recursive=False, nullable=self._converter._is_nullable(schema), isPartOfKey=self._converter._is_key_schema, - globalTags=tags, + globalTags=tags_aspect, + glossaryTerms=meta_terms_aspect, jsonProps=json.dumps(merged_props) if merged_props else None, ) yield field @@ -447,7 +482,9 @@ def _gen_from_non_field_nested_schemas( actual_schema = self._get_underlying_type_if_option_as_union(schema, schema) with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager( - schema, actual_schema, self + schema, + actual_schema, + self, ) as fe_schema: if isinstance( actual_schema, @@ -478,7 +515,9 @@ def _gen_non_nested_to_mce_fields( ) -> Generator[SchemaField, None, None]: """Handles generation of MCE SchemaFields for non-nested AVRO types.""" with AvroToMceSchemaConverter.SchemaFieldEmissionContextManager( - schema, schema, self + schema, + schema, + self, ) as non_nested_emitter: yield from non_nested_emitter.emit() @@ -496,9 +535,12 @@ def _to_mce_fields( @classmethod def to_mce_fields( cls, - avro_schema_string: str, + avro_schema: avro.schema.Schema, is_key_schema: bool, default_nullable: bool = False, + meta_mapping_processor: Optional[OperationProcessor] = None, + schema_tags_field: Optional[str] = None, + tag_prefix: Optional[str] = None, ) -> Generator[SchemaField, None, None]: """ Converts a key or value type AVRO schema string to appropriate MCE SchemaFields. @@ -506,8 +548,14 @@ def to_mce_fields( :param is_key_schema: True if it is a key-schema. :return: An MCE SchemaField generator.
""" - avro_schema = avro.schema.parse(avro_schema_string) - converter = cls(is_key_schema, default_nullable) + # avro_schema = avro.schema.parse(avro_schema) + converter = cls( + is_key_schema, + default_nullable, + meta_mapping_processor, + schema_tags_field, + tag_prefix, + ) yield from converter._to_mce_fields(avro_schema) @@ -516,28 +564,40 @@ def to_mce_fields( def avro_schema_to_mce_fields( - avro_schema_string: str, + avro_schema: Union[avro.schema.Schema, str], is_key_schema: bool = False, default_nullable: bool = False, + meta_mapping_processor: Optional[OperationProcessor] = None, + schema_tags_field: Optional[str] = None, + tag_prefix: Optional[str] = None, swallow_exceptions: bool = True, ) -> List[SchemaField]: """ Converts an avro schema into schema fields compatible with MCE. - :param avro_schema_string: String representation of the AVRO schema. + :param avro_schema: AVRO schema, either as a string or as an avro.schema.Schema object. :param is_key_schema: True if it is a key-schema. Default is False (value-schema). :param swallow_exceptions: True if the caller wants exceptions to be suppressed + :param meta_mapping_processor: Optional OperationProcessor to be used for meta mappings :return: The list of MCE compatible SchemaFields.
""" try: + if isinstance(avro_schema, str): + avro_schema = avro.schema.parse(avro_schema) + return list( AvroToMceSchemaConverter.to_mce_fields( - avro_schema_string, is_key_schema, default_nullable + avro_schema, + is_key_schema, + default_nullable, + meta_mapping_processor, + schema_tags_field, + tag_prefix, ) ) except Exception: if swallow_exceptions: - logger.exception(f"Failed to parse {avro_schema_string} into mce fields.") + logger.exception(f"Failed to parse {avro_schema} into mce fields.") return [] else: raise diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py index 2d6882caa38ef7..661589a0c58e59 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py @@ -6,6 +6,7 @@ import pydantic +from datahub.ingestion.api.report import Report from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport from datahub.ingestion.source_report.time_window import BaseTimeWindowReport @@ -16,18 +17,20 @@ logger: logging.Logger = logging.getLogger(__name__) -class BigQuerySchemaApiPerfReport: - list_projects = PerfTimer() - list_datasets = PerfTimer() - get_columns_for_dataset = PerfTimer() - get_tables_for_dataset = PerfTimer() - list_tables = PerfTimer() - get_views_for_dataset = PerfTimer() +@dataclass +class BigQuerySchemaApiPerfReport(Report): + list_projects: PerfTimer = field(default_factory=PerfTimer) + list_datasets: PerfTimer = field(default_factory=PerfTimer) + get_columns_for_dataset: PerfTimer = field(default_factory=PerfTimer) + get_tables_for_dataset: PerfTimer = field(default_factory=PerfTimer) + list_tables: PerfTimer = field(default_factory=PerfTimer) + get_views_for_dataset: PerfTimer = 
field(default_factory=PerfTimer) -class BigQueryAuditLogApiPerfReport: - get_exported_log_entries = PerfTimer() - list_log_entries = PerfTimer() +@dataclass +class BigQueryAuditLogApiPerfReport(Report): + get_exported_log_entries: PerfTimer = field(default_factory=PerfTimer) + list_log_entries: PerfTimer = field(default_factory=PerfTimer) @dataclass diff --git a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py index 0bdcb115b377c8..54475cb509621d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py +++ b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py @@ -4,6 +4,7 @@ from hashlib import md5 from typing import Any, List, Optional, Set, Tuple +import avro.schema import jsonref from confluent_kafka.schema_registry.schema_registry_client import ( RegisteredSchema, @@ -22,6 +23,8 @@ SchemaField, SchemaMetadata, ) +from datahub.metadata.schema_classes import OwnershipSourceTypeClass +from datahub.utilities.mapping import OperationProcessor logger = logging.getLogger(__name__) @@ -59,6 +62,14 @@ def __init__( except Exception as e: logger.warning(f"Failed to get subjects from schema registry: {e}") + self.field_meta_processor = OperationProcessor( + self.source_config.field_meta_mapping, + self.source_config.tag_prefix, + OwnershipSourceTypeClass.SERVICE, + self.source_config.strip_user_ids_from_email, + match_nested_props=True, + ) + @classmethod def create( cls, source_config: KafkaSourceConfig, report: KafkaSourceReport @@ -290,10 +301,19 @@ def _get_schema_fields( fields: List[SchemaField] = [] if schema.schema_type == "AVRO": cleaned_str: str = self.get_schema_str_replace_confluent_ref_avro(schema) + avro_schema = avro.schema.parse(cleaned_str) + # "value.id" or "value.[type=string]id" fields = schema_util.avro_schema_to_mce_fields( - cleaned_str, is_key_schema=is_key_schema + avro_schema, + 
is_key_schema=is_key_schema, + meta_mapping_processor=self.field_meta_processor + if self.source_config.enable_meta_mapping + else None, + schema_tags_field=self.source_config.schema_tags_field, + tag_prefix=self.source_config.tag_prefix, ) + elif schema.schema_type == "PROTOBUF": imported_schemas: List[ ProtobufSchema diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index 61f6103347eb37..566304e1999b79 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -5,6 +5,7 @@ from enum import Enum from typing import Any, Dict, Iterable, List, Optional, Type +import avro.schema import confluent_kafka import confluent_kafka.admin import pydantic @@ -18,6 +19,7 @@ from datahub.configuration.common import AllowDenyPattern from datahub.configuration.kafka import KafkaConsumerConnectionConfig from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.emitter import mce_builder from datahub.emitter.mce_builder import ( make_data_platform_urn, make_dataplatform_instance_urn, @@ -56,8 +58,10 @@ DataPlatformInstanceClass, DatasetPropertiesClass, KafkaSchemaClass, + OwnershipSourceTypeClass, SubTypesClass, ) +from datahub.utilities.mapping import Constants, OperationProcessor from datahub.utilities.registries.domain_registry import DomainRegistry logger = logging.getLogger(__name__) @@ -89,6 +93,29 @@ class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): default="datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry", description="The fully qualified implementation class(custom) that implements the KafkaSchemaRegistryBase interface.", ) + schema_tags_field: str = pydantic.Field( + default="tags", + description="The field name in the schema metadata that contains the tags to be added to the dataset.", + ) + enable_meta_mapping: bool = pydantic.Field( +
default=True, + description="When enabled, applies the mappings that are defined through the meta_mapping directives.", + ) + meta_mapping: Dict = pydantic.Field( + default={}, + description="mapping rules that will be executed against top-level schema properties. Refer to the section below on meta automated mappings.", + ) + field_meta_mapping: Dict = pydantic.Field( + default={}, + description="mapping rules that will be executed against field-level schema properties. Refer to the section below on meta automated mappings.", + ) + strip_user_ids_from_email: bool = pydantic.Field( + default=False, + description="Whether or not to strip email id while adding owners using meta mappings.", + ) + tag_prefix: str = pydantic.Field( + default="", description="Prefix added to tags during ingestion." + ) ignore_warnings_on_schema_type: bool = pydantic.Field( default=False, description="Disables warnings reported for non-AVRO/Protobuf value or key schemas if set.", @@ -167,6 +194,14 @@ def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext): graph=self.ctx.graph, ) + self.meta_processor = OperationProcessor( + self.source_config.meta_mapping, + self.source_config.tag_prefix, + OwnershipSourceTypeClass.SERVICE, + self.source_config.strip_user_ids_from_email, + match_nested_props=True, + ) + def init_kafka_admin_client(self) -> None: try: # TODO: Do we require separate config than existing consumer_config ? @@ -227,7 +262,6 @@ def _extract_record( logger.debug(f"topic = {topic}") AVRO = "AVRO" - DOC_KEY = "doc" # 1. Create the default dataset snapshot for the topic. dataset_name = topic @@ -261,8 +295,8 @@ def _extract_record( topic, topic_detail, extra_topic_config ) - # 4. Set dataset's description as top level doc, if topic schema type is avro - description = None + # 4. 
Set dataset's description, tags, ownership, etc, if topic schema type is avro + description: Optional[str] = None if ( schema_metadata is not None and isinstance(schema_metadata.platformSchema, KafkaSchemaClass) @@ -271,9 +305,41 @@ def _extract_record( # Point to note: # In Kafka documentSchema and keySchema both contains "doc" field. # DataHub Dataset "description" field is mapped to documentSchema's "doc" field. - schema = json.loads(schema_metadata.platformSchema.documentSchema) - if isinstance(schema, dict): - description = schema.get(DOC_KEY) + + avro_schema = avro.schema.parse( + schema_metadata.platformSchema.documentSchema + ) + description = avro_schema.doc + # set the tags + all_tags: List[str] = [] + for tag in avro_schema.other_props.get( + self.source_config.schema_tags_field, [] + ): + all_tags.append(self.source_config.tag_prefix + tag) + + if self.source_config.enable_meta_mapping: + meta_aspects = self.meta_processor.process(avro_schema.other_props) + + meta_owners_aspects = meta_aspects.get(Constants.ADD_OWNER_OPERATION) + if meta_owners_aspects: + dataset_snapshot.aspects.append(meta_owners_aspects) + + meta_terms_aspect = meta_aspects.get(Constants.ADD_TERM_OPERATION) + if meta_terms_aspect: + dataset_snapshot.aspects.append(meta_terms_aspect) + + # Create the tags aspect + meta_tags_aspect = meta_aspects.get(Constants.ADD_TAG_OPERATION) + if meta_tags_aspect: + all_tags += [ + tag_association.tag[len("urn:li:tag:") :] + for tag_association in meta_tags_aspect.tags + ] + + if all_tags: + dataset_snapshot.aspects.append( + mce_builder.make_global_tag_aspect_with_tag_list(all_tags) + ) dataset_properties = DatasetPropertiesClass( name=topic, customProperties=custom_props, description=description diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py index b3fa5e3401c078..f3344782917abf 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py @@ -901,6 +901,108 @@ def _extract_lineages(self): return +@dataclass +class SnowflakeSinkConnector: + connector_manifest: ConnectorManifest + report: KafkaConnectSourceReport + + def __init__( + self, connector_manifest: ConnectorManifest, report: KafkaConnectSourceReport + ) -> None: + self.connector_manifest = connector_manifest + self.report = report + self._extract_lineages() + + @dataclass + class SnowflakeParser: + database_name: str + schema_name: str + topics_to_tables: Dict[str, str] + + def report_warning(self, key: str, reason: str) -> None: + logger.warning(f"{key}: {reason}") + self.report.report_warning(key, reason) + + def get_table_name_from_topic_name(self, topic_name: str) -> str: + """ + This function converts the topic name to a valid Snowflake table name using some rules. + Refer below link for more info + https://docs.snowflake.com/en/user-guide/kafka-connector-overview#target-tables-for-kafka-topics + """ + table_name = re.sub("[^a-zA-Z0-9_]", "_", topic_name) + if re.match("^[^a-zA-Z_].*", table_name): + table_name = "_" + table_name + # Connector may append original topic's hash code as suffix for conflict resolution + # if generated table names for 2 topics are similar. This corner case is not handled here. + # Note that Snowflake recommends to choose topic names that follow the rules for + # Snowflake identifier names so this case is not recommended by snowflake. 
+ return table_name + + def get_parser( + self, + connector_manifest: ConnectorManifest, + ) -> SnowflakeParser: + database_name = connector_manifest.config["snowflake.database.name"] + schema_name = connector_manifest.config["snowflake.schema.name"] + + # Fetch user provided topic to table map + provided_topics_to_tables: Dict[str, str] = {} + if connector_manifest.config.get("snowflake.topic2table.map"): + for each in connector_manifest.config["snowflake.topic2table.map"].split( + "," + ): + topic, table = each.split(":") + provided_topics_to_tables[topic.strip()] = table.strip() + + topics_to_tables: Dict[str, str] = {} + # Extract lineage for only those topics whose data ingestion started + for topic in connector_manifest.topic_names: + if topic in provided_topics_to_tables: + # If user provided which table to get mapped with this topic + topics_to_tables[topic] = provided_topics_to_tables[topic] + else: + # Else connector converts topic name to a valid Snowflake table name. + topics_to_tables[topic] = self.get_table_name_from_topic_name(topic) + + return self.SnowflakeParser( + database_name=database_name, + schema_name=schema_name, + topics_to_tables=topics_to_tables, + ) + + def _extract_lineages(self): + self.connector_manifest.flow_property_bag = self.connector_manifest.config + + # For all snowflake sink connector properties, refer below link + # https://docs.snowflake.com/en/user-guide/kafka-connector-install#configuring-the-kafka-connector + # remove private keys, secrets from properties + secret_properties = [ + "snowflake.private.key", + "snowflake.private.key.passphrase", + "value.converter.basic.auth.user.info", + ] + for k in secret_properties: + if k in self.connector_manifest.flow_property_bag: + del self.connector_manifest.flow_property_bag[k] + + lineages: List[KafkaConnectLineage] = list() + parser = self.get_parser(self.connector_manifest) + + for topic, table in parser.topics_to_tables.items(): + target_dataset =
f"{parser.database_name}.{parser.schema_name}.{table}" + lineages.append( + KafkaConnectLineage( + source_dataset=topic, + source_platform=KAFKA, + target_dataset=target_dataset, + target_platform="snowflake", + ) + ) + + self.connector_manifest.lineages = lineages + return + + @dataclass class ConfluentS3SinkConnector: connector_manifest: ConnectorManifest @@ -1130,6 +1232,13 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]: connector_manifest = ConfluentS3SinkConnector( connector_manifest=connector_manifest, report=self.report ).connector_manifest + elif ( + connector_manifest.config.get("connector.class") + == "com.snowflake.kafka.connector.SnowflakeSinkConnector" + ): + connector_manifest = SnowflakeSinkConnector( + connector_manifest=connector_manifest, report=self.report + ).connector_manifest else: self.report.report_dropped(connector_manifest.name) logger.warning( diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index ab5d3a4e007ac0..ac4433b7eb1f0c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -7,6 +7,7 @@ import time from collections import OrderedDict from datetime import datetime +from pathlib import PurePath from typing import Any, Dict, Iterable, List, Optional, Tuple from more_itertools import peekable @@ -819,7 +820,10 @@ def local_browser(self, path_spec: PathSpec) -> Iterable[Tuple[str, datetime, in dirs.sort(key=functools.cmp_to_key(partitioned_folder_comparator)) for file in sorted(files): - full_path = os.path.join(root, file) + # We need to make sure the path is in posix style which is not true on windows + full_path = PurePath( + os.path.normpath(os.path.join(root, file)) + ).as_posix() yield full_path, datetime.utcfromtimestamp( os.path.getmtime(full_path) ), os.path.getsize(full_path) diff --git
a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py index 0bc8bb17934f77..95f64443844088 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py @@ -51,15 +51,17 @@ class DatabaseId: database: str = Field( description="Database created from share in consumer account." ) - platform_instance: str = Field( - description="Platform instance of consumer snowflake account." + platform_instance: Optional[str] = Field( + default=None, + description="Platform instance of consumer snowflake account.", ) class SnowflakeShareConfig(ConfigModel): database: str = Field(description="Database from which share is created.") - platform_instance: str = Field( - description="Platform instance for snowflake account in which share is created." + platform_instance: Optional[str] = Field( + default=None, + description="Platform instance for snowflake account in which share is created.", ) consumers: Set[DatabaseId] = Field( @@ -247,10 +249,11 @@ def validate_shares( if shares: # Check: platform_instance should be present - assert current_platform_instance is not None, ( - "Did you forget to set `platform_instance` for current ingestion ? " - "It is required to use `platform_instance` when ingesting from multiple snowflake accounts." - ) + if current_platform_instance is None: + logger.info( + "It is advisable to use `platform_instance` when ingesting from multiple snowflake accounts, if they contain databases with same name. " + "Setting `platform_instance` allows distinguishing such databases without conflict and correctly ingest their metadata." 
+ ) databases_included_in_share: List[DatabaseId] = [] databases_created_from_share: List[DatabaseId] = [] @@ -259,10 +262,11 @@ def validate_shares( shared_db = DatabaseId( share_details.database, share_details.platform_instance ) - assert all( - consumer.platform_instance != share_details.platform_instance - for consumer in share_details.consumers - ), "Share's platform_instance can not be same as consumer's platform instance. Self-sharing not supported in Snowflake." + if current_platform_instance: + assert all( + consumer.platform_instance != share_details.platform_instance + for consumer in share_details.consumers + ), "Share's platform_instance can not be same as consumer's platform instance. Self-sharing not supported in Snowflake." databases_included_in_share.append(shared_db) databases_created_from_share.extend(share_details.consumers) @@ -306,7 +310,11 @@ def inbounds(self) -> Dict[str, DatabaseId]: f"database {consumer.database} is created from inbound share {share_name}." ) inbounds[consumer.database] = share_details.source_database - break + if self.platform_instance: + break + # If not using platform_instance, any one of consumer databases + # can be the database from this instance. so we include all relevant + # databases in inbounds. 
else: logger.info( f"Skipping Share {share_name}, as it does not include current platform instance {self.platform_instance}", diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_shares.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_shares.py index 6f7520bbf1988a..dad0ce7b59ee1d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_shares.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_shares.py @@ -93,11 +93,15 @@ def report_missing_databases( db_names = [db.name for db in databases] missing_dbs = [db for db in inbounds + outbounds if db not in db_names] - if missing_dbs: + if missing_dbs and self.config.platform_instance: self.report_warning( "snowflake-shares", f"Databases {missing_dbs} were not ingested. Siblings/Lineage will not be set for these.", ) + elif missing_dbs: + logger.debug( + f"Databases {missing_dbs} were not ingested in this recipe.", + ) def gen_siblings( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index b5458a42192fc8..112defe76d9571 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -367,12 +367,12 @@ def __init__(self, config: SQLCommonConfig, ctx: PipelineContext, platform: str) ) def warn(self, log: logging.Logger, key: str, reason: str) -> None: - self.report.report_warning(key, reason) + self.report.report_warning(key, reason[:100]) log.warning(f"{key} => {reason}") def error(self, log: logging.Logger, key: str, reason: str) -> None: - self.report.report_failure(key, reason) - log.error(f"{key} => {reason}") + self.report.report_failure(key, reason[:100]) + log.error(f"{key} => {reason}\n{traceback.format_exc()}") def get_inspectors(self) -> Iterable[Inspector]: # This method can be overridden in the case that you want to 
dynamically @@ -528,10 +528,8 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit try: self.add_profile_metadata(inspector) except Exception as e: - logger.warning( - "Failed to get enrichment data for profiler", exc_info=True - ) - self.report.report_warning( + self.warn( + logger, "profile_metadata", f"Failed to get enrichment data for profile {e}", ) @@ -638,14 +636,9 @@ def loop_tables( # noqa: C901 dataset_name, inspector, schema, table, sql_config ) except Exception as e: - logger.warning( - f"Unable to ingest {schema}.{table} due to an exception.\n {traceback.format_exc()}" - ) - self.report.report_warning( - f"{schema}.{table}", f"Ingestion error: {e}" - ) + self.warn(logger, f"{schema}.{table}", f"Ingestion error: {e}") except Exception as e: - self.report.report_failure(f"{schema}", f"Tables error: {e}") + self.error(logger, f"{schema}", f"Tables error: {e}") def add_information_for_schema(self, inspector: Inspector, schema: str) -> None: pass @@ -806,9 +799,10 @@ def _get_columns( try: columns = inspector.get_columns(table, schema) if len(columns) == 0: - self.report.report_warning(MISSING_COLUMN_INFO, dataset_name) + self.warn(logger, MISSING_COLUMN_INFO, dataset_name) except Exception as e: - self.report.report_warning( + self.warn( + logger, dataset_name, f"unable to get column information due to an error -> {e}", ) @@ -903,14 +897,9 @@ def loop_views( sql_config=sql_config, ) except Exception as e: - logger.warning( - f"Unable to ingest view {schema}.{view} due to an exception.\n {traceback.format_exc()}" - ) - self.report.report_warning( - f"{schema}.{view}", f"Ingestion error: {e}" - ) + self.warn(logger, f"{schema}.{view}", f"Ingestion error: {e}") except Exception as e: - self.report.report_failure(f"{schema}", f"Views error: {e}") + self.error(logger, f"{schema}", f"Views error: {e}") def _process_view( self, @@ -924,9 +913,7 @@ def _process_view( columns = inspector.get_columns(view, schema) except KeyError: # 
For certain types of views, we are unable to fetch the list of columns. - self.report.report_warning( - dataset_name, "unable to get schema for this view" - ) + self.warn(logger, dataset_name, "unable to get schema for this view") schema_metadata = None else: schema_fields = self.get_schema_fields(dataset_name, columns) @@ -1112,7 +1099,8 @@ def loop_profiler_requests( if partition is None and self.is_table_partitioned( database=None, schema=schema, table=table ): - self.report.report_warning( + self.warn( + logger, "profile skipped as partitioned table is empty or partition id was invalid", dataset_name, ) diff --git a/metadata-ingestion/src/datahub/utilities/_custom_package_loader.py b/metadata-ingestion/src/datahub/utilities/_custom_package_loader.py new file mode 100644 index 00000000000000..1b66258557406d --- /dev/null +++ b/metadata-ingestion/src/datahub/utilities/_custom_package_loader.py @@ -0,0 +1,43 @@ +import sys +from typing import List, Optional + +if sys.version_info < (3, 10): + from importlib_metadata import EntryPoint, entry_points +else: + from importlib.metadata import EntryPoint, entry_points + + +_CUSTOM_PACKAGE_GROUP_KEY = "datahub.custom_packages" + +_MODELS_KEY = "models" + + +class CustomPackageException(Exception): + pass + + +def _get_all_registered_custom_packages() -> List[EntryPoint]: + return list(entry_points(group=_CUSTOM_PACKAGE_GROUP_KEY)) + + +def _get_custom_package_for_name(name: str) -> Optional[str]: + entrypoints = [ + ep for ep in _get_all_registered_custom_packages() if ep.name == name + ] + + if not entrypoints: + return None + + if len(entrypoints) > 1: + all_package_options = [ + entrypoint.dist.name for entrypoint in entrypoints if entrypoint.dist + ] + raise CustomPackageException( + f"Multiple custom packages registered for {name}: cannot pick between {all_package_options}" + ) + + return entrypoints[0].value + + +def get_custom_models_package() -> Optional[str]: + return _get_custom_package_for_name(_MODELS_KEY) 
diff --git a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py index 8865254e885795..4fcef990ae4f43 100644 --- a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py +++ b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py @@ -269,7 +269,7 @@ def get_schema_fields_for_hive_column( hive_column_name=hive_column_name, hive_column_type=hive_column_type ) schema_fields = avro_schema_to_mce_fields( - avro_schema_string=json.dumps(avro_schema_json), + avro_schema=json.dumps(avro_schema_json), default_nullable=default_nullable, swallow_exceptions=False, ) diff --git a/metadata-ingestion/src/datahub/utilities/mapping.py b/metadata-ingestion/src/datahub/utilities/mapping.py index 32666ceecdf85c..793eccfb22c7e8 100644 --- a/metadata-ingestion/src/datahub/utilities/mapping.py +++ b/metadata-ingestion/src/datahub/utilities/mapping.py @@ -1,6 +1,8 @@ import contextlib import logging +import operator import re +from functools import reduce from typing import Any, Dict, List, Match, Optional, Union from datahub.emitter import mce_builder @@ -94,11 +96,13 @@ def __init__( tag_prefix: str = "", owner_source_type: Optional[str] = None, strip_owner_email_id: bool = False, + match_nested_props: bool = False, ): self.operation_defs = operation_defs self.tag_prefix = tag_prefix self.strip_owner_email_id = strip_owner_email_id self.owner_source_type = owner_source_type + self.match_nested_props = match_nested_props def process(self, raw_props: Dict[str, Any]) -> Dict[str, Any]: # Defining the following local variables - @@ -121,9 +125,18 @@ def process(self, raw_props: Dict[str, Any]) -> Dict[str, Any]: ) if not operation_type or not operation_config: continue + raw_props_value = raw_props.get(operation_key) + if not raw_props_value and self.match_nested_props: + try: + raw_props_value = reduce( + operator.getitem, operation_key.split("."), raw_props + ) + except KeyError: + pass + 
maybe_match = self.get_match( self.operation_defs[operation_key][Constants.MATCH], - raw_props.get(operation_key), + raw_props_value, ) if maybe_match is not None: operation = self.get_operation_value( diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_snowflake_sink_mces_golden.json b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_snowflake_sink_mces_golden.json new file mode 100644 index 00000000000000..76d49cebe5ae30 --- /dev/null +++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_snowflake_sink_mces_golden.json @@ -0,0 +1,152 @@ +[ +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": { + "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector", + "snowflake.database.name": "kafka_db", + "snowflake.schema.name": "kafka_schema", + "snowflake.topic2table.map": "topic1:table1", + "tasks.max": "1", + "topics": "topic1,_topic+2", + "snowflake.user.name": "kafka_connector_user_1", + "name": "snowflake_sink1", + "snowflake.url.name": "bcaurux-lc62744.snowflakecomputing.com:443" + }, + "name": "snowflake_sink1", + "description": "Sink connector using `com.snowflake.kafka.connector.SnowflakeSinkConnector` plugin." 
+ } + }, + "systemMetadata": { + "lastObserved": 1635166800000, + "runId": "kafka-connect-test" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),topic1)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "snowflake_sink1:topic1", + "type": { + "string": "COMMAND" + } + } + }, + "systemMetadata": { + "lastObserved": 1635166800000, + "runId": "kafka-connect-test" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),topic1)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:kafka,topic1,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,kafka_db.kafka_schema.table1,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1635166800000, + "runId": "kafka-connect-test" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),_topic+2)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "snowflake_sink1:_topic+2", + "type": { + "string": "COMMAND" + } + } + }, + "systemMetadata": { + "lastObserved": 1635166800000, + "runId": "kafka-connect-test" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),_topic+2)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:kafka,_topic+2,PROD)" + ], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:snowflake,kafka_db.kafka_schema._topic_2,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 
1635166800000, + "runId": "kafka-connect-test" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1635166800000, + "runId": "kafka-connect-test" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),_topic+2)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1635166800000, + "runId": "kafka-connect-test" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),topic1)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1635166800000, + "runId": "kafka-connect-test" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py index 5f907bb05443c9..48063908e624f2 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py +++ b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py @@ -534,3 +534,103 @@ def test_kafka_connect_ingest_stateful( "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.mysql_source2,PROD),librarydb.member)", ] assert sorted(deleted_job_urns) == sorted(difference_job_urns) + + +def register_mock_api(request_mock: Any, override_data: dict = {}) -> None: + api_vs_response = { + "http://localhost:28083": { + "method": "GET", + "status_code": 200, + "json": { + "version": "7.4.0-ccs", + "commit": "30969fa33c185e880b9e02044761dfaac013151d", + "kafka_cluster_id": 
"MDgRZlZhSZ-4fXhwRR79bw", + }, + }, + } + + api_vs_response.update(override_data) + + for url in api_vs_response.keys(): + request_mock.register_uri( + api_vs_response[url]["method"], + url, + json=api_vs_response[url]["json"], + status_code=api_vs_response[url]["status_code"], + ) + + +@freeze_time(FROZEN_TIME) +def test_kafka_connect_snowflake_sink_ingest( + pytestconfig, tmp_path, mock_time, requests_mock +): + test_resources_dir = pytestconfig.rootpath / "tests/integration/kafka-connect" + override_data = { + "http://localhost:28083/connectors": { + "method": "GET", + "status_code": 200, + "json": ["snowflake_sink1"], + }, + "http://localhost:28083/connectors/snowflake_sink1": { + "method": "GET", + "status_code": 200, + "json": { + "name": "snowflake_sink1", + "config": { + "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector", + "snowflake.database.name": "kafka_db", + "snowflake.schema.name": "kafka_schema", + "snowflake.topic2table.map": "topic1:table1", + "tasks.max": "1", + "topics": "topic1,_topic+2", + "snowflake.user.name": "kafka_connector_user_1", + "snowflake.private.key": "rrSnqU=", + "name": "snowflake_sink1", + "snowflake.url.name": "bcaurux-lc62744.snowflakecomputing.com:443", + }, + "tasks": [{"connector": "snowflake_sink1", "task": 0}], + "type": "sink", + }, + }, + "http://localhost:28083/connectors/snowflake_sink1/topics": { + "method": "GET", + "status_code": 200, + "json": {"snowflake_sink1": {"topics": ["topic1", "_topic+2"]}}, + }, + } + + register_mock_api(request_mock=requests_mock, override_data=override_data) + + pipeline = Pipeline.create( + { + "run_id": "kafka-connect-test", + "source": { + "type": "kafka-connect", + "config": { + "platform_instance": "connect-instance-1", + "connect_uri": KAFKA_CONNECT_SERVER, + "connector_patterns": { + "allow": [ + "snowflake_sink1", + ] + }, + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/kafka_connect_snowflake_sink_mces.json", + }, + }, + 
} + ) + + pipeline.run() + pipeline.raise_from_status() + golden_file = "kafka_connect_snowflake_sink_mces_golden.json" + + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / "kafka_connect_snowflake_sink_mces.json", + golden_path=f"{test_resources_dir}/{golden_file}", + ) diff --git a/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json b/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json index e51eaa10b8b10c..7dd328168e84c0 100644 --- a/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json +++ b/metadata-ingestion/tests/integration/kafka/kafka_mces_golden.json @@ -86,7 +86,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -103,7 +104,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -118,7 +120,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -144,10 +147,10 @@ "time": 0, "actor": "urn:li:corpuser:unknown" }, - "hash": "cc452cf58242cdb9d09cf33d657497d8", + "hash": "a79a2fe3adab60b21d272a9cc3e93595", "platformSchema": { "com.linkedin.pegasus2avro.schema.KafkaSchema": { - "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"}]}", + "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka 
topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", "documentSchemaType": "AVRO", "keySchema": "{\"type\":\"record\",\"name\":\"UserKey\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Key schema for kafka topic\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"}]}", "keySchemaType": "AVRO" @@ -188,7 +191,15 @@ }, "nativeDataType": "email", "recursive": false, - "isPartOfKey": false + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Email\"]}" }, { "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName", @@ -200,7 +211,15 @@ }, "nativeDataType": "firstName", "recursive": false, - "isPartOfKey": false + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" }, { "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName", @@ -212,7 +231,15 @@ }, "nativeDataType": "lastName", "recursive": false, - "isPartOfKey": false + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" } ] } @@ -224,6 +251,15 @@ ] } }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:PII" + } + ] + } + }, { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { @@ -246,7 +282,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -263,7 +300,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -280,7 +318,8 @@ }, 
"systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -295,7 +334,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -321,10 +361,10 @@ "time": 0, "actor": "urn:li:corpuser:unknown" }, - "hash": "dc1cf32c2688cc3d2d27fe6e856f06d2", + "hash": "62c7c400ec5760797a59c45e59c2f2dc", "platformSchema": { "com.linkedin.pegasus2avro.schema.KafkaSchema": { - "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"}]}", + "documentSchema": "{\"type\":\"record\",\"name\":\"CreateUserRequest\",\"namespace\":\"io.codebrews.createuserrequest\",\"doc\":\"Value schema for kafka topic\",\"fields\":[{\"name\":\"email\",\"type\":\"string\",\"tags\":[\"Email\"]},{\"name\":\"firstName\",\"type\":\"string\",\"tags\":[\"Name\"]},{\"name\":\"lastName\",\"type\":\"string\",\"tags\":[\"Name\"]}],\"tags\":[\"PII\"]}", "documentSchemaType": "AVRO", "keySchema": "\"string\"", "keySchemaType": "AVRO" @@ -353,7 +393,15 @@ }, "nativeDataType": "email", "recursive": false, - "isPartOfKey": false + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Email" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Email\"]}" }, { "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].firstName", @@ -365,7 +413,15 @@ }, "nativeDataType": "firstName", "recursive": false, - "isPartOfKey": false + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" }, { "fieldPath": "[version=2.0].[type=CreateUserRequest].[type=string].lastName", @@ -377,7 +433,15 @@ }, "nativeDataType": 
"lastName", "recursive": false, - "isPartOfKey": false + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Name" + } + ] + }, + "isPartOfKey": false, + "jsonProps": "{\"tags\": [\"Name\"]}" } ] } @@ -389,6 +453,15 @@ ] } }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:PII" + } + ] + } + }, { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { "customProperties": { @@ -411,7 +484,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -428,7 +502,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } }, { @@ -443,7 +518,56 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "kafka-test" + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Email", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Email" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Name", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Name" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:PII", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "PII" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "kafka-test", + "lastRunId": "no-run-id-provided" } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/kafka/value_schema.avsc b/metadata-ingestion/tests/integration/kafka/value_schema.avsc index 788cb94c47a72b..8cb6c42cb03f45 100644 --- 
a/metadata-ingestion/tests/integration/kafka/value_schema.avsc +++ b/metadata-ingestion/tests/integration/kafka/value_schema.avsc @@ -3,18 +3,22 @@ "type": "record", "name": "CreateUserRequest", "doc": "Value schema for kafka topic", + "tags": ["PII"], "fields": [ { "name": "email", - "type": "string" + "type": "string", + "tags": ["Email"] }, { "name": "firstName", - "type": "string" + "type": "string", + "tags": ["Name"] }, { "name": "lastName", - "type": "string" + "type": "string", + "tags": ["Name"] } ] } diff --git a/metadata-ingestion/tests/unit/test_kafka_source.py b/metadata-ingestion/tests/unit/test_kafka_source.py index b48ebf12ee37a1..603068780d0a7b 100644 --- a/metadata-ingestion/tests/unit/test_kafka_source.py +++ b/metadata-ingestion/tests/unit/test_kafka_source.py @@ -1,3 +1,4 @@ +import json from itertools import chain from typing import Dict, Optional, Tuple from unittest.mock import MagicMock, patch @@ -7,11 +8,17 @@ RegisteredSchema, Schema, ) +from freezegun import freeze_time from datahub.emitter.mce_builder import ( + OwnerType, make_dataplatform_instance_urn, make_dataset_urn, make_dataset_urn_with_platform_instance, + make_global_tag_aspect_with_tag_list, + make_glossary_terms_aspect_from_urn_list, + make_owner_urn, + make_ownership_aspect_from_urn_list, ) from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.workunit import MetadataWorkUnit @@ -20,7 +27,10 @@ from datahub.metadata.schema_classes import ( BrowsePathsClass, DataPlatformInstanceClass, + GlobalTagsClass, + GlossaryTermsClass, KafkaSchemaClass, + OwnershipClass, SchemaMetadataClass, ) @@ -521,3 +531,148 @@ def test_kafka_source_succeeds_with_describe_configs_error( mock_admin_client_instance.describe_configs.assert_called_once() assert len(workunits) == 2 + + +@freeze_time("2023-09-20 10:00:00") +@patch( + "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient", + autospec=True, +) 
+@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True) +def test_kafka_source_topic_meta_mappings( + mock_kafka_consumer, mock_schema_registry_client, mock_admin_client +): + # Setup the topic to key/value schema mappings for all types of schema registry subject name strategies. + # ,) + topic_subject_schema_map: Dict[str, Tuple[RegisteredSchema, RegisteredSchema]] = { + "topic1": ( + RegisteredSchema( + schema_id="schema_id_2", + schema=Schema( + schema_str='{"type":"record", "name":"Topic1Key", "namespace": "test.acryl", "fields": [{"name":"t1key", "type": "string"}]}', + schema_type="AVRO", + ), + subject="topic1-key", + version=1, + ), + RegisteredSchema( + schema_id="schema_id_1", + schema=Schema( + schema_str=json.dumps( + { + "type": "record", + "name": "Topic1Value", + "namespace": "test.acryl", + "fields": [{"name": "t1value", "type": "string"}], + "owner": "@charles", + "business_owner": "jdoe.last@gmail.com", + "data_governance.team_owner": "Finance", + "has_pii": True, + "int_property": 1, + "double_property": 2.5, + } + ), + schema_type="AVRO", + ), + subject="topic1-value", + version=1, + ), + ) + } + + # Mock the kafka consumer + mock_kafka_instance = mock_kafka_consumer.return_value + mock_cluster_metadata = MagicMock() + mock_cluster_metadata.topics = {k: None for k in topic_subject_schema_map.keys()} + mock_kafka_instance.list_topics.return_value = mock_cluster_metadata + + # Mock the schema registry client + # - mock get_subjects: all subjects in topic_subject_schema_map + mock_schema_registry_client.return_value.get_subjects.return_value = [ + v.subject for v in chain(*topic_subject_schema_map.values()) + ] + + # - mock get_latest_version + def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: + for registered_schema in chain(*topic_subject_schema_map.values()): + if registered_schema.subject == subject_name: + return registered_schema + return None + + 
mock_schema_registry_client.return_value.get_latest_version = ( + mock_get_latest_version + ) + + ctx = PipelineContext(run_id="test1") + kafka_source = KafkaSource.create( + { + "connection": {"bootstrap": "localhost:9092"}, + "meta_mapping": { + "owner": { + "match": "^@(.*)", + "operation": "add_owner", + "config": {"owner_type": "user"}, + }, + "business_owner": { + "match": ".*", + "operation": "add_owner", + "config": {"owner_type": "user"}, + }, + "has_pii": { + "match": True, + "operation": "add_tag", + "config": {"tag": "has_pii_test"}, + }, + "int_property": { + "match": 1, + "operation": "add_tag", + "config": {"tag": "int_meta_property"}, + }, + "double_property": { + "match": 2.5, + "operation": "add_term", + "config": {"term": "double_meta_property"}, + }, + "data_governance.team_owner": { + "match": "Finance", + "operation": "add_term", + "config": {"term": "Finance_test"}, + }, + }, + }, + ctx, + ) + workunits = [w for w in kafka_source.get_workunits()] + assert len(workunits) == 4 + mce = workunits[0].metadata + assert isinstance(mce, MetadataChangeEvent) + + ownership_aspect = [ + asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, OwnershipClass) + ][0] + assert ownership_aspect == make_ownership_aspect_from_urn_list( + [ + make_owner_urn("charles", OwnerType.USER), + make_owner_urn("jdoe.last@gmail.com", OwnerType.USER), + ], + "SERVICE", + ) + + tags_aspect = [ + asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, GlobalTagsClass) + ][0] + assert tags_aspect == make_global_tag_aspect_with_tag_list( + ["has_pii_test", "int_meta_property"] + ) + + terms_aspect = [ + asp + for asp in mce.proposedSnapshot.aspects + if isinstance(asp, GlossaryTermsClass) + ][0] + assert terms_aspect == make_glossary_terms_aspect_from_urn_list( + [ + "urn:li:glossaryTerm:Finance_test", + "urn:li:glossaryTerm:double_meta_property", + ] + ) diff --git a/metadata-ingestion/tests/unit/test_mapping.py b/metadata-ingestion/tests/unit/test_mapping.py 
index aea1d8ddd9a548..d69dd4a8a96b0d 100644 --- a/metadata-ingestion/tests/unit/test_mapping.py +++ b/metadata-ingestion/tests/unit/test_mapping.py @@ -231,3 +231,51 @@ def test_operation_processor_advanced_matching_tags(): tag_aspect: GlobalTagsClass = aspect_map["add_tag"] assert len(tag_aspect.tags) == 1 assert tag_aspect.tags[0].tag == "urn:li:tag:case_4567" + + +def test_operation_processor_matching_nested_props(): + raw_props = { + "gdpr": { + "pii": True, + }, + } + processor = OperationProcessor( + operation_defs={ + "gdpr.pii": { + "match": True, + "operation": "add_tag", + "config": {"tag": "pii"}, + }, + }, + owner_source_type="SOURCE_CONTROL", + match_nested_props=True, + ) + aspect_map = processor.process(raw_props) + assert "add_tag" in aspect_map + + tag_aspect: GlobalTagsClass = aspect_map["add_tag"] + assert len(tag_aspect.tags) == 1 + assert tag_aspect.tags[0].tag == "urn:li:tag:pii" + + +def test_operation_processor_matching_dot_props(): + raw_props = { + "gdpr.pii": True, + } + processor = OperationProcessor( + operation_defs={ + "gdpr.pii": { + "match": True, + "operation": "add_tag", + "config": {"tag": "pii"}, + }, + }, + owner_source_type="SOURCE_CONTROL", + match_nested_props=True, + ) + aspect_map = processor.process(raw_props) + assert "add_tag" in aspect_map + + tag_aspect: GlobalTagsClass = aspect_map["add_tag"] + assert len(tag_aspect.tags) == 1 + assert tag_aspect.tags[0].tag == "urn:li:tag:pii" diff --git a/metadata-ingestion/tests/unit/test_schema_util.py b/metadata-ingestion/tests/unit/test_schema_util.py index e81c335e178a2c..0a111d700cf8ce 100644 --- a/metadata-ingestion/tests/unit/test_schema_util.py +++ b/metadata-ingestion/tests/unit/test_schema_util.py @@ -6,7 +6,12 @@ from typing import Dict, List, Type import pytest +from freezegun import freeze_time +from datahub.emitter.mce_builder import ( + make_global_tag_aspect_with_tag_list, + make_glossary_terms_aspect_from_urn_list, +) from datahub.ingestion.extractor.schema_util 
import avro_schema_to_mce_fields from datahub.metadata.com.linkedin.pegasus2avro.schema import ( DateTypeClass, @@ -15,6 +20,7 @@ StringTypeClass, TimeTypeClass, ) +from datahub.utilities.mapping import OperationProcessor logger = logging.getLogger(__name__) @@ -771,3 +777,106 @@ def test_ignore_exceptions(): """ fields: List[SchemaField] = avro_schema_to_mce_fields(malformed_schema) assert not fields + + +@freeze_time("2023-09-12") +def test_avro_schema_to_mce_fields_with_field_meta_mapping(): + schema = """ +{ + "type": "record", + "name": "Payment", + "namespace": "some.event.namespace", + "fields": [ + {"name": "id", "type": "string"}, + {"name": "amount", "type": "double", "doc": "amountDoc","has_pii": "False"}, + {"name": "name","type": "string","default": "","has_pii": "True"}, + {"name": "phoneNumber", + "type": [{ + "type": "record", + "name": "PhoneNumber", + "doc": "testDoc", + "fields": [{ + "name": "areaCode", + "type": "string", + "doc": "areaCodeDoc", + "default": "" + }, { + "name": "countryCode", + "type": "string", + "default": "" + }, { + "name": "prefix", + "type": "string", + "default": "" + }, { + "name": "number", + "type": "string", + "default": "" + }] + }, + "null" + ], + "default": "null", + "has_pii": "True", + "glossary_field": "TERM_PhoneNumber" + }, + {"name": "address", + "type": [{ + "type": "record", + "name": "Address", + "fields": [{ + "name": "street", + "type": "string", + "default": "" + }] + }, + "null" + ], + "doc": "addressDoc", + "default": "null", + "has_pii": "True", + "glossary_field": "TERM_Address" + } + ] +} +""" + processor = OperationProcessor( + operation_defs={ + "has_pii": { + "match": "True", + "operation": "add_tag", + "config": {"tag": "has_pii_test"}, + }, + "glossary_field": { + "match": "TERM_(.*)", + "operation": "add_term", + "config": {"term": "{{ $match }}"}, + }, + } + ) + fields = avro_schema_to_mce_fields(schema, meta_mapping_processor=processor) + expected_field_paths = [ + 
"[version=2.0].[type=Payment].[type=string].id", + "[version=2.0].[type=Payment].[type=double].amount", + "[version=2.0].[type=Payment].[type=string].name", + "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber", + "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].areaCode", + "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].countryCode", + "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].prefix", + "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].number", + "[version=2.0].[type=Payment].[type=Address].address", + "[version=2.0].[type=Payment].[type=Address].address.[type=string].street", + ] + assert_field_paths_match(fields, expected_field_paths) + + pii_tag_aspect = make_global_tag_aspect_with_tag_list(["has_pii_test"]) + assert fields[1].globalTags is None + assert fields[2].globalTags == pii_tag_aspect + assert fields[3].globalTags == pii_tag_aspect + assert fields[3].glossaryTerms == make_glossary_terms_aspect_from_urn_list( + ["urn:li:glossaryTerm:PhoneNumber"] + ) + assert fields[8].globalTags == pii_tag_aspect + assert fields[8].glossaryTerms == make_glossary_terms_aspect_from_urn_list( + ["urn:li:glossaryTerm:Address"] + ) diff --git a/metadata-ingestion/tests/unit/test_snowflake_shares.py b/metadata-ingestion/tests/unit/test_snowflake_shares.py index 7de86139baf39a..9e33ba6132e069 100644 --- a/metadata-ingestion/tests/unit/test_snowflake_shares.py +++ b/metadata-ingestion/tests/unit/test_snowflake_shares.py @@ -231,6 +231,7 @@ def test_snowflake_shares_workunit_inbound_share( else: siblings_aspect = wu.get_aspect_of_type(Siblings) assert siblings_aspect is not None + assert not siblings_aspect.primary assert len(siblings_aspect.siblings) == 1 assert siblings_aspect.siblings == [ wu.get_urn().replace("instance1.db1", "instance2.db1") @@ -275,6 +276,7 @@ def test_snowflake_shares_workunit_outbound_share( for wu in wus: siblings_aspect = 
wu.get_aspect_of_type(Siblings) assert siblings_aspect is not None + assert siblings_aspect.primary assert len(siblings_aspect.siblings) == 2 assert siblings_aspect.siblings == [ wu.get_urn().replace("instance1.db2", "instance2.db2_from_share"), @@ -336,13 +338,85 @@ def test_snowflake_shares_workunit_inbound_and_outbound_share( siblings_aspect = wu.get_aspect_of_type(Siblings) assert siblings_aspect is not None if "db1" in wu.get_urn(): + assert not siblings_aspect.primary assert len(siblings_aspect.siblings) == 1 assert siblings_aspect.siblings == [ wu.get_urn().replace("instance1.db1", "instance2.db1") ] else: + assert siblings_aspect.primary assert len(siblings_aspect.siblings) == 2 assert siblings_aspect.siblings == [ wu.get_urn().replace("instance1.db2", "instance2.db2_from_share"), wu.get_urn().replace("instance1.db2", "instance3.db2"), ] + + +def test_snowflake_shares_workunit_inbound_and_outbound_share_no_platform_instance( + snowflake_databases: List[SnowflakeDatabase], +) -> None: + config = SnowflakeV2Config( + account_id="abc12345", + shares={ + "share1": SnowflakeShareConfig( + database="db1", + consumers=[ + DatabaseId(database="db1_from_share"), + DatabaseId(database="db1_other"), + ], + ), + "share2": SnowflakeShareConfig( + database="db2_main", + consumers=[ + DatabaseId(database="db2"), + DatabaseId(database="db2_other"), + ], + ), + }, + ) + + report = SnowflakeV2Report() + shares_handler = SnowflakeSharesHandler( + config, report, lambda x: make_snowflake_urn(x) + ) + + assert sorted(config.outbounds().keys()) == ["db1", "db2_main"] + assert sorted(config.inbounds().keys()) == [ + "db1_from_share", + "db1_other", + "db2", + "db2_other", + ] + wus = list(shares_handler.get_shares_workunits(snowflake_databases)) + + # 6 Sibling aspects for db1 tables + # 6 Sibling aspects and 6 upstreamLineage for db2 tables + assert len(wus) == 18 + + for wu in wus: + assert isinstance( + wu.metadata, (MetadataChangeProposal, MetadataChangeProposalWrapper) + 
) + if wu.metadata.aspectName == "upstreamLineage": + upstream_aspect = wu.get_aspect_of_type(UpstreamLineage) + assert upstream_aspect is not None + assert len(upstream_aspect.upstreams) == 1 + assert upstream_aspect.upstreams[0].dataset == wu.get_urn().replace( + "db2.", "db2_main." + ) + else: + siblings_aspect = wu.get_aspect_of_type(Siblings) + assert siblings_aspect is not None + if "db1" in wu.get_urn(): + assert siblings_aspect.primary + assert len(siblings_aspect.siblings) == 2 + assert siblings_aspect.siblings == [ + wu.get_urn().replace("db1.", "db1_from_share."), + wu.get_urn().replace("db1.", "db1_other."), + ] + else: + assert not siblings_aspect.primary + assert len(siblings_aspect.siblings) == 1 + assert siblings_aspect.siblings == [ + wu.get_urn().replace("db2.", "db2_main.") + ]