From 56a563b8c5ab23c0a14d1af9d496def565056434 Mon Sep 17 00:00:00 2001
From: Jay Feldman <8128360+feldjay@users.noreply.github.com>
Date: Tue, 27 Aug 2024 00:19:32 -0400
Subject: [PATCH] feat(ingest/bigquery): Add query job retries for transient errors (#11162)

Co-authored-by: Gabe Lyons
Co-authored-by: Gabe Lyons
---
 .../source/bigquery_v2/bigquery_schema.py | 28 ++++++++++++++++++++++++++-
 1 file changed, 27 insertions(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
index ba8462a6f5ff3..f248533edec8d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
@@ -6,6 +6,7 @@

 from google.api_core import retry
 from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
+from google.cloud.bigquery import retry as bq_retry
 from google.cloud.bigquery.table import (
     RowIterator,
     TableListItem,
@@ -155,8 +156,33 @@ def __init__(
         self.datacatalog_client = datacatalog_client

     def get_query_result(self, query: str) -> RowIterator:
+        """Run ``query`` as a BigQuery job and return the resulting rows.
+
+        The job is retried for every error the client's default job retry
+        covers, plus any error whose text says retrying may solve it.
+        """
+
+        def _should_retry(exc: BaseException) -> bool:
+            logger.debug(f"Exception occured for job query. Reason: {exc}")
+            # Jobs sometimes fail with transient errors.
+            # This is not currently handled by the python-bigquery client.
+            # https://github.com/googleapis/python-bigquery/issues/23
+            return "Retrying the job may solve the problem" in str(exc)
+
+        default_job_retry = bq_retry.DEFAULT_JOB_RETRY
+        # The default predicate is only reachable through the private
+        # attribute, so it is read here, once.
+        default_predicate = default_job_retry._predicate
+
         logger.debug(f"Query : {query}")
-        resp = self.bq_client.query(query)
+        resp = self.bq_client.query(
+            query,
+            # with_predicate() returns a copy of the default retry, so its
+            # deadline and backoff settings are kept as they are.
+            job_retry=default_job_retry.with_predicate(
+                lambda exc: default_predicate(exc) or _should_retry(exc)
+            ),
+        )
         return resp.result()

     def get_projects(self, max_results_per_page: int = 100) -> List[BigqueryProject]: