perf(ingestion/fivetran): Connector performance optimization (datahub…
shubhamjagtap639 authored Jun 12, 2024
1 parent 52ac314 commit 05aee03
Showing 13 changed files with 198 additions and 57 deletions.
@@ -102,7 +102,6 @@ def auto_status_aspect(
"""
all_urns: Set[str] = set()
status_urns: Set[str] = set()
skip_urns: Set[str] = set()
for wu in stream:
urn = wu.get_urn()
all_urns.add(urn)
@@ -127,14 +126,13 @@ def auto_status_aspect(

yield wu

for urn in sorted(all_urns - status_urns - skip_urns):
for urn in sorted(all_urns - status_urns):
entity_type = guess_entity_type(urn)
if not entity_supports_aspect(entity_type, StatusClass):
# If an entity does not support the 'status' aspect, skip adding the status aspect for it.
# For example, dataProcessInstance does not support the status aspect.
# Without this skip, ingestion fails with: java.lang.RuntimeException: Unknown aspect status for entity dataProcessInstance
continue

yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=StatusClass(removed=False),
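The hunk above drops the unused skip_urns set and relies solely on entity_supports_aspect to decide whether an auto status aspect is emitted. As a hedged illustration (these helpers are imported elsewhere in this commit from datahub.emitter.mcp_builder and datahub.utilities.urns.urn; the URNs below are sample values), the gate works like this:

from datahub.emitter.mcp_builder import entity_supports_aspect
from datahub.metadata.schema_classes import StatusClass
from datahub.utilities.urns.urn import guess_entity_type

# A dataset supports the 'status' aspect; a dataProcessInstance does not,
# so auto_status_aspect() skips it instead of emitting an invalid aspect.
for urn in [
    "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)",
    "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619",
]:
    entity_type = guess_entity_type(urn)
    print(entity_type, entity_supports_aspect(entity_type, StatusClass))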
22 changes: 22 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
@@ -9,6 +9,7 @@
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.api.report import Report
from datahub.ingestion.source.bigquery_v2.bigquery_config import (
BigQueryConnectionConfig,
)
@@ -20,6 +21,7 @@
StatefulIngestionConfigBase,
)
from datahub.ingestion.source_config.sql.snowflake import BaseSnowflakeConfig
from datahub.utilities.perf_timer import PerfTimer

logger = logging.getLogger(__name__)

@@ -110,10 +112,26 @@ def validate_destination_platfrom_and_config(cls, values: Dict) -> Dict:
return values


@dataclass
class MetadataExtractionPerfReport(Report):
connectors_metadata_extraction_sec: PerfTimer = dataclass_field(
default_factory=PerfTimer
)
connectors_lineage_extraction_sec: PerfTimer = dataclass_field(
default_factory=PerfTimer
)
connectors_jobs_extraction_sec: PerfTimer = dataclass_field(
default_factory=PerfTimer
)


@dataclass
class FivetranSourceReport(StaleEntityRemovalSourceReport):
connectors_scanned: int = 0
filtered_connectors: List[str] = dataclass_field(default_factory=list)
metadata_extraction_perf: MetadataExtractionPerfReport = dataclass_field(
default_factory=MetadataExtractionPerfReport
)

def report_connectors_scanned(self, count: int = 1) -> None:
self.connectors_scanned += count
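The three PerfTimer fields above are entered as context managers in the get_allowed_connectors_list hunk further down. A minimal sketch of that pattern, assuming PerfTimer's usual context-manager usage and an elapsed_seconds() accessor:

import time

from datahub.utilities.perf_timer import PerfTimer

timer = PerfTimer()
with timer:
    time.sleep(0.1)  # stand-in for the actual metadata extraction work
# elapsed_seconds() is assumed here as the accessor for the measured duration.
print(f"connectors_metadata_extraction_sec ~ {timer.elapsed_seconds():.2f}s")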
@@ -163,3 +181,7 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin
default={},
description="A mapping of destination dataset to platform instance. Use destination id as key.",
)
history_sync_lookback_period: int = pydantic.Field(
7,
description="The number of days to look back when extracting connectors' sync history.",
)
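For reference, a hypothetical stand-in showing how the new history_sync_lookback_period field behaves (this is not FivetranSourceConfig itself, just a pydantic model with the same field declaration, since the real config also requires connection settings):

import pydantic


class _LookbackOnly(pydantic.BaseModel):
    # Mirrors the field added above: default of 7 days, overridable in a recipe.
    history_sync_lookback_period: int = pydantic.Field(
        7,
        description="The number of days to look back when extracting connectors' sync history.",
    )


print(_LookbackOnly().history_sync_lookback_period)  # 7
print(_LookbackOnly(history_sync_lookback_period=30).history_sync_lookback_period)  # 30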
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import List, Optional
from typing import List


@dataclass
@@ -23,7 +23,7 @@ class Connector:
paused: bool
sync_frequency: int
destination_id: str
user_email: Optional[str]
user_id: str
table_lineage: List[TableLineage]
jobs: List["Job"]

@@ -173,11 +173,12 @@ def _generate_datajob_from_connector(self, connector: Connector) -> DataJob:
env=self.config.env,
platform_instance=self.config.platform_instance,
)
owner_email = self.audit_log.get_user_email(connector.user_id)
datajob = DataJob(
id=connector.connector_id,
flow_urn=dataflow_urn,
name=connector.connector_name,
owners={connector.user_email} if connector.user_email else set(),
owners={owner_email} if owner_email else set(),
)

job_property_bag: Dict[str, str] = {}
@@ -281,7 +282,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
"""
logger.info("Fivetran plugin execution is started")
connectors = self.audit_log.get_allowed_connectors_list(
self.config.connector_patterns, self.report
self.config.connector_patterns,
self.report,
self.config.history_sync_lookback_period,
)
for connector in connectors:
logger.info(f"Processing connector id: {connector.connector_id}")
@@ -1,3 +1,4 @@
import functools
import json
import logging
from typing import Any, Dict, List, Optional, Tuple
@@ -151,9 +152,14 @@ def _get_table_lineage(

return table_lineage_list

def _get_all_connector_sync_logs(self) -> Dict[str, Dict]:
def _get_all_connector_sync_logs(self, syncs_interval: int) -> Dict[str, Dict]:
sync_logs = {}
for row in self._query(self.fivetran_log_query.get_sync_logs_query()):
for row in self._query(
self.fivetran_log_query.get_sync_logs_query().format(
db_clause=self.fivetran_log_query.db_clause,
syncs_interval=syncs_interval,
)
):
if row[Constant.CONNECTOR_ID] not in sync_logs:
sync_logs[row[Constant.CONNECTOR_ID]] = {
row[Constant.SYNC_ID]: {
@@ -208,50 +214,62 @@ def _get_jobs_list(
)
return jobs

def _get_user_email(self, user_id: Optional[str]) -> Optional[str]:
@functools.lru_cache()
def _get_users(self) -> Dict[str, str]:
users = self._query(self.fivetran_log_query.get_users_query())
if not users:
return {}
return {user[Constant.USER_ID]: user[Constant.EMAIL] for user in users}

def get_user_email(self, user_id: str) -> Optional[str]:
if not user_id:
return None
user_details = self._query(
self.fivetran_log_query.get_user_query(user_id=user_id)
)
return self._get_users().get(user_id)

if not user_details:
return None
def _fill_connectors_table_lineage(self, connectors: List[Connector]) -> None:
table_lineage_metadata = self._get_connectors_table_lineage_metadata()
column_lineage_metadata = self._get_column_lineage_metadata()
for connector in connectors:
connector.table_lineage = self._get_table_lineage(
column_lineage_metadata=column_lineage_metadata,
table_lineage_result=table_lineage_metadata.get(connector.connector_id),
)

return f"{user_details[0][Constant.EMAIL]}"
def _fill_connectors_jobs(
self, connectors: List[Connector], syncs_interval: int
) -> None:
sync_logs = self._get_all_connector_sync_logs(syncs_interval)
for connector in connectors:
connector.jobs = self._get_jobs_list(sync_logs.get(connector.connector_id))

def get_allowed_connectors_list(
self, connector_patterns: AllowDenyPattern, report: FivetranSourceReport
self,
connector_patterns: AllowDenyPattern,
report: FivetranSourceReport,
syncs_interval: int,
) -> List[Connector]:
connectors: List[Connector] = []
sync_logs = self._get_all_connector_sync_logs()
table_lineage_metadata = self._get_connectors_table_lineage_metadata()
column_lineage_metadata = self._get_column_lineage_metadata()
connector_list = self._query(self.fivetran_log_query.get_connectors_query())
for connector in connector_list:
if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]):
report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME])
continue
connectors.append(
Connector(
connector_id=connector[Constant.CONNECTOR_ID],
connector_name=connector[Constant.CONNECTOR_NAME],
connector_type=connector[Constant.CONNECTOR_TYPE_ID],
paused=connector[Constant.PAUSED],
sync_frequency=connector[Constant.SYNC_FREQUENCY],
destination_id=connector[Constant.DESTINATION_ID],
user_email=self._get_user_email(
connector[Constant.CONNECTING_USER_ID]
),
table_lineage=self._get_table_lineage(
column_lineage_metadata=column_lineage_metadata,
table_lineage_result=table_lineage_metadata.get(
connector[Constant.CONNECTOR_ID]
),
),
jobs=self._get_jobs_list(
sync_logs.get(connector[Constant.CONNECTOR_ID])
),
with report.metadata_extraction_perf.connectors_metadata_extraction_sec:
connector_list = self._query(self.fivetran_log_query.get_connectors_query())
for connector in connector_list:
if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]):
report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME])
continue
connectors.append(
Connector(
connector_id=connector[Constant.CONNECTOR_ID],
connector_name=connector[Constant.CONNECTOR_NAME],
connector_type=connector[Constant.CONNECTOR_TYPE_ID],
paused=connector[Constant.PAUSED],
sync_frequency=connector[Constant.SYNC_FREQUENCY],
destination_id=connector[Constant.DESTINATION_ID],
user_id=connector[Constant.CONNECTING_USER_ID],
table_lineage=[],
jobs=[],
)
)
)
with report.metadata_extraction_perf.connectors_lineage_extraction_sec:
self._fill_connectors_table_lineage(connectors)
with report.metadata_extraction_perf.connectors_jobs_extraction_sec:
self._fill_connectors_jobs(connectors, syncs_interval)
return connectors
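This restructuring is the core of the optimization: connectors are listed with a single query, then table lineage and sync jobs are filled in with one bulk query each (timed by the PerfTimer fields from the report), and user emails come from one cached users query instead of one query per connector. A small self-contained sketch of the caching half, with a stand-in query callable rather than the source's real _query method:

import functools
from typing import Callable, Dict, List, Optional


class UserLookup:
    """Stand-in for FivetranLogAPI's cached user-email resolution."""

    def __init__(self, run_query: Callable[[str], List[Dict[str, str]]]) -> None:
        self._run_query = run_query

    @functools.lru_cache()
    def _get_users(self) -> Dict[str, str]:
        # Executed once per instance; later calls return the memoized mapping.
        rows = self._run_query("SELECT id AS user_id, email FROM user")
        return {row["user_id"]: row["email"] for row in rows}

    def get_user_email(self, user_id: Optional[str]) -> Optional[str]:
        if not user_id:
            return None
        return self._get_users().get(user_id)


lookup = UserLookup(lambda q: [{"user_id": "reapply_phone", "email": "abc@example.com"}])
print(lookup.get_user_email("reapply_phone"))  # triggers the single users query
print(lookup.get_user_email("unknown_user"))   # served from the cached mapping -> None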
@@ -21,24 +21,24 @@ def get_connectors_query(self) -> str:
FROM {self.db_clause}connector
WHERE _fivetran_deleted = FALSE"""

def get_user_query(self, user_id: str) -> str:
def get_users_query(self) -> str:
return f"""
SELECT id as user_id,
given_name,
family_name,
email
FROM {self.db_clause}user
WHERE id = '{user_id}'"""
FROM {self.db_clause}user"""

def get_sync_logs_query(self) -> str:
return f"""
return """
SELECT connector_id,
sync_id,
message_event,
message_data,
time_stamp
FROM {self.db_clause}log
WHERE message_event in ('sync_start', 'sync_end')"""
FROM {db_clause}log
WHERE message_event in ('sync_start', 'sync_end')
and time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days'"""

def get_table_lineage_query(self) -> str:
return f"""
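get_sync_logs_query() above now returns a plain template instead of an f-string, so the caller (_get_all_connector_sync_logs) injects both the schema prefix and the lookback window at query time. A hedged illustration of that two-step formatting; the db_clause value is a made-up example, and 7 matches the default history_sync_lookback_period:

SYNC_LOGS_TEMPLATE = """
SELECT connector_id,
sync_id,
message_event,
message_data,
time_stamp
FROM {db_clause}log
WHERE message_event in ('sync_start', 'sync_end')
and time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days'"""

# Filled in at call time; "fivetran_log." is only a sample schema prefix.
print(SYNC_LOGS_TEMPLATE.format(db_clause="fivetran_log.", syncs_interval=7))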
@@ -6,6 +6,7 @@
import pydantic

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import entity_supports_aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
from datahub.ingestion.api.source_helpers import auto_stale_entity_removal
@@ -23,6 +24,7 @@
)
from datahub.metadata.schema_classes import StatusClass
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.urns.urn import guess_entity_type

logger: logging.Logger = logging.getLogger(__name__)

@@ -48,10 +50,14 @@ class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
@dataclass
class StaleEntityRemovalSourceReport(StatefulIngestionReport):
soft_deleted_stale_entities: LossyList[str] = field(default_factory=LossyList)
last_state_non_deletable_entities: LossyList[str] = field(default_factory=LossyList)

def report_stale_entity_soft_deleted(self, urn: str) -> None:
self.soft_deleted_stale_entities.append(urn)

def report_last_state_non_deletable_entities(self, urn: str) -> None:
self.last_state_non_deletable_entities.append(urn)


class StaleEntityRemovalHandler(
StatefulIngestionUsecaseHandlerBase["GenericCheckpointState"]
@@ -272,11 +278,19 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]:
self.add_entity_to_state("", urn)
return

report = self.source.get_report()
assert isinstance(report, StaleEntityRemovalSourceReport)

# Everything looks good, emit the soft-deletion workunits
for urn in last_checkpoint_state.get_urns_not_in(
type="*", other_checkpoint_state=cur_checkpoint_state
):
if not entity_supports_aspect(guess_entity_type(urn), StatusClass):
# If an entity does not support the 'status' aspect, skip its urn rather than soft-deleting it.
report.report_last_state_non_deletable_entities(urn)
continue
if urn in self._urns_to_skip:
report.report_last_state_non_deletable_entities(urn)
logger.debug(
f"Not soft-deleting entity {urn} since it is in urns_to_skip"
)
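The handler now records URNs it declines to soft-delete in the new last_state_non_deletable_entities bucket. A minimal sketch of how that bucket fills up, assuming the report class can be constructed standalone (LossyList is datahub's truncating list type, so large URN sets stay cheap to report):

from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalSourceReport,
)

report = StaleEntityRemovalSourceReport()
# dataProcessInstance has no 'status' aspect, so it is reported instead of soft-deleted.
report.report_last_state_non_deletable_entities(
    "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619"
)
print(report.last_state_non_deletable_entities)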
@@ -89,7 +89,7 @@ def default_query_results(
"destination_column_name": "name",
},
]
elif query == fivetran_log_query.get_user_query("reapply_phone"):
elif query == fivetran_log_query.get_users_query():
return [
{
"user_id": "reapply_phone",
@@ -98,7 +98,9 @@
"email": "[email protected]",
}
]
elif query == fivetran_log_query.get_sync_logs_query():
elif query == fivetran_log_query.get_sync_logs_query().format(
db_clause=fivetran_log_query.db_clause, syncs_interval=7
):
return [
{
"connector_id": "calendar_elected",
@@ -17,7 +17,7 @@
"state": {
"formatVersion": "1.0",
"serde": "utf-8",
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\"]}"
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\", \"urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619\"]}"
},
"runId": "dummy-test-stateful-ingestion"
}
@@ -17,7 +17,7 @@
"state": {
"formatVersion": "1.0",
"serde": "utf-8",
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\"]}"
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26\"]}"
},
"runId": "dummy-test-stateful-ingestion"
}
@@ -46,5 +46,27 @@
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "job1",
"type": "BATCH_SCHEDULED",
"created": {
"time": 1586847600000,
"actor": "urn:li:corpuser:datahub"
}
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
}
]