Skip to content

Commit

Permalink
Lint.
Browse files Browse the repository at this point in the history
  • Loading branch information
VersusFacit committed Dec 5, 2024
1 parent 1eeefc3 commit 128bb28
Showing 1 changed file with 48 additions and 37 deletions.
85 changes: 48 additions & 37 deletions dbt/adapters/sql/connections.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
import abc
import time
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, TYPE_CHECKING, Callable, Type, Union
from typing import (
Any,
Dict,
Iterable,
Iterator,
List,
Optional,
Tuple,
TYPE_CHECKING,
Callable,
Type,
Union,
)

from dbt_common.events.contextvars import get_node_info
from dbt_common.events.functions import fire_event
Expand All @@ -14,12 +26,12 @@
Connection,
ConnectionState,
)
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.events.types import (
ConnectionUsed,
SQLCommit,
SQLQuery,
SQLQueryStatus, AdapterEventDebug,
SQLQueryStatus,
AdapterEventDebug,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -58,14 +70,14 @@ def cancel_open(self) -> List[str]:
return names

def add_query(
self,
sql: str,
auto_begin: bool = True,
bindings: Optional[Any] = None,
abridge_sql_log: bool = False,
retryable_exceptions: Iterable[Type[Exception]] = [],
retry_limit: int = 1,
retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1,
self,
sql: str,
auto_begin: bool = True,
bindings: Optional[Any] = None,
abridge_sql_log: bool = False,
retryable_exceptions: Iterable[Type[Exception]] = [],
retry_limit: int = 1,
retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1,
) -> Tuple[Connection, Any]:
connection = self.get_thread_connection()
if auto_begin and connection.transaction_open is False:
Expand Down Expand Up @@ -96,12 +108,12 @@ def add_query(

cursor = connection.handle.cursor()
self._retryable_cursor_execute(
execute_fn=cursor.execute,
sql=sql,
bindings=bindings,
retryable_exceptions=retryable_exceptions,
retry_limit=retry_limit,
retry_timeout=retry_timeout
execute_fn=cursor.execute,
sql=sql,
bindings=bindings,
retryable_exceptions=retryable_exceptions,
retry_limit=retry_limit,
retry_timeout=retry_timeout,
)

result = self.get_response(cursor)
Expand All @@ -125,7 +137,7 @@ def get_response(cls, cursor: Any) -> AdapterResponse:

@classmethod
def process_results(
cls, column_names: Iterable[str], rows: Iterable[Any]
cls, column_names: Iterable[str], rows: Iterable[Any]
) -> Iterator[Dict[str, Any]]:
unique_col_names = dict() # type: ignore[var-annotated]
for idx in range(len(column_names)): # type: ignore[arg-type]
Expand Down Expand Up @@ -157,11 +169,11 @@ def get_result_from_cursor(cls, cursor: Any, limit: Optional[int]) -> "agate.Tab
return table_from_data_flat(data, column_names)

def execute(
self,
sql: str,
auto_begin: bool = False,
fetch: bool = False,
limit: Optional[int] = None,
self,
sql: str,
auto_begin: bool = False,
fetch: bool = False,
limit: Optional[int] = None,
) -> Tuple[AdapterResponse, "agate.Table"]:
from dbt_common.clients.agate_helper import empty_table

Expand Down Expand Up @@ -212,20 +224,19 @@ def commit(self):

return connection

def _retryable_cursor_execute(self,
execute_fn: Callable,
sql: str,
bindings: Optional[Any] = None,
retryable_exceptions: Iterable[Type[Exception]] = [],
retry_limit: int = 1,
retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1,
_attempts: int = 0,
) -> None:
def _retryable_cursor_execute(
self,
execute_fn: Callable,
sql: str,
bindings: Optional[Any] = None,
retryable_exceptions: Iterable[Type[Exception]] = [],
retry_limit: int = 1,
retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1,
_attempts: int = 0,
) -> None:
timeout = retry_timeout(_attempts) if callable(retry_timeout) else retry_timeout
if timeout < 0:
raise DbtRuntimeError(
"retry_timeout cannot be negative or return a negative time."
)
raise DbtRuntimeError("retry_timeout cannot be negative or return a negative time.")

try:
execute_fn(sql, bindings)
Expand All @@ -237,8 +248,8 @@ def _retryable_cursor_execute(self,
fire_event(
AdapterEventDebug(
message=f"Got a retryable error {type(e)} when attempting to execute a query.\n"
f"{retry_limit} attempts remaining. Retrying in {timeout} seconds.\n"
f"Error:\n{e}"
f"{retry_limit} attempts remaining. Retrying in {timeout} seconds.\n"
f"Error:\n{e}"
)
)

Expand Down

0 comments on commit 128bb28

Please sign in to comment.