Skip to content

Commit

Permalink
feat(ingest/graph): Add get_results_by_filter to DataHubGraph (datahu…
Browse files Browse the repository at this point in the history
  • Loading branch information
asikowitz authored Jul 25, 2024
1 parent 66f8930 commit fde71d4
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 1 deletion.
150 changes: 149 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,22 @@ def test_connection(self) -> None:
self.server_id = _MISSING_SERVER_ID
logger.debug(f"Failed to get server id due to {e}")

@property
def frontend_base_url(self) -> str:
"""Get the public-facing base url of the frontend
This url can be used to construct links to the frontend. The url will not include a trailing slash.
Note: Only supported with Acryl Cloud.
"""

if not self.server_config:
self.test_connection()

base_url = self.server_config.get("baseUrl")
if not base_url:
raise ValueError("baseUrl not found in server config")
return base_url

@classmethod
def from_emitter(cls, emitter: DatahubRestEmitter) -> "DataHubGraph":
return cls(
Expand Down Expand Up @@ -812,6 +828,7 @@ def get_urns_by_filter(
status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED,
batch_size: int = 10000,
extraFilters: Optional[List[SearchFilterRule]] = None,
extra_or_filters: Optional[List[Dict[str, List[SearchFilterRule]]]] = None,
) -> Iterable[str]:
"""Fetch all urns that match all of the given filters.
Expand Down Expand Up @@ -841,7 +858,13 @@ def get_urns_by_filter(

# Env filter.
orFilters = generate_filter(
platform, platform_instance, env, container, status, extraFilters
platform,
platform_instance,
env,
container,
status,
extraFilters,
extra_or_filters=extra_or_filters,
)

graphql_query = textwrap.dedent(
Expand Down Expand Up @@ -885,6 +908,131 @@ def get_urns_by_filter(
for entity in self._scroll_across_entities(graphql_query, variables):
yield entity["urn"]

def get_results_by_filter(
    self,
    *,
    entity_types: Optional[List[str]] = None,
    platform: Optional[str] = None,
    platform_instance: Optional[str] = None,
    env: Optional[str] = None,
    query: Optional[str] = None,
    container: Optional[str] = None,
    status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED,
    batch_size: int = 10000,
    extra_and_filters: Optional[List[SearchFilterRule]] = None,
    extra_or_filters: Optional[List[Dict[str, List[SearchFilterRule]]]] = None,
    extra_source_fields: Optional[List[str]] = None,
    skip_cache: bool = False,
) -> Iterable[dict]:
    """Fetch all search results that match all of the given filters.

    Filters are combined conjunctively. If multiple filters are specified, the results will match all of them.
    Note that specifying a platform filter will automatically exclude all entity types that do not have a platform.
    The same goes for the env filter.

    :param entity_types: List of entity types to include. If None, all entity types will be returned.
    :param platform: Platform to filter on. If None, all platforms will be returned.
    :param platform_instance: Platform instance to filter on. If None, all platform instances will be returned.
    :param env: Environment (e.g. PROD, DEV) to filter on. If None, all environments will be returned.
    :param query: Query string to filter on. If None, all entities will be returned.
    :param container: A container urn that entities must be within.
        This works recursively, so it will include entities within sub-containers as well.
        If None, all entities will be returned.
        Note that this requires browsePathV2 aspects (added in 0.10.4+).
    :param status: Filter on the deletion status of the entity. The default is only return non-soft-deleted entities.
    :param batch_size: Number of results requested per scroll page (sent as the query's ``count``).
    :param extra_and_filters: Additional filters to apply. If specified, the
        results will match all of the filters.
    :param extra_or_filters: Additional filters to apply. If specified, the
        results will match any of the filters.
    :param extra_source_fields: Field names forwarded as the ``fetchExtraFields`` search flag.
    :param skip_cache: Forwarded as the ``skipCache`` search flag.

    :return: An iterable of search result dicts, i.e. the raw ``searchResults``
        entries of each page (shaped like ``{"entity": {"urn": ...}}`` for the
        query below).
    """

    types = self._get_types(entity_types)

    # Add the query default of * if no query is specified.
    query = query or "*"

    or_filters_final = generate_filter(
        platform,
        platform_instance,
        env,
        container,
        status,
        extra_and_filters,
        extra_or_filters=extra_or_filters,
    )

    # NOTE(review): the selection set only requests `entity { urn }`; confirm
    # whether anything requested via fetchExtraFields needs to be selected here
    # for it to show up in the yielded results.
    graphql_query = textwrap.dedent(
        """
        query scrollUrnsWithFilters(
            $types: [EntityType!],
            $query: String!,
            $orFilters: [AndFilterInput!],
            $batchSize: Int!,
            $scrollId: String,
            $skipCache: Boolean!,
            $fetchExtraFields: [String!]) {
            scrollAcrossEntities(input: {
                query: $query,
                count: $batchSize,
                scrollId: $scrollId,
                types: $types,
                orFilters: $orFilters,
                searchFlags: {
                    skipHighlighting: true
                    skipAggregates: true
                    skipCache: $skipCache
                    fetchExtraFields: $fetchExtraFields
                }
            }) {
                nextScrollId
                searchResults {
                    entity {
                        urn
                    }
                }
            }
        }
        """
    )

    variables = {
        "types": types,
        "query": query,
        "orFilters": or_filters_final,
        "batchSize": batch_size,
        # $skipCache is declared as Boolean!, so send a real boolean rather
        # than the strings "true"/"false"; strict GraphQL servers reject
        # string values for Boolean variables.
        "skipCache": skip_cache,
        "fetchExtraFields": extra_source_fields,
    }

    yield from self._scroll_across_entities_results(graphql_query, variables)

def _scroll_across_entities_results(
self, graphql_query: str, variables_orig: dict
) -> Iterable[dict]:
variables = variables_orig.copy()
first_iter = True
scroll_id: Optional[str] = None
while first_iter or scroll_id:
first_iter = False
variables["scrollId"] = scroll_id

response = self.execute_graphql(
graphql_query,
variables=variables,
)
data = response["scrollAcrossEntities"]
scroll_id = data["nextScrollId"]
for entry in data["searchResults"]:
yield entry

if scroll_id:
logger.debug(
f"Scrolling to next scrollAcrossEntities page: {scroll_id}"
)

def _scroll_across_entities(
self, graphql_query: str, variables_orig: dict
) -> Iterable[dict]:
Expand Down
20 changes: 20 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,19 @@ def generate_filter(
container: Optional[str],
status: RemovedStatusFilter,
extra_filters: Optional[List[SearchFilterRule]],
extra_or_filters: Optional[List[SearchFilterRule]] = None,
) -> List[Dict[str, List[SearchFilterRule]]]:
"""
Generate a search filter based on the provided parameters.
:param platform: The platform to filter by.
:param platform_instance: The platform instance to filter by.
:param env: The environment to filter by.
:param container: The container to filter by.
:param status: The status to filter by.
:param extra_filters: Extra AND filters to apply.
:param extra_or_filters: Extra OR filters to apply. These are combined with
the AND filters using an OR at the top level.
"""
and_filters: List[SearchFilterRule] = []

# Platform filter.
Expand Down Expand Up @@ -66,6 +78,14 @@ def generate_filter(
for and_filter in or_filters
]

# Extra OR filters are distributed across the top level and lists.
if extra_or_filters:
or_filters = [
{"and": and_filter["and"] + [extra_or_filter]}
for extra_or_filter in extra_or_filters
for and_filter in or_filters
]

return or_filters


Expand Down

0 comments on commit fde71d4

Please sign in to comment.