diff --git a/dbt/adapters/sql/connections.py b/dbt/adapters/sql/connections.py index 0c6797cf..fc6c0210 100644 --- a/dbt/adapters/sql/connections.py +++ b/dbt/adapters/sql/connections.py @@ -77,8 +77,48 @@ def add_query( abridge_sql_log: bool = False, retryable_exceptions: Iterable[Type[Exception]] = [], retry_limit: int = 1, - retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1, ) -> Tuple[Connection, Any]: + + """ + Retry function encapsulated here to avoid commitment to some + user-facing interface. Right now, Redshift commits to a 1 second + retry timeout so this serves as a default. + """ + def __execute_query_with_retry( + cursor: Any, + sql: str, + bindings: Optional[Any], + retryable_exceptions: Tuple[Type[Exception], ...], + retry_limit: int, + attempt: int, + ): + """ + A success sees the try exit cleanly and avoid any recursive + retries. Failure begins a sleep and retry routine. + """ + try: + cursor.execute(sql, bindings) + except retryable_exceptions as e: + # Cease retries and fail when limit is hit. + if attempt >= retry_limit: + raise e + + fire_event( + AdapterEventDebug( + message=f"Got a retryable error {type(e)}. {retry_limit-attempt} retries left. Retrying in 1 second.\nError:\n{e}" + ) + ) + time.sleep(1) + + return self.__execute_query_with_retry( + cursor=cursor, + sql=sql, + bindings=bindings, + retryable_exceptions=retryable_exceptions, + retry_limit=retry_limit, + attempt=1, + ) + connection = self.get_thread_connection() if auto_begin and connection.transaction_open is False: self.begin() @@ -107,13 +147,13 @@ def add_query( pre = time.perf_counter() cursor = connection.handle.cursor() - self._retryable_cursor_execute( - execute_fn=cursor.execute, + self.__execute_query_with_retry( + cursor=cursor, sql=sql, bindings=bindings, retryable_exceptions=retryable_exceptions, retry_limit=retry_limit, - retry_timeout=retry_timeout, + attempt=1, ) result = self.get_response(cursor) @@ -223,45 +263,3 @@ def commit(self): connection.transaction_open = False return connection - - def _retryable_cursor_execute( - self, - execute_fn: Callable, - sql: str, - bindings: Optional[Any] = None, - retryable_exceptions: Iterable[Type[Exception]] = [], - retry_limit: int = 1, - retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1, - _attempts: int = 0, - ) -> None: - timeout = retry_timeout(_attempts) if callable(retry_timeout) else retry_timeout - if timeout < 0: - raise DbtRuntimeError("retry_timeout cannot be negative or return a negative time.") - - try: - execute_fn(sql, bindings) - - except tuple(retryable_exceptions) as e: - retry_limit -= 1 - if retry_limit <= 0: - raise e - fire_event( - AdapterEventDebug( - message=f"Got a retryable error {type(e)} when attempting to execute a query.\n" - f"{retry_limit} attempts remaining. Retrying in {timeout} seconds.\n" - f"Error:\n{e}" - ) - ) - - time.sleep(timeout) - return self._retryable_cursor_execute( - execute_fn=execute_fn, - sql=sql, - retry_limit=retry_limit - 1, - retry_timeout=retry_timeout, - retryable_exceptions=retryable_exceptions, - _attempts=_attempts + 1, - ) - - except Exception as e: - raise e