Skip to content

Commit

Permalink
support retryable exceptions during query execution
Browse files Browse the repository at this point in the history
  • Loading branch information
colin-rogers-dbt committed Dec 4, 2024
1 parent 6c41bed commit cf28b9f
Showing 1 changed file with 70 additions and 15 deletions.
85 changes: 70 additions & 15 deletions dbt/adapters/sql/connections.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
import abc
import time
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, TYPE_CHECKING
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, TYPE_CHECKING, Callable, Type, Union

from dbt_common.events.contextvars import get_node_info
from dbt_common.events.functions import fire_event
from dbt_common.exceptions import DbtInternalError, NotImplementedError
from dbt_common.exceptions import DbtInternalError, NotImplementedError, DbtRuntimeError
from dbt_common.utils import cast_to_str

from dbt.adapters.base import BaseConnectionManager
from dbt.adapters.base.connections import SleepTime
from dbt.adapters.contracts.connection import (
AdapterResponse,
Connection,
ConnectionState,
)
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.events.types import (
ConnectionUsed,
SQLCommit,
SQLQuery,
SQLQueryStatus,
SQLQueryStatus, AdapterEventDebug,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -56,11 +58,14 @@ def cancel_open(self) -> List[str]:
return names

def add_query(
self,
sql: str,
auto_begin: bool = True,
bindings: Optional[Any] = None,
abridge_sql_log: bool = False,
self,
sql: str,
auto_begin: bool = True,
bindings: Optional[Any] = None,
abridge_sql_log: bool = False,
retryable_exceptions: Iterable[Type[Exception]] = [],
retry_limit: int = 1,
retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1,
) -> Tuple[Connection, Any]:
connection = self.get_thread_connection()
if auto_begin and connection.transaction_open is False:
Expand Down Expand Up @@ -90,7 +95,14 @@ def add_query(
pre = time.perf_counter()

cursor = connection.handle.cursor()
cursor.execute(sql, bindings)
self._retryable_cursor_execute(
execute_fn=cursor.execute,
sql=sql,
bindings=bindings,
retryable_exceptions=retryable_exceptions,
retry_limit=retry_limit,
retry_timeout=retry_timeout
)

result = self.get_response(cursor)

Expand All @@ -113,7 +125,7 @@ def get_response(cls, cursor: Any) -> AdapterResponse:

@classmethod
def process_results(
cls, column_names: Iterable[str], rows: Iterable[Any]
cls, column_names: Iterable[str], rows: Iterable[Any]
) -> Iterator[Dict[str, Any]]:
unique_col_names = dict() # type: ignore[var-annotated]
for idx in range(len(column_names)): # type: ignore[arg-type]
Expand Down Expand Up @@ -145,11 +157,11 @@ def get_result_from_cursor(cls, cursor: Any, limit: Optional[int]) -> "agate.Tab
return table_from_data_flat(data, column_names)

def execute(
self,
sql: str,
auto_begin: bool = False,
fetch: bool = False,
limit: Optional[int] = None,
self,
sql: str,
auto_begin: bool = False,
fetch: bool = False,
limit: Optional[int] = None,
) -> Tuple[AdapterResponse, "agate.Table"]:
from dbt_common.clients.agate_helper import empty_table

Expand Down Expand Up @@ -199,3 +211,46 @@ def commit(self):
connection.transaction_open = False

return connection

def _retryable_cursor_execute(self,
execute_fn: Callable,
sql: str,
bindings: Optional[Any] = None,
retryable_exceptions: Iterable[Type[Exception]] = [],
retry_limit: int = 1,
retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1,
_attempts: int = 0,
) -> None:
timeout = retry_timeout(_attempts) if callable(retry_timeout) else retry_timeout
if timeout < 0:
raise DbtRuntimeError(
"retry_timeout cannot be negative or return a negative time."
)

try:
execute_fn(sql, bindings)

except tuple(retryable_exceptions) as e:
retry_limit -= 1
if retry_limit <= 0:
raise e
fire_event(
AdapterEventDebug(
message=f"Got a retryable error {type(e)} when attempting to execute a query.\n"
f"{retry_limit} attempts remaining. Retrying in {timeout} seconds.\n"
f"Error:\n{e}"
)
)

time.sleep(timeout)
return self._retryable_cursor_execute(
execute_fn=execute_fn,
sql=sql,
retry_limit=retry_limit - 1,
retry_timeout=retry_timeout,
retryable_exceptions=retryable_exceptions,
_attempts=_attempts + 1,
)

except Exception as e:
raise e

0 comments on commit cf28b9f

Please sign in to comment.