diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index 773a7e8554832d..ac6fefc3095741 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -953,7 +953,7 @@ Then define your class to return a list of custom properties, for example: add_properties_resolver_class: "." ``` -## Replace ExternalUrl +## Replace ExternalUrl Dataset ### Config Details | Field | Required | Type | Default | Description | |-----------------------------|----------|---------|---------------|---------------------------------------------| @@ -971,6 +971,24 @@ transformers: replacement: "sub" ``` +## Replace ExternalUrl Container +### Config Details +| Field | Required | Type | Default | Description | +|-----------------------------|----------|---------|---------------|---------------------------------------------| +| `input_pattern` | ✅ | string | | String or pattern to replace | +| `replacement` | ✅ | string | | Replacement string | + + +Matches the full/partial string in the externalUrl of the container properties and replace that with the replacement string + +```yaml +transformers: + - type: "replace_external_url_container" + config: + input_pattern: '\b\w*hub\b' + replacement: "sub" +``` + ## Clean User URN in DatasetUsageStatistics Aspect ### Config Details | Field | Required | Type | Default | Description | diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index e973ff629ee841..8a593b23d1f9cc 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -750,7 +750,8 @@ "add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:AddDatasetDataProduct", "simple_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:SimpleAddDatasetDataProduct", "pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct", - "replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl", + "replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrlDataset", + "replace_external_url_container = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrlContainer", "pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser", "domain_mapping_based_on_tags = datahub.ingestion.transformer.dataset_domain_based_on_tags:DatasetTagDomainMapper", "tags_to_term = datahub.ingestion.transformer.tags_to_terms:TagsToTermMapper", diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py index 3e313ddd356be7..42dd54f4a584a0 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py @@ -37,6 +37,16 @@ def entity_types(self) -> List[str]: return ["dataset", "container"] +class ContainerTransformer(BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta): + """Transformer that does transform sequentially on each dataset.""" + + def __init__(self): + super().__init__() + + def entity_types(self) -> List[str]: + return ["container"] + + class DatasetOwnershipTransformer(DatasetTransformer, metaclass=ABCMeta): def aspect_name(self) -> str: return "ownership" @@ -143,3 +153,8 @@ def aspect_name(self) -> str: class TagsToTermTransformer(TagTransformer, metaclass=ABCMeta): def aspect_name(self) -> str: return "glossaryTerms" + + +class ContainerPropertiesTransformer(ContainerTransformer, metaclass=ABCMeta): + def aspect_name(self) -> str: + return "containerProperties" diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/replace_external_url.py b/metadata-ingestion/src/datahub/ingestion/transformer/replace_external_url.py index c222450f87e63a..57af10d1040c8a 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/replace_external_url.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/replace_external_url.py @@ -6,9 +6,13 @@ from datahub.emitter.mce_builder import Aspect from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.transformer.dataset_transformer import ( + ContainerPropertiesTransformer, DatasetPropertiesTransformer, ) -from datahub.metadata.schema_classes import DatasetPropertiesClass +from datahub.metadata.schema_classes import ( + ContainerPropertiesClass, + DatasetPropertiesClass, +) class ReplaceExternalUrlConfig(ConfigModel): @@ -16,8 +20,14 @@ class ReplaceExternalUrlConfig(ConfigModel): replacement: str -class ReplaceExternalUrl(DatasetPropertiesTransformer): - """Transformer that clean the ownership URN.""" +class ReplaceUrl: + def replace_url(self, pattern: str, replacement: str, external_url: str) -> str: + pattern_obj = re.compile(pattern) + return re.sub(pattern_obj, replacement, external_url) + + +class ReplaceExternalUrlDataset(DatasetPropertiesTransformer, ReplaceUrl): + """Transformer that replace the external URL for dataset properties.""" ctx: PipelineContext config: ReplaceExternalUrlConfig @@ -34,7 +44,9 @@ def __init__( self.resolver_args = resolver_args @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> "ReplaceExternalUrl": + def create( + cls, config_dict: dict, ctx: PipelineContext + ) -> "ReplaceExternalUrlDataset": config = ReplaceExternalUrlConfig.parse_obj(config_dict) return cls(config, ctx) @@ -55,11 +67,60 @@ def transform_aspect( in_dataset_properties_aspect ) - pattern = re.compile(self.config.input_pattern) - replacement = self.config.replacement - - out_dataset_properties_aspect.externalUrl = re.sub( - pattern, replacement, in_dataset_properties_aspect.externalUrl + out_dataset_properties_aspect.externalUrl = self.replace_url( + self.config.input_pattern, + self.config.replacement, + in_dataset_properties_aspect.externalUrl, ) return cast(Aspect, out_dataset_properties_aspect) + + +class ReplaceExternalUrlContainer(ContainerPropertiesTransformer, ReplaceUrl): + """Transformer that replace the external URL for container properties.""" + + ctx: PipelineContext + config: ReplaceExternalUrlConfig + + def __init__( + self, + config: ReplaceExternalUrlConfig, + ctx: PipelineContext, + **resolver_args: Dict[str, Any], + ): + super().__init__() + self.ctx = ctx + self.config = config + self.resolver_args = resolver_args + + @classmethod + def create( + cls, config_dict: dict, ctx: PipelineContext + ) -> "ReplaceExternalUrlContainer": + config = ReplaceExternalUrlConfig.parse_obj(config_dict) + return cls(config, ctx) + + def transform_aspect( + self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] + ) -> Optional[Aspect]: + + in_container_properties_aspect: ContainerPropertiesClass = cast( + ContainerPropertiesClass, aspect + ) + if ( + not hasattr(in_container_properties_aspect, "externalUrl") + or not in_container_properties_aspect.externalUrl + ): + return cast(Aspect, in_container_properties_aspect) + else: + out_container_properties_aspect: ContainerPropertiesClass = copy.deepcopy( + in_container_properties_aspect + ) + + out_container_properties_aspect.externalUrl = self.replace_url( + self.config.input_pattern, + self.config.replacement, + in_container_properties_aspect.externalUrl, + ) + + return cast(Aspect, out_container_properties_aspect) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 4c4e1620f4faae..b8a1222125d103 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -71,6 +71,7 @@ DatasetTagDomainMapper, ) from datahub.ingestion.transformer.dataset_transformer import ( + ContainerTransformer, DatasetTransformer, TagTransformer, ) @@ -88,7 +89,10 @@ from datahub.ingestion.transformer.remove_dataset_ownership import ( SimpleRemoveDatasetOwnership, ) -from datahub.ingestion.transformer.replace_external_url import ReplaceExternalUrl +from datahub.ingestion.transformer.replace_external_url import ( + ReplaceExternalUrlContainer, + ReplaceExternalUrlDataset, +) from datahub.ingestion.transformer.tags_to_terms import TagsToTermMapper from datahub.metadata.schema_classes import ( BrowsePathsClass, @@ -134,6 +138,22 @@ def make_generic_dataset_mcp( ) +def make_generic_container_mcp( + entity_urn: str = "urn:li:container:6338f55439c7ae58243a62c4d6fbffeee", + aspect_name: str = "status", + aspect: Any = None, +) -> MetadataChangeProposalWrapper: + if aspect is None: + aspect = models.StatusClass(removed=False) + return MetadataChangeProposalWrapper( + entityUrn=entity_urn, + entityType=Urn.create_from_string(entity_urn).get_type(), + aspectName=aspect_name, + changeType="UPSERT", + aspect=aspect, + ) + + def create_and_run_test_pipeline( events: List[Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]], transformers: List[Dict[str, Any]], @@ -1929,6 +1949,41 @@ def run_dataset_transformer_pipeline( return outputs +def run_container_transformer_pipeline( + transformer_type: Type[ContainerTransformer], + aspect: Optional[builder.Aspect], + config: dict, + pipeline_context: Optional[PipelineContext] = None, + use_mce: bool = False, +) -> List[RecordEnvelope]: + if pipeline_context is None: + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + transformer: ContainerTransformer = cast( + ContainerTransformer, transformer_type.create(config, pipeline_context) + ) + + container: Union[MetadataChangeEventClass, MetadataChangeProposalWrapper] + if use_mce: + container = MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:container:6338f55439c7ae58243a62c4d6fbffde", + aspects=[], + ) + ) + else: + assert aspect + container = make_generic_container_mcp( + aspect=aspect, aspect_name=transformer.aspect_name() + ) + + outputs = list( + transformer.transform( + [RecordEnvelope(input, metadata={}) for input in [container, EndOfStream()]] + ) + ) + return outputs + + def test_simple_add_dataset_domain_aspect_name(mock_datahub_graph): pipeline_context: PipelineContext = PipelineContext( run_id="test_simple_add_dataset_domain" @@ -3235,7 +3290,7 @@ def test_replace_external_url_word_replace( pipeline_context.graph = mock_datahub_graph(DatahubClientConfig) output = run_dataset_transformer_pipeline( - transformer_type=ReplaceExternalUrl, + transformer_type=ReplaceExternalUrlDataset, aspect=models.DatasetPropertiesClass( externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml", customProperties=EXISTING_PROPERTIES.copy(), @@ -3262,7 +3317,7 @@ def test_replace_external_regex_replace_1( pipeline_context.graph = mock_datahub_graph(DatahubClientConfig) output = run_dataset_transformer_pipeline( - transformer_type=ReplaceExternalUrl, + transformer_type=ReplaceExternalUrlDataset, aspect=models.DatasetPropertiesClass( externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml", customProperties=EXISTING_PROPERTIES.copy(), @@ -3289,7 +3344,7 @@ def test_replace_external_regex_replace_2( pipeline_context.graph = mock_datahub_graph(DatahubClientConfig) output = run_dataset_transformer_pipeline( - transformer_type=ReplaceExternalUrl, + transformer_type=ReplaceExternalUrlDataset, aspect=models.DatasetPropertiesClass( externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml", customProperties=EXISTING_PROPERTIES.copy(), @@ -3867,3 +3922,87 @@ def fake_get_partial_match_tags(entity_urn: str) -> models.GlobalTagsClass: assert isinstance(terms_aspect, models.GlossaryTermsClass) assert len(terms_aspect.terms) == 1 assert terms_aspect.terms[0].urn == "urn:li:glossaryTerm:example1" + + +def test_replace_external_url_container_word_replace( + mock_datahub_graph, +): + pipeline_context: PipelineContext = PipelineContext( + run_id="test_replace_external_url_container" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig) + + output = run_container_transformer_pipeline( + transformer_type=ReplaceExternalUrlContainer, + aspect=models.ContainerPropertiesClass( + externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml", + customProperties=EXISTING_PROPERTIES.copy(), + name="sample_test", + ), + config={"input_pattern": "datahub", "replacement": "starhub"}, + pipeline_context=pipeline_context, + ) + + assert len(output) == 2 + assert output[0].record + assert output[0].record.aspect + assert ( + output[0].record.aspect.externalUrl + == "https://github.com/starhub/looker-demo/blob/master/foo.view.lkml" + ) + + +def test_replace_external_regex_container_replace_1( + mock_datahub_graph, +): + pipeline_context: PipelineContext = PipelineContext( + run_id="test_replace_external_url_container" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig) + + output = run_container_transformer_pipeline( + transformer_type=ReplaceExternalUrlContainer, + aspect=models.ContainerPropertiesClass( + externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml", + customProperties=EXISTING_PROPERTIES.copy(), + name="sample_test", + ), + config={"input_pattern": r"datahub/.*/", "replacement": "starhub/test/"}, + pipeline_context=pipeline_context, + ) + + assert len(output) == 2 + assert output[0].record + assert output[0].record.aspect + assert ( + output[0].record.aspect.externalUrl + == "https://github.com/starhub/test/foo.view.lkml" + ) + + +def test_replace_external_regex_container_replace_2( + mock_datahub_graph, +): + pipeline_context: PipelineContext = PipelineContext( + run_id="test_replace_external_url_container" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig) + + output = run_container_transformer_pipeline( + transformer_type=ReplaceExternalUrlContainer, + aspect=models.ContainerPropertiesClass( + externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml", + customProperties=EXISTING_PROPERTIES.copy(), + name="sample_test", + ), + config={"input_pattern": r"\b\w*hub\b", "replacement": "test"}, + pipeline_context=pipeline_context, + ) + + assert len(output) == 2 + assert output[0].record + assert output[0].record.aspect + assert ( + output[0].record.aspect.externalUrl + == "https://test.com/test/looker-demo/blob/master/foo.view.lkml" + )