diff --git a/metadata-ingestion/docs/sources/snowflake/snowflake_recipe.yml b/metadata-ingestion/docs/sources/snowflake/snowflake_recipe.yml
index 7e8dbcff88e1c..3226f23c963dd 100644
--- a/metadata-ingestion/docs/sources/snowflake/snowflake_recipe.yml
+++ b/metadata-ingestion/docs/sources/snowflake/snowflake_recipe.yml
@@ -4,6 +4,9 @@ source:
     # This option is recommended to be used to ingest all lineage
    ignore_start_time_lineage: true

+    # This flag tells the Snowflake ingestion to use the more advanced query parsing; it will eventually become the default.
+    use_queries_v2: true
+
    # Coordinates
    account_id: "abc48144"
    warehouse: "COMPUTE_WH"
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 8357262537bcf..5a48f8b7918dc 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -207,7 +207,7 @@
     # Clickhouse 0.8.3 adds support for SQLAlchemy 1.4.x
     "sqlalchemy-redshift>=0.8.3",
     "GeoAlchemy2",
-    "redshift-connector>=2.1.0",
+    "redshift-connector>=2.1.5",
     *path_spec_common,
 }
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
index 168b787b85e8b..443368e6d8b4f 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
@@ -188,6 +188,9 @@ def truncate_indices(self) -> None:
         self._truncate_timeseries_helper(
             aspect_name="dashboardUsageStatistics", entity_type="dashboard"
         )
+        self._truncate_timeseries_helper(
+            aspect_name="queryusagestatistics", entity_type="query"
+        )

     def _truncate_timeseries_helper(self, aspect_name: str, entity_type: str) -> None:
         self._truncate_timeseries_with_watch_optional(
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py
index 170a6ada3e336..f9a00d7f00905 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py
@@ -141,7 +141,9 @@ def _scroll_execution_requests(
                 break
             if self.report.ergc_read_errors >= self.config.max_read_errors:
                 self.report.failure(
-                    f"ergc({self.instance_id}): too many read errors, aborting."
+                    title="Too many read errors, aborting",
+                    message="Too many read errors, aborting",
+                    context=str(self.instance_id),
                 )
                 break
             try:
@@ -158,8 +160,11 @@ def _scroll_execution_requests(
                     break
                 params["scrollId"] = document["scrollId"]
             except Exception as e:
-                logger.error(
-                    f"ergc({self.instance_id}): failed to fetch next batch of execution requests: {e}"
+                self.report.failure(
+                    title="Failed to fetch next batch of execution requests",
+                    message="Failed to fetch next batch of execution requests",
+                    context=str(self.instance_id),
+                    exc=e,
                 )
                 self.report.ergc_read_errors += 1

@@ -231,8 +236,11 @@ def _delete_entry(self, entry: CleanupRecord) -> None:
             self.graph.delete_entity(entry.urn, True)
         except Exception as e:
             self.report.ergc_delete_errors += 1
-            logger.error(
-                f"ergc({self.instance_id}): failed to delete ExecutionRequest {entry.request_id}: {e}"
+            self.report.failure(
+                title="Failed to delete ExecutionRequest",
+                message="Failed to delete ExecutionRequest",
+                context=str(self.instance_id),
+                exc=e,
             )

     def _reached_runtime_limit(self) -> bool:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
index 4c0355834f9b4..cf810d05aa2ca 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
@@ -105,6 +105,8 @@ class SoftDeletedEntitiesReport(SourceReport):
     sample_hard_deleted_aspects_by_type: TopKDict[str, LossyList[str]] = field(
         default_factory=TopKDict
     )
+    runtime_limit_reached: bool = False
+    deletion_limit_reached: bool = False


 class SoftDeletedEntitiesCleanup:
@@ -163,6 +165,8 @@ def delete_entity(self, urn: str) -> None:
                 f"Dry run is on otherwise it would have deleted {urn} with hard deletion"
             )
             return
+        if self._deletion_limit_reached() or self._times_up():
+            return
         self._increment_removal_started_count()
         self.ctx.graph.delete_entity(urn=urn, hard=True)
         self.ctx.graph.delete_references_to_urn(
@@ -203,11 +207,10 @@ def _process_futures(self, futures: Dict[Future, str]) -> Dict[Future, str]:
         for future in done:
             self._print_report()
             if future.exception():
-                logger.error(
-                    f"Failed to delete entity {futures[future]}: {future.exception()}"
-                )
                 self.report.failure(
-                    f"Failed to delete entity {futures[future]}",
+                    title="Failed to delete entity",
+                    message="Failed to delete entity",
+                    context=futures[future],
                     exc=future.exception(),
                 )
             self.report.num_soft_deleted_entity_processed += 1
@@ -274,6 +277,26 @@ def _get_urns(self) -> Iterable[str]:
         )
         yield from self._get_soft_deleted_queries()

+    def _times_up(self) -> bool:
+        if (
+            self.config.runtime_limit_seconds
+            and time.time() - self.start_time > self.config.runtime_limit_seconds
+        ):
+            with self._report_lock:
+                self.report.runtime_limit_reached = True
+            return True
+        return False
+
+    def _deletion_limit_reached(self) -> bool:
+        if (
+            self.config.limit_entities_delete
+            and self.report.num_hard_deleted > self.config.limit_entities_delete
+        ):
+            with self._report_lock:
+                self.report.deletion_limit_reached = True
+            return True
+        return False
+
     def cleanup_soft_deleted_entities(self) -> None:
         if not self.config.enabled:
             return
@@ -285,24 +308,8 @@ def cleanup_soft_deleted_entities(self) -> None:
                 self._print_report()
                 while len(futures) >= self.config.futures_max_at_time:
                     futures = self._process_futures(futures)
-                if (
-                    self.config.limit_entities_delete
-                    and self.report.num_hard_deleted > self.config.limit_entities_delete
-                ):
-                    logger.info(
-                        f"Limit of {self.config.limit_entities_delete} entities reached. Stopped adding more."
-                    )
+                if self._deletion_limit_reached() or self._times_up():
                     break
-                if (
-                    self.config.runtime_limit_seconds
-                    and time.time() - self.start_time
-                    > self.config.runtime_limit_seconds
-                ):
-                    logger.info(
-                        f"Runtime limit of {self.config.runtime_limit_seconds} seconds reached. Not submitting more futures."
-                    )
-                    break
-
                 future = executor.submit(self.delete_soft_deleted_entity, urn)
                 futures[future] = urn
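Taken together, the two helpers above centralize stop conditions that were previously duplicated inline, and the extra check at the top of delete_entity means futures queued before a limit tripped become no-ops instead of extra deletions. A condensed sketch of the pattern, with simplified, hypothetical names (not the actual class):

import time
from threading import Lock


class BoundedCleanup:
    # Stop performing deletions once either a count limit or a
    # wall-clock budget is exhausted.
    def __init__(self, limit: int, budget_seconds: float) -> None:
        self.limit = limit
        self.budget_seconds = budget_seconds
        self.start_time = time.time()
        self.deleted = 0
        self._lock = Lock()

    def _deletion_limit_reached(self) -> bool:
        return self.deleted >= self.limit

    def _times_up(self) -> bool:
        return time.time() - self.start_time > self.budget_seconds

    def delete(self, urn: str) -> None:
        # Re-checked in the worker as well as in the submission loop, so
        # work that is already queued when a limit trips does nothing.
        if self._deletion_limit_reached() or self._times_up():
            return
        with self._lock:
            self.deleted += 1
        # ... the actual hard-delete call would go here ...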
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py
index 36825dc33fe7d..b82734cbbe84e 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py
@@ -61,6 +61,7 @@
     ColumnRef,
     DownstreamColumnRef,
 )
+from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint
 from datahub.utilities.file_backed_collections import ConnectionWrapper, FileBackedList
 from datahub.utilities.perf_timer import PerfTimer

@@ -475,10 +476,11 @@ def _parse_audit_log_row(
         entry = PreparsedQuery(
             # Despite having Snowflake's fingerprints available, our own fingerprinting logic does a better
-            # job at eliminating redundant / repetitive queries. As such, we don't include the fingerprint
-            # here so that the aggregator auto-generates one.
-            # query_id=res["query_fingerprint"],
-            query_id=None,
+            # job at eliminating redundant / repetitive queries. As such, we include the fast fingerprint
+            # here.
+            query_id=get_query_fingerprint(
+                res["query_text"], self.identifiers.platform, fast=True
+            ),
             query_text=res["query_text"],
             upstreams=upstreams,
             downstream=downstream,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py
index 008216fea8950..d149402741e82 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py
@@ -2,9 +2,9 @@
 import logging
 import re
 import time
-from collections import OrderedDict
-from dataclasses import dataclass
-from datetime import datetime
+from collections import OrderedDict, defaultdict
+from dataclasses import dataclass, field as dataclass_field
+from datetime import datetime, timedelta, timezone
 from functools import lru_cache
 from typing import (
     Any,
@@ -196,6 +196,11 @@
     504,  # Gateway Timeout
 ]

+# From experience, this expiry time typically ranges from 50 minutes to
+# 2 hours, and it may well be configurable on the server. We treat auth
+# errors raised within 10 minutes of authenticating as irregular rather than as token expiry.
+REGULAR_AUTH_EXPIRY_PERIOD = timedelta(minutes=10)
+
 logger: logging.Logger = logging.getLogger(__name__)

 # Replace / with |
@@ -637,6 +642,7 @@ class SiteIdContentUrl:
     site_content_url: str


+@dataclass
 class TableauSourceReport(StaleEntityRemovalSourceReport):
     get_all_datasources_query_failed: bool = False
     num_get_datasource_query_failures: int = 0
@@ -653,7 +659,14 @@ class TableauSourceReport(StaleEntityRemovalSourceReport):
     num_upstream_table_lineage_failed_parse_sql: int = 0
     num_upstream_fine_grained_lineage_failed_parse_sql: int = 0
     num_hidden_assets_skipped: int = 0
-    logged_in_user: List[UserInfo] = []
+    logged_in_user: List[UserInfo] = dataclass_field(default_factory=list)
+    last_authenticated_at: Optional[datetime] = None
+
+    num_expected_tableau_metadata_queries: int = 0
+    num_actual_tableau_metadata_queries: int = 0
+    tableau_server_error_stats: Dict[str, int] = dataclass_field(
+        default_factory=(lambda: defaultdict(int))
+    )


 def report_user_role(report: TableauSourceReport, server: Server) -> None:
@@ -724,6 +737,7 @@ def _authenticate(self, site_content_url: str) -> None:
         try:
             logger.info(f"Authenticated to Tableau site: '{site_content_url}'")
             self.server = self.config.make_tableau_client(site_content_url)
+            self.report.last_authenticated_at = datetime.now(timezone.utc)
             report_user_role(report=self.report, server=self.server)
         # Note that we're not catching ConfigurationError, since we want that to throw.
         except ValueError as e:
@@ -807,10 +821,13 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                 site_source = TableauSiteSource(
                     config=self.config,
                     ctx=self.ctx,
-                    site=site
-                    if site
-                    else SiteIdContentUrl(
-                        site_id=self.server.site_id, site_content_url=self.config.site
+                    site=(
+                        site
+                        if site
+                        else SiteIdContentUrl(
+                            site_id=self.server.site_id,
+                            site_content_url=self.config.site,
+                        )
                     ),
                     report=self.report,
                     server=self.server,
@@ -925,6 +942,7 @@ def _re_authenticate(self) -> None:
         # Sign-in again may not be enough because Tableau sometimes caches invalid sessions
         # so we need to recreate the Tableau Server object
         self.server = self.config.make_tableau_client(self.site_content_url)
+        self.report.last_authenticated_at = datetime.now(timezone.utc)

     def _populate_usage_stat_registry(self) -> None:
         if self.server is None:
@@ -1190,6 +1208,7 @@
         )
         try:
             assert self.server is not None
+            self.report.num_actual_tableau_metadata_queries += 1
             query_data = query_metadata_cursor_based_pagination(
                 server=self.server,
                 main_query=query,
@@ -1199,25 +1218,36 @@
                 qry_filter=query_filter,
             )

-        except REAUTHENTICATE_ERRORS:
-            if not retry_on_auth_error:
+        except REAUTHENTICATE_ERRORS as e:
+            self.report.tableau_server_error_stats[e.__class__.__name__] += 1
+            if not retry_on_auth_error or retries_remaining <= 0:
                 raise

-            # If ingestion has been running for over 2 hours, the Tableau
-            # temporary credentials will expire. If this happens, this exception
-            # will be thrown, and we need to re-authenticate and retry.
-            self._re_authenticate()
+            # We have been seeing irregular authorization errors like the one below
+            # well before the expected expiry time, sometimes within a few seconds of
+            # the initial authentication. We retry without re-authenticating in such cases:
+            # b'{"timestamp":"xxx","status":401,"error":"Unauthorized","path":"/relationship-service-war/graphql"}'
+            if self.report.last_authenticated_at and (
+                datetime.now(timezone.utc) - self.report.last_authenticated_at
+                > REGULAR_AUTH_EXPIRY_PERIOD
+            ):
+                # If ingestion has been running for over 2 hours, the Tableau
+                # temporary credentials will expire. If this happens, this exception
+                # will be thrown, and we need to re-authenticate and retry.
+                self._re_authenticate()
+
             return self.get_connection_object_page(
                 query=query,
                 connection_type=connection_type,
                 query_filter=query_filter,
                 fetch_size=fetch_size,
                 current_cursor=current_cursor,
-                retry_on_auth_error=False,
+                retry_on_auth_error=True,
                 retries_remaining=retries_remaining - 1,
             )

         except InternalServerError as ise:
+            self.report.tableau_server_error_stats[InternalServerError.__name__] += 1
             # In some cases Tableau Server returns 504 error, which is a timeout error, so it worths to retry.
             # Extended with other retryable errors.
             if ise.code in RETRIABLE_ERROR_CODES:
@@ -1230,13 +1260,14 @@
                     query_filter=query_filter,
                     fetch_size=fetch_size,
                     current_cursor=current_cursor,
-                    retry_on_auth_error=False,
+                    retry_on_auth_error=True,
                     retries_remaining=retries_remaining - 1,
                 )
             else:
                 raise ise

         except OSError:
+            self.report.tableau_server_error_stats[OSError.__name__] += 1
             # In tableauseverclient 0.26 (which was yanked and released in 0.28 on 2023-10-04),
             # the request logic was changed to use threads.
             # https://github.com/tableau/server-client-python/commit/307d8a20a30f32c1ce615cca7c6a78b9b9bff081
@@ -1251,7 +1282,7 @@
                 query_filter=query_filter,
                 fetch_size=fetch_size,
                 current_cursor=current_cursor,
-                retry_on_auth_error=False,
+                retry_on_auth_error=True,
                 retries_remaining=retries_remaining - 1,
             )

@@ -1339,7 +1370,7 @@
                 query_filter=query_filter,
                 fetch_size=fetch_size,
                 current_cursor=current_cursor,
-                retry_on_auth_error=False,
+                retry_on_auth_error=True,
                 retries_remaining=retries_remaining,
             )
         raise RuntimeError(f"Query {connection_type} error: {errors}")
@@ -1377,6 +1408,7 @@ def get_connection_objects(

         while has_next_page:
             filter_: str = make_filter(filter_page)
+            self.report.num_expected_tableau_metadata_queries += 1
             (
                 connection_objects,
                 current_cursor,
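The retry policy above distinguishes two failure modes: genuine token expiry (re-authenticate, then retry) and spurious 401s shortly after sign-in (just retry). A condensed sketch of the decision, with simplified, hypothetical names; the real method also threads the query, cursor, and retry budget through the recursive call:

from datetime import datetime, timedelta, timezone
from typing import Optional

REGULAR_AUTH_EXPIRY_PERIOD = timedelta(minutes=10)


def should_reauthenticate(
    last_authenticated_at: Optional[datetime],
    now: Optional[datetime] = None,
) -> bool:
    # Re-authenticate only when enough time has passed for the Tableau
    # token to have plausibly expired; otherwise treat the 401 as
    # transient and simply retry the request.
    now = now or datetime.now(timezone.utc)
    return (
        last_authenticated_at is not None
        and now - last_authenticated_at > REGULAR_AUTH_EXPIRY_PERIOD
    )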
diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py b/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py
index 2b7aae8330905..95c2345232a1e 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py
@@ -54,6 +54,20 @@ def default_user_urn_builder(email: str) -> str:
     return builder.make_user_urn(email.split("@")[0])


+def extract_user_email(user: str) -> Optional[str]:
+    """Extracts user email from user input
+
+    >>> extract_user_email('urn:li:corpuser:abc@xyz.com')
+    'abc@xyz.com'
+    >>> extract_user_email('urn:li:corpuser:abc')
+    >>> extract_user_email('abc@xyz.com')
+    'abc@xyz.com'
+    """
+    if user.startswith(("urn:li:corpuser:", "urn:li:corpGroup:")):
+        user = user.split(":")[-1]
+    return user if "@" in user else None
+
+
 def make_usage_workunit(
     bucket_start_time: datetime,
     resource: ResourceType,
@@ -104,7 +118,7 @@ def make_usage_workunit(
                 DatasetUserUsageCountsClass(
                     user=user_urn_builder(user),
                     count=count,
-                    userEmail=user if "@" in user else None,
+                    userEmail=extract_user_email(user),
                 )
                 for user, count in user_freq
             ],
diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
index f81eb291e89e1..a4a49f7788216 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
@@ -198,7 +198,7 @@ def id(self) -> str:

 @dataclasses.dataclass
 class PreparsedQuery:
-    # If not provided, we will generate one using the fast fingerprint generator.
+    # If not provided, we will generate one using the fingerprint generator.
     query_id: Optional[QueryId]
     query_text: str

@@ -622,7 +622,6 @@ def add_known_query_lineage(
         query_fingerprint = get_query_fingerprint(
             known_query_lineage.query_text,
             platform=self.platform.platform_name,
-            fast=True,
         )

         formatted_query = self._maybe_format_query(known_query_lineage.query_text)
@@ -848,7 +847,6 @@ def add_preparsed_query(
             query_fingerprint = get_query_fingerprint(
                 parsed.query_text,
                 platform=self.platform.platform_name,
-                fast=True,
             )

             # Format the query.
diff --git a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py
index 71e5ad10c2fc5..d7868038a40aa 100644
--- a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py
+++ b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py
@@ -1,6 +1,6 @@
 import json
 import pathlib
-from typing import Any, Dict, List, cast
+from typing import Any, Dict, List, Union, cast
 from unittest import mock

 import pytest
@@ -13,10 +13,15 @@
     GroupItem,
     ProjectItem,
     SiteItem,
+    UserItem,
     ViewItem,
     WorkbookItem,
 )
 from tableauserverclient.models.reference_item import ResourceReference
+from tableauserverclient.server.endpoint.exceptions import (
+    NonXMLResponseError,
+    TableauError,
+)

 from datahub.emitter.mce_builder import DEFAULT_ENV, make_schema_field_urn
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -270,7 +275,7 @@ def side_effect_site_get_by_id(id, *arg, **kwargs):


 def mock_sdk_client(
-    side_effect_query_metadata_response: List[dict],
+    side_effect_query_metadata_response: List[Union[dict, TableauError]],
     datasources_side_effect: List[dict],
     sign_out_side_effect: List[dict],
 ) -> mock.MagicMock:
@@ -1312,6 +1317,61 @@ def test_permission_warning(pytestconfig, tmp_path, mock_datahub_graph):
     )


+@freeze_time(FROZEN_TIME)
+@pytest.mark.integration
+def test_retry_on_error(pytestconfig, tmp_path, mock_datahub_graph):
+    with mock.patch(
+        "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
+        mock_datahub_graph,
+    ) as mock_checkpoint:
+        mock_checkpoint.return_value = mock_datahub_graph
+
+        with mock.patch("datahub.ingestion.source.tableau.tableau.Server") as mock_sdk:
+            mock_client = mock_sdk_client(
+                side_effect_query_metadata_response=[
+                    NonXMLResponseError(
+                        """{"timestamp":"xxx","status":401,"error":"Unauthorized","path":"/relationship-service-war/graphql"}"""
+                    ),
+                    *mock_data(),
+                ],
+                sign_out_side_effect=[{}],
+                datasources_side_effect=[{}],
+            )
+            mock_client.users = mock.Mock()
+            mock_client.users.get_by_id.side_effect = [
+                UserItem(
+                    name="name", site_role=UserItem.Roles.SiteAdministratorExplorer
+                )
+            ]
+            mock_sdk.return_value = mock_client
+
+            reporter = TableauSourceReport()
+            tableau_source = TableauSiteSource(
+                platform="tableau",
+                config=mock.MagicMock(),
+                ctx=mock.MagicMock(),
+                site=mock.MagicMock(spec=SiteItem, id="Site1", content_url="site1"),
+                server=mock_sdk.return_value,
+                report=reporter,
+            )
+
+            tableau_source.get_connection_object_page(
+                query=mock.MagicMock(),
+                connection_type=mock.MagicMock(),
+                query_filter=mock.MagicMock(),
+                current_cursor=None,
+                retries_remaining=1,
+                fetch_size=10,
+            )
+
+            assert reporter.num_actual_tableau_metadata_queries == 2
+            assert reporter.tableau_server_error_stats
+            assert reporter.tableau_server_error_stats["NonXMLResponseError"] == 1
+
+            assert reporter.warnings == []
+            assert reporter.failures == []
+
+
 @freeze_time(FROZEN_TIME)
 @pytest.mark.parametrize(
     "extract_project_hierarchy, allowed_projects",
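The test above leans on unittest.mock's side_effect semantics: when side_effect is an iterable, each call consumes the next element, and an element that is an exception instance is raised instead of returned. A minimal self-contained illustration of the same idea:

from unittest import mock

# First call raises; second call returns the payload.
fetch = mock.Mock(side_effect=[ConnectionError("transient 401"), {"data": 1}])


def fetch_with_retry() -> dict:
    try:
        return fetch()
    except ConnectionError:
        return fetch()  # one retry


assert fetch_with_retry() == {"data": 1}
assert fetch.call_count == 2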
diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_add_known_query_lineage.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_add_known_query_lineage.json
index 0d8822736c95e..31d7419b2c8cc 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_add_known_query_lineage.json
+++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_add_known_query_lineage.json
@@ -18,7 +18,7 @@
                 },
                 "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
                 "type": "TRANSFORMED",
-                "query": "urn:li:query:6ed1d12fbf2ccc8138ceec08cc35b981030d6d004bfad9743c7afd84260fa63f"
+                "query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
             }
         ],
         "fineGrainedLineages": [
@@ -32,7 +32,7 @@
                     "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)"
                 ],
                 "confidenceScore": 1.0,
-                "query": "urn:li:query:6ed1d12fbf2ccc8138ceec08cc35b981030d6d004bfad9743c7afd84260fa63f"
+                "query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
             },
             {
                 "upstreamType": "FIELD_SET",
@@ -44,7 +44,7 @@
                     "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)"
                 ],
                 "confidenceScore": 1.0,
-                "query": "urn:li:query:6ed1d12fbf2ccc8138ceec08cc35b981030d6d004bfad9743c7afd84260fa63f"
+                "query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
             },
             {
                 "upstreamType": "FIELD_SET",
@@ -56,7 +56,7 @@
                     "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),c)"
                 ],
                 "confidenceScore": 1.0,
-                "query": "urn:li:query:6ed1d12fbf2ccc8138ceec08cc35b981030d6d004bfad9743c7afd84260fa63f"
+                "query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
             }
         ]
     }
@@ -64,7 +64,7 @@
 },
 {
     "entityType": "query",
-    "entityUrn": "urn:li:query:6ed1d12fbf2ccc8138ceec08cc35b981030d6d004bfad9743c7afd84260fa63f",
+    "entityUrn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae",
     "changeType": "UPSERT",
     "aspectName": "queryProperties",
     "aspect": {
@@ -87,7 +87,7 @@
 },
 {
     "entityType": "query",
-    "entityUrn": "urn:li:query:6ed1d12fbf2ccc8138ceec08cc35b981030d6d004bfad9743c7afd84260fa63f",
+    "entityUrn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae",
     "changeType": "UPSERT",
     "aspectName": "querySubjects",
     "aspect": {
@@ -114,7 +114,7 @@
 },
 {
     "entityType": "query",
-    "entityUrn": "urn:li:query:6ed1d12fbf2ccc8138ceec08cc35b981030d6d004bfad9743c7afd84260fa63f",
+    "entityUrn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae",
     "changeType": "UPSERT",
     "aspectName": "dataPlatformInstance",
     "aspect": {
@@ -137,7 +137,7 @@
             },
             "operationType": "INSERT",
             "customProperties": {
-                "query_urn": "urn:li:query:6ed1d12fbf2ccc8138ceec08cc35b981030d6d004bfad9743c7afd84260fa63f"
+                "query_urn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
             },
             "lastUpdatedTimestamp": 20000
         }
"urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae" }, "lastUpdatedTimestamp": 20000 } diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename.json index fd8475090f009..e22947fd96ce4 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename.json @@ -133,7 +133,7 @@ }, "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_staging,PROD)", "type": "TRANSFORMED", - "query": "urn:li:query:88d742bcc0216d6ccb50c7430d1d97494d5dfcfa90160ffa123108844ad261e4" + "query": "urn:li:query:07a307ad99d3c4a7e54d20c004a4f2d52496f3f5283b33013f80e6323700d97b" } ], "fineGrainedLineages": [ @@ -147,7 +147,7 @@ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)" ], "confidenceScore": 1.0, - "query": "urn:li:query:88d742bcc0216d6ccb50c7430d1d97494d5dfcfa90160ffa123108844ad261e4" + "query": "urn:li:query:07a307ad99d3c4a7e54d20c004a4f2d52496f3f5283b33013f80e6323700d97b" }, { "upstreamType": "FIELD_SET", @@ -159,7 +159,7 @@ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)" ], "confidenceScore": 1.0, - "query": "urn:li:query:88d742bcc0216d6ccb50c7430d1d97494d5dfcfa90160ffa123108844ad261e4" + "query": "urn:li:query:07a307ad99d3c4a7e54d20c004a4f2d52496f3f5283b33013f80e6323700d97b" }, { "upstreamType": "FIELD_SET", @@ -171,7 +171,7 @@ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),c)" ], "confidenceScore": 1.0, - "query": "urn:li:query:88d742bcc0216d6ccb50c7430d1d97494d5dfcfa90160ffa123108844ad261e4" + "query": "urn:li:query:07a307ad99d3c4a7e54d20c004a4f2d52496f3f5283b33013f80e6323700d97b" } ] } @@ -179,7 +179,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:88d742bcc0216d6ccb50c7430d1d97494d5dfcfa90160ffa123108844ad261e4", + "entityUrn": "urn:li:query:07a307ad99d3c4a7e54d20c004a4f2d52496f3f5283b33013f80e6323700d97b", "changeType": "UPSERT", "aspectName": "queryProperties", "aspect": { @@ -202,7 +202,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:88d742bcc0216d6ccb50c7430d1d97494d5dfcfa90160ffa123108844ad261e4", + "entityUrn": "urn:li:query:07a307ad99d3c4a7e54d20c004a4f2d52496f3f5283b33013f80e6323700d97b", "changeType": "UPSERT", "aspectName": "querySubjects", "aspect": { @@ -229,7 +229,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:88d742bcc0216d6ccb50c7430d1d97494d5dfcfa90160ffa123108844ad261e4", + "entityUrn": "urn:li:query:07a307ad99d3c4a7e54d20c004a4f2d52496f3f5283b33013f80e6323700d97b", "changeType": "UPSERT", "aspectName": "dataPlatformInstance", "aspect": { diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename_with_temp.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename_with_temp.json index a4ac349c3c455..b657b46476cbb 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename_with_temp.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename_with_temp.json @@ -133,7 +133,7 @@ }, "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_dep,PROD)", "type": "TRANSFORMED", - "query": "urn:li:query:composite_2efc2a13ee673ccf7b195f8f2c0e4ba0570194d8200c3c20b1eb7e8ca4fb4332" + "query": 
"urn:li:query:composite_c035c933cc4ce5cf8a111bcaf419b8e66a1e41853bb154ff9aaa24cd00ecf51e" } ], "fineGrainedLineages": [ @@ -147,7 +147,7 @@ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)" ], "confidenceScore": 0.2, - "query": "urn:li:query:composite_2efc2a13ee673ccf7b195f8f2c0e4ba0570194d8200c3c20b1eb7e8ca4fb4332" + "query": "urn:li:query:composite_c035c933cc4ce5cf8a111bcaf419b8e66a1e41853bb154ff9aaa24cd00ecf51e" }, { "upstreamType": "FIELD_SET", @@ -159,7 +159,7 @@ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)" ], "confidenceScore": 0.2, - "query": "urn:li:query:composite_2efc2a13ee673ccf7b195f8f2c0e4ba0570194d8200c3c20b1eb7e8ca4fb4332" + "query": "urn:li:query:composite_c035c933cc4ce5cf8a111bcaf419b8e66a1e41853bb154ff9aaa24cd00ecf51e" } ] } @@ -167,7 +167,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:composite_2efc2a13ee673ccf7b195f8f2c0e4ba0570194d8200c3c20b1eb7e8ca4fb4332", + "entityUrn": "urn:li:query:composite_c035c933cc4ce5cf8a111bcaf419b8e66a1e41853bb154ff9aaa24cd00ecf51e", "changeType": "UPSERT", "aspectName": "queryProperties", "aspect": { @@ -190,7 +190,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:composite_2efc2a13ee673ccf7b195f8f2c0e4ba0570194d8200c3c20b1eb7e8ca4fb4332", + "entityUrn": "urn:li:query:composite_c035c933cc4ce5cf8a111bcaf419b8e66a1e41853bb154ff9aaa24cd00ecf51e", "changeType": "UPSERT", "aspectName": "querySubjects", "aspect": { @@ -217,7 +217,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:composite_2efc2a13ee673ccf7b195f8f2c0e4ba0570194d8200c3c20b1eb7e8ca4fb4332", + "entityUrn": "urn:li:query:composite_c035c933cc4ce5cf8a111bcaf419b8e66a1e41853bb154ff9aaa24cd00ecf51e", "changeType": "UPSERT", "aspectName": "dataPlatformInstance", "aspect": { diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap.json index d9d46a4b14a14..09a98a81f2602 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap.json @@ -133,7 +133,7 @@ }, "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_swap,PROD)", "type": "TRANSFORMED", - "query": "urn:li:query:b256c8cc8f386b209ef8da55485d46c3fbd471b942f804d370e24350b3087405" + "query": "urn:li:query:1ed34195f33514203e8359ca22772e03a3588b669e0db00b1681e1a8d0862300" } ], "fineGrainedLineages": [ @@ -147,7 +147,7 @@ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD),a)" ], "confidenceScore": 1.0, - "query": "urn:li:query:b256c8cc8f386b209ef8da55485d46c3fbd471b942f804d370e24350b3087405" + "query": "urn:li:query:1ed34195f33514203e8359ca22772e03a3588b669e0db00b1681e1a8d0862300" }, { "upstreamType": "FIELD_SET", @@ -159,7 +159,7 @@ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD),b)" ], "confidenceScore": 1.0, - "query": "urn:li:query:b256c8cc8f386b209ef8da55485d46c3fbd471b942f804d370e24350b3087405" + "query": "urn:li:query:1ed34195f33514203e8359ca22772e03a3588b669e0db00b1681e1a8d0862300" }, { "upstreamType": "FIELD_SET", @@ -171,7 +171,7 @@ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD),c)" ], "confidenceScore": 1.0, - "query": "urn:li:query:b256c8cc8f386b209ef8da55485d46c3fbd471b942f804d370e24350b3087405" + "query": 
"urn:li:query:1ed34195f33514203e8359ca22772e03a3588b669e0db00b1681e1a8d0862300" } ] } @@ -179,7 +179,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:b256c8cc8f386b209ef8da55485d46c3fbd471b942f804d370e24350b3087405", + "entityUrn": "urn:li:query:1ed34195f33514203e8359ca22772e03a3588b669e0db00b1681e1a8d0862300", "changeType": "UPSERT", "aspectName": "queryProperties", "aspect": { @@ -202,7 +202,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:b256c8cc8f386b209ef8da55485d46c3fbd471b942f804d370e24350b3087405", + "entityUrn": "urn:li:query:1ed34195f33514203e8359ca22772e03a3588b669e0db00b1681e1a8d0862300", "changeType": "UPSERT", "aspectName": "querySubjects", "aspect": { @@ -229,7 +229,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:b256c8cc8f386b209ef8da55485d46c3fbd471b942f804d370e24350b3087405", + "entityUrn": "urn:li:query:1ed34195f33514203e8359ca22772e03a3588b669e0db00b1681e1a8d0862300", "changeType": "UPSERT", "aspectName": "dataPlatformInstance", "aspect": { @@ -257,7 +257,7 @@ }, "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_swap,PROD)", "type": "TRANSFORMED", - "query": "urn:li:query:6f71602f39d01a39b3f8bd411c74c5ac08dc4b90bc3d49b257089acb19fa8559" + "query": "urn:li:query:76f0a8e1da90c4d33b5741c6e1014251ce2d1650ba0f58ab136ebaf1bb64dc8c" } ] } @@ -265,7 +265,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:6f71602f39d01a39b3f8bd411c74c5ac08dc4b90bc3d49b257089acb19fa8559", + "entityUrn": "urn:li:query:76f0a8e1da90c4d33b5741c6e1014251ce2d1650ba0f58ab136ebaf1bb64dc8c", "changeType": "UPSERT", "aspectName": "queryProperties", "aspect": { @@ -288,7 +288,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:6f71602f39d01a39b3f8bd411c74c5ac08dc4b90bc3d49b257089acb19fa8559", + "entityUrn": "urn:li:query:76f0a8e1da90c4d33b5741c6e1014251ce2d1650ba0f58ab136ebaf1bb64dc8c", "changeType": "UPSERT", "aspectName": "querySubjects", "aspect": { @@ -306,7 +306,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:6f71602f39d01a39b3f8bd411c74c5ac08dc4b90bc3d49b257089acb19fa8559", + "entityUrn": "urn:li:query:76f0a8e1da90c4d33b5741c6e1014251ce2d1650ba0f58ab136ebaf1bb64dc8c", "changeType": "UPSERT", "aspectName": "dataPlatformInstance", "aspect": { @@ -334,7 +334,7 @@ }, "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_dep,PROD)", "type": "TRANSFORMED", - "query": "urn:li:query:4b1fad909083e1ed5c47c146bd01247ed4d6295d175c34f9065b8fc6000fc7ae" + "query": "urn:li:query:37c14a3bbb67360d19d1666fa4e11b67ef81926e1e2bcd46b87ea239d27a549d" } ] } @@ -342,7 +342,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:4b1fad909083e1ed5c47c146bd01247ed4d6295d175c34f9065b8fc6000fc7ae", + "entityUrn": "urn:li:query:37c14a3bbb67360d19d1666fa4e11b67ef81926e1e2bcd46b87ea239d27a549d", "changeType": "UPSERT", "aspectName": "queryProperties", "aspect": { @@ -365,7 +365,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:4b1fad909083e1ed5c47c146bd01247ed4d6295d175c34f9065b8fc6000fc7ae", + "entityUrn": "urn:li:query:37c14a3bbb67360d19d1666fa4e11b67ef81926e1e2bcd46b87ea239d27a549d", "changeType": "UPSERT", "aspectName": "querySubjects", "aspect": { @@ -383,7 +383,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:4b1fad909083e1ed5c47c146bd01247ed4d6295d175c34f9065b8fc6000fc7ae", + "entityUrn": "urn:li:query:37c14a3bbb67360d19d1666fa4e11b67ef81926e1e2bcd46b87ea239d27a549d", "changeType": "UPSERT", "aspectName": "dataPlatformInstance", "aspect": { @@ -411,7 +411,7 @@ }, 
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD)", "type": "TRANSFORMED", - "query": "urn:li:query:3886d427c84692923797048da6d3991693e89ce44e10d1917c12e8b6fd493904" + "query": "urn:li:query:f4eb748a53291bbea59e080f6d415b08dfd7003d0b7c3d538d02f4e404b30943" }, { "auditStamp": { @@ -424,7 +424,7 @@ }, "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_incremental,PROD)", "type": "TRANSFORMED", - "query": "urn:li:query:481d0392ffeffdafd198d94e0a9f778dd722b60daa47083a32800b99ea21f86f" + "query": "urn:li:query:29935c31db1f06edf50d62a59d2874a86c51570256ab3b3102984439c03be1f2" } ] } @@ -432,7 +432,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:3886d427c84692923797048da6d3991693e89ce44e10d1917c12e8b6fd493904", + "entityUrn": "urn:li:query:f4eb748a53291bbea59e080f6d415b08dfd7003d0b7c3d538d02f4e404b30943", "changeType": "UPSERT", "aspectName": "queryProperties", "aspect": { @@ -455,7 +455,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:3886d427c84692923797048da6d3991693e89ce44e10d1917c12e8b6fd493904", + "entityUrn": "urn:li:query:f4eb748a53291bbea59e080f6d415b08dfd7003d0b7c3d538d02f4e404b30943", "changeType": "UPSERT", "aspectName": "querySubjects", "aspect": { @@ -473,7 +473,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:3886d427c84692923797048da6d3991693e89ce44e10d1917c12e8b6fd493904", + "entityUrn": "urn:li:query:f4eb748a53291bbea59e080f6d415b08dfd7003d0b7c3d538d02f4e404b30943", "changeType": "UPSERT", "aspectName": "dataPlatformInstance", "aspect": { @@ -484,7 +484,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:481d0392ffeffdafd198d94e0a9f778dd722b60daa47083a32800b99ea21f86f", + "entityUrn": "urn:li:query:29935c31db1f06edf50d62a59d2874a86c51570256ab3b3102984439c03be1f2", "changeType": "UPSERT", "aspectName": "queryProperties", "aspect": { @@ -507,7 +507,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:481d0392ffeffdafd198d94e0a9f778dd722b60daa47083a32800b99ea21f86f", + "entityUrn": "urn:li:query:29935c31db1f06edf50d62a59d2874a86c51570256ab3b3102984439c03be1f2", "changeType": "UPSERT", "aspectName": "querySubjects", "aspect": { @@ -525,7 +525,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:481d0392ffeffdafd198d94e0a9f778dd722b60daa47083a32800b99ea21f86f", + "entityUrn": "urn:li:query:29935c31db1f06edf50d62a59d2874a86c51570256ab3b3102984439c03be1f2", "changeType": "UPSERT", "aspectName": "dataPlatformInstance", "aspect": { diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap_with_temp.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap_with_temp.json index b4eaf76a14933..69bcd8eb10e95 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap_with_temp.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap_with_temp.json @@ -133,7 +133,7 @@ }, "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD)", "type": "TRANSFORMED", - "query": "urn:li:query:composite_9e36ef19163461d35b618fd1eea2a3f6a5d10a23a979a6d5ef688b31f277abb3" + "query": "urn:li:query:composite_a10e266957d5007837642526d09f058ca461e42e2159ff45c328ebd069c112df" }, { "auditStamp": { @@ -146,7 +146,7 @@ }, "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_dep,PROD)", "type": "TRANSFORMED", - "query": "urn:li:query:composite_9e36ef19163461d35b618fd1eea2a3f6a5d10a23a979a6d5ef688b31f277abb3" + 
"query": "urn:li:query:composite_a10e266957d5007837642526d09f058ca461e42e2159ff45c328ebd069c112df" } ], "fineGrainedLineages": [ @@ -161,7 +161,7 @@ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD),a)" ], "confidenceScore": 1.0, - "query": "urn:li:query:composite_9e36ef19163461d35b618fd1eea2a3f6a5d10a23a979a6d5ef688b31f277abb3" + "query": "urn:li:query:composite_a10e266957d5007837642526d09f058ca461e42e2159ff45c328ebd069c112df" } ] } @@ -169,7 +169,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:composite_9e36ef19163461d35b618fd1eea2a3f6a5d10a23a979a6d5ef688b31f277abb3", + "entityUrn": "urn:li:query:composite_a10e266957d5007837642526d09f058ca461e42e2159ff45c328ebd069c112df", "changeType": "UPSERT", "aspectName": "queryProperties", "aspect": { @@ -192,7 +192,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:composite_9e36ef19163461d35b618fd1eea2a3f6a5d10a23a979a6d5ef688b31f277abb3", + "entityUrn": "urn:li:query:composite_a10e266957d5007837642526d09f058ca461e42e2159ff45c328ebd069c112df", "changeType": "UPSERT", "aspectName": "querySubjects", "aspect": { @@ -219,7 +219,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:composite_9e36ef19163461d35b618fd1eea2a3f6a5d10a23a979a6d5ef688b31f277abb3", + "entityUrn": "urn:li:query:composite_a10e266957d5007837642526d09f058ca461e42e2159ff45c328ebd069c112df", "changeType": "UPSERT", "aspectName": "dataPlatformInstance", "aspect": { @@ -247,7 +247,7 @@ }, "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD)", "type": "TRANSFORMED", - "query": "urn:li:query:composite_49daa72ac1d22734879a6bed1224daa7f8c1293750d6d7b8a24a0aa0e9f74d80" + "query": "urn:li:query:composite_5d8360cfc2f57f023d9945749848ad52227674fefc9fec568e7fbb1787cfd544" }, { "auditStamp": { @@ -260,7 +260,7 @@ }, "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_dep,PROD)", "type": "TRANSFORMED", - "query": "urn:li:query:composite_49daa72ac1d22734879a6bed1224daa7f8c1293750d6d7b8a24a0aa0e9f74d80" + "query": "urn:li:query:composite_5d8360cfc2f57f023d9945749848ad52227674fefc9fec568e7fbb1787cfd544" } ], "fineGrainedLineages": [ @@ -275,7 +275,7 @@ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_backup,PROD),a)" ], "confidenceScore": 1.0, - "query": "urn:li:query:composite_49daa72ac1d22734879a6bed1224daa7f8c1293750d6d7b8a24a0aa0e9f74d80" + "query": "urn:li:query:composite_5d8360cfc2f57f023d9945749848ad52227674fefc9fec568e7fbb1787cfd544" } ] } @@ -283,7 +283,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:composite_49daa72ac1d22734879a6bed1224daa7f8c1293750d6d7b8a24a0aa0e9f74d80", + "entityUrn": "urn:li:query:composite_5d8360cfc2f57f023d9945749848ad52227674fefc9fec568e7fbb1787cfd544", "changeType": "UPSERT", "aspectName": "queryProperties", "aspect": { @@ -306,7 +306,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:composite_49daa72ac1d22734879a6bed1224daa7f8c1293750d6d7b8a24a0aa0e9f74d80", + "entityUrn": "urn:li:query:composite_5d8360cfc2f57f023d9945749848ad52227674fefc9fec568e7fbb1787cfd544", "changeType": "UPSERT", "aspectName": "querySubjects", "aspect": { @@ -330,7 +330,7 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:composite_49daa72ac1d22734879a6bed1224daa7f8c1293750d6d7b8a24a0aa0e9f74d80", + "entityUrn": "urn:li:query:composite_5d8360cfc2f57f023d9945749848ad52227674fefc9fec568e7fbb1787cfd544", "changeType": "UPSERT", "aspectName": "dataPlatformInstance", "aspect": { 
diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_utils.py b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_utils.py
index dbe24ade6944f..c3c3a4a15d915 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_utils.py
+++ b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_utils.py
@@ -186,3 +186,15 @@ def test_query_fingerprint():
     assert get_query_fingerprint(
         "select 1 + 1", platform="postgres"
     ) != get_query_fingerprint("select 2", platform="postgres")
+
+
+def test_redshift_query_fingerprint():
+    query1 = "insert into insert_into_table (select * from base_table);"
+    query2 = "INSERT INTO insert_into_table (SELECT * FROM base_table)"
+
+    assert get_query_fingerprint(query1, "redshift") == get_query_fingerprint(
+        query2, "redshift"
+    )
+    assert get_query_fingerprint(query1, "redshift", True) != get_query_fingerprint(
+        query2, "redshift", True
+    )
diff --git a/metadata-ingestion/tests/unit/test_gc.py b/metadata-ingestion/tests/unit/test_gc.py
index 8f00d5e064db8..fde9a3f2e0cf0 100644
--- a/metadata-ingestion/tests/unit/test_gc.py
+++ b/metadata-ingestion/tests/unit/test_gc.py
@@ -9,6 +9,34 @@
     DataProcessCleanupConfig,
     DataProcessCleanupReport,
 )
+from datahub.ingestion.source.gc.soft_deleted_entity_cleanup import (
+    SoftDeletedEntitiesCleanup,
+    SoftDeletedEntitiesCleanupConfig,
+    SoftDeletedEntitiesReport,
+)
+
+
+class TestSoftDeletedEntitiesCleanup(unittest.TestCase):
+    def setUp(self):
+        self.ctx = PipelineContext(run_id="test_run")
+        self.ctx.graph = MagicMock()
+        self.config = SoftDeletedEntitiesCleanupConfig()
+        self.report = SoftDeletedEntitiesReport()
+        self.cleanup = SoftDeletedEntitiesCleanup(
+            self.ctx, self.config, self.report, dry_run=True
+        )
+
+    def test_update_report(self):
+        self.cleanup._update_report(
+            urn="urn:li:dataset:1",
+            entity_type="dataset",
+        )
+        self.assertEqual(1, self.report.num_hard_deleted)
+        self.assertEqual(1, self.report.num_hard_deleted_by_type["dataset"])
+
+    def test_increment_retained_count(self):
+        self.cleanup._increment_retained_count()
+        self.assertEqual(1, self.report.num_soft_deleted_retained_due_to_age)


 class TestDataProcessCleanup(unittest.TestCase):
diff --git a/metadata-ingestion/tests/unit/test_usage_common.py b/metadata-ingestion/tests/unit/test_usage_common.py
index e01f0ea77df83..bd6d194835dd9 100644
--- a/metadata-ingestion/tests/unit/test_usage_common.py
+++ b/metadata-ingestion/tests/unit/test_usage_common.py
@@ -5,6 +5,7 @@
 from freezegun import freeze_time
 from pydantic import ValidationError

+import datahub.ingestion.source.usage.usage_common
 from datahub.configuration.common import AllowDenyPattern
 from datahub.configuration.time_window_config import BucketDuration, get_time_bucket
 from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
@@ -28,6 +29,7 @@
     UserUsageCountsClass,
     WindowDurationClass,
 )
+from datahub.testing.doctest import assert_doctest

 _TestTableRef = str

@@ -373,3 +375,7 @@ def test_convert_usage_aggregation_class():
             eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.MONTH),
         ),
     )
+
+
+def test_extract_user_email():
+    assert_doctest(datahub.ingestion.source.usage.usage_common)
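For readers unfamiliar with the doctest-backed style used in test_extract_user_email above: assert_doctest executes the ">>>" examples embedded in a module's docstrings and fails the test if any example's output mismatches. A roughly equivalent sketch using only the standard library (not the datahub helper itself):

import doctest

import datahub.ingestion.source.usage.usage_common as usage_common


def test_extract_user_email_doctests() -> None:
    # Run every ">>>" example in the module's docstrings; a non-zero
    # failure count means a documented example regressed.
    results = doctest.testmod(usage_common)
    assert results.failed == 0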