diff --git a/dbt/adapters/impala/connections.py b/dbt/adapters/impala/connections.py index 7ed6fdc..4c301e5 100644 --- a/dbt/adapters/impala/connections.py +++ b/dbt/adapters/impala/connections.py @@ -29,7 +29,7 @@ from dbt.events.functions import fire_event from dbt.events.types import ConnectionUsed, SQLQuery, SQLQueryStatus -from dbt.logger import GLOBAL_LOGGER as LOGGER +from dbt.events import AdapterLogger import impala.dbapi from impala.error import DatabaseError @@ -41,10 +41,13 @@ import json +from dbt.adapters.impala.__version__ import version as ADAPTER_VERSION + DEFAULT_IMPALA_HOST = "localhost" DEFAULT_IMPALA_PORT = 21050 DEFAULT_MAX_RETRIES = 3 +logger = AdapterLogger("Impala") @dataclass class ImpalaCredentials(Credentials): @@ -119,22 +122,22 @@ def exception_handler(self, sql: str): try: yield except HttpError as httpError: - LOGGER.debug("Authorization error: {}".format(httpError)) + logger.debug("Authorization error: {}".format(httpError)) raise dbt.exceptions.RuntimeException ("HTTP Authorization error: " + str(httpError) + ", please check your credentials") except HiveServer2Error as servError: - LOGGER.debug("Server connection error: {}".format(servError)) + logger.debug("Server connection error: {}".format(servError)) raise dbt.exceptions.RuntimeException ("Unable to establish connection to Impala server: " + str(servError)) except DatabaseError as dbError: - LOGGER.debug("Database connection error: {}".format(str(dbError))) + logger.debug("Database connection error: {}".format(str(dbError))) raise dbt.exceptions.DatabaseException("Database Connection error: " + str(dbError)) except Exception as exc: - LOGGER.debug("Error running SQL: {}".format(sql)) + logger.debug("Error running SQL: {}".format(sql)) raise dbt.exceptions.RuntimeException(str(exc)) @classmethod def open(cls, connection): if connection.state == ConnectionState.OPEN: - LOGGER.debug("Connection is already open, skipping open.") + logger.debug("Connection is already open, skipping open.") return connection credentials = connection.credentials @@ -148,6 +151,8 @@ def open(cls, connection): if ( credentials.auth_type == "LDAP" or credentials.auth_type == "ldap" ): # ldap connection + custom_user_agent = 'dbt/cloudera-impala-v' + ADAPTER_VERSION + logger.debug("Using user agent: {}".format(custom_user_agent)) handle = impala.dbapi.connect( host=credentials.host, port=credentials.port, @@ -158,6 +163,7 @@ def open(cls, connection): use_ssl=credentials.use_ssl, http_path=credentials.http_path, retries=credentials.retries, + user_agent=custom_user_agent ) auth_type = "ldap" elif ( @@ -186,7 +192,7 @@ def open(cls, connection): connection.state = ConnectionState.OPEN connection.handle = handle except Exception as ex: - LOGGER.debug("Connection error {}".format(ex)) + logger.debug("Connection error {}".format(ex)) connection_ex = ex connection.state = ConnectionState.FAIL connection.handle = None @@ -232,7 +238,7 @@ def close(cls, connection): return connection except Exception as err: - LOGGER.debug(f"Error closing connection {err}") + logger.debug(f"Error closing connection {err}") @classmethod def get_response(cls, cursor): @@ -243,16 +249,16 @@ def cancel(self, connection): connection.handle.close() def add_begin_query(self, *args, **kwargs): - LOGGER.debug("NotImplemented: add_begin_query") + logger.debug("NotImplemented: add_begin_query") def add_commit_query(self, *args, **kwargs): - LOGGER.debug("NotImplemented: add_commit_query") + logger.debug("NotImplemented: add_commit_query") def commit(self, *args, **kwargs): - LOGGER.debug("NotImplemented: commit") + logger.debug("NotImplemented: commit") def rollback(self, *args, **kwargs): - LOGGER.debug("NotImplemented: rollback") + logger.debug("NotImplemented: rollback") def add_query( self, @@ -272,7 +278,7 @@ def add_query( additional_info = json.loads(self.query_header.comment.query_comment.strip()) except Exception as ex: # silently ignore error for parsing additional_info = {} - LOGGER.debug(f"Unable to get query header {ex}") + logger.debug(f"Unable to get query header {ex}") with self.exception_handler(sql): if abridge_sql_log: diff --git a/setup.py b/setup.py index c50ddc9..8c50419 100644 --- a/setup.py +++ b/setup.py @@ -65,7 +65,7 @@ def _get_dbt_core_version(): include_package_data=True, install_requires=[ 'dbt-core~={}'.format(dbt_core_version), - "impyla>=0.18a5", + "impyla==0.18a7", "python-decouple>=3.6" ], classifiers=[