Commit ad83535: Merge branch 'datahub-project:master' into master

anshbansal authored Jan 6, 2025 · 2 parents 6fcce4d + b86bbf7
Showing 18 changed files with 284 additions and 111 deletions.
@@ -4,6 +4,9 @@ source:
# This option is recommended to be used to ingest all lineage
ignore_start_time_lineage: true

# This flag tells the snowflake ingestion to use the more advanced query parsing. This will become the default eventually.
use_queries_v2: true

# Coordinates
account_id: "abc48144"
warehouse: "COMPUTE_WH"
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
@@ -207,7 +207,7 @@
# Clickhouse 0.8.3 adds support for SQLAlchemy 1.4.x
"sqlalchemy-redshift>=0.8.3",
"GeoAlchemy2",
"redshift-connector>=2.1.0",
"redshift-connector>=2.1.5",
*path_spec_common,
}

@@ -188,6 +188,9 @@ def truncate_indices(self) -> None:
self._truncate_timeseries_helper(
aspect_name="dashboardUsageStatistics", entity_type="dashboard"
)
self._truncate_timeseries_helper(
aspect_name="queryusagestatistics", entity_type="query"
)

def _truncate_timeseries_helper(self, aspect_name: str, entity_type: str) -> None:
self._truncate_timeseries_with_watch_optional(
@@ -141,7 +141,9 @@ def _scroll_execution_requests(
break
if self.report.ergc_read_errors >= self.config.max_read_errors:
self.report.failure(
f"ergc({self.instance_id}): too many read errors, aborting."
title="Too many read errors, aborting",
message="Too many read errors, aborting",
context=str(self.instance_id),
)
break
try:
@@ -158,8 +160,11 @@ def _scroll_execution_requests(
break
params["scrollId"] = document["scrollId"]
except Exception as e:
logger.error(
f"ergc({self.instance_id}): failed to fetch next batch of execution requests: {e}"
self.report.failure(
title="Failed to fetch next batch of execution requests",
message="Failed to fetch next batch of execution requests",
context=str(self.instance_id),
exc=e,
)
self.report.ergc_read_errors += 1

@@ -231,8 +236,11 @@ def _delete_entry(self, entry: CleanupRecord) -> None:
self.graph.delete_entity(entry.urn, True)
except Exception as e:
self.report.ergc_delete_errors += 1
logger.error(
f"ergc({self.instance_id}): failed to delete ExecutionRequest {entry.request_id}: {e}"
self.report.failure(
title="Failed to delete ExecutionRequest",
message="Failed to delete ExecutionRequest",
context=str(self.instance_id),
exc=e,
)

def _reached_runtime_limit(self) -> bool:
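The changes above replace free-form logger.error calls with structured self.report.failure(...) calls, so failures are grouped in the ingestion report by a stable title while the variable details (instance id, exception) travel as context. A minimal, self-contained sketch of that pattern, assuming only the keyword arguments visible in the diff; the stub report class below is illustrative, not DataHub's actual SourceReport:

from typing import Any, Dict, List, Optional


class ReportStub:
    """Illustrative stand-in for a SourceReport-like object."""

    def __init__(self) -> None:
        self.failures: List[Dict[str, Any]] = []

    def failure(
        self,
        message: str,
        context: Optional[str] = None,
        title: Optional[str] = None,
        exc: Optional[BaseException] = None,
    ) -> None:
        # The title acts as the grouping key; message, context, and exc carry the specifics.
        self.failures.append(
            {"title": title, "message": message, "context": context, "exc": exc}
        )


report = ReportStub()
try:
    raise TimeoutError("simulated fetch failure")
except Exception as e:
    report.failure(
        title="Failed to fetch next batch of execution requests",
        message="Failed to fetch next batch of execution requests",
        context="instance-1",  # illustrative instance id
        exc=e,
    )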
@@ -105,6 +105,8 @@ class SoftDeletedEntitiesReport(SourceReport):
sample_hard_deleted_aspects_by_type: TopKDict[str, LossyList[str]] = field(
default_factory=TopKDict
)
runtime_limit_reached: bool = False
deletion_limit_reached: bool = False


class SoftDeletedEntitiesCleanup:
@@ -163,6 +165,8 @@ def delete_entity(self, urn: str) -> None:
f"Dry run is on otherwise it would have deleted {urn} with hard deletion"
)
return
if self._deletion_limit_reached() or self._times_up():
return
self._increment_removal_started_count()
self.ctx.graph.delete_entity(urn=urn, hard=True)
self.ctx.graph.delete_references_to_urn(
@@ -203,11 +207,10 @@ def _process_futures(self, futures: Dict[Future, str]) -> Dict[Future, str]:
for future in done:
self._print_report()
if future.exception():
logger.error(
f"Failed to delete entity {futures[future]}: {future.exception()}"
)
self.report.failure(
f"Failed to delete entity {futures[future]}",
title="Failed to delete entity",
message="Failed to delete entity",
context=futures[future],
exc=future.exception(),
)
self.report.num_soft_deleted_entity_processed += 1
@@ -274,6 +277,26 @@ def _get_urns(self) -> Iterable[str]:
)
yield from self._get_soft_deleted_queries()

def _times_up(self) -> bool:
if (
self.config.runtime_limit_seconds
and time.time() - self.start_time > self.config.runtime_limit_seconds
):
with self._report_lock:
self.report.runtime_limit_reached = True
return True
return False

def _deletion_limit_reached(self) -> bool:
if (
self.config.limit_entities_delete
and self.report.num_hard_deleted > self.config.limit_entities_delete
):
with self._report_lock:
self.report.deletion_limit_reached = True
return True
return False

def cleanup_soft_deleted_entities(self) -> None:
if not self.config.enabled:
return
@@ -285,24 +308,8 @@ def cleanup_soft_deleted_entities(self) -> None:
self._print_report()
while len(futures) >= self.config.futures_max_at_time:
futures = self._process_futures(futures)
if (
self.config.limit_entities_delete
and self.report.num_hard_deleted > self.config.limit_entities_delete
):
logger.info(
f"Limit of {self.config.limit_entities_delete} entities reached. Stopped adding more."
)
if self._deletion_limit_reached() or self._times_up():
break
if (
self.config.runtime_limit_seconds
and time.time() - self.start_time
> self.config.runtime_limit_seconds
):
logger.info(
f"Runtime limit of {self.config.runtime_limit_seconds} seconds reached. Not submitting more futures."
)
break

future = executor.submit(self.delete_soft_deleted_entity, urn)
futures[future] = urn

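The two helpers added above (_times_up and _deletion_limit_reached) are checked both in the submit loop and inside each worker, so no new hard deletions start once either the runtime budget or the entity-count limit is hit, and the report records which limit tripped. A standalone sketch of that gating logic with illustrative names, not the source's actual config/report classes:

import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class CleanupLimits:
    """Illustrative mirror of the two stop conditions added in this commit."""

    start_time: float
    runtime_limit_seconds: Optional[int] = None
    limit_entities_delete: Optional[int] = None
    num_hard_deleted: int = 0

    def times_up(self) -> bool:
        return bool(
            self.runtime_limit_seconds
            and time.time() - self.start_time > self.runtime_limit_seconds
        )

    def deletion_limit_reached(self) -> bool:
        return bool(
            self.limit_entities_delete
            and self.num_hard_deleted > self.limit_entities_delete
        )


limits = CleanupLimits(
    start_time=time.time(), runtime_limit_seconds=3600, limit_entities_delete=25000
)
if limits.deletion_limit_reached() or limits.times_up():
    pass  # stop submitting new deletions; already-submitted futures still drain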
@@ -61,6 +61,7 @@
ColumnRef,
DownstreamColumnRef,
)
from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint
from datahub.utilities.file_backed_collections import ConnectionWrapper, FileBackedList
from datahub.utilities.perf_timer import PerfTimer

@@ -475,10 +476,11 @@ def _parse_audit_log_row(

entry = PreparsedQuery(
# Despite having Snowflake's fingerprints available, our own fingerprinting logic does a better
# job at eliminating redundant / repetitive queries. As such, we don't include the fingerprint
# here so that the aggregator auto-generates one.
# query_id=res["query_fingerprint"],
query_id=None,
# job at eliminating redundant / repetitive queries. As such, we include the fast fingerprint
# here
query_id=get_query_fingerprint(
res["query_text"], self.identifiers.platform, fast=True
),
query_text=res["query_text"],
upstreams=upstreams,
downstream=downstream,
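Previously the aggregator auto-generated a query id because query_id was passed as None; the change above instead supplies a fast fingerprint computed with the same helper the diff imports. A small usage sketch of that call (the query text and platform value are illustrative):

from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint

query_text = "SELECT id, amount FROM analytics.orders WHERE created_at > '2024-01-01'"

# fast=True uses the cheaper, non-parsing normalization; equivalent repetitive
# queries are expected to collapse to the same id in the aggregator.
query_id = get_query_fingerprint(query_text, "snowflake", fast=True)
print(query_id)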
68 changes: 50 additions & 18 deletions metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py
@@ -2,9 +2,9 @@
import logging
import re
import time
from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime
from collections import OrderedDict, defaultdict
from dataclasses import dataclass, field as dataclass_field
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from typing import (
Any,
@@ -196,6 +196,11 @@
504, # Gateway Timeout
]

# From experience, this expiry time typically ranges from 50 minutes
# to 2 hours, but it might as well be configurable. We allow for regular
# expiry as early as 10 minutes after authentication.
REGULAR_AUTH_EXPIRY_PERIOD = timedelta(minutes=10)

logger: logging.Logger = logging.getLogger(__name__)

# Replace / with |
@@ -637,6 +642,7 @@ class SiteIdContentUrl:
site_content_url: str


@dataclass
class TableauSourceReport(StaleEntityRemovalSourceReport):
get_all_datasources_query_failed: bool = False
num_get_datasource_query_failures: int = 0
@@ -653,7 +659,14 @@ class TableauSourceReport(StaleEntityRemovalSourceReport):
num_upstream_table_lineage_failed_parse_sql: int = 0
num_upstream_fine_grained_lineage_failed_parse_sql: int = 0
num_hidden_assets_skipped: int = 0
logged_in_user: List[UserInfo] = []
logged_in_user: List[UserInfo] = dataclass_field(default_factory=list)
last_authenticated_at: Optional[datetime] = None

num_expected_tableau_metadata_queries: int = 0
num_actual_tableau_metadata_queries: int = 0
tableau_server_error_stats: Dict[str, int] = dataclass_field(
default_factory=(lambda: defaultdict(int))
)


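The tableau_server_error_stats field added above is a defaultdict(int) keyed by exception class name, so each retry site can simply increment a counter without first checking whether the key exists. A tiny standalone illustration of that pattern:

from collections import defaultdict
from typing import Dict

tableau_server_error_stats: Dict[str, int] = defaultdict(int)

try:
    raise OSError("simulated connection reset from Tableau Server")
except Exception as e:
    tableau_server_error_stats[e.__class__.__name__] += 1

print(dict(tableau_server_error_stats))  # {'OSError': 1}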
def report_user_role(report: TableauSourceReport, server: Server) -> None:
Expand Down Expand Up @@ -724,6 +737,7 @@ def _authenticate(self, site_content_url: str) -> None:
try:
logger.info(f"Authenticated to Tableau site: '{site_content_url}'")
self.server = self.config.make_tableau_client(site_content_url)
self.report.last_authenticated_at = datetime.now(timezone.utc)
report_user_role(report=self.report, server=self.server)
# Note that we're not catching ConfigurationError, since we want that to throw.
except ValueError as e:
@@ -807,10 +821,13 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
site_source = TableauSiteSource(
config=self.config,
ctx=self.ctx,
site=site
if site
else SiteIdContentUrl(
site_id=self.server.site_id, site_content_url=self.config.site
site=(
site
if site
else SiteIdContentUrl(
site_id=self.server.site_id,
site_content_url=self.config.site,
)
),
report=self.report,
server=self.server,
@@ -925,6 +942,7 @@ def _re_authenticate(self) -> None:
# Signing in again may not be enough because Tableau sometimes caches invalid sessions
# so we need to recreate the Tableau Server object
self.server = self.config.make_tableau_client(self.site_content_url)
self.report.last_authenticated_at = datetime.now(timezone.utc)

def _populate_usage_stat_registry(self) -> None:
if self.server is None:
@@ -1190,6 +1208,7 @@ def get_connection_object_page(
)
try:
assert self.server is not None
self.report.num_actual_tableau_metadata_queries += 1
query_data = query_metadata_cursor_based_pagination(
server=self.server,
main_query=query,
@@ -1199,25 +1218,36 @@
qry_filter=query_filter,
)

except REAUTHENTICATE_ERRORS:
if not retry_on_auth_error:
except REAUTHENTICATE_ERRORS as e:
self.report.tableau_server_error_stats[e.__class__.__name__] += 1
if not retry_on_auth_error or retries_remaining <= 0:
raise

# If ingestion has been running for over 2 hours, the Tableau
# temporary credentials will expire. If this happens, this exception
# will be thrown, and we need to re-authenticate and retry.
self._re_authenticate()
# We have been getting some irregular authorization errors, like the one below, well before the
# expected expiry time, sometimes within a few seconds of initial authentication. We'll retry without re-auth for such cases.
# <class 'tableauserverclient.server.endpoint.exceptions.NonXMLResponseError'>:
# b'{"timestamp":"xxx","status":401,"error":"Unauthorized","path":"/relationship-service-war/graphql"}'
if self.report.last_authenticated_at and (
datetime.now(timezone.utc) - self.report.last_authenticated_at
> REGULAR_AUTH_EXPIRY_PERIOD
):
# If ingestion has been running for over 2 hours, the Tableau
# temporary credentials will expire. If this happens, this exception
# will be thrown, and we need to re-authenticate and retry.
self._re_authenticate()

return self.get_connection_object_page(
query=query,
connection_type=connection_type,
query_filter=query_filter,
fetch_size=fetch_size,
current_cursor=current_cursor,
retry_on_auth_error=False,
retry_on_auth_error=True,
retries_remaining=retries_remaining - 1,
)

except InternalServerError as ise:
self.report.tableau_server_error_stats[InternalServerError.__name__] += 1
# In some cases Tableau Server returns a 504 error, which is a timeout, so it is worth retrying.
# Extended with other retryable errors.
if ise.code in RETRIABLE_ERROR_CODES:
@@ -1230,13 +1260,14 @@ def get_connection_object_page(
query_filter=query_filter,
fetch_size=fetch_size,
current_cursor=current_cursor,
retry_on_auth_error=False,
retry_on_auth_error=True,
retries_remaining=retries_remaining - 1,
)
else:
raise ise

except OSError:
self.report.tableau_server_error_stats[OSError.__name__] += 1
# In tableauseverclient 0.26 (which was yanked and released in 0.28 on 2023-10-04),
# the request logic was changed to use threads.
# https://github.com/tableau/server-client-python/commit/307d8a20a30f32c1ce615cca7c6a78b9b9bff081
@@ -1251,7 +1282,7 @@ def get_connection_object_page(
query_filter=query_filter,
fetch_size=fetch_size,
current_cursor=current_cursor,
retry_on_auth_error=False,
retry_on_auth_error=True,
retries_remaining=retries_remaining - 1,
)

@@ -1339,7 +1370,7 @@ def get_connection_object_page(
query_filter=query_filter,
fetch_size=fetch_size,
current_cursor=current_cursor,
retry_on_auth_error=False,
retry_on_auth_error=True,
retries_remaining=retries_remaining,
)
raise RuntimeError(f"Query {connection_type} error: {errors}")
@@ -1377,6 +1408,7 @@ def get_connection_objects(
while has_next_page:
filter_: str = make_filter(filter_page)

self.report.num_expected_tableau_metadata_queries += 1
(
connection_objects,
current_cursor,
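The retry path in get_connection_object_page above now re-authenticates only when the current session is old enough for regular expiry to be plausible (older than REGULAR_AUTH_EXPIRY_PERIOD); 401s that arrive seconds after sign-in are retried without a fresh sign-in. A simplified, self-contained sketch of that decision, using a hypothetical helper name rather than the source's own methods:

from datetime import datetime, timedelta, timezone
from typing import Optional

REGULAR_AUTH_EXPIRY_PERIOD = timedelta(minutes=10)


def should_reauthenticate(last_authenticated_at: Optional[datetime]) -> bool:
    """Re-authenticate only when the session could plausibly have expired."""
    if last_authenticated_at is None:
        return True  # no session yet in this run, so sign in
    age = datetime.now(timezone.utc) - last_authenticated_at
    return age > REGULAR_AUTH_EXPIRY_PERIOD


# A 401 thirty seconds after sign-in is treated as spurious and simply retried...
print(should_reauthenticate(datetime.now(timezone.utc) - timedelta(seconds=30)))  # False
# ...while a two-hour-old session is re-authenticated before retrying.
print(should_reauthenticate(datetime.now(timezone.utc) - timedelta(hours=2)))  # True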
@@ -54,6 +54,20 @@ def default_user_urn_builder(email: str) -> str:
return builder.make_user_urn(email.split("@")[0])


def extract_user_email(user: str) -> Optional[str]:
"""Extracts user email from user input
>>> extract_user_email('urn:li:corpuser:[email protected]')
'[email protected]'
>>> extract_user_email('urn:li:corpuser:abc')
>>> extract_user_email('[email protected]')
'[email protected]'
"""
if user.startswith(("urn:li:corpuser:", "urn:li:corpGroup:")):
user = user.split(":")[-1]
return user if "@" in user else None


def make_usage_workunit(
bucket_start_time: datetime,
resource: ResourceType,
@@ -104,7 +118,7 @@ def make_usage_workunit(
DatasetUserUsageCountsClass(
user=user_urn_builder(user),
count=count,
userEmail=user if "@" in user else None,
userEmail=extract_user_email(user),
)
for user, count in user_freq
],