From fde71d450024f10b129be99bcadc4458ec771de3 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 24 Jul 2024 17:49:41 -0700 Subject: [PATCH] feat(ingest/graph): Add get_results_by_filter to DataHubGraph (#10987) --- .../src/datahub/ingestion/graph/client.py | 150 +++++++++++++++++- .../src/datahub/ingestion/graph/filters.py | 20 +++ 2 files changed, 169 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 1d6097da231f8f..5ce2fe9941d988 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -171,6 +171,22 @@ def test_connection(self) -> None: self.server_id = _MISSING_SERVER_ID logger.debug(f"Failed to get server id due to {e}") + @property + def frontend_base_url(self) -> str: + """Get the public-facing base url of the frontend + + This url can be used to construct links to the frontend. The url will not include a trailing slash. + Note: Only supported with Acryl Cloud. + """ + + if not self.server_config: + self.test_connection() + + base_url = self.server_config.get("baseUrl") + if not base_url: + raise ValueError("baseUrl not found in server config") + return base_url + @classmethod def from_emitter(cls, emitter: DatahubRestEmitter) -> "DataHubGraph": return cls( @@ -812,6 +828,7 @@ def get_urns_by_filter( status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED, batch_size: int = 10000, extraFilters: Optional[List[SearchFilterRule]] = None, + extra_or_filters: Optional[List[Dict[str, List[SearchFilterRule]]]] = None, ) -> Iterable[str]: """Fetch all urns that match all of the given filters. @@ -841,7 +858,13 @@ def get_urns_by_filter( # Env filter. orFilters = generate_filter( - platform, platform_instance, env, container, status, extraFilters + platform, + platform_instance, + env, + container, + status, + extraFilters, + extra_or_filters=extra_or_filters, ) graphql_query = textwrap.dedent( @@ -885,6 +908,131 @@ def get_urns_by_filter( for entity in self._scroll_across_entities(graphql_query, variables): yield entity["urn"] + def get_results_by_filter( + self, + *, + entity_types: Optional[List[str]] = None, + platform: Optional[str] = None, + platform_instance: Optional[str] = None, + env: Optional[str] = None, + query: Optional[str] = None, + container: Optional[str] = None, + status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED, + batch_size: int = 10000, + extra_and_filters: Optional[List[SearchFilterRule]] = None, + extra_or_filters: Optional[List[Dict[str, List[SearchFilterRule]]]] = None, + extra_source_fields: Optional[List[str]] = None, + skip_cache: bool = False, + ) -> Iterable[dict]: + """Fetch all results that match all of the given filters. + + Filters are combined conjunctively. If multiple filters are specified, the results will match all of them. + Note that specifying a platform filter will automatically exclude all entity types that do not have a platform. + The same goes for the env filter. + + :param entity_types: List of entity types to include. If None, all entity types will be returned. + :param platform: Platform to filter on. If None, all platforms will be returned. + :param platform_instance: Platform instance to filter on. If None, all platform instances will be returned. + :param env: Environment (e.g. PROD, DEV) to filter on. If None, all environments will be returned. + :param query: Query string to filter on. If None, all entities will be returned. + :param container: A container urn that entities must be within. + This works recursively, so it will include entities within sub-containers as well. + If None, all entities will be returned. + Note that this requires browsePathV2 aspects (added in 0.10.4+). + :param status: Filter on the deletion status of the entity. The default is only return non-soft-deleted entities. + :param extra_and_filters: Additional filters to apply. If specified, the + results will match all of the filters. + :param extra_or_filters: Additional filters to apply. If specified, the + results will match any of the filters. + + :return: An iterable of urns that match the filters. + """ + + types = self._get_types(entity_types) + + # Add the query default of * if no query is specified. + query = query or "*" + + or_filters_final = generate_filter( + platform, + platform_instance, + env, + container, + status, + extra_and_filters, + extra_or_filters, + ) + graphql_query = textwrap.dedent( + """ + query scrollUrnsWithFilters( + $types: [EntityType!], + $query: String!, + $orFilters: [AndFilterInput!], + $batchSize: Int!, + $scrollId: String, + $skipCache: Boolean!, + $fetchExtraFields: [String!]) { + + scrollAcrossEntities(input: { + query: $query, + count: $batchSize, + scrollId: $scrollId, + types: $types, + orFilters: $orFilters, + searchFlags: { + skipHighlighting: true + skipAggregates: true + skipCache: $skipCache + fetchExtraFields: $fetchExtraFields + } + }) { + nextScrollId + searchResults { + entity { + urn + } + } + } + } + """ + ) + + variables = { + "types": types, + "query": query, + "orFilters": or_filters_final, + "batchSize": batch_size, + "skipCache": "true" if skip_cache else "false", + "fetchExtraFields": extra_source_fields, + } + + for result in self._scroll_across_entities_results(graphql_query, variables): + yield result + + def _scroll_across_entities_results( + self, graphql_query: str, variables_orig: dict + ) -> Iterable[dict]: + variables = variables_orig.copy() + first_iter = True + scroll_id: Optional[str] = None + while first_iter or scroll_id: + first_iter = False + variables["scrollId"] = scroll_id + + response = self.execute_graphql( + graphql_query, + variables=variables, + ) + data = response["scrollAcrossEntities"] + scroll_id = data["nextScrollId"] + for entry in data["searchResults"]: + yield entry + + if scroll_id: + logger.debug( + f"Scrolling to next scrollAcrossEntities page: {scroll_id}" + ) + def _scroll_across_entities( self, graphql_query: str, variables_orig: dict ) -> Iterable[dict]: diff --git a/metadata-ingestion/src/datahub/ingestion/graph/filters.py b/metadata-ingestion/src/datahub/ingestion/graph/filters.py index 1a63aea8357296..8974f159171d1e 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/filters.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/filters.py @@ -30,7 +30,19 @@ def generate_filter( container: Optional[str], status: RemovedStatusFilter, extra_filters: Optional[List[SearchFilterRule]], + extra_or_filters: Optional[List[SearchFilterRule]] = None, ) -> List[Dict[str, List[SearchFilterRule]]]: + """ + Generate a search filter based on the provided parameters. + :param platform: The platform to filter by. + :param platform_instance: The platform instance to filter by. + :param env: The environment to filter by. + :param container: The container to filter by. + :param status: The status to filter by. + :param extra_filters: Extra AND filters to apply. + :param extra_or_filters: Extra OR filters to apply. These are combined with + the AND filters using an OR at the top level. + """ and_filters: List[SearchFilterRule] = [] # Platform filter. @@ -66,6 +78,14 @@ def generate_filter( for and_filter in or_filters ] + # Extra OR filters are distributed across the top level and lists. + if extra_or_filters: + or_filters = [ + {"and": and_filter["and"] + [extra_or_filter]} + for extra_or_filter in extra_or_filters + for and_filter in or_filters + ] + return or_filters