diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index 66cf47c19de704..0acc134d4ef00b 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -20,24 +20,70 @@ The below table shows transformer which can transform aspects of entity [Dataset ### Config Details | Field | Required | Type | Default | Description | |-----------------------------|----------|---------|---------------|---------------------------------------------| -| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. | -| `tag_prefix` | | str | | Regex to use for tags to match against. Supports Regex to match a prefix which is used to remove content. Rest of string is considered owner ID for creating owner URN. | -| `is_user` | | bool | `true` | Whether should be consider a user or not. If `false` then considered a group. | +| `tag_pattern` | | str | | Regex to use for tags to match against. Supports Regex to match a pattern which is used to remove content. Rest of string is considered owner ID for creating owner URN. | +| `is_user` | | bool | `true` | Whether should be consider a user or not. If `false` then considered a group. | +| `owner_character_mapping` | | dict[str, str] | | A mapping of extracted owner character to datahub owner character. | | `email_domain` | | str | | If set then this is appended to create owner URN. | +| `extract_owner_type_from_tag_pattern` | | str | `false` | Whether to extract an owner type from provided tag pattern first group. If `true`, no need to provide owner_type and owner_type_urn config. For example: if provided tag pattern is `(.*)_owner_email:` and actual tag is `developer_owner_email`, then extracted owner type will be `developer`.| | `owner_type` | | str | `TECHNICAL_OWNER` | Ownership type. | | `owner_type_urn` | | str | `None` | Set to a custom ownership type's URN if using custom ownership. | -Matches against a tag prefix and considers string in tags after that prefix as owner to create ownership. +Let’s suppose we’d like to add a dataset ownerships based on part of dataset tags. To do so, we can use the `extract_ownership_from_tags` transformer that’s included in the ingestion framework. + +The config, which we’d append to our ingestion recipe YAML, would look like this: ```yaml transformers: - type: "extract_ownership_from_tags" config: - tag_prefix: "dbt:techno-genie:" - is_user: true - email_domain: "coolcompany.com" + tag_pattern: "owner_email:" ``` +So if we have input dataset tag like +- `urn:li:tag:dataset_owner_email:abc@email.com` +- `urn:li:tag:dataset_owner_email:xyz@email.com` + +The portion of the tag after the matched tag pattern will be converted into an owner. Hence users `abc@email.com` and `xyz@email.com` will be added as owners. + +### Examples + +- Add owners, however owner should be considered as group and also email domain not provided in tag string. For example: from tag urn `urn:li:tag:dataset_owner:abc` extracted owner urn should be `urn:li:corpGroup:abc@email.com` then config would look like this: + ```yaml + transformers: + - type: "extract_ownership_from_tags" + config: + tag_pattern: "owner:" + is_user: false + email_domain: "email.com" + ``` +- Add owners, however owner type and owner type urn wanted to provide externally. For example: from tag urn `urn:li:tag:dataset_owner_email:abc@email.com` owner type should be `CUSTOM` and owner type urn as `"urn:li:ownershipType:data_product"` then config would look like this: + ```yaml + transformers: + - type: "extract_ownership_from_tags" + config: + tag_pattern: "owner_email:" + owner_type: "CUSTOM" + owner_type_urn: "urn:li:ownershipType:data_product" + ``` +- Add owners, however some owner characters needs to replace with some other characters before ingestion. For example: from tag urn `urn:li:tag:dataset_owner_email:abc_xyz-email_com` extracted owner urn should be `urn:li:corpGroup:abc.xyz@email.com` then config would look like this: + ```yaml + transformers: + - type: "extract_ownership_from_tags" + config: + tag_pattern: "owner_email:" + owner_character_mapping: + "_": ".", + "-": "@", + ``` +- Add owners, however owner type also need to extracted from tag pattern. For example: from tag urn `urn:li:tag:data_producer_owner_email:abc@email.com` extracted owner type should be `data_producer` then config would look like this: + ```yaml + transformers: + - type: "extract_ownership_from_tags" + config: + tag_pattern: "(.*)_owner_email:" + extract_owner_type_from_tag_pattern: true + ``` + ## Clean suffix prefix from Ownership ### Config Details | Field | Required | Type | Default | Description | diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index d9933db67f66a5..1c29d38273d823 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -246,6 +246,10 @@ def make_owner_urn(owner: str, owner_type: OwnerType) -> str: return f"urn:li:{owner_type.value}:{owner}" +def make_ownership_type_urn(type: str) -> str: + return f"urn:li:ownershipType:{type}" + + def make_term_urn(term: str) -> str: """ Makes a term urn if the input is not a term urn already diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py index c4eba0e011de3d..e509b4b719166b 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py @@ -1,10 +1,11 @@ import logging import re from functools import lru_cache -from typing import List, Optional, Sequence, Union, cast +from typing import Dict, List, Optional, Sequence, Union, cast -from datahub.configuration.common import TransformerSemanticsConfigModel -from datahub.emitter.mce_builder import Aspect +from datahub.configuration.common import ConfigModel +from datahub.configuration.validate_field_rename import pydantic_renamed_field +from datahub.emitter.mce_builder import Aspect, make_ownership_type_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.transformer.dataset_transformer import DatasetTagsTransformer @@ -22,13 +23,19 @@ logger = logging.getLogger(__name__) -class ExtractOwnersFromTagsConfig(TransformerSemanticsConfigModel): - tag_prefix: str +class ExtractOwnersFromTagsConfig(ConfigModel): + tag_pattern: str = "" is_user: bool = True + owner_character_mapping: Optional[Dict[str, str]] = None email_domain: Optional[str] = None + extract_owner_type_from_tag_pattern: bool = False owner_type: str = "TECHNICAL_OWNER" owner_type_urn: Optional[str] = None + _rename_tag_prefix_to_tag_pattern = pydantic_renamed_field( + "tag_prefix", "tag_pattern" + ) + @lru_cache(maxsize=10) def get_owner_type(owner_type_str: str) -> str: @@ -63,6 +70,19 @@ def get_owner_urn(self, owner_str: str) -> str: return owner_str + "@" + self.config.email_domain return owner_str + def convert_owner_as_per_mapping(self, owner: str) -> str: + if self.config.owner_character_mapping: + # Sort the provided mapping by its length. + # Eg: Suppose we have {"_":".", "__":"#"} character mapping. + # In this case "__" character should get replace first compare to "_" character. + for key in sorted( + self.config.owner_character_mapping.keys(), + key=len, + reverse=True, + ): + owner = owner.replace(key, self.config.owner_character_mapping[key]) + return owner + def handle_end_of_stream( self, ) -> Sequence[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]: @@ -79,29 +99,41 @@ def transform_aspect( owners: List[OwnerClass] = [] for tag_class in tags: - tag_urn = TagUrn.from_string(tag_class.tag) - tag_str = tag_urn.entity_ids[0] - re_match = re.search(self.config.tag_prefix, tag_str) + tag_str = TagUrn.from_string(tag_class.tag).name + re_match = re.search(self.config.tag_pattern, tag_str) if re_match: owner_str = tag_str[re_match.end() :].strip() + owner_str = self.convert_owner_as_per_mapping(owner_str) owner_urn_str = self.get_owner_urn(owner_str) - if self.config.is_user: - owner_urn = str(CorpuserUrn(owner_urn_str)) + owner_urn = ( + str(CorpuserUrn(owner_urn_str)) + if self.config.is_user + else str(CorpGroupUrn(owner_urn_str)) + ) + + if self.config.extract_owner_type_from_tag_pattern: + if re_match.groups(): + owners.append( + OwnerClass( + owner=owner_urn, + type=OwnershipTypeClass.CUSTOM, + typeUrn=make_ownership_type_urn(re_match.group(1)), + ) + ) else: - owner_urn = str(CorpGroupUrn(owner_urn_str)) - owner_type = get_owner_type(self.config.owner_type) - if owner_type == OwnershipTypeClass.CUSTOM: - assert ( - self.config.owner_type_urn is not None - ), "owner_type_urn must be set if owner_type is CUSTOM" - - owners.append( - OwnerClass( - owner=owner_urn, - type=owner_type, - typeUrn=self.config.owner_type_urn, + owner_type = get_owner_type(self.config.owner_type) + if owner_type == OwnershipTypeClass.CUSTOM: + assert ( + self.config.owner_type_urn is not None + ), "owner_type_urn must be set if owner_type is CUSTOM" + + owners.append( + OwnerClass( + owner=owner_urn, + type=owner_type, + typeUrn=self.config.owner_type_urn, + ) ) - ) self.owner_mcps.append( MetadataChangeProposalWrapper( @@ -111,5 +143,4 @@ def transform_aspect( ), ) ) - - return None + return aspect diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 42d65fdf02683f..426d4dc12f2086 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -28,6 +28,7 @@ BigqueryDataset, BigqueryProject, BigQuerySchemaApi, + BigqueryTable, BigqueryTableSnapshot, BigqueryView, ) @@ -35,10 +36,19 @@ LineageEdge, LineageEdgeColumnMapping, ) +from datahub.ingestion.source.common.subtypes import DatasetSubTypes from datahub.metadata.com.linkedin.pegasus2avro.dataset import ViewProperties from datahub.metadata.schema_classes import ( + ContainerClass, + DataPlatformInstanceClass, DatasetPropertiesClass, + GlobalTagsClass, MetadataChangeProposalClass, + SchemaMetadataClass, + StatusClass, + SubTypesClass, + TagAssociationClass, + TimeStampClass, ) @@ -352,6 +362,97 @@ def test_get_projects_list_fully_filtered(get_projects_mock, get_bq_client_mock) assert projects == [] +@pytest.fixture +def bigquery_table() -> BigqueryTable: + now = datetime.now(tz=timezone.utc) + return BigqueryTable( + name="table1", + comment="comment1", + created=now, + last_altered=now, + size_in_bytes=2400, + rows_count=2, + expires=now - timedelta(days=10), + labels={"data_producer_owner_email": "games_team-nytimes_com"}, + num_partitions=1, + max_partition_id="1", + max_shard_id="1", + active_billable_bytes=2400, + long_term_billable_bytes=2400, + ) + + +@patch.object(BigQueryV2Config, "get_bigquery_client") +def test_gen_table_dataset_workunits(get_bq_client_mock, bigquery_table): + project_id = "test-project" + dataset_name = "test-dataset" + config = BigQueryV2Config.parse_obj( + { + "project_id": project_id, + "capture_table_label_as_tag": True, + } + ) + source: BigqueryV2Source = BigqueryV2Source( + config=config, ctx=PipelineContext(run_id="test") + ) + + gen = source.gen_table_dataset_workunits( + bigquery_table, [], project_id, dataset_name + ) + mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) + assert mcp.aspect == StatusClass(removed=False) + + mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) + assert isinstance(mcp.aspect, SchemaMetadataClass) + assert mcp.aspect.schemaName == f"{project_id}.{dataset_name}.{bigquery_table.name}" + assert mcp.aspect.fields == [] + + mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) + assert isinstance(mcp.aspect, DatasetPropertiesClass) + assert mcp.aspect.name == bigquery_table.name + assert ( + mcp.aspect.qualifiedName == f"{project_id}.{dataset_name}.{bigquery_table.name}" + ) + assert mcp.aspect.description == bigquery_table.comment + assert mcp.aspect.created == TimeStampClass( + time=int(bigquery_table.created.timestamp() * 1000) + ) + assert mcp.aspect.lastModified == TimeStampClass( + time=int(bigquery_table.last_altered.timestamp() * 1000) + ) + assert mcp.aspect.tags == [] + + assert mcp.aspect.customProperties == { + "expiration_date": str(bigquery_table.expires), + "size_in_bytes": str(bigquery_table.size_in_bytes), + "billable_bytes_active": str(bigquery_table.active_billable_bytes), + "billable_bytes_long_term": str(bigquery_table.long_term_billable_bytes), + "number_of_partitions": str(bigquery_table.num_partitions), + "max_partition_id": str(bigquery_table.max_partition_id), + "is_partitioned": "True", + "max_shard_id": str(bigquery_table.max_shard_id), + "is_sharded": "True", + } + + mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) + assert isinstance(mcp.aspect, GlobalTagsClass) + assert mcp.aspect.tags == [ + TagAssociationClass( + "urn:li:tag:data_producer_owner_email:games_team-nytimes_com" + ) + ] + + mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) + assert isinstance(mcp.aspect, ContainerClass) + + mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) + assert isinstance(mcp.aspect, DataPlatformInstanceClass) + + mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) + assert isinstance(mcp.aspect, SubTypesClass) + assert mcp.aspect.typeNames[1] == DatasetSubTypes.TABLE + + @patch.object(BigQueryV2Config, "get_bigquery_client") def test_simple_upstream_table_generation(get_bq_client_mock): a: BigQueryTableRef = BigQueryTableRef( diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 6828c741dda2b5..c31ec12abfbd71 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -643,6 +643,7 @@ def _test_owner( config: Dict, expected_owner: str, expected_owner_type: Optional[str] = None, + expected_owner_type_urn: Optional[str] = None, ) -> None: dataset = make_generic_dataset( aspects=[ @@ -682,6 +683,8 @@ def _test_owner( assert owner.owner == expected_owner + assert owner.typeUrn == expected_owner_type_urn + _test_owner( tag="owner:foo", config={ @@ -736,6 +739,25 @@ def _test_owner( }, expected_owner="urn:li:corpuser:foo@example.com", expected_owner_type=OwnershipTypeClass.CUSTOM, + expected_owner_type_urn="urn:li:ownershipType:ad8557d6-dcb9-4d2a-83fc-b7d0d54f3e0f", + ) + _test_owner( + tag="data_producer_owner_email:abc_xyz-email_com", + config={ + "tag_pattern": "(.*)_owner_email:", + "owner_character_mapping": { + "_": ".", + "-": "@", + "__": "_", + "--": "-", + "_-": "#", + "-_": " ", + }, + "extract_owner_type_from_tag_pattern": True, + }, + expected_owner="urn:li:corpuser:abc.xyz@email.com", + expected_owner_type=OwnershipTypeClass.CUSTOM, + expected_owner_type_urn="urn:li:ownershipType:data_producer", )