diff --git a/.github/workflows/airflow-plugin.yml b/.github/workflows/airflow-plugin.yml index 97a0da8546ed17..158d3416bc2a9d 100644 --- a/.github/workflows/airflow-plugin.yml +++ b/.github/workflows/airflow-plugin.yml @@ -76,6 +76,7 @@ jobs: **/build/reports/tests/test/** **/build/test-results/test/** **/junit.*.xml + !**/binary/** - name: Upload coverage to Codecov if: always() uses: codecov/codecov-action@v3 diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 6daf1904ba3ae3..64493e99211b46 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -29,11 +29,15 @@ jobs: "except_metadata_ingestion", "frontend", ] - timezone: ["UTC", "America/New_York"] + timezone: ["UTC"] + include: + # We only need the timezone variation for frontend tests. + - command: "frontend" + timezone: "America/New_York" runs-on: ubuntu-latest timeout-minutes: 60 steps: - - uses: szenius/set-timezone@v1.0 + - uses: szenius/set-timezone@v1.2 with: timezoneLinux: ${{ matrix.timezone }} - uses: hsheth2/sane-checkout-action@v1 @@ -48,8 +52,7 @@ jobs: python-version: "3.10" cache: pip - name: Gradle build (and test) for metadata ingestion - # we only need the timezone runs for frontend tests - if: ${{ matrix.command == 'except_metadata_ingestion' && matrix.timezone == 'America/New_York' }} + if: ${{ matrix.command == 'except_metadata_ingestion' }} run: | ./gradlew build -x :metadata-ingestion:build -x :metadata-ingestion:check -x docs-website:build -x :metadata-integration:java:spark-lineage:test -x :metadata-io:test -x :metadata-ingestion-modules:airflow-plugin:build -x :metadata-ingestion-modules:airflow-plugin:check -x :datahub-frontend:build -x :datahub-web-react:build --parallel - name: Gradle build (and test) for frontend @@ -66,15 +69,9 @@ jobs: **/build/reports/tests/test/** **/build/test-results/test/** **/junit.*.xml + !**/binary/** - name: Ensure codegen is updated uses: ./.github/actions/ensure-codegen-updated - - name: Slack failure notification - if: failure() && github.event_name == 'push' - uses: kpritam/slack-job-status-action@v1 - with: - job-status: ${{ job.status }} - slack-bot-token: ${{ secrets.SLACK_BOT_TOKEN }} - channel: github-activities quickstart-compose-validation: runs-on: ubuntu-latest @@ -83,10 +80,6 @@ jobs: - uses: actions/setup-python@v4 with: python-version: "3.10" - - name: Download YQ - uses: chrisdickinson/setup-yq@v1.0.1 - with: - yq-version: v4.28.2 - name: Quickstart Compose Validation run: ./docker/quickstart/generate_and_compare.sh diff --git a/.github/workflows/docker-unified.yml b/.github/workflows/docker-unified.yml index 41f82c194345c0..493484ecd545e7 100644 --- a/.github/workflows/docker-unified.yml +++ b/.github/workflows/docker-unified.yml @@ -911,13 +911,7 @@ jobs: **/build/reports/tests/test/** **/build/test-results/test/** **/junit.*.xml - - name: Slack failure notification - if: failure() && github.event_name == 'push' - uses: kpritam/slack-job-status-action@v1 - with: - job-status: ${{ job.status }} - slack-bot-token: ${{ secrets.SLACK_BOT_TOKEN }} - channel: github-activities + !**/binary/** deploy_datahub_head: name: Deploy to Datahub HEAD runs-on: ubuntu-latest diff --git a/.github/workflows/metadata-ingestion.yml b/.github/workflows/metadata-ingestion.yml index af73db483f9aeb..1da08b14b8b5b2 100644 --- a/.github/workflows/metadata-ingestion.yml +++ b/.github/workflows/metadata-ingestion.yml @@ -79,6 +79,7 @@ jobs: **/build/reports/tests/test/** **/build/test-results/test/** 
**/junit.*.xml + !**/binary/** - name: Upload coverage to Codecov if: ${{ always() && matrix.python-version == '3.10' }} uses: codecov/codecov-action@v3 diff --git a/.github/workflows/metadata-io.yml b/.github/workflows/metadata-io.yml index c964352c3e129a..eb5822b5b480d0 100644 --- a/.github/workflows/metadata-io.yml +++ b/.github/workflows/metadata-io.yml @@ -50,6 +50,7 @@ jobs: **/build/reports/tests/test/** **/build/test-results/test/** **/junit.*.xml + !**/binary/** - name: Ensure codegen is updated uses: ./.github/actions/ensure-codegen-updated diff --git a/.github/workflows/pr-labeler.yml b/.github/workflows/pr-labeler.yml index c0b2bf807b24bc..5a6978dfde19dc 100644 --- a/.github/workflows/pr-labeler.yml +++ b/.github/workflows/pr-labeler.yml @@ -19,6 +19,38 @@ jobs: if: ${{ !contains( + fromJson('[ + "anshbansal", + "asikowitz", + "chriscollins3456", + "david-leifker", + "shirshanka", + "sid-acryl", + "swaroopjagadish", + "treff7es", + "yoonhyejin", + "eboneil", + "ethan-cartwright", + "gabe-lyons", + "hsheth2", + "jjoyce0510", + "maggiehays", + "mrjefflewis", + "pedro93", + "RyanHolstien" + ]'), + github.actor + ) + }} + with: + github_token: ${{ github.token }} + labels: | + community-contribution + - uses: actions-ecosystem/action-add-labels@v1.1.0 + # only add names of champions here. Confirm with DevRel Team + if: + ${{ + contains( fromJson('[ "skrydal", "siladitya2", diff --git a/.github/workflows/spark-smoke-test.yml b/.github/workflows/spark-smoke-test.yml index ee7ead27f29659..e463e15243ee3e 100644 --- a/.github/workflows/spark-smoke-test.yml +++ b/.github/workflows/spark-smoke-test.yml @@ -68,10 +68,4 @@ jobs: **/build/reports/tests/test/** **/build/test-results/test/** **/junit.*.xml - - name: Slack failure notification - if: failure() && github.event_name == 'push' - uses: kpritam/slack-job-status-action@v1 - with: - job-status: ${{ job.status }} - slack-bot-token: ${{ secrets.SLACK_BOT_TOKEN }} - channel: github-activities + !**/binary/** diff --git a/datahub-web-react/src/app/entity/shared/components/styled/search/DownloadAsCsvModal.tsx b/datahub-web-react/src/app/entity/shared/components/styled/search/DownloadAsCsvModal.tsx index 92e859ee1b3299..c4b5f8fa02b2b8 100644 --- a/datahub-web-react/src/app/entity/shared/components/styled/search/DownloadAsCsvModal.tsx +++ b/datahub-web-react/src/app/entity/shared/components/styled/search/DownloadAsCsvModal.tsx @@ -21,7 +21,7 @@ type Props = { setShowDownloadAsCsvModal: (showDownloadAsCsvModal: boolean) => any; }; -const SEARCH_PAGE_SIZE_FOR_DOWNLOAD = 500; +const SEARCH_PAGE_SIZE_FOR_DOWNLOAD = 200; export default function DownloadAsCsvModal({ downloadSearchResults, diff --git a/datahub-web-react/yarn.lock b/datahub-web-react/yarn.lock index ef5be9dbe51552..9ea6c58eadc6b8 100644 --- a/datahub-web-react/yarn.lock +++ b/datahub-web-react/yarn.lock @@ -3,9 +3,9 @@ "@adobe/css-tools@^4.3.1": - version "4.3.1" - resolved "https://registry.yarnpkg.com/@adobe/css-tools/-/css-tools-4.3.1.tgz#abfccb8ca78075a2b6187345c26243c1a0842f28" - integrity sha512-/62yikz7NLScCGAAST5SHdnjaDJQBDq0M2muyRTpf2VQhw6StBg2ALiu73zSJQ4fMVLA+0uBhBHAle7Wg+2kSg== + version "4.3.2" + resolved "https://registry.yarnpkg.com/@adobe/css-tools/-/css-tools-4.3.2.tgz#a6abc715fb6884851fca9dad37fc34739a04fd11" + integrity sha512-DA5a1C0gD/pLOvhv33YMrbf2FK3oUzwNl9oOJqE4XVjuEtt6XIakRcsd7eLiOSPkp1kTRQGICTA8cKra/vFbjw== "@ampproject/remapping@^2.2.0": version "2.2.1" diff --git a/docker/README.md b/docker/README.md index 21d38bbb7f2eeb..3510649707c65d 100644 --- 
a/docker/README.md +++ b/docker/README.md @@ -64,7 +64,7 @@ successful release on Github will automatically publish the images. To build the full images (that we are going to publish), you need to run the following: ``` -COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 docker-compose -p datahub build +COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 docker compose -p datahub build ``` This is because we're relying on builtkit for multistage builds. It does not hurt also set `DATAHUB_VERSION` to diff --git a/docker/airflow/local_airflow.md b/docker/airflow/local_airflow.md index fbfc1d17327c53..e2bd62df84098b 100644 --- a/docker/airflow/local_airflow.md +++ b/docker/airflow/local_airflow.md @@ -54,7 +54,7 @@ curl -L 'https://raw.githubusercontent.com/datahub-project/datahub/master/metada First you need to initialize airflow in order to create initial database tables and the initial airflow user. ``` -docker-compose up airflow-init +docker compose up airflow-init ``` You should see the following final initialization message @@ -66,10 +66,10 @@ airflow_install_airflow-init_1 exited with code 0 ``` -Afterwards you need to start the airflow docker-compose +Afterwards you need to start the airflow docker compose ``` -docker-compose up +docker compose up ``` You should see a host of messages as Airflow starts up. diff --git a/docker/dev-with-cassandra.sh b/docker/dev-with-cassandra.sh index f71d91de190807..6f9cf6b88e8607 100755 --- a/docker/dev-with-cassandra.sh +++ b/docker/dev-with-cassandra.sh @@ -23,13 +23,13 @@ fi # YOU MUST BUILD VIA GRADLE BEFORE RUNNING THIS. DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" cd $DIR && \ - COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 DOCKER_DEFAULT_PLATFORM="$(uname -m)" docker-compose \ + COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 DOCKER_DEFAULT_PLATFORM="$(uname -m)" docker compose \ -f docker-compose-with-cassandra.yml \ -f docker-compose.dev.yml \ $CONSUMERS_COMPOSE $MONITORING_COMPOSE $M1_COMPOSE \ pull \ && \ - COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 DOCKER_DEFAULT_PLATFORM="$(uname -m)" docker-compose -p datahub \ + COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 DOCKER_DEFAULT_PLATFORM="$(uname -m)" docker compose -p datahub \ -f docker-compose-with-cassandra.yml \ -f docker-compose.dev.yml \ $CONSUMERS_COMPOSE $MONITORING_COMPOSE $M1_COMPOSE \ diff --git a/docker/dev-without-neo4j.sh b/docker/dev-without-neo4j.sh index 07e51840bece76..78a8f4e1161be8 100755 --- a/docker/dev-without-neo4j.sh +++ b/docker/dev-without-neo4j.sh @@ -23,13 +23,13 @@ fi # Launches dev instances of DataHub images. See documentation for more details. # YOU MUST BUILD VIA GRADLE BEFORE RUNNING THIS. cd "${DIR}/../.." 
&& \ - COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 DOCKER_DEFAULT_PLATFORM="$(uname -m)" docker-compose \ + COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 DOCKER_DEFAULT_PLATFORM="$(uname -m)" docker compose \ -f "${DIR}/docker-compose-without-neo4j.yml" \ -f "${DIR}/docker-compose-without-neo4j.override.yml" \ -f "${DIR}/docker-compose.dev.yml" \ $CONSUMERS_COMPOSE $MONITORING_COMPOSE $M1_COMPOSE pull \ && \ - COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 DOCKER_DEFAULT_PLATFORM="$(uname -m)" docker-compose -p datahub \ + COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 DOCKER_DEFAULT_PLATFORM="$(uname -m)" docker compose -p datahub \ -f "${DIR}/docker-compose-without-neo4j.yml" \ -f "${DIR}/docker-compose-without-neo4j.override.yml" \ -f "${DIR}/docker-compose.dev.yml" \ diff --git a/docker/dev.sh b/docker/dev.sh index 9f7fafdaf3d5ec..86f58a416daf70 100755 --- a/docker/dev.sh +++ b/docker/dev.sh @@ -23,13 +23,13 @@ fi # YOU MUST BUILD VIA GRADLE BEFORE RUNNING THIS. DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" cd $DIR && \ - COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 DOCKER_DEFAULT_PLATFORM="$(uname -m)" docker-compose \ + COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 DOCKER_DEFAULT_PLATFORM="$(uname -m)" docker compose \ -f docker-compose.yml \ -f docker-compose.override.yml \ -f docker-compose.dev.yml \ $CONSUMERS_COMPOSE $MONITORING_COMPOSE $M1_COMPOSE pull \ && \ - COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 DOCKER_DEFAULT_PLATFORM="$(uname -m)" docker-compose -p datahub \ + COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 DOCKER_DEFAULT_PLATFORM="$(uname -m)" docker compose -p datahub \ -f docker-compose.yml \ -f docker-compose.override.yml \ -f docker-compose.dev.yml \ diff --git a/docker/ingestion/ingestion.sh b/docker/ingestion/ingestion.sh index 6d88b395d759b9..8fa7b8c565ff0d 100755 --- a/docker/ingestion/ingestion.sh +++ b/docker/ingestion/ingestion.sh @@ -2,4 +2,4 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" export DATAHUB_VERSION=${DATAHUB_VERSION:-head} -cd $DIR && docker-compose pull && docker-compose -p datahub up +cd $DIR && docker compose pull && docker compose -p datahub up diff --git a/docker/nuke.sh b/docker/nuke.sh index 875b739e9f48d1..364773b77b10e4 100755 --- a/docker/nuke.sh +++ b/docker/nuke.sh @@ -4,8 +4,8 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" cd $DIR # Tear down and clean up all DataHub-related containers, volumes, and network -docker-compose -p datahub down -v -docker-compose rm -f -v +docker compose -p datahub down -v +docker compose rm -f -v # Tear down ingestion container -(cd ingestion && docker-compose -p datahub down -v) +(cd ingestion && docker compose -p datahub down -v) diff --git a/docker/quickstart.sh b/docker/quickstart.sh index a7eadf18bcb664..90c7d4ba9d0a49 100755 --- a/docker/quickstart.sh +++ b/docker/quickstart.sh @@ -33,11 +33,11 @@ echo "Quickstarting DataHub: version ${DATAHUB_VERSION}" if docker volume ls | grep -c -q datahub_neo4jdata then echo "Datahub Neo4j volume found, starting with neo4j as graph service" - cd $DIR && docker-compose pull && docker-compose -p datahub up + cd $DIR && docker compose pull && docker compose -p datahub up else echo "No Datahub Neo4j volume found, starting with elasticsearch as graph service" cd $DIR && \ - DOCKER_DEFAULT_PLATFORM="$(uname -m)" docker-compose -p datahub \ + DOCKER_DEFAULT_PLATFORM="$(uname -m)" docker compose -p datahub \ -f quickstart/docker-compose-without-neo4j.quickstart.yml \ $MONITORING_COMPOSE 
$CONSUMERS_COMPOSE $M1_COMPOSE up $@ fi diff --git a/docs/developers.md b/docs/developers.md index 4e31aceeb43821..b378ea282e20f2 100644 --- a/docs/developers.md +++ b/docs/developers.md @@ -101,13 +101,13 @@ Replace whatever container you want in the existing deployment. I.e, replacing datahub's backend (GMS): ```shell -(cd docker && COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 docker-compose -p datahub -f docker-compose-without-neo4j.yml -f docker-compose-without-neo4j.override.yml -f docker-compose.dev.yml up -d --no-deps --force-recreate --build datahub-gms) +(cd docker && COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 docker compose -p datahub -f docker-compose-without-neo4j.yml -f docker-compose-without-neo4j.override.yml -f docker-compose.dev.yml up -d --no-deps --force-recreate --build datahub-gms) ``` Running the local version of the frontend ```shell -(cd docker && COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 docker-compose -p datahub -f docker-compose-without-neo4j.yml -f docker-compose-without-neo4j.override.yml -f docker-compose.dev.yml up -d --no-deps --force-recreate --build datahub-frontend-react) +(cd docker && COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 docker compose -p datahub -f docker-compose-without-neo4j.yml -f docker-compose-without-neo4j.override.yml -f docker-compose.dev.yml up -d --no-deps --force-recreate --build datahub-frontend-react) ``` ## IDE Support diff --git a/docs/quickstart.md b/docs/quickstart.md index 29b22b54dc87a3..5856ef84c0074e 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -274,7 +274,7 @@ It is not intended for a production environment. This recommendation is based on #### Default Credentials -`quickstart` uses docker-compose configuration which includes default credentials for both DataHub, and it's underlying +`quickstart` uses docker compose configuration which includes default credentials for both DataHub, and it's underlying prerequisite data stores, such as MySQL. Additionally, other components are unauthenticated out of the box. This is a design choice to make development easier and is not best practice for a production environment. diff --git a/metadata-ingestion/build.gradle b/metadata-ingestion/build.gradle index 047699f084c612..b3cc350cc109fa 100644 --- a/metadata-ingestion/build.gradle +++ b/metadata-ingestion/build.gradle @@ -70,6 +70,7 @@ task customPackageGenerate(type: Exec, dependsOn: [environmentSetup, installPack def package_version = project.findProperty('package_version') commandLine 'bash', '-c', "source ${venv_name}/bin/activate && " + + "pip install build && " + "./scripts/custom_package_codegen.sh '${package_name}' '${package_version}'" } diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index 1c84a2759d23e6..33ff722a0d0dd6 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -14,6 +14,7 @@ The below table shows transformer which can transform aspects of entity [Dataset | `schemaMetadata` | - [Pattern Add Dataset Schema Field glossaryTerms](#pattern-add-dataset-schema-field-glossaryterms)
- [Pattern Add Dataset Schema Field globalTags](#pattern-add-dataset-schema-field-globaltags) | | `datasetProperties` | - [Simple Add Dataset datasetProperties](#simple-add-dataset-datasetproperties)
- [Add Dataset datasetProperties](#add-dataset-datasetproperties) | | `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)
- [Pattern Add Dataset domains](#pattern-add-dataset-domains) |
+| `dataProduct` | - [Simple Add Dataset dataProduct](#simple-add-dataset-dataproduct)<br/>
- [Pattern Add Dataset dataProduct](#pattern-add-dataset-dataproduct)
- [Add Dataset dataProduct](#add-dataset-dataproduct) |
 ## Extract Ownership from Tags
 ### Config Details
@@ -961,6 +962,75 @@ in both of the cases domain should be provisioned on DataHub GMS
     'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.n.*': ["hr"]
     'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"]
 ```
+## Simple Add Dataset dataProduct
+### Config Details
+| Field | Required | Type | Default | Description |
+|--------------------------------|----------|-----------------|---------|----------------------------------------------------------------------------------------------------|
+| `dataset_to_data_product_urns` | ✅       | Dict[str, str]  |         | Dataset entity urn as key and data product urn as value; the data product is created with the dataset as an asset. |
+
+Let’s suppose we’d like to add a set of data products, each with specific datasets as its assets. To do so, we can use the `simple_add_dataset_dataproduct` transformer that’s included in the ingestion framework.
+
+The config, which we’d append to our ingestion recipe YAML, would look like this:
+
+  ```yaml
+  transformers:
+    - type: "simple_add_dataset_dataproduct"
+      config:
+        dataset_to_data_product_urns:
+          "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)": "urn:li:dataProduct:first"
+          "urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)": "urn:li:dataProduct:second"
+  ```
+
+## Pattern Add Dataset dataProduct
+### Config Details
+| Field | Required | Type | Default | Description |
+|----------------------------------------|----------|------------------|---------|-----------------------------------------------------------------------------------------------------------|
+| `dataset_to_data_product_urns_pattern` | ✅       | map[regex, urn]  |         | Regex patterns for dataset entity urns as keys, and the data product urn to apply to matching datasets as values. |
+
+Let’s suppose we’d like to create a series of data products, each with specific datasets as its assets. To do so, we can use the `pattern_add_dataset_dataproduct` transformer that’s included in the ingestion framework. It matches each regex pattern against the dataset `urn` and creates the data product entity with the given urn, adding the matched datasets as its assets.
+
+The config, which we’d append to our ingestion recipe YAML, would look like this:
+
+  ```yaml
+  transformers:
+    - type: "pattern_add_dataset_dataproduct"
+      config:
+        dataset_to_data_product_urns_pattern:
+          rules:
+            ".*example1.*": "urn:li:dataProduct:first"
+            ".*example2.*": "urn:li:dataProduct:second"
+  ```
+
+## Add Dataset dataProduct
+### Config Details
+| Field | Required | Type | Default | Description |
+|----------------------------|----------|---------------------------------|---------|--------------------------------------------------------------------------------------------------|
+| `get_data_product_to_add`  | ✅       | callable[[str], Optional[str]]  |         | A function that takes a dataset entity urn as input and returns the data product urn to create.   |
+
+If you'd like more complex logic for assigning data products, you can use the more generic `add_dataset_dataproduct` transformer, which calls a user-provided function to determine the data product that should include each dataset as an asset.
+
+```yaml
+transformers:
+  - type: "add_dataset_dataproduct"
+    config:
+      get_data_product_to_add: "."
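+      # a dotted path to your resolver function, e.g. "my_module.custom_dataproducts" (module name is illustrative)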
+```
+
+Then define your function to return a dataproduct entity urn, for example:
+
+```python
+from typing import Optional
+
+import datahub.emitter.mce_builder as builder
+
+def custom_dataproducts(dataset_urn: str) -> Optional[str]:
+    """Compute the data product urn for a given dataset urn."""
+
+    dataset_to_data_product_map = {
+        builder.make_dataset_urn("bigquery", "example1"): "urn:li:dataProduct:first"
+    }
+    return dataset_to_data_product_map.get(dataset_urn)
+```
+Finally, you can install and use your custom transformer as [shown here](#installing-the-package).
+
 ## Relationship Between replace_existing and semantics
 The transformer behaviour mentioned here is in context of `simple_add_dataset_ownership`, however it is applicable for all dataset transformers which are supporting `replace_existing` and `semantics` configuration attributes, for example `simple_add_dataset_tags` will add or remove tags as per behaviour mentioned in this section.
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 10db019b513812..8bbabce4f749fc 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -649,6 +649,9 @@
         "pattern_add_dataset_schema_terms = datahub.ingestion.transformer.add_dataset_schema_terms:PatternAddDatasetSchemaTerms",
         "pattern_add_dataset_schema_tags = datahub.ingestion.transformer.add_dataset_schema_tags:PatternAddDatasetSchemaTags",
         "extract_ownership_from_tags = datahub.ingestion.transformer.extract_ownership_from_tags:ExtractOwnersFromTagsTransformer",
+        "add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:AddDatasetDataProduct",
+        "simple_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:SimpleAddDatasetDataProduct",
+        "pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct",
     ],
     "datahub.ingestion.sink.plugins": [
         "file = datahub.ingestion.sink.file:FileSink",
diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py
new file mode 100644
index 00000000000000..45e92628430258
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py
@@ -0,0 +1,133 @@
+import logging
+from typing import Callable, Dict, List, Optional, Union
+
+import pydantic
+
+from datahub.configuration.common import ConfigModel, KeyValuePattern
+from datahub.configuration.import_resolver import pydantic_resolve_key
+from datahub.emitter.mce_builder import Aspect
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.transformer.dataset_transformer import (
+    DatasetDataproductTransformer,
+)
+from datahub.metadata.schema_classes import MetadataChangeProposalClass
+from datahub.specific.dataproduct import DataProductPatchBuilder
+
+logger = logging.getLogger(__name__)
+
+
+class AddDatasetDataProductConfig(ConfigModel):
+    # dataset_urn -> data product urn
+    get_data_product_to_add: Callable[[str], Optional[str]]
+
+    _resolve_data_product_fn = pydantic_resolve_key("get_data_product_to_add")
+
+
+class AddDatasetDataProduct(DatasetDataproductTransformer):
+    """Transformer that adds datasets as assets of data products, as determined by a callback function."""
+
+    ctx: PipelineContext
+    config: AddDatasetDataProductConfig
+
+    def __init__(self, config: AddDatasetDataProductConfig, ctx: PipelineContext):
+        super().__init__()
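+        # Keep the config and context around; the accumulated data product
+        # patches are emitted in handle_end_of_stream() once all records are seen.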
+        self.ctx = ctx
+        self.config = config
+
+    @classmethod
+    def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetDataProduct":
+        config = AddDatasetDataProductConfig.parse_obj(config_dict)
+        return cls(config, ctx)
+
+    def transform_aspect(
+        self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
+    ) -> Optional[Aspect]:
+        return None
+
+    def handle_end_of_stream(
+        self,
+    ) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:
+        data_products: Dict[str, DataProductPatchBuilder] = {}
+
+        logger.debug("Generating dataproducts")
+        for entity_urn in self.entity_map.keys():
+            data_product_urn = self.config.get_data_product_to_add(entity_urn)
+            if data_product_urn:
+                if data_product_urn not in data_products:
+                    data_products[data_product_urn] = DataProductPatchBuilder(
+                        data_product_urn
+                    ).add_asset(entity_urn)
+                else:
+                    data_products[data_product_urn] = data_products[
+                        data_product_urn
+                    ].add_asset(entity_urn)
+
+        mcps: List[
+            Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]
+        ] = []
+        for data_product in data_products.values():
+            mcps.extend(list(data_product.build()))
+        return mcps
+
+
+class SimpleDatasetDataProductConfig(ConfigModel):
+    dataset_to_data_product_urns: Dict[str, str]
+
+
+class SimpleAddDatasetDataProduct(AddDatasetDataProduct):
+    """Transformer that adds each configured dataset as an asset of the specified data product."""
+
+    def __init__(self, config: SimpleDatasetDataProductConfig, ctx: PipelineContext):
+
+        generic_config = AddDatasetDataProductConfig(
+            get_data_product_to_add=lambda dataset_urn: config.dataset_to_data_product_urns.get(
+                dataset_urn
+            ),
+        )
+        super().__init__(generic_config, ctx)
+
+    @classmethod
+    def create(
+        cls, config_dict: dict, ctx: PipelineContext
+    ) -> "SimpleAddDatasetDataProduct":
+        config = SimpleDatasetDataProductConfig.parse_obj(config_dict)
+        return cls(config, ctx)
+
+
+class PatternDatasetDataProductConfig(ConfigModel):
+    dataset_to_data_product_urns_pattern: KeyValuePattern = KeyValuePattern.all()
+
+    @pydantic.root_validator(pre=True)
+    def validate_pattern_value(cls, values: Dict) -> Dict:
+        rules = values["dataset_to_data_product_urns_pattern"]["rules"]
+        for key, value in rules.items():
+            if isinstance(value, list) and len(value) > 1:
+                raise ValueError(
+                    "The same dataset cannot be an asset of two different data products."
+ ) + elif isinstance(value, str): + rules[key] = [rules[key]] + return values + + +class PatternAddDatasetDataProduct(AddDatasetDataProduct): + """Transformer that adds a specified dataproduct entity for provided dataset as its asset.""" + + def __init__(self, config: PatternDatasetDataProductConfig, ctx: PipelineContext): + dataset_to_data_product = config.dataset_to_data_product_urns_pattern + generic_config = AddDatasetDataProductConfig( + get_data_product_to_add=lambda dataset_urn: dataset_to_data_product.value( + dataset_urn + )[0] + if dataset_to_data_product.value(dataset_urn) + else None, + ) + super().__init__(generic_config, ctx) + + @classmethod + def create( + cls, config_dict: dict, ctx: PipelineContext + ) -> "PatternAddDatasetDataProduct": + config = PatternDatasetDataProductConfig.parse_obj(config_dict) + return cls(config, ctx) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py index 72a8c226e491ed..7508b33c6bfc67 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py @@ -1,5 +1,5 @@ import logging -from typing import Callable, List, Optional, cast +from typing import Callable, List, Optional, Union, cast import datahub.emitter.mce_builder as builder from datahub.configuration.common import ( @@ -13,6 +13,7 @@ from datahub.ingestion.transformer.dataset_transformer import DatasetTagsTransformer from datahub.metadata.schema_classes import ( GlobalTagsClass, + MetadataChangeProposalClass, TagAssociationClass, TagKeyClass, ) @@ -65,9 +66,13 @@ def transform_aspect( self.config, self.ctx.graph, entity_urn, out_global_tags_aspect ) - def handle_end_of_stream(self) -> List[MetadataChangeProposalWrapper]: + def handle_end_of_stream( + self, + ) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]: - mcps: List[MetadataChangeProposalWrapper] = [] + mcps: List[ + Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass] + ] = [] logger.debug("Generating tags") @@ -121,7 +126,6 @@ class PatternAddDatasetTags(AddDatasetTags): """Transformer that adds a specified set of tags to each dataset.""" def __init__(self, config: PatternDatasetTagsConfig, ctx: PipelineContext): - config.tag_pattern.all tag_pattern = config.tag_pattern generic_config = AddDatasetTagsConfig( get_tags_to_add=lambda _: [ diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py index 8b6f42dcfba4b8..254b3d084f2be2 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py @@ -1,6 +1,6 @@ import logging from abc import ABCMeta, abstractmethod -from typing import Any, Dict, Iterable, List, Optional, Union +from typing import Any, Dict, Iterable, List, Optional, Sequence, Union import datahub.emitter.mce_builder as builder from datahub.emitter.aspect import ASPECT_MAP @@ -28,7 +28,9 @@ def _update_work_unit_id( class HandleEndOfStreamTransformer: - def handle_end_of_stream(self) -> List[MetadataChangeProposalWrapper]: + def handle_end_of_stream( + self, + ) -> Sequence[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]: return [] @@ -206,15 +208,19 @@ def _handle_end_of_stream( ): return - mcps: List[MetadataChangeProposalWrapper] = 
self.handle_end_of_stream() + mcps: Sequence[ + Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass] + ] = self.handle_end_of_stream() for mcp in mcps: - if mcp.aspect is None or mcp.entityUrn is None: # to silent the lint error + if ( + mcp.aspect is None or mcp.aspectName is None or mcp.entityUrn is None + ): # to silent the lint error continue record_metadata = _update_work_unit_id( envelope=envelope, - aspect_name=mcp.aspect.get_aspect_name(), # type: ignore + aspect_name=mcp.aspectName, urn=mcp.entityUrn, ) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py index 0b2433c3a1fe2b..79151f7b11bf02 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py @@ -118,3 +118,8 @@ def aspect_name(self) -> str: class DatasetSchemaMetadataTransformer(DatasetTransformer, metaclass=ABCMeta): def aspect_name(self) -> str: return "schemaMetadata" + + +class DatasetDataproductTransformer(DatasetTransformer, metaclass=ABCMeta): + def aspect_name(self) -> str: + return "dataProductProperties" diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 546549dcf37a4a..5152f406ed3ce0 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -1,3 +1,4 @@ +import json import re from typing import ( Any, @@ -27,6 +28,11 @@ from datahub.ingestion.transformer.add_dataset_browse_path import ( AddDatasetBrowsePathTransformer, ) +from datahub.ingestion.transformer.add_dataset_dataproduct import ( + AddDatasetDataProduct, + PatternAddDatasetDataProduct, + SimpleAddDatasetDataProduct, +) from datahub.ingestion.transformer.add_dataset_ownership import ( AddDatasetOwnership, PatternAddDatasetOwnership, @@ -873,7 +879,7 @@ def test_pattern_dataset_tags_transformation(mock_time): assert builder.make_tag_urn("Needs Documentation") not in tags_aspect.tags -def test_import_resolver(): +def test_add_dataset_tags_transformation(): transformer = AddDatasetTags.create( { "get_tags_to_add": "tests.unit.test_transform_dataset.dummy_tag_resolver_method" @@ -2665,3 +2671,156 @@ def test_pattern_dataset_schema_tags_transformation_patch( assert builder.make_tag_urn("pii") in global_tags_urn assert builder.make_tag_urn("FirstName") in global_tags_urn assert builder.make_tag_urn("Name") in global_tags_urn + + +def test_simple_dataset_data_product_transformation(mock_time): + transformer = SimpleAddDatasetDataProduct.create( + { + "dataset_to_data_product_urns": { + builder.make_dataset_urn( + "bigquery", "example1" + ): "urn:li:dataProduct:first", + builder.make_dataset_urn( + "bigquery", "example2" + ): "urn:li:dataProduct:second", + builder.make_dataset_urn( + "bigquery", "example3" + ): "urn:li:dataProduct:first", + } + }, + PipelineContext(run_id="test-dataproduct"), + ) + + outputs = list( + transformer.transform( + [ + RecordEnvelope(input, metadata={}) + for input in [ + make_generic_dataset( + entity_urn=builder.make_dataset_urn("bigquery", "example1") + ), + make_generic_dataset( + entity_urn=builder.make_dataset_urn("bigquery", "example2") + ), + make_generic_dataset( + entity_urn=builder.make_dataset_urn("bigquery", "example3") + ), + EndOfStream(), + ] + ] + ) + ) + + assert len(outputs) == 6 + + # Check new dataproduct entity should be 
there + assert outputs[3].record.entityUrn == "urn:li:dataProduct:first" + assert outputs[3].record.aspectName == "dataProductProperties" + + first_data_product_aspect = json.loads( + outputs[3].record.aspect.value.decode("utf-8") + ) + assert [item["value"]["destinationUrn"] for item in first_data_product_aspect] == [ + builder.make_dataset_urn("bigquery", "example1"), + builder.make_dataset_urn("bigquery", "example3"), + ] + + second_data_product_aspect = json.loads( + outputs[4].record.aspect.value.decode("utf-8") + ) + assert [item["value"]["destinationUrn"] for item in second_data_product_aspect] == [ + builder.make_dataset_urn("bigquery", "example2") + ] + + assert isinstance(outputs[5].record, EndOfStream) + + +def test_pattern_dataset_data_product_transformation(mock_time): + transformer = PatternAddDatasetDataProduct.create( + { + "dataset_to_data_product_urns_pattern": { + "rules": { + ".*example1.*": "urn:li:dataProduct:first", + ".*": "urn:li:dataProduct:second", + } + }, + }, + PipelineContext(run_id="test-dataproducts"), + ) + + outputs = list( + transformer.transform( + [ + RecordEnvelope(input, metadata={}) + for input in [ + make_generic_dataset( + entity_urn=builder.make_dataset_urn("bigquery", "example1") + ), + make_generic_dataset( + entity_urn=builder.make_dataset_urn("bigquery", "example2") + ), + make_generic_dataset( + entity_urn=builder.make_dataset_urn("bigquery", "example3") + ), + EndOfStream(), + ] + ] + ) + ) + + assert len(outputs) == 6 + + # Check new dataproduct entity should be there + assert outputs[3].record.entityUrn == "urn:li:dataProduct:first" + assert outputs[3].record.aspectName == "dataProductProperties" + + first_data_product_aspect = json.loads( + outputs[3].record.aspect.value.decode("utf-8") + ) + assert [item["value"]["destinationUrn"] for item in first_data_product_aspect] == [ + builder.make_dataset_urn("bigquery", "example1") + ] + + second_data_product_aspect = json.loads( + outputs[4].record.aspect.value.decode("utf-8") + ) + assert [item["value"]["destinationUrn"] for item in second_data_product_aspect] == [ + builder.make_dataset_urn("bigquery", "example2"), + builder.make_dataset_urn("bigquery", "example3"), + ] + + assert isinstance(outputs[5].record, EndOfStream) + + +def dummy_data_product_resolver_method(dataset_urn): + dataset_to_data_product_map = { + builder.make_dataset_urn("bigquery", "example1"): "urn:li:dataProduct:first" + } + return dataset_to_data_product_map.get(dataset_urn) + + +def test_add_dataset_data_product_transformation(): + transformer = AddDatasetDataProduct.create( + { + "get_data_product_to_add": "tests.unit.test_transform_dataset.dummy_data_product_resolver_method" + }, + PipelineContext(run_id="test-dataproduct"), + ) + outputs = list( + transformer.transform( + [ + RecordEnvelope(input, metadata={}) + for input in [make_generic_dataset(), EndOfStream()] + ] + ) + ) + # Check new dataproduct entity should be there + assert outputs[1].record.entityUrn == "urn:li:dataProduct:first" + assert outputs[1].record.aspectName == "dataProductProperties" + + first_data_product_aspect = json.loads( + outputs[1].record.aspect.value.decode("utf-8") + ) + assert [item["value"]["destinationUrn"] for item in first_data_product_aspect] == [ + builder.make_dataset_urn("bigquery", "example1") + ] diff --git a/smoke-test/tests/cypress/cypress/e2e/search/query_and_filter_search.js b/smoke-test/tests/cypress/cypress/e2e/search/query_and_filter_search.js index 59105be587803b..ee927feeaea586 100644 --- 
a/smoke-test/tests/cypress/cypress/e2e/search/query_and_filter_search.js +++ b/smoke-test/tests/cypress/cypress/e2e/search/query_and_filter_search.js @@ -6,7 +6,6 @@ const datasetNames = { tags: "some-cypress-feature-1", hivePlatform: "cypress_logging_events", airflowPlatform: "User Creations", - awsPlatform: "project/root/events/logging_events_bckp", hdfsPlatform: "SampleHdfsDataset" }; @@ -39,7 +38,7 @@ describe("auto-complete dropdown, filter plus query search test", () => { //Dashboard searchToExecute("*"); - selectFilteredEntity("Type", "Dashboards", "filter__entityType"); + selectFilteredEntity("Type", "Dashboards", "filter__entityType"); cy.clickOptionWithText(datasetNames.dashboardsType); verifyFilteredEntity('Dashboard'); @@ -74,12 +73,6 @@ describe("auto-complete dropdown, filter plus query search test", () => { cy.clickOptionWithText(datasetNames.hivePlatform); verifyFilteredEntity('Hive'); - //AWS S3 - searchToExecute("*"); - selectFilteredEntity("Platform", "AWS S3", "filter_platform"); - cy.clickOptionWithText(datasetNames.awsPlatform); - verifyFilteredEntity('AWS S3'); - //HDFS searchToExecute("*"); selectFilteredEntity("Platform", "HDFS", "filter_platform");
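
Pulling the transformer pieces above together, the following is a minimal standalone sketch (plain Python, no DataHub imports) of how `pattern_add_dataset_dataproduct` resolves each dataset urn to at most one data product and how `handle_end_of_stream` groups assets before emitting one patch per data product. The rules and urns mirror the unit tests above; the helper names (`resolve_data_product`, `group_assets`) are illustrative only and not part of the DataHub API.

```python
import re
from typing import Dict, List, Optional

# Illustrative rules, mirroring the pattern_add_dataset_dataproduct config above.
RULES: Dict[str, str] = {
    ".*example1.*": "urn:li:dataProduct:first",
    ".*": "urn:li:dataProduct:second",
}


def resolve_data_product(dataset_urn: str) -> Optional[str]:
    # First matching rule wins, mirroring how the transformer takes the first
    # value returned by KeyValuePattern for a given urn.
    for pattern, data_product_urn in RULES.items():
        if re.match(pattern, dataset_urn):
            return data_product_urn
    return None


def group_assets(dataset_urns: List[str]) -> Dict[str, List[str]]:
    # At end of stream, every dataset seen is assigned to at most one data
    # product; one dataProductProperties patch is then emitted per data product.
    grouped: Dict[str, List[str]] = {}
    for urn in dataset_urns:
        data_product = resolve_data_product(urn)
        if data_product:
            grouped.setdefault(data_product, []).append(urn)
    return grouped


if __name__ == "__main__":
    datasets = [
        "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
        "urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)",
        "urn:li:dataset:(urn:li:dataPlatform:bigquery,example3,PROD)",
    ]
    # example1 matches the first rule; example2/example3 fall through to the
    # catch-all rule, matching test_pattern_dataset_data_product_transformation.
    print(group_assets(datasets))
```

In the real transformer, each grouped entry becomes a `DataProductPatchBuilder(data_product_urn).add_asset(dataset_urn)` chain whose `build()` output is appended to the MCP stream at end of stream.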