
Commit

Merge branch 'datahub-project:master' into master
anshbansal authored May 15, 2024
2 parents a7ddd9d + 5fbf781 commit aa84fa7
Showing 5 changed files with 345 additions and 1 deletion.
25 changes: 25 additions & 0 deletions docs/developers.md
@@ -169,3 +169,28 @@ This means you're running out of space on your disk to build. Please free up som
#### `Build failed` for task `./gradlew :datahub-frontend:dist -x yarnTest -x yarnLint`

This could mean that you need to update your [Yarn](https://yarnpkg.com/getting-started/install) version

#### `:buildSrc:compileJava` task fails with `package com.linkedin.metadata.models.registry.config does not exist` and `cannot find symbol` error for `Entity`

There are currently two symbolic links within the [buildSrc](https://github.com/datahub-project/datahub/tree/master/buildSrc) directory, for the [com.linkedin.metadata.aspect.plugins.config](https://github.com/datahub-project/datahub/blob/master/buildSrc/src/main/java/com/linkedin/metadata/aspect/plugins/config) and [com.linkedin.metadata.models.registry.config](https://github.com/datahub-project/datahub/blob/master/buildSrc/src/main/java/com/linkedin/metadata/models/registry/config) packages, which point to the corresponding packages in the [entity-registry](https://github.com/datahub-project/datahub/tree/master/entity-registry) subproject.

When the repository is checked out on Windows 10/11 (even if WSL is later used to build from the mounted Windows filesystem under `/mnt/`), the symbolic links may not have been created correctly; instead, they are checked out as plain files. Although it is technically possible to build in WSL from the mounted Windows filesystem under `/mnt/`, it is **strongly recommended** to check out the repository within the Linux filesystem (e.g., under `/home/`) and build it from there, because accessing the Windows filesystem from Linux is relatively slow and slows down the whole build.

To create symbolic links on Windows 10/11, [Developer Mode](https://learn.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development) has to be enabled first. Then the following commands can be used to enable [symbolic links in Git](https://git-scm.com/docs/git-config#Documentation/git-config.txt-coresymlinks) and recreate the symbolic links:

```shell
# enable core.symlinks config
git config --global core.symlinks true

# check the current core.symlinks config and scope
git config --show-scope --show-origin core.symlinks

# in case the core.symlinks config is still set locally to false, remove the local config
git config --unset core.symlinks

# reset the current branch to recreate the missing symbolic links (alternatively, it is also possible to switch to another branch and back)
git reset --hard
```

See also [here](https://stackoverflow.com/questions/5917249/git-symbolic-links-in-windows/59761201#59761201) for more information on how to enable symbolic links on Windows 10/11 and Git.
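As a quick sanity check (a sketch, not part of the official Git docs), you can verify in a scratch repository that Git records a symbolic link with file mode `120000`; a symlink that was checked out as a plain file would show mode `100644` instead:

```shell
# Create a scratch repo containing one symbolic link
tmp=$(mktemp -d) && cd "$tmp"
git init -q .
git config core.symlinks true
ln -s some-target link
git add link

# A properly recorded symlink is stored with mode 120000
git ls-files -s link
```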
57 changes: 56 additions & 1 deletion metadata-ingestion/docs/transformer/dataset_transformer.md
@@ -13,7 +13,7 @@ The below table shows transformers which can transform aspects of entity [Dataset
| `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)<br/> - [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms) |
| `schemaMetadata` | - [Pattern Add Dataset Schema Field glossaryTerms](#pattern-add-dataset-schema-field-glossaryterms)<br/> - [Pattern Add Dataset Schema Field globalTags](#pattern-add-dataset-schema-field-globaltags) |
| `datasetProperties` | - [Simple Add Dataset datasetProperties](#simple-add-dataset-datasetproperties)<br/> - [Add Dataset datasetProperties](#add-dataset-datasetproperties) |
| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)<br/> - [Pattern Add Dataset domains](#pattern-add-dataset-domains) |
| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)<br/> - [Pattern Add Dataset domains](#pattern-add-dataset-domains)<br/> - [Domain Mapping Based on Tags](#domain-mapping-based-on-tags) |
| `dataProduct` | - [Simple Add Dataset dataProduct ](#simple-add-dataset-dataproduct)<br/> - [Pattern Add Dataset dataProduct](#pattern-add-dataset-dataproduct)<br/> - [Add Dataset dataProduct](#add-dataset-dataproduct) |

## Extract Ownership from Tags
@@ -1064,6 +1064,61 @@ in both of the cases domain should be provisioned on DataHub GMS
'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.n.*': ["hr"]
'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"]
```



## Domain Mapping Based on Tags
### Config Details

| Field | Required | Type | Default | Description |
|-----------------|----------|-------------------------|-------------|---------------------------------------------------------------------------------------------------------|
| `domain_mapping`| ✅ | Dict[str, str] | | A mapping from dataset tag names (keys) to the domain URN or domain name (values) to attach to matching datasets. |
| `semantics` | | enum | "OVERWRITE" | Whether to OVERWRITE or PATCH the entity present on DataHub GMS.|

<br/>

Suppose we'd like to add a domain to a dataset based on its tags; in that case, you can use the `domain_mapping_based_on_tags` transformer.

The config, which we'd append to our ingestion recipe YAML, would look like this:

Domains can be specified either as a URN (e.g., `urn:li:domain:engineering`) or as a simple domain name (e.g., `engineering`); in both cases, the domain should already be provisioned on DataHub GMS.

When specifying tags within the domain mapping, use the tag's simple name rather than the full tag URN. For example, instead of the tag URN `urn:li:tag:NeedsDocumentation`, specify just the simple tag name `NeedsDocumentation` in the domain mapping configuration.

```yaml
transformers:
- type: "domain_mapping_based_on_tags"
config:
domain_mapping:
'NeedsDocumentation': "urn:li:domain:documentation"
```
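To illustrate why the simple name is what matters here, the transformer strips the `urn:li:tag:` prefix before looking a tag up in the mapping. A minimal sketch of that behaviour (the helper name is ours for illustration, not a DataHub API):

```python
def tag_simple_name(tag: str) -> str:
    # "urn:li:tag:NeedsDocumentation" -> "NeedsDocumentation";
    # a bare tag name is returned unchanged.
    return tag.split("urn:li:tag:")[-1]

print(tag_simple_name("urn:li:tag:NeedsDocumentation"))  # NeedsDocumentation
print(tag_simple_name("NeedsDocumentation"))             # NeedsDocumentation
```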


`domain_mapping_based_on_tags` can be configured in the following ways:

- Add domains based on tags, overwriting any domains already set on the dataset in DataHub GMS
```yaml
transformers:
- type: "domain_mapping_based_on_tags"
config:
semantics: OVERWRITE # OVERWRITE is default behaviour
domain_mapping:
'example1': "urn:li:domain:engineering"
'example2': "urn:li:domain:hr"
```
- Add domains based on tags, keeping any domains already set on the dataset in DataHub GMS
```yaml
transformers:
- type: "domain_mapping_based_on_tags"
config:
semantics: PATCH
domain_mapping:
'example1': "urn:li:domain:engineering"
'example2': "urn:li:domain:hr"
```
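The difference between the two semantics can be sketched as follows (a hypothetical helper for illustration only; in the actual transformer, the PATCH merge is performed against the server-side domains on DataHub GMS):

```python
from typing import List


def apply_semantics(
    server_domains: List[str], mapped_domains: List[str], semantics: str
) -> List[str]:
    # OVERWRITE: the mapped domains replace whatever is on the server.
    # PATCH: the mapped domains are merged with the server-side domains.
    if semantics == "PATCH":
        # de-duplicated while preserving order
        return list(dict.fromkeys(server_domains + mapped_domains))
    return mapped_domains


print(apply_semantics(["urn:li:domain:finance"], ["urn:li:domain:hr"], "PATCH"))
# ['urn:li:domain:finance', 'urn:li:domain:hr']
```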

## Simple Add Dataset dataProduct
### Config Details
| Field | Required | Type | Default | Description |
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -708,6 +708,7 @@
        "pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct",
        "replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl",
        "pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser",
        "domain_mapping_based_on_tags = datahub.ingestion.transformer.dataset_domain_based_on_tags:DatasetTagDomainMapper",
    ],
    "datahub.ingestion.sink.plugins": [
        "file = datahub.ingestion.sink.file:FileSink",
@@ -0,0 +1,70 @@
from typing import Dict, List, Optional, Set, cast

from datahub.configuration.common import (
    TransformerSemantics,
    TransformerSemanticsConfigModel,
)
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_domain import AddDatasetDomain
from datahub.ingestion.transformer.dataset_transformer import DatasetDomainTransformer
from datahub.metadata.schema_classes import DomainsClass, GlobalTagsClass


class DatasetTagDomainMapperConfig(TransformerSemanticsConfigModel):
    domain_mapping: Dict[str, str]


class DatasetTagDomainMapper(DatasetDomainTransformer):
    """A transformer that appends a predefined set of domains to each dataset that includes specific tags defined in the transformer."""

    def __init__(self, config: DatasetTagDomainMapperConfig, ctx: PipelineContext):
        super().__init__()
        self.ctx: PipelineContext = ctx
        self.config: DatasetTagDomainMapperConfig = config

    @classmethod
    def create(
        cls, config_dict: dict, ctx: PipelineContext
    ) -> "DatasetTagDomainMapper":
        config = DatasetTagDomainMapperConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
    ) -> Optional[Aspect]:
        # Initialize the existing domain aspect
        existing_domain_aspect: DomainsClass = cast(DomainsClass, aspect)
        assert self.ctx.graph
        global_tags: Optional[GlobalTagsClass] = self.ctx.graph.get_tags(entity_urn)
        # Check if we have tags received in existing aspect
        if global_tags:
            domain_mapping = self.config.domain_mapping
            transformer_tags = domain_mapping.keys()
            tags_seen: Set[str] = set()
            for tag_item in global_tags.tags:
                # Match on the tag's simple name, with the "urn:li:tag:" prefix stripped
                tag = tag_item.tag.split("urn:li:tag:")[-1]
                if tag in transformer_tags:
                    tags_seen.add(tag)

            if tags_seen:
                domain_aspect = DomainsClass(domains=[])
                domains_to_add: List[str] = []
                for tag in tags_seen:
                    if domain_mapping.get(tag):
                        domains_to_add.append(domain_mapping[tag])

                mapped_domains = AddDatasetDomain.get_domain_class(
                    self.ctx.graph, domains_to_add
                )
                domain_aspect.domains.extend(mapped_domains.domains)
                if self.config.semantics == TransformerSemantics.PATCH:
                    # Try merging with server-side domains
                    patch_domain_aspect: Optional[
                        DomainsClass
                    ] = AddDatasetDomain._merge_with_server_domains(
                        self.ctx.graph, entity_urn, domain_aspect
                    )
                    return cast(Optional[Aspect], patch_domain_aspect)
                return cast(Optional[Aspect], domain_aspect)
        return cast(Optional[Aspect], existing_domain_aspect)
193 changes: 193 additions & 0 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
@@ -67,6 +67,9 @@
    PatternAddDatasetDomain,
    SimpleAddDatasetDomain,
)
from datahub.ingestion.transformer.dataset_domain_based_on_tags import (
    DatasetTagDomainMapper,
)
from datahub.ingestion.transformer.dataset_transformer import DatasetTransformer
from datahub.ingestion.transformer.extract_dataset_tags import ExtractDatasetTags
from datahub.ingestion.transformer.extract_ownership_from_tags import (
@@ -3458,3 +3461,193 @@ def test_pattern_cleanup_usage_statistics_user_3(
    assert output[0].record.aspect
    assert len(output[0].record.aspect.userCounts) == 2
    assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts


def test_domain_mapping_based_on_tags_with_valid_tags(mock_datahub_graph):
    acryl_domain = builder.make_domain_urn("acryl.io")
    server_domain = builder.make_domain_urn("test.io")

    tag_one = builder.make_tag_urn("test:tag_1")

    # Return fake aspect to simulate server behaviour
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(tags=[TagAssociationClass(tag=tag_one)])

    pipeline_context = PipelineContext(run_id="transformer_pipe_line")
    pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[server_domain]),
        config={"domain_mapping": {"test:tag_1": acryl_domain}},
        pipeline_context=pipeline_context,
    )

    assert len(output) == 2
    assert output[0] is not None
    assert output[0].record is not None
    assert isinstance(output[0].record, MetadataChangeProposalWrapper)
    assert output[0].record.aspect is not None
    assert isinstance(output[0].record.aspect, models.DomainsClass)
    transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
    assert len(transformed_aspect.domains) == 1
    assert acryl_domain in transformed_aspect.domains
    assert server_domain not in transformed_aspect.domains


def test_domain_mapping_based_on_tags_with_no_matching_tags(mock_datahub_graph):
    acryl_domain = builder.make_domain_urn("acryl.io")
    server_domain = builder.make_domain_urn("test.io")
    non_matching_tag = builder.make_tag_urn("nonMatching")

    pipeline_context = PipelineContext(run_id="no_match_pipeline")
    pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

    # Return fake aspect to simulate server behaviour
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(tags=[TagAssociationClass(tag=non_matching_tag)])

    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[server_domain]),
        config={
            "domain_mapping": {"test:tag_1": acryl_domain},
        },
        pipeline_context=pipeline_context,
    )
    assert len(output) == 2
    assert isinstance(output[0].record.aspect, models.DomainsClass)
    assert len(output[0].record.aspect.domains) == 1
    transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
    assert len(transformed_aspect.domains) == 1
    assert acryl_domain not in transformed_aspect.domains
    assert server_domain in transformed_aspect.domains


def test_domain_mapping_based_on_tags_with_empty_config(mock_datahub_graph):
    some_tag = builder.make_tag_urn("someTag")

    pipeline_context = PipelineContext(run_id="empty_config_pipeline")
    pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

    # Return fake aspect to simulate server behaviour
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(tags=[TagAssociationClass(tag=some_tag)])

    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[]),
        config={"domain_mapping": {}},
        pipeline_context=pipeline_context,
    )
    assert len(output) == 2
    assert isinstance(output[0].record.aspect, models.DomainsClass)
    assert len(output[0].record.aspect.domains) == 0


def test_domain_mapping_based_on_tags_with_multiple_tags(mock_datahub_graph):
    # Two tags that match different rules in the domain mapping configuration
    tag_one = builder.make_tag_urn("test:tag_1")
    tag_two = builder.make_tag_urn("test:tag_2")
    existing_domain = builder.make_domain_urn("existing.io")
    finance = builder.make_domain_urn("finance")
    hr = builder.make_domain_urn("hr")

    pipeline_context = PipelineContext(run_id="multiple_matches_pipeline")
    pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

    # Return fake aspects to simulate server behaviour
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(
            tags=[TagAssociationClass(tag=tag_one), TagAssociationClass(tag=tag_two)]
        )

    def fake_get_domain(entity_urn: str) -> models.DomainsClass:
        return models.DomainsClass(domains=[existing_domain])

    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore
    pipeline_context.graph.get_domain = fake_get_domain  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[existing_domain]),
        config={
            "domain_mapping": {"test:tag_1": finance, "test:tag_2": hr},
            "semantics": "PATCH",
        },
        pipeline_context=pipeline_context,
    )

    # Assertions to verify the expected outcome
    assert len(output) == 2
    assert output[0].record is not None
    assert output[0].record.aspect is not None
    assert isinstance(output[0].record.aspect, models.DomainsClass)
    transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)

    # Expecting the existing domain plus the domains from both matched tags
    assert set(transformed_aspect.domains) == {existing_domain, finance, hr}
    assert len(transformed_aspect.domains) == 3


def test_domain_mapping_based_on_tags_with_empty_tags(mock_datahub_graph):
    acryl_domain = builder.make_domain_urn("acryl.io")
    server_domain = builder.make_domain_urn("test.io")
    pipeline_context = PipelineContext(run_id="empty_config_pipeline")
    pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

    # Return fake aspect to simulate server behaviour
    def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
        return models.GlobalTagsClass(tags=[])

    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[acryl_domain]),
        config={"domain_mapping": {"test:tag_1": server_domain}},
        pipeline_context=pipeline_context,
    )

    assert len(output) == 2
    assert isinstance(output[0].record.aspect, models.DomainsClass)
    assert len(output[0].record.aspect.domains) == 1
    transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
    assert len(transformed_aspect.domains) == 1
    assert acryl_domain in transformed_aspect.domains
    assert server_domain not in transformed_aspect.domains


def test_domain_mapping_based_on_tags_with_no_tags(mock_datahub_graph):
    acryl_domain = builder.make_domain_urn("acryl.io")
    server_domain = builder.make_domain_urn("test.io")
    pipeline_context = PipelineContext(run_id="empty_config_pipeline")
    pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

    # Return fake aspect to simulate server behaviour
    def fake_get_tags(entity_urn: str) -> Optional[models.GlobalTagsClass]:
        return None

    pipeline_context.graph.get_tags = fake_get_tags  # type: ignore

    output = run_dataset_transformer_pipeline(
        transformer_type=DatasetTagDomainMapper,
        aspect=models.DomainsClass(domains=[acryl_domain]),
        config={"domain_mapping": {"test:tag_1": server_domain}},
        pipeline_context=pipeline_context,
    )

    assert len(output) == 2
    assert isinstance(output[0].record.aspect, models.DomainsClass)
    assert len(output[0].record.aspect.domains) == 1
    transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
    assert len(transformed_aspect.domains) == 1
    assert acryl_domain in transformed_aspect.domains
    assert server_domain not in transformed_aspect.domains
