
Commit

Merge branch 'linkedin:master' into master
gabe-lyons authored Dec 13, 2021
2 parents 8661e2b + 83207b3 commit facb9c2
Showing 6 changed files with 75 additions and 7 deletions.
2 changes: 1 addition & 1 deletion docker/datahub-gms/Dockerfile
@@ -14,7 +14,7 @@ RUN apk --no-cache --update-cache --available upgrade \
else \
echo >&2 "Unsupported architecture $(arch)" ; exit 1; \
fi \
- && apk --no-cache add tar curl openjdk8-jre bash \
+ && apk --no-cache add tar curl openjdk8-jre bash coreutils \
&& curl https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-runner/9.4.20.v20190813/jetty-runner-9.4.20.v20190813.jar --output jetty-runner.jar \
&& curl https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-jmx/9.4.20.v20190813/jetty-jmx-9.4.20.v20190813.jar --output jetty-jmx.jar \
&& curl https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-util/9.4.20.v20190813/jetty-util-9.4.20.v20190813.jar --output jetty-util.jar \
2 changes: 1 addition & 1 deletion docker/datahub-mae-consumer/Dockerfile
@@ -3,7 +3,7 @@ ARG APP_ENV=prod

FROM adoptopenjdk/openjdk8:alpine-jre as base
ENV DOCKERIZE_VERSION v0.6.1
- RUN apk --no-cache add curl tar wget bash \
+ RUN apk --no-cache add curl tar wget bash coreutils \
&& wget https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/download/v1.4.1/opentelemetry-javaagent-all.jar \
&& wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.16.1/jmx_prometheus_javaagent-0.16.1.jar -O jmx_prometheus_javaagent.jar \
&& curl -L https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz | tar -C /usr/local/bin -xzv
2 changes: 1 addition & 1 deletion docker/elasticsearch-setup/Dockerfile
@@ -5,7 +5,7 @@ ARG APP_ENV=prod

FROM alpine:3 as base
ENV DOCKERIZE_VERSION v0.6.1
- RUN apk add --no-cache curl jq tar bash \
+ RUN apk add --no-cache curl jq tar bash coreutils \
&& curl -L https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz | tar -C /usr/local/bin -xzv

FROM base AS prod-install
67 changes: 65 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -1,17 +1,20 @@
import json
import logging
import urllib.parse
from json.decoder import JSONDecodeError
- from typing import Dict, Optional, Type, TypeVar
+ from typing import Any, Dict, List, Optional, Type, TypeVar

from avrogen.dict_wrapper import DictWrapper
from requests.models import HTTPError
from requests.sessions import Session

from datahub.configuration.common import ConfigModel, OperationalError
from datahub.emitter.rest_emitter import DatahubRestEmitter
- from datahub.metadata.schema_classes import OwnershipClass
+ from datahub.metadata.schema_classes import DatasetUsageStatisticsClass, OwnershipClass

# This bound isn't tight, but it's better than nothing.
Aspect = TypeVar("Aspect", bound=DictWrapper)
logger = logging.getLogger(__name__)


class DatahubClientConfig(ConfigModel):
@@ -85,3 +88,63 @@ def get_ownership(self, entity_urn: str) -> Optional[OwnershipClass]:
aspect_type_name="com.linkedin.common.Ownership",
aspect_type=OwnershipClass,
)

def get_usage_aspects_from_urn(
self, entity_urn: str, start_timestamp: int, end_timestamp: int
) -> Optional[List[DatasetUsageStatisticsClass]]:
payload = {
"urn": entity_urn,
"entity": "dataset",
"aspect": "datasetUsageStatistics",
"startTimeMillis": start_timestamp,
"endTimeMillis": end_timestamp,
}
headers: Dict[str, Any] = {}
url = f"{self._gms_server}/aspects?action=getTimeseriesAspectValues"
try:
usage_aspects: List[DatasetUsageStatisticsClass] = []
response = self.g_session.post(
url, data=json.dumps(payload), headers=headers
)
if response.status_code != 200:
logger.debug(
f"Non 200 status found while fetching usage aspects - {response.status_code}"
)
return None
json_resp = response.json()
all_aspects = json_resp.get("value", {}).get("values", [])
for aspect in all_aspects:
if aspect.get("aspect") and aspect.get("aspect").get("value"):
usage_aspects.append(
DatasetUsageStatisticsClass.from_obj(
json.loads(aspect.get("aspect").get("value")), tuples=True
)
)
return usage_aspects
except Exception as e:
logger.error("Error while getting usage aspects.", e)
return None

def list_all_entity_urns(
self, entity_type: str, start: int, count: int
) -> Optional[List[str]]:
url = f"{self._gms_server}/entities?action=listUrns"
payload = {"entity": entity_type, "start": start, "count": count}
headers = {
"X-RestLi-Protocol-Version": "2.0.0",
"Content-Type": "application/json",
}
try:
response = self.g_session.post(
url, data=json.dumps(payload), headers=headers
)
if response.status_code != 200:
logger.debug(
f"Non 200 status found while fetching entity urns - {response.status_code}"
)
return None
json_resp = response.json()
return json_resp.get("value", {}).get("entities")
except Exception as e:
logger.error("Error while fetching entity urns.", e)
return None
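
For orientation, a minimal usage sketch of the two new helpers, assuming the DataHubGraph client class that hosts them (defined in this module, outside the hunk shown) and a GMS instance reachable at the configured address; the paging values, URN handling, and epoch-millisecond window are illustrative, not part of the commit:

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# Page through dataset URNs, 100 at a time (start/count are illustrative).
urns = graph.list_all_entity_urns(entity_type="dataset", start=0, count=100)

# Fetch usage statistics for the first dataset over an assumed time window.
if urns:
    usage = graph.get_usage_aspects_from_urn(
        entity_urn=urns[0],
        start_timestamp=1638316800000,  # 2021-12-01 00:00:00 UTC (example value)
        end_timestamp=1639353600000,  # 2021-12-13 00:00:00 UTC (example value)
    )
    for stat in usage or []:
        print(stat.timestampMillis, stat.totalSqlQueries)

Both helpers return None on any failure rather than raising, so callers need an explicit None check before iterating.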
5 changes: 4 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -66,7 +66,10 @@ def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext):
}
)
self.schema_registry_client = SchemaRegistryClient(
{"url": self.source_config.connection.schema_registry_url}
{
"url": self.source_config.connection.schema_registry_url,
**self.source_config.connection.schema_registry_config,
}
)
self.report = KafkaSourceReport()

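The change splices user-supplied schema_registry_config entries into the SchemaRegistryClient settings via dict unpacking. A plain-dict sketch of the merge semantics — basic.auth.user.info is an assumed example key, not something this commit defines:

schema_registry_url = "http://localhost:8081"  # assumed example value
schema_registry_config = {"basic.auth.user.info": "user:pass"}  # assumed example entry

client_config = {"url": schema_registry_url, **schema_registry_config}
print(client_config)
# {'url': 'http://localhost:8081', 'basic.auth.user.info': 'user:pass'}

Because later keys win in a dict literal, a "url" entry inside schema_registry_config would silently override the one derived from schema_registry_url.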
4 changes: 3 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/superset.py
@@ -188,7 +188,9 @@ def construct_dashboard_from_api_data(self, dashboard_data):

chart_urns = []
raw_position_data = dashboard_data.get("position_json", "{}")
- position_data = json.loads(raw_position_data)
+ position_data = (
+     json.loads(raw_position_data) if raw_position_data is not None else {}
+ )
for key, value in position_data.items():
if not key.startswith("CHART-"):
continue
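The guard is needed because dict.get applies its default only when the key is absent; the fix implies Superset can return a position_json key whose value is null, and json.loads(None) raises a TypeError. A minimal reproduction of the failure mode:

import json

dashboard_data = {"position_json": None}  # key present but null

raw_position_data = dashboard_data.get("position_json", "{}")
# raw_position_data is None here, not "{}": the default never applies.

position_data = (
    json.loads(raw_position_data) if raw_position_data is not None else {}
)
# Without the guard, json.loads(None) raises TypeError.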
