From cb8002413e50abbb971d8259b60c0309a9caef13 Mon Sep 17 00:00:00 2001 From: Shubham Jagtap <132359390+shubhamjagtap639@users.noreply.github.com> Date: Sat, 6 Jan 2024 03:33:22 +0530 Subject: [PATCH] feat(ingestion/transformer): Add dataset dataproduct transformer (#9491) Co-authored-by: Harshal Sheth --- .../docs/transformer/dataset_transformer.md | 70 ++++++++ metadata-ingestion/setup.py | 3 + .../transformer/add_dataset_dataproduct.py | 133 +++++++++++++++ .../ingestion/transformer/add_dataset_tags.py | 12 +- .../ingestion/transformer/base_transformer.py | 16 +- .../transformer/dataset_transformer.py | 5 + .../tests/unit/test_transform_dataset.py | 161 +++++++++++++++++- 7 files changed, 390 insertions(+), 10 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index 1c84a2759d23e6..33ff722a0d0dd6 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -14,6 +14,7 @@ The below table shows transformer which can transform aspects of entity [Dataset | `schemaMetadata` | - [Pattern Add Dataset Schema Field glossaryTerms](#pattern-add-dataset-schema-field-glossaryterms)
- [Pattern Add Dataset Schema Field globalTags](#pattern-add-dataset-schema-field-globaltags) | | `datasetProperties` | - [Simple Add Dataset datasetProperties](#simple-add-dataset-datasetproperties)
- [Add Dataset datasetProperties](#add-dataset-datasetproperties) | | `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)
- [Pattern Add Dataset domains](#pattern-add-dataset-domains) | +| `dataProduct` | - [Simple Add Dataset dataProduct ](#simple-add-dataset-dataproduct)
- [Pattern Add Dataset dataProduct](#pattern-add-dataset-dataproduct)
- [Add Dataset dataProduct](#add-dataset-dataproduct) |
 ## Extract Ownership from Tags
 ### Config Details
@@ -961,6 +962,75 @@ in both of the cases domain should be provisioned on DataHub GMS
 'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.n.*': ["hr"]
 'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"]
 ```
+## Simple Add Dataset dataProduct
+### Config Details
+| Field                          | Required | Type           | Default | Description                                                                                                        |
+|--------------------------------|----------|----------------|---------|--------------------------------------------------------------------------------------------------------------------|
+| `dataset_to_data_product_urns` | ✅       | Dict[str, str] |         | Dataset entity urn as key and dataproduct urn as value; the dataproduct is created with the dataset as its asset.   |
+
+Let's suppose we'd like to add a set of dataproducts, each with specific datasets as its assets. To do so, we can use the `simple_add_dataset_dataproduct` transformer that's included in the ingestion framework.
+
+The config, which we'd append to our ingestion recipe YAML, would look like this:
+
+  ```yaml
+  transformers:
+    - type: "simple_add_dataset_dataproduct"
+      config:
+        dataset_to_data_product_urns:
+          "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)": "urn:li:dataProduct:first"
+          "urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)": "urn:li:dataProduct:second"
+  ```
+
+## Pattern Add Dataset dataProduct
+### Config Details
+| Field                                  | Required | Type            | Default | Description                                                                                                                 |
+|----------------------------------------|----------|-----------------|---------|-------------------------------------------------------------------------------------------------------------------------------|
+| `dataset_to_data_product_urns_pattern` | ✅       | map[regex, urn] |         | Regular expressions matched against dataset entity urns, each mapped to the dataproduct urn to apply to the matching datasets. |
+
+Let's suppose we'd like to append a series of dataproducts with specific datasets as their assets. To do so, we can use the `pattern_add_dataset_dataproduct` module that's included in the ingestion framework. It matches each regex pattern against the dataset `urn` and creates the dataproduct entity with the given urn, adding the matched datasets as its assets.
+
+The config, which we'd append to our ingestion recipe YAML, would look like this:
+
+  ```yaml
+  transformers:
+    - type: "pattern_add_dataset_dataproduct"
+      config:
+        dataset_to_data_product_urns_pattern:
+          rules:
+            ".*example1.*": "urn:li:dataProduct:first"
+            ".*example2.*": "urn:li:dataProduct:second"
+  ```
+
+## Add Dataset dataProduct
+### Config Details
+| Field                     | Required | Type                           | Default | Description                                                                                     |
+|---------------------------|----------|--------------------------------|---------|-----------------------------------------------------------------------------------------------------|
+| `get_data_product_to_add` | ✅       | callable[[str], Optional[str]] |         | A function that takes a dataset entity urn as input and returns the dataproduct urn to create.       |
+
+If you'd like to add more complex logic for creating dataproducts, you can use the more generic `add_dataset_dataproduct` transformer, which calls a user-provided function to determine the dataproduct to which each dataset should be added as an asset.
+
+```yaml
+transformers:
+  - type: "add_dataset_dataproduct"
+    config:
+      get_data_product_to_add: "."
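+      # Note: "." above is only a placeholder; in a real recipe this should be the fully
+      # qualified, importable path to your resolver function (hypothetical example:
+      # "my_module.custom_dataproducts"), as defined below.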
+```
+
+Then define your function to return a dataproduct entity urn, for example:
+
+```python
+from typing import Optional
+
+import datahub.emitter.mce_builder as builder
+
+def custom_dataproducts(dataset_urn: str) -> Optional[str]:
+    """Compute the dataproduct urn for a given dataset urn."""
+
+    dataset_to_data_product_map = {
+        builder.make_dataset_urn("bigquery", "example1"): "urn:li:dataProduct:first"
+    }
+    return dataset_to_data_product_map.get(dataset_urn)
+```
+Finally, you can install and use your custom transformer as [shown here](#installing-the-package).
+
 ## Relationship Between replace_existing and semantics
 The transformer behaviour mentioned here is in context of `simple_add_dataset_ownership`, however it is applicable for all dataset transformers which are supporting `replace_existing`
 and `semantics` configuration attributes, for example `simple_add_dataset_tags` will add or remove tags as per behaviour mentioned in this section.
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 10db019b513812..8bbabce4f749fc 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -649,6 +649,9 @@
         "pattern_add_dataset_schema_terms = datahub.ingestion.transformer.add_dataset_schema_terms:PatternAddDatasetSchemaTerms",
         "pattern_add_dataset_schema_tags = datahub.ingestion.transformer.add_dataset_schema_tags:PatternAddDatasetSchemaTags",
         "extract_ownership_from_tags = datahub.ingestion.transformer.extract_ownership_from_tags:ExtractOwnersFromTagsTransformer",
+        "add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:AddDatasetDataProduct",
+        "simple_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:SimpleAddDatasetDataProduct",
+        "pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct",
     ],
     "datahub.ingestion.sink.plugins": [
         "file = datahub.ingestion.sink.file:FileSink",
diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py
new file mode 100644
index 00000000000000..45e92628430258
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py
@@ -0,0 +1,133 @@
+import logging
+from typing import Callable, Dict, List, Optional, Union
+
+import pydantic
+
+from datahub.configuration.common import ConfigModel, KeyValuePattern
+from datahub.configuration.import_resolver import pydantic_resolve_key
+from datahub.emitter.mce_builder import Aspect
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.transformer.dataset_transformer import (
+    DatasetDataproductTransformer,
+)
+from datahub.metadata.schema_classes import MetadataChangeProposalClass
+from datahub.specific.dataproduct import DataProductPatchBuilder
+
+logger = logging.getLogger(__name__)
+
+
+class AddDatasetDataProductConfig(ConfigModel):
+    # dataset_urn -> data product urn
+    get_data_product_to_add: Callable[[str], Optional[str]]
+
+    _resolve_data_product_fn = pydantic_resolve_key("get_data_product_to_add")
+
+
+class AddDatasetDataProduct(DatasetDataproductTransformer):
+    """Transformer that adds a dataproduct entity for the provided dataset as its asset, according to a callback function."""
+
+    ctx: PipelineContext
+    config: AddDatasetDataProductConfig
+
+    def __init__(self, config: AddDatasetDataProductConfig, ctx: PipelineContext):
+        super().__init__()
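+        # Store the resolver config and pipeline context; handle_end_of_stream below
+        # walks self.entity_map (maintained by the base transformer) and builds one
+        # DataProductPatchBuilder per resolved dataproduct urn.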
self.ctx = ctx + self.config = config + + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetDataProduct": + config = AddDatasetDataProductConfig.parse_obj(config_dict) + return cls(config, ctx) + + def transform_aspect( + self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] + ) -> Optional[Aspect]: + return None + + def handle_end_of_stream( + self, + ) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]: + data_products: Dict[str, DataProductPatchBuilder] = {} + + logger.debug("Generating dataproducts") + for entity_urn in self.entity_map.keys(): + data_product_urn = self.config.get_data_product_to_add(entity_urn) + if data_product_urn: + if data_product_urn not in data_products: + data_products[data_product_urn] = DataProductPatchBuilder( + data_product_urn + ).add_asset(entity_urn) + else: + data_products[data_product_urn] = data_products[ + data_product_urn + ].add_asset(entity_urn) + + mcps: List[ + Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass] + ] = [] + for data_product in data_products.values(): + mcps.extend(list(data_product.build())) + return mcps + + +class SimpleDatasetDataProductConfig(ConfigModel): + dataset_to_data_product_urns: Dict[str, str] + + +class SimpleAddDatasetDataProduct(AddDatasetDataProduct): + """Transformer that adds a specified dataproduct entity for provided dataset as its asset.""" + + def __init__(self, config: SimpleDatasetDataProductConfig, ctx: PipelineContext): + + generic_config = AddDatasetDataProductConfig( + get_data_product_to_add=lambda dataset_urn: config.dataset_to_data_product_urns.get( + dataset_urn + ), + ) + super().__init__(generic_config, ctx) + + @classmethod + def create( + cls, config_dict: dict, ctx: PipelineContext + ) -> "SimpleAddDatasetDataProduct": + config = SimpleDatasetDataProductConfig.parse_obj(config_dict) + return cls(config, ctx) + + +class PatternDatasetDataProductConfig(ConfigModel): + dataset_to_data_product_urns_pattern: KeyValuePattern = KeyValuePattern.all() + + @pydantic.root_validator(pre=True) + def validate_pattern_value(cls, values: Dict) -> Dict: + rules = values["dataset_to_data_product_urns_pattern"]["rules"] + for key, value in rules.items(): + if isinstance(value, list) and len(value) > 1: + raise ValueError( + "Same dataset cannot be an asset of two different data product." 
+ ) + elif isinstance(value, str): + rules[key] = [rules[key]] + return values + + +class PatternAddDatasetDataProduct(AddDatasetDataProduct): + """Transformer that adds a specified dataproduct entity for provided dataset as its asset.""" + + def __init__(self, config: PatternDatasetDataProductConfig, ctx: PipelineContext): + dataset_to_data_product = config.dataset_to_data_product_urns_pattern + generic_config = AddDatasetDataProductConfig( + get_data_product_to_add=lambda dataset_urn: dataset_to_data_product.value( + dataset_urn + )[0] + if dataset_to_data_product.value(dataset_urn) + else None, + ) + super().__init__(generic_config, ctx) + + @classmethod + def create( + cls, config_dict: dict, ctx: PipelineContext + ) -> "PatternAddDatasetDataProduct": + config = PatternDatasetDataProductConfig.parse_obj(config_dict) + return cls(config, ctx) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py index 72a8c226e491ed..7508b33c6bfc67 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py @@ -1,5 +1,5 @@ import logging -from typing import Callable, List, Optional, cast +from typing import Callable, List, Optional, Union, cast import datahub.emitter.mce_builder as builder from datahub.configuration.common import ( @@ -13,6 +13,7 @@ from datahub.ingestion.transformer.dataset_transformer import DatasetTagsTransformer from datahub.metadata.schema_classes import ( GlobalTagsClass, + MetadataChangeProposalClass, TagAssociationClass, TagKeyClass, ) @@ -65,9 +66,13 @@ def transform_aspect( self.config, self.ctx.graph, entity_urn, out_global_tags_aspect ) - def handle_end_of_stream(self) -> List[MetadataChangeProposalWrapper]: + def handle_end_of_stream( + self, + ) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]: - mcps: List[MetadataChangeProposalWrapper] = [] + mcps: List[ + Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass] + ] = [] logger.debug("Generating tags") @@ -121,7 +126,6 @@ class PatternAddDatasetTags(AddDatasetTags): """Transformer that adds a specified set of tags to each dataset.""" def __init__(self, config: PatternDatasetTagsConfig, ctx: PipelineContext): - config.tag_pattern.all tag_pattern = config.tag_pattern generic_config = AddDatasetTagsConfig( get_tags_to_add=lambda _: [ diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py index 8b6f42dcfba4b8..254b3d084f2be2 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py @@ -1,6 +1,6 @@ import logging from abc import ABCMeta, abstractmethod -from typing import Any, Dict, Iterable, List, Optional, Union +from typing import Any, Dict, Iterable, List, Optional, Sequence, Union import datahub.emitter.mce_builder as builder from datahub.emitter.aspect import ASPECT_MAP @@ -28,7 +28,9 @@ def _update_work_unit_id( class HandleEndOfStreamTransformer: - def handle_end_of_stream(self) -> List[MetadataChangeProposalWrapper]: + def handle_end_of_stream( + self, + ) -> Sequence[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]: return [] @@ -206,15 +208,19 @@ def _handle_end_of_stream( ): return - mcps: List[MetadataChangeProposalWrapper] = 
self.handle_end_of_stream() + mcps: Sequence[ + Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass] + ] = self.handle_end_of_stream() for mcp in mcps: - if mcp.aspect is None or mcp.entityUrn is None: # to silent the lint error + if ( + mcp.aspect is None or mcp.aspectName is None or mcp.entityUrn is None + ): # to silent the lint error continue record_metadata = _update_work_unit_id( envelope=envelope, - aspect_name=mcp.aspect.get_aspect_name(), # type: ignore + aspect_name=mcp.aspectName, urn=mcp.entityUrn, ) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py index 0b2433c3a1fe2b..79151f7b11bf02 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py @@ -118,3 +118,8 @@ def aspect_name(self) -> str: class DatasetSchemaMetadataTransformer(DatasetTransformer, metaclass=ABCMeta): def aspect_name(self) -> str: return "schemaMetadata" + + +class DatasetDataproductTransformer(DatasetTransformer, metaclass=ABCMeta): + def aspect_name(self) -> str: + return "dataProductProperties" diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 546549dcf37a4a..5152f406ed3ce0 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -1,3 +1,4 @@ +import json import re from typing import ( Any, @@ -27,6 +28,11 @@ from datahub.ingestion.transformer.add_dataset_browse_path import ( AddDatasetBrowsePathTransformer, ) +from datahub.ingestion.transformer.add_dataset_dataproduct import ( + AddDatasetDataProduct, + PatternAddDatasetDataProduct, + SimpleAddDatasetDataProduct, +) from datahub.ingestion.transformer.add_dataset_ownership import ( AddDatasetOwnership, PatternAddDatasetOwnership, @@ -873,7 +879,7 @@ def test_pattern_dataset_tags_transformation(mock_time): assert builder.make_tag_urn("Needs Documentation") not in tags_aspect.tags -def test_import_resolver(): +def test_add_dataset_tags_transformation(): transformer = AddDatasetTags.create( { "get_tags_to_add": "tests.unit.test_transform_dataset.dummy_tag_resolver_method" @@ -2665,3 +2671,156 @@ def test_pattern_dataset_schema_tags_transformation_patch( assert builder.make_tag_urn("pii") in global_tags_urn assert builder.make_tag_urn("FirstName") in global_tags_urn assert builder.make_tag_urn("Name") in global_tags_urn + + +def test_simple_dataset_data_product_transformation(mock_time): + transformer = SimpleAddDatasetDataProduct.create( + { + "dataset_to_data_product_urns": { + builder.make_dataset_urn( + "bigquery", "example1" + ): "urn:li:dataProduct:first", + builder.make_dataset_urn( + "bigquery", "example2" + ): "urn:li:dataProduct:second", + builder.make_dataset_urn( + "bigquery", "example3" + ): "urn:li:dataProduct:first", + } + }, + PipelineContext(run_id="test-dataproduct"), + ) + + outputs = list( + transformer.transform( + [ + RecordEnvelope(input, metadata={}) + for input in [ + make_generic_dataset( + entity_urn=builder.make_dataset_urn("bigquery", "example1") + ), + make_generic_dataset( + entity_urn=builder.make_dataset_urn("bigquery", "example2") + ), + make_generic_dataset( + entity_urn=builder.make_dataset_urn("bigquery", "example3") + ), + EndOfStream(), + ] + ] + ) + ) + + assert len(outputs) == 6 + + # Check new dataproduct entity should be 
there + assert outputs[3].record.entityUrn == "urn:li:dataProduct:first" + assert outputs[3].record.aspectName == "dataProductProperties" + + first_data_product_aspect = json.loads( + outputs[3].record.aspect.value.decode("utf-8") + ) + assert [item["value"]["destinationUrn"] for item in first_data_product_aspect] == [ + builder.make_dataset_urn("bigquery", "example1"), + builder.make_dataset_urn("bigquery", "example3"), + ] + + second_data_product_aspect = json.loads( + outputs[4].record.aspect.value.decode("utf-8") + ) + assert [item["value"]["destinationUrn"] for item in second_data_product_aspect] == [ + builder.make_dataset_urn("bigquery", "example2") + ] + + assert isinstance(outputs[5].record, EndOfStream) + + +def test_pattern_dataset_data_product_transformation(mock_time): + transformer = PatternAddDatasetDataProduct.create( + { + "dataset_to_data_product_urns_pattern": { + "rules": { + ".*example1.*": "urn:li:dataProduct:first", + ".*": "urn:li:dataProduct:second", + } + }, + }, + PipelineContext(run_id="test-dataproducts"), + ) + + outputs = list( + transformer.transform( + [ + RecordEnvelope(input, metadata={}) + for input in [ + make_generic_dataset( + entity_urn=builder.make_dataset_urn("bigquery", "example1") + ), + make_generic_dataset( + entity_urn=builder.make_dataset_urn("bigquery", "example2") + ), + make_generic_dataset( + entity_urn=builder.make_dataset_urn("bigquery", "example3") + ), + EndOfStream(), + ] + ] + ) + ) + + assert len(outputs) == 6 + + # Check new dataproduct entity should be there + assert outputs[3].record.entityUrn == "urn:li:dataProduct:first" + assert outputs[3].record.aspectName == "dataProductProperties" + + first_data_product_aspect = json.loads( + outputs[3].record.aspect.value.decode("utf-8") + ) + assert [item["value"]["destinationUrn"] for item in first_data_product_aspect] == [ + builder.make_dataset_urn("bigquery", "example1") + ] + + second_data_product_aspect = json.loads( + outputs[4].record.aspect.value.decode("utf-8") + ) + assert [item["value"]["destinationUrn"] for item in second_data_product_aspect] == [ + builder.make_dataset_urn("bigquery", "example2"), + builder.make_dataset_urn("bigquery", "example3"), + ] + + assert isinstance(outputs[5].record, EndOfStream) + + +def dummy_data_product_resolver_method(dataset_urn): + dataset_to_data_product_map = { + builder.make_dataset_urn("bigquery", "example1"): "urn:li:dataProduct:first" + } + return dataset_to_data_product_map.get(dataset_urn) + + +def test_add_dataset_data_product_transformation(): + transformer = AddDatasetDataProduct.create( + { + "get_data_product_to_add": "tests.unit.test_transform_dataset.dummy_data_product_resolver_method" + }, + PipelineContext(run_id="test-dataproduct"), + ) + outputs = list( + transformer.transform( + [ + RecordEnvelope(input, metadata={}) + for input in [make_generic_dataset(), EndOfStream()] + ] + ) + ) + # Check new dataproduct entity should be there + assert outputs[1].record.entityUrn == "urn:li:dataProduct:first" + assert outputs[1].record.aspectName == "dataProductProperties" + + first_data_product_aspect = json.loads( + outputs[1].record.aspect.value.decode("utf-8") + ) + assert [item["value"]["destinationUrn"] for item in first_data_product_aspect] == [ + builder.make_dataset_urn("bigquery", "example1") + ]