From caef6771b828d8ee94f76801a9121f4e1a2e7561 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 15 Dec 2023 15:07:56 -0500 Subject: [PATCH] feat(ingest/redshift): drop repeated operations (#9440) --- metadata-ingestion/setup.py | 6 +- .../ingestion/source/redshift/report.py | 3 +- .../ingestion/source/redshift/usage.py | 68 +++++++++++++++++-- .../redshift-usage/test_redshift_usage.py | 54 ++++++++++++++- 4 files changed, 121 insertions(+), 10 deletions(-) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 5d15d7167b63e8..1bc1bc5100b08d 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -368,7 +368,11 @@ | {"psycopg2-binary", "pymysql>=1.0.2"}, "pulsar": {"requests"}, "redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib, - "redshift": sql_common | redshift_common | usage_common | sqlglot_lib, + "redshift": sql_common + | redshift_common + | usage_common + | sqlglot_lib + | {"cachetools"}, "s3": {*s3_base, *data_lake_profiling}, "gcs": {*s3_base, *data_lake_profiling}, "sagemaker": aws_common, diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py index b845580f359394..333c851650fb3a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py @@ -29,7 +29,8 @@ class RedshiftReport(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowRep lineage_mem_size: Dict[str, str] = field(default_factory=TopKDict) tables_in_mem_size: Dict[str, str] = field(default_factory=TopKDict) views_in_mem_size: Dict[str, str] = field(default_factory=TopKDict) - num_operational_stats_skipped: int = 0 + num_operational_stats_filtered: int = 0 + num_repeated_operations_dropped: int = 0 num_usage_stat_skipped: int = 0 num_lineage_tables_dropped: int = 0 num_lineage_dropped_query_parser: int = 0 diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py index c789e605b9c29f..409027a8805a0d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py @@ -4,6 +4,7 @@ from datetime import datetime from typing import Callable, Dict, Iterable, List, Optional, Tuple, Union +import cachetools import pydantic.error_wrappers import redshift_connector from pydantic.fields import Field @@ -251,7 +252,7 @@ def _get_workunits_internal( ) -> Iterable[MetadataWorkUnit]: self.report.num_usage_workunits_emitted = 0 self.report.num_usage_stat_skipped = 0 - self.report.num_operational_stats_skipped = 0 + self.report.num_operational_stats_filtered = 0 if self.config.include_operational_stats: self.report.report_ingestion_stage_start(USAGE_EXTRACTION_OPERATIONAL_STATS) @@ -304,8 +305,13 @@ def _gen_operation_aspect_workunits( ) # Generate operation aspect work units from the access events - yield from self._gen_operation_aspect_workunits_from_access_events( - access_events_iterable, all_tables=all_tables + yield from ( + mcpw.as_workunit() + for mcpw in self._drop_repeated_operations( + self._gen_operation_aspect_workunits_from_access_events( + access_events_iterable, all_tables=all_tables + ) + ) ) def _should_process_event( @@ -366,11 +372,61 @@ def _gen_access_events_from_history_query( yield access_event results = cursor.fetchmany() + def _drop_repeated_operations( + self, events: 
Iterable[MetadataChangeProposalWrapper]
+    ) -> Iterable[MetadataChangeProposalWrapper]:
+        """Drop repeated operations on the same entity.
+
+        ASSUMPTION: Events are ordered by lastUpdatedTimestamp, descending.
+
+        Operations are only dropped if they occurred within DROP_WINDOW_SEC
+        (10 seconds) of each other, and have the same operation type, user, and entity.
+
+        This is particularly useful when we see a string of insert operations
+        that are all really part of the same overall operation.
+        """
+
+        OPERATION_CACHE_MAXSIZE = 1000
+        DROP_WINDOW_SEC = 10
+
+        # All timestamps are in milliseconds. The watermark starts above any real timestamp and only decreases; negating it gives TTLCache the forward-moving clock it expects.
+        timestamp_low_watermark = float("inf")
+
+        def timer():
+            return -timestamp_low_watermark
+
+        # dict of entity urn -> (last event's actor, operation type)
+        # TODO: Remove the type ignore and use TTLCache[str, Tuple[Optional[str], str]] directly once we're on Python 3.9+.
+        last_events: Dict[str, Tuple[Optional[str], str]] = cachetools.TTLCache(  # type: ignore[assignment]
+            maxsize=OPERATION_CACHE_MAXSIZE, ttl=DROP_WINDOW_SEC * 1000, timer=timer
+        )
+
+        for event in events:
+            assert isinstance(event.aspect, OperationClass)
+
+            timestamp_low_watermark = min(
+                timestamp_low_watermark, event.aspect.lastUpdatedTimestamp
+            )
+
+            urn = event.entityUrn
+            assert urn
+            assert isinstance(event.aspect.operationType, str)
+            value: Tuple[Optional[str], str] = (
+                event.aspect.actor,
+                event.aspect.operationType,
+            )
+            if urn in last_events and last_events[urn] == value:
+                self.report.num_repeated_operations_dropped += 1
+                continue
+
+            last_events[urn] = value
+            yield event
+
     def _gen_operation_aspect_workunits_from_access_events(
         self,
         events_iterable: Iterable[RedshiftAccessEvent],
         all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]],
-    ) -> Iterable[MetadataWorkUnit]:
+    ) -> Iterable[MetadataChangeProposalWrapper]:
         self.report.num_operational_stats_workunits_emitted = 0
         for event in events_iterable:
             if not (
@@ -384,7 +440,7 @@ def _gen_operation_aspect_workunits_from_access_events(
                 continue
 
             if not self._should_process_event(event, all_tables=all_tables):
-                self.report.num_operational_stats_skipped += 1
+                self.report.num_operational_stats_filtered += 1
                 continue
 
             assert event.operation_type in ["insert", "delete"]
@@ -406,7 +462,7 @@ def _gen_operation_aspect_workunits_from_access_events(
             resource: str = f"{event.database}.{event.schema_}.{event.table}".lower()
             yield MetadataChangeProposalWrapper(
                 entityUrn=self.dataset_urn_builder(resource), aspect=operation_aspect
-            ).as_workunit()
+            )
             self.report.num_operational_stats_workunits_emitted += 1
 
     def _aggregate_access_events(
diff --git a/metadata-ingestion/tests/integration/redshift-usage/test_redshift_usage.py b/metadata-ingestion/tests/integration/redshift-usage/test_redshift_usage.py
index 74eec82b39ba3d..a9eebb8d54154e 100644
--- a/metadata-ingestion/tests/integration/redshift-usage/test_redshift_usage.py
+++ b/metadata-ingestion/tests/integration/redshift-usage/test_redshift_usage.py
@@ -2,11 +2,11 @@
 import pathlib
 from pathlib import Path
 from typing import Dict, List, Union
-from unittest.mock import Mock, patch
+from unittest.mock import MagicMock, Mock, patch
 
 from freezegun import freeze_time
 
-from datahub.emitter.mce_builder import make_dataset_urn
+from datahub.emitter.mce_builder import make_dataset_urn, make_user_urn
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.sink.file import write_metadata_file
 from datahub.ingestion.source.redshift.config import RedshiftConfig
@@ -20,6 +20,7 @@
     MetadataChangeEvent,
MetadataChangeProposal, ) +from datahub.metadata.schema_classes import OperationClass, OperationTypeClass from tests.test_helpers import mce_helpers FROZEN_TIME = "2021-09-15 09:00:00" @@ -243,3 +244,52 @@ def load_access_events(test_resources_dir: pathlib.Path) -> List[Dict]: with access_events_history_file.open() as access_events_json: access_events = json.loads(access_events_json.read()) return access_events + + +def test_duplicate_operations_dropped(): + report = RedshiftReport() + usage_extractor = RedshiftUsageExtractor( + config=MagicMock(), + connection=MagicMock(), + report=report, + dataset_urn_builder=MagicMock(), + redundant_run_skip_handler=None, + ) + + user = make_user_urn("jdoe") + urnA = "urn:li:dataset:(urn:li:dataPlatform:redshift,db.schema.tableA,PROD)" + urnB = "urn:li:dataset:(urn:li:dataPlatform:redshift,db.schema.tableB,PROD)" + + opA1 = MetadataChangeProposalWrapper( + entityUrn=urnA, + aspect=OperationClass( + timestampMillis=100 * 1000, + lastUpdatedTimestamp=95 * 1000, + actor=user, + operationType=OperationTypeClass.INSERT, + ), + ) + opB1 = MetadataChangeProposalWrapper( + entityUrn=urnB, + aspect=OperationClass( + timestampMillis=101 * 1000, + lastUpdatedTimestamp=94 * 1000, + actor=user, + operationType=OperationTypeClass.INSERT, + ), + ) + opA2 = MetadataChangeProposalWrapper( + entityUrn=urnA, + aspect=OperationClass( + timestampMillis=102 * 1000, + lastUpdatedTimestamp=90 * 1000, + actor=user, + operationType=OperationTypeClass.INSERT, + ), + ) + + dedups = list(usage_extractor._drop_repeated_operations([opA1, opB1, opA2])) + assert dedups == [ + opA1, + opB1, + ]
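

Note on the TTL-cache trick in _drop_repeated_operations (editor's aside, not part of the patch): cachetools.TTLCache normally ages entries against a wall clock, but the access events here are replayed newest-first. The code therefore hands the cache a custom timer that returns the negated low-watermark timestamp: as iteration walks backwards through event time, the watermark decreases, so its negation increases monotonically, which is exactly the forward-moving clock TTLCache expects. Entries then expire once the stream has moved more than ttl milliseconds into the past. Below is a minimal standalone sketch of the same idea; the names are illustrative, and it keys on urn alone where the patch keys on urn and compares (actor, operation type).

    import cachetools

    # Event timestamps in milliseconds, ordered descending (newest first),
    # matching the ASSUMPTION in the patch's docstring.
    events = [("urnA", 95_000), ("urnB", 94_000), ("urnA", 90_000), ("urnA", 50_000)]

    low_watermark = float("inf")  # lowest timestamp seen so far

    def timer():
        # Negating the decreasing watermark yields an increasing "clock".
        return -low_watermark

    # Entries live for 10 seconds of *event* time, not wall-clock time.
    seen = cachetools.TTLCache(maxsize=1000, ttl=10_000, timer=timer)

    kept = []
    for urn, ts in events:
        low_watermark = min(low_watermark, ts)
        if urn in seen:  # membership test also checks expiry
            continue  # duplicate within the window: drop it
        seen[urn] = True
        kept.append((urn, ts))

    # ("urnA", 90_000) is dropped (5s behind the cached 95_000 entry);
    # ("urnA", 50_000) survives because that entry expired long before.
    print(kept)  # [('urnA', 95000), ('urnB', 94000), ('urnA', 50000)]

Because the watermark is updated before any cache access, the initial float("inf") never reaches the cache's timer, and the first event simply lowers the watermark into place.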