Skip to content

Commit

Permalink
fix(ingest/transformer): replace externalUrl in dataset properties (d…
Browse files Browse the repository at this point in the history
  • Loading branch information
dushayntAW authored Apr 15, 2024
1 parent 771ab0d commit f860f79
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 3 deletions.
18 changes: 18 additions & 0 deletions metadata-ingestion/docs/transformer/dataset_transformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,24 @@ Then define your class to return a list of custom properties, for example:
add_properties_resolver_class: "<your_module>.<your_class>"
```

## Replace ExternalUrl
### Config Details
| Field | Required | Type | Default | Description |
|-----------------------------|----------|---------|---------------|---------------------------------------------|
| `input_pattern` | ✅ | string | | String or pattern to replace |
| `replacement` | ✅ | string | | Replacement string |


Matches `input_pattern` (a regular expression) against all or part of the `externalUrl` in the dataset properties and replaces every match with the `replacement` string.

```yaml
transformers:
- type: "replace_external_url"
config:
input_pattern: '\b\w*hub\b'
replacement: "sub"
```

## Simple Add Dataset domains
### Config Details
| Field | Required | Type | Default | Description |
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@
"add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:AddDatasetDataProduct",
"simple_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:SimpleAddDatasetDataProduct",
"pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct",
"replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl"
],
"datahub.ingestion.sink.plugins": [
"file = datahub.ingestion.sink.file:FileSink",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import copy
import re
from typing import Any, Dict, Optional, cast

from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import (
DatasetPropertiesTransformer,
)
from datahub.metadata.schema_classes import DatasetPropertiesClass


class ReplaceExternalUrlConfig(ConfigModel):
    """Configuration for the ``replace_external_url`` transformer."""

    # Regular expression searched for in the dataset's externalUrl.
    input_pattern: str
    # Text substituted for every match of ``input_pattern``.
    replacement: str


class ReplaceExternalUrl(DatasetPropertiesTransformer):
    """Transformer that rewrites the ``externalUrl`` of dataset properties.

    Every match of the configured ``input_pattern`` regular expression found
    in the aspect's ``externalUrl`` is replaced with ``replacement``. Aspects
    that have no (or an empty) ``externalUrl`` are returned unchanged.
    """

    ctx: PipelineContext
    config: ReplaceExternalUrlConfig

    def __init__(
        self,
        config: ReplaceExternalUrlConfig,
        ctx: PipelineContext,
        **resolver_args: Dict[str, Any],
    ):
        """Store the configuration and pre-compile the search pattern.

        Raises:
            re.error: if ``config.input_pattern`` is not a valid regular
                expression.
        """
        super().__init__()
        self.ctx = ctx
        self.config = config
        self.resolver_args = resolver_args
        # Compile once so an invalid pattern is reported at construction time
        # rather than when the first aspect with a URL is transformed.
        self._input_pattern: re.Pattern = re.compile(config.input_pattern)

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "ReplaceExternalUrl":
        """Build the transformer from a raw configuration dictionary."""
        config = ReplaceExternalUrlConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
    ) -> Optional[Aspect]:
        """Return the aspect with its ``externalUrl`` rewritten.

        Args:
            entity_urn: URN of the entity the aspect belongs to (unused here).
            aspect_name: Name of the aspect being transformed (unused here).
            aspect: The incoming aspect; may be ``None``.

        Returns:
            The incoming aspect itself when it has no ``externalUrl`` to
            rewrite, otherwise a deep copy carrying the substituted URL. The
            input aspect is never mutated.
        """
        in_dataset_properties_aspect: DatasetPropertiesClass = cast(
            DatasetPropertiesClass, aspect
        )

        # getattr with a default also covers a ``None`` aspect or one that
        # lacks the attribute entirely.
        external_url = getattr(in_dataset_properties_aspect, "externalUrl", None)
        if not external_url:
            return cast(Aspect, in_dataset_properties_aspect)

        out_dataset_properties_aspect: DatasetPropertiesClass = copy.deepcopy(
            in_dataset_properties_aspect
        )
        out_dataset_properties_aspect.externalUrl = self._input_pattern.sub(
            self.config.replacement, external_url
        )

        return cast(Aspect, out_dataset_properties_aspect)
88 changes: 85 additions & 3 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,13 @@
ExtractOwnersFromTagsTransformer,
)
from datahub.ingestion.transformer.mark_dataset_status import MarkDatasetStatus
from datahub.ingestion.transformer.pattern_cleanup_ownership import (
PatternCleanUpOwnership,
)
from datahub.ingestion.transformer.remove_dataset_ownership import (
SimpleRemoveDatasetOwnership,
)
from datahub.ingestion.transformer.replace_external_url import ReplaceExternalUrl
from datahub.metadata.schema_classes import (
BrowsePathsClass,
DatasetPropertiesClass,
Expand All @@ -87,9 +91,6 @@
)
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.urn import Urn
from src.datahub.ingestion.transformer.pattern_cleanup_ownership import (
PatternCleanUpOwnership,
)


def make_generic_dataset(
Expand Down Expand Up @@ -3209,3 +3210,84 @@ def test_clean_owner_urn_transformation_should_not_remove_system_identifier(
config: List[Union[re.Pattern, str]] = ["urn:li:corpuser:"]

_test_clean_owner_urns(pipeline_context, in_owner_urns, config, in_owner_urns)


def test_replace_external_url_word_replace(
    mock_datahub_graph,
):
    """A literal word pattern replaces the matching part of externalUrl."""
    ctx = PipelineContext(run_id="test_replace_external_url")
    ctx.graph = mock_datahub_graph(DatahubClientConfig)

    source_aspect = models.DatasetPropertiesClass(
        externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
        customProperties=EXISTING_PROPERTIES.copy(),
    )
    transformer_config = {"input_pattern": "datahub", "replacement": "starhub"}

    results = run_dataset_transformer_pipeline(
        transformer_type=ReplaceExternalUrl,
        aspect=source_aspect,
        config=transformer_config,
        pipeline_context=ctx,
    )

    assert len(results) == 2
    first_record = results[0].record
    assert first_record
    assert first_record.aspect

    expected_url = "https://github.com/starhub/looker-demo/blob/master/foo.view.lkml"
    assert first_record.aspect.externalUrl == expected_url


def test_replace_external_regex_replace_1(
    mock_datahub_graph,
):
    """A greedy regex spanning several path segments is replaced as one match."""
    ctx = PipelineContext(run_id="test_replace_external_url")
    ctx.graph = mock_datahub_graph(DatahubClientConfig)

    source_aspect = models.DatasetPropertiesClass(
        externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
        customProperties=EXISTING_PROPERTIES.copy(),
    )
    transformer_config = {
        "input_pattern": r"datahub/.*/",
        "replacement": "starhub/test/",
    }

    results = run_dataset_transformer_pipeline(
        transformer_type=ReplaceExternalUrl,
        aspect=source_aspect,
        config=transformer_config,
        pipeline_context=ctx,
    )

    assert len(results) == 2
    first_record = results[0].record
    assert first_record
    assert first_record.aspect

    expected_url = "https://github.com/starhub/test/foo.view.lkml"
    assert first_record.aspect.externalUrl == expected_url


def test_replace_external_regex_replace_2(
    mock_datahub_graph,
):
    """A word-boundary regex replaces every word ending in "hub" in the URL."""
    ctx = PipelineContext(run_id="test_replace_external_url")
    ctx.graph = mock_datahub_graph(DatahubClientConfig)

    source_aspect = models.DatasetPropertiesClass(
        externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
        customProperties=EXISTING_PROPERTIES.copy(),
    )
    transformer_config = {"input_pattern": r"\b\w*hub\b", "replacement": "test"}

    results = run_dataset_transformer_pipeline(
        transformer_type=ReplaceExternalUrl,
        aspect=source_aspect,
        config=transformer_config,
        pipeline_context=ctx,
    )

    assert len(results) == 2
    first_record = results[0].record
    assert first_record
    assert first_record.aspect

    # Both "github" and "datahub" match the pattern, so both are rewritten.
    expected_url = "https://test.com/test/looker-demo/blob/master/foo.view.lkml"
    assert first_record.aspect.externalUrl == expected_url

0 comments on commit f860f79

Please sign in to comment.