diff --git a/docker/datahub-gms/Dockerfile b/docker/datahub-gms/Dockerfile
index 62cb56bcf0a10e..b1ac5fcbb41f97 100644
--- a/docker/datahub-gms/Dockerfile
+++ b/docker/datahub-gms/Dockerfile
@@ -14,7 +14,7 @@ RUN apk --no-cache --update-cache --available upgrade \
     else \
        echo >&2 "Unsupported architecture $(arch)" ; exit 1; \
     fi \
-    && apk --no-cache add tar curl openjdk8-jre bash \
+    && apk --no-cache add tar curl openjdk8-jre bash coreutils \
     && curl https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-runner/9.4.20.v20190813/jetty-runner-9.4.20.v20190813.jar --output jetty-runner.jar \
     && curl https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-jmx/9.4.20.v20190813/jetty-jmx-9.4.20.v20190813.jar --output jetty-jmx.jar \
     && curl https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-util/9.4.20.v20190813/jetty-util-9.4.20.v20190813.jar --output jetty-util.jar \
diff --git a/docker/datahub-mae-consumer/Dockerfile b/docker/datahub-mae-consumer/Dockerfile
index 5240100de3bca1..e0488f0d0d33a4 100644
--- a/docker/datahub-mae-consumer/Dockerfile
+++ b/docker/datahub-mae-consumer/Dockerfile
@@ -3,7 +3,7 @@ ARG APP_ENV=prod

 FROM adoptopenjdk/openjdk8:alpine-jre as base
 ENV DOCKERIZE_VERSION v0.6.1
-RUN apk --no-cache add curl tar wget bash \
+RUN apk --no-cache add curl tar wget bash coreutils \
     && wget https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v1.4.1/opentelemetry-javaagent-all.jar \
     && wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.16.1/jmx_prometheus_javaagent-0.16.1.jar -O jmx_prometheus_javaagent.jar \
     && curl -L https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz | tar -C /usr/local/bin -xzv
diff --git a/docker/elasticsearch-setup/Dockerfile b/docker/elasticsearch-setup/Dockerfile
index d9a662321b3b93..2ef2441816eade 100644
--- a/docker/elasticsearch-setup/Dockerfile
+++ b/docker/elasticsearch-setup/Dockerfile
@@ -5,7 +5,7 @@ ARG APP_ENV=prod

 FROM alpine:3 as base
 ENV DOCKERIZE_VERSION v0.6.1
-RUN apk add --no-cache curl jq tar bash \
+RUN apk add --no-cache curl jq tar bash coreutils \
     && curl -L https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz | tar -C /usr/local/bin -xzv

 FROM base AS prod-install
diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py
index 38e2ca47e744e0..cc6fadeb982adc 100644
--- a/metadata-ingestion/src/datahub/ingestion/graph/client.py
+++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -1,6 +1,8 @@
+import json
+import logging
 import urllib.parse
 from json.decoder import JSONDecodeError
-from typing import Dict, Optional, Type, TypeVar
+from typing import Any, Dict, List, Optional, Type, TypeVar

 from avrogen.dict_wrapper import DictWrapper
 from requests.models import HTTPError
@@ -8,10 +10,11 @@

 from datahub.configuration.common import ConfigModel, OperationalError
 from datahub.emitter.rest_emitter import DatahubRestEmitter
-from datahub.metadata.schema_classes import OwnershipClass
+from datahub.metadata.schema_classes import DatasetUsageStatisticsClass, OwnershipClass

 # This bound isn't tight, but it's better than nothing.
 Aspect = TypeVar("Aspect", bound=DictWrapper)
+logger = logging.getLogger(__name__)


 class DatahubClientConfig(ConfigModel):
@@ -85,3 +88,63 @@ def get_ownership(self, entity_urn: str) -> Optional[OwnershipClass]:
             aspect_type_name="com.linkedin.common.Ownership",
             aspect_type=OwnershipClass,
         )
+
+    def get_usage_aspects_from_urn(
+        self, entity_urn: str, start_timestamp: int, end_timestamp: int
+    ) -> Optional[List[DatasetUsageStatisticsClass]]:
+        payload = {
+            "urn": entity_urn,
+            "entity": "dataset",
+            "aspect": "datasetUsageStatistics",
+            "startTimeMillis": start_timestamp,
+            "endTimeMillis": end_timestamp,
+        }
+        headers: Dict[str, Any] = {}
+        url = f"{self._gms_server}/aspects?action=getTimeseriesAspectValues"
+        try:
+            usage_aspects: List[DatasetUsageStatisticsClass] = []
+            response = self.g_session.post(
+                url, data=json.dumps(payload), headers=headers
+            )
+            if response.status_code != 200:
+                logger.debug(
+                    f"Non 200 status found while fetching usage aspects - {response.status_code}"
+                )
+                return None
+            json_resp = response.json()
+            all_aspects = json_resp.get("value", {}).get("values", [])
+            for aspect in all_aspects:
+                if aspect.get("aspect") and aspect.get("aspect").get("value"):
+                    usage_aspects.append(
+                        DatasetUsageStatisticsClass.from_obj(
+                            json.loads(aspect.get("aspect").get("value")), tuples=True
+                        )
+                    )
+            return usage_aspects
+        except Exception as e:
+            logger.error("Error while getting usage aspects.", e)
+            return None
+
+    def list_all_entity_urns(
+        self, entity_type: str, start: int, count: int
+    ) -> Optional[List[str]]:
+        url = f"{self._gms_server}/entities?action=listUrns"
+        payload = {"entity": entity_type, "start": start, "count": count}
+        headers = {
+            "X-RestLi-Protocol-Version": "2.0.0",
+            "Content-Type": "application/json",
+        }
+        try:
+            response = self.g_session.post(
+                url, data=json.dumps(payload), headers=headers
+            )
+            if response.status_code != 200:
+                logger.debug(
+                    f"Non 200 status found while fetching entity urns - {response.status_code}"
+                )
+                return None
+            json_resp = response.json()
+            return json_resp.get("value", {}).get("entities")
+        except Exception as e:
+            logger.error("Error while fetching entity urns.", e)
+            return None
diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py
index 9086f07c7e5d9a..8219a60d95c8f7 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -66,7 +66,10 @@ def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext):
             }
         )
         self.schema_registry_client = SchemaRegistryClient(
-            {"url": self.source_config.connection.schema_registry_url}
+            {
+                "url": self.source_config.connection.schema_registry_url,
+                **self.source_config.connection.schema_registry_config,
+            }
         )
         self.report = KafkaSourceReport()

diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py
index fa607d858322ef..bc252edcb9dc26 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/superset.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py
@@ -188,7 +188,9 @@ def construct_dashboard_from_api_data(self, dashboard_data):

         chart_urns = []
         raw_position_data = dashboard_data.get("position_json", "{}")
-        position_data = json.loads(raw_position_data)
+        position_data = (
+            json.loads(raw_position_data) if raw_position_data is not None else {}
+        )
         for key, value in position_data.items():
             if not key.startswith("CHART-"):
                 continue
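
For reference, here is a minimal usage sketch (not part of the diff) showing how the two graph-client helpers added above might be called. It assumes the client class in datahub/ingestion/graph/client.py is DataHubGraph and that DatahubClientConfig accepts a GMS server URL; neither detail appears in this diff, and the URL, entity type, and timestamps below are placeholders.

# Illustrative sketch only; names not shown in the diff (DataHubGraph, the
# `server` config field) are assumptions and may need adjusting.
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# Page through dataset URNs known to GMS via the new listUrns helper.
urns = graph.list_all_entity_urns(entity_type="dataset", start=0, count=100)

# For each dataset, fetch datasetUsageStatistics over a time window (epoch millis).
for urn in urns or []:
    usage = graph.get_usage_aspects_from_urn(
        entity_urn=urn,
        start_timestamp=1609459200000,  # 2021-01-01 00:00:00 UTC
        end_timestamp=1612137600000,    # 2021-02-01 00:00:00 UTC
    )
    if usage:
        print(f"{urn}: {len(usage)} usage buckets")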