diff --git a/docs/developers.md b/docs/developers.md
index 980aa3e3acf879..0c9d7bee3d79f2 100644
--- a/docs/developers.md
+++ b/docs/developers.md
@@ -169,3 +169,28 @@ This means you're running out of space on your disk to build. Please free up som
 #### `Build failed` for task `./gradlew :datahub-frontend:dist -x yarnTest -x yarnLint`
 
 This could mean that you need to update your [Yarn](https://yarnpkg.com/getting-started/install) version
+
+#### `:buildSrc:compileJava` task fails with `package com.linkedin.metadata.models.registry.config does not exist` and `cannot find symbol` error for `Entity`
+
+There are currently two symbolic links within the [buildSrc](https://github.com/datahub-project/datahub/tree/master/buildSrc) directory for the [com.linkedin.metadata.aspect.plugins.config](https://github.com/datahub-project/datahub/blob/master/buildSrc/src/main/java/com/linkedin/metadata/aspect/plugins/config) and [com.linkedin.metadata.models.registry.config](https://github.com/datahub-project/datahub/blob/master/buildSrc/src/main/java/com/linkedin/metadata/models/registry/config
+) packages, which point to the corresponding packages in the [entity-registry](https://github.com/datahub-project/datahub/tree/master/entity-registry) subproject.
+
+When the repository is checked out using Windows 10/11 - even if WSL is later used for building using the mounted Windows filesystem in `/mnt/` - the symbolic links might not have been created correctly; instead, they are checked out as plain files. Although it is technically possible to use the mounted Windows filesystem in `/mnt/` for building in WSL, it is **strongly recommended** to check out the repository within the Linux filesystem (e.g., in `/home/`) and build it from there, because accessing the Windows filesystem from Linux is relatively slow compared to the Linux filesystem and slows down the whole build process.
+
+To be able to create symbolic links in Windows 10/11, [Developer Mode](https://learn.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development) has to be enabled first. Then the following commands can be used to enable [symbolic links in Git](https://git-scm.com/docs/git-config#Documentation/git-config.txt-coresymlinks) and recreate the symbolic links:
+
+```shell
+# enable core.symlinks config
+git config --global core.symlinks true
+
+# check the current core.symlinks config and scope
+git config --show-scope --show-origin core.symlinks
+
+# in case the core.symlinks config is still set locally to false, remove the local config
+git config --unset core.symlinks
+
+# reset the current branch to recreate the missing symbolic links (alternatively it is also possible to switch branches away and back)
+git reset --hard
+```
+
+See also [here](https://stackoverflow.com/questions/5917249/git-symbolic-links-in-windows/59761201#59761201) for more information on how to enable symbolic links on Windows 10/11 and Git.
diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md
index 217d65ed05d6ae..64d1438cfcc73d 100644
--- a/metadata-ingestion/docs/transformer/dataset_transformer.md
+++ b/metadata-ingestion/docs/transformer/dataset_transformer.md
@@ -13,7 +13,7 @@ The below table shows transformer which can transform aspects of entity [Dataset
 | `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)<br/> - [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms) |
 | `schemaMetadata` | - [Pattern Add Dataset Schema Field glossaryTerms](#pattern-add-dataset-schema-field-glossaryterms)<br/> - [Pattern Add Dataset Schema Field globalTags](#pattern-add-dataset-schema-field-globaltags) |
 | `datasetProperties` | - [Simple Add Dataset datasetProperties](#simple-add-dataset-datasetproperties)<br/> - [Add Dataset datasetProperties](#add-dataset-datasetproperties) |
-| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)<br/> - [Pattern Add Dataset domains](#pattern-add-dataset-domains) |
+| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)<br/> - [Pattern Add Dataset domains](#pattern-add-dataset-domains)<br/> - [Domain Mapping Based on Tags](#domain-mapping-based-on-tags) |
 | `dataProduct` | - [Simple Add Dataset dataProduct ](#simple-add-dataset-dataproduct)<br/> - [Pattern Add Dataset dataProduct](#pattern-add-dataset-dataproduct)<br/> - [Add Dataset dataProduct](#add-dataset-dataproduct)
 
 ## Extract Ownership from Tags
@@ -1064,6 +1064,61 @@ in both of the cases domain should be provisioned on DataHub GMS
 'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.n.*': ["hr"]
 'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"]
 ```
+
+
+
+## Domain Mapping Based on Tags
+### Config Details
+
+| Field | Required | Type | Default | Description |
+|-----------------|----------|-------------------------|-------------|---------------------------------------------------------------------------------------------------------|
+| `domain_mapping`| ✅ | Dict[str, str] | | Simple tag name of the dataset entity as key, and the domain URN or name to assign to the dataset as value. |
+| `semantics` | | enum | "OVERWRITE" | Whether to OVERWRITE or PATCH the entity present on DataHub GMS.|
+
+<br/>
+
+Let’s suppose we’d like to add a domain to a dataset based on its tags. In this case, you can use the `domain_mapping_based_on_tags` transformer.
+
+Here we can set domains to either a URN (e.g. `urn:li:domain:engineering`) or a simple domain name (e.g. `engineering`). In both cases, the domain should be provisioned on DataHub GMS.
+
+When specifying tags within the domain mapping, use the tag's simple name rather than the full tag URN.
+
+For example, instead of using the tag URN `urn:li:tag:NeedsDocumentation`, you should specify just the simple tag name `NeedsDocumentation` in the domain mapping configuration.
+
+The config, which we’d append to our ingestion recipe YAML, would look like this:
+
+```yaml
+transformers:
+  - type: "domain_mapping_based_on_tags"
+    config:
+      domain_mapping:
+        'NeedsDocumentation': "urn:li:domain:documentation"
+```
+
+
+`domain_mapping_based_on_tags` can be configured in the following ways:
+
+- Add domains based on tags, overwriting the domains available for the dataset on DataHub GMS
+```yaml
+transformers:
+  - type: "domain_mapping_based_on_tags"
+    config:
+      semantics: OVERWRITE  # OVERWRITE is default behaviour
+      domain_mapping:
+        'example1': "urn:li:domain:engineering"
+        'example2': "urn:li:domain:hr"
+```
+- Add domains based on tags, while keeping the domains available for the dataset on DataHub GMS
+```yaml
+transformers:
+  - type: "domain_mapping_based_on_tags"
+    config:
+      semantics: PATCH
+      domain_mapping:
+        'example1': "urn:li:domain:engineering"
+        'example2': "urn:li:domain:hr"
+```
+
 ## Simple Add Dataset dataProduct
 ### Config Details
 | Field | Required | Type | Default | Description |
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 668845e7764c36..9d35b9b8cadf54 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -708,6 +708,7 @@
         "pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct",
         "replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl",
         "pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser",
+        "domain_mapping_based_on_tags = datahub.ingestion.transformer.dataset_domain_based_on_tags:DatasetTagDomainMapper",
     ],
     "datahub.ingestion.sink.plugins": [
         "file = datahub.ingestion.sink.file:FileSink",
diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py
new file mode 100644
index 00000000000000..7be8069e1b0852
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain_based_on_tags.py
@@ -0,0 +1,70 @@
+from typing import Dict, List, Optional, Set, cast
+
+from datahub.configuration.common import (
+    TransformerSemantics,
+    TransformerSemanticsConfigModel,
+)
+from datahub.emitter.mce_builder import Aspect
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.transformer.dataset_domain import AddDatasetDomain
+from datahub.ingestion.transformer.dataset_transformer import DatasetDomainTransformer
+from datahub.metadata.schema_classes import DomainsClass, GlobalTagsClass
+
+
+class DatasetTagDomainMapperConfig(TransformerSemanticsConfigModel):
+    domain_mapping: Dict[str, str]
+
+
+class DatasetTagDomainMapper(DatasetDomainTransformer):
+    """A transformer that appends a predefined set of domains to each dataset that includes specific tags defined in the transformer."""
+
+    def __init__(self, config: DatasetTagDomainMapperConfig, ctx: PipelineContext):
+        super().__init__()
+        self.ctx: PipelineContext = ctx
+        self.config: DatasetTagDomainMapperConfig = config
+
+    @classmethod
+    def create(
+        cls, config_dict: dict, ctx: PipelineContext
+    ) -> "DatasetTagDomainMapper":
+        config = DatasetTagDomainMapperConfig.parse_obj(config_dict)
+        return cls(config, ctx)
+
+    def transform_aspect(
+        self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
+    ) -> Optional[Aspect]:
+        # Initialize the existing domain aspect
+        existing_domain_aspect: DomainsClass = cast(DomainsClass, aspect)
+        assert self.ctx.graph
+        global_tags: Optional[GlobalTagsClass] = self.ctx.graph.get_tags(entity_urn)
+        # Check if we have tags received in existing aspect
+        if global_tags:
+            domain_mapping = self.config.domain_mapping
+            transformer_tags = domain_mapping.keys()
+            tags_seen: Set[str] = set()
+            for tag_item in global_tags.tags:
+                tag = tag_item.tag.split("urn:li:tag:")[-1]
+                if tag in transformer_tags:
+                    tags_seen.add(tag)
+
+            if tags_seen:
+                domain_aspect = DomainsClass(domains=[])
+                domains_to_add: List[str] = []
+                for tag in tags_seen:
+                    if domain_mapping.get(tag):
+                        domains_to_add.append(domain_mapping[tag])
+
+                mapped_domains = AddDatasetDomain.get_domain_class(
+                    self.ctx.graph, domains_to_add
+                )
+                domain_aspect.domains.extend(mapped_domains.domains)
+                if self.config.semantics == TransformerSemantics.PATCH:
+                    # Try merging with server-side domains
+                    patch_domain_aspect: Optional[
+                        DomainsClass
+                    ] = AddDatasetDomain._merge_with_server_domains(
+                        self.ctx.graph, entity_urn, domain_aspect
+                    )
+                    return cast(Optional[Aspect], patch_domain_aspect)
+                return cast(Optional[Aspect], domain_aspect)
+        return cast(Optional[Aspect], existing_domain_aspect)
diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py
index 7e01dd89095685..a0deae972badb4 100644
--- a/metadata-ingestion/tests/unit/test_transform_dataset.py
+++ b/metadata-ingestion/tests/unit/test_transform_dataset.py
@@ -67,6 +67,9 @@
     PatternAddDatasetDomain,
     SimpleAddDatasetDomain,
 )
+from datahub.ingestion.transformer.dataset_domain_based_on_tags import (
+    DatasetTagDomainMapper,
+)
 from datahub.ingestion.transformer.dataset_transformer import DatasetTransformer
 from datahub.ingestion.transformer.extract_dataset_tags import ExtractDatasetTags
 from datahub.ingestion.transformer.extract_ownership_from_tags import (
@@ -3458,3 +3461,193 @@ def test_pattern_cleanup_usage_statistics_user_3(
     assert output[0].record.aspect
     assert len(output[0].record.aspect.userCounts) == 2
     assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts
+
+
+def test_domain_mapping_based_on_tags_with_valid_tags(mock_datahub_graph):
+    acryl_domain = builder.make_domain_urn("acryl.io")
+    server_domain = builder.make_domain_urn("test.io")
+
+    tag_one = builder.make_tag_urn("test:tag_1")
+
+    # Return fake aspect to simulate server behaviour
+    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
+        return models.GlobalTagsClass(tags=[TagAssociationClass(tag=tag_one)])
+
+    pipeline_context = PipelineContext(run_id="transformer_pipe_line")
+    pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())
+
+    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore
+
+    output = run_dataset_transformer_pipeline(
+        transformer_type=DatasetTagDomainMapper,
+        aspect=models.DomainsClass(domains=[server_domain]),
+        config={"domain_mapping": {"test:tag_1": acryl_domain}},
+        pipeline_context=pipeline_context,
+    )
+
+    assert len(output) == 2
+    assert output[0] is not None
+    assert output[0].record is not None
+    assert isinstance(output[0].record, MetadataChangeProposalWrapper)
+    assert output[0].record.aspect is not None
+    assert isinstance(output[0].record.aspect, models.DomainsClass)
+    transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
+    assert len(transformed_aspect.domains) == 1
+    assert acryl_domain in transformed_aspect.domains
+    assert server_domain not in transformed_aspect.domains
+
+
+def test_domain_mapping_based_on_tags_with_no_matching_tags(mock_datahub_graph):
+    acryl_domain = builder.make_domain_urn("acryl.io")
+    server_domain = builder.make_domain_urn("test.io")
+    non_matching_tag = builder.make_tag_urn("nonMatching")
+
+    pipeline_context = PipelineContext(run_id="no_match_pipeline")
+    pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())
+
+    # Return fake aspect to simulate server behaviour
+    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
+        return models.GlobalTagsClass(tags=[TagAssociationClass(tag=non_matching_tag)])
+
+    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore
+
+    output = run_dataset_transformer_pipeline(
+        transformer_type=DatasetTagDomainMapper,
+        aspect=models.DomainsClass(domains=[server_domain]),
+        config={
+            "domain_mapping": {"test:tag_1": acryl_domain},
+        },
+        pipeline_context=pipeline_context,
+    )
+    assert len(output) == 2
+    assert isinstance(output[0].record.aspect, models.DomainsClass)
+    assert len(output[0].record.aspect.domains) == 1
+    transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
+    assert len(transformed_aspect.domains) == 1
+    assert acryl_domain not in transformed_aspect.domains
+    assert server_domain in transformed_aspect.domains
+
+
+def test_domain_mapping_based_on_tags_with_empty_config(mock_datahub_graph):
+    some_tag = builder.make_tag_urn("someTag")
+
+    pipeline_context = PipelineContext(run_id="empty_config_pipeline")
+    pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())
+
+    # Return fake aspect to simulate server behaviour
+    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
+        return models.GlobalTagsClass(tags=[TagAssociationClass(tag=some_tag)])
+
+    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore
+
+    output = run_dataset_transformer_pipeline(
+        transformer_type=DatasetTagDomainMapper,
+        aspect=models.DomainsClass(domains=[]),
+        config={"domain_mapping": {}},
+        pipeline_context=pipeline_context,
+    )
+    assert len(output) == 2
+    assert isinstance(output[0].record.aspect, models.DomainsClass)
+    assert len(output[0].record.aspect.domains) == 0
+
+
+def test_domain_mapping_based_on_tags_with_multiple_tags(mock_datahub_graph):
+    # Two tags that match different rules in the domain mapping configuration
+    tag_one = builder.make_tag_urn("test:tag_1")
+    tag_two = builder.make_tag_urn("test:tag_2")
+    existing_domain = builder.make_domain_urn("existing.io")
+    finance = builder.make_domain_urn("finance")
+    hr = builder.make_domain_urn("hr")
+
+    pipeline_context = PipelineContext(run_id="multiple_matches_pipeline")
+    pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())
+
+    # Return fake aspect to simulate server behaviour
+    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
+        return models.GlobalTagsClass(
+            tags=[TagAssociationClass(tag=tag_one), TagAssociationClass(tag=tag_two)]
+        )
+
+    # Return fake aspect to simulate server behaviour
+    def fake_get_domain(entity_urn: str) -> models.DomainsClass:
+        return models.DomainsClass(domains=[existing_domain])
+
+    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore
+    pipeline_context.graph.get_domain = fake_get_domain  # type: ignore
+
+    output = run_dataset_transformer_pipeline(
+        transformer_type=DatasetTagDomainMapper,
+        aspect=models.DomainsClass(domains=[existing_domain]),
+        config={
+            "domain_mapping": {"test:tag_1": finance, "test:tag_2": hr},
+            "semantics": "PATCH",
+        },
+        pipeline_context=pipeline_context,
+    )
+
+    # Assertions to verify the expected outcome
+    assert len(output) == 2
+    assert output[0].record is not None
+    assert output[0].record.aspect is not None
+    assert isinstance(output[0].record.aspect, models.DomainsClass)
+    transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
+
+    # Expecting domains from both matched tags
+    assert set(output[0].record.aspect.domains) == {existing_domain, finance, hr}
+    assert len(transformed_aspect.domains) == 3
+
+
+def test_domain_mapping_based_on_tags_with_empty_tags(mock_datahub_graph):
+    acryl_domain = builder.make_domain_urn("acryl.io")
+    server_domain = builder.make_domain_urn("test.io")
+    pipeline_context = PipelineContext(run_id="empty_config_pipeline")
+    pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())
+
+    # Return fake aspect to simulate server behaviour
+    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
+        return models.GlobalTagsClass(tags=[])
+
+    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore
+
+    output = run_dataset_transformer_pipeline(
+        transformer_type=DatasetTagDomainMapper,
+        aspect=models.DomainsClass(domains=[acryl_domain]),
+        config={"domain_mapping": {"test:tag_1": server_domain}},
+        pipeline_context=pipeline_context,
+    )
+
+    assert len(output) == 2
+    assert isinstance(output[0].record.aspect, models.DomainsClass)
+    assert len(output[0].record.aspect.domains) == 1
+    transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
+    assert len(transformed_aspect.domains) == 1
+    assert acryl_domain in transformed_aspect.domains
+    assert server_domain not in transformed_aspect.domains
+
+
+def test_domain_mapping_based_on_tags_with_no_tags(mock_datahub_graph):
+    acryl_domain = builder.make_domain_urn("acryl.io")
+    server_domain = builder.make_domain_urn("test.io")
+    pipeline_context = PipelineContext(run_id="empty_config_pipeline")
+    pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())
+
+    # Return fake aspect to simulate server behaviour
+    def fake_get_tags(entity_urn: str) -> Optional[models.GlobalTagsClass]:
+        return None
+
+    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore
+
+    output = run_dataset_transformer_pipeline(
+        transformer_type=DatasetTagDomainMapper,
+        aspect=models.DomainsClass(domains=[acryl_domain]),
+        config={"domain_mapping": {"test:tag_1": server_domain}},
+        pipeline_context=pipeline_context,
+    )
+
+    assert len(output) == 2
+    assert isinstance(output[0].record.aspect, models.DomainsClass)
+    assert len(output[0].record.aspect.domains) == 1
+    transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
+    assert len(transformed_aspect.domains) == 1
+    assert acryl_domain in transformed_aspect.domains
+    assert server_domain not in transformed_aspect.domains
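
Reviewer note: the tag-to-domain matching described in the `dataset_transformer.md` changes can be sketched in plain Python, which may help when reasoning about the `OVERWRITE` and `PATCH` modes without a DataHub instance. This is a simplified illustration under stated assumptions, not the transformer itself: `simple_tag_name` and `map_tags_to_domains` are hypothetical helpers, `existing_domains` stands in for both the incoming aspect and the server-side domains, and the resolution of domain names to URNs that the real code delegates to `AddDatasetDomain.get_domain_class` is skipped.

```python
from typing import Dict, Iterable, List

TAG_URN_PREFIX = "urn:li:tag:"


def simple_tag_name(tag_urn: str) -> str:
    """Strip the tag URN prefix, mirroring the split() used in the transformer."""
    return tag_urn.split(TAG_URN_PREFIX)[-1]


def map_tags_to_domains(
    tag_urns: Iterable[str],
    domain_mapping: Dict[str, str],
    existing_domains: Iterable[str] = (),
    semantics: str = "OVERWRITE",
) -> List[str]:
    """Hypothetical helper: return the domains a dataset would end up with."""
    mapped: List[str] = []
    for tag_urn in tag_urns:
        domain = domain_mapping.get(simple_tag_name(tag_urn))
        if domain and domain not in mapped:
            mapped.append(domain)

    if not mapped:
        # No configured tag matched: the existing domains pass through unchanged.
        return list(existing_domains)

    if semantics == "PATCH":
        # Keep the existing domains and append the newly mapped ones.
        merged = list(existing_domains)
        merged.extend(d for d in mapped if d not in merged)
        return merged

    # Any other value is treated as OVERWRITE: only the mapped domains remain.
    return mapped
```

With `semantics="PATCH"` the existing domains are kept and the mapped ones are appended; otherwise the mapped domains replace them, which mirrors the two documented modes.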