feat(ingest/redshift): drop repeated operations (datahub-project#9440)
hsheth2 authored Dec 15, 2023
1 parent 824df5a commit caef677
Showing 4 changed files with 121 additions and 10 deletions.
6 changes: 5 additions & 1 deletion metadata-ingestion/setup.py
@@ -368,7 +368,11 @@
     | {"psycopg2-binary", "pymysql>=1.0.2"},
     "pulsar": {"requests"},
     "redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib,
-    "redshift": sql_common | redshift_common | usage_common | sqlglot_lib,
+    "redshift": sql_common
+    | redshift_common
+    | usage_common
+    | sqlglot_lib
+    | {"cachetools"},
     "s3": {*s3_base, *data_lake_profiling},
     "gcs": {*s3_base, *data_lake_profiling},
     "sagemaker": aws_common,
Original file line number Diff line number Diff line change
@@ -29,7 +29,8 @@ class RedshiftReport(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowRep
     lineage_mem_size: Dict[str, str] = field(default_factory=TopKDict)
     tables_in_mem_size: Dict[str, str] = field(default_factory=TopKDict)
     views_in_mem_size: Dict[str, str] = field(default_factory=TopKDict)
-    num_operational_stats_skipped: int = 0
+    num_operational_stats_filtered: int = 0
+    num_repeated_operations_dropped: int = 0
     num_usage_stat_skipped: int = 0
     num_lineage_tables_dropped: int = 0
     num_lineage_dropped_query_parser: int = 0
68 changes: 62 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py
@@ -4,6 +4,7 @@
 from datetime import datetime
 from typing import Callable, Dict, Iterable, List, Optional, Tuple, Union

+import cachetools
 import pydantic.error_wrappers
 import redshift_connector
 from pydantic.fields import Field
@@ -251,7 +252,7 @@ def _get_workunits_internal(
     ) -> Iterable[MetadataWorkUnit]:
         self.report.num_usage_workunits_emitted = 0
         self.report.num_usage_stat_skipped = 0
-        self.report.num_operational_stats_skipped = 0
+        self.report.num_operational_stats_filtered = 0

         if self.config.include_operational_stats:
             self.report.report_ingestion_stage_start(USAGE_EXTRACTION_OPERATIONAL_STATS)
@@ -304,8 +305,13 @@ def _gen_operation_aspect_workunits(
         )

         # Generate operation aspect work units from the access events
-        yield from self._gen_operation_aspect_workunits_from_access_events(
-            access_events_iterable, all_tables=all_tables
+        yield from (
+            mcpw.as_workunit()
+            for mcpw in self._drop_repeated_operations(
+                self._gen_operation_aspect_workunits_from_access_events(
+                    access_events_iterable, all_tables=all_tables
+                )
+            )
         )

     def _should_process_event(
@@ -366,11 +372,61 @@ def _gen_access_events_from_history_query(
                 yield access_event
             results = cursor.fetchmany()

+    def _drop_repeated_operations(
+        self, events: Iterable[MetadataChangeProposalWrapper]
+    ) -> Iterable[MetadataChangeProposalWrapper]:
+        """Drop repeated operations on the same entity.
+
+        ASSUMPTION: Events are ordered by lastUpdatedTimestamp, descending.
+
+        Operations are only dropped if they occurred within DROP_WINDOW_SEC
+        (10 seconds) of each other and have the same operation type, user, and entity.
+
+        This is particularly useful when we see a string of insert operations
+        that are all really part of the same overall operation.
+        """
+
+        OPERATION_CACHE_MAXSIZE = 1000
+        DROP_WINDOW_SEC = 10
+
+        # All timestamps are in milliseconds.
+        timestamp_low_watermark = 0
+
+        def timer():
+            return -timestamp_low_watermark
+
+        # dict of entity urn -> (last event's actor, operation type)
+        # TODO: Remove the type ignore and use TTLCache[key_type, value_type] directly once that's supported in Python 3.9.
+        last_events: Dict[str, Tuple[Optional[str], str]] = cachetools.TTLCache(  # type: ignore[assignment]
+            maxsize=OPERATION_CACHE_MAXSIZE, ttl=DROP_WINDOW_SEC * 1000, timer=timer
+        )
+
+        for event in events:
+            assert isinstance(event.aspect, OperationClass)
+
+            timestamp_low_watermark = min(
+                timestamp_low_watermark, event.aspect.lastUpdatedTimestamp
+            )
+
+            urn = event.entityUrn
+            assert urn
+            assert isinstance(event.aspect.operationType, str)
+            value: Tuple[Optional[str], str] = (
+                event.aspect.actor,
+                event.aspect.operationType,
+            )
+            if urn in last_events and last_events[urn] == value:
+                self.report.num_repeated_operations_dropped += 1
+                continue
+
+            last_events[urn] = value
+            yield event

     def _gen_operation_aspect_workunits_from_access_events(
         self,
         events_iterable: Iterable[RedshiftAccessEvent],
         all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]],
-    ) -> Iterable[MetadataWorkUnit]:
+    ) -> Iterable[MetadataChangeProposalWrapper]:
         self.report.num_operational_stats_workunits_emitted = 0
         for event in events_iterable:
             if not (
@@ -384,7 +440,7 @@ def _gen_operation_aspect_workunits_from_access_events(
                 continue

             if not self._should_process_event(event, all_tables=all_tables):
-                self.report.num_operational_stats_skipped += 1
+                self.report.num_operational_stats_filtered += 1
                 continue

             assert event.operation_type in ["insert", "delete"]
@@ -406,7 +462,7 @@ def _gen_operation_aspect_workunits_from_access_events(
             resource: str = f"{event.database}.{event.schema_}.{event.table}".lower()
             yield MetadataChangeProposalWrapper(
                 entityUrn=self.dataset_urn_builder(resource), aspect=operation_aspect
-            ).as_workunit()
+            )
             self.report.num_operational_stats_workunits_emitted += 1

     def _aggregate_access_events(
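The interesting piece in the new `_drop_repeated_operations` is the `cachetools.TTLCache` driven by a custom timer: access events arrive newest-first, so the extractor negates a low-watermark timestamp to give the cache a clock that moves forward as the scan walks from newer to older events, and a cached (urn, actor, operation type) entry falls out of scope once the scan has moved past the drop window. A minimal, self-contained sketch of the same idea — the event shape, names, window value, and watermark initialization here are illustrative, not DataHub's actual classes:

```python
import cachetools  # the dependency this commit adds to the "redshift" extra
from typing import Iterable, Optional


def drop_repeated(events: Iterable[dict]) -> Iterable[dict]:
    """Skip events that repeat (urn, actor, op) within a short window.

    Assumes each event is a dict with "urn", "actor", "op", and "ts" (epoch
    millis), ordered by "ts" descending, mirroring the commit's assumption.
    """
    window_ms = 10 * 1000  # illustrative drop window

    low_watermark: Optional[int] = None

    def clock() -> int:
        # TTLCache expects a clock that moves forward; timestamps shrink as we
        # walk backwards, so negate the lowest timestamp seen so far.
        return 0 if low_watermark is None else -low_watermark

    last_seen = cachetools.TTLCache(maxsize=1000, ttl=window_ms, timer=clock)

    for event in events:
        ts = event["ts"]
        low_watermark = ts if low_watermark is None else min(low_watermark, ts)

        key = event["urn"]
        value = (event["actor"], event["op"])
        if key in last_seen and last_seen[key] == value:
            continue  # same operation seen within the window -> drop it
        last_seen[key] = value
        yield event


if __name__ == "__main__":
    events = [
        {"urn": "tableA", "actor": "jdoe", "op": "INSERT", "ts": 95_000},
        {"urn": "tableB", "actor": "jdoe", "op": "INSERT", "ts": 94_000},
        {"urn": "tableA", "actor": "jdoe", "op": "INSERT", "ts": 90_000},  # 5s gap: dropped
        {"urn": "tableA", "actor": "jdoe", "op": "INSERT", "ts": 60_000},  # 30s gap: kept
    ]
    print([e["ts"] for e in drop_repeated(events)])  # [95000, 94000, 60000]
```

Under this scheme a burst of identical inserts collapses into a single operation aspect, and the `num_repeated_operations_dropped` counter added to `RedshiftReport` records how many events were skipped.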
Original file line number Diff line number Diff line change
@@ -2,11 +2,11 @@
import pathlib
from pathlib import Path
from typing import Dict, List, Union
-from unittest.mock import Mock, patch
+from unittest.mock import MagicMock, Mock, patch

from freezegun import freeze_time

-from datahub.emitter.mce_builder import make_dataset_urn
+from datahub.emitter.mce_builder import make_dataset_urn, make_user_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.redshift.config import RedshiftConfig
@@ -20,6 +20,7 @@
     MetadataChangeEvent,
     MetadataChangeProposal,
 )
+from datahub.metadata.schema_classes import OperationClass, OperationTypeClass
 from tests.test_helpers import mce_helpers

FROZEN_TIME = "2021-09-15 09:00:00"
@@ -243,3 +244,52 @@ def load_access_events(test_resources_dir: pathlib.Path) -> List[Dict]:
     with access_events_history_file.open() as access_events_json:
         access_events = json.loads(access_events_json.read())
     return access_events


+def test_duplicate_operations_dropped():
+    report = RedshiftReport()
+    usage_extractor = RedshiftUsageExtractor(
+        config=MagicMock(),
+        connection=MagicMock(),
+        report=report,
+        dataset_urn_builder=MagicMock(),
+        redundant_run_skip_handler=None,
+    )
+
+    user = make_user_urn("jdoe")
+    urnA = "urn:li:dataset:(urn:li:dataPlatform:redshift,db.schema.tableA,PROD)"
+    urnB = "urn:li:dataset:(urn:li:dataPlatform:redshift,db.schema.tableB,PROD)"
+
+    opA1 = MetadataChangeProposalWrapper(
+        entityUrn=urnA,
+        aspect=OperationClass(
+            timestampMillis=100 * 1000,
+            lastUpdatedTimestamp=95 * 1000,
+            actor=user,
+            operationType=OperationTypeClass.INSERT,
+        ),
+    )
+    opB1 = MetadataChangeProposalWrapper(
+        entityUrn=urnB,
+        aspect=OperationClass(
+            timestampMillis=101 * 1000,
+            lastUpdatedTimestamp=94 * 1000,
+            actor=user,
+            operationType=OperationTypeClass.INSERT,
+        ),
+    )
+    opA2 = MetadataChangeProposalWrapper(
+        entityUrn=urnA,
+        aspect=OperationClass(
+            timestampMillis=102 * 1000,
+            lastUpdatedTimestamp=90 * 1000,
+            actor=user,
+            operationType=OperationTypeClass.INSERT,
+        ),
+    )
+
+    dedups = list(usage_extractor._drop_repeated_operations([opA1, opB1, opA2]))
+    assert dedups == [
+        opA1,
+        opB1,
+    ]
