diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 38e2ca47e744e0..cc6fadeb982adc 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -1,6 +1,8 @@ +import json +import logging import urllib.parse from json.decoder import JSONDecodeError -from typing import Dict, Optional, Type, TypeVar +from typing import Any, Dict, List, Optional, Type, TypeVar from avrogen.dict_wrapper import DictWrapper from requests.models import HTTPError @@ -8,10 +10,11 @@ from datahub.configuration.common import ConfigModel, OperationalError from datahub.emitter.rest_emitter import DatahubRestEmitter -from datahub.metadata.schema_classes import OwnershipClass +from datahub.metadata.schema_classes import DatasetUsageStatisticsClass, OwnershipClass # This bound isn't tight, but it's better than nothing. Aspect = TypeVar("Aspect", bound=DictWrapper) +logger = logging.getLogger(__name__) class DatahubClientConfig(ConfigModel): @@ -85,3 +88,63 @@ def get_ownership(self, entity_urn: str) -> Optional[OwnershipClass]: aspect_type_name="com.linkedin.common.Ownership", aspect_type=OwnershipClass, ) + + def get_usage_aspects_from_urn( + self, entity_urn: str, start_timestamp: int, end_timestamp: int + ) -> Optional[List[DatasetUsageStatisticsClass]]: + payload = { + "urn": entity_urn, + "entity": "dataset", + "aspect": "datasetUsageStatistics", + "startTimeMillis": start_timestamp, + "endTimeMillis": end_timestamp, + } + headers: Dict[str, Any] = {} + url = f"{self._gms_server}/aspects?action=getTimeseriesAspectValues" + try: + usage_aspects: List[DatasetUsageStatisticsClass] = [] + response = self.g_session.post( + url, data=json.dumps(payload), headers=headers + ) + if response.status_code != 200: + logger.debug( + f"Non 200 status found while fetching usage aspects - {response.status_code}" + ) + return None + json_resp = response.json() + all_aspects = json_resp.get("value", {}).get("values", []) + for aspect in all_aspects: + if aspect.get("aspect") and aspect.get("aspect").get("value"): + usage_aspects.append( + DatasetUsageStatisticsClass.from_obj( + json.loads(aspect.get("aspect").get("value")), tuples=True + ) + ) + return usage_aspects + except Exception as e: + logger.error("Error while getting usage aspects.", e) + return None + + def list_all_entity_urns( + self, entity_type: str, start: int, count: int + ) -> Optional[List[str]]: + url = f"{self._gms_server}/entities?action=listUrns" + payload = {"entity": entity_type, "start": start, "count": count} + headers = { + "X-RestLi-Protocol-Version": "2.0.0", + "Content-Type": "application/json", + } + try: + response = self.g_session.post( + url, data=json.dumps(payload), headers=headers + ) + if response.status_code != 200: + logger.debug( + f"Non 200 status found while fetching entity urns - {response.status_code}" + ) + return None + json_resp = response.json() + return json_resp.get("value", {}).get("entities") + except Exception as e: + logger.error("Error while fetching entity urns.", e) + return None