Skip to content

Commit

Permalink
feat(ingest/fivetran): add safeguards on table/column lineage (datahu…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Oct 21, 2024
1 parent 2f85bc1 commit dd1a06f
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 123 deletions.
19 changes: 10 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dataclasses
import logging
from dataclasses import dataclass, field as dataclass_field
from typing import Dict, List, Optional
from typing import Dict, Optional

import pydantic
from pydantic import Field, root_validator
Expand All @@ -23,6 +23,7 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.perf_timer import PerfTimer

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -114,24 +115,24 @@ def validate_destination_platfrom_and_config(cls, values: Dict) -> Dict:
return values


@dataclass
@dataclasses.dataclass
class MetadataExtractionPerfReport(Report):
connectors_metadata_extraction_sec: PerfTimer = dataclass_field(
connectors_metadata_extraction_sec: PerfTimer = dataclasses.field(
default_factory=PerfTimer
)
connectors_lineage_extraction_sec: PerfTimer = dataclass_field(
connectors_lineage_extraction_sec: PerfTimer = dataclasses.field(
default_factory=PerfTimer
)
connectors_jobs_extraction_sec: PerfTimer = dataclass_field(
connectors_jobs_extraction_sec: PerfTimer = dataclasses.field(
default_factory=PerfTimer
)


@dataclass
@dataclasses.dataclass
class FivetranSourceReport(StaleEntityRemovalSourceReport):
connectors_scanned: int = 0
filtered_connectors: List[str] = dataclass_field(default_factory=list)
metadata_extraction_perf: MetadataExtractionPerfReport = dataclass_field(
filtered_connectors: LossyList[str] = dataclasses.field(default_factory=LossyList)
metadata_extraction_perf: MetadataExtractionPerfReport = dataclasses.field(
default_factory=MetadataExtractionPerfReport
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class Connector:
sync_frequency: int
destination_id: str
user_id: str
table_lineage: List[TableLineage]
lineage: List[TableLineage]
jobs: List["Job"]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@
PlatformDetail,
)
from datahub.ingestion.source.fivetran.data_classes import Connector, Job
from datahub.ingestion.source.fivetran.fivetran_log_api import (
from datahub.ingestion.source.fivetran.fivetran_log_api import FivetranLogAPI
from datahub.ingestion.source.fivetran.fivetran_query import (
MAX_JOBS_PER_CONNECTOR,
FivetranLogAPI,
MAX_TABLE_LINEAGE_PER_CONNECTOR,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
Expand Down Expand Up @@ -106,13 +107,21 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
f"Fivetran connector source type: {connector.connector_type} is not supported to mapped with Datahub dataset entity."
)

for table_lineage in connector.table_lineage:
if len(connector.lineage) >= MAX_TABLE_LINEAGE_PER_CONNECTOR:
self.report.warning(
title="Table lineage truncated",
message=f"The connector had more than {MAX_TABLE_LINEAGE_PER_CONNECTOR} table lineage entries. "
f"Only the most recent {MAX_TABLE_LINEAGE_PER_CONNECTOR} entries were ingested.",
context=f"{connector.connector_name} (connector_id: {connector.connector_id})",
)

for lineage in connector.lineage:
input_dataset_urn = DatasetUrn.create_from_ids(
platform_id=source_platform,
table_name=(
f"{source_database.lower()}.{table_lineage.source_table}"
f"{source_database.lower()}.{lineage.source_table}"
if source_database
else table_lineage.source_table
else lineage.source_table
),
env=source_platform_detail.env,
platform_instance=source_platform_detail.platform_instance,
Expand All @@ -121,14 +130,14 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:

output_dataset_urn = DatasetUrn.create_from_ids(
platform_id=self.config.fivetran_log_config.destination_platform,
table_name=f"{self.audit_log.fivetran_log_database.lower()}.{table_lineage.destination_table}",
table_name=f"{self.audit_log.fivetran_log_database.lower()}.{lineage.destination_table}",
env=destination_platform_detail.env,
platform_instance=destination_platform_detail.platform_instance,
)
output_dataset_urn_list.append(output_dataset_urn)

if self.config.include_column_lineage:
for column_lineage in table_lineage.column_lineage:
for column_lineage in lineage.column_lineage:
fine_grained_lineage.append(
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import functools
import json
import logging
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple

import sqlglot
Expand All @@ -22,10 +23,6 @@

logger: logging.Logger = logging.getLogger(__name__)

# We don't want to generate a massive number of dataProcesses for a single connector.
# This is primarily used as a safeguard to prevent performance issues.
MAX_JOBS_PER_CONNECTOR = 1000


class FivetranLogAPI:
def __init__(self, fivetran_log_config: FivetranLogConfig) -> None:
Expand Down Expand Up @@ -91,55 +88,51 @@ def _query(self, query: str) -> List[Dict]:
resp = self.engine.execute(query)
return [row for row in resp]

def _get_column_lineage_metadata(self) -> Dict[str, List]:
def _get_column_lineage_metadata(self) -> Dict[Tuple[str, str], List]:
"""
Return's dict of column lineage metadata with key as '<SOURCE_TABLE_ID>-<DESTINATION_TABLE_ID>'
Returns dict of column lineage metadata with key as (<SOURCE_TABLE_ID>, <DESTINATION_TABLE_ID>)
"""
all_column_lineage: Dict[str, List] = {}
all_column_lineage = defaultdict(list)
column_lineage_result = self._query(
self.fivetran_log_query.get_column_lineage_query()
)
for column_lineage in column_lineage_result:
key = f"{column_lineage[Constant.SOURCE_TABLE_ID]}-{column_lineage[Constant.DESTINATION_TABLE_ID]}"
if key not in all_column_lineage:
all_column_lineage[key] = [column_lineage]
else:
all_column_lineage[key].append(column_lineage)
return all_column_lineage
key = (
column_lineage[Constant.SOURCE_TABLE_ID],
column_lineage[Constant.DESTINATION_TABLE_ID],
)
all_column_lineage[key].append(column_lineage)
return dict(all_column_lineage)

def _get_connectors_table_lineage_metadata(self) -> Dict[str, List]:
def _get_table_lineage_metadata(self) -> Dict[str, List]:
"""
Return's dict of table lineage metadata with key as 'CONNECTOR_ID'
Returns dict of table lineage metadata with key as 'CONNECTOR_ID'
"""
connectors_table_lineage_metadata: Dict[str, List] = {}
connectors_table_lineage_metadata = defaultdict(list)
table_lineage_result = self._query(
self.fivetran_log_query.get_table_lineage_query()
)
for table_lineage in table_lineage_result:
if (
connectors_table_lineage_metadata[
table_lineage[Constant.CONNECTOR_ID]
not in connectors_table_lineage_metadata
):
connectors_table_lineage_metadata[
table_lineage[Constant.CONNECTOR_ID]
] = [table_lineage]
else:
connectors_table_lineage_metadata[
table_lineage[Constant.CONNECTOR_ID]
].append(table_lineage)
return connectors_table_lineage_metadata
].append(table_lineage)
return dict(connectors_table_lineage_metadata)

def _get_table_lineage(
def _extract_connector_lineage(
self,
column_lineage_metadata: Dict[str, List],
table_lineage_result: Optional[List],
column_lineage_metadata: Dict[Tuple[str, str], List],
) -> List[TableLineage]:
table_lineage_list: List[TableLineage] = []
if table_lineage_result is None:
return table_lineage_list
for table_lineage in table_lineage_result:
# Join the column lineage into the table lineage.
column_lineage_result = column_lineage_metadata.get(
f"{table_lineage[Constant.SOURCE_TABLE_ID]}-{table_lineage[Constant.DESTINATION_TABLE_ID]}"
(
table_lineage[Constant.SOURCE_TABLE_ID],
table_lineage[Constant.DESTINATION_TABLE_ID],
)
)
column_lineage_list: List[ColumnLineage] = []
if column_lineage_result:
Expand All @@ -152,6 +145,7 @@ def _get_table_lineage(
)
for column_lineage in column_lineage_result
]

table_lineage_list.append(
TableLineage(
source_table=f"{table_lineage[Constant.SOURCE_SCHEMA_NAME]}.{table_lineage[Constant.SOURCE_TABLE_NAME]}",
Expand All @@ -167,14 +161,9 @@ def _get_all_connector_sync_logs(
) -> Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]]:
sync_logs: Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]] = {}

# Format connector_ids as a comma-separated string of quoted IDs
formatted_connector_ids = ", ".join(f"'{id}'" for id in connector_ids)

query = self.fivetran_log_query.get_sync_logs_query().format(
db_clause=self.fivetran_log_query.db_clause,
query = self.fivetran_log_query.get_sync_logs_query(
syncs_interval=syncs_interval,
max_jobs_per_connector=MAX_JOBS_PER_CONNECTOR,
connector_ids=formatted_connector_ids,
connector_ids=connector_ids,
)

for row in self._query(query):
Expand Down Expand Up @@ -234,13 +223,13 @@ def get_user_email(self, user_id: str) -> Optional[str]:
return None
return self._get_users().get(user_id)

def _fill_connectors_table_lineage(self, connectors: List[Connector]) -> None:
table_lineage_metadata = self._get_connectors_table_lineage_metadata()
def _fill_connectors_lineage(self, connectors: List[Connector]) -> None:
table_lineage_metadata = self._get_table_lineage_metadata()
column_lineage_metadata = self._get_column_lineage_metadata()
for connector in connectors:
connector.table_lineage = self._get_table_lineage(
column_lineage_metadata=column_lineage_metadata,
connector.lineage = self._extract_connector_lineage(
table_lineage_result=table_lineage_metadata.get(connector.connector_id),
column_lineage_metadata=column_lineage_metadata,
)

def _fill_connectors_jobs(
Expand All @@ -262,6 +251,7 @@ def get_allowed_connectors_list(
) -> List[Connector]:
connectors: List[Connector] = []
with report.metadata_extraction_perf.connectors_metadata_extraction_sec:
logger.info("Fetching connector list")
connector_list = self._query(self.fivetran_log_query.get_connectors_query())
for connector in connector_list:
if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]):
Expand All @@ -279,12 +269,20 @@ def get_allowed_connectors_list(
sync_frequency=connector[Constant.SYNC_FREQUENCY],
destination_id=connector[Constant.DESTINATION_ID],
user_id=connector[Constant.CONNECTING_USER_ID],
table_lineage=[],
jobs=[],
lineage=[], # filled later
jobs=[], # filled later
)
)

if not connectors:
# Some of our queries don't work well when there's no connectors, since
# we push down connector id filters.
return []

with report.metadata_extraction_perf.connectors_lineage_extraction_sec:
self._fill_connectors_table_lineage(connectors)
logger.info("Fetching connector lineage")
self._fill_connectors_lineage(connectors)
with report.metadata_extraction_perf.connectors_jobs_extraction_sec:
logger.info("Fetching connector job run history")
self._fill_connectors_jobs(connectors, syncs_interval)
return connectors
Loading

0 comments on commit dd1a06f

Please sign in to comment.