Skip to content

Commit

Permalink
Handle warnings from dbt ls commands (Issue #12) (#15)
Browse files Browse the repository at this point in the history
* Handle warnings from dbt ls commands

Signed-off-by: Robert Astel <[email protected]>

* Reformat with Black

Signed-off-by: Robert Astel <[email protected]>

* Make json default for dbt ls, do not rerun dbt ls unnecessarily

Signed-off-by: Robert Astel <[email protected]>
  • Loading branch information
robastel authored Jan 3, 2022
1 parent 3819164 commit 949c0eb
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 37 deletions.
51 changes: 35 additions & 16 deletions dbt_invoke/internal/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import platform

import yaml

from dbt.task.base import get_nearest_project_dir

MACROS = {
Expand All @@ -21,6 +20,9 @@
"\n{% endmacro %}\n"
)
}
DBT_GLOBAL_ARGS = {
'log-format': 'json',
}
DBT_LS_ARG_HELP = (
'An argument for listing dbt resources (run "dbt ls --help" for details)'
)
Expand Down Expand Up @@ -126,7 +128,7 @@ def dbt_ls(
ctx,
supported_resource_types=None,
hide=True,
output='path',
output='json',
logger=None,
**kwargs,
):
Expand Down Expand Up @@ -161,16 +163,36 @@ def dbt_ls(
default_arguments.append(f'{get_cli_kwargs(resource_type=rt)}')
default_arguments = ' '.join(default_arguments)
arguments = get_cli_kwargs(**kwargs)
all_arguments = f'{default_arguments} {arguments} --output {output}'
command = f"dbt ls {all_arguments}"
dbt_command_cli_args = f'{default_arguments} {arguments} --output {output}'
dbt_global_cli_args = get_cli_kwargs(**DBT_GLOBAL_ARGS)
command = f"dbt {dbt_global_cli_args} ls {dbt_command_cli_args}"
logger.debug(f'Running command: {command}')
result = ctx.run(command, hide=hide)
result_lines = result.stdout.splitlines()
if output == 'json':
result_lines = [
json.loads(result_json) for result_json in result_lines
]
return result_lines
result_lines_filtered = list()
for line in result_lines:
# Because we set the dbt global arg "--log-format json", if
# line is valid json then it may be an actual result or it
# may be some other output from dbt, like a warning.
try:
line_dict = json.loads(line)
# If line is not valid json, then it should be an actual
# result. This is because even when the "dbt ls" command
# arg "--output" is not set to json, non-result logs will
# still be in json format (due to the dbt global arg
# "--log-format json").
except ValueError:
result_lines_filtered.append(line)
continue
# If 'resource_type' is in line_dict, then this is likely
# an actual result and not something else like a warning.
if 'resource_type' in line_dict:
result_lines_filtered.append(line_dict)
# Else, if 'resource_type' is not in line_dict, this may be
# a warning from dbt, so log it.
else:
logger.warning(f'Extra output from "dbt ls" command: {line}')
return result_lines_filtered


def get_cli_kwargs(**kwargs):
Expand Down Expand Up @@ -227,19 +249,16 @@ def dbt_run_operation(
"""
if not logger:
logger = get_logger('')
dbt_kwargs = {
dbt_command_args = {
'project_dir': project_dir or ctx.config['project_path'],
'profiles_dir': profiles_dir,
'profile': profile,
'target': target,
'vars': vars,
'bypass_cache': bypass_cache,
}
dbt_cli_kwargs = get_cli_kwargs(**dbt_kwargs)

dbt_global_kwargs = {'log-format': 'json'}
dbt_global_cli_kwargs = get_cli_kwargs(**dbt_global_kwargs)

dbt_command_cli_args = get_cli_kwargs(**dbt_command_args)
dbt_global_cli_args = get_cli_kwargs(**DBT_GLOBAL_ARGS)
macro_kwargs = json.dumps(kwargs, sort_keys=False)
if platform.system().lower().startswith('win'):
# Format YAML string for Windows Command Prompt
Expand All @@ -253,7 +272,7 @@ def dbt_run_operation(
macro_kwargs = macro_kwargs.replace("'", """'"'"'""")
macro_kwargs = f"'{macro_kwargs}'"
command = (
f"dbt {dbt_global_cli_kwargs} run-operation {dbt_cli_kwargs}"
f"dbt {dbt_global_cli_args} run-operation {dbt_command_cli_args}"
f" {macro_name} --args {macro_kwargs}"
)
logger.debug(f'Running command: {command}')
Expand Down
2 changes: 1 addition & 1 deletion dbt_invoke/internal/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.1.3'
__version__ = '0.1.4'
42 changes: 26 additions & 16 deletions dbt_invoke/properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import ast

from invoke import task

from dbt_invoke.internal import _utils
import ast

_LOGGER = _utils.get_logger('dbt-invoke')
_MACRO_NAME = '_log_columns_list'
Expand Down Expand Up @@ -258,27 +258,37 @@ def _transform_ls_results(ctx, **kwargs):
"""
# Run dbt ls to retrieve resource path and json information
_LOGGER.info('Searching for matching resources...')
result_lines_path = _utils.dbt_ls(
ctx,
supported_resource_types=_SUPPORTED_RESOURCE_TYPES,
logger=_LOGGER,
**kwargs,
)
result_lines_dict = _utils.dbt_ls(
potential_results = _utils.dbt_ls(
ctx,
supported_resource_types=_SUPPORTED_RESOURCE_TYPES,
logger=_LOGGER,
output='json',
**kwargs,
)
results = dict(zip(result_lines_path, result_lines_dict))
# Filter dictionary for existing files and supported resource types
results = {
k: v
for k, v in results.items()
if v['resource_type'] in _SUPPORTED_RESOURCE_TYPES
and Path(ctx.config['project_path'], k).exists()
}
potential_result_paths = None
results = dict()
for i, potential_result in enumerate(potential_results):
if 'original_file_path' in potential_result:
potential_result_path = potential_result['original_file_path']
# Before dbt version 0.20.0, original_file_path was not
# included in the json response of "dbt ls". For older
# versions of dbt, we need to run "dbt ls" with the
# "--output path" argument in order to retrieve paths
else:
if potential_result_paths is None:
potential_result_paths = _utils.dbt_ls(
ctx,
supported_resource_types=_SUPPORTED_RESOURCE_TYPES,
logger=_LOGGER,
output='path',
**kwargs,
)
assert len(potential_result_paths) == len(
potential_results
), 'Length of results differs from length of result details'
potential_result_path = potential_result_paths[i]
if Path(ctx.config['project_path'], potential_result_path).exists():
results[potential_result_path] = potential_result
_LOGGER.info(
f"Found {len(results)} matching resources in dbt project"
f' "{ctx.config["project_name"]}"'
Expand Down
7 changes: 3 additions & 4 deletions tests/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@
import unittest
from pathlib import Path
from unittest.mock import patch
import sys
import pkg_resources
import shutil

import invoke
import sys

from dbt_invoke import properties
from dbt_invoke.internal import _utils

import pkg_resources
import shutil

PARENT_DIR = Path(__file__).parent


Expand Down
1 change: 1 addition & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def test_dbt_ls(self):
project_dir=self.project_dir,
profiles_dir=self.profiles_dir,
supported_resource_types=SUPPORTED_RESOURCE_TYPES,
output='path',
logger=self.logger,
**dbt_ls_kwargs,
)
Expand Down

0 comments on commit 949c0eb

Please sign in to comment.