From 99d7eb756c09a3313a4c1bda6f96a0953004b58c Mon Sep 17 00:00:00 2001
From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
Date: Sat, 16 Sep 2023 02:06:04 +0530
Subject: [PATCH] feat(ingest/bigquery): support bigquery profiling with
 sampling (#8794)

---
 .../ingestion/source/ge_data_profiler.py      | 222 ++++++++++++------
 .../ingestion/source/ge_profiling_config.py   |  20 +-
 2 files changed, 162 insertions(+), 80 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
index 4394d108486be8..01e083d566168d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -616,6 +616,9 @@ def generate_dataset_profile(  # noqa: C901 (complexity)
         logger.debug(f"profiling {self.dataset_name}: flushing stage 1 queries")
         self.query_combiner.flush()
 
+        if self.config.use_sampling and not self.config.limit:
+            self.update_dataset_batch_use_sampling(profile)
+
         columns_profiling_queue: List[_SingleColumnSpec] = []
         if columns_to_profile:
             for column in all_columns:
@@ -737,6 +740,61 @@ def generate_dataset_profile(  # noqa: C901 (complexity)
         self.query_combiner.flush()
         return profile
 
+    def update_dataset_batch_use_sampling(self, profile: DatasetProfileClass) -> None:
+        if (
+            self.dataset.engine.dialect.name.lower() == BIGQUERY
+            and profile.rowCount
+            and profile.rowCount > self.config.sample_size
+        ):
+            """
+            According to the BigQuery sampling docs (https://cloud.google.com/bigquery/docs/table-sampling),
+            BigQuery does not cache the results of a query that includes a TABLESAMPLE clause, and the
+            query may return different results every time. Calculating different column level metrics
+            on different sampling results is possible; however, each query execution would incur the cost
+            of reading data from storage. Also, using different table samples may create a non-coherent
+            representation of column level metrics; for example, the minimum value of a column in one
+            sample can be greater than the maximum value of the column in another sample.
+
+            It is observed that for a simple SELECT * query with TABLESAMPLE, results are cached and
+            stored in a temporary table. This can be (ab)used so that all column level profiling
+            calculations can be performed against it.
+
+            Risks:
+            1. All the risks mentioned in the notes of `create_bigquery_temp_table` also
+               apply here.
+            2. A TABLESAMPLE query may read the entire table for small tables that are written
+               as a single data block. This may incorrectly label the datasetProfile's partition as
+               "SAMPLE", although the profile is for the entire table.
+            3. Table Sampling in BigQuery is a Pre-GA (Preview) feature.
+            """
+            sample_pc = 100 * self.config.sample_size / profile.rowCount
+            sql = (
+                f"SELECT * FROM {str(self.dataset._table)} "
+                + f"TABLESAMPLE SYSTEM ({sample_pc:.3f} percent)"
+            )
+            temp_table_name = create_bigquery_temp_table(
+                self,
+                sql,
+                self.dataset_name,
+                self.dataset.engine.engine.raw_connection(),
+            )
+            if temp_table_name:
+                self.dataset._table = sa.text(temp_table_name)
+                logger.debug(f"Setting table name to be {self.dataset._table}")
+
+            if (
+                profile.partitionSpec
+                and profile.partitionSpec.type == PartitionTypeClass.FULL_TABLE
+            ):
+                profile.partitionSpec = PartitionSpecClass(
+                    type=PartitionTypeClass.QUERY, partition="SAMPLE"
+                )
+            elif (
+                profile.partitionSpec
+                and profile.partitionSpec.type == PartitionTypeClass.PARTITION
+            ):
+                profile.partitionSpec.partition += " SAMPLE"
+
 
 @dataclasses.dataclass
 class GEContext:
@@ -961,84 +1019,18 @@ def _generate_single_profile(
             if platform == BIGQUERY and (
                 custom_sql or self.config.limit or self.config.offset
             ):
-                # On BigQuery, we need to bypass GE's mechanism for creating temporary tables because
-                # it requires create/delete table permissions.
-                import google.cloud.bigquery.job.query
-                from google.cloud.bigquery.dbapi.cursor import Cursor as BigQueryCursor
-
-                raw_connection = self.base_engine.raw_connection()
-                try:
-                    cursor: "BigQueryCursor" = cast(
-                        "BigQueryCursor", raw_connection.cursor()
-                    )
-                    if custom_sql is not None:
-                        # Note that limit and offset are not supported for custom SQL.
-                        # Presence of custom SQL represents that the bigquery table
-                        # is either partitioned or sharded
-                        bq_sql = custom_sql
-                    else:
-                        bq_sql = f"SELECT * FROM `{table}`"
-                        if self.config.limit:
-                            bq_sql += f" LIMIT {self.config.limit}"
-                        if self.config.offset:
-                            bq_sql += f" OFFSET {self.config.offset}"
-                    try:
-                        cursor.execute(bq_sql)
-                    except Exception as e:
-                        if not self.config.catch_exceptions:
-                            raise e
-                        logger.exception(
-                            f"Encountered exception while profiling {pretty_name}"
-                        )
-                        self.report.report_warning(
-                            pretty_name,
-                            f"Profiling exception {e} when running custom sql {bq_sql}",
-                        )
-                        return None
-
-                    # Great Expectations batch v2 API, which is the one we're using, requires
-                    # a concrete table name against which profiling is executed. Normally, GE
-                    # creates a table with an expiry time of 24 hours. However, we don't want the
-                    # temporary tables to stick around that long, so we'd also have to delete them
-                    # ourselves. As such, the profiler required create and delete table permissions
-                    # on BigQuery.
-                    #
-                    # It turns out that we can (ab)use the BigQuery cached results feature
-                    # to avoid creating temporary tables ourselves. For almost all queries, BigQuery
-                    # will store the results in a temporary, cached results table when an explicit
-                    # destination table is not provided. These tables are pretty easy to identify
-                    # because they live in "anonymous datasets" and have a name that looks like
-                    # "project-id._d60e97aec7f471046a960419adb6d44e98300db7.anon10774d0ea85fd20fe9671456c5c53d5f1b85e1b17bedb232dfce91661a219ee3"
-                    # These tables are per-user and per-project, so there's no risk of permissions escalation.
-                    # As per the docs, the cached results tables typically have a lifetime of 24 hours,
-                    # which should be plenty for our purposes.
-                    # See https://cloud.google.com/bigquery/docs/cached-results for more details.
-                    #
-                    # The code below extracts the name of the cached results table from the query job
-                    # and points GE to that table for profiling.
-                    #
-                    # Risks:
-                    # 1. If the query results are larger than the maximum response size, BigQuery will
-                    #    not cache the results. According to the docs https://cloud.google.com/bigquery/quotas,
-                    #    the maximum response size is 10 GB compressed.
-                    # 2. The cache lifetime of 24 hours is "best-effort" and hence not guaranteed.
-                    # 3. Tables with column-level security may not be cached, and tables with row-level
-                    #    security will not be cached.
-                    # 4. BigQuery "discourages" using cached results directly, but notes that
-                    #    the current semantics do allow it.
-                    #
-                    # The better long-term solution would be to use a subquery avoid this whole
-                    # temporary table dance. However, that would require either a) upgrading to
-                    # use GE's batch v3 API or b) bypassing GE altogether.
-
-                    query_job: Optional[
-                        "google.cloud.bigquery.job.query.QueryJob"
-                    ] = cursor._query_job
-                    assert query_job
-                    temp_destination_table = query_job.destination
-                    bigquery_temp_table = f"{temp_destination_table.project}.{temp_destination_table.dataset_id}.{temp_destination_table.table_id}"
-                finally:
-                    raw_connection.close()
+                if custom_sql is not None:
+                    # Note that limit and offset are not supported for custom SQL.
+                    bq_sql = custom_sql
+                else:
+                    bq_sql = f"SELECT * FROM `{table}`"
+                    if self.config.limit:
+                        bq_sql += f" LIMIT {self.config.limit}"
+                    if self.config.offset:
+                        bq_sql += f" OFFSET {self.config.offset}"
+                bigquery_temp_table = create_bigquery_temp_table(
+                    self, bq_sql, pretty_name, self.base_engine.raw_connection()
+                )
 
             if platform == BIGQUERY:
                 if bigquery_temp_table:
@@ -1128,6 +1120,7 @@ def _get_ge_dataset(
                 **batch_kwargs,
             },
         )
+
         if platform == BIGQUERY:
             # This is done as GE makes the name as DATASET.TABLE
             # but we want it to be PROJECT.DATASET.TABLE instead for multi-project setups
@@ -1153,3 +1146,76 @@ def _get_column_types_to_ignore(dialect_name: str) -> List[str]:
             return ["JSON"]
 
     return []
+
+
+def create_bigquery_temp_table(
+    instance: Union[DatahubGEProfiler, _SingleDatasetProfiler],
+    bq_sql: str,
+    table_pretty_name: str,
+    raw_connection: Any,
+) -> Optional[str]:
+    # On BigQuery, we need to bypass GE's mechanism for creating temporary tables because
+    # it requires create/delete table permissions.
+    import google.cloud.bigquery.job.query
+    from google.cloud.bigquery.dbapi.cursor import Cursor as BigQueryCursor
+
+    try:
+        cursor: "BigQueryCursor" = cast("BigQueryCursor", raw_connection.cursor())
+        try:
+            cursor.execute(bq_sql)
+        except Exception as e:
+            if not instance.config.catch_exceptions:
+                raise e
+            logger.exception(
+                f"Encountered exception while profiling {table_pretty_name}"
+            )
+            instance.report.report_warning(
+                table_pretty_name,
+                f"Profiling exception {e} when running custom sql {bq_sql}",
+            )
+            return None
+
+        # Great Expectations batch v2 API, which is the one we're using, requires
+        # a concrete table name against which profiling is executed. Normally, GE
+        # creates a table with an expiry time of 24 hours. However, we don't want the
+        # temporary tables to stick around that long, so we'd also have to delete them
+        # ourselves. As such, the profiler required create and delete table permissions
+        # on BigQuery.
+        #
+        # It turns out that we can (ab)use the BigQuery cached results feature
+        # to avoid creating temporary tables ourselves. For almost all queries, BigQuery
+        # will store the results in a temporary, cached results table when an explicit
+        # destination table is not provided. These tables are pretty easy to identify
+        # because they live in "anonymous datasets" and have a name that looks like
+        # "project-id._d60e97aec7f471046a960419adb6d44e98300db7.anon10774d0ea85fd20fe9671456c5c53d5f1b85e1b17bedb232dfce91661a219ee3"
+        # These tables are per-user and per-project, so there's no risk of permissions escalation.
+        # As per the docs, the cached results tables typically have a lifetime of 24 hours,
+        # which should be plenty for our purposes.
+        # See https://cloud.google.com/bigquery/docs/cached-results for more details.
+        #
+        # The code below extracts the name of the cached results table from the query job
+        # and points GE to that table for profiling.
+        #
+        # Risks:
+        # 1. If the query results are larger than the maximum response size, BigQuery will
+        #    not cache the results. According to the docs https://cloud.google.com/bigquery/quotas,
+        #    the maximum response size is 10 GB compressed.
+        # 2. The cache lifetime of 24 hours is "best-effort" and hence not guaranteed.
+        # 3. Tables with column-level security may not be cached, and tables with row-level
+        #    security will not be cached.
+        # 4. BigQuery "discourages" using cached results directly, but notes that
+        #    the current semantics do allow it.
+        #
+        # The better long-term solution would be to use a subquery to avoid this whole
+        # temporary table dance. However, that would require either a) upgrading to
+        # use GE's batch v3 API or b) bypassing GE altogether.
+
+        query_job: Optional[
+            "google.cloud.bigquery.job.query.QueryJob"
+        ] = cursor._query_job
+        assert query_job
+        temp_destination_table = query_job.destination
+        bigquery_temp_table = f"{temp_destination_table.project}.{temp_destination_table.dataset_id}.{temp_destination_table.table_id}"
+        return bigquery_temp_table
+    finally:
+        raw_connection.close()
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
index 1488b55062b684..77761c529ba0b1 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
@@ -145,10 +145,26 @@ class GEProfilingConfig(ConfigModel):
     # Hidden option - used for debugging purposes.
     catch_exceptions: bool = Field(default=True, description="")
 
-    partition_profiling_enabled: bool = Field(default=True, description="")
+    partition_profiling_enabled: bool = Field(
+        default=True,
+        description="Whether to profile partitioned tables. Only BigQuery supports this. "
+        "If enabled, the latest partition data is used for profiling.",
+    )
     partition_datetime: Optional[datetime.datetime] = Field(
         default=None,
-        description="For partitioned datasets profile only the partition which matches the datetime or profile the latest one if not set. Only Bigquery supports this.",
+        description="If specified, profile only the partition which matches this datetime. "
+        "If not specified, profile the latest partition. Only BigQuery supports this.",
+    )
+    use_sampling: bool = Field(
+        default=True,
+        description="Whether to profile column level stats on a sample of the table. Only BigQuery supports this. "
+        "If enabled, profiling is done on rows sampled from the table. Sampling is not done for smaller tables.",
+    )
+
+    sample_size: int = Field(
+        default=1000,
+        description="Number of rows to be sampled from the table for column level profiling. "
+        "Applicable only if `use_sampling` is set to True.",
     )
 
     @pydantic.root_validator(pre=True)
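
Note on the cached-results trick used above: `create_bigquery_temp_table` reaches the anonymous cached-results table through the DB-API cursor's private `_query_job` attribute. The following is a minimal standalone sketch of the same idea using the public `google.cloud.bigquery` client; the project id, dataset/table name, and sampling percentage are hypothetical placeholders, not values from this patch.

    from google.cloud import bigquery

    # Hypothetical identifiers -- replace with a real project and table.
    PROJECT = "my-project"
    TABLE = "my-project.my_dataset.my_table"

    client = bigquery.Client(project=PROJECT)

    # A plain SELECT (even with TABLESAMPLE) is written to a temporary
    # cached-results table in an "anonymous dataset" when no explicit
    # destination table is configured.
    sql = f"SELECT * FROM `{TABLE}` TABLESAMPLE SYSTEM (1.000 PERCENT)"
    query_job = client.query(sql)
    query_job.result()  # wait for the query to finish

    # The cached-results table can then be profiled like any other table.
    dest = query_job.destination
    temp_table = f"{dest.project}.{dest.dataset_id}.{dest.table_id}"
    print(temp_table)

The sampled rows land in that temporary table once, so all subsequent column level profiling queries read a consistent snapshot instead of re-sampling the base table on every metric.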