Commit

fix(ingest/bq): do not query PARTITIONS for biglake tables (datahub-p…
mayurinehate authored Sep 27, 2024
1 parent 9ffafb6 commit 6a58493
Showing 8 changed files with 75 additions and 25 deletions.
@@ -105,6 +105,7 @@ class BigqueryTable(BaseTable):
long_term_billable_bytes: Optional[int] = None
partition_info: Optional[PartitionInfo] = None
columns_ignore_from_profiling: List[str] = field(default_factory=list)
external: bool = False


@dataclass
@@ -252,7 +253,16 @@ def get_datasets_for_project_id(
self.report.num_list_datasets_api_requests += 1
datasets = self.bq_client.list_datasets(project_id, max_results=maxResults)
return [
BigqueryDataset(name=d.dataset_id, labels=d.labels) for d in datasets
BigqueryDataset(
name=d.dataset_id,
labels=d.labels,
location=(
d._properties.get("location")
if hasattr(d, "_properties") and isinstance(d._properties, dict)
else None
),
)
for d in datasets
]

# This is not used anywhere
@@ -295,12 +305,12 @@ def get_tables_for_dataset(
dataset_name: str,
tables: Dict[str, TableListItem],
report: BigQueryV2Report,
with_data_read_permission: bool = False,
with_partitions: bool = False,
) -> Iterator[BigqueryTable]:
with PerfTimer() as current_timer:
filter_clause: str = ", ".join(f"'{table}'" for table in tables.keys())

if with_data_read_permission:
if with_partitions:
query_template = BigqueryQuery.tables_for_dataset
else:
query_template = BigqueryQuery.tables_for_dataset_without_partition_data
@@ -374,6 +384,7 @@ def _make_bigquery_table(
num_partitions=table.get("num_partitions"),
active_billable_bytes=table.get("active_billable_bytes"),
long_term_billable_bytes=table.get("long_term_billable_bytes"),
external=(table.table_type == BigqueryTableType.EXTERNAL),
)

def get_views_for_dataset(
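
The dataset location is read from the list API response's private _properties dict rather than a public attribute, presumably so the listing keeps working on client objects that do not expose it. A minimal sketch of that access pattern follows; the helper name is illustrative and not part of this commit:

from typing import Any, Optional

def _dataset_location(dataset_list_item: Any) -> Optional[str]:
    # Fall back to the raw API payload; guard against objects where
    # _properties is missing or not a dict, mirroring the check in the diff above.
    props = getattr(dataset_list_item, "_properties", None)
    return props.get("location") if isinstance(props, dict) else None

If the location cannot be determined, it stays None and the downstream location-based checks simply treat the dataset as a regular (non-Omni) one.
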
@@ -242,7 +242,11 @@ def gen_project_id_containers(self, database: str) -> Iterable[MetadataWorkUnit]
)

def gen_dataset_containers(
self, dataset: str, project_id: str, tags: Optional[Dict[str, str]] = None
self,
dataset: str,
project_id: str,
tags: Optional[Dict[str, str]] = None,
extra_properties: Optional[Dict[str, str]] = None,
) -> Iterable[MetadataWorkUnit]:
schema_container_key = self.gen_dataset_key(project_id, dataset)

@@ -272,6 +276,7 @@ def gen_dataset_containers(
else None
),
tags=tags_joined,
extra_properties=extra_properties,
)

def _process_project(
@@ -400,7 +405,14 @@ def _process_schema(

if self.config.include_schema_metadata:
yield from self.gen_dataset_containers(
dataset_name, project_id, bigquery_dataset.labels
dataset_name,
project_id,
bigquery_dataset.labels,
(
{"location": bigquery_dataset.location}
if bigquery_dataset.location
else None
),
)

columns = None
@@ -445,7 +457,7 @@ def _process_schema(

if self.config.include_tables:
db_tables[dataset_name] = list(
self.get_tables_for_dataset(project_id, dataset_name)
self.get_tables_for_dataset(project_id, bigquery_dataset)
)

for table in db_tables[dataset_name]:
@@ -686,7 +698,9 @@ def gen_table_dataset_workunits(
if table.max_shard_id:
custom_properties["max_shard_id"] = str(table.max_shard_id)
custom_properties["is_sharded"] = str(True)
sub_types = ["sharded table"] + sub_types
sub_types = [DatasetSubTypes.SHARDED_TABLE] + sub_types
if table.external:
sub_types = [DatasetSubTypes.EXTERNAL_TABLE] + sub_types

tags_to_add = None
if table.labels and self.config.capture_table_label_as_tag:
@@ -971,25 +985,36 @@ def gen_schema_metadata(
def get_tables_for_dataset(
self,
project_id: str,
dataset_name: str,
dataset: BigqueryDataset,
) -> Iterable[BigqueryTable]:
# In BigQuery there is no way to query all tables in a project at once
with PerfTimer() as timer:

# PARTITIONS INFORMATION_SCHEMA view is not available for BigLake tables
# based on Amazon S3 and Blob Storage data.
# https://cloud.google.com/bigquery/docs/omni-introduction#limitations
# Omni Locations - https://cloud.google.com/bigquery/docs/omni-introduction#locations
with_partitions = self.config.have_table_data_read_permission and not (
dataset.location
and dataset.location.lower().startswith(("aws-", "azure-"))
)

# The PARTITIONS view throws an exception if we try to query partition info for too many tables,
# so we have to limit the number of tables for which we query partition info.
# conn.list_tables returns table info that INFORMATION_SCHEMA doesn't contain, and this
# way we can merge that info with the queried one.
# https://cloud.google.com/bigquery/docs/information-schema-partitions
max_batch_size: int = (
self.config.number_of_datasets_process_in_batch
if not self.config.have_table_data_read_permission
else self.config.number_of_datasets_process_in_batch_if_profiling_enabled
)
if with_partitions:
max_batch_size = (
self.config.number_of_datasets_process_in_batch_if_profiling_enabled
)
else:
max_batch_size = self.config.number_of_datasets_process_in_batch

# We get the list of tables in the dataset to get core table properties and to be able to process the tables in batches
# We collect only the latest shards from sharded tables (tables with _YYYYMMDD suffix) and ignore temporary tables
table_items = self.get_core_table_details(
dataset_name, project_id, self.config.temp_table_dataset_prefix
dataset.name, project_id, self.config.temp_table_dataset_prefix
)

items_to_get: Dict[str, TableListItem] = {}
@@ -998,23 +1023,23 @@ def get_tables_for_dataset(
if len(items_to_get) % max_batch_size == 0:
yield from self.schema_api.get_tables_for_dataset(
project_id,
dataset_name,
dataset.name,
items_to_get,
with_data_read_permission=self.config.have_table_data_read_permission,
with_partitions=with_partitions,
report=self.report,
)
items_to_get.clear()

if items_to_get:
yield from self.schema_api.get_tables_for_dataset(
project_id,
dataset_name,
dataset.name,
items_to_get,
with_data_read_permission=self.config.have_table_data_read_permission,
with_partitions=with_partitions,
report=self.report,
)

self.report.metadata_extraction_sec[f"{project_id}.{dataset_name}"] = round(
self.report.metadata_extraction_sec[f"{project_id}.{dataset.name}"] = round(
timer.elapsed_seconds(), 2
)

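
A standalone sketch of the decision this file now makes before querying the PARTITIONS view: partition info is fetched only when the connector has table data read permission and the dataset is not in a BigQuery Omni (aws-/azure-) location. The function name and the example locations are illustrative assumptions, not code from the commit:

from typing import Optional

def should_query_partitions(
    have_table_data_read_permission: bool, location: Optional[str]
) -> bool:
    # The PARTITIONS INFORMATION_SCHEMA view is unavailable for BigLake tables
    # in Omni locations, whose names start with "aws-" or "azure-".
    return have_table_data_read_permission and not (
        location and location.lower().startswith(("aws-", "azure-"))
    )

# should_query_partitions(True, "US")             -> True: use tables_for_dataset
# should_query_partitions(True, "aws-us-east-1")  -> False: skip partition info
# should_query_partitions(True, None)             -> True: unknown location, assume regular

When the flag is set, the batch size switches to number_of_datasets_process_in_batch_if_profiling_enabled, which caps how many tables each PARTITIONS query covers; otherwise number_of_datasets_process_in_batch applies.
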
@@ -113,7 +113,7 @@ def metadata_read_capability_test(
project_id=project_id,
dataset_name=result[0].name,
tables={},
with_data_read_permission=config.have_table_data_read_permission,
with_partitions=config.have_table_data_read_permission,
report=BigQueryV2Report(),
)
if len(list(tables)) == 0:
@@ -173,6 +173,13 @@ def get_workunits(
f"^{normalized_table_name}.{column}$"
)

if table.external and not self.config.profiling.profile_external_tables:
self.report.profiling_skipped_other[f"{project_id}.{dataset}"] += 1
logger.info(
f"Skipping profiling of external table {project_id}.{dataset}.{table.name}"
)
continue

# Emit the profile work unit
logger.debug(
f"Creating profile request for table {normalized_table_name}"
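
A hedged sketch of the profiling guard added above, with reporting and logging omitted; the function name and type hints are stand-ins for the connector's own classes:

from typing import Iterable, Iterator

def filter_profiling_candidates(
    tables: Iterable["BigqueryTable"], profile_external_tables: bool
) -> Iterator["BigqueryTable"]:
    for table in tables:
        # External (e.g. BigLake) tables are skipped unless profiling of
        # external tables is explicitly enabled in the config.
        if table.external and not profile_external_tables:
            continue
        yield table
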
@@ -17,6 +17,8 @@ class DatasetSubTypes(StrEnum):
POWERBI_DATASET_TABLE = "PowerBI Dataset Table"
QLIK_DATASET = "Qlik Dataset"
BIGQUERY_TABLE_SNAPSHOT = "Bigquery Table Snapshot"
SHARDED_TABLE = "Sharded Table"
EXTERNAL_TABLE = "External Table"
SIGMA_DATASET = "Sigma Dataset"
SAC_MODEL = "Model"
SAC_IMPORT_DATA_MODEL = "Import Data Model"
@@ -98,7 +98,8 @@
"platform": "bigquery",
"env": "PROD",
"project_id": "project-id-1",
"dataset_id": "bigquery-dataset-1"
"dataset_id": "bigquery-dataset-1",
"location": "US"
},
"externalUrl": "https://console.cloud.google.com/bigquery?project=project-id-1&ws=!1m4!1m3!3m2!1sproject-id-1!2sbigquery-dataset-1",
"name": "bigquery-dataset-1",
@@ -103,7 +103,9 @@ def test_bigquery_v2_ingest(
mcp_output_path = "{}/{}".format(tmp_path, "bigquery_mcp_output.json")

dataset_name = "bigquery-dataset-1"
get_datasets_for_project_id.return_value = [BigqueryDataset(name=dataset_name)]
get_datasets_for_project_id.return_value = [
BigqueryDataset(name=dataset_name, location="US")
]

table_list_item = TableListItem(
{"tableReference": {"projectId": "", "datasetId": "", "tableId": ""}}
@@ -321,7 +323,9 @@ def test_bigquery_queries_v2_ingest(
mcp_output_path = "{}/{}".format(tmp_path, "bigquery_mcp_output.json")

dataset_name = "bigquery-dataset-1"
get_datasets_for_project_id.return_value = [BigqueryDataset(name=dataset_name)]
get_datasets_for_project_id.return_value = [
BigqueryDataset(name=dataset_name, location="US")
]

table_list_item = TableListItem(
{"tableReference": {"projectId": "", "datasetId": "", "tableId": ""}}
4 changes: 2 additions & 2 deletions metadata-ingestion/tests/unit/test_bigquery_source.py
@@ -814,7 +814,7 @@ def test_table_processing_logic(

_ = list(
schema_gen.get_tables_for_dataset(
project_id="test-project", dataset_name="test-dataset"
project_id="test-project", dataset=BigqueryDataset("test-dataset")
)
)

Expand Down Expand Up @@ -890,7 +890,7 @@ def test_table_processing_logic_date_named_tables(

_ = list(
schema_gen.get_tables_for_dataset(
project_id="test-project", dataset_name="test-dataset"
project_id="test-project", dataset=BigqueryDataset("test-dataset")
)
)

Expand Down
