Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Nov 27, 2024
2 parents 88f9d98 + 48d711b commit 0ac2090
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 19 deletions.
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
SystemMetadataClass,
TelemetryClientIdClass,
)
from datahub.telemetry.telemetry import telemetry_instance
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.str_enum import StrEnum
from datahub.utilities.urns.urn import Urn, guess_entity_type
Expand Down Expand Up @@ -1819,4 +1820,5 @@ def get_default_graph() -> DataHubGraph:
graph_config = config_utils.load_client_config()
graph = DataHubGraph(graph_config)
graph.test_connection()
telemetry_instance.set_context(server=graph)
return graph
9 changes: 5 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
)
from datahub.ingestion.transformer.transform_registry import transform_registry
from datahub.metadata.schema_classes import MetadataChangeProposalClass
from datahub.telemetry import stats, telemetry
from datahub.telemetry import stats
from datahub.telemetry.telemetry import telemetry_instance
from datahub.utilities._custom_package_loader import model_version_name
from datahub.utilities.global_warning_util import (
clear_global_warnings,
Expand Down Expand Up @@ -273,8 +274,9 @@ def __init__(
if self.graph is None and isinstance(self.sink, DatahubRestSink):
with _add_init_error_context("setup default datahub client"):
self.graph = self.sink.emitter.to_graph()
self.graph.test_connection()
self.ctx.graph = self.graph
telemetry.telemetry_instance.update_capture_exception_context(server=self.graph)
telemetry_instance.set_context(server=self.graph)

with set_graph_context(self.graph):
with _add_init_error_context("configure reporters"):
Expand Down Expand Up @@ -615,7 +617,7 @@ def log_ingestion_stats(self) -> None:
sink_warnings = len(self.sink.get_report().warnings)
global_warnings = len(get_global_warnings())

telemetry.telemetry_instance.ping(
telemetry_instance.ping(
"ingest_stats",
{
"source_type": self.source_type,
Expand All @@ -637,7 +639,6 @@ def log_ingestion_stats(self) -> None:
),
"has_pipeline_name": bool(self.config.pipeline_name),
},
self.ctx.graph,
)

def _approx_all_vals(self, d: LossyList[Any]) -> int:
Expand Down
27 changes: 22 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,32 @@ def get_workunits_internal(
self,
) -> Iterable[MetadataWorkUnit]:
if self.config.cleanup_expired_tokens:
self.revoke_expired_tokens()
try:
self.revoke_expired_tokens()
except Exception as e:
self.report.failure("While trying to cleanup expired token ", exc=e)
if self.config.truncate_indices:
self.truncate_indices()
try:
self.truncate_indices()
except Exception as e:
self.report.failure("While trying to truncate indices ", exc=e)
if self.dataprocess_cleanup:
yield from self.dataprocess_cleanup.get_workunits_internal()
try:
yield from self.dataprocess_cleanup.get_workunits_internal()
except Exception as e:
self.report.failure("While trying to cleanup data process ", exc=e)
if self.soft_deleted_entities_cleanup:
self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
try:
self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
except Exception as e:
self.report.failure(
"While trying to cleanup soft deleted entities ", exc=e
)
if self.execution_request_cleanup:
self.execution_request_cleanup.run()
try:
self.execution_request_cleanup.run()
except Exception as e:
self.report.failure("While trying to cleanup execution request ", exc=e)
yield from []

def truncate_indices(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
try:
self.delete_dpi_from_datajobs(datajob_entity)
except Exception as e:
logger.error(f"While trying to delete {datajob_entity} got {e}")
self.report.failure(
f"While trying to delete {datajob_entity} ", exc=e
)
if (
datajob_entity.total_runs == 0
and self.config.delete_empty_data_jobs
Expand Down
32 changes: 23 additions & 9 deletions metadata-ingestion/src/datahub/telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import uuid
from functools import wraps
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, TypeVar
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, TypeVar

from mixpanel import Consumer, Mixpanel
from typing_extensions import ParamSpec
Expand All @@ -16,10 +16,12 @@
from datahub.cli.config_utils import DATAHUB_ROOT_FOLDER
from datahub.cli.env_utils import get_boolean_env_variable
from datahub.configuration.common import ExceptionWithProps
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import _custom_package_path
from datahub.utilities.perf_timer import PerfTimer

if TYPE_CHECKING:
from datahub.ingestion.graph.client import DataHubGraph

logger = logging.getLogger(__name__)

DATAHUB_FOLDER = Path(DATAHUB_ROOT_FOLDER)
Expand Down Expand Up @@ -117,7 +119,11 @@ class Telemetry:
tracking_init: bool = False
sentry_enabled: bool = False

context_properties: Dict[str, Any] = {}

def __init__(self):
self.context_properties = {}

if SENTRY_DSN:
self.sentry_enabled = True
try:
Expand Down Expand Up @@ -157,6 +163,9 @@ def __init__(self):
except Exception as e:
logger.debug(f"Error connecting to mixpanel: {e}")

# Initialize the default properties for all events.
self.set_context()

def update_config(self) -> bool:
"""
Update the config file with the current client ID and enabled status.
Expand Down Expand Up @@ -238,18 +247,22 @@ def load_config(self) -> bool:

return False

def update_capture_exception_context(
def set_context(
self,
server: Optional[DataHubGraph] = None,
server: Optional["DataHubGraph"] = None,
properties: Optional[Dict[str, Any]] = None,
) -> None:
self.context_properties = {
**self._server_props(server),
**(properties or {}),
}

if self.sentry_enabled:
from sentry_sdk import set_tag

properties = {
**_default_telemetry_properties(),
**self._server_props(server),
**(properties or {}),
**self.context_properties,
}

for key in properties:
Expand Down Expand Up @@ -297,7 +310,6 @@ def ping(
self,
event_name: str,
properties: Optional[Dict[str, Any]] = None,
server: Optional[DataHubGraph] = None,
) -> None:
"""
Send a single telemetry event.
Expand All @@ -323,14 +335,15 @@ def ping(

properties = {
**_default_telemetry_properties(),
**self._server_props(server),
**self.context_properties,
**properties,
}
self.mp.track(self.client_id, event_name, properties)
except Exception as e:
logger.debug(f"Error reporting telemetry: {e}")

def _server_props(self, server: Optional[DataHubGraph]) -> Dict[str, str]:
@classmethod
def _server_props(cls, server: Optional["DataHubGraph"]) -> Dict[str, str]:
if not server:
return {
"server_type": "n/a",
Expand Down Expand Up @@ -435,6 +448,7 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
**call_props,
"status": "error",
**_error_props(e),
"code": e.code,
},
)
telemetry_instance.capture_exception(e)
Expand Down

0 comments on commit 0ac2090

Please sign in to comment.