Skip to content

Commit

Permalink
fix(ingest/profiling): Add option to enable external table profiling (d…
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Dec 14, 2023
1 parent 4354af2 commit 0ea6145
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ class GEProfilingConfig(ConfigModel):
"Applicable only if `use_sampling` is set to True.",
)

profile_external_tables: bool = Field(
default=False,
description="Whether to profile external tables. Only Snowflake and Redshift supports this.",
)

@pydantic.root_validator(pre=True)
def deprecate_bigquery_temp_table_schema(cls, values):
# TODO: Update docs to remove mention of this field.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ def get_workunits(
if not self.config.schema_pattern.allowed(schema):
continue
for table in tables[db].get(schema, {}):
if (
not self.config.profiling.profile_external_tables
and table.type == "EXTERNAL_TABLE"
):
self.report.profiling_skipped_other[schema] += 1
logger.info(
f"Skipping profiling of external table {db}.{schema}.{table.name}"
)
continue
# Emit the profile work unit
profile_request = self.get_profile_request(table, schema, db)
if profile_request is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ def get_workunits(
profile_requests = []
for schema in database.schemas:
for table in db_tables[schema.name]:
if (
not self.config.profiling.profile_external_tables
and table.type == "EXTERNAL TABLE"
):
logger.info(
f"Skipping profiling of external table {database.name}.{schema.name}.{table.name}"
)
self.report.profiling_skipped_other[schema.name] += 1
continue

profile_request = self.get_profile_request(
table, schema.name, database.name
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def get_precise_native_type(self):

@dataclass
class SnowflakeTable(BaseTable):
type: Optional[str] = None
clustering_key: Optional[str] = None
pk: Optional[SnowflakePK] = None
columns: List[SnowflakeColumn] = field(default_factory=list)
Expand Down Expand Up @@ -265,6 +266,7 @@ def get_tables_for_database(
tables[table["TABLE_SCHEMA"]].append(
SnowflakeTable(
name=table["TABLE_NAME"],
type=table["TABLE_TYPE"],
created=table["CREATED"],
last_altered=table["LAST_ALTERED"],
size_in_bytes=table["BYTES"],
Expand All @@ -288,6 +290,7 @@ def get_tables_for_schema(
tables.append(
SnowflakeTable(
name=table["TABLE_NAME"],
type=table["TABLE_TYPE"],
created=table["CREATED"],
last_altered=table["LAST_ALTERED"],
size_in_bytes=table["BYTES"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class DetailedProfilerReportMixin:
profiling_skipped_row_limit: TopKDict[str, int] = field(
default_factory=int_top_k_dict
)

profiling_skipped_other: TopKDict[str, int] = field(default_factory=int_top_k_dict)

num_tables_not_eligible_profiling: Dict[str, int] = field(
default_factory=int_top_k_dict
)
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/tests/integration/snowflake/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def default_query_results( # noqa: C901
{
"TABLE_SCHEMA": "TEST_SCHEMA",
"TABLE_NAME": "TABLE_{}".format(tbl_idx),
"TABLE_TYPE": "BASE TABLE",
"CREATED": datetime(2021, 6, 8, 0, 0, 0, 0),
"LAST_ALTERED": datetime(2021, 6, 8, 0, 0, 0, 0),
"BYTES": 1024,
Expand Down

0 comments on commit 0ea6145

Please sign in to comment.