def optimize_query_filter(query_filter: dict) -> dict:
    """
    Return a copy of *query_filter* with duplicate IDs and project names removed.

    Duplicates in the filter cause duplicates in the query result, leading to
    entities/aspects being emitted multiple times unnecessarily. First-seen
    order of the remaining values is preserved, and keys other than the two
    known list filters are passed through untouched.
    """
    deduplicated = copy.deepcopy(query_filter)
    # Only these two filter keys are known to hold lists that may contain
    # duplicates; any other key is intentionally left as-is.
    for key in (c.ID_WITH_IN, c.PROJECT_NAME_WITH_IN):
        values = query_filter.get(key)
        if values:
            # OrderedSet de-duplicates while keeping first-seen order.
            deduplicated[key] = list(OrderedSet(values))
    return deduplicated
def test_optimize_query_filter_handles_other_keys():
    """Keys other than the known ID/project filters are passed through as-is."""
    query_filter = {"any_other_key": ["id1", "id2", "id1"]}
    optimized = optimize_query_filter(query_filter)
    assert len(optimized) == 1
    # Duplicates under unknown keys are deliberately NOT removed.
    assert optimized["any_other_key"] == ["id1", "id2", "id1"]


def test_optimize_query_filter_handles_no_duplicates():
    """A filter that is already duplicate-free comes back unchanged."""
    query_filter = {
        c.ID_WITH_IN: ["id1", "id2"],
        c.PROJECT_NAME_WITH_IN: ["project1", "project2"],
    }
    optimized = optimize_query_filter(query_filter)
    assert len(optimized) == 2
    assert optimized[c.ID_WITH_IN] == ["id1", "id2"]
    assert optimized[c.PROJECT_NAME_WITH_IN] == ["project1", "project2"]