diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py index 76f24bfd63d47..8101f0110509e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py @@ -203,7 +203,9 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]: with PerfTimer() as timer: table = thread_local.local_catalog.load_table(dataset_path) time_taken = timer.elapsed_seconds() - self.report.report_table_load_time(time_taken) + self.report.report_table_load_time( + time_taken, dataset_name, table.metadata_location + ) LOGGER.debug(f"Loaded table: {table.name()}, time taken: {time_taken}") yield from self._create_iceberg_workunit(dataset_name, table) except NoSuchPropertyException as e: @@ -247,7 +249,10 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]: f"Iceberg Rest Catalog server error (500 status) encountered when processing table {dataset_path}, skipping it." 
) except Exception as e: - self.report.report_failure("general", f"Failed to create workunit: {e}") + self.report.report_failure( + "general", + f"Failed to create workunit for dataset {dataset_name}: {e}", + ) LOGGER.exception( f"Exception while processing table {dataset_path}, skipping it.", ) @@ -312,7 +317,9 @@ def _create_iceberg_workunit( dataset_snapshot.aspects.append(schema_metadata) mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot) - self.report.report_table_processing_time(timer.elapsed_seconds()) + self.report.report_table_processing_time( + timer.elapsed_seconds(), dataset_name, table.metadata_location + ) yield MetadataWorkUnit(id=dataset_name, mce=mce) dpi_aspect = self._get_dataplatform_instance_aspect(dataset_urn=dataset_urn) diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py index 4a7f6bf4d60c1..83fe3d1c079f1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py @@ -5,6 +5,7 @@ from humanfriendly import format_timespan from pydantic import Field, validator from pyiceberg.catalog import Catalog, load_catalog +from sortedcontainers import SortedList from datahub.configuration.common import AllowDenyPattern, ConfigModel from datahub.configuration.source_common import DatasetSourceConfigMixin @@ -146,19 +147,40 @@ def get_catalog(self) -> Catalog: return load_catalog(name=catalog_name, **catalog_config) +class TopTableTimings: + _VALUE_FIELD: str = "timing" + top_entities: SortedList + _size: int + + def __init__(self, size: int = 10): + self._size = size + self.top_entities = SortedList(key=lambda x: -x.get(self._VALUE_FIELD, 0)) + + def add(self, entity: Dict[str, Any]) -> None: + if self._VALUE_FIELD not in entity: + return + self.top_entities.add(entity) + if len(self.top_entities) > self._size: + self.top_entities.pop() 
+ + def __str__(self) -> str: + if len(self.top_entities) == 0: + return "no timings reported" + return str(list(self.top_entities)) + + class TimingClass: - times: List[int] + times: SortedList def __init__(self): - self.times = [] + self.times = SortedList() - def add_timing(self, t): - self.times.append(t) + def add_timing(self, t: float) -> None: + self.times.add(t) - def __str__(self): + def __str__(self) -> str: if len(self.times) == 0: return "no timings reported" - self.times.sort() total = sum(self.times) avg = total / len(self.times) return str( @@ -180,6 +202,9 @@ class IcebergSourceReport(StaleEntityRemovalSourceReport): load_table_timings: TimingClass = field(default_factory=TimingClass) processing_table_timings: TimingClass = field(default_factory=TimingClass) profiling_table_timings: TimingClass = field(default_factory=TimingClass) + tables_load_timings: TopTableTimings = field(default_factory=TopTableTimings) + tables_profile_timings: TopTableTimings = field(default_factory=TopTableTimings) + tables_process_timings: TopTableTimings = field(default_factory=TopTableTimings) listed_namespaces: int = 0 total_listed_tables: int = 0 tables_listed_per_namespace: TopKDict[str, int] = field( @@ -201,11 +226,26 @@ def report_table_scanned(self, name: str) -> None: def report_dropped(self, ent_name: str) -> None: self.filtered.append(ent_name) - def report_table_load_time(self, t: float) -> None: + def report_table_load_time( + self, t: float, table_name: str, table_metadata_location: str + ) -> None: self.load_table_timings.add_timing(t) + self.tables_load_timings.add( + {"table": table_name, "timing": t, "metadata_file": table_metadata_location} + ) - def report_table_processing_time(self, t: float) -> None: + def report_table_processing_time( + self, t: float, table_name: str, table_metadata_location: str + ) -> None: self.processing_table_timings.add_timing(t) + self.tables_process_timings.add( + {"table": table_name, "timing": t, "metadata_file": 
table_metadata_location} + ) - def report_table_profiling_time(self, t: float) -> None: + def report_table_profiling_time( + self, t: float, table_name: str, table_metadata_location: str + ) -> None: self.profiling_table_timings.add_timing(t) + self.tables_profile_timings.add( + {"table": table_name, "timing": t, "metadata_file": table_metadata_location} + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_profiler.py index 9cc6dd08544e4..7642cabbd1404 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_profiler.py @@ -204,7 +204,9 @@ def profile_table( ) dataset_profile.fieldProfiles.append(column_profile) time_taken = timer.elapsed_seconds() - self.report.report_table_profiling_time(time_taken) + self.report.report_table_profiling_time( + time_taken, dataset_name, table.metadata_location + ) LOGGER.debug( f"Finished profiling of dataset: {dataset_name} in {time_taken}" )