Skip to content

Commit

Permalink
Add custom user agent string when using LDAP connection to tag dbt adapter connection. (#69)
Browse files Browse the repository at this point in the history

Currently, this user agent string is `dbt/cloudera-impala-v[ADAPTER_VERSION]`.

Test plan (WIP):
a) Use the PR cloudera/impyla#498 to first update impyla so that it supports this feature.
b) In a sample dbt project, run `dbt debug`.
c) In Atlas, filter on the above user agent string and confirm that the results match the queries that were executed.
  • Loading branch information
tovganesh authored Sep 28, 2022
1 parent 5708c98 commit 255551a
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
32 changes: 19 additions & 13 deletions dbt/adapters/impala/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from dbt.events.functions import fire_event
from dbt.events.types import ConnectionUsed, SQLQuery, SQLQueryStatus

from dbt.logger import GLOBAL_LOGGER as LOGGER
from dbt.events import AdapterLogger

import impala.dbapi
from impala.error import DatabaseError
Expand All @@ -41,10 +41,13 @@

import json

from dbt.adapters.impala.__version__ import version as ADAPTER_VERSION

DEFAULT_IMPALA_HOST = "localhost"
DEFAULT_IMPALA_PORT = 21050
DEFAULT_MAX_RETRIES = 3

logger = AdapterLogger("Impala")

@dataclass
class ImpalaCredentials(Credentials):
Expand Down Expand Up @@ -119,22 +122,22 @@ def exception_handler(self, sql: str):
try:
yield
except HttpError as httpError:
LOGGER.debug("Authorization error: {}".format(httpError))
logger.debug("Authorization error: {}".format(httpError))
raise dbt.exceptions.RuntimeException ("HTTP Authorization error: " + str(httpError) + ", please check your credentials")
except HiveServer2Error as servError:
LOGGER.debug("Server connection error: {}".format(servError))
logger.debug("Server connection error: {}".format(servError))
raise dbt.exceptions.RuntimeException ("Unable to establish connection to Impala server: " + str(servError))
except DatabaseError as dbError:
LOGGER.debug("Database connection error: {}".format(str(dbError)))
logger.debug("Database connection error: {}".format(str(dbError)))
raise dbt.exceptions.DatabaseException("Database Connection error: " + str(dbError))
except Exception as exc:
LOGGER.debug("Error running SQL: {}".format(sql))
logger.debug("Error running SQL: {}".format(sql))
raise dbt.exceptions.RuntimeException(str(exc))

@classmethod
def open(cls, connection):
if connection.state == ConnectionState.OPEN:
LOGGER.debug("Connection is already open, skipping open.")
logger.debug("Connection is already open, skipping open.")
return connection

credentials = connection.credentials
Expand All @@ -148,6 +151,8 @@ def open(cls, connection):
if (
credentials.auth_type == "LDAP" or credentials.auth_type == "ldap"
): # ldap connection
custom_user_agent = 'dbt/cloudera-impala-v' + ADAPTER_VERSION
logger.debug("Using user agent: {}".format(custom_user_agent))
handle = impala.dbapi.connect(
host=credentials.host,
port=credentials.port,
Expand All @@ -158,6 +163,7 @@ def open(cls, connection):
use_ssl=credentials.use_ssl,
http_path=credentials.http_path,
retries=credentials.retries,
user_agent=custom_user_agent
)
auth_type = "ldap"
elif (
Expand Down Expand Up @@ -186,7 +192,7 @@ def open(cls, connection):
connection.state = ConnectionState.OPEN
connection.handle = handle
except Exception as ex:
LOGGER.debug("Connection error {}".format(ex))
logger.debug("Connection error {}".format(ex))
connection_ex = ex
connection.state = ConnectionState.FAIL
connection.handle = None
Expand Down Expand Up @@ -232,7 +238,7 @@ def close(cls, connection):

return connection
except Exception as err:
LOGGER.debug(f"Error closing connection {err}")
logger.debug(f"Error closing connection {err}")

@classmethod
def get_response(cls, cursor):
Expand All @@ -243,16 +249,16 @@ def cancel(self, connection):
connection.handle.close()

def add_begin_query(self, *args, **kwargs):
LOGGER.debug("NotImplemented: add_begin_query")
logger.debug("NotImplemented: add_begin_query")

def add_commit_query(self, *args, **kwargs):
LOGGER.debug("NotImplemented: add_commit_query")
logger.debug("NotImplemented: add_commit_query")

def commit(self, *args, **kwargs):
LOGGER.debug("NotImplemented: commit")
logger.debug("NotImplemented: commit")

def rollback(self, *args, **kwargs):
LOGGER.debug("NotImplemented: rollback")
logger.debug("NotImplemented: rollback")

def add_query(
self,
Expand All @@ -272,7 +278,7 @@ def add_query(
additional_info = json.loads(self.query_header.comment.query_comment.strip())
except Exception as ex: # silently ignore error for parsing
additional_info = {}
LOGGER.debug(f"Unable to get query header {ex}")
logger.debug(f"Unable to get query header {ex}")

with self.exception_handler(sql):
if abridge_sql_log:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _get_dbt_core_version():
include_package_data=True,
install_requires=[
'dbt-core~={}'.format(dbt_core_version),
"impyla>=0.18a5",
"impyla==0.18a7",
"python-decouple>=3.6"
],
classifiers=[
Expand Down

0 comments on commit 255551a

Please sign in to comment.