From 0ea6145a9d491a1b882ba5a7a4667fb323d31dc4 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Fri, 15 Dec 2023 00:12:45 +0100 Subject: [PATCH] fix(ingest/profiling): Add option to enable external table profiling (#9463) --- .../datahub/ingestion/source/ge_profiling_config.py | 5 +++++ .../src/datahub/ingestion/source/redshift/profile.py | 9 +++++++++ .../ingestion/source/snowflake/snowflake_profiler.py | 10 ++++++++++ .../ingestion/source/snowflake/snowflake_schema.py | 3 +++ .../ingestion/source/sql/sql_generic_profiler.py | 3 +++ .../tests/integration/snowflake/common.py | 1 + 6 files changed, 31 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py index 24a3e520d8caf..f340a7b41b7af 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py @@ -167,6 +167,11 @@ class GEProfilingConfig(ConfigModel): "Applicable only if `use_sampling` is set to True.", ) + profile_external_tables: bool = Field( + default=False, + description="Whether to profile external tables. Only Snowflake and Redshift supports this.", + ) + @pydantic.root_validator(pre=True) def deprecate_bigquery_temp_table_schema(cls, values): # TODO: Update docs to remove mention of this field. diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py index b05850cef6e94..eed82ec4d83e7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py @@ -48,6 +48,15 @@ def get_workunits( if not self.config.schema_pattern.allowed(schema): continue for table in tables[db].get(schema, {}): + if ( + not self.config.profiling.profile_external_tables + and table.type == "EXTERNAL_TABLE" + ): + self.report.profiling_skipped_other[schema] += 1 + logger.info( + f"Skipping profiling of external table {db}.{schema}.{table.name}" + ) + continue # Emit the profile work unit profile_request = self.get_profile_request(table, schema, db) if profile_request is not None: diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py index 89857c4564267..4bda7da422e9d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py @@ -50,6 +50,16 @@ def get_workunits( profile_requests = [] for schema in database.schemas: for table in db_tables[schema.name]: + if ( + not self.config.profiling.profile_external_tables + and table.type == "EXTERNAL TABLE" + ): + logger.info( + f"Skipping profiling of external table {database.name}.{schema.name}.{table.name}" + ) + self.report.profiling_skipped_other[schema.name] += 1 + continue + profile_request = self.get_profile_request( table, schema.name, database.name ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py index e5b214ba35e4b..9526bdec4b05d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py @@ -77,6 +77,7 @@ def get_precise_native_type(self): @dataclass class SnowflakeTable(BaseTable): + type: Optional[str] = None clustering_key: Optional[str] = None pk: Optional[SnowflakePK] = None columns: List[SnowflakeColumn] = field(default_factory=list) @@ -265,6 +266,7 @@ def get_tables_for_database( tables[table["TABLE_SCHEMA"]].append( SnowflakeTable( name=table["TABLE_NAME"], + type=table["TABLE_TYPE"], created=table["CREATED"], last_altered=table["LAST_ALTERED"], size_in_bytes=table["BYTES"], @@ -288,6 +290,7 @@ def get_tables_for_schema( tables.append( SnowflakeTable( name=table["TABLE_NAME"], + type=table["TABLE_TYPE"], created=table["CREATED"], last_altered=table["LAST_ALTERED"], size_in_bytes=table["BYTES"], diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py index a2f91e5fae1a9..30fad9ad584c1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py @@ -35,6 +35,9 @@ class DetailedProfilerReportMixin: profiling_skipped_row_limit: TopKDict[str, int] = field( default_factory=int_top_k_dict ) + + profiling_skipped_other: TopKDict[str, int] = field(default_factory=int_top_k_dict) + num_tables_not_eligible_profiling: Dict[str, int] = field( default_factory=int_top_k_dict ) diff --git a/metadata-ingestion/tests/integration/snowflake/common.py b/metadata-ingestion/tests/integration/snowflake/common.py index b21cea5f0988d..53b87636068bf 100644 --- a/metadata-ingestion/tests/integration/snowflake/common.py +++ b/metadata-ingestion/tests/integration/snowflake/common.py @@ -79,6 +79,7 @@ def default_query_results( # noqa: C901 { "TABLE_SCHEMA": "TEST_SCHEMA", "TABLE_NAME": "TABLE_{}".format(tbl_idx), + "TABLE_TYPE": "BASE TABLE", "CREATED": datetime(2021, 6, 8, 0, 0, 0, 0), "LAST_ALTERED": datetime(2021, 6, 8, 0, 0, 0, 0), "BYTES": 1024,