feat(ingest): adding utilities methods to DataHubGraph class. (datahu…
varunbharill authored Dec 13, 2021
1 parent d4d6218 commit e55cbd1
Showing 1 changed file with 65 additions and 2 deletions.
67 changes: 65 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -1,17 +1,20 @@
import json
import logging
import urllib.parse
from json.decoder import JSONDecodeError
from typing import Dict, Optional, Type, TypeVar
from typing import Any, Dict, List, Optional, Type, TypeVar

from avrogen.dict_wrapper import DictWrapper
from requests.models import HTTPError
from requests.sessions import Session

from datahub.configuration.common import ConfigModel, OperationalError
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import OwnershipClass
from datahub.metadata.schema_classes import DatasetUsageStatisticsClass, OwnershipClass

# This bound isn't tight, but it's better than nothing.
Aspect = TypeVar("Aspect", bound=DictWrapper)
logger = logging.getLogger(__name__)


class DatahubClientConfig(ConfigModel):
@@ -85,3 +88,63 @@ def get_ownership(self, entity_urn: str) -> Optional[OwnershipClass]:
            aspect_type_name="com.linkedin.common.Ownership",
            aspect_type=OwnershipClass,
        )

    def get_usage_aspects_from_urn(
        self, entity_urn: str, start_timestamp: int, end_timestamp: int
    ) -> Optional[List[DatasetUsageStatisticsClass]]:
        payload = {
            "urn": entity_urn,
            "entity": "dataset",
            "aspect": "datasetUsageStatistics",
            "startTimeMillis": start_timestamp,
            "endTimeMillis": end_timestamp,
        }
        headers: Dict[str, Any] = {}
        url = f"{self._gms_server}/aspects?action=getTimeseriesAspectValues"
        try:
            usage_aspects: List[DatasetUsageStatisticsClass] = []
            response = self.g_session.post(
                url, data=json.dumps(payload), headers=headers
            )
            if response.status_code != 200:
                logger.debug(
                    f"Non 200 status found while fetching usage aspects - {response.status_code}"
                )
                return None
            json_resp = response.json()
            all_aspects = json_resp.get("value", {}).get("values", [])
            for aspect in all_aspects:
                if aspect.get("aspect") and aspect.get("aspect").get("value"):
                    usage_aspects.append(
                        DatasetUsageStatisticsClass.from_obj(
                            json.loads(aspect.get("aspect").get("value")), tuples=True
                        )
                    )
            return usage_aspects
        except Exception as e:
            logger.error("Error while getting usage aspects: %s", e)
            return None

    def list_all_entity_urns(
        self, entity_type: str, start: int, count: int
    ) -> Optional[List[str]]:
        url = f"{self._gms_server}/entities?action=listUrns"
        payload = {"entity": entity_type, "start": start, "count": count}
        headers = {
            "X-RestLi-Protocol-Version": "2.0.0",
            "Content-Type": "application/json",
        }
        try:
            response = self.g_session.post(
                url, data=json.dumps(payload), headers=headers
            )
            if response.status_code != 200:
                logger.debug(
                    f"Non 200 status found while fetching entity urns - {response.status_code}"
                )
                return None
            json_resp = response.json()
            return json_resp.get("value", {}).get("entities")
        except Exception as e:
            logger.error("Error while fetching entity urns: %s", e)
            return None
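
For illustration, here is a minimal sketch of how the two new helpers might be called once this change lands. The GMS endpoint, dataset URN, and timestamps are placeholder values, not part of the commit, and the sketch assumes DataHubGraph is constructed from the DatahubClientConfig defined earlier in this file.

from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

# Hypothetical connection settings; point this at your own GMS instance.
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"

# Fetch datasetUsageStatistics timeseries aspects for a time window (epoch millis).
usage = graph.get_usage_aspects_from_urn(
    dataset_urn, start_timestamp=1638316800000, end_timestamp=1639526400000
)
if usage is not None:
    for stat in usage:
        print(stat.timestampMillis, stat.uniqueUserCount)

# Page through the first 10 dataset URNs known to GMS.
urns = graph.list_all_entity_urns(entity_type="dataset", start=0, count=10)
print(urns)

Note that both helpers return None rather than raising when GMS responds with a non-200 status or the request fails, so callers should check for None before iterating.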
