From 05aee03f3f7f232872c462f30b78f82b8fe4ab85 Mon Sep 17 00:00:00 2001 From: Shubham Jagtap <132359390+shubhamjagtap639@users.noreply.github.com> Date: Wed, 12 Jun 2024 08:49:57 +0530 Subject: [PATCH] perf(ingestion/fivetran): Connector performance optimization (#10556) --- .../datahub/ingestion/api/source_helpers.py | 4 +- .../ingestion/source/fivetran/config.py | 22 +++++ .../ingestion/source/fivetran/data_classes.py | 4 +- .../ingestion/source/fivetran/fivetran.py | 7 +- .../source/fivetran/fivetran_log_api.py | 96 +++++++++++-------- .../source/fivetran/fivetran_query.py | 12 +-- .../state/stale_entity_removal_handler.py | 14 +++ .../integration/fivetran/test_fivetran.py | 6 +- .../state/golden_test_checkpoint_state.json | 2 +- ...n_test_checkpoint_state_after_deleted.json | 2 +- .../state/golden_test_stateful_ingestion.json | 22 +++++ ...test_stateful_ingestion_after_deleted.json | 22 +++++ .../state/test_stateful_ingestion.py | 42 +++++++- 13 files changed, 198 insertions(+), 57 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 7226258515155f..8cc2cc565db85c 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -102,7 +102,6 @@ def auto_status_aspect( """ all_urns: Set[str] = set() status_urns: Set[str] = set() - skip_urns: Set[str] = set() for wu in stream: urn = wu.get_urn() all_urns.add(urn) @@ -127,14 +126,13 @@ def auto_status_aspect( yield wu - for urn in sorted(all_urns - status_urns - skip_urns): + for urn in sorted(all_urns - status_urns): entity_type = guess_entity_type(urn) if not entity_supports_aspect(entity_type, StatusClass): # If any entity does not support aspect 'status' then skip that entity from adding status aspect. # Example like dataProcessInstance doesn't suppport status aspect. 
# If not skipped gives error: java.lang.RuntimeException: Unknown aspect status for entity dataProcessInstance continue - yield MetadataChangeProposalWrapper( entityUrn=urn, aspect=StatusClass(removed=False), diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index a689e9ee642aef..f55d9f89ad97f1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -9,6 +9,7 @@ from datahub.configuration.common import AllowDenyPattern, ConfigModel from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin from datahub.configuration.validate_field_rename import pydantic_renamed_field +from datahub.ingestion.api.report import Report from datahub.ingestion.source.bigquery_v2.bigquery_config import ( BigQueryConnectionConfig, ) @@ -20,6 +21,7 @@ StatefulIngestionConfigBase, ) from datahub.ingestion.source_config.sql.snowflake import BaseSnowflakeConfig +from datahub.utilities.perf_timer import PerfTimer logger = logging.getLogger(__name__) @@ -110,10 +112,26 @@ def validate_destination_platfrom_and_config(cls, values: Dict) -> Dict: return values +@dataclass +class MetadataExtractionPerfReport(Report): + connectors_metadata_extraction_sec: PerfTimer = dataclass_field( + default_factory=PerfTimer + ) + connectors_lineage_extraction_sec: PerfTimer = dataclass_field( + default_factory=PerfTimer + ) + connectors_jobs_extraction_sec: PerfTimer = dataclass_field( + default_factory=PerfTimer + ) + + @dataclass class FivetranSourceReport(StaleEntityRemovalSourceReport): connectors_scanned: int = 0 filtered_connectors: List[str] = dataclass_field(default_factory=list) + metadata_extraction_perf: MetadataExtractionPerfReport = dataclass_field( + default_factory=MetadataExtractionPerfReport + ) def report_connectors_scanned(self, count: int = 1) -> None: self.connectors_scanned += count @@ -163,3 +181,7 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin default={}, description="A mapping of destination dataset to platform instance. 
Use destination id as key.", ) + history_sync_lookback_period: int = pydantic.Field( + 7, + description="The number of days to look back when extracting connectors' sync history.", + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py index 4ae71b990e5cde..18de2b01edd3b7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import List, Optional +from typing import List @dataclass @@ -23,7 +23,7 @@ class Connector: paused: bool sync_frequency: int destination_id: str - user_email: Optional[str] + user_id: str table_lineage: List[TableLineage] jobs: List["Job"] diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index c8ae779b602b8a..56a80a2fd963e7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -173,11 +173,12 @@ def _generate_datajob_from_connector(self, connector: Connector) -> DataJob: env=self.config.env, platform_instance=self.config.platform_instance, ) + owner_email = self.audit_log.get_user_email(connector.user_id) datajob = DataJob( id=connector.connector_id, flow_urn=dataflow_urn, name=connector.connector_name, - owners={connector.user_email} if connector.user_email else set(), + owners={owner_email} if owner_email else set(), ) job_property_bag: Dict[str, str] = {} @@ -281,7 +282,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: """ logger.info("Fivetran plugin execution is started") connectors = self.audit_log.get_allowed_connectors_list( - self.config.connector_patterns, self.report + self.config.connector_patterns, + self.report, + self.config.history_sync_lookback_period, ) for connector in connectors: logger.info(f"Processing connector id: {connector.connector_id}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index a9eb59f9297992..51ef45c500c350 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -1,3 +1,4 @@ +import functools import json import logging from typing import Any, Dict, List, Optional, Tuple @@ -151,9 +152,14 @@ def _get_table_lineage( return table_lineage_list - def _get_all_connector_sync_logs(self) -> Dict[str, Dict]: + def _get_all_connector_sync_logs(self, syncs_interval: int) -> Dict[str, Dict]: sync_logs = {} - for row in self._query(self.fivetran_log_query.get_sync_logs_query()): + for row in self._query( + self.fivetran_log_query.get_sync_logs_query().format( + db_clause=self.fivetran_log_query.db_clause, + syncs_interval=syncs_interval, + ) + ): if row[Constant.CONNECTOR_ID] not in sync_logs: sync_logs[row[Constant.CONNECTOR_ID]] = { row[Constant.SYNC_ID]: { @@ -208,50 +214,62 @@ def _get_jobs_list( ) return jobs - def _get_user_email(self, user_id: Optional[str]) -> Optional[str]: + @functools.lru_cache() + def _get_users(self) -> Dict[str, str]: + users = self._query(self.fivetran_log_query.get_users_query()) + if not users: + return {} + return {user[Constant.USER_ID]: user[Constant.EMAIL] for 
user in users} + + def get_user_email(self, user_id: str) -> Optional[str]: if not user_id: return None - user_details = self._query( - self.fivetran_log_query.get_user_query(user_id=user_id) - ) + return self._get_users().get(user_id) - if not user_details: - return None + def _fill_connectors_table_lineage(self, connectors: List[Connector]) -> None: + table_lineage_metadata = self._get_connectors_table_lineage_metadata() + column_lineage_metadata = self._get_column_lineage_metadata() + for connector in connectors: + connector.table_lineage = self._get_table_lineage( + column_lineage_metadata=column_lineage_metadata, + table_lineage_result=table_lineage_metadata.get(connector.connector_id), + ) - return f"{user_details[0][Constant.EMAIL]}" + def _fill_connectors_jobs( + self, connectors: List[Connector], syncs_interval: int + ) -> None: + sync_logs = self._get_all_connector_sync_logs(syncs_interval) + for connector in connectors: + connector.jobs = self._get_jobs_list(sync_logs.get(connector.connector_id)) def get_allowed_connectors_list( - self, connector_patterns: AllowDenyPattern, report: FivetranSourceReport + self, + connector_patterns: AllowDenyPattern, + report: FivetranSourceReport, + syncs_interval: int, ) -> List[Connector]: connectors: List[Connector] = [] - sync_logs = self._get_all_connector_sync_logs() - table_lineage_metadata = self._get_connectors_table_lineage_metadata() - column_lineage_metadata = self._get_column_lineage_metadata() - connector_list = self._query(self.fivetran_log_query.get_connectors_query()) - for connector in connector_list: - if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]): - report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME]) - continue - connectors.append( - Connector( - connector_id=connector[Constant.CONNECTOR_ID], - connector_name=connector[Constant.CONNECTOR_NAME], - connector_type=connector[Constant.CONNECTOR_TYPE_ID], - paused=connector[Constant.PAUSED], - sync_frequency=connector[Constant.SYNC_FREQUENCY], - destination_id=connector[Constant.DESTINATION_ID], - user_email=self._get_user_email( - connector[Constant.CONNECTING_USER_ID] - ), - table_lineage=self._get_table_lineage( - column_lineage_metadata=column_lineage_metadata, - table_lineage_result=table_lineage_metadata.get( - connector[Constant.CONNECTOR_ID] - ), - ), - jobs=self._get_jobs_list( - sync_logs.get(connector[Constant.CONNECTOR_ID]) - ), + with report.metadata_extraction_perf.connectors_metadata_extraction_sec: + connector_list = self._query(self.fivetran_log_query.get_connectors_query()) + for connector in connector_list: + if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]): + report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME]) + continue + connectors.append( + Connector( + connector_id=connector[Constant.CONNECTOR_ID], + connector_name=connector[Constant.CONNECTOR_NAME], + connector_type=connector[Constant.CONNECTOR_TYPE_ID], + paused=connector[Constant.PAUSED], + sync_frequency=connector[Constant.SYNC_FREQUENCY], + destination_id=connector[Constant.DESTINATION_ID], + user_id=connector[Constant.CONNECTING_USER_ID], + table_lineage=[], + jobs=[], + ) ) - ) + with report.metadata_extraction_perf.connectors_lineage_extraction_sec: + self._fill_connectors_table_lineage(connectors) + with report.metadata_extraction_perf.connectors_jobs_extraction_sec: + self._fill_connectors_jobs(connectors, syncs_interval) return connectors diff --git 
a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py index 8f621bc3ffd06e..0c8ade26943490 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py @@ -21,24 +21,24 @@ def get_connectors_query(self) -> str: FROM {self.db_clause}connector WHERE _fivetran_deleted = FALSE""" - def get_user_query(self, user_id: str) -> str: + def get_users_query(self) -> str: return f""" SELECT id as user_id, given_name, family_name, email - FROM {self.db_clause}user - WHERE id = '{user_id}'""" + FROM {self.db_clause}user""" def get_sync_logs_query(self) -> str: - return f""" + return """ SELECT connector_id, sync_id, message_event, message_data, time_stamp - FROM {self.db_clause}log - WHERE message_event in ('sync_start', 'sync_end')""" + FROM {db_clause}log + WHERE message_event in ('sync_start', 'sync_end') + and time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days'""" def get_table_lineage_query(self) -> str: return f""" diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py index 9154a555f23090..97c9dd9e245ddf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py @@ -6,6 +6,7 @@ import pydantic from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp_builder import entity_supports_aspect from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId from datahub.ingestion.api.source_helpers import auto_stale_entity_removal @@ -23,6 +24,7 @@ ) from datahub.metadata.schema_classes import StatusClass from datahub.utilities.lossy_collections import LossyList +from datahub.utilities.urns.urn import guess_entity_type logger: logging.Logger = logging.getLogger(__name__) @@ -48,10 +50,14 @@ class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig): @dataclass class StaleEntityRemovalSourceReport(StatefulIngestionReport): soft_deleted_stale_entities: LossyList[str] = field(default_factory=LossyList) + last_state_non_deletable_entities: LossyList[str] = field(default_factory=LossyList) def report_stale_entity_soft_deleted(self, urn: str) -> None: self.soft_deleted_stale_entities.append(urn) + def report_last_state_non_deletable_entities(self, urn: str) -> None: + self.last_state_non_deletable_entities.append(urn) + class StaleEntityRemovalHandler( StatefulIngestionUsecaseHandlerBase["GenericCheckpointState"] @@ -272,11 +278,19 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]: self.add_entity_to_state("", urn) return + report = self.source.get_report() + assert isinstance(report, StaleEntityRemovalSourceReport) + # Everything looks good, emit the soft-deletion workunits for urn in last_checkpoint_state.get_urns_not_in( type="*", other_checkpoint_state=cur_checkpoint_state ): + if not entity_supports_aspect(guess_entity_type(urn), StatusClass): + # If any entity does not support aspect 'status' then skip that entity urn + report.report_last_state_non_deletable_entities(urn) + continue if urn in self._urns_to_skip: + report.report_last_state_non_deletable_entities(urn) logger.debug( f"Not 
soft-deleting entity {urn} since it is in urns_to_skip" ) diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index de1e5543f4be69..642d4ca992ca03 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -89,7 +89,7 @@ def default_query_results( "destination_column_name": "name", }, ] - elif query == fivetran_log_query.get_user_query("reapply_phone"): + elif query == fivetran_log_query.get_users_query(): return [ { "user_id": "reapply_phone", @@ -98,7 +98,9 @@ def default_query_results( "email": "abc.xyz@email.com", } ] - elif query == fivetran_log_query.get_sync_logs_query(): + elif query == fivetran_log_query.get_sync_logs_query().format( + db_clause=fivetran_log_query.db_clause, syncs_interval=7 + ): return [ { "connector_id": "calendar_elected", diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json index fcf73d9614f242..ce03804279097f 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json @@ -17,7 +17,7 @@ "state": { "formatVersion": "1.0", "serde": "utf-8", - "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\"]}" + "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\", \"urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619\"]}" }, "runId": "dummy-test-stateful-ingestion" } diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json index 5477af72a1939c..6a00e67a2ca216 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json @@ -17,7 +17,7 @@ "state": { "formatVersion": "1.0", "serde": "utf-8", - "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\"]}" + "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26\"]}" }, "runId": "dummy-test-stateful-ingestion" } diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json index 4a77651c930667..c5d0df1aeb59b5 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json @@ -46,5 +46,27 @@ "runId": "dummy-test-stateful-ingestion", "lastRunId": 
"no-run-id-provided" } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "job1", + "type": "BATCH_SCHEDULED", + "created": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub" + } + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json index 9d6f755374462b..c1bdc8ffeee052 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json @@ -31,6 +31,28 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "job2", + "type": "BATCH_SCHEDULED", + "created": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub" + } + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)", diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py index 50d9b86b3a0171..e3a2a6cccea794 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py @@ -1,3 +1,4 @@ +import time from dataclasses import dataclass, field as dataclass_field from typing import Any, Dict, Iterable, List, Optional, cast from unittest import mock @@ -7,6 +8,7 @@ from freezegun import freeze_time from pydantic import Field +from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance from datahub.configuration.common import AllowDenyPattern from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -24,7 +26,10 @@ StatefulIngestionConfigBase, StatefulIngestionSourceBase, ) -from datahub.metadata.schema_classes import StatusClass +from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import ( + DataProcessInstanceProperties, +) +from datahub.metadata.schema_classes import AuditStampClass, StatusClass from datahub.utilities.urns.dataset_urn import DatasetUrn from tests.test_helpers import mce_helpers from tests.test_helpers.state_helpers import ( @@ -62,6 +67,10 @@ class DummySourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): default=False, description="Should this dummy source report a failure.", ) + dpi_id_to_ingest: Optional[str] = Field( + default=None, + description="Data process instance id to ingest.", + ) class DummySource(StatefulIngestionSourceBase): @@ -109,6 +118,24 @@ def 
get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: aspect=StatusClass(removed=False), ).as_workunit() + if self.source_config.dpi_id_to_ingest: + dpi = DataProcessInstance( + id=self.source_config.dpi_id_to_ingest, + orchestrator="dummy", + ) + + yield MetadataChangeProposalWrapper( + entityUrn=str(dpi.urn), + aspect=DataProcessInstanceProperties( + name=dpi.id, + created=AuditStampClass( + time=int(time.time() * 1000), + actor="urn:li:corpuser:datahub", + ), + type=dpi.type, + ), + ).as_workunit() + if self.source_config.report_failure: self.reporter.report_failure("Dummy error", "Error") @@ -152,6 +179,7 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): "stateful_ingestion": { "enabled": True, "remove_stale_metadata": True, + "fail_safe_threshold": 100, "state_provider": { "type": "file", "config": { @@ -159,6 +187,7 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): }, }, }, + "dpi_id_to_ingest": "job1", }, }, "sink": { @@ -207,6 +236,7 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): pipeline_run2_config["source"]["config"]["dataset_patterns"] = { "allow": ["dummy_dataset1", "dummy_dataset2"], } + pipeline_run2_config["source"]["config"]["dpi_id_to_ingest"] = "job2" pipeline_run2_config["sink"]["config"][ "filename" ] = f"{tmp_path}/{output_file_name_after_deleted}" @@ -253,6 +283,16 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): ] assert sorted(deleted_dataset_urns) == sorted(difference_dataset_urns) + report = pipeline_run2.source.get_report() + assert isinstance(report, StaleEntityRemovalSourceReport) + # assert report last ingestion state non_deletable entity urns + non_deletable_urns: List[str] = [ + "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619", + ] + assert sorted(non_deletable_urns) == sorted( + report.last_state_non_deletable_entities + ) + @freeze_time(FROZEN_TIME) def test_stateful_ingestion_failure(pytestconfig, tmp_path, mock_time):
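
As context for the patch above, the core optimizations are: (1) pulling connector sync logs only within a configurable lookback window (history_sync_lookback_period, default 7 days) instead of the full log history, (2) replacing the per-connector user lookup with a single cached bulk users query, (3) filling lineage and job details in batched passes after the connector list is built, and (4) timing each extraction phase so the new metadata_extraction_perf report fields expose where time is spent. The standalone Python sketch below illustrates that shape under stated assumptions: SimplePerfTimer, ExtractionPerfReport, FivetranLikeClient, and run_query are hypothetical stand-ins, not DataHub's actual PerfTimer, FivetranLogAPI, or source report classes.

    import functools
    import time
    from dataclasses import dataclass, field
    from typing import Dict, List, Optional


    class SimplePerfTimer:
        """Hypothetical stand-in for a context-manager timer; DataHub's PerfTimer plays this role in the patch."""

        def __init__(self) -> None:
            self.elapsed_sec: float = 0.0
            self._start: float = 0.0

        def __enter__(self) -> "SimplePerfTimer":
            self._start = time.perf_counter()
            return self

        def __exit__(self, *exc) -> None:
            self.elapsed_sec += time.perf_counter() - self._start


    @dataclass
    class ExtractionPerfReport:
        # One timer per extraction phase, mirroring the fields the patch adds to the source report.
        metadata_sec: SimplePerfTimer = field(default_factory=SimplePerfTimer)
        lineage_sec: SimplePerfTimer = field(default_factory=SimplePerfTimer)
        jobs_sec: SimplePerfTimer = field(default_factory=SimplePerfTimer)


    class FivetranLikeClient:
        """Hypothetical client sketching the query and caching shape; run_query is a stub."""

        def __init__(self, lookback_days: int = 7) -> None:
            self.lookback_days = lookback_days
            self.perf = ExtractionPerfReport()

        def get_sync_logs_query(self) -> str:
            # Pull only sync events inside the lookback window instead of the full log history.
            return (
                "SELECT connector_id, sync_id, message_event, message_data, time_stamp "
                "FROM log WHERE message_event IN ('sync_start', 'sync_end') "
                f"AND time_stamp > CURRENT_TIMESTAMP - INTERVAL '{self.lookback_days} days'"
            )

        @functools.lru_cache()
        def _get_users(self) -> Dict[str, str]:
            # One bulk query, cached for the lifetime of the client, replaces a query per connector owner.
            rows = self.run_query("SELECT id AS user_id, email FROM user")
            return {row["user_id"]: row["email"] for row in rows}

        def get_user_email(self, user_id: Optional[str]) -> Optional[str]:
            return self._get_users().get(user_id) if user_id else None

        def extract(self) -> List[dict]:
            with self.perf.metadata_sec:
                connectors = [dict(row) for row in self.run_query("SELECT * FROM connector")]
            with self.perf.lineage_sec:
                pass  # a batched lineage lookup would fill connectors here
            with self.perf.jobs_sec:
                self.run_query(self.get_sync_logs_query())  # batched job/sync-log lookup
            return connectors

        def run_query(self, query: str) -> List[Dict[str, str]]:
            # Stub so the sketch runs without a warehouse connection.
            return []


    if __name__ == "__main__":
        client = FivetranLikeClient(lookback_days=7)
        client.extract()
        print(client.get_user_email("some_user_id"))
        print(f"metadata phase: {client.perf.metadata_sec.elapsed_sec:.4f}s")

Caching the whole user table is cheap relative to the per-connector round trips it replaces, which mirrors why the patch resolves owner emails lazily in _generate_datajob_from_connector via get_user_email rather than while listing connectors.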